Skip to content

Commit

Permalink
Keep the minRev between syncWatchers loops
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Dec 3, 2024
1 parent b038739 commit a4789dd
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,21 @@ func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event)
curRev := s.store.currentRev
compactionRev := s.store.compactMainRev

wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minRev := s.unsynced.chooseAll(curRev, compactionRev)
for _, batch := range s.victims {
for w, b := range batch {
watcherRev := max(w.minRev, b.moreRev)
if len(b.evs) > 0 {
watcherRev = max(watcherRev, b.evs[len(b.evs)-1].Kv.ModRevision)
}
minRev = min(minRev, watcherRev)
}
}
evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1)

victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
wb := newWatcherBatch(&s.unsynced, evs)
for w := range s.unsynced.watchers {
if w.minRev < compactionRev {
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
Expand Down

0 comments on commit a4789dd

Please sign in to comment.