Skip to content

Commit

Permalink
Add log and metrics to workflow termination events (cadence-workflow#…
Browse files Browse the repository at this point in the history
…6146)

* Add log and metrics to workflow termination events

**What changed?**
Added logs to workflow termination events, containing the reason for the termination, tagged with the `domainName`, `workflowID`, `terminationReason`, and `runID`.
Added metrics to workflow termination events, using a counter per domain `WorkflowTerminateCounterPerDomain` under the `HistoryTerminateWorkflowExecutionScope` scope, with `WorkflowTerminationReasonTag`

**Why?**
Improve workflow termination visibility, allowing Cadence and clients to easily find terminated workflows. This is particularly important to provide better information for workflows terminated during failovers.

**How did you test it?**
Unit tests.

**Potential risks**
The risks are associated with the changes in functions parameters being passed. Need to ensure that the parameters are correct and that they do not contain `nil` values.

**Release notes**

**Documentation Changes**
  • Loading branch information
fimanishi authored Jul 11, 2024
1 parent e758b78 commit 3248455
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 3 deletions.
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ func IsWorkflowOpen(isOpen bool) Tag {
return newBoolTag("is-workflow-open", isOpen)
}

// WorkflowTerminationReason returns a tag to report a workflow's termination reason
func WorkflowTerminationReason(reason string) Tag {
return newStringTag("wf-termination-reason", reason)
}

// domain related

// WorkflowDomainID returns tag for WorkflowDomainID
Expand Down
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,7 @@ const (
TaskQueueLatencyPerDomain
TransferTaskMissingEventCounterPerDomain
ReplicationTasksAppliedPerDomain
WorkflowTerminateCounterPerDomain

TaskRedispatchQueuePendingTasksTimer

Expand Down Expand Up @@ -2898,6 +2899,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskQueueLatencyPerDomain: {metricName: "task_latency_queue_per_domain", metricRollupName: "task_latency_queue", metricType: Timer},
TransferTaskMissingEventCounterPerDomain: {metricName: "transfer_task_missing_event_counter_per_domain", metricRollupName: "transfer_task_missing_event_counter", metricType: Counter},
ReplicationTasksAppliedPerDomain: {metricName: "replication_tasks_applied_per_domain", metricRollupName: "replication_tasks_applied", metricType: Counter},
WorkflowTerminateCounterPerDomain: {metricName: "workflow_terminate_counter_per_domain", metricRollupName: "workflow_terminate_counter", metricType: Counter},

TaskBatchCompleteCounter: {metricName: "task_batch_complete_counter", metricType: Counter},
TaskBatchCompleteFailure: {metricName: "task_batch_complete_error", metricType: Counter},
Expand Down
12 changes: 12 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package metrics

import (
"fmt"
"regexp"
"strconv"
)

Expand Down Expand Up @@ -62,11 +63,16 @@ const (
globalRatelimitKey = "global_ratelimit_key"
globalRatelimitType = "global_ratelimit_type"
globalRatelimitCollectionName = "global_ratelimit_collection"
workflowTerminationReason = "workflow_termination_reason"

allValue = "all"
unknownValue = "_unknown_"
)

var (
safeAlphaNumericStringRE = regexp.MustCompile(`[^a-zA-Z0-9]`)
)

// Tag is an interface to define metrics tags
type (
Tag interface {
Expand Down Expand Up @@ -262,6 +268,12 @@ func GlobalRatelimiterCollectionName(value string) Tag {
return simpleMetric{key: globalRatelimitCollectionName, value: value}
}

// WorkflowTerminationReasonTag reports the reason for workflow termination
func WorkflowTerminationReasonTag(value string) Tag {
value = safeAlphaNumericStringRE.ReplaceAllString(value, "_")
return simpleMetric{key: workflowTerminationReason, value: value}
}

// PartitionConfigTags returns a list of partition config tags
func PartitionConfigTags(partitionConfig map[string]string) []Tag {
tags := make([]Tag, 0, len(partitionConfig))
Expand Down
16 changes: 16 additions & 0 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,22 @@ func (e *mutableStateBuilder) AddWorkflowExecutionTerminatedEvent(
if err := e.ReplicateWorkflowExecutionTerminatedEvent(firstEventID, event); err != nil {
return nil, err
}

domainName := e.GetDomainEntry().GetInfo().Name

e.logger.Info(
"Workflow execution terminated.",
tag.WorkflowDomainName(domainName),
tag.WorkflowID(e.GetExecutionInfo().WorkflowID),
tag.WorkflowRunID(e.GetExecutionInfo().RunID),
tag.WorkflowTerminationReason(reason),
)

scopeWithDomainTag := e.metricsClient.Scope(metrics.HistoryTerminateWorkflowExecutionScope).
Tagged(metrics.DomainTag(domainName)).
Tagged(metrics.WorkflowTerminationReasonTag(reason))
scopeWithDomainTag.IncCounter(metrics.WorkflowTerminateCounterPerDomain)

return event, nil
}

Expand Down
7 changes: 4 additions & 3 deletions service/history/execution/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
IdentityHistoryService = "history-service"
// WorkflowTerminationIdentity is the component which decides to terminate the workflow
WorkflowTerminationIdentity = "worker-service"
// WorkflowTerminationReason is the reason for terminating workflow due to version conflit
// WorkflowTerminationReason is the reason for terminating workflow due to version conflict
WorkflowTerminationReason = "Terminate Workflow Due To Version Conflict."
)

Expand Down Expand Up @@ -189,7 +189,7 @@ func (r *workflowImpl) SuppressBy(
currentCluster := r.clusterMetadata.GetCurrentClusterName()

if currentCluster == lastWriteCluster {
return TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion)
return TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion, WorkflowTerminationReason)
}
return TransactionPolicyPassive, r.zombiefyWorkflow()
}
Expand Down Expand Up @@ -251,6 +251,7 @@ func (r *workflowImpl) failDecision(
func (r *workflowImpl) terminateWorkflow(
lastWriteVersion int64,
incomingLastWriteVersion int64,
terminationReason string,
) error {

eventBatchFirstEventID := r.GetMutableState().GetNextEventID()
Expand All @@ -265,7 +266,7 @@ func (r *workflowImpl) terminateWorkflow(

_, err := r.mutableState.AddWorkflowExecutionTerminatedEvent(
eventBatchFirstEventID,
WorkflowTerminationReason,
terminationReason,
[]byte(fmt.Sprintf("terminated by version: %v", incomingLastWriteVersion)),
WorkflowTerminationIdentity,
)
Expand Down

0 comments on commit 3248455

Please sign in to comment.