diff --git a/lib/config/types.go b/lib/config/types.go index 59fe936fa..d9247b22f 100644 --- a/lib/config/types.go +++ b/lib/config/types.go @@ -41,6 +41,11 @@ type SharedDestinationSettings struct { TruncateExceededValues bool `yaml:"truncateExceededValues"` } +type Reporting struct { + Sentry *Sentry `yaml:"sentry"` + EmitExecutionTime bool `yaml:"emitExecutionTime"` +} + type Config struct { Mode Mode `yaml:"mode"` Output constants.DestinationKind `yaml:"outputSource"` @@ -63,12 +68,8 @@ type Config struct { S3 *S3Settings `yaml:"s3,omitempty"` SharedDestinationSettings SharedDestinationSettings `yaml:"sharedDestinationSettings"` - - Reporting struct { - Sentry *Sentry `yaml:"sentry"` - } - - Telemetry struct { + Reporting Reporting `yaml:"reporting"` + Telemetry struct { Metrics struct { Provider constants.ExporterKind `yaml:"provider"` Settings map[string]any `yaml:"settings,omitempty"` diff --git a/models/event/event.go b/models/event/event.go index c25cfb8fb..70a786df2 100644 --- a/models/event/event.go +++ b/models/event/event.go @@ -16,6 +16,7 @@ import ( "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/stringutil" + "github.com/artie-labs/transfer/lib/telemetry/metrics/base" "github.com/artie-labs/transfer/lib/typing" "github.com/artie-labs/transfer/lib/typing/columns" "github.com/artie-labs/transfer/models" @@ -28,10 +29,11 @@ type Event struct { OptionalSchema map[string]typing.KindDetails Columns *columns.Columns - ExecutionTime time.Time // When the SQL command was executed Deleted bool - mode config.Mode + // When the database event was executed + executionTime time.Time + mode config.Mode } func hashData(data map[string]any, tc kafkalib.TopicConfig) map[string]any { @@ -86,16 +88,29 @@ func ToMemoryEvent(event cdc.Event, pkMap map[string]any, tc kafkalib.TopicConfi return Event{}, err } - return Event{ + _event := Event{ + executionTime: event.GetExecutionTime(), mode: cfgMode, Table: tblName, PrimaryKeyMap: pkMap, - ExecutionTime: event.GetExecutionTime(), OptionalSchema: optionalSchema, Columns: cols, Data: hashData(evtData, tc), Deleted: event.DeletePayload(), - }, nil + } + + return _event, nil +} + +// EmitExecutionTimeLag - This will check against the current time and the event execution time and emit the lag. +func (e *Event) EmitExecutionTimeLag(metricsClient base.Client) { + metricsClient.GaugeWithSample( + "row.execution_time_lag", + float64(time.Since(e.executionTime).Milliseconds()), + map[string]string{ + "mode": e.mode.String(), + "table": e.Table, + }, 0.5) } func (e *Event) Validate() error { @@ -247,7 +262,7 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali td.PartitionsToLastMessage[message.Partition()] = append(td.PartitionsToLastMessage[message.Partition()], message) } - td.LatestCDCTs = e.ExecutionTime + td.LatestCDCTs = e.executionTime flush, flushReason := td.ShouldFlush(cfg) return flush, flushReason, nil } diff --git a/processes/consumer/process.go b/processes/consumer/process.go index 2df7cabbf..9e2fd60e5 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -63,6 +63,7 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo tags["what"] = "to_mem_event_err" return "", fmt.Errorf("cannot convert to memory event: %w", err) } + // Table name is only available after event has been cast tags["table"] = evt.Table if topicConfig.tc.ShouldSkip(_event.Operation()) { @@ -72,6 +73,10 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo return evt.Table, nil } + if cfg.Reporting.EmitExecutionTime { + evt.EmitExecutionTimeLag(metricsClient) + } + shouldFlush, flushReason, err := evt.Save(cfg, inMemDB, topicConfig.tc, p.Msg) if err != nil { tags["what"] = "save_fail"