Skip to content

Commit

Permalink
Move metrics recorder from BuildOptions to ClientConn
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Jan 22, 2025
1 parent be12ee9 commit 1ff6728
Show file tree
Hide file tree
Showing 18 changed files with 71 additions and 70 deletions.
9 changes: 5 additions & 4 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ type ClientConn interface {
//
// Deprecated: Use the Target field in the BuildOptions instead.
Target() string

// MetricsRecorder provides the metrics recorder that balancers can use to
// record metrics. Balancer implementations which do not register metrics on
// metrics registry and record on them can ignore this method.
MetricsRecorder() estats.MetricsRecorder
}

// BuildOptions contains additional information for Build.
Expand Down Expand Up @@ -198,10 +203,6 @@ type BuildOptions struct {
// same resolver.Target as passed to the resolver. See the documentation for
// the resolver.Target type for details about what it contains.
Target resolver.Target
// MetricsRecorder is the metrics recorder that balancers can use to record
// metrics. Balancer implementations which do not register metrics on
// metrics registry and record on them can ignore this field.
MetricsRecorder estats.MetricsRecorder
}

// Builder creates a balancer.
Expand Down
5 changes: 2 additions & 3 deletions balancer/pickfirst/pickfirst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
)

Expand All @@ -56,7 +55,7 @@ func (s) TestPickFirst_InitialResolverError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
bal.ResolverError(errors.New("resolution failed: test error"))

Expand Down Expand Up @@ -89,7 +88,7 @@ func (s) TestPickFirst_ResolverErrorinTF(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()

// After sending a valid update, the LB policy should report CONNECTING.
Expand Down
5 changes: 2 additions & 3 deletions balancer/pickfirst/pickfirstleaf/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -80,7 +79,7 @@ func (s) TestPickFirstMetrics(t *testing.T) {
Addresses: []resolver.Address{{Addr: ss.Address}}},
)

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("NewClient() failed with error: %v", err)
Expand Down Expand Up @@ -124,7 +123,7 @@ func (s) TestPickFirstMetricsFailure(t *testing.T) {
Addresses: []resolver.Address{{Addr: "bad address"}}},
)
grpcTarget := r.Scheme() + ":///"
tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
cc, err := grpc.NewClient(grpcTarget, grpc.WithStatsHandler(tmr), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("NewClient() failed with error: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion balancer/pickfirst/pickfirstleaf/pickfirstleaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions)
b := &pickfirstBalancer{
cc: cc,
target: bo.Target.String(),
metricsRecorder: bo.MetricsRecorder, // ClientConn will always create a Metrics Recorder.
metricsRecorder: cc.MetricsRecorder(),

subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
Expand Down
13 changes: 6 additions & 7 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/pickfirst"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -885,7 +884,7 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_AfterEndOfList(t *testing.T) {
triggerTimer, timeAfter := mockTimer()
pfinternal.TimeAfterFunc = timeAfter

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
dialer := testutils.NewBlockingDialer()
opts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pickfirstleaf.Name)),
Expand Down Expand Up @@ -974,7 +973,7 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TriggerConnectionDelay(t *testing.T) {
triggerTimer, timeAfter := mockTimer()
pfinternal.TimeAfterFunc = timeAfter

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
dialer := testutils.NewBlockingDialer()
opts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pickfirstleaf.Name)),
Expand Down Expand Up @@ -1034,7 +1033,7 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) {
triggerTimer, timeAfter := mockTimer()
pfinternal.TimeAfterFunc = timeAfter

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
dialer := testutils.NewBlockingDialer()
opts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, pickfirstleaf.Name)),
Expand Down Expand Up @@ -1099,7 +1098,7 @@ func (s) TestPickFirstLeaf_InterleavingIPV4Preffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Expand Down Expand Up @@ -1145,7 +1144,7 @@ func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Expand Down Expand Up @@ -1189,7 +1188,7 @@ func (s) TestPickFirstLeaf_InterleavingUnknownPreffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Expand Down
3 changes: 1 addition & 2 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -196,7 +195,7 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
Expand Down
4 changes: 2 additions & 2 deletions balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
updateCh: buffer.NewUnbounded(),
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger, opts.MetricsRecorder, opts.Target.String())
lb.dataCache = newDataCache(maxCacheSize, lb.logger, cc.MetricsRecorder(), opts.Target.String())
lb.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: opts,
Expand Down Expand Up @@ -539,7 +539,7 @@ func (b *rlsBalancer) sendNewPickerLocked() {
bg: b.bg,
rlsServerTarget: b.lbCfg.lookupService,
grpcTarget: b.bopts.Target.String(),
metricsRecorder: b.bopts.MetricsRecorder,
metricsRecorder: b.cc.MetricsRecorder(),
}
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
state := balancer.State{
Expand Down
14 changes: 7 additions & 7 deletions balancer/rls/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/internal/testutils"
)

