diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index b8cfdc3b3cb..ee8371be75a 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -30,6 +30,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" "github.com/jaegertracing/jaeger/cmd/collector/app/server" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/tenancy" @@ -103,7 +104,9 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { var additionalProcessors []ProcessSpan if c.aggregator != nil { - additionalProcessors = append(additionalProcessors, handleRootSpan(c.aggregator, c.logger)) + additionalProcessors = append(additionalProcessors, func(span *model.Span, tenant string) { + c.aggregator.HandleRootSpan(span, c.logger) + }) } c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...) diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 8c92cd81e53..aabc1fb82e2 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -17,6 +17,7 @@ package app import ( "context" "io" + "sync/atomic" "testing" "time" @@ -47,6 +48,26 @@ func optionsForEphemeralPorts() *flags.CollectorOptions { return collectorOpts } +type mockAggregator struct { + callCount atomic.Int32 + closeCount atomic.Int32 +} + +func (t *mockAggregator) RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) { + t.callCount.Add(1) +} + +func (t *mockAggregator) HandleRootSpan(span *model.Span, logger *zap.Logger) { + t.callCount.Add(1) +} + +func (t *mockAggregator) Start() {} + +func (t *mockAggregator) Close() error { + t.closeCount.Add(1) + return nil +} + func TestNewCollector(t *testing.T) { // prepare hc := healthcheck.New() diff --git a/cmd/collector/app/root_span_handler.go b/cmd/collector/app/root_span_handler.go deleted file mode 100644 index c2759b7100a..00000000000 --- a/cmd/collector/app/root_span_handler.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) 2021 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app - -import ( - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" - "github.com/jaegertracing/jaeger/model" -) - -// handleRootSpan returns a function that records throughput for root spans -func handleRootSpan(aggregator strategystore.Aggregator, logger *zap.Logger) ProcessSpan { - return func(span *model.Span, tenant string) { - // TODO simply checking parentId to determine if a span is a root span is not sufficient. However, - // we can be sure that only a root span will have sampler tags. - if span.ParentSpanID() != model.NewSpanID(0) { - return - } - service := span.Process.ServiceName - if service == "" || span.OperationName == "" { - return - } - samplerType, samplerParam := span.GetSamplerParams(logger) - if samplerType == model.SamplerTypeUnrecognized { - return - } - aggregator.RecordThroughput(service, span.OperationName, samplerType, samplerParam) - } -} diff --git a/cmd/collector/app/root_span_handler_test.go b/cmd/collector/app/root_span_handler_test.go deleted file mode 100644 index 10a2eeed51d..00000000000 --- a/cmd/collector/app/root_span_handler_test.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) 2021 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app - -import ( - "sync/atomic" - "testing" - - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/model" -) - -type mockAggregator struct { - callCount atomic.Int32 - closeCount atomic.Int32 -} - -func (t *mockAggregator) RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) { - t.callCount.Add(1) -} -func (t *mockAggregator) Start() {} -func (t *mockAggregator) Close() error { - t.closeCount.Add(1) - return nil -} - -func TestHandleRootSpan(t *testing.T) { - aggregator := &mockAggregator{} - processor := handleRootSpan(aggregator, zap.NewNop()) - - // Testing non-root span - span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}} - processor(span, "") - assert.EqualValues(t, 0, aggregator.callCount.Load()) - - // Testing span with service name but no operation - span.References = []model.SpanRef{} - span.Process = &model.Process{ - ServiceName: "service", - } - processor(span, "") - assert.EqualValues(t, 0, aggregator.callCount.Load()) - - // Testing span with service name and operation but no probabilistic sampling tags - span.OperationName = "GET" - processor(span, "") - assert.EqualValues(t, 0, aggregator.callCount.Load()) - - // Testing span with service name, operation, and probabilistic sampling tags - span.Tags = model.KeyValues{ - model.String("sampler.type", "probabilistic"), - model.String("sampler.param", "0.001"), - } - processor(span, "") - assert.EqualValues(t, 1, aggregator.callCount.Load()) -} diff --git a/cmd/collector/app/sampling/strategystore/interface.go b/cmd/collector/app/sampling/strategystore/interface.go index 90d9464918d..9d2c3fb271b 100644 --- a/cmd/collector/app/sampling/strategystore/interface.go +++ b/cmd/collector/app/sampling/strategystore/interface.go @@ -18,6 +18,8 @@ import ( "context" "io" + "go.uber.org/zap" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -36,6 +38,10 @@ type Aggregator interface { // Close() from io.Closer stops the aggregator from aggregating throughput. io.Closer + // The HandleRootSpan function processes a span, checking if it's a root span. + // If it is, it extracts sampler parameters, then calls RecordThroughput. + HandleRootSpan(span *model.Span, logger *zap.Logger) + // RecordThroughput records throughput for an operation for aggregation. RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) diff --git a/plugin/sampling/strategystore/adaptive/aggregator.go b/plugin/sampling/strategystore/adaptive/aggregator.go index f41bbd99ada..8140367b9c8 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator.go +++ b/plugin/sampling/strategystore/adaptive/aggregator.go @@ -144,3 +144,20 @@ func (a *aggregator) Close() error { a.bgFinished.Wait() return nil } + +func (a *aggregator) HandleRootSpan(span *span_model.Span, logger *zap.Logger) { + // simply checking parentId to determine if a span is a root span is not sufficient. However, + // we can be sure that only a root span will have sampler tags. + if span.ParentSpanID() != span_model.NewSpanID(0) { + return + } + service := span.Process.ServiceName + if service == "" || span.OperationName == "" { + return + } + samplerType, samplerParam := span.GetSamplerParams(logger) + if samplerType == span_model.SamplerTypeUnrecognized { + return + } + a.RecordThroughput(service, span.OperationName, samplerType, samplerParam) +} diff --git a/plugin/sampling/strategystore/adaptive/aggregator_test.go b/plugin/sampling/strategystore/adaptive/aggregator_test.go index 84f3f66f170..bd34c3cb016 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator_test.go +++ b/plugin/sampling/strategystore/adaptive/aggregator_test.go @@ -115,3 +115,43 @@ func TestLowerboundThroughput(t *testing.T) { assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count) assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"]) } + +func TestRecordThroughput(t *testing.T) { + metricsFactory := metricstest.NewFactory(0) + mockStorage := &mocks.Store{} + mockEP := &epmocks.ElectionParticipant{} + testOpts := Options{ + CalculationInterval: 1 * time.Second, + AggregationBuckets: 1, + BucketsForCalculation: 1, + } + logger := zap.NewNop() + a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) + require.NoError(t, err) + + // Testing non-root span + span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}} + a.HandleRootSpan(span, logger) + require.Empty(t, a.(*aggregator).currentThroughput) + + // Testing span with service name but no operation + span.References = []model.SpanRef{} + span.Process = &model.Process{ + ServiceName: "A", + } + a.HandleRootSpan(span, logger) + require.Empty(t, a.(*aggregator).currentThroughput) + + // Testing span with service name and operation but no probabilistic sampling tags + span.OperationName = "GET" + a.HandleRootSpan(span, logger) + require.Empty(t, a.(*aggregator).currentThroughput) + + // Testing span with service name, operation, and probabilistic sampling tags + span.Tags = model.KeyValues{ + model.String("sampler.type", "probabilistic"), + model.String("sampler.param", "0.001"), + } + a.HandleRootSpan(span, logger) + assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count) +}