Skip to content

Commit

Permalink
Added internal count metrics in addition to the external counts that (c…
Browse files Browse the repository at this point in the history
…adence-workflow#6135)

are already there

What changed?
Added new metric which gives the max per workflow ID rps we have seen for the domain every second

Why?
This will help users understand how close they are to the per workflow ID rate limits

How did you test it?
Unit tests

Potential risks
Just adding a metric, so should be safe

Release notes

Documentation Changes
  • Loading branch information
jakobht authored Jun 14, 2024
1 parent ceff8ca commit f43f917
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2476,6 +2476,7 @@ const (
WorkflowIDCacheSizeGauge
WorkflowIDCacheRequestsExternalRatelimitedCounter
WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer
WorkflowIDCacheRequestsInternalMaxRequestsPerSecondsTimer
WorkflowIDCacheRequestsInternalRatelimitedCounter
NumHistoryMetrics
)
Expand Down Expand Up @@ -3115,6 +3116,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
WorkflowIDCacheSizeGauge: {metricName: "workflow_id_cache_size", metricType: Gauge},
WorkflowIDCacheRequestsExternalRatelimitedCounter: {metricName: "workflow_id_external_requests_ratelimited", metricType: Counter},
WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer: {metricName: "workflow_id_external_requests_max_requests_per_seconds", metricType: Timer},
WorkflowIDCacheRequestsInternalMaxRequestsPerSecondsTimer: {metricName: "workflow_id_internal_requests_max_requests_per_seconds", metricType: Timer},
WorkflowIDCacheRequestsInternalRatelimitedCounter: {metricName: "workflow_id_internal_requests_ratelimited", metricType: Counter},
},
Matching: {
Expand Down
6 changes: 4 additions & 2 deletions service/history/workflowcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type cacheKey struct {
type cacheValue struct {
externalRateLimiter quotas.Limiter
internalRateLimiter quotas.Limiter
countMetric workflowIDCountMetric
externalCountMetric workflowIDCountMetric
internalCountMetric workflowIDCountMetric
}

// Params is the parameters for a new WFCache
Expand Down Expand Up @@ -144,13 +145,14 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi

switch rateLimitType {
case external:
value.countMetric.updatePerDomainMaxWFRequestCount(domainName, c.timeSource, c.metricsClient)
value.externalCountMetric.updatePerDomainMaxWFRequestCount(domainName, c.timeSource, c.metricsClient, metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer)
if !value.externalRateLimiter.Allow() {
c.emitRateLimitMetrics(domainID, workflowID, domainName, "external", metrics.WorkflowIDCacheRequestsExternalRatelimitedCounter)
return false
}
return true
case internal:
value.internalCountMetric.updatePerDomainMaxWFRequestCount(domainName, c.timeSource, c.metricsClient, metrics.WorkflowIDCacheRequestsInternalMaxRequestsPerSecondsTimer)
if !value.internalRateLimiter.Allow() {
c.emitRateLimitMetrics(domainID, workflowID, domainName, "internal", metrics.WorkflowIDCacheRequestsInternalRatelimitedCounter)
return false
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflowcache/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (cm *workflowIDCountMetric) updatePerDomainMaxWFRequestCount(
domainName string,
timeSource clock.TimeSource,
metricsClient metrics.Client,
metric int,
) {
cm.Lock()
defer cm.Unlock()
Expand All @@ -61,5 +62,5 @@ func (cm *workflowIDCountMetric) updatePerDomainMaxWFRequestCount(

// We can just use the upper of the metric, so it is not an issue to emit all the counts
metricsClient.Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)).
RecordTimer(metrics.WorkflowIDCacheRequestsExternalMaxRequestsPerSecondsTimer, time.Duration(cm.count))
RecordTimer(metric, time.Duration(cm.count))
}
21 changes: 11 additions & 10 deletions service/history/workflowcache/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) {
domainName := "some domain name"
metric := metrics.WorkflowIDCacheRequestsInternalMaxRequestsPerSecondsTimer

cases := []struct {
name string
Expand All @@ -45,23 +46,23 @@ func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) {
name: "Single workflowID",
updatePerDomainMaxWFRequestCount: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) {
workflowID1 := &workflowIDCountMetric{}
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 2
},
expecetMetrics: []time.Duration{1, 2},
},
{
name: "Separate workflowIDs",
updatePerDomainMaxWFRequestCount: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) {
workflowID1 := &workflowIDCountMetric{}
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 1

workflowID2 := &workflowIDCountMetric{}
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 3
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 1
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 2
workflowID2.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 3

workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 2

},
expecetMetrics: []time.Duration{1, 1, 2, 3, 2},
Expand All @@ -70,11 +71,11 @@ func TestUpdatePerDomainMaxWFRequestCount(t *testing.T) {
name: "Reset",
updatePerDomainMaxWFRequestCount: func(metricsClient metrics.Client, timeSource clock.MockedTimeSource) {
workflowID1 := &workflowIDCountMetric{}
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 2
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 2

timeSource.Advance(1100 * time.Millisecond)
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient) // Emits 1
workflowID1.updatePerDomainMaxWFRequestCount(domainName, timeSource, metricsClient, metric) // Emits 1
},
expecetMetrics: []time.Duration{1, 2, 1},
},
Expand Down

0 comments on commit f43f917

Please sign in to comment.