Skip to content

Commit

Permalink
user_moderation: use search for faster actions
Browse files Browse the repository at this point in the history
* Use search results to determine the user's list of records.
* Use a TaskOp and Unit of Work to avoid sending Celery tasks immediately.
* Add a cleanup task that will perform a more thorough check using the
  DB to look up the user's records.
  • Loading branch information
slint committed Oct 30, 2024
1 parent ebb77d1 commit ef1e428
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 37 deletions.
64 changes: 32 additions & 32 deletions invenio_rdm_records/requests/user_moderation/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,18 @@
"""RDM user moderation action."""

from invenio_access.permissions import system_identity
from invenio_db import db
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_records_resources.services.uow import TaskOp
from invenio_vocabularies.proxies import current_service

from ...proxies import current_rdm_records_service
from .tasks import delete_record, restore_record


def _get_records_for_user(user_id):
    """Build a streaming DB query over the IDs of all records owned by a user.

    Warning: the number of records a user may own is not hard-limited in the
    system, and callers perform service calls for each returned record. This
    can therefore become a very heavy operation and should not be run while
    handling an HTTP request.

    :param user_id: ID of the owning user; compared as a string against the
        parent's ``access.owned_by.user`` JSON value.
    :returns: A query yielding single-column rows (the record ``id`` JSON
        value as a string), fetched in batches of 1000.
    """
    record_cls = current_rdm_records_service.record_cls
    record_model = record_cls.model_cls
    parent_model = record_cls.parent_record_cls.model_cls

    # JSON paths used for the selected column and for the ownership filter.
    recid_column = record_model.json["id"].as_string()
    owner_column = parent_model.json["access"]["owned_by"]["user"].as_string()

    query = db.session.query(recid_column).join(parent_model)
    query = query.filter(owner_column == str(user_id))

    # Stream the rows in batches instead of loading everything at once.
    return query.yield_per(1000)
from .tasks import (
delete_record,
restore_record,
user_block_cleanup,
user_restore_cleanup,
)
from .utils import get_user_records


def on_block(user_id, uow=None, **kwargs):
Expand All @@ -63,8 +43,18 @@ def on_block(user_id, uow=None, **kwargs):
pass

# soft-delete all the published records of that user
for (recid,) in _get_records_for_user(user_id):
delete_record.delay(recid, tombstone_data)
for recid in get_user_records(user_id):
uow.register(TaskOp(delete_record, recid=recid, tombstone_data=tombstone_data))

# Send cleanup task to make sure all records are deleted
uow.register(
TaskOp.for_async_apply(
user_block_cleanup,
kwargs=dict(user_id=user_id, tombstone_data=tombstone_data),
# wait for 10 minutes before starting the cleanup
countdown=10 * 60,
)
)


def on_restore(user_id, uow=None, **kwargs):
Expand All @@ -77,8 +67,18 @@ def on_restore(user_id, uow=None, **kwargs):
user_id = str(user_id)

# restore all the deleted records of that user
for (recid,) in _get_records_for_user(user_id):
restore_record.delay(recid)
for recid in get_user_records(user_id):
uow.register(TaskOp(restore_record, recid=recid))

# Send cleanup task to make sure all records are restored
uow.register(
TaskOp.for_async_apply(
user_restore_cleanup,
kwargs=dict(user_id=user_id),
# wait for 10 minutes before starting the cleanup
countdown=10 * 60,
)
)


def on_approve(user_id, uow=None, **kwargs):
Expand Down
40 changes: 40 additions & 0 deletions invenio_rdm_records/requests/user_moderation/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,50 @@

from celery import shared_task
from invenio_access.permissions import system_identity
from invenio_users_resources.records.api import UserAggregate

from invenio_rdm_records.proxies import current_rdm_records_service
from invenio_rdm_records.records.systemfields.deletion_status import (
RecordDeletionStatusEnum,
)
from invenio_rdm_records.services.errors import DeletionStatusException

from .utils import get_user_records


@shared_task(ignore_result=True)
def user_block_cleanup(user_id, tombstone_data):
    """Queue deletion for a blocked user's records that are still published.

    Nothing is done if the user is no longer blocked when the task runs
    (i.e. the user was restored before the task ran).

    :param user_id: ID of the user whose records are checked.
    :param tombstone_data: Passed unchanged to every ``delete_record`` task.
    """
    account = UserAggregate.get_record(user_id)
    if account.blocked:
        # Look the records up in the DB, limited to published ones, i.e.
        # those that might not have been deleted yet.
        still_published = get_user_records(
            user_id,
            from_db=True,
            status=[RecordDeletionStatusEnum.PUBLISHED],
        )
        for pid in still_published:
            delete_record.delay(pid, tombstone_data)


@shared_task(ignore_result=True)
def user_restore_cleanup(user_id):
    """Queue restoration for a restored user's records that are still deleted.

    Nothing is done if the user is blocked when the task runs (i.e. the user
    was blocked again before the task ran).

    :param user_id: ID of the user whose records are checked.
    """
    account = UserAggregate.get_record(user_id)
    if not account.blocked:
        # Look the records up in the DB, limited to deleted ones, i.e. those
        # that might not have been restored yet.
        still_deleted = get_user_records(
            user_id,
            from_db=True,
            status=[RecordDeletionStatusEnum.DELETED],
        )
        for pid in still_deleted:
            restore_record.delay(pid)


