Skip to content

Commit

Permalink
Adaptive sync draft
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Nov 29, 2024
1 parent 8f91eb1 commit 7296cd9
Showing 1 changed file with 7 additions and 17 deletions.
24 changes: 7 additions & 17 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,29 +215,19 @@ func (s *watchableStore) Restore(b backend.Backend) error {
func (s *watchableStore) syncWatchersLoop() {
defer s.wg.Done()

waitDuration := 100 * time.Millisecond
baseWaitDuration := 25 * time.Millisecond
waitDuration := baseWaitDuration
delayTicker := time.NewTicker(waitDuration)
defer delayTicker.Stop()

for {
s.mu.RLock()
st := time.Now()
lastUnsyncedWatchers := s.unsynced.size()
s.mu.RUnlock()

unsyncedWatchers := 0
if lastUnsyncedWatchers > 0 {
unsyncedWatchers = s.syncWatchers()
unsynced := s.syncWatchers()
if unsynced == 0 {
waitDuration = baseWaitDuration
} else {
waitDuration = min(time.Second, waitDuration*2)
}
syncDuration := time.Since(st)

delayTicker.Reset(waitDuration)
// more work pending?
if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
// be fair to other store operations by yielding time taken
delayTicker.Reset(syncDuration)
}

select {
case <-delayTicker.C:
case <-s.stopc:
Expand Down

0 comments on commit 7296cd9

Please sign in to comment.