From 1998a71c8ab176316e4a9d262dd982dd1c3e1eb6 Mon Sep 17 00:00:00 2001 From: Clement Date: Wed, 26 Jun 2024 13:19:59 +0800 Subject: [PATCH 1/8] etcdserver: separate compact from snapshot Signed-off-by: Clement --- server/etcdserver/server.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a6f25548ca1..90afb5192d7 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -17,6 +17,7 @@ package etcdserver import ( "context" "encoding/json" + goerrors "errors" "expvar" "fmt" "math" @@ -963,6 +964,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) { <-apply.notifyc s.triggerSnapshot(ep) + s.compactRaftLog(ep.appliedi) select { // snapshot requested via send() case m := <-s.r.msgSnapC: @@ -2164,18 +2166,24 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { lg.Info("skip compaction since there is an inflight snapshot") return } + }) +} + +func (s *EtcdServer) compactRaftLog(appliedi uint64) { + s.GoAttach(func() { + lg := s.Logger() // keep some in memory log entries for slow followers. - compacti := uint64(1) - if snapi > s.Cfg.SnapshotCatchUpEntries { - compacti = snapi - s.Cfg.SnapshotCatchUpEntries + compacti := uint64(0) + if appliedi > s.Cfg.SnapshotCatchUpEntries { + compacti = appliedi - s.Cfg.SnapshotCatchUpEntries } - err = s.r.raftStorage.Compact(compacti) + err := s.r.raftStorage.Compact(compacti) if err != nil { // the compaction was done asynchronously with the progress of raft. // raft log might already been compact. - if err == raft.ErrCompacted { + if goerrors.Is(err, raft.ErrCompacted) { return } lg.Panic("failed to compact", zap.Error(err)) From 8dc2a4203b792e1cc6d2680fcafb11762ce41472 Mon Sep 17 00:00:00 2001 From: Clement Date: Wed, 26 Jun 2024 19:50:53 +0800 Subject: [PATCH 2/8] etcdserver: skip compaction if there is an inflight snapshot Signed-off-by: Clement --- server/etcdserver/server.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 90afb5192d7..26981266984 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2156,23 +2156,23 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { "saved snapshot", zap.Uint64("snapshot-index", snap.Metadata.Index), ) - - // When sending a snapshot, etcd will pause compaction. - // After receives a snapshot, the slow follower needs to get all the entries right after - // the snapshot sent to catch up. If we do not pause compaction, the log entries right after - // the snapshot sent might already be compacted. It happens when the snapshot takes long time - // to send and save. Pausing compaction avoids triggering a snapshot sending cycle. - if atomic.LoadInt64(&s.inflightSnapshots) != 0 { - lg.Info("skip compaction since there is an inflight snapshot") - return - } }) } func (s *EtcdServer) compactRaftLog(appliedi uint64) { - s.GoAttach(func() { - lg := s.Logger() + lg := s.Logger() + // When sending a snapshot, etcd will pause compaction. + // After receives a snapshot, the slow follower needs to get all the entries right after + // the snapshot sent to catch up. If we do not pause compaction, the log entries right after + // the snapshot sent might already be compacted. It happens when the snapshot takes long time + // to send and save. Pausing compaction avoids triggering a snapshot sending cycle. + if atomic.LoadInt64(&s.inflightSnapshots) != 0 { + lg.Info("skip compaction since there is an inflight snapshot") + return + } + + s.GoAttach(func() { // keep some in memory log entries for slow followers. compacti := uint64(0) if appliedi > s.Cfg.SnapshotCatchUpEntries { @@ -2188,7 +2188,7 @@ func (s *EtcdServer) compactRaftLog(appliedi uint64) { } lg.Panic("failed to compact", zap.Error(err)) } - lg.Info( + lg.Debug( "compacted Raft logs", zap.Uint64("compact-index", compacti), ) From 641eb411ed3d338aec9af7380e9b13ae4e2e7ecb Mon Sep 17 00:00:00 2001 From: Clement Date: Thu, 27 Jun 2024 16:34:44 +0800 Subject: [PATCH 3/8] etcdserver: skip compaction if there are snapshots being created Signed-off-by: Clement --- server/etcdserver/server.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 26981266984..d1784f8a3a1 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -208,6 +208,7 @@ type Server interface { type EtcdServer struct { // inflightSnapshots holds count the number of snapshots currently inflight. inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned. + creatingSnapshots int64 // must use atomic operations to access; keep 64-bit aligned. appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned. committedIndex uint64 // must use atomic operations to access; keep 64-bit aligned. term uint64 // must use atomic operations to access; keep 64-bit aligned. @@ -2128,7 +2129,11 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { // the go routine created below. s.KV().Commit() + atomic.AddInt64(&s.creatingSnapshots, 1) s.GoAttach(func() { + defer func() { + atomic.AddInt64(&s.creatingSnapshots, -1) + }() lg := s.Logger() // For backward compatibility, generate v2 snapshot from v3 state. @@ -2172,6 +2177,14 @@ func (s *EtcdServer) compactRaftLog(appliedi uint64) { return } + // If there are snapshots being created, skip compaction until they are done. + // This ensures `s.r.raftStorage.Compact` does not remove elements from `s.r.raftStorage.Ents`, + // preventing `s.r.raftStorage.CreateSnapshot` from causing a panic. + if atomic.LoadInt64(&s.creatingSnapshots) != 0 { + lg.Info("skip compaction since there are snapshots being created") + return + } + s.GoAttach(func() { // keep some in memory log entries for slow followers. compacti := uint64(0) @@ -2188,7 +2201,7 @@ func (s *EtcdServer) compactRaftLog(appliedi uint64) { } lg.Panic("failed to compact", zap.Error(err)) } - lg.Debug( + lg.Info( "compacted Raft logs", zap.Uint64("compact-index", compacti), ) From 93d0484c632a9629c95406e5bb65b5d90c6a738e Mon Sep 17 00:00:00 2001 From: Clement Date: Sat, 29 Jun 2024 02:19:52 +0800 Subject: [PATCH 4/8] etcdserver: compact the raft log up to the minimum snapshot index of all ongoing snapshots Signed-off-by: Clement --- server/etcdserver/raft.go | 16 ++- server/etcdserver/server.go | 31 +++--- server/etcdserver/snapshot_tracker.go | 74 +++++++++++++ server/etcdserver/snapshot_tracker_test.go | 122 +++++++++++++++++++++ 4 files changed, 223 insertions(+), 20 deletions(-) create mode 100644 server/etcdserver/snapshot_tracker.go create mode 100644 server/etcdserver/snapshot_tracker_test.go diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index d397612af9c..f6858dc7b37 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -92,6 +92,9 @@ type raftNode struct { // a chan to send out readState readStateC chan raft.ReadState + // keep track of snapshots being created + snapshotTracker SnapshotTracker + // utility ticker *time.Ticker // contention detectors for raft heartbeat message @@ -136,12 +139,13 @@ func newRaftNode(cfg raftNodeConfig) *raftNode { raftNodeConfig: cfg, // set up contention detectors for raft heartbeat message. // expect to send a heartbeat within 2 heartbeat intervals. - td: contention.NewTimeoutDetector(2 * cfg.heartbeat), - readStateC: make(chan raft.ReadState, 1), - msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), - applyc: make(chan toApply), - stopped: make(chan struct{}), - done: make(chan struct{}), + td: contention.NewTimeoutDetector(2 * cfg.heartbeat), + readStateC: make(chan raft.ReadState, 1), + snapshotTracker: SnapshotTracker{}, + msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), + applyc: make(chan toApply), + stopped: make(chan struct{}), + done: make(chan struct{}), } if r.heartbeat == 0 { r.ticker = &time.Ticker{} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index d1784f8a3a1..c061cae2c95 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -208,7 +208,6 @@ type Server interface { type EtcdServer struct { // inflightSnapshots holds count the number of snapshots currently inflight. inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned. - creatingSnapshots int64 // must use atomic operations to access; keep 64-bit aligned. appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned. committedIndex uint64 // must use atomic operations to access; keep 64-bit aligned. term uint64 // must use atomic operations to access; keep 64-bit aligned. @@ -2129,11 +2128,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { // the go routine created below. s.KV().Commit() - atomic.AddInt64(&s.creatingSnapshots, 1) + s.r.snapshotTracker.Track(snapi) + s.GoAttach(func() { defer func() { - atomic.AddInt64(&s.creatingSnapshots, -1) + s.r.snapshotTracker.UnTrack(snapi) }() + lg := s.Logger() // For backward compatibility, generate v2 snapshot from v3 state. @@ -2177,21 +2178,23 @@ func (s *EtcdServer) compactRaftLog(appliedi uint64) { return } - // If there are snapshots being created, skip compaction until they are done. - // This ensures `s.r.raftStorage.Compact` does not remove elements from `s.r.raftStorage.Ents`, - // preventing `s.r.raftStorage.CreateSnapshot` from causing a panic. - if atomic.LoadInt64(&s.creatingSnapshots) != 0 { - lg.Info("skip compaction since there are snapshots being created") + // keep some in memory log entries for slow followers. + compacti := uint64(0) + if appliedi > s.Cfg.SnapshotCatchUpEntries { + compacti = appliedi - s.Cfg.SnapshotCatchUpEntries + } + + // if there are snapshots being created, compact the raft log up to the minimum snapshot index. + if minSpani, err := s.r.snapshotTracker.MinSnapi(); err == nil && minSpani < appliedi && minSpani > s.Cfg.SnapshotCatchUpEntries { + compacti = minSpani - s.Cfg.SnapshotCatchUpEntries + } + + // no need to compact if compacti == 0 + if compacti == 0 { return } s.GoAttach(func() { - // keep some in memory log entries for slow followers. - compacti := uint64(0) - if appliedi > s.Cfg.SnapshotCatchUpEntries { - compacti = appliedi - s.Cfg.SnapshotCatchUpEntries - } - err := s.r.raftStorage.Compact(compacti) if err != nil { // the compaction was done asynchronously with the progress of raft. diff --git a/server/etcdserver/snapshot_tracker.go b/server/etcdserver/snapshot_tracker.go new file mode 100644 index 00000000000..e18a87255f8 --- /dev/null +++ b/server/etcdserver/snapshot_tracker.go @@ -0,0 +1,74 @@ +package etcdserver + +import ( + "cmp" + "container/heap" + "errors" + "sync" +) + +// SnapshotTracker keeps track of all ongoing snapshot creation. To safeguard ongoing snapshot creation, +// only compact the raft log up to the minimum snapshot index in the track. +type SnapshotTracker struct { + h minHeap[uint64] + mu sync.Mutex +} + +// MinSnapi returns the minimum snapshot index in the track or an error if the tracker is empty. +func (st *SnapshotTracker) MinSnapi() (uint64, error) { + st.mu.Lock() + defer st.mu.Unlock() + if st.h.Len() == 0 { + return 0, errors.New("SnapshotTracker is empty") + } + return st.h[0], nil +} + +// Track adds a snapi to the tracker. Make sure to call UnTrack once the snapshot has been created. +func (st *SnapshotTracker) Track(snapi uint64) { + st.mu.Lock() + defer st.mu.Unlock() + heap.Push(&st.h, snapi) +} + +// UnTrack removes 'snapi' from the tracker. No action taken if 'snapi' is not found. +func (st *SnapshotTracker) UnTrack(snapi uint64) { + st.mu.Lock() + defer st.mu.Unlock() + + for i := 0; i < len((*st).h); i++ { + if (*st).h[i] == snapi { + heap.Remove(&st.h, i) + return + } + } +} + +// minHeap implements the heap.Interface for E. +type minHeap[E interface { + cmp.Ordered +}] []E + +func (h minHeap[_]) Len() int { + return len(h) +} + +func (h minHeap[_]) Less(i, j int) bool { + return h[i] < h[j] +} + +func (h minHeap[_]) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *minHeap[E]) Push(x any) { + *h = append(*h, x.(E)) +} + +func (h *minHeap[E]) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/server/etcdserver/snapshot_tracker_test.go b/server/etcdserver/snapshot_tracker_test.go new file mode 100644 index 00000000000..27f2cf670d0 --- /dev/null +++ b/server/etcdserver/snapshot_tracker_test.go @@ -0,0 +1,122 @@ +package etcdserver + +import ( + "container/heap" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestSnapTracker_MinSnapi(t *testing.T) { + st := SnapshotTracker{} + + _, err := st.MinSnapi() + assert.NotNil(t, err, "SnapshotTracker should be empty initially") + + st.Track(10) + minSnapi, err := st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(10), minSnapi, "MinSnapi should return the only tracked snapshot index") + + st.Track(5) + minSnapi, err = st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(5), minSnapi, "MinSnapi should return the minimum tracked snapshot index") + + st.UnTrack(5) + minSnapi, err = st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(10), minSnapi, "MinSnapi should return the remaining tracked snapshot index") +} + +func TestSnapTracker_Track(t *testing.T) { + st := SnapshotTracker{} + st.Track(20) + st.Track(10) + st.Track(15) + + assert.Equal(t, 3, st.h.Len(), "SnapshotTracker should have 3 snapshots tracked") + + minSnapi, err := st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(10), minSnapi, "MinSnapi should return the minimum tracked snapshot index") +} + +func TestSnapTracker_UnTrack(t *testing.T) { + st := SnapshotTracker{} + st.Track(20) + st.Track(30) + st.Track(40) + // track another snapshot with the same index + st.Track(20) + + st.UnTrack(30) + assert.Equal(t, 3, st.h.Len()) + + minSnapi, err := st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(20), minSnapi) + + st.UnTrack(20) + assert.Equal(t, 2, st.h.Len()) + + minSnapi, err = st.MinSnapi() + assert.Nil(t, err) + assert.Equal(t, uint64(20), minSnapi) + + st.UnTrack(20) + minSnapi, err = st.MinSnapi() + assert.Equal(t, uint64(40), minSnapi) + + st.UnTrack(40) + _, err = st.MinSnapi() + assert.NotNil(t, err) +} + +func newMinHeap(elements ...uint64) minHeap[uint64] { + h := minHeap[uint64](elements) + heap.Init(&h) + return h +} + +func TestMinHeapLen(t *testing.T) { + h := newMinHeap(3, 2, 1) + assert.Equal(t, 3, h.Len()) +} + +func TestMinHeapLess(t *testing.T) { + h := newMinHeap(3, 2, 1) + assert.True(t, h.Less(0, 1)) +} + +func TestMinHeapSwap(t *testing.T) { + h := newMinHeap(3, 2, 1) + h.Swap(0, 1) + assert.Equal(t, uint64(2), h[0]) + assert.Equal(t, uint64(1), h[1]) + assert.Equal(t, uint64(3), h[2]) +} + +func TestMinHeapPushPop(t *testing.T) { + h := newMinHeap(3, 2) + heap.Push(&h, uint64(1)) + assert.Equal(t, 3, h.Len()) + + got := heap.Pop(&h).(uint64) + assert.Equal(t, uint64(1), got) +} + +func TestMinHeapEmpty(t *testing.T) { + h := minHeap[uint64]{} + assert.Equal(t, 0, h.Len()) +} + +func TestMinHeapSingleElement(t *testing.T) { + h := newMinHeap(uint64(1)) + assert.Equal(t, 1, h.Len()) + + heap.Push(&h, uint64(2)) + assert.Equal(t, 2, h.Len()) + + got := heap.Pop(&h) + assert.Equal(t, uint64(1), got) +} From 302254b9b3fdf5fc7615e156b4768516ec4e3b3b Mon Sep 17 00:00:00 2001 From: Clement Date: Sat, 29 Jun 2024 02:37:55 +0800 Subject: [PATCH 5/8] fix lint errorgs Signed-off-by: Clement --- server/etcdserver/snapshot_tracker_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/etcdserver/snapshot_tracker_test.go b/server/etcdserver/snapshot_tracker_test.go index 27f2cf670d0..d2d00239dde 100644 --- a/server/etcdserver/snapshot_tracker_test.go +++ b/server/etcdserver/snapshot_tracker_test.go @@ -2,8 +2,9 @@ package etcdserver import ( "container/heap" - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestSnapTracker_MinSnapi(t *testing.T) { @@ -65,6 +66,7 @@ func TestSnapTracker_UnTrack(t *testing.T) { st.UnTrack(20) minSnapi, err = st.MinSnapi() + assert.Nil(t, err) assert.Equal(t, uint64(40), minSnapi) st.UnTrack(40) From fcb154fc74047f9391ab417488dbcdd1d027dbb6 Mon Sep 17 00:00:00 2001 From: Clement Date: Sat, 29 Jun 2024 02:55:45 +0800 Subject: [PATCH 6/8] fix vet errors Signed-off-by: Clement --- server/etcdserver/raft.go | 2 +- server/etcdserver/snapshot_tracker.go | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go index f6858dc7b37..7777cff61fe 100644 --- a/server/etcdserver/raft.go +++ b/server/etcdserver/raft.go @@ -141,7 +141,7 @@ func newRaftNode(cfg raftNodeConfig) *raftNode { // expect to send a heartbeat within 2 heartbeat intervals. td: contention.NewTimeoutDetector(2 * cfg.heartbeat), readStateC: make(chan raft.ReadState, 1), - snapshotTracker: SnapshotTracker{}, + snapshotTracker: *newSnapshotTracker(), msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), applyc: make(chan toApply), stopped: make(chan struct{}), diff --git a/server/etcdserver/snapshot_tracker.go b/server/etcdserver/snapshot_tracker.go index e18a87255f8..6a05f68f604 100644 --- a/server/etcdserver/snapshot_tracker.go +++ b/server/etcdserver/snapshot_tracker.go @@ -11,7 +11,14 @@ import ( // only compact the raft log up to the minimum snapshot index in the track. type SnapshotTracker struct { h minHeap[uint64] - mu sync.Mutex + mu *sync.Mutex +} + +func newSnapshotTracker() *SnapshotTracker { + return &SnapshotTracker{ + h: minHeap[uint64]{}, + mu: new(sync.Mutex), + } } // MinSnapi returns the minimum snapshot index in the track or an error if the tracker is empty. From 99b3806d0a4e75b9b97bde5f3c9c15150b04b2db Mon Sep 17 00:00:00 2001 From: Clement Date: Sat, 29 Jun 2024 03:06:24 +0800 Subject: [PATCH 7/8] fix test cases Signed-off-by: Clement --- server/etcdserver/snapshot_tracker_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/etcdserver/snapshot_tracker_test.go b/server/etcdserver/snapshot_tracker_test.go index d2d00239dde..965722cd1bf 100644 --- a/server/etcdserver/snapshot_tracker_test.go +++ b/server/etcdserver/snapshot_tracker_test.go @@ -8,7 +8,7 @@ import ( ) func TestSnapTracker_MinSnapi(t *testing.T) { - st := SnapshotTracker{} + st := *newSnapshotTracker() _, err := st.MinSnapi() assert.NotNil(t, err, "SnapshotTracker should be empty initially") @@ -30,7 +30,7 @@ func TestSnapTracker_MinSnapi(t *testing.T) { } func TestSnapTracker_Track(t *testing.T) { - st := SnapshotTracker{} + st := *newSnapshotTracker() st.Track(20) st.Track(10) st.Track(15) @@ -43,7 +43,7 @@ func TestSnapTracker_Track(t *testing.T) { } func TestSnapTracker_UnTrack(t *testing.T) { - st := SnapshotTracker{} + st := *newSnapshotTracker() st.Track(20) st.Track(30) st.Track(40) From 5745418abd6af66320321b69ce502e92c41a9ce1 Mon Sep 17 00:00:00 2001 From: Clement Date: Sat, 29 Jun 2024 03:15:16 +0800 Subject: [PATCH 8/8] add license header Signed-off-by: Clement --- server/etcdserver/snapshot_tracker.go | 14 ++++++++++++++ server/etcdserver/snapshot_tracker_test.go | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/server/etcdserver/snapshot_tracker.go b/server/etcdserver/snapshot_tracker.go index 6a05f68f604..271ab53999e 100644 --- a/server/etcdserver/snapshot_tracker.go +++ b/server/etcdserver/snapshot_tracker.go @@ -1,3 +1,17 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package etcdserver import ( diff --git a/server/etcdserver/snapshot_tracker_test.go b/server/etcdserver/snapshot_tracker_test.go index 965722cd1bf..098c518e51d 100644 --- a/server/etcdserver/snapshot_tracker_test.go +++ b/server/etcdserver/snapshot_tracker_test.go @@ -1,3 +1,17 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package etcdserver import (