Skip to content

Commit

Permalink
Improve logs around recovering snapshot backend and add an e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Dec 20, 2023
1 parent 1e8d66e commit ab24395
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 10 deletions.
16 changes: 8 additions & 8 deletions server/etcdserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,14 @@ func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, s
}
}
if beExist {
s1, s2 := be.Size(), be.SizeInUse()
cfg.Logger.Info(
"recovered v3 backend",
zap.Int64("backend-size-bytes", s1),
zap.String("backend-size", humanize.Bytes(uint64(s1))),
zap.Int64("backend-size-in-use-bytes", s2),
zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
)
if err = schema.Validate(cfg.Logger, be.ReadTx()); err != nil {
cfg.Logger.Error("Failed to validate schema", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -414,14 +422,6 @@ func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backe
// already been closed in this case, so we should set the backend again.
ci.SetBackend(be)

s1, s2 := be.Size(), be.SizeInUse()
cfg.Logger.Info(
"recovered v3 backend from snapshot",
zap.Int64("backend-size-bytes", s1),
zap.String("backend-size", humanize.Bytes(uint64(s1))),
zap.Int64("backend-size-in-use-bytes", s2),
zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
)
if beExist {
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
// etcd from pre-3.0 release.
Expand Down
2 changes: 2 additions & 0 deletions server/storage/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snap
consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx())
}
if snapshot.Metadata.Index <= consistentIndex {
cfg.Logger.Info("Skipping snapshot backend", zap.Uint64("consistent-index", consistentIndex), zap.Uint64("snapshot-index", snapshot.Metadata.Index))
return oldbe, nil
}
cfg.Logger.Info("Recovering from snapshot backend", zap.Uint64("consistent-index", consistentIndex), zap.Uint64("snapshot-index", snapshot.Metadata.Index))
oldbe.Close()
return OpenSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks)
}
67 changes: 67 additions & 0 deletions tests/e2e/ctl_v3_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,22 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/etcdutl/v3/snapshot"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/framework/testutils"
"go.etcd.io/etcd/tests/v3/robustness/failpoint"
)

func TestCtlV3Snapshot(t *testing.T) { testCtl(t, snapshotTest) }
Expand Down Expand Up @@ -450,3 +455,65 @@ func hasKVs(t *testing.T, ctl *e2e.EtcdctlV3, kvs []testutils.KV, currentRev int
require.True(t, int64(currentRev) >= v.Kvs[0].ModRevision)
}
}

func TestRecoverSnapshotBackend(t *testing.T) {
e2e.BeforeTest(t)
ctx, trafficCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer trafficCancel()

epc, err := e2e.NewEtcdProcessCluster(ctx, t,
e2e.WithClusterSize(3),
e2e.WithKeepDataDir(true),
e2e.WithPeerProxy(true),
e2e.WithSnapshotCatchUpEntries(50),
e2e.WithSnapshotCount(50),
e2e.WithGoFailEnabled(true),
e2e.WithIsPeerTLS(true),
)
require.NoError(t, err)

defer epc.Close()

blackholedMember := epc.Procs[0]
otherMember := epc.Procs[1]

wg := sync.WaitGroup{}

trafficCtx, trafficCancel := context.WithCancel(ctx)
c, err := clientv3.New(clientv3.Config{
Endpoints: otherMember.EndpointsGRPC(),
Logger: zap.NewNop(),
DialKeepAliveTime: 10 * time.Second,
DialKeepAliveTimeout: 100 * time.Millisecond,
})
require.NoError(t, err)
defer c.Close()
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-trafficCtx.Done():
return
default:
}
ctx, cancel := context.WithTimeout(trafficCtx, 50*time.Millisecond)
c.Put(ctx, "a", "b")
cancel()
time.Sleep(10 * time.Millisecond)
}
}()

err = blackholedMember.Failpoints().SetupHTTP(ctx, "beforeOpenSnapshotBackend", "panic")
require.NoError(t, err)
err = failpoint.Blackhole(ctx, t, blackholedMember, epc, true)
require.NoError(t, err)
err = blackholedMember.Wait(ctx)
require.NoError(t, err)
trafficCancel()
wg.Wait()
err = blackholedMember.Start(ctx)
require.NoError(t, err)
_, err = blackholedMember.Logs().ExpectWithContext(ctx, expect.ExpectedResponse{Value: "Recovering from snapshot backend"})
assert.NoError(t, err)
}
1 change: 1 addition & 0 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var (
DropPeerNetwork,
RaftBeforeSaveSleep,
RaftAfterSaveSleep,
BeforeOpenSnapshotBackend,
}
)

Expand Down
1 change: 1 addition & 0 deletions tests/robustness/failpoint/gofail.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
RaftAfterWALReleasePanic Failpoint = goPanicFailpoint{"raftAfterWALRelease", triggerBlackhole{waitTillSnapshot: true}, Follower}
RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackhole{waitTillSnapshot: true}, Follower}
BeforeOpenSnapshotBackend Failpoint = goPanicFailpoint{"beforeOpenSnapshotBackend", triggerBlackhole{waitTillSnapshot: true}, Follower}
BeforeApplyOneConfChangeSleep Failpoint = killAndGofailSleep{"beforeApplyOneConfChange", time.Second}
RaftBeforeSaveSleep Failpoint = gofailSleepAndDeactivate{"raftBeforeSave", time.Second}
RaftAfterSaveSleep Failpoint = gofailSleepAndDeactivate{"raftAfterSave", time.Second}
Expand Down
4 changes: 2 additions & 2 deletions tests/robustness/failpoint/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type triggerBlackhole struct {
}

func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error {
return blackhole(ctx, t, member, clus, tb.waitTillSnapshot)
return Blackhole(ctx, t, member, clus, tb.waitTillSnapshot)
}

func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool {
Expand All @@ -61,7 +61,7 @@ func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, proces
return config.ClusterSize > 1 && process.PeerProxy() != nil
}

func blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
proxy := member.PeerProxy()

// Blackholing will cause peers to not be able to use streamWriters registered with member
Expand Down

0 comments on commit ab24395

Please sign in to comment.