From 6ed9d7a71607e2f98978392c81905aff9a370b91 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Sat, 1 Jun 2024 19:13:42 +0200 Subject: [PATCH] Query: make endpoint heartbeat interval configurable * make endpoint heartbat interval configurable * random cleanups of endpoint set Signed-off-by: Michael Hoffmann --- CHANGELOG.md | 1 + cmd/thanos/query.go | 8 +- cmd/thanos/rule.go | 1 + pkg/query/endpointset.go | 243 +++++++++++++++++----------------- pkg/query/endpointset_test.go | 44 +++--- 5 files changed, 154 insertions(+), 143 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c3edd24e4b..d4c6659876 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7317](https://github.com/thanos-io/thanos/pull/7317) Tracing: allow specifying resource attributes for the OTLP configuration. - [#7367](https://github.com/thanos-io/thanos/pull/7367) Store Gateway: log request ID in request logs. - [#7361](https://github.com/thanos-io/thanos/pull/7361) Query: *breaking :warning:* pass query stats from remote execution from server to client. We changed the protobuf of the QueryAPI, if you use `query.mode=distributed` you need to update your client (upper level Queriers) first, before updating leaf Queriers (servers). +- [#7407](https://github.com/thanos-io/thanos/pull/7407) Query: make endpoint heartbeat interval configurable ### Changed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 41e1348b6e..1546bc1dd4 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -180,6 +180,7 @@ func registerQuery(app *extkingpin.App) { unhealthyStoreTimeout := extkingpin.ModelDuration(cmd.Flag("store.unhealthy-timeout", "Timeout before an unhealthy store is cleaned from the store UI page.").Default("5m")) endpointInfoTimeout := extkingpin.ModelDuration(cmd.Flag("endpoint.info-timeout", "Timeout of gRPC Info requests.").Default("5s").Hidden()) + endpointInfoInterval := extkingpin.ModelDuration(cmd.Flag("endpoint.info-interval", "Interval to refresh gRPC Info requests.").Default("5s").Hidden()) enableAutodownsampling := cmd.Flag("query.auto-downsampling", "Enable automatic adjustment (step / 5) to what source of data should be used in store gateways if no max_source_resolution param is specified."). Default("false").Bool() @@ -347,6 +348,7 @@ func registerQuery(app *extkingpin.App) { *dnsSDResolver, time.Duration(*unhealthyStoreTimeout), time.Duration(*endpointInfoTimeout), + time.Duration(*endpointInfoInterval), time.Duration(*instantDefaultMaxSourceResolution), *defaultMetadataTimeRange, *strictStores, @@ -429,6 +431,7 @@ func runQuery( dnsSDResolver string, unhealthyStoreTimeout time.Duration, endpointInfoTimeout time.Duration, + endpointInfoInterval time.Duration, instantDefaultMaxSourceResolution time.Duration, defaultMetadataTimeRange time.Duration, strictStores []string, @@ -548,6 +551,7 @@ func runQuery( dialOpts, unhealthyStoreTimeout, endpointInfoTimeout, + endpointInfoInterval, queryConnMetricLabels..., ) @@ -863,10 +867,10 @@ func prepareEndpointSet( dialOpts []grpc.DialOption, unhealthyStoreTimeout time.Duration, endpointInfoTimeout time.Duration, + endpointInfoInterval time.Duration, queryConnMetricLabels ...string, ) *query.EndpointSet { endpointSet := query.NewEndpointSet( - time.Now, logger, reg, func() (specs []*query.GRPCEndpointSpec) { @@ -913,7 +917,7 @@ func prepareEndpointSet( { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - return runutil.Repeat(5*time.Second, ctx.Done(), func() error { + return runutil.Repeat(endpointInfoInterval, ctx.Done(), func() error { endpointSet.Update(ctx) return nil }) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 3cff43ca7c..9e551160a5 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -440,6 +440,7 @@ func runRule( dialOpts, 5*time.Minute, 5*time.Second, + 5*time.Second, ) // Periodically update the GRPC addresses from query config by resolving them using DNS SD if necessary. diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index a9c0373d9e..a4be657cda 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -65,107 +65,9 @@ func NewGRPCEndpointSpec(addr string, isStrictStatic bool, dialOpts ...grpc.Dial } } -func (es *GRPCEndpointSpec) Addr() string { +func (spec *GRPCEndpointSpec) Addr() string { // API address should not change between state changes. - return es.addr -} - -// Metadata method for gRPC endpoint tries to call InfoAPI exposed by Thanos components until context timeout. If we are unable to get metadata after -// that time, we assume that the host is unhealthy and return error. -func (es *endpointRef) Metadata(ctx context.Context, infoClient infopb.InfoClient, storeClient storepb.StoreClient) (*endpointMetadata, error) { - if infoClient != nil { - resp, err := infoClient.Info(ctx, &infopb.InfoRequest{}, grpc.WaitForReady(true)) - if err != nil { - if status.Convert(err).Code() != codes.Unimplemented { - return nil, err - } - } else { - return &endpointMetadata{resp}, nil - } - } - - // Call Info method of StoreAPI, this way querier will be able to discovery old components not exposing InfoAPI. - if storeClient != nil { - metadata, err := es.getMetadataUsingStoreAPI(ctx, storeClient) - if err != nil { - return nil, errors.Wrapf(err, "fallback fetching info from %s", es.addr) - } - return metadata, nil - } - - return nil, errors.New(noMetadataEndpointMessage) -} - -func (es *endpointRef) getMetadataUsingStoreAPI(ctx context.Context, client storepb.StoreClient) (*endpointMetadata, error) { - resp, err := client.Info(ctx, &storepb.InfoRequest{}) - if err != nil { - return nil, err - } - - infoResp := fillExpectedAPIs(component.FromProto(resp.StoreType), resp.MinTime, resp.MaxTime) - infoResp.LabelSets = resp.LabelSets - infoResp.ComponentType = component.FromProto(resp.StoreType).String() - - return &endpointMetadata{ - &infoResp, - }, nil -} - -func fillExpectedAPIs(componentType component.Component, mintime, maxTime int64) infopb.InfoResponse { - switch componentType { - case component.Sidecar: - return infopb.InfoResponse{ - Store: &infopb.StoreInfo{ - MinTime: mintime, - MaxTime: maxTime, - }, - Rules: &infopb.RulesInfo{}, - Targets: &infopb.TargetsInfo{}, - MetricMetadata: &infopb.MetricMetadataInfo{}, - Exemplars: &infopb.ExemplarsInfo{}, - } - case component.Query: - { - return infopb.InfoResponse{ - Store: &infopb.StoreInfo{ - MinTime: mintime, - MaxTime: maxTime, - }, - Rules: &infopb.RulesInfo{}, - Targets: &infopb.TargetsInfo{}, - MetricMetadata: &infopb.MetricMetadataInfo{}, - Exemplars: &infopb.ExemplarsInfo{}, - Query: &infopb.QueryAPIInfo{}, - } - } - case component.Receive: - { - return infopb.InfoResponse{ - Store: &infopb.StoreInfo{ - MinTime: mintime, - MaxTime: maxTime, - }, - Exemplars: &infopb.ExemplarsInfo{}, - } - } - case component.Store: - return infopb.InfoResponse{ - Store: &infopb.StoreInfo{ - MinTime: mintime, - MaxTime: maxTime, - }, - } - case component.Rule: - return infopb.InfoResponse{ - Store: &infopb.StoreInfo{ - MinTime: mintime, - MaxTime: maxTime, - }, - Rules: &infopb.RulesInfo{}, - } - default: - return infopb.InfoResponse{} - } + return spec.addr } // stringError forces the error to be a string @@ -317,7 +219,6 @@ type nowFunc func() time.Time // NewEndpointSet returns a new set of Thanos APIs. func NewEndpointSet( - now nowFunc, logger log.Logger, reg prometheus.Registerer, endpointSpecs func() []*GRPCEndpointSpec, @@ -340,7 +241,7 @@ func NewEndpointSet( } return &EndpointSet{ - now: now, + now: time.Now, logger: log.With(logger, "component", "endpointset"), endpointsMetric: endpointsMetric, @@ -377,17 +278,17 @@ func (e *EndpointSet) Update(ctx context.Context) { for _, spec := range e.endpointSpec() { spec := spec - if er, existingRef := e.endpoints[spec.Addr()]; existingRef { + if existingRef, ok := e.endpoints[spec.Addr()]; ok { wg.Add(1) go func(spec *GRPCEndpointSpec) { defer wg.Done() ctx, cancel := context.WithTimeout(ctx, e.endpointInfoTimeout) defer cancel() - e.updateEndpoint(ctx, spec, er) + existingRef.update(ctx) mu.Lock() defer mu.Unlock() - existingRefs[spec.Addr()] = er + existingRefs[spec.Addr()] = existingRef }(spec) continue @@ -404,8 +305,8 @@ func (e *EndpointSet) Update(ctx context.Context) { level.Warn(e.logger).Log("msg", "new endpoint creation failed", "err", err, "address", spec.Addr()) return } + newRef.update(ctx) - e.updateEndpoint(ctx, spec, newRef) if !newRef.isQueryable() { newRef.Close() return @@ -466,14 +367,6 @@ func (e *EndpointSet) Update(ctx context.Context) { e.endpointsMetric.Update(stats) } -func (e *EndpointSet) updateEndpoint(ctx context.Context, spec *GRPCEndpointSpec, er *endpointRef) { - metadata, err := er.Metadata(ctx, infopb.NewInfoClient(er.cc), storepb.NewStoreClient(er.cc)) - if err != nil { - level.Warn(e.logger).Log("msg", "update of endpoint failed", "err", errors.Wrap(err, "getting metadata"), "address", spec.Addr()) - } - er.update(e.now, metadata, err) -} - // getTimedOutRefs returns unhealthy endpoints for which the last // successful health check is older than the unhealthyEndpointTimeout. // Strict endpoints are never considered as timed out. @@ -642,6 +535,8 @@ func (e *EndpointSet) GetEndpointStatus() []EndpointStatus { type endpointRef struct { storepb.StoreClient + now nowFunc + mtx sync.RWMutex cc *grpc.ClientConn addr string @@ -672,6 +567,7 @@ func (e *EndpointSet) newEndpointRef(ctx context.Context, spec *GRPCEndpointSpec } return &endpointRef{ + now: e.now, logger: e.logger, created: e.now(), addr: spec.Addr(), @@ -680,24 +576,127 @@ func (e *EndpointSet) newEndpointRef(ctx context.Context, spec *GRPCEndpointSpec }, nil } +// Metadata method for gRPC endpoint tries to call InfoAPI exposed by Thanos components until context timeout. If we are unable to get metadata after +// that time, we assume that the host is unhealthy and return error. +func (er *endpointRef) getMetadata(ctx context.Context, infoClient infopb.InfoClient, storeClient storepb.StoreClient) (*endpointMetadata, error) { + if infoClient != nil { + resp, err := infoClient.Info(ctx, &infopb.InfoRequest{}, grpc.WaitForReady(true)) + if err != nil { + if status.Convert(err).Code() != codes.Unimplemented { + return nil, err + } + } else { + return &endpointMetadata{resp}, nil + } + } + + // Call Info method of StoreAPI, this way querier will be able to discovery old components not exposing InfoAPI. + if storeClient != nil { + metadata, err := er.getMetadataUsingStoreAPI(ctx, storeClient) + if err != nil { + return nil, errors.Wrapf(err, "fallback fetching info from %s", er.addr) + } + return metadata, nil + } + + return nil, errors.New(noMetadataEndpointMessage) +} + +func (er *endpointRef) getMetadataUsingStoreAPI(ctx context.Context, client storepb.StoreClient) (*endpointMetadata, error) { + resp, err := client.Info(ctx, &storepb.InfoRequest{}) + if err != nil { + return nil, err + } + + infoResp := fillExpectedAPIs(component.FromProto(resp.StoreType), resp.MinTime, resp.MaxTime) + infoResp.LabelSets = resp.LabelSets + infoResp.ComponentType = component.FromProto(resp.StoreType).String() + + return &endpointMetadata{ + &infoResp, + }, nil +} + +func fillExpectedAPIs(componentType component.Component, mintime, maxTime int64) infopb.InfoResponse { + switch componentType { + case component.Sidecar: + return infopb.InfoResponse{ + Store: &infopb.StoreInfo{ + MinTime: mintime, + MaxTime: maxTime, + }, + Rules: &infopb.RulesInfo{}, + Targets: &infopb.TargetsInfo{}, + MetricMetadata: &infopb.MetricMetadataInfo{}, + Exemplars: &infopb.ExemplarsInfo{}, + } + case component.Query: + { + return infopb.InfoResponse{ + Store: &infopb.StoreInfo{ + MinTime: mintime, + MaxTime: maxTime, + }, + Rules: &infopb.RulesInfo{}, + Targets: &infopb.TargetsInfo{}, + MetricMetadata: &infopb.MetricMetadataInfo{}, + Exemplars: &infopb.ExemplarsInfo{}, + Query: &infopb.QueryAPIInfo{}, + } + } + case component.Receive: + { + return infopb.InfoResponse{ + Store: &infopb.StoreInfo{ + MinTime: mintime, + MaxTime: maxTime, + }, + Exemplars: &infopb.ExemplarsInfo{}, + } + } + case component.Store: + return infopb.InfoResponse{ + Store: &infopb.StoreInfo{ + MinTime: mintime, + MaxTime: maxTime, + }, + } + case component.Rule: + return infopb.InfoResponse{ + Store: &infopb.StoreInfo{ + MinTime: mintime, + MaxTime: maxTime, + }, + Rules: &infopb.RulesInfo{}, + } + default: + return infopb.InfoResponse{} + } +} + // update sets the metadata and status of the endpoint ref based on the info response value and error. -func (er *endpointRef) update(now nowFunc, metadata *endpointMetadata, err error) { +func (er *endpointRef) update(ctx context.Context) { + metadata, err := er.getMetadata(ctx, infopb.NewInfoClient(er.cc), storepb.NewStoreClient(er.cc)) + if err != nil { + level.Warn(er.logger).Log("msg", "update of endpoint failed", "err", errors.Wrap(err, "getting metadata"), "address", er.addr) + } + er.mtx.Lock() defer er.mtx.Unlock() er.updateMetadata(metadata, err) - er.updateStatus(now, err) + er.updateStatus(err) } // updateStatus updates the endpointRef status based on the info call error. -func (er *endpointRef) updateStatus(now nowFunc, err error) { +func (er *endpointRef) updateStatus(err error) { mint, maxt := er.timeRange() if er.status == nil { er.status = &EndpointStatus{Name: er.addr} } if err == nil { - er.status.LastCheck = now() + er.status.LastCheck = er.now() er.status.LabelSets = er.labelSets() er.status.ComponentType = er.componentType() er.status.MinTime = mint @@ -884,19 +883,19 @@ func (er *endpointRef) apisPresent() []string { var apisPresent []string if er.HasStoreAPI() { - apisPresent = append(apisPresent, "storeEndpoints") + apisPresent = append(apisPresent, "StoreEndpoints") } if er.HasRulesAPI() { - apisPresent = append(apisPresent, "rulesAPI") + apisPresent = append(apisPresent, "RulesAPI") } if er.HasExemplarsAPI() { - apisPresent = append(apisPresent, "exemplarsAPI") + apisPresent = append(apisPresent, "ExemplarsAPI") } if er.HasTargetsAPI() { - apisPresent = append(apisPresent, "targetsAPI") + apisPresent = append(apisPresent, "TargetsAPI") } if er.HasMetricMetadataAPI() { diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index f9955d3412..18193dba42 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -431,7 +431,7 @@ func TestEndpointSetUpdate(t *testing.T) { discoveredEndpointAddr := endpoints.EndpointAddresses() // Specify only "store_type" to exclude "external_labels". - endpointSet := makeEndpointSet(discoveredEndpointAddr, tc.strict, time.Now, tc.connLabels...) + endpointSet := makeEndpointSet(discoveredEndpointAddr, tc.strict, tc.connLabels...) defer endpointSet.Close() endpointSet.Update(context.Background()) @@ -460,7 +460,7 @@ func TestEndpointSetUpdate_DuplicateSpecs(t *testing.T) { discoveredEndpointAddr := endpoints.EndpointAddresses() discoveredEndpointAddr = append(discoveredEndpointAddr, discoveredEndpointAddr[0]) - endpointSet := makeEndpointSet(discoveredEndpointAddr, false, time.Now) + endpointSet := makeEndpointSet(discoveredEndpointAddr, false) defer endpointSet.Close() endpointSet.Update(context.Background()) @@ -482,7 +482,7 @@ func TestEndpointSetUpdate_EndpointGoingAway(t *testing.T) { defer endpoints.Close() discoveredEndpointAddr := endpoints.EndpointAddresses() - endpointSet := makeEndpointSet(discoveredEndpointAddr, false, time.Now) + endpointSet := makeEndpointSet(discoveredEndpointAddr, false) defer endpointSet.Close() // Initial update. @@ -510,7 +510,7 @@ func TestEndpointSetUpdate_EndpointComingOnline(t *testing.T) { defer endpoints.Close() discoveredEndpointAddr := endpoints.EndpointAddresses() - endpointSet := makeEndpointSet(discoveredEndpointAddr, false, time.Now) + endpointSet := makeEndpointSet(discoveredEndpointAddr, false) defer endpointSet.Close() // Initial update. @@ -542,7 +542,7 @@ func TestEndpointSetUpdate_StrictEndpointMetadata(t *testing.T) { defer endpoints.Close() discoveredEndpointAddr := endpoints.EndpointAddresses() - endpointSet := makeEndpointSet(discoveredEndpointAddr, true, time.Now) + endpointSet := makeEndpointSet(discoveredEndpointAddr, true) defer endpointSet.Close() addr := discoveredEndpointAddr[0] @@ -613,7 +613,8 @@ func TestEndpointSetUpdate_PruneInactiveEndpoints(t *testing.T) { updateTime := time.Now() discoveredEndpointAddr := endpoints.EndpointAddresses() - endpointSet := makeEndpointSet(discoveredEndpointAddr, tc.strict, func() time.Time { return updateTime }) + endpointSet := makeEndpointSet(discoveredEndpointAddr, tc.strict) + endpointSet.now = func() time.Time { return updateTime } defer endpointSet.Close() endpointSet.Update(context.Background()) @@ -644,7 +645,8 @@ func TestEndpointSetUpdate_AtomicEndpointAdditions(t *testing.T) { updateTime := time.Now() discoveredEndpointAddr := endpoints.EndpointAddresses() - endpointSet := makeEndpointSet(discoveredEndpointAddr, false, func() time.Time { return updateTime }) + endpointSet := makeEndpointSet(discoveredEndpointAddr, false) + endpointSet.now = func() time.Time { return updateTime } endpointSet.endpointInfoTimeout = 3 * time.Second defer endpointSet.Close() @@ -724,10 +726,9 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) { discoveredEndpointAddr := endpoints.EndpointAddresses() now := time.Now() - nowFunc := func() time.Time { return now } // Testing if duplicates can cause weird results. discoveredEndpointAddr = append(discoveredEndpointAddr, discoveredEndpointAddr[0]) - endpointSet := NewEndpointSet(nowFunc, nil, nil, + endpointSet := NewEndpointSet(nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := range discoveredEndpointAddr { specs = append(specs, NewGRPCEndpointSpec(addr, false)) @@ -735,6 +736,7 @@ func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) { return specs }, testGRPCOpts, time.Minute, 2*time.Second) + endpointSet.now = func() time.Time { return now } defer endpointSet.Close() // Initial update. @@ -1108,7 +1110,7 @@ func TestEndpointSet_Update_NoneAvailable(t *testing.T) { endpoints.CloseOne(initialEndpointAddr[0]) endpoints.CloseOne(initialEndpointAddr[1]) - endpointSet := NewEndpointSet(time.Now, nil, nil, + endpointSet := NewEndpointSet(nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := range initialEndpointAddr { specs = append(specs, NewGRPCEndpointSpec(addr, false)) @@ -1218,7 +1220,7 @@ func TestEndpoint_Update_QuerierStrict(t *testing.T) { staticEndpointAddr := discoveredEndpointAddr[0] slowStaticEndpointAddr := discoveredEndpointAddr[2] - endpointSet := NewEndpointSet(time.Now, nil, nil, func() (specs []*GRPCEndpointSpec) { + endpointSet := NewEndpointSet(nil, nil, func() (specs []*GRPCEndpointSpec) { return []*GRPCEndpointSpec{ NewGRPCEndpointSpec(discoveredEndpointAddr[0], true), NewGRPCEndpointSpec(discoveredEndpointAddr[1], false), @@ -1395,7 +1397,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { t.Run(tc.name, func(t *testing.T) { currentState := 0 - endpointSet := NewEndpointSet(time.Now, nil, nil, + endpointSet := NewEndpointSet(nil, nil, func() []*GRPCEndpointSpec { if tc.states[currentState].endpointSpec == nil { return nil @@ -1545,13 +1547,13 @@ func TestUpdateEndpointStateLastError(t *testing.T) { for _, tc := range tcs { mockEndpointRef := &endpointRef{ + now: time.Now, addr: "mockedStore", metadata: &endpointMetadata{ &infopb.InfoResponse{}, }, } - - mockEndpointRef.update(time.Now, mockEndpointRef.metadata, tc.InputError) + mockEndpointRef.updateStatus(tc.InputError) b, err := json.Marshal(mockEndpointRef.status.LastError) testutil.Ok(t, err) @@ -1561,28 +1563,29 @@ func TestUpdateEndpointStateLastError(t *testing.T) { func TestUpdateEndpointStateForgetsPreviousErrors(t *testing.T) { mockEndpointRef := &endpointRef{ + now: time.Now, addr: "mockedStore", metadata: &endpointMetadata{ &infopb.InfoResponse{}, }, } - mockEndpointRef.update(time.Now, mockEndpointRef.metadata, errors.New("test err")) + mockEndpointRef.updateStatus(errors.New("test err")) b, err := json.Marshal(mockEndpointRef.status.LastError) testutil.Ok(t, err) testutil.Equals(t, `"test err"`, string(b)) // updating status without and error should clear the previous one. - mockEndpointRef.update(time.Now, mockEndpointRef.metadata, nil) + mockEndpointRef.updateStatus(nil) b, err = json.Marshal(mockEndpointRef.status.LastError) testutil.Ok(t, err) testutil.Equals(t, `null`, string(b)) } -func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc, metricLabels ...string) *EndpointSet { - endpointSet := NewEndpointSet(now, nil, nil, +func makeEndpointSet(discoveredEndpointAddr []string, strict bool, metricLabels ...string) *EndpointSet { + endpointSet := NewEndpointSet(nil, nil, func() (specs []*GRPCEndpointSpec) { for _, addr := range discoveredEndpointAddr { specs = append(specs, NewGRPCEndpointSpec(addr, strict)) @@ -1642,6 +1645,7 @@ func TestDeadlockLocking(t *testing.T) { t.Parallel() mockEndpointRef := &endpointRef{ + now: time.Now, addr: "mockedStore", metadata: &endpointMetadata{ &infopb.InfoResponse{}, @@ -1656,9 +1660,11 @@ func TestDeadlockLocking(t *testing.T) { if time.Now().After(deadline) { break } - mockEndpointRef.update(time.Now, &endpointMetadata{ + mockEndpointRef.mtx.Lock() + mockEndpointRef.updateMetadata(&endpointMetadata{ InfoResponse: &infopb.InfoResponse{}, }, nil) + mockEndpointRef.mtx.Unlock() } return nil })