From ac295e53ad1e8f469edf7e4ffa405fd60065a0ff Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Tue, 27 Apr 2021 17:10:41 +0200 Subject: [PATCH] Integrate verification framework Verification framework is integrated with: - integration tests (by default) - `ETCD_VERIFY=all etcdctl snapshot restore` command - etcd shutdown when running with `ETCD_VERIFY=all` env. --- CHANGELOG-3.5.md | 3 +- etcdctl/snapshot/v3_snapshot.go | 7 +++- server/embed/etcd.go | 10 +++--- tests/integration/cluster.go | 33 ++++++++++++++----- tests/integration/member_test.go | 5 +++ tests/integration/snapshot/member_test.go | 3 -- .../integration/snapshot/v3_snapshot_test.go | 7 ++-- tests/integration/testing.go | 4 +++ 8 files changed, 51 insertions(+), 21 deletions(-) diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md index 45db3a171e6b..1c12d02993d6 100644 --- a/CHANGELOG-3.5.md +++ b/CHANGELOG-3.5.md @@ -9,7 +9,7 @@ The minimum recommended etcd versions to run in **production** are 3.2.28+, 3.3.
-## v3.5.0 (2021 TBD) +## v3.5.0 (2021-06) See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and [v3.5 upgrade guide](https://etcd.io/docs/latest/upgrades/upgrade_3_5/) for any breaking changes. @@ -160,6 +160,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change. - Add [`--socket-reuse-address`](https://github.com/etcd-io/etcd/pull/12702) flag - Setting this flag enables `SO_REUSEADDR` which allows binding to an address in `TIME_WAIT` state, improving etcd restart time. - Reduce [around 30% memory allocation by logging range response size without marshal](https://github.com/etcd-io/etcd/pull/12871). +- `ETCD_VERIFY="all"` environment variable triggers [additional verification of consistency](https://github.com/etcd-io/etcd/pull/) of etcd data-dir files. ### Package `runtime` - Optimize [`runtime.FDUsage` by removing unnecessary sorting](https://github.com/etcd-io/etcd/pull/12214). diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index 756f84eff4d8..3a45844eea6e 100644 --- a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -41,6 +41,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/verify" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" "go.uber.org/zap" @@ -276,7 +277,11 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { zap.String("snap-dir", s.snapDir), ) - return nil + return verify.VerifyIfEnabled(verify.Config{ + ExactIndex: true, + Logger: s.lg, + DataDir: dataDir, + }) } func (s *v3Manager) outDbPath() string { diff --git a/server/embed/etcd.go b/server/embed/etcd.go index c53450cd8b10..7dcdbc56df4a 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -42,6 +42,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v2v3" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" 
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" + "go.etcd.io/etcd/server/v3/verify" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/soheilhy/cmux" @@ -338,6 +339,11 @@ func (e *Etcd) Close() { lg.Info("closing etcd server", fields...) defer func() { lg.Info("closed etcd server", fields...) + verify.MustVerifyIfEnabled(verify.Config{ + Logger: lg, + DataDir: e.cfg.Dir, + ExactIndex: false, + }) lg.Sync() }() @@ -513,7 +519,6 @@ func (e *Etcd) servePeers() (err error) { e.cfg.logger.Info( "cmux::serve", zap.String("address", u), - zap.String("cmuxp", fmt.Sprintf("%p", m)), ) return m.Serve() } @@ -524,16 +529,13 @@ func (e *Etcd) servePeers() (err error) { e.cfg.logger.Info( "stopping serving peer traffic", zap.String("address", u), - zap.String("cmuxp", fmt.Sprintf("%p", m)), ) stopServers(ctx, &servers{secure: peerTLScfg != nil, grpc: gs, http: srv}) e.cfg.logger.Info( "stopped serving peer traffic", zap.String("address", u), - zap.String("cmuxp", fmt.Sprintf("%p", m)), ) m.Close() - e.cfg.logger.Info("Closed", zap.String("cmuxp", fmt.Sprintf("%p", m))) return nil } } diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index 1291b8b724ab..755d8520a669 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -52,6 +52,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock" lockpb "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb" "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" + "go.etcd.io/etcd/server/v3/verify" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" @@ -583,6 +584,7 @@ type member struct { useIP bool isLearner bool + closed bool } func (m *member) GRPCAddr() string { return m.grpcAddr } @@ -704,13 +706,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member { m.InitialCorruptCheck = true m.WarningApplyDuration = embed.DefaultWarningApplyDuration - level := zapcore.InfoLevel - if os.Getenv("CLUSTER_DEBUG") != "" { - level = zapcore.DebugLevel - } - - 
options := zaptest.WrapOptions(zap.Fields(zap.String("member", mcfg.name))) - m.Logger = zaptest.NewLogger(t, zaptest.Level(level), options).Named(mcfg.name) + m.Logger = memberLogger(t, mcfg.name) t.Cleanup(func() { // if we didn't cleanup the logger, the consecutive test // might reuse this (t). @@ -719,6 +715,16 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member { return m } +func memberLogger(t testutil.TB, name string) *zap.Logger { + level := zapcore.InfoLevel + if os.Getenv("CLUSTER_DEBUG") != "" { + level = zapcore.DebugLevel + } + + options := zaptest.WrapOptions(zap.Fields(zap.String("member", name))) + return zaptest.NewLogger(t, zaptest.Level(level), options).Named(name) +} + // listenGRPC starts a grpc server over a unix domain socket on the member func (m *member) listenGRPC() error { // prefix with localhost so cert has right domain @@ -782,7 +788,7 @@ func NewClientV3(m *member) (*clientv3.Client, error) { // Clone returns a member with the same server configuration. The returned // member will not set PeerListeners and ClientListeners. -func (m *member) Clone(_ testutil.TB) *member { +func (m *member) Clone(t testutil.TB) *member { mm := &member{} mm.ServerConfig = m.ServerConfig @@ -809,6 +815,7 @@ func (m *member) Clone(_ testutil.TB) *member { mm.ElectionTicks = m.ElectionTicks mm.PeerTLSInfo = m.PeerTLSInfo mm.ClientTLSInfo = m.ClientTLSInfo + mm.Logger = memberLogger(t, mm.Name+"c") return mm } @@ -1071,6 +1078,16 @@ func (m *member) Close() { for _, f := range m.serverClosers { f() } + if !m.closed { + // Avoid verification of the same file multiple times + // (that might not exist any longer) + verify.MustVerifyIfEnabled(verify.Config{ + Logger: m.Logger, + DataDir: m.DataDir, + ExactIndex: false, + }) + } + m.closed = true } // Stop stops the member, but the data dir of the member is preserved. 
diff --git a/tests/integration/member_test.go b/tests/integration/member_test.go index 62520bbe6625..5493924c9d27 100644 --- a/tests/integration/member_test.go +++ b/tests/integration/member_test.go @@ -21,6 +21,7 @@ import ( "reflect" "testing" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v2" ) @@ -65,6 +66,7 @@ func TestRestartMember(t *testing.T) { } func TestLaunchDuplicateMemberShouldFail(t *testing.T) { + BeforeTest(t) size := 3 c := NewCluster(t, size) m := c.Members[0].Clone(t) @@ -78,6 +80,9 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) { if err := m.Launch(); err == nil { t.Errorf("unexpect successful launch") + } else { + t.Logf("launch failed as expected: %v", err) + assert.Contains(t, err.Error(), "has already been bootstrapped") } } diff --git a/tests/integration/snapshot/member_test.go b/tests/integration/snapshot/member_test.go index 320c06c8dfc7..4ae13f395b68 100644 --- a/tests/integration/snapshot/member_test.go +++ b/tests/integration/snapshot/member_test.go @@ -17,7 +17,6 @@ package snapshot_test import ( "context" "fmt" - "os" "testing" "time" @@ -42,7 +41,6 @@ func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) { defer func() { for i := 0; i < clusterN; i++ { - os.RemoveAll(srvs[i].Config().Dir) srvs[i].Close() } }() @@ -82,7 +80,6 @@ func TestSnapshotV3RestoreMultiMemberAdd(t *testing.T) { t.Fatal(err) } defer func() { - os.RemoveAll(cfg.Dir) srv.Close() }() select { diff --git a/tests/integration/snapshot/v3_snapshot_test.go b/tests/integration/snapshot/v3_snapshot_test.go index c0df2e48c139..4f6e557858b9 100644 --- a/tests/integration/snapshot/v3_snapshot_test.go +++ b/tests/integration/snapshot/v3_snapshot_test.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/etcdctl/v3/snapshot" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/tests/v3/integration" + "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" ) @@ -39,7 +40,6 @@ func TestSnapshotV3RestoreSingle(t *testing.T) { integration.BeforeTest(t) kvs := 
[]kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} dbPath := createSnapshotFile(t, kvs) - defer os.RemoveAll(dbPath) clusterN := 1 urls := newEmbedURLs(clusterN * 2) @@ -73,7 +73,6 @@ func TestSnapshotV3RestoreSingle(t *testing.T) { t.Fatal(err) } defer func() { - os.RemoveAll(cfg.Dir) srv.Close() }() select { @@ -215,7 +214,6 @@ func createSnapshotFile(t *testing.T, kvs []kv) string { t.Fatal(err) } - os.RemoveAll(cfg.Dir) return dpPath } @@ -243,7 +241,8 @@ func restoreCluster(t *testing.T, clusterN int, dbPath string) ( cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]} cfg.InitialCluster = ics - sp := snapshot.NewV3(zaptest.NewLogger(t)) + sp := snapshot.NewV3( + zaptest.NewLogger(t, zaptest.Level(zapcore.InfoLevel)).Named(cfg.Name).Named("sm")) if err := sp.Restore(snapshot.RestoreConfig{ SnapshotPath: dbPath, diff --git a/tests/integration/testing.go b/tests/integration/testing.go index df7b3dd1fe03..27d5730f33f3 100644 --- a/tests/integration/testing.go +++ b/tests/integration/testing.go @@ -22,6 +22,7 @@ import ( grpc_logsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable" "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/server/v3/embed" + "go.etcd.io/etcd/server/v3/verify" "go.uber.org/zap/zapcore" "go.uber.org/zap/zapgrpc" "go.uber.org/zap/zaptest" @@ -38,6 +39,9 @@ func BeforeTest(t testutil.TB) { grpc_logger.Set(zapgrpc.NewLogger(zaptest.NewLogger(t).Named("grpc"))) + // Integration tests should verify written state as much as possible. + os.Setenv(verify.ENV_VERIFY, verify.ENV_VERIFY_ALL_VALUE) + previousWD, err := os.Getwd() if err != nil { t.Fatal(err)