Store events in a Redis stream instead of a list
This commit changes how events are stored in
Redis for later consumption.
Events are now added to a Redis Stream, allowing
different types of consumers to process the items for
various purposes. The trade-off is that the stream is
capped to a maximum length, so the oldest entries are
discarded when new items are added.

Signed-off-by: Jose Javier Merchante <[email protected]>
jjmerchante committed Jan 10, 2025
1 parent 59de076 commit 7a47edc
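
For context, a minimal sketch of how events could be appended to a capped Redis Stream with redis-py is shown below; it is not code from this commit, and the stream name and maximum length simply mirror the EVENTS_STREAM and STREAM_MAX_LENGTH defaults introduced in settings.py.

import json

import redis

# Minimal sketch, assuming redis-py and the defaults added by this commit;
# the publish_event helper itself is hypothetical.
connection = redis.Redis(host='127.0.0.1', port=6379)


def publish_event(event: dict) -> None:
    # XADD appends the entry; 'maxlen' with approximate=True trims the stream
    # roughly to the limit, discarding the oldest entries first.
    connection.xadd(
        name='events',                       # EVENTS_STREAM default
        fields={'data': json.dumps(event)},
        maxlen=2 * 10 ** 6,                  # STREAM_MAX_LENGTH default
        approximate=True,
    )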
Showing 11 changed files with 1,276 additions and 338 deletions.
675 changes: 359 additions & 316 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -52,6 +52,7 @@ perceval = {version = "^1.0.2", allow-prereleases = true}
grimoirelab-chronicler = {git = "https://github.com/grimoirelab/grimoirelab-chronicler.git", allow-prereleases = true}
django-cors-headers = "^4.6.0"
djangorestframework = "^3.15.2"
opensearch-py = "^2.8.0"

[tool.poetry.group.dev.dependencies]
fakeredis = "^2.0.0"
Expand Down
25 changes: 19 additions & 6 deletions src/grimoirelab/core/config/settings.py
@@ -273,9 +273,8 @@
# https://github.com/rq/django-rq
#

Q_PERCEVAL_JOBS = os.environ.get('GRIMOIRELAB_Q_PERCEVAL_JOBS', 'default')
Q_STORAGE_ITEMS = os.environ.get('GRIMOIRELAB_Q_STORAGE_ITEMS', 'items')
Q_EVENTS = os.environ.get('GRIMOIRELAB_Q_EVENTS', 'events')
Q_EVENTIZER_JOBS = os.environ.get('GRIMOIRELAB_Q_EVENTIZER_JOBS', 'default')
Q_ARCHIVIST_JOBS = os.environ.get('GRIMOIRELAB_Q_ARCHIVIST_JOBS', 'storage_jobs')

_RQ_DATABASE = {
'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'),
@@ -285,11 +284,13 @@
}

RQ_QUEUES = {
Q_PERCEVAL_JOBS: _RQ_DATABASE,
Q_STORAGE_ITEMS: _RQ_DATABASE,
Q_EVENTS: _RQ_DATABASE,
Q_EVENTIZER_JOBS: _RQ_DATABASE,
Q_ARCHIVIST_JOBS: _RQ_DATABASE,
}

EVENTS_STREAM = os.environ.get('GRIMOIRELAB_EVENTS_STREAM', 'events')
STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_STREAM_MAX_LENGTH', 2 * 10 ** 6))

RQ = {
'JOB_CLASS': 'grimoirelab.core.scheduler.jobs.GrimoireLabJob',
'WORKER_CLASS': 'grimoirelab.core.scheduler.worker.GrimoireLabWorker',
@@ -305,3 +306,15 @@
GRIMOIRELAB_JOB_TIMEOUT = int(os.environ.get('GRIMOIRELAB_JOB_TIMEOUT', -1))

GIT_STORAGE_PATH = os.environ.get('GRIMOIRELAB_GIT_PATH', '~/.perceval')

#
# Archivist configuration
#
GRIMOIRELAB_ARCHIVIST = {
'WORKERS': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_WORKERS', 10)),
'STORAGE_TYPE': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_TYPE', 'opensearch'),
'STORAGE_URL': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_URL', 'https://admin:admin@localhost:9200'),
'STORAGE_INDEX': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_INDEX', 'events'),
'STORAGE_VERIFY_CERT': os.environ.get('GRIMOIRELAB_ARCHIVIST_STORAGE_VERIFY_CERT', 'False').lower() in ('true', '1'),
'EVENTS_PER_JOB': int(os.environ.get('GRIMOIRELAB_ARCHIVIST_EVENTS_PER_JOB', 10000)),
}
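
A hedged sketch of how the 'archivist' consumers referenced by this configuration could read batches from the stream with a Redis consumer group follows; the stream and group names and the batch size come from the defaults above, while the reader code itself is illustrative and not part of this diff.

import redis

# Hypothetical consumer sketch; only 'events', 'archivist' and the 10000 batch
# size are taken from the settings above (EVENTS_STREAM, the 'archivist'
# redis_group, and EVENTS_PER_JOB).
connection = redis.Redis(host='127.0.0.1', port=6379)

STREAM = 'events'
GROUP = 'archivist'

# Create the consumer group once; mkstream creates the stream if it is missing.
try:
    connection.xgroup_create(name=STREAM, groupname=GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError:
    pass  # the group already exists


def fetch_batch(consumer_name: str, count: int = 10000):
    # Read up to 'count' entries not yet delivered to this consumer group.
    return connection.xreadgroup(
        groupname=GROUP,
        consumername=consumer_name,
        streams={STREAM: '>'},
        count=count,
        block=1000,
    )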
95 changes: 84 additions & 11 deletions src/grimoirelab/core/runner/commands/run.py
@@ -19,6 +19,7 @@
from __future__ import annotations

import os
import sys
import typing

import click
@@ -27,7 +28,6 @@

from django.conf import settings

from grimoirelab.core.scheduler.scheduler import maintain_tasks

if typing.TYPE_CHECKING:
from click import Context
@@ -45,9 +45,13 @@ def run(ctx: Context):
is_flag=True,
default=False,
help="Run the service in developer mode.")
@click.option("--clear-tasks",
is_flag=True,
default=False,
help="Clear background tasks.")
@run.command()
@click.pass_context
def server(ctx: Context, devel: bool):
def server(ctx: Context, devel: bool, clear_tasks: bool):
"""Start the GrimoireLab core server.
GrimoireLab server allows to schedule tasks and fetch data from
@@ -58,6 +62,8 @@ def server(ctx: Context, devel: bool):
should be run with a reverse proxy. If you activate the '--dev' flag,
a HTTP server will be run instead.
"""
create_background_tasks(clear_tasks)

env = os.environ

env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
@@ -83,6 +89,8 @@ def server(ctx: Context, devel: bool):
env["UWSGI_SINGLE_INTERPRETER"] = "true"

# Run maintenance tasks
from grimoirelab.core.scheduler.scheduler import maintain_tasks

_ = django.core.wsgi.get_wsgi_application()
maintain_tasks()

@@ -91,24 +99,89 @@


@run.command()
@click.argument('task-types', nargs=-1)
@click.option('--workers',
default=10,
show_default=True,
help="Number of workers to run in the pool.")
def eventizers(workers: int):
"""Start a pool of eventizer workers.
def worker_pool(task_types: str, workers: int):
"""Start a pool of workers that run specific tasks.
The workers on the pool will run tasks to fetch data from software
development repositories. Data will be processed in form of events,
and published in the events queue.
If multiple tasks share the same queue, they will run in the same
pool of workers. The tasks to run are defined as arguments to the
command.
The number of workers running in the pool can be defined with the
parameter '--workers'.
Workers get jobs from the Q_PERCEVAL_JOBS queue defined in the
configuration file.
"""
from grimoirelab.core.scheduler.models import (get_registered_task_model,
get_all_registered_task_names)

available_tasks = get_all_registered_task_names()

queues = []
for task in task_types:
try:
Task = get_registered_task_model(task)[0]
except KeyError:
click.echo(f"Task '{task}' is not a valid task. "
f"Options: {available_tasks}", err=True)
sys.exit(1)
queues.append(Task().default_job_queue)

if not queues:
click.echo(f"You must define at least one valid task. "
f"Options: {available_tasks}", err=True)
sys.exit(1)

django.core.management.call_command(
'rqworker-pool', settings.Q_PERCEVAL_JOBS,
'rqworker-pool', queues,
num_workers=workers
)


def create_background_tasks(clear_tasks: bool):
"""
Create background tasks before starting the server.
:param clear_tasks: clear tasks before creating new ones.
:return:
"""
from grimoirelab.core.scheduler.scheduler import schedule_task
from grimoirelab.core.scheduler.tasks.models import StorageTask

workers = settings.GRIMOIRELAB_ARCHIVIST['WORKERS']
storage_url = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_URL']
storage_db_name = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_INDEX']
storage_type = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_TYPE']
verify_certs = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_VERIFY_CERT']
events_per_job = settings.GRIMOIRELAB_ARCHIVIST['EVENTS_PER_JOB']

if clear_tasks:
StorageTask.objects.all().delete()
click.echo("Removing old background tasks.")

current = StorageTask.objects.filter(burst=False).count()
if workers == current:
click.echo("Background tasks already created. Skipping.")
return

task_args = {
'storage_url': storage_url,
'storage_db_name': storage_db_name,
'storage_verify_certs': verify_certs,
'redis_group': 'archivist',
'limit': events_per_job
}
if workers > current:
for _ in range(workers - current):
schedule_task(
task_type=StorageTask.TASK_TYPE,
storage_type=storage_type,
task_args=task_args,
job_interval=1,
job_max_retries=10
)
click.echo(f"Created {workers} background tasks.")
elif workers < current:
tasks = StorageTask.objects.all()[workers:]
tasks.update(burst=True)
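
The storage tasks scheduled above ultimately persist the consumed events to the backend configured in GRIMOIRELAB_ARCHIVIST. As a rough illustration of what an OpenSearch writer could look like with the newly added opensearch-py dependency, consider the following sketch; the client and bulk helper are standard opensearch-py APIs, but the store_events function is hypothetical and not part of this commit.

from opensearchpy import OpenSearch, helpers

# Illustrative only: bulk-index a batch of events using the STORAGE_URL,
# STORAGE_INDEX and STORAGE_VERIFY_CERT values from GRIMOIRELAB_ARCHIVIST.
def store_events(events, url, index, verify_certs=False):
    client = OpenSearch(hosts=[url], verify_certs=verify_certs)
    actions = (
        {'_index': index, '_source': event}
        for event in events
    )
    # helpers.bulk returns (number of successfully indexed docs, errors).
    return helpers.bulk(client, actions)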
8 changes: 8 additions & 0 deletions src/grimoirelab/core/scheduler/models.py
@@ -357,3 +357,11 @@ def get_all_registered_task_models() -> Iterator[type[Task], type[Job]]:
job classes.
"""
return iter(GRIMOIRELAB_TASK_MODELS.values())


def get_all_registered_task_names() -> list[str]:
"""Return all registered task names.
:returns: a list with all registered task names.
"""
return list(GRIMOIRELAB_TASK_MODELS.keys())