From 4f5c6a8d65e700eec00a739c70b131b3f5810118 Mon Sep 17 00:00:00 2001 From: Upamanyu Sharma Date: Thu, 5 Dec 2024 14:42:03 -0500 Subject: [PATCH 1/3] tests/integration: add failing lease expiration test for `leasing` The new test checks if `leasingKV.Get()` correctly checks for lease expiration by checking if it (incorrectly) returns stale values when network partitioned from the rest of the system. Signed-off-by: Upamanyu Sharma --- .../clientv3/lease/leasing_test.go | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/tests/integration/clientv3/lease/leasing_test.go b/tests/integration/clientv3/lease/leasing_test.go index 22c98f9ed05..8bcbfbbe07e 100644 --- a/tests/integration/clientv3/lease/leasing_test.go +++ b/tests/integration/clientv3/lease/leasing_test.go @@ -416,6 +416,80 @@ func TestLeasingConcurrentPut(t *testing.T) { } } +func TestLeasingGetChecksForExpiration(t *testing.T) { + integration2.BeforeTest(t) + + ttl := 6 + + // There's no way to partition a client from the server, so use multiple + // servers and shut down a single server to partition the client from the + // system. + clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3, UseBridge: true}) + defer clus.Terminate(t) + + // Try to make sure server 0 is not the leader + if clus.Members[0].Server.Leader() == clus.Members[0].Server.MemberID() { + err := clus.Members[0].Server.MoveLeader(context.TODO(), + clus.Members[0].Server.Lead(), + uint64(clus.Members[1].Server.MemberID())) + if err != nil { + panic(err) + } + } + time.Sleep(2 * time.Second) + + leaderID := clus.Members[0].Server.Leader() + if leaderID == clus.Members[0].Server.MemberID() { + panic("test wants 0 to not be leader") + } + + // This client will get partitioned away from the system (by killing the + // node it's connected to). + lkv0, closeLKV0, err := leasing.NewKV(clus.Client(0), "pfx/", concurrency.WithTTL(ttl)) + require.NoError(t, err) + defer closeLKV0() + + lkv1, closeLKV1, err := leasing.NewKV(clus.Client(1), "pfx/") + require.NoError(t, err) + defer closeLKV1() + + if _, err = lkv0.Put(context.TODO(), "k", "abc"); err != nil { + t.Fatal(err) + } + if _, err = lkv0.Get(context.TODO(), "k"); err != nil { + t.Fatal(err) + } + + time.Sleep(6500 * time.Millisecond) + clus.Members[0].Stop(t) + + if _, err = lkv1.Put(context.TODO(), "k", "def"); err != nil { + t.Fatal(err) + } + + resp, err := lkv1.Get(context.TODO(), "k") + if err != nil { + t.Fatal(err) + } + if len(resp.Kvs) != 1 || string(resp.Kvs[0].Value) != "def" { + t.Fatalf(`expected "k"->"def" from lkv1, got response %+v`, resp) + } + + go func() { + // Eventually bring back the server so the disconnected client can + // finish its last `Get()`. + time.Sleep(1 * time.Second) + clus.Members[0].Restart(t) + }() + cachedResp, err := lkv0.Get(context.TODO(), "k") + if err != nil { + t.Fatal(err) + } + if len(cachedResp.Kvs) != 1 || string(cachedResp.Kvs[0].Value) != "def" { + t.Fatalf(`expected "k"->"def", got response %+v`, cachedResp) + } +} + func TestLeasingDisconnectedGet(t *testing.T) { integration2.BeforeTest(t) clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1, UseBridge: true}) From d065a4a9eeff4343bb02546c3d09fd9318c8f670 Mon Sep 17 00:00:00 2001 From: Upamanyu Sharma Date: Tue, 10 Dec 2024 14:03:14 -0500 Subject: [PATCH 2/3] etcdclient: add a way to check if a lease is unexpired. This helps fix the failing test TestLeasingGetChecksForExpiration. Previously, the `leasing` library relied on *not* receiving something over the `session.Done()` channel in `readySession()`. Failing to immediately receive over the channel does not guarantee that the lease is actually still valid. Signed-off-by: Upamanyu Sharma --- client/v3/concurrency/session.go | 6 ++++++ client/v3/lease.go | 14 ++++++++++++++ client/v3/leasing/kv.go | 7 +------ 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/client/v3/concurrency/session.go b/client/v3/concurrency/session.go index 2275e96c972..209d0241916 100644 --- a/client/v3/concurrency/session.go +++ b/client/v3/concurrency/session.go @@ -154,3 +154,9 @@ func WithContext(ctx context.Context) SessionOption { so.ctx = ctx } } + +// Unexpired returns true iff the session was unexpired at some point during the +// Unexpired() call. +func (s *Session) Unexpired() bool { + return s.client.Unexpired(s.id) +} diff --git a/client/v3/lease.go b/client/v3/lease.go index 11b58348286..739dd161eaf 100644 --- a/client/v3/lease.go +++ b/client/v3/lease.go @@ -136,6 +136,10 @@ type Lease interface { // (see https://github.com/etcd-io/etcd/pull/7866) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) + // Unexpired returns true iff the lease is unexpired (more precisely: iff the + // lease was unexpired during the execution of the Unexpired() call). + Unexpired(id LeaseID) bool + // KeepAliveOnce renews the lease once. The response corresponds to the // first message from calling KeepAlive. If the response has a recoverable // error, KeepAliveOnce will retry the RPC with a new keep alive message. @@ -624,3 +628,13 @@ func (ka *keepAlive) close() { close(ch) } } + +func (l *lessor) Unexpired(id LeaseID) bool { + l.mu.Lock() + defer l.mu.Unlock() + ka, ok := l.keepAlives[id] + if !ok { + return false + } + return ka.deadline.After(time.Now()) +} diff --git a/client/v3/leasing/kv.go b/client/v3/leasing/kv.go index c14af78d629..d045f12ab3f 100644 --- a/client/v3/leasing/kv.go +++ b/client/v3/leasing/kv.go @@ -466,12 +466,7 @@ func (lkv *leasingKV) readySession() bool { if lkv.session == nil { return false } - select { - case <-lkv.session.Done(): - default: - return true - } - return false + return lkv.session.Unexpired() } func (lkv *leasingKV) leaseID() v3.LeaseID { From 9dda756c3711ca882def51e920107275f69ec3b8 Mon Sep 17 00:00:00 2001 From: Upamanyu Sharma Date: Mon, 23 Dec 2024 14:58:23 -0500 Subject: [PATCH 3/3] etcdclient: change Unexpired to Expired for clarity Signed-off-by: Upamanyu Sharma --- client/v3/concurrency/session.go | 7 +++---- client/v3/lease.go | 12 ++++++------ client/v3/leasing/kv.go | 2 +- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/client/v3/concurrency/session.go b/client/v3/concurrency/session.go index 209d0241916..4d19f378997 100644 --- a/client/v3/concurrency/session.go +++ b/client/v3/concurrency/session.go @@ -155,8 +155,7 @@ func WithContext(ctx context.Context) SessionOption { } } -// Unexpired returns true iff the session was unexpired at some point during the -// Unexpired() call. -func (s *Session) Unexpired() bool { - return s.client.Unexpired(s.id) +// Expired returns true iff the session is expired. +func (s *Session) Expired() bool { + return s.client.Expired(s.id) } diff --git a/client/v3/lease.go b/client/v3/lease.go index 739dd161eaf..543fa7c50a0 100644 --- a/client/v3/lease.go +++ b/client/v3/lease.go @@ -136,9 +136,9 @@ type Lease interface { // (see https://github.com/etcd-io/etcd/pull/7866) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) - // Unexpired returns true iff the lease is unexpired (more precisely: iff the - // lease was unexpired during the execution of the Unexpired() call). - Unexpired(id LeaseID) bool + // Expired returns true iff the lease is expired (more precisely: iff the + // lease was expired during the execution of the Expired() call). + Expired(id LeaseID) bool // KeepAliveOnce renews the lease once. The response corresponds to the // first message from calling KeepAlive. If the response has a recoverable @@ -629,12 +629,12 @@ func (ka *keepAlive) close() { } } -func (l *lessor) Unexpired(id LeaseID) bool { +func (l *lessor) Expired(id LeaseID) bool { l.mu.Lock() defer l.mu.Unlock() ka, ok := l.keepAlives[id] if !ok { - return false + return true } - return ka.deadline.After(time.Now()) + return ka.deadline.Before(time.Now()) } diff --git a/client/v3/leasing/kv.go b/client/v3/leasing/kv.go index d045f12ab3f..5fe899ab2c8 100644 --- a/client/v3/leasing/kv.go +++ b/client/v3/leasing/kv.go @@ -466,7 +466,7 @@ func (lkv *leasingKV) readySession() bool { if lkv.session == nil { return false } - return lkv.session.Unexpired() + return !lkv.session.Expired() } func (lkv *leasingKV) leaseID() v3.LeaseID {