diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go
index b9948f30ef53..7a0c09cc2f98 100644
--- a/server/etcdserver/bootstrap.go
+++ b/server/etcdserver/bootstrap.go
@@ -227,6 +227,14 @@ func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, s
 		}
 	}
 	if beExist {
+		s1, s2 := be.Size(), be.SizeInUse()
+		cfg.Logger.Info(
+			"recovered v3 backend",
+			zap.Int64("backend-size-bytes", s1),
+			zap.String("backend-size", humanize.Bytes(uint64(s1))),
+			zap.Int64("backend-size-in-use-bytes", s2),
+			zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
+		)
 		if err = schema.Validate(cfg.Logger, be.ReadTx()); err != nil {
 			cfg.Logger.Error("Failed to validate schema", zap.Error(err))
 			return nil, err
@@ -414,14 +422,6 @@ func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backe
 		// already been closed in this case, so we should set the backend again.
 		ci.SetBackend(be)
 
-		s1, s2 := be.Size(), be.SizeInUse()
-		cfg.Logger.Info(
-			"recovered v3 backend from snapshot",
-			zap.Int64("backend-size-bytes", s1),
-			zap.String("backend-size", humanize.Bytes(uint64(s1))),
-			zap.Int64("backend-size-in-use-bytes", s2),
-			zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
-		)
 		if beExist {
 			// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
 			// etcd from pre-3.0 release.
diff --git a/server/storage/backend.go b/server/storage/backend.go
index 9f518f11c009..1137d838981a 100644
--- a/server/storage/backend.go
+++ b/server/storage/backend.go
@@ -105,8 +105,10 @@ func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snap
 		consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx())
 	}
 	if snapshot.Metadata.Index <= consistentIndex {
+		cfg.Logger.Info("Skipping snapshot backend", zap.Uint64("consistent-index", consistentIndex), zap.Uint64("snapshot-index", snapshot.Metadata.Index))
 		return oldbe, nil
 	}
+	cfg.Logger.Info("Recovering from snapshot backend", zap.Uint64("consistent-index", consistentIndex), zap.Uint64("snapshot-index", snapshot.Metadata.Index))
 	oldbe.Close()
 	return OpenSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
 }
diff --git a/tests/e2e/ctl_v3_snapshot_test.go b/tests/e2e/ctl_v3_snapshot_test.go
index 13305b1a7964..8d75366396a1 100644
--- a/tests/e2e/ctl_v3_snapshot_test.go
+++ b/tests/e2e/ctl_v3_snapshot_test.go
@@ -22,17 +22,22 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
 
 	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
+	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/etcdutl/v3/snapshot"
 	"go.etcd.io/etcd/pkg/v3/expect"
 	"go.etcd.io/etcd/tests/v3/framework/config"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/framework/testutils"
+	"go.etcd.io/etcd/tests/v3/robustness/failpoint"
 )
 
 func TestCtlV3Snapshot(t *testing.T) { testCtl(t, snapshotTest) }
@@ -450,3 +455,65 @@ func hasKVs(t *testing.T, ctl *e2e.EtcdctlV3, kvs []testutils.KV, currentRev int
 		require.True(t, int64(currentRev) >= v.Kvs[0].ModRevision)
 	}
 }
