From ef1e428fb343c9b96ce785c5da04eab5dd2bd2ea Mon Sep 17 00:00:00 2001
From: Alex Ioannidis
Date: Wed, 4 Sep 2024 16:45:07 +0200
Subject: [PATCH] user_moderation: use search for faster actions

* Use search results to determine the user's list of records.
* Use a TaskOp and Unit of Work to avoid sending Celery tasks
  immediately.
* Add a cleanup task that will perform a more thorough check using the
  DB to lookup the user's records.
---
 .../requests/user_moderation/actions.py      | 64 +++++++++----------
 .../requests/user_moderation/tasks.py        | 40 ++++++++++++
 .../requests/user_moderation/utils.py        | 48 ++++++++++++++
 .../services/communities/components.py       |  3 +-
 .../services/components/verified.py          |  4 +-
 .../requests/test_user_moderation_actions.py |  6 +-
 6 files changed, 128 insertions(+), 37 deletions(-)
 create mode 100644 invenio_rdm_records/requests/user_moderation/utils.py

diff --git a/invenio_rdm_records/requests/user_moderation/actions.py b/invenio_rdm_records/requests/user_moderation/actions.py
index 43a1442c4..c443af72c 100644
--- a/invenio_rdm_records/requests/user_moderation/actions.py
+++ b/invenio_rdm_records/requests/user_moderation/actions.py
@@ -8,38 +8,18 @@
 """RDM user moderation action."""
 
 from invenio_access.permissions import system_identity
-from invenio_db import db
 from invenio_pidstore.errors import PIDDoesNotExistError
+from invenio_records_resources.services.uow import TaskOp
 from invenio_vocabularies.proxies import current_service
 
 from ...proxies import current_rdm_records_service
-from .tasks import delete_record, restore_record
-
-
-def _get_records_for_user(user_id):
-    """Helper function for getting all the records of the user.
-
-    Note: This function performs DB queries yielding all records for a given
-    user (which is not hard-limited in the system) and performs service calls
-    on each of them. Thus, this function has the potential of being a very
-    heavy operation, and should not be called as part of the handling of an
-    HTTP request!
-    """
-    record_cls = current_rdm_records_service.record_cls
-    model_cls = record_cls.model_cls
-    parent_cls = record_cls.parent_record_cls
-    parent_model_cls = parent_cls.model_cls
-
-    records = (
-        db.session.query(model_cls.json["id"].as_string())
-        .join(parent_model_cls)
-        .filter(
-            parent_model_cls.json["access"]["owned_by"]["user"].as_string()
-            == str(user_id),
-        )
-    ).yield_per(1000)
-
-    return records
+from .tasks import (
+    delete_record,
+    restore_record,
+    user_block_cleanup,
+    user_restore_cleanup,
+)
+from .utils import get_user_records
 
 
 def on_block(user_id, uow=None, **kwargs):
@@ -63,8 +43,18 @@ def on_block(user_id, uow=None, **kwargs):
         pass
 
     # soft-delete all the published records of that user
-    for (recid,) in _get_records_for_user(user_id):
-        delete_record.delay(recid, tombstone_data)
+    for recid in get_user_records(user_id):
+        uow.register(TaskOp(delete_record, recid=recid, tombstone_data=tombstone_data))
+
+    # Send cleanup task to make sure all records are deleted
+    uow.register(
+        TaskOp.for_async_apply(
+            user_block_cleanup,
+            kwargs=dict(user_id=user_id, tombstone_data=tombstone_data),
+            # wait for 10 minutes before starting the cleanup
+            countdown=10 * 60,
+        )
+    )
 
 
 def on_restore(user_id, uow=None, **kwargs):
@@ -77,8 +67,18 @@ def on_restore(user_id, uow=None, **kwargs):
     user_id = str(user_id)
 
     # restore all the deleted records of that user
-    for (recid,) in _get_records_for_user(user_id):
-        restore_record.delay(recid)
+    for recid in get_user_records(user_id):
+        uow.register(TaskOp(restore_record, recid=recid))
+
+    # Send cleanup task to make sure all records are restored
+    uow.register(
+        TaskOp.for_async_apply(
+            user_restore_cleanup,
+            kwargs=dict(user_id=user_id),
+            # wait for 10 minutes before starting the cleanup
+            countdown=10 * 60,
+        )
+    )
 
 
 def on_approve(user_id, uow=None, **kwargs):
diff --git a/invenio_rdm_records/requests/user_moderation/tasks.py b/invenio_rdm_records/requests/user_moderation/tasks.py
index a5c4fde7b..20921cdbb 100644
--- a/invenio_rdm_records/requests/user_moderation/tasks.py
+++ b/invenio_rdm_records/requests/user_moderation/tasks.py
@@ -9,10 +9,50 @@
 
 from celery import shared_task
 from invenio_access.permissions import system_identity
+from invenio_users_resources.records.api import UserAggregate
 
 from invenio_rdm_records.proxies import current_rdm_records_service
+from invenio_rdm_records.records.systemfields.deletion_status import (
+    RecordDeletionStatusEnum,
+)
 from invenio_rdm_records.services.errors import DeletionStatusException
 
