Skip to content

Commit

Permalink
Use maxRequestSizeBytes for batch limiting
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Dec 1, 2024
1 parent 5811ff1 commit 9bb91dd
Show file tree
Hide file tree
Showing 17 changed files with 83 additions and 74 deletions.
2 changes: 1 addition & 1 deletion etcdutl/etcdutl/hashkv_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func calculateHashKV(dbPath string, rev int64) (HashKV, error) {
cfg := backend.DefaultBackendConfig(zap.NewNop())
cfg.Path = dbPath
b := backend.New(cfg)
st := mvcc.NewStore(zap.NewNop(), b, nil, mvcc.StoreConfig{})
st := mvcc.NewStore(zap.NewNop(), b, nil, mvcc.WatchableStoreConfig{})
hst := mvcc.NewHashStorage(zap.NewNop(), st)

h, _, err := hst.HashByRev(rev)
Expand Down
4 changes: 2 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,6 @@ func (c *ServerConfig) BootstrapTimeoutEffective() time.Duration {

// BackendPath returns the backend file name that datadir.ToBackendFileName
// derives from the configured DataDir.
func (c *ServerConfig) BackendPath() string {
	dataDir := c.DataDir
	return datadir.ToBackendFileName(dataDir)
}

func (c *ServerConfig) MaxRequestBytesWithOverhead() uint {
return c.MaxRequestBytes + grpcOverheadBytes
// MaxRequestBytesWithOverhead returns MaxRequestBytes, converted to int, plus
// grpcOverheadBytes.
func (c *ServerConfig) MaxRequestBytesWithOverhead() int {
	limit := int(c.MaxRequestBytes)
	limit += grpcOverheadBytes
	return limit
}
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
opts = append(opts, grpc.ChainUnaryInterceptor(chainUnaryInterceptors...))
opts = append(opts, grpc.ChainStreamInterceptor(chainStreamInterceptors...))

opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytesWithOverhead())))
opts = append(opts, grpc.MaxRecvMsgSize(s.Cfg.MaxRequestBytesWithOverhead()))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))