@shared_task(ignore_result=True)
def delete_record(recid, tombstone_data):
Expand Down
48 changes: 48 additions & 0 deletions invenio_rdm_records/requests/user_moderation/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2024 CERN.
# Copyright (C) 2023 TU Wien.
#
# Invenio-RDM-Records is free software; you can redistribute it and/or modify
# it under the terms of the MIT License; see LICENSE file for more details.

"""RDM user moderation utilities."""

from invenio_db import db
from invenio_search.api import RecordsSearchV2

from ...proxies import current_rdm_records_service


def get_user_records(user_id, from_db=False, status=None):
    """Return a lazy iterable of the IDs of the records owned by a user.

    Two lookup strategies are supported:

    * ``from_db=False`` (default): scan the records search index, filtering
      on ``parent.access.owned_by.user``.
    * ``from_db=True``: query the database, joining the record model with its
      parent model and filtering on the parent's ``access.owned_by.user``
      JSON value. Rows are streamed in batches of 1000.

    In both cases the result is a generator, so nothing is fetched until it
    is iterated.

    :param user_id: ID of the user whose records are looked up.
    :param from_db: If true, query the database instead of the search index.
    :param status: Optional deletion status, or list/tuple of deletion
        statuses, used to restrict the results. Members must expose a
        ``.value`` attribute (e.g. ``RecordDeletionStatusEnum`` members).
        A single status is accepted by both strategies.
    :returns: A generator yielding the ``id`` value of each matching record.
    """
    # Normalise a scalar status once, so that the DB and the search branches
    # accept exactly the same argument shapes.
    if status and not isinstance(status, (tuple, list)):
        status = [status]

    record_cls = current_rdm_records_service.record_cls

    if from_db:
        # The model classes are only needed for the DB strategy.
        model_cls = record_cls.model_cls
        parent_model_cls = record_cls.parent_record_cls.model_cls

        query = (
            db.session.query(model_cls.json["id"].as_string())
            .join(parent_model_cls)
            .filter(
                parent_model_cls.json["access"]["owned_by"]["user"].as_string()
                == str(user_id),
            )
        )
        if status:
            query = query.filter(model_cls.deletion_status.in_(status))

        # Each row is a single-column tuple; unwrap it to the bare ID.
        return (row[0] for row in query.yield_per(1000))

    search = (
        RecordsSearchV2(index=record_cls.index._name)
        .filter("term", **{"parent.access.owned_by.user": user_id})
        .source(["id"])
    )
    if status:
        # The index is filtered on the raw values of the status members.
        search = search.filter(
            "terms", deletion_status=[s.value for s in status]
        )
    return (hit["id"] for hit in search.scan())
3 changes: 2 additions & 1 deletion invenio_rdm_records/services/communities/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
MetadataComponent,
RelationsComponent,
)
from invenio_records_resources.services.uow import TaskOp
from invenio_requests.tasks import request_moderation
from invenio_search.engine import dsl

Expand Down Expand Up @@ -81,7 +82,7 @@ def create(self, identity, data=None, record=None, **kwargs):

if not is_verified:
# Spawn a task to request moderation.
request_moderation.delay(identity.id)
self.uow.register(TaskOp(request_moderation, user_id=identity.id))


CommunityServiceComponents = [
Expand Down
4 changes: 3 additions & 1 deletion invenio_rdm_records/services/components/verified.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from flask import current_app
from invenio_access.permissions import system_identity
from invenio_drafts_resources.services.records.components import ServiceComponent
from invenio_records_resources.services.uow import TaskOp
from invenio_requests.tasks import request_moderation


Expand All @@ -27,4 +28,5 @@ def publish(self, identity, draft=None, record=None):

if not is_verified:
# Spawn a task to request moderation.
request_moderation.delay(record.parent.access.owner.owner_id)
owner_id = record.parent.access.owner.owner_id
self.uow.register(TaskOp(request_moderation, user_id=owner_id))
6 changes: 3 additions & 3 deletions tests/requests/test_user_moderation_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
class MockRequestModerationTask(Task):
"""Mock celery task for moderation request."""

def delay(*args):
user_id = args[0]
def apply_async(self, args=None, kwargs=None, **kwargs_):
user_id = kwargs["user_id"]
with db.session.begin_nested():
try:
current_user_moderation_service.request_moderation(
Expand Down Expand Up @@ -55,7 +55,7 @@ def test_user_moderation_approve(
# This is a patch for tests only.
mocker.patch(
"invenio_rdm_records.services.components.verified.request_moderation",
MockRequestModerationTask,
MockRequestModerationTask(),
)
new_record = records_service.publish(
identity=unverified_user.identity, id_=new_version.id
Expand Down

0 comments on commit ef1e428

Please sign in to comment.