+from .utils import get_user_records
+
+
+@shared_task(ignore_result=True)
+def user_block_cleanup(user_id, tombstone_data):
+    """User block action cleanup."""
+    user = UserAggregate.get_record(user_id)
+    # Bail out if the user is not blocked (i.e. we restored him before the task ran)
+    if not user.blocked:
+        return
+
+    for recid in get_user_records(
+        user_id,
+        from_db=True,
+        # Only fetch published records that might have not been deleted yet.
+        status=[RecordDeletionStatusEnum.PUBLISHED],
+    ):
+        delete_record.delay(recid, tombstone_data)
+
+
+@shared_task(ignore_result=True)
+def user_restore_cleanup(user_id):
+    """User restore action cleanup."""
+    user = UserAggregate.get_record(user_id)
+    # Bail out if the user is blocked (i.e. we blocked him before the task ran)
+    if user.blocked:
+        return
+
+    for recid in get_user_records(
+        user_id,
+        from_db=True,
+        # Only fetch deleted records that might have not been restored yet.
+        status=[RecordDeletionStatusEnum.DELETED],
+    ):
+        restore_record.delay(recid)
+
 
 @shared_task(ignore_result=True)
 def delete_record(recid, tombstone_data):
diff --git a/invenio_rdm_records/requests/user_moderation/utils.py b/invenio_rdm_records/requests/user_moderation/utils.py
new file mode 100644
index 000000000..319eb9ba6
--- /dev/null
+++ b/invenio_rdm_records/requests/user_moderation/utils.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2024 CERN.
+# Copyright (C) 2023 TU Wien.
+#
+# Invenio-RDM-Records is free software; you can redistribute it and/or modify
+# it under the terms of the MIT License; see LICENSE file for more details.
+
+"""RDM user moderation utilities."""
+
+from invenio_db import db
+from invenio_search.api import RecordsSearchV2
+
+from ...proxies import current_rdm_records_service
+
+
+def get_user_records(user_id, from_db=False, status=None):
+    """Helper function for getting all the records of the user."""
+    record_cls = current_rdm_records_service.record_cls
+    model_cls = record_cls.model_cls
+    parent_cls = record_cls.parent_record_cls
+    parent_model_cls = parent_cls.model_cls
+
+    if from_db:
+        query = (
+            db.session.query(model_cls.json["id"].as_string())
+            .join(parent_model_cls)
+            .filter(
+                parent_model_cls.json["access"]["owned_by"]["user"].as_string()
+                == str(user_id),
+            )
+        )
+        if status:
+            query = query.filter(model_cls.deletion_status.in_(status))
+
+        return (row[0] for row in query.yield_per(1000))
+    else:
+        search = (
+            RecordsSearchV2(index=record_cls.index._name)
+            .filter("term", **{"parent.access.owned_by.user": user_id})
+            .source(["id"])
+        )
+        if status:
+            if not isinstance(status, (tuple, list)):
+                status = [status]
+            status = [s.value for s in status]
+            search = search.filter("terms", deletion_status=status)
+        return (hit["id"] for hit in search.scan())
diff --git a/invenio_rdm_records/services/communities/components.py b/invenio_rdm_records/services/communities/components.py
index d1e450ffc..0c9922258 100644
--- a/invenio_rdm_records/services/communities/components.py
+++ b/invenio_rdm_records/services/communities/components.py
@@ -30,6 +30,7 @@
     MetadataComponent,
     RelationsComponent,
 )
+from invenio_records_resources.services.uow import TaskOp
 from invenio_requests.tasks import request_moderation
 from invenio_search.engine import dsl
 
@@ -81,7 +82,7 @@ def create(self, identity, data=None, record=None, **kwargs):
 
         if not is_verified:
             # Spawn a task to request moderation.
-            request_moderation.delay(identity.id)
+            self.uow.register(TaskOp(request_moderation, user_id=identity.id))
 
 
 CommunityServiceComponents = [
diff --git a/invenio_rdm_records/services/components/verified.py b/invenio_rdm_records/services/components/verified.py
index 34e848875..65e123bae 100644
--- a/invenio_rdm_records/services/components/verified.py
+++ b/invenio_rdm_records/services/components/verified.py
@@ -9,6 +9,7 @@
 from flask import current_app
 from invenio_access.permissions import system_identity
 from invenio_drafts_resources.services.records.components import ServiceComponent
+from invenio_records_resources.services.uow import TaskOp
 from invenio_requests.tasks import request_moderation
 
 
@@ -27,4 +28,5 @@ def publish(self, identity, draft=None, record=None):
 
         if not is_verified:
             # Spawn a task to request moderation.
-            request_moderation.delay(record.parent.access.owner.owner_id)
+            owner_id = record.parent.access.owner.owner_id
+            self.uow.register(TaskOp(request_moderation, user_id=owner_id))
diff --git a/tests/requests/test_user_moderation_actions.py b/tests/requests/test_user_moderation_actions.py
index 984b0d85b..cb024fb0c 100644
--- a/tests/requests/test_user_moderation_actions.py
+++ b/tests/requests/test_user_moderation_actions.py
@@ -21,8 +21,8 @@
 class MockRequestModerationTask(Task):
     """Mock celery task for moderation request."""
 
-    def delay(*args):
-        user_id = args[0]
+    def apply_async(self, args=None, kwargs=None, **kwargs_):
+        user_id = kwargs["user_id"]
         with db.session.begin_nested():
             try:
                 current_user_moderation_service.request_moderation(
@@ -55,7 +55,7 @@ def test_user_moderation_approve(
     # This is a patch for tests only.
     mocker.patch(
         "invenio_rdm_records.services.components.verified.request_moderation",
-        MockRequestModerationTask,
+        MockRequestModerationTask(),
     )
     new_record = records_service.publish(
         identity=unverified_user.identity, id_=new_version.id