Store events in a Redis stream instead of a list
This commit updates the method of storing events in
Redis for later consumption.
Events are now added to a Redis Stream, allowing
different types of consumers to process the items for
various purposes. The trade-off is that the stream has a
fixed length, causing older items to be deleted when
new items are added.

Signed-off-by: Jose Javier Merchante <[email protected]>
jjmerchante committed Dec 4, 2024
1 parent 34f4c36 commit 6e5e0d9
Showing 7 changed files with 827 additions and 321 deletions.
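
Before the per-file diffs, a minimal sketch (not part of the commit) of the change the message above describes, using redis-py; the connection, stream name, group name, and the example event payload are illustrative assumptions:

import json
import redis

conn = redis.Redis()
event = json.dumps({'id': '1', 'type': 'example'})  # placeholder payload

# Old approach: events were pushed onto a Redis list and popped by a single
# consumer, so each event could only be processed once.
conn.rpush('events', event)

# New approach: events are appended to a capped stream. MAXLEN trims the
# oldest entries (approximately, by default) once the limit is exceeded.
conn.xadd('events', {'data': event}, maxlen=2 * 10 ** 6)

# Consumer groups let different kinds of consumers read the same entries
# independently; each group tracks its own position in the stream.
# (xgroup_create raises BUSYGROUP if the group already exists; the archivist
# code in this commit handles that case.)
conn.xgroup_create('events', 'archivists', id='0', mkstream=True)
entries = conn.xreadgroup(groupname='archivists', consumername='archivist-1',
                          streams={'events': '>'}, count=10)
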
639 changes: 324 additions & 315 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions src/grimoirelab/core/config/settings.py
@@ -274,8 +274,7 @@
#

Q_PERCEVAL_JOBS = os.environ.get('GRIMOIRELAB_Q_PERCEVAL_JOBS', 'default')
-Q_STORAGE_ITEMS = os.environ.get('GRIMOIRELAB_Q_STORAGE_ITEMS', 'items')
-Q_EVENTS = os.environ.get('GRIMOIRELAB_Q_EVENTS', 'events')
+Q_STORAGE_JOBS = os.environ.get('GRIMOIRELAB_Q_STORAGE_JOBS', 'storage_jobs')

_RQ_DATABASE = {
'HOST': os.environ.get('GRIMOIRELAB_REDIS_HOST', '127.0.0.1'),
@@ -286,10 +285,12 @@

RQ_QUEUES = {
Q_PERCEVAL_JOBS: _RQ_DATABASE,
-    Q_STORAGE_ITEMS: _RQ_DATABASE,
-    Q_EVENTS: _RQ_DATABASE,
+    Q_STORAGE_JOBS: _RQ_DATABASE,
}

+EVENTS_STREAM = os.environ.get('GRIMOIRELAB_EVENTS_STREAM', 'events')
+STREAM_MAX_LENGTH = int(os.environ.get('GRIMOIRELAB_STREAM_MAX_LENGTH', 2 * 10 ** 6))

RQ = {
'JOB_CLASS': 'grimoirelab.core.scheduler.jobs.GrimoireLabJob',
'WORKER_CLASS': 'grimoirelab.core.scheduler.worker.GrimoireLabWorker',
17 changes: 17 additions & 0 deletions src/grimoirelab/core/runner/commands/run.py
@@ -18,6 +18,7 @@

from __future__ import annotations

+import logging
import os
import typing

@@ -107,3 +108,19 @@ def eventizers(workers: int):
'rqworker-pool', settings.Q_PERCEVAL_JOBS,
num_workers=workers
)


+@run.command()
+@click.option('--workers',
+              default=10,
+              show_default=True,
+              help="Number of workers to run in the pool.")
+def archivists(workers: int):
+    """Start a pool of archivist workers."""
+
+    from django.conf import settings
+
+    django.core.management.call_command(
+        'rqworker-pool', settings.Q_STORAGE_JOBS,
+        num_workers=workers
+    )
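
A possible way to exercise the new command, sketched under the assumption that the click group is exported as run from this module and that Django settings are already configured:

from click.testing import CliRunner

from grimoirelab.core.runner.commands.run import run

# Starts a pool of 5 archivist workers consuming jobs from Q_STORAGE_JOBS,
# mirroring the existing eventizers command above.
runner = CliRunner()
result = runner.invoke(run, ['archivists', '--workers', '5'])
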
317 changes: 317 additions & 0 deletions src/grimoirelab/core/scheduler/tasks/archivist.py
@@ -0,0 +1,317 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) GrimoireLab Contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#


from __future__ import annotations

import json
import logging
import typing

import redis
import rq.job

from grimoirelab_toolkit.datetime import str_to_datetime


if typing.TYPE_CHECKING:
    from typing import Any
    from datetime import datetime
    from collections.abc import Iterator


logger = logging.getLogger(__name__)


def archivist_job(
storage_type: str,
storage_url: str,
storage_db_name: str,
redis_group: str,
consumer_name: str,
events_queue: str,
limit: int = 5000,
) -> ArchivistProgress:
"""Fetch and archive events.
It will fetch events from a Redis stream and store them in a
storage system.
:param storage_type: type of the storage system (e.g., 'opensearch')
:param storage_url: URL of the storage system
:param storage_db_name: Name of the database to use
:param redis_group: Redis group name to use for fetching events
:param consumer_name: Name of the consumer
:param events_queue: Redis stream where the events are fetched
:param limit: Maximum number of events to fetch and store
"""

rq_job = rq.get_current_job()

Storage = get_storage_backend(storage_type)
storage = Storage(url=storage_url, db_name=storage_db_name)

events = events_consumer(rq_job.connection,
consumer_name,
events_queue,
redis_group,
limit)

total = storage.store(events)

return ArchivistProgress(
job_id=rq_job.id,
backend=storage_type,
group=redis_group,
summary={'total': total}
)


def _create_consumer_group(
connection: redis.Redis,
stream_name: str,
group_name: str
) -> None:
"""Create a consumer group if it does not exist
:param connection: Redis connection
:param stream_name: Name of the stream
:param group_name: Name of the group
"""
try:
connection.xgroup_create(stream_name, group_name, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
if str(e) != 'BUSYGROUP Consumer Group name already exists':
raise


def _recover_stream_entries(
connection: redis.Redis,
consumer_name: str,
stream_name: str,
group_name: str
) -> Iterator[dict]:
"""
    Transfers ownership of pending stream entries that have been idle
    for 5 minutes and match the specified criteria.
    :param connection: Redis connection
    :param consumer_name: Name of the consumer
    :param stream_name: Name of the stream
    :param group_name: Name of the group
    """
logging.info(f"Recovering events from '{stream_name}' group '{group_name}'")

while True:
response = connection.xautoclaim(name=stream_name,
groupname=group_name,
consumername=consumer_name,
min_idle_time=5 * 60 * 1000,
count=10)

# The response contains an array with the following contents
# 1) "0-0" (stream ID to be used as the start argument for the next call)
# 2) 1) 1) "1609338752495-0" (successfully claimed messages)
# 2) 1) "field"
# 2) "value"
# 3) (empty array) (message IDs that no longer exist in the stream)

messages = response[1]
for message in messages:
message_id = message[0]
message_data = message[1][b'data']

yield json.loads(message_data)

connection.xack(stream_name, group_name, message_id)

if response[0] == b"0-0":
break


def events_consumer(
connection: redis.Redis,
consumer_name: str,
stream_name: str,
group_name: str,
limit: int,
) -> Iterator[dict]:
"""Get items from a Redis stream given a group and a consumer name
:param connection: Redis connection
:param consumer_name: Name of the consumer
:param stream_name: Name of the stream
:param group_name: Name of the group
:param limit: Maximum number of items to fetch
"""
_create_consumer_group(connection, stream_name, group_name)

yield from _recover_stream_entries(connection=connection,
consumer_name=consumer_name,
group_name=group_name,
stream_name=stream_name)

logging.info(f"Fetching events from '{stream_name}' group "
f"'{group_name}' as '{consumer_name}'")

total = 0
while True:
try:
response = connection.xreadgroup(groupname=group_name,
consumername=consumer_name,
streams={stream_name: '>'},
count=10,
block=60000)

# The response contains an array with the following contents
# 1) 1) "mystream" (name of the stream)
# 2) 1) 1) "1-0" (array of arrays containing the key and the entries)
# 2) 1) "field"
# 2) "value"
if response:
messages = response[0][1]
for message in messages:
total += 1
message_id = message[0]
message_data = message[1][b'data']

yield json.loads(message_data)

connection.xack(stream_name, group_name, message_id)

            else:
                logger.info(f"No new messages for '{stream_name}:{group_name}:{consumer_name}'.")
                break

            if total >= limit:
                logger.info(f"{total} items inserted. Stopping the job.")
                break

        except Exception as e:
            logger.error(f"Error consuming messages: {e}")
            raise


class ArchivistProgress:
"""Class to store the progress of an Archivist job.
It stores the summary of the job.
:param job_id: job identifier
:param backend: backend used to store the events
    :param group: group used to fetch the events
    :param summary: summary of the archived events
"""
def __init__(self,
job_id: str,
backend: str,
group: str,
summary: dict | None) -> None:
self.job_id = job_id
self.backend = backend
self.group = group
self.summary = summary

@classmethod
def from_dict(cls, data: dict[str, Any]) -> ArchivistProgress:
"""Create a new instance from a dictionary."""

def convert_to_datetime(dt: str) -> datetime | None:
return str_to_datetime(dt) if dt else None

return cls(
data['job_id'],
data['backend'],
data['group'],
None # TODO: Add summary from the job
)

def to_dict(self) -> dict[str, str | int]:
"""Convert object to a dict."""

result = {
'job_id': self.job_id,
'backend': self.backend,
'group': self.group,
'summary': self.summary
}

return result


class StorageBackend:
"""Base class for storage backends.
This class defines the methods that should be implemented by
a storage backend.
    :param url: URL of the storage backend
    :param db_name: Name of the database to use
"""
def __init__(self, url: str, db_name: str) -> None:
self.url = url
self.db_name = db_name

    def store(self, data: Iterator[dict[str, Any]]) -> int:
"""Store data in the storage backend.
:param data: Data to store
:return: Number of items stored
"""
raise NotImplementedError

def close(self) -> None:
"""Close the connection to the storage backend."""
raise NotImplementedError


class OpenSearchStorage(StorageBackend):
"""Storage backend for OpenSearch.
This class implements the methods to store data in an OpenSearch
instance.
    :param url: URL of the OpenSearch instance
    :param db_name: Name of the index where events are stored
"""
def __init__(self, url: str, db_name: str) -> None:
from grimoire_elk.elastic import ElasticSearch

super().__init__(url, db_name)
self.opensearch = ElasticSearch(url, index=db_name)

    def store(self, events: Iterator[dict]) -> int:
        """Store data in the OpenSearch instance.
        :param events: Events to store
        :return: Number of new items stored
        """
new_items = self.opensearch.bulk_upload(events, field_id='id')

return new_items

def close(self) -> None:
"""Close the connection to the OpenSearch instance."""

pass


def get_storage_backend(storage_type: str) -> typing.Type[StorageBackend]:
"""Get the storage backend based on the type.
:param storage_type: Type of the storage backend
"""
if storage_type == 'opensearch':
return OpenSearchStorage
else:
raise ValueError(f"Storage type '{storage_type}' not supported")
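
For illustration, a sketch of how an archivist job like the one above might be enqueued with RQ; the queue name, connection, and argument values here are assumptions, not taken from this commit:

import redis
import rq

from grimoirelab.core.scheduler.tasks.archivist import archivist_job

connection = redis.Redis()
queue = rq.Queue('storage_jobs', connection=connection)

# archivist_job calls rq.get_current_job(), so it only runs correctly inside
# an RQ worker, e.g. one of the pool started by the new archivists command.
job = queue.enqueue(archivist_job,
                    storage_type='opensearch',
                    storage_url='https://localhost:9200',
                    storage_db_name='events',
                    redis_group='archivist-group',
                    consumer_name='archivist-1',
                    events_queue='events',
                    limit=5000)
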
7 changes: 6 additions & 1 deletion src/grimoirelab/core/scheduler/tasks/chronicler.py
@@ -44,6 +44,7 @@ def chronicler_job(
datasource_type: str,
datasource_category: str,
events_queue: str,
+    stream_max_length: int,
job_args: dict[str, Any] = None
) -> ChroniclerProgress:
"""Fetch and eventize data.
@@ -95,7 +96,11 @@
perceval_gen.items)
for event in events:
data = cloudevents.conversion.to_json(event)
-            rq_job.connection.rpush(events_queue, data)
+            message = {
+                'data': data
+            }
+            rq_job.connection.xadd(events_queue, message,
+                                   maxlen=stream_max_length)
finally:
progress.summary = perceval_gen.summary

