Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Jan 24, 2025
1 parent ba455de commit a4dd67f
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 18 deletions.
12 changes: 7 additions & 5 deletions balancer/lazy/lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ func (lb *lazyBalancer) ResolverError(err error) {
func (lb *lazyBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
lb.mu.Lock()
defer lb.mu.Unlock()
if childLBCfg, ok := ccs.BalancerConfig.(lbCfg); !ok {
childLBCfg, ok := ccs.BalancerConfig.(lbCfg)
if !ok {
lb.logger.Errorf("Got LB config of unexpected type: %v", ccs.BalancerConfig)
ccs.BalancerConfig = nil
} else {
ccs.BalancerConfig = childLBCfg.childLBCfg
return balancer.ErrBadResolverState
}
ccs.BalancerConfig = childLBCfg.childLBCfg
if lb.delegate != nil {
return lb.delegate.UpdateClientConnState(ccs)
}
Expand Down Expand Up @@ -198,6 +198,8 @@ type idlePicker struct {
}

func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
i.exitIdle()
// Call exitIdle in a new goroutine to avoid deadlocks while calling back
// into the channel synchronously.
go i.exitIdle()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
119 changes: 106 additions & 13 deletions balancer/lazy/lazy_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"

Expand Down Expand Up @@ -64,15 +65,25 @@ func Test(t *testing.T) {
// calls Connect() on the channel which in turn calls ExitIdle on the lazy
// balancer. The test verifies that the channel enters READY.
func (s) TestExitIdle(t *testing.T) {
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
backend1 := stubserver.StartTestService(t, nil)
defer backend1.Stop()

mr := manual.NewBuilderWithScheme("e2e-test")
defer mr.Close()

mr.InitialState(resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: backend1.Address}}},
},
})

json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s": %s}]}`, lazy.Name, lazy.LazyPickfirstConfig)
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(json),
grpc.WithResolvers(mr),
}
cc, err := grpc.NewClient(backend.Address, opts...)
cc, err := grpc.NewClient(mr.Scheme()+":///", opts...)
if err != nil {
t.Fatalf("grpc.NewClient(_) failed: %v", err)

Expand All @@ -83,6 +94,26 @@ func (s) TestExitIdle(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Send a resolver update to verify that the resolver state is correctly
// passed through to the leaf pickfirst balancer.
backend2 := stubserver.StartTestService(t, nil)
defer backend2.Stop()

mr.UpdateState(resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: backend2.Address}}},
},
})

var peer peer.Peer
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
t.Errorf("client.EmptyCall() returned unexpected error: %v", err)
}
if got, want := peer.Addr.String(), backend2.Address; got != want {
t.Errorf("EmptyCall() went to unexpected backend: got %q, want %q", got, want)
}
}

// TestPicker creates a lazy balancer under a stub balancer which block all
Expand Down Expand Up @@ -156,14 +187,14 @@ func (s) TestGoodUpdateThenResolverError(t *testing.T) {
backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
resolverStateReceived := false
resolverErrorReceived := false
resolverErrorReceivedByChild := false

childBF := stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = balancer.Get(pickfirstleaf.Name).Build(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
if resolverErrorReceived {
if resolverErrorReceivedByChild {
t.Error("Received resolver error before resolver state.")
}
resolverStateReceived = true
Expand All @@ -173,7 +204,7 @@ func (s) TestGoodUpdateThenResolverError(t *testing.T) {
if !resolverStateReceived {
t.Error("Received resolver error before resolver state.")
}
resolverErrorReceived = true
resolverErrorReceivedByChild = true
bd.Data.(balancer.Balancer).ResolverError(err)
},
Close: func(bd *stub.BalancerData) {
Expand All @@ -196,6 +227,9 @@ func (s) TestGoodUpdateThenResolverError(t *testing.T) {
ExitIdle: func(bd *stub.BalancerData) {
t.Log("Ignoring call to ExitIdle to delay lazy child creation till RPC time.")
},
ResolverError: func(bd *stub.BalancerData, err error) {
bd.Data.(balancer.Balancer).ResolverError(err)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
return bd.Data.(balancer.Balancer).UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: lazyCfg,
Expand Down Expand Up @@ -250,25 +284,27 @@ func (s) TestGoodUpdateThenResolverError(t *testing.T) {
if !resolverStateReceived {
t.Fatalf("Child balancer did not receive resolver state.")
}

if !resolverErrorReceivedByChild {
t.Fatalf("Child balancer did not receive error.")
}
}

// Tests the scenario when a resolver produces an error followed by a resolver
// error. The test verifies that the child balancer receives only the good
// update.
// Tests the scenario when a resolver produces a list of endpoints followed by
// a resolver error. The test verifies that the child balancer receives only the
// good update.
func (s) TestResolverErrorThenGoodUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

backend := stubserver.StartTestService(t, nil)
defer backend.Stop()
resolverStateReceived := false

childBF := stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = balancer.Get(pickfirstleaf.Name).Build(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
resolverStateReceived = true
return bd.Data.(balancer.Balancer).UpdateClientConnState(ccs)
},
ResolverError: func(bd *stub.BalancerData, err error) {
Expand Down Expand Up @@ -348,12 +384,69 @@ func (s) TestResolverErrorThenGoodUpdate(t *testing.T) {
defer shortCancel()
testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Idle)

// An RPC would succeed only if the leaf pickfirst receives the endpoint
// list.
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Errorf("client.EmptyCall() returned unexpected error: %v", err)
}
}

if !resolverStateReceived {
t.Fatalf("Child balancer did not receive resolver state.")
// Tests that ExitIdle calls are correctly passed through to the child balancer.
// It starts a backend and ensures the channel connects to it. The test then
// stops the backend, making the channel enter IDLE. The test calls Connect on
// the channel and verifies that the child balancer exits idle.
func (s) TestExitIdlePassthrough(t *testing.T) {
backend1 := stubserver.StartTestService(t, nil)
defer backend1.Stop()

mr := manual.NewBuilderWithScheme("e2e-test")
defer mr.Close()

mr.InitialState(resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: backend1.Address}}},
},
})

json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s": %s}]}`, lazy.Name, lazy.LazyPickfirstConfig)
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(json),
grpc.WithResolvers(mr),
}
cc, err := grpc.NewClient(mr.Scheme()+":///", opts...)
if err != nil {
t.Fatalf("grpc.NewClient(_) failed: %v", err)

}
defer cc.Close()

cc.Connect()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)

// Stopping the active backend should put the channel in IDLE.
backend1.Stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)

// Sending a new backend address should not kick the channel out of IDLE.
// On calling cc.Connect(), the channel should call ExitIdle on the lazy
// balancer which passes through the call to the leaf pickfirst.
backend2 := stubserver.StartTestService(t, nil)
defer backend2.Stop()

mr.UpdateState(resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: backend2.Address}}},
},
})

shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer shortCancel()
testutils.AwaitNoStateChange(shortCtx, t, cc, connectivity.Idle)

cc.Connect()
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
}

0 comments on commit a4dd67f

Please sign in to comment.