Skip to content

Commit

Permalink
Extract rangeEvents function
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Dec 2, 2024
1 parent 78885f6 commit ae2d87b
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 16 deletions.
42 changes: 26 additions & 16 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,20 +357,7 @@ func (s *watchableStore) syncWatchers() int {
compactionRev := s.store.compactMainRev

wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes)

// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := s.store.b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
evs := kvsToEvents(s.store.lg, wg, revs, vs)
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
// We can only unlock after Unmarshal, which will do deep copy.
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
tx.RUnlock()
evs := rangeEvents(s.store.lg, s.store.b, minRev, curRev+1, wg)

victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
Expand Down Expand Up @@ -422,15 +409,38 @@ func (s *watchableStore) syncWatchers() int {
return s.unsynced.size()
}

// rangeEvents returns events in range [minRev, maxRev).
func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64, c contains) []mvccpb.Event {
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)

// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := b.ReadTx()
tx.RLock()
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
evs := kvsToEvents(lg, c, revs, vs)
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
// We can only unlock after Unmarshal, which will do deep copy.
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
tx.RUnlock()
return evs
}

type contains interface {
contains(string) bool
}

// kvsToEvents gets all events for the watchers from all key-value pairs
func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
func kvsToEvents(lg *zap.Logger, c contains, revs, vals [][]byte) (evs []mvccpb.Event) {
for i, v := range vals {
var kv mvccpb.KeyValue
if err := kv.Unmarshal(v); err != nil {
lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
}

if !wg.contains(string(kv.Key)) {
if !c.contains(string(kv.Key)) {
continue
}

Expand Down
108 changes: 108 additions & 0 deletions server/storage/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,114 @@ func TestSyncWatchers(t *testing.T) {
}
}

func TestRangeEvents(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
lg := zaptest.NewLogger(t)
s := NewStore(lg, b, &lease.FakeLessor{}, StoreConfig{})

defer cleanup(s, b)

foo1 := []byte("foo1")
foo2 := []byte("foo2")
foo3 := []byte("foo3")
value := []byte("bar")
s.Put(foo1, value, lease.NoLease)
s.Put(foo2, value, lease.NoLease)
s.Put(foo3, value, lease.NoLease)
s.DeleteRange(foo1, foo3) // Deletes "foo1" and "foo2" generating 2 events

expectEvents := []mvccpb.Event{
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Key: foo1,
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Value: value,
},
},
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Key: foo2,
CreateRevision: 3,
ModRevision: 3,
Version: 1,
Value: value,
},
},
{
Type: mvccpb.PUT,
Kv: &mvccpb.KeyValue{
Key: foo3,
CreateRevision: 4,
ModRevision: 4,
Version: 1,
Value: value,
},
},
{
Type: mvccpb.DELETE,
Kv: &mvccpb.KeyValue{
Key: foo1,
ModRevision: 5,
},
},
{
Type: mvccpb.DELETE,
Kv: &mvccpb.KeyValue{
Key: foo2,
ModRevision: 5,
},
},
}

var evs []mvccpb.Event
evs = rangeEvents(lg, b, 2, 6, fakeContains{})
assert.Equal(t, expectEvents[0:5], evs)
evs = rangeEvents(lg, b, 2, 5, fakeContains{})
assert.Equal(t, expectEvents[0:3], evs)
evs = rangeEvents(lg, b, 2, 4, fakeContains{})
assert.Equal(t, expectEvents[0:2], evs)
evs = rangeEvents(lg, b, 2, 3, fakeContains{})
assert.Equal(t, expectEvents[0:1], evs)
evs = rangeEvents(lg, b, 2, 2, fakeContains{})
assert.Empty(t, evs)

evs = rangeEvents(lg, b, 3, 6, fakeContains{})
assert.Equal(t, expectEvents[1:5], evs)
evs = rangeEvents(lg, b, 4, 6, fakeContains{})
assert.Equal(t, expectEvents[2:5], evs)
evs = rangeEvents(lg, b, 5, 6, fakeContains{})
assert.Equal(t, expectEvents[3:5], evs)
evs = rangeEvents(lg, b, 6, 6, fakeContains{})
assert.Empty(t, evs)

evs = rangeEvents(lg, b, 2, 3, fakeContains{})
assert.Equal(t, expectEvents[0:1], evs)
evs = rangeEvents(lg, b, 2, 4, fakeContains{})
assert.Equal(t, expectEvents[0:2], evs)
evs = rangeEvents(lg, b, 3, 4, fakeContains{})
assert.Equal(t, expectEvents[1:2], evs)
evs = rangeEvents(lg, b, 3, 5, fakeContains{})
assert.Equal(t, expectEvents[1:3], evs)
evs = rangeEvents(lg, b, 4, 5, fakeContains{})
assert.Equal(t, expectEvents[2:3], evs)
evs = rangeEvents(lg, b, 4, 6, fakeContains{})
assert.Equal(t, expectEvents[2:5], evs)
evs = rangeEvents(lg, b, 5, 6, fakeContains{})
assert.Equal(t, expectEvents[3:5], evs)
evs = rangeEvents(lg, b, 6, 6, fakeContains{})
assert.Empty(t, evs)
}

type fakeContains struct{}

func (f fakeContains) contains(string) bool {
return true
}

// TestWatchCompacted tests a watcher that watches on a compacted revision.
func TestWatchCompacted(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
Expand Down

0 comments on commit ae2d87b

Please sign in to comment.