Skip to content

Commit

Permalink
Fix typo in accumulator implementation (#257)
Browse files Browse the repository at this point in the history
* Fix typo in accumulator implementation

* Fix lint issue

* Revert to sync/atomic
  • Loading branch information
nikola-jokic authored Oct 3, 2024
1 parent d848ae8 commit 457704f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
2 changes: 1 addition & 1 deletion internal/yunikorn/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (a *accumulator) run(ctx context.Context) error {
if len(events) == 0 {
timer.Stop()
a.events <- events
return
continue
}
timer.Reset(a.idleInterval)
a.events <- events
Expand Down
42 changes: 42 additions & 0 deletions internal/yunikorn/accumulator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package yunikorn

import (
"context"
"testing"
"time"

"sync/atomic"

"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
"github.com/stretchr/testify/require"
)

func TestAccumulatorCallback(t *testing.T) {
var callCount atomic.Int32
callback := func(ctx context.Context, events []*si.EventRecord) {
t.Logf("callback called with %d events", len(events))
callCount.Add(1)
}
interval := 250 * time.Millisecond
wait := 300 * time.Millisecond

acc := newAccumulator(callback, interval)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
require.NoError(t, acc.run(ctx))
}()

time.Sleep(wait)
require.Equal(t, int32(0), callCount.Load())

acc.add(&si.EventRecord{})
time.Sleep(wait)
require.Equal(t, int32(1), callCount.Load())

acc.add(&si.EventRecord{})
acc.add(&si.EventRecord{})
acc.add(&si.EventRecord{})
time.Sleep(wait)
require.Equal(t, int32(2), callCount.Load())
}

0 comments on commit 457704f

Please sign in to comment.