Expand Down
10 changes: 5 additions & 5 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type watchServer struct {
clusterID int64
memberID int64

maxRequestBytes uint
maxRequestBytes int

sg apply.RaftStatusGetter
watchable mvcc.WatchableKV
Expand Down Expand Up @@ -126,7 +126,7 @@ type serverWatchStream struct {
clusterID int64
memberID int64

maxRequestBytes uint
maxRequestBytes int

sg apply.RaftStatusGetter
watchable mvcc.WatchableKV
Expand Down Expand Up @@ -544,12 +544,12 @@ func IsCreateEvent(e mvccpb.Event) bool {

func sendFragments(
wr *pb.WatchResponse,
maxRequestBytes uint,
maxRequestBytes int,
sendFunc func(*pb.WatchResponse) error,
) error {
// no need to fragment if total request size is smaller
// than max request limit or response contains only one event
if uint(wr.Size()) < maxRequestBytes || len(wr.Events) < 2 {
if wr.Size() < maxRequestBytes || len(wr.Events) < 2 {
return sendFunc(wr)
}

Expand All @@ -562,7 +562,7 @@ func sendFragments(
cur := ow
for _, ev := range wr.Events[idx:] {
cur.Events = append(cur.Events, ev)
if len(cur.Events) > 1 && uint(cur.Size()) >= maxRequestBytes {
if len(cur.Events) > 1 && cur.Size() >= maxRequestBytes {
cur.Events = cur.Events[:len(cur.Events)-1]
break
}
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
func TestSendFragment(t *testing.T) {
tt := []struct {
wr *pb.WatchResponse
maxRequestBytes uint
maxRequestBytes int
fragments int
werr error
}{
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func TestHashKVHandler(t *testing.T) {
etcdSrv.cluster.SetID(types.ID(localClusterID), types.ID(localClusterID))
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
defer func() {
assert.NoError(t, etcdSrv.kv.Close())
}()
Expand Down
9 changes: 6 additions & 3 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,12 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
return nil, err
}

mvccStoreConfig := mvcc.StoreConfig{
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
mvccStoreConfig := mvcc.WatchableStoreConfig{
StoreConfig: mvcc.StoreConfig{
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
},
WatchBatchMaxSize: cfg.MaxRequestBytesWithOverhead(),
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
Expand Down
8 changes: 4 additions & 4 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func TestSnapshotDisk(t *testing.T) {
v2store: st,
consistIndex: cindex.NewConsistentIndex(be),
}
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
defer func() {
assert.NoError(t, srv.kv.Close())
}()
Expand Down Expand Up @@ -703,7 +703,7 @@ func TestSnapshotMemory(t *testing.T) {
v2store: st,
consistIndex: cindex.NewConsistentIndex(be),
}
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
defer func() {
assert.NoError(t, srv.kv.Close())
}()
Expand Down Expand Up @@ -775,7 +775,7 @@ func TestSnapshotOrdering(t *testing.T) {
beHooks: serverstorage.NewBackendHooks(lg, ci),
}

s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
s.be = be

s.start()
Expand Down Expand Up @@ -869,7 +869,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
authStore: auth.NewAuthStore(lg, schema.NewAuthBackend(lg, be), nil, 1),
}

s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{})
s.kv = mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.WatchableStoreConfig{})
s.be = be

s.start()
Expand Down
2 changes: 1 addition & 1 deletion server/storage/mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func TestKVSnapshot(t *testing.T) {

func TestWatchableKVWatch(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, b)

w := s.NewWatchStream()
Expand Down
25 changes: 16 additions & 9 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type watchable interface {

type watchableStore struct {
*store
watchBatchMaxSize int

// mu protects watcher groups and batches. It should never be locked
// before locking store.mu to avoid deadlock.
Expand All @@ -76,24 +77,30 @@ var _ WatchableKV = (*watchableStore)(nil)
// cancel operations.
type cancelFunc func()

func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
// New builds a watchableStore via newWatchableStore and starts its two
// background goroutines, syncWatchersLoop and syncVictimsLoop, before
// returning it.
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg WatchableStoreConfig) *watchableStore {
	const backgroundLoops = 2

	ws := newWatchableStore(lg, b, le, cfg)
	// Account for both goroutines in ws.wg before launching them.
	ws.wg.Add(backgroundLoops)
	go ws.syncWatchersLoop()
	go ws.syncVictimsLoop()
	return ws
}

func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
// WatchableStoreConfig is the configuration accepted by New and
// newWatchableStore. It embeds StoreConfig, which is forwarded to NewStore,
// and adds the watch-specific setting below.
type WatchableStoreConfig struct {
StoreConfig
// WatchBatchMaxSize is stored on the watchableStore and passed to
// newWatcherBatch whenever events are grouped per watcher.
// NOTE(review): presumably a size limit in bytes, since the server fills it
// from MaxRequestBytesWithOverhead — confirm the unit and what zero means.
WatchBatchMaxSize int
}

func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg WatchableStoreConfig) *watchableStore {
if lg == nil {
lg = zap.NewNop()
}
s := &watchableStore{
store: NewStore(lg, b, le, cfg),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
stopc: make(chan struct{}),
store: NewStore(lg, b, le, cfg.StoreConfig),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
stopc: make(chan struct{}),
watchBatchMaxSize: cfg.WatchBatchMaxSize,
}
s.store.ReadView = &readView{s}
s.store.WriteView = &writeView{s}
Expand Down Expand Up @@ -373,7 +380,7 @@ func (s *watchableStore) syncWatchers() int {
tx.RUnlock()

victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
wb := newWatcherBatch(wg, evs, s.watchBatchMaxSize)
for w := range wg.watchers {
if w.minRev < compactionRev {
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
Expand Down Expand Up @@ -449,7 +456,7 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m
// watchers that watch on the key of the event.
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
victim := make(watcherBatch)
for w, eb := range newWatcherBatch(&s.synced, evs) {
for w, eb := range newWatcherBatch(&s.synced, evs, s.watchBatchMaxSize) {
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
} else {
Expand Down
10 changes: 5 additions & 5 deletions server/storage/mvcc/watchable_store_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func BenchmarkWatchableStorePut(b *testing.B) {
be, _ := betesting.NewDefaultTmpBackend(b)
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, be)

// arbitrary number of bytes
Expand All @@ -47,7 +47,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
// some synchronization operations, such as mutex locking.
func BenchmarkWatchableStoreTxnPut(b *testing.B) {
be, _ := betesting.NewDefaultTmpBackend(b)
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, be)

// arbitrary number of bytes
Expand Down Expand Up @@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {

func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
be, _ := betesting.NewDefaultTmpBackend(b)
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, be)

k := []byte("testkey")
Expand Down Expand Up @@ -122,7 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
// we should put to simulate the real-world use cases.
func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
be, _ := betesting.NewDefaultTmpBackend(b)
ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})

defer cleanup(ws, be)

Expand Down Expand Up @@ -164,7 +164,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {

func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
be, _ := betesting.NewDefaultTmpBackend(b)
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})

defer cleanup(s, be)

Expand Down
36 changes: 16 additions & 20 deletions server/storage/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

func TestWatch(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, b)

testKey := []byte("foo")
Expand All @@ -52,7 +52,7 @@ func TestWatch(t *testing.T) {

func TestNewWatcherCancel(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, b)

testKey := []byte("foo")
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestCancelUnsynced(t *testing.T) {
// because newWatchableStore automatically calls syncWatchers
// method to sync watchers in unsynced map. We want to keep watchers
// in unsynced to test if syncWatchers works as expected.
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, b)

// Put a key so that we can spawn watchers on that key.
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestCancelUnsynced(t *testing.T) {
// and moves these watchers to synced.
func TestSyncWatchers(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, b)

testKey := []byte("foo")
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestSyncWatchers(t *testing.T) {
// TestWatchCompacted tests a watcher that watches on a compacted revision.
func TestWatchCompacted(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, b)

testKey := []byte("foo")
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {

b, _ := betesting.NewDefaultTmpBackend(t)
lg := zaptest.NewLogger(t)
s := New(lg, b, &lease.FakeLessor{}, StoreConfig{})
s := New(lg, b, &lease.FakeLessor{}, WatchableStoreConfig{})

defer func() {
cleanup(s, b)
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {

func TestWatchFutureRev(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, b)

testKey := []byte("foo")
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestWatchRestore(t *testing.T) {
test := func(delay time.Duration) func(t *testing.T) {
return func(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, b)

testKey := []byte("foo")
Expand Down Expand Up @@ -344,11 +344,11 @@ func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
// 5. choose the watcher from step 1, without panic
func TestWatchRestoreSyncedWatcher(t *testing.T) {
b1, _ := betesting.NewDefaultTmpBackend(t)
s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, StoreConfig{})
s1 := New(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s1, b1)

b2, _ := betesting.NewDefaultTmpBackend(t)
s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, StoreConfig{})
s2 := New(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s2, b2)

testKey, testValue := []byte("foo"), []byte("bar")
Expand Down Expand Up @@ -451,13 +451,9 @@ func TestWatchBatchUnsynced(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
oldMaxRevs := watchBatchMaxSize
defer func() {
watchBatchMaxSize = oldMaxRevs
cleanup(s, b)
}()
watchBatchMaxSize = tc.watchBatchMaxSize
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{
WatchBatchMaxSize: tc.watchBatchMaxSize,
})

k := []byte("k")
eventProtoOverhead := 13
Expand Down Expand Up @@ -576,7 +572,7 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
wg.add(w)
}

gwe := newWatcherBatch(&wg, tt.evs)
gwe := newWatcherBatch(&wg, tt.evs, 0)
if len(gwe) != len(tt.wwe) {
t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe))
}
Expand All @@ -598,7 +594,7 @@ func TestWatchVictims(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync

b, _ := betesting.NewDefaultTmpBackend(t)
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})

defer func() {
cleanup(s, b)
Expand Down Expand Up @@ -675,7 +671,7 @@ func TestWatchVictims(t *testing.T) {
// canceling its watches.
func TestStressWatchCancelClose(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, WatchableStoreConfig{})
defer cleanup(s, b)

testKey, testValue := []byte("foo"), []byte("bar")
Expand Down
2 changes: 1 addition & 1 deletion server/storage/mvcc/watcher_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
be, _ := betesting.NewDefaultTmpBackend(b)
watchable := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
watchable := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, WatchableStoreConfig{})

defer cleanup(watchable, be)

Expand Down
Loading

0 comments on commit 9bb91dd

Please sign in to comment.