Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: change the snapshot + compact into sync operation #18283

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 48 additions & 50 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2126,65 +2126,63 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// the go routine created below.
s.KV().Commit()

s.GoAttach(func() {
lg := s.Logger()
lg := s.Logger()

// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.
if err == raft.ErrSnapOutOfDate {
return
}
lg.Panic("failed to create snapshot", zap.Error(err))
// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {
// the snapshot was done asynchronously with the progress of raft.
// raft might have already got a newer snapshot.
if err == raft.ErrSnapOutOfDate {
return
}
lg.Panic("failed to create snapshot", zap.Error(err))
}

verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())

// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}

lg.Info(
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
lg.Info(
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)

// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
// the snapshot sent might already be compacted. It happens when the snapshot takes long time
// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
lg.Info("skip compaction since there is an inflight snapshot")
return
}
// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
// the snapshot sent might already be compacted. It happens when the snapshot takes long time
// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
lg.Info("skip compaction since there is an inflight snapshot")
return
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}
// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
if err == raft.ErrCompacted {
return
}
lg.Panic("failed to compact", zap.Error(err))
err = s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
if err == raft.ErrCompacted {
return
}
lg.Info(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
})
lg.Panic("failed to compact", zap.Error(err))
}
lg.Info(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
}

// CutPeer drops messages to the specified peer.
Expand Down
Loading