Skip to content

Commit

Permalink
Merge pull request #8590 from Lyndon-Li/fix-data-mover-progress-missi…
Browse files Browse the repository at this point in the history
…ng-after-25-updates

Issue 8579 - set event burst
  • Loading branch information
Lyndon-Li authored Jan 10, 2025
2 parents 42d2e9b + 32ae409 commit 46b8a31
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8590-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix issue #8579, set event burst to block event broadcaster from filtering events
4 changes: 4 additions & 0 deletions pkg/util/kube/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.
package kube

import (
"math"
"sync"
"time"

Expand Down Expand Up @@ -60,6 +61,9 @@ func NewEventRecorder(kubeClient kubernetes.Interface, scheme *runtime.Scheme, e
}

res.broadcaster = record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{
// Bypass the built-in EventCorrelator's rate filtering, otherwise, the event will be abandoned if the rate exceeds.
// The callers (i.e., data mover pods) have controlled the rate and total number outside. E.g., the progress is designed to be updated every 10 seconds and is changeable.
BurstSize: math.MaxInt32,
MaxEvents: 1,
MessageFunc: func(event *v1.Event) string {
return event.Message
Expand Down
43 changes: 40 additions & 3 deletions pkg/util/kube/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ func TestEvent(t *testing.T) {
}

cases := []struct {
name string
events []testEvent
expected int
name string
events []testEvent
generateDiff int
generateSame int
generateEnding bool
expected int
}{
{
name: "update events, different message",
Expand Down Expand Up @@ -116,6 +119,18 @@ func TestEvent(t *testing.T) {
},
expected: -1,
},
{
name: "auto generate 200",
generateDiff: 200,
generateEnding: true,
expected: 201,
},
{
name: "auto generate 200, update",
generateSame: 200,
generateEnding: true,
expected: 2,
},
}

shutdownTimeout = time.Second * 5
Expand Down Expand Up @@ -143,6 +158,28 @@ func TestEvent(t *testing.T) {
_, err = client.CoreV1().Pods("fake-ns").Create(context.Background(), pod, metav1.CreateOptions{})
require.NoError(t, err)

for i := 0; i < tc.generateDiff; i++ {
tc.events = append(tc.events, testEvent{
reason: fmt.Sprintf("fake-reason-%v", i),
message: fmt.Sprintf("fake-message-%v", i),
})
}

for i := 0; i < tc.generateSame; i++ {
tc.events = append(tc.events, testEvent{
reason: "fake-reason",
message: fmt.Sprintf("fake-message-%v", i),
})
}

if tc.generateEnding {
tc.events = append(tc.events, testEvent{
reason: "fake-ending-reason",
message: "fake-ending-message",
ending: true,
})
}

for _, e := range tc.events {
if e.ending {
recorder.EndingEvent(pod, e.warning, e.reason, e.message)
Expand Down

0 comments on commit 46b8a31

Please sign in to comment.