Skip to content

Commit

Permalink
fix: address PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <[email protected]>
  • Loading branch information
isubasinghe committed Jan 7, 2025
1 parent ef494d6 commit 576cd1f
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 15 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli

func (wfc *WorkflowController) newThrottler() sync.Throttler {
f := func(key string) { wfc.wfQueue.Add(key) }
return sync.NewMultiThrottler(wfc.Config.Parallelism, make(map[string]int), wfc.Config.NamespaceParallelism, f)
return sync.NewMultiThrottler(wfc.Config.Parallelism, wfc.Config.NamespaceParallelism, f)
}

// runGCcontroller runs the workflow garbage collector controller
Expand Down
16 changes: 7 additions & 9 deletions workflow/sync/multi_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ type Key = string
type QueueFunc func(Key)

// NewMultiThrottler creates a new multi throttler for throttling both namespace and global parallelism, a parallelism value of zero disables throttling
func NewMultiThrottler(parallelism int, namespaceParallelism map[string]int, namespaceParallelismDefault int, queue QueueFunc) Throttler {
func NewMultiThrottler(parallelism int, namespaceParallelismLimit int, queue QueueFunc) Throttler {
namespaceParallelism := make(map[string]int)
return &multiThrottler{
queue: queue,
namespaceParallelism: namespaceParallelism,
namespaceParallelismDefault: namespaceParallelismDefault,
namespaceParallelismDefault: namespaceParallelismLimit,
totalParallelism: parallelism,
running: make(map[Key]bool),
pending: make(map[string]*priorityQueue),
Expand Down Expand Up @@ -79,6 +80,10 @@ func (m *multiThrottler) namespaceCount(namespace string) (int, int) {
m.namespaceParallelism[namespace] = m.namespaceParallelismDefault
setLimit = m.namespaceParallelismDefault
}
if setLimit == 0 {
// return count is no longer accurate, but preserves behaviour
return 0, 0
}
count := 0
for key := range m.running {
ns, _, _ := cache.SplitMetaNamespaceKey(key)
Expand Down Expand Up @@ -139,13 +144,6 @@ func (m *multiThrottler) queueThrottled() {

minPq := &priorityQueue{itemByKey: make(map[string]*item)}

cnts := make(map[string]int)

for key := range m.running {
namespace, _, _ := cache.SplitMetaNamespaceKey(key)
cnts[namespace] = cnts[namespace] + 1
}

for _, pq := range m.pending {
if len(pq.items) == 0 {
continue
Expand Down
19 changes: 14 additions & 5 deletions workflow/sync/multi_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sync

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -14,7 +15,7 @@ import (
)

func TestMultiNoParallelismSamePriority(t *testing.T) {
throttler := NewMultiThrottler(0, make(map[string]int), 0, func(Key) {})
throttler := NewMultiThrottler(0, 0, func(Key) {})

throttler.Add("default/c", 0, time.Now().Add(2*time.Hour))
throttler.Add("default/b", 0, time.Now().Add(1*time.Hour))
Expand All @@ -26,7 +27,7 @@ func TestMultiNoParallelismSamePriority(t *testing.T) {
}

func TestMultiNoParallelismMultipleBuckets(t *testing.T) {
throttler := NewMultiThrottler(1, make(map[string]int), 1, func(Key) {})
throttler := NewMultiThrottler(1, 1, func(Key) {})
throttler.Add("a/0", 0, time.Now())
throttler.Add("a/1", 0, time.Now().Add(-1*time.Second))
throttler.Add("b/0", 0, time.Now().Add(-2*time.Second))
Expand All @@ -42,7 +43,7 @@ func TestMultiNoParallelismMultipleBuckets(t *testing.T) {

func TestMultiWithParallelismLimitAndPriority(t *testing.T) {
queuedKey := ""
throttler := NewMultiThrottler(2, make(map[string]int), 2, func(key string) { queuedKey = key })
throttler := NewMultiThrottler(2, 0, func(key string) { queuedKey = key })

throttler.Add("default/a", 1, time.Now())
throttler.Add("default/b", 2, time.Now())
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestMultiWithParallelismLimitAndPriority(t *testing.T) {

func TestMultiInitWithWorkflows(t *testing.T) {
queuedKey := ""
throttler := NewMultiThrottler(1, make(map[string]int), 1, func(key string) { queuedKey = key })
throttler := NewMultiThrottler(1, 1, func(key string) { queuedKey = key })
ctx := context.Background()

wfclientset := fakewfclientset.NewSimpleClientset(
Expand Down Expand Up @@ -148,7 +149,15 @@ func TestTotalAllowNamespaceLimit(t *testing.T) {
namespaceLimits := make(map[string]int)
namespaceLimits["a"] = 2
namespaceLimits["b"] = 1
throttler := NewMultiThrottler(4, namespaceLimits, 6, func(k Key) {})
throttler := &multiThrottler{
queue: func(key Key) {},
namespaceParallelism: namespaceLimits,
namespaceParallelismDefault: 6,
totalParallelism: 4,
running: make(map[Key]bool),
pending: make(map[string]*priorityQueue),
lock: &sync.Mutex{},
}
throttler.Add("a/0", 1, time.Now())
throttler.Add("b/0", 2, time.Now())
throttler.Add("a/1", 3, time.Now())
Expand Down

0 comments on commit 576cd1f

Please sign in to comment.