-
-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(anomaly detection): Maintain historical timeseries #1670
Changes from 5 commits
a23ef39
d1c5345
9c6e830
605baaf
02a9949
a49c9fc
eebe61e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
"""Migration | ||
|
||
Revision ID: 5a7ff418f726 | ||
Revises: 3914e6cdb818 | ||
Create Date: 2024-12-30 23:36:51.280449 | ||
|
||
""" | ||
|
||
import sqlalchemy as sa | ||
from alembic import op | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "5a7ff418f726" | ||
down_revision = "3914e6cdb818" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade():
    """Create the dynamic_alert_time_series_history archival table."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "dynamic_alert_time_series_history",
        # BigInteger, not Integer: this table accumulates many rows per alert,
        # so the surrogate key must be at least as wide as alert_id.
        sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
        sa.Column("alert_id", sa.BigInteger(), nullable=False),
        sa.Column("timestamp", sa.DateTime(), nullable=False),
        sa.Column("value", sa.Float(), nullable=False),
        sa.Column("anomaly_type", sa.String(), nullable=False),
        sa.Column("saved_at", sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint("id", "alert_id"),
    )
    # Cleanup queries filter rows by timestamp; a secondary (non-primary)
    # index avoids a full-table scan on every cleanup run.
    op.create_index(
        "ix_dynamic_alert_time_series_history_timestamp",
        "dynamic_alert_time_series_history",
        ["timestamp"],
    )
    # ### end Alembic commands ###
|
||
|
||
def downgrade():
    """Revert the migration by dropping the history table (and its indexes)."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table("dynamic_alert_time_series_history")
    # ### end Alembic commands ###
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
import logging | ||
from datetime import datetime, timedelta | ||
from operator import and_, or_ | ||
from typing import List | ||
|
||
import numpy as np | ||
import sentry_sdk | ||
|
@@ -10,7 +11,13 @@ | |
from seer.anomaly_detection.detectors.anomaly_detectors import MPBatchAnomalyDetector | ||
from seer.anomaly_detection.models import AlgoConfig, TimeSeries | ||
from seer.anomaly_detection.models.external import AnomalyDetectionConfig | ||
from seer.db import DbDynamicAlert, Session, TaskStatus | ||
from seer.db import ( | ||
DbDynamicAlert, | ||
DbDynamicAlertTimeSeries, | ||
DbDynamicAlertTimeSeriesHistory, | ||
Session, | ||
TaskStatus, | ||
) | ||
from seer.dependency_injection import inject, injected | ||
|
||
logger = logging.getLogger(__name__) | ||
|
@@ -68,12 +75,30 @@ def delete_old_timeseries_points(alert: DbDynamicAlert, date_threshold: float): | |
for ts in alert.timeseries: | ||
if ts.timestamp.timestamp() < date_threshold: | ||
to_remove.append(ts) | ||
|
||
# Save history records before removing | ||
save_timeseries_history(alert, to_remove) | ||
|
||
for ts in to_remove: | ||
alert.timeseries.remove(ts) | ||
deleted_count += 1 | ||
return deleted_count | ||
|
||
|
||
def save_timeseries_history(alert: DbDynamicAlert, timeseries: List[DbDynamicAlertTimeSeries]):
    """Archive timeseries points to the history table before they are pruned.

    Args:
        alert: Parent alert; its ``external_alert_id`` keys the history rows.
        timeseries: The live-table points about to be removed.
    """
    if not timeseries:
        # Nothing to archive; avoid opening a session for an empty batch.
        return

    # Capture one timestamp for the whole batch so all rows archived together
    # share the same saved_at (a per-row datetime.now() would drift).
    saved_at = datetime.now()
    with Session() as session:
        # add_all batches the inserts in a single flush instead of per-row adds.
        session.add_all(
            DbDynamicAlertTimeSeriesHistory(
                alert_id=alert.external_alert_id,
                timestamp=ts.timestamp,
                anomaly_type=ts.anomaly_type,
                value=ts.value,
                saved_at=saved_at,
            )
            for ts in timeseries
        )
        session.commit()
|
||
|
||
@inject | ||
def update_matrix_profiles( | ||
alert: DbDynamicAlert, | ||
|
@@ -163,3 +188,17 @@ def cleanup_disabled_alerts(): | |
|
||
session.commit() | ||
logger.info(f"Deleted {deleted_count} alerts") | ||
|
||
|
||
@celery_app.task
@sentry_sdk.trace
def cleanup_old_timeseries_history():
    """Delete archived timeseries history rows older than 90 days.

    Periodic Celery task. Rows are aged on ``saved_at`` (when the point was
    archived to the history table) and removed in one bulk DELETE statement.
    NOTE(review): the review thread discusses aging on ``timestamp`` instead —
    confirm the intended retention semantics before changing the filter column.
    """
    date_threshold = datetime.now() - timedelta(days=90)
    with Session() as session:
        deleted_count = (
            session.query(DbDynamicAlertTimeSeriesHistory)
            .filter(DbDynamicAlertTimeSeriesHistory.saved_at < date_threshold)
            # synchronize_session=False: skip reconciling the in-memory
            # identity map (nothing from this table is loaded here), so the
            # bulk delete runs as a single SQL statement without per-row work.
            .delete(synchronize_session=False)
        )
        session.commit()
        logger.info(f"Deleted {deleted_count} timeseries history records older than 90 days")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious how come alert_id is biginteger but this id is integer? We will be having more rows in this table than the alert table, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch -- fixed