var (
Expand Down Expand Up @@ -120,7 +120,7 @@ func (s) TestLRU_BasicOperations(t *testing.T) {

func (s) TestDataCache_BasicOperations(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, &testutils.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand All @@ -134,7 +134,7 @@ func (s) TestDataCache_BasicOperations(t *testing.T) {

func (s) TestDataCache_AddForcesResize(t *testing.T) {
initCacheEntries()
dc := newDataCache(1, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(1, nil, &testutils.NoopMetricsRecorder{}, "")

// The first entry in cacheEntries has a minimum expiry time in the future.
// This entry would stop the resize operation since we do not evict entries
Expand Down Expand Up @@ -163,7 +163,7 @@ func (s) TestDataCache_AddForcesResize(t *testing.T) {

func (s) TestDataCache_Resize(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, &testutils.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (s) TestDataCache_Resize(t *testing.T) {

func (s) TestDataCache_EvictExpiredEntries(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, &testutils.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand All @@ -221,7 +221,7 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
}

initCacheEntries()
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
dc := newDataCache(5, nil, &testutils.NoopMetricsRecorder{}, "")
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func (s) TestDataCache_Metrics(t *testing.T) {
{size: 4},
{size: 5},
}
tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
dc := newDataCache(50, nil, tmr, "")

dc.updateRLSServerTarget("rls-server-target")
Expand Down
8 changes: 4 additions & 4 deletions balancer/rls/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
rlstest "google.golang.org/grpc/internal/testutils/rls"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -266,7 +266,7 @@ func (s) Test_RLSDefaultTargetPicksMetric(t *testing.T) {
// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s) Test_RLSTargetPicksMetric(t *testing.T) {
// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
// Dial the backend.
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
Expand Down Expand Up @@ -350,7 +350,7 @@ func (s) Test_RLSFailedPicksMetric(t *testing.T) {
// Register a manual resolver and push the RLS service config through it.
r := startManualResolverWithConfig(t, rlsConfig)

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
// Dial the backend.
cc, err := grpc.NewClient(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(tmr))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
b := &wrrBalancer{
ClientConn: cc,
target: bOpts.Target.String(),
metricsRecorder: bOpts.MetricsRecorder,
metricsRecorder: cc.MetricsRecorder(),
addressWeights: resolver.NewAddressMap(),
endpointToWeight: resolver.NewEndpointMap(),
scToWeight: make(map[balancer.SubConn]*endpointWeight),
Expand Down
4 changes: 2 additions & 2 deletions balancer/weightedroundrobin/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -224,7 +224,7 @@ func (s) TestWRRMetricsBasic(t *testing.T) {
srv := startServer(t, reportCall)
sc := svcConfig(t, testMetricsConfig)

tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc), grpc.WithStatsHandler(tmr)); err != nil {
t.Fatalf("Error starting client: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions balancer/weightedroundrobin/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"google.golang.org/grpc/internal/grpctest"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/internal/testutils"
)

type s struct {
Expand Down Expand Up @@ -108,7 +108,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
wsc := &endpointWeight{
metricsRecorder: tmr,
weightVal: 3,
Expand Down Expand Up @@ -136,7 +136,7 @@ func (s) TestWRR_Metrics_SubConnWeight(t *testing.T) {
// with no weights. Both of these should emit a count metric for round robin
// fallback.
func (s) TestWRR_Metrics_Scheduler_RR_Fallback(t *testing.T) {
tmr := stats.NewTestMetricsRecorder()
tmr := testutils.NewTestMetricsRecorder()
ew := &endpointWeight{
metricsRecorder: tmr,
weightVal: 0,
Expand Down
6 changes: 5 additions & 1 deletion balancer_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/channelz"
Expand Down Expand Up @@ -92,7 +93,6 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParent: cc.channelz,
Target: cc.parsedTarget,
MetricsRecorder: cc.metricsRecorderList,
},
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
Expand All @@ -101,6 +101,10 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
return ccb
}

func (ccb *ccBalancerWrapper) MetricsRecorder() stats.MetricsRecorder {
return ccb.cc.metricsRecorderList
}

// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer. This is always executed from the serializer, so
// it is safe to call into the balancer here.
Expand Down
2 changes: 0 additions & 2 deletions internal/balancergroup/balancergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -604,7 +603,6 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
childPolicyName := t.Name()
stub.Register(childPolicyName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.BuildOptions.MetricsRecorder = &stats.NoopMetricsRecorder{}
bd.Data = balancer.Get(pickfirst.Name).Build(bd.ClientConn, bd.BuildOptions)
},
Close: func(bd *stub.BalancerData) {
Expand Down
Loading

0 comments on commit 1ff6728

Please sign in to comment.