+
+func TestRecoverSnapshotBackend(t *testing.T) {
+	e2e.BeforeTest(t)
+	ctx, trafficCancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer trafficCancel()
+
+	epc, err := e2e.NewEtcdProcessCluster(ctx, t,
+		e2e.WithClusterSize(3),
+		e2e.WithKeepDataDir(true),
+		e2e.WithPeerProxy(true),
+		e2e.WithSnapshotCatchUpEntries(50),
+		e2e.WithSnapshotCount(50),
+		e2e.WithGoFailEnabled(true),
+		e2e.WithIsPeerTLS(true),
+	)
+	require.NoError(t, err)
+
+	defer epc.Close()
+
+	blackholedMember := epc.Procs[0]
+	otherMember := epc.Procs[1]
+
+	wg := sync.WaitGroup{}
+
+	trafficCtx, trafficCancel := context.WithCancel(ctx)
+	c, err := clientv3.New(clientv3.Config{
+		Endpoints:            otherMember.EndpointsGRPC(),
+		Logger:               zap.NewNop(),
+		DialKeepAliveTime:    10 * time.Second,
+		DialKeepAliveTimeout: 100 * time.Millisecond,
+	})
+	require.NoError(t, err)
+	defer c.Close()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-trafficCtx.Done():
+				return
+			default:
+			}
+			ctx, cancel := context.WithTimeout(trafficCtx, 50*time.Millisecond)
+			c.Put(ctx, "a", "b")
+			cancel()
+			time.Sleep(10 * time.Millisecond)
+		}
+	}()
+
+	err = blackholedMember.Failpoints().SetupHTTP(ctx, "beforeOpenSnapshotBackend", "panic")
+	require.NoError(t, err)
+	err = failpoint.Blackhole(ctx, t, blackholedMember, epc, true)
+	require.NoError(t, err)
+	err = blackholedMember.Wait(ctx)
+	require.NoError(t, err)
+	trafficCancel()
+	wg.Wait()
+	err = blackholedMember.Start(ctx)
+	require.NoError(t, err)
+	_, err = blackholedMember.Logs().ExpectWithContext(ctx, expect.ExpectedResponse{Value: "Recovering from snapshot backend"})
+	assert.NoError(t, err)
+}
diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go
index de5bfe53e9ff..d640ac355353 100644
--- a/tests/robustness/failpoint/failpoint.go
+++ b/tests/robustness/failpoint/failpoint.go
@@ -50,6 +50,7 @@ var (
 		DropPeerNetwork,
 		RaftBeforeSaveSleep,
 		RaftAfterSaveSleep,
+		BeforeOpenSnapshotBackend,
 	}
 )
 
diff --git a/tests/robustness/failpoint/gofail.go b/tests/robustness/failpoint/gofail.go
index 7c12945ef6ae..85634c6987cd 100644
--- a/tests/robustness/failpoint/gofail.go
+++ b/tests/robustness/failpoint/gofail.go
@@ -53,6 +53,7 @@ var (
 	RaftAfterWALReleasePanic      Failpoint = goPanicFailpoint{"raftAfterWALRelease", triggerBlackhole{waitTillSnapshot: true}, Follower}
 	RaftBeforeSaveSnapPanic       Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
 	RaftAfterSaveSnapPanic        Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
+	BeforeOpenSnapshotBackend     Failpoint = goPanicFailpoint{"beforeOpenSnapshotBackend", triggerBlackhole{waitTillSnapshot: true}, Follower}
 	BeforeApplyOneConfChangeSleep Failpoint = killAndGofailSleep{"beforeApplyOneConfChange", time.Second}
 	RaftBeforeSaveSleep           Failpoint = gofailSleepAndDeactivate{"raftBeforeSave", time.Second}
 	RaftAfterSaveSleep            Failpoint = gofailSleepAndDeactivate{"raftAfterSave", time.Second}
diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go
index a765c427ec24..5d59fba3d99c 100644
--- a/tests/robustness/failpoint/network.go
+++ b/tests/robustness/failpoint/network.go
@@ -51,7 +51,7 @@ type triggerBlackhole struct {
 }
 
 func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error {
-	return blackhole(ctx, t, member, clus, tb.waitTillSnapshot)
+	return Blackhole(ctx, t, member, clus, tb.waitTillSnapshot)
 }
 
 func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool {
@@ -61,7 +61,7 @@ func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, proces
 	return config.ClusterSize > 1 && process.PeerProxy() != nil
 }
 
-func blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
+func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
 	proxy := member.PeerProxy()
 	// Blackholing will cause peers to not be able to use streamWriters registered with member