From 2618765aadb6c643dfe6a9125bca8c0716d997d2 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Thu, 23 May 2024 12:29:07 +0530 Subject: [PATCH 1/6] init Signed-off-by: pushkarm029 --- .../strategystore/adaptive/aggregator.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/plugin/sampling/strategystore/adaptive/aggregator.go b/plugin/sampling/strategystore/adaptive/aggregator.go index f41bbd99ada..2bb8a5f767b 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator.go +++ b/plugin/sampling/strategystore/adaptive/aggregator.go @@ -144,3 +144,20 @@ func (a *aggregator) Close() error { a.bgFinished.Wait() return nil } + +func RecordThroughput(agg strategystore.Aggregator, span *span_model.Span, logger *zap.Logger) { + // TODO simply checking parentId to determine if a span is a root span is not sufficient. However, + // we can be sure that only a root span will have sampler tags. + if span.ParentSpanID() != span_model.NewSpanID(0) { + return + } + service := span.Process.ServiceName + if service == "" || span.OperationName == "" { + return + } + samplerType, samplerParam := span.GetSamplerParams(logger) + if samplerType == span_model.SamplerTypeUnrecognized { + return + } + agg.RecordThroughput(service, span.OperationName, samplerType, samplerParam) +} From d4ad066791aa40e388609ea9628eb870408470c7 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Thu, 23 May 2024 13:22:38 +0530 Subject: [PATCH 2/6] added tests Signed-off-by: pushkarm029 --- .../strategystore/adaptive/aggregator_test.go | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/plugin/sampling/strategystore/adaptive/aggregator_test.go b/plugin/sampling/strategystore/adaptive/aggregator_test.go index 84f3f66f170..06a09d5deb6 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator_test.go +++ b/plugin/sampling/strategystore/adaptive/aggregator_test.go @@ -115,3 +115,58 @@ func TestLowerboundThroughput(t *testing.T) { assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count) assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"]) } + +func TestRecordThroughput(t *testing.T) { + tests := []struct { + name string + span *model.Span + count int + }{ + { + name: "probabilistic", + span: &model.Span{ + OperationName: "GET", + Process: &model.Process{ + ServiceName: "A", + }, + Tags: model.KeyValues{ + model.String("sampler.type", "probabilistic"), + model.String("sampler.param", "0.001"), + }, + }, + count: 1, + }, + { + name: "lowerbound", + span: &model.Span{ + OperationName: "GET", + Process: &model.Process{ + ServiceName: "A", + }, + Tags: model.KeyValues{ + model.String("sampler.type", "lowerbound"), + model.String("sampler.param", "0.001"), + }, + }, + count: 0, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + metricsFactory := metricstest.NewFactory(0) + mockStorage := &mocks.Store{} + mockEP := &epmocks.ElectionParticipant{} + testOpts := Options{ + CalculationInterval: 1 * time.Second, + AggregationBuckets: 1, + BucketsForCalculation: 1, + } + logger := zap.NewNop() + a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) + require.NoError(t, err) + RecordThroughput(a, test.span, logger) + assert.EqualValues(t, test.count, a.(*aggregator).currentThroughput["A"]["GET"].Count) + assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"]) + }) + } +} From d9453f482cbc219eb918f73617692ba816623643 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Thu, 23 May 2024 17:30:29 +0530 Subject: [PATCH 3/6] fix test Signed-off-by: pushkarm029 --- .../strategystore/adaptive/aggregator_test.go | 85 ++++++++----------- 1 file changed, 35 insertions(+), 50 deletions(-) diff --git a/plugin/sampling/strategystore/adaptive/aggregator_test.go b/plugin/sampling/strategystore/adaptive/aggregator_test.go index 06a09d5deb6..48ff25f531c 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator_test.go +++ b/plugin/sampling/strategystore/adaptive/aggregator_test.go @@ -117,56 +117,41 @@ func TestLowerboundThroughput(t *testing.T) { } func TestRecordThroughput(t *testing.T) { - tests := []struct { - name string - span *model.Span - count int - }{ - { - name: "probabilistic", - span: &model.Span{ - OperationName: "GET", - Process: &model.Process{ - ServiceName: "A", - }, - Tags: model.KeyValues{ - model.String("sampler.type", "probabilistic"), - model.String("sampler.param", "0.001"), - }, - }, - count: 1, - }, - { - name: "lowerbound", - span: &model.Span{ - OperationName: "GET", - Process: &model.Process{ - ServiceName: "A", - }, - Tags: model.KeyValues{ - model.String("sampler.type", "lowerbound"), - model.String("sampler.param", "0.001"), - }, - }, - count: 0, - }, + metricsFactory := metricstest.NewFactory(0) + mockStorage := &mocks.Store{} + mockEP := &epmocks.ElectionParticipant{} + testOpts := Options{ + CalculationInterval: 1 * time.Second, + AggregationBuckets: 1, + BucketsForCalculation: 1, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - metricsFactory := metricstest.NewFactory(0) - mockStorage := &mocks.Store{} - mockEP := &epmocks.ElectionParticipant{} - testOpts := Options{ - CalculationInterval: 1 * time.Second, - AggregationBuckets: 1, - BucketsForCalculation: 1, - } - logger := zap.NewNop() - a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) - require.NoError(t, err) - RecordThroughput(a, test.span, logger) - assert.EqualValues(t, test.count, a.(*aggregator).currentThroughput["A"]["GET"].Count) - assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"]) - }) + logger := zap.NewNop() + a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) + require.NoError(t, err) + + // Testing non-root span + span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}} + RecordThroughput(a, span, logger) + require.Nil(t, a.(*aggregator).currentThroughput["A"]["GET"]) + + // Testing span with service name but no operation + span.References = []model.SpanRef{} + span.Process = &model.Process{ + ServiceName: "A", + } + RecordThroughput(a, span, logger) + require.Nil(t, a.(*aggregator).currentThroughput["A"]["GET"]) + + // Testing span with service name and operation but no probabilistic sampling tags + span.OperationName = "GET" + RecordThroughput(a, span, logger) + require.Nil(t, a.(*aggregator).currentThroughput["A"]["GET"]) + + // Testing span with service name, operation, and probabilistic sampling tags + span.Tags = model.KeyValues{ + model.String("sampler.type", "probabilistic"), + model.String("sampler.param", "0.001"), } + RecordThroughput(a, span, logger) + assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count) } From fd0b7728f77b597d3229169dd6d2b36de8cdb300 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Sat, 25 May 2024 22:43:45 +0530 Subject: [PATCH 4/6] fix Signed-off-by: pushkarm029 --- cmd/collector/app/root_span_handler_test.go | 3 +++ cmd/collector/app/sampling/strategystore/interface.go | 5 +++++ plugin/sampling/strategystore/adaptive/aggregator.go | 6 +++--- .../sampling/strategystore/adaptive/aggregator_test.go | 10 +++++----- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/cmd/collector/app/root_span_handler_test.go b/cmd/collector/app/root_span_handler_test.go index 10a2eeed51d..c0178864446 100644 --- a/cmd/collector/app/root_span_handler_test.go +++ b/cmd/collector/app/root_span_handler_test.go @@ -32,6 +32,9 @@ type mockAggregator struct { func (t *mockAggregator) RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) { t.callCount.Add(1) } +func (t *mockAggregator) HandleRootSpan(span *model.Span, logger *zap.Logger) { + handleRootSpan(t, logger)(span, "") +} func (t *mockAggregator) Start() {} func (t *mockAggregator) Close() error { t.closeCount.Add(1) diff --git a/cmd/collector/app/sampling/strategystore/interface.go b/cmd/collector/app/sampling/strategystore/interface.go index 90d9464918d..51caec69d3d 100644 --- a/cmd/collector/app/sampling/strategystore/interface.go +++ b/cmd/collector/app/sampling/strategystore/interface.go @@ -20,6 +20,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "go.uber.org/zap" ) // StrategyStore keeps track of service specific sampling strategies. @@ -36,6 +37,10 @@ type Aggregator interface { // Close() from io.Closer stops the aggregator from aggregating throughput. io.Closer + // The HandleRootSpan function processes a span, checking if it's a root span. + // If it is, it extracts sampler parameters, then calls RecordThroughput. + HandleRootSpan(span *model.Span, logger *zap.Logger) + // RecordThroughput records throughput for an operation for aggregation. RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) diff --git a/plugin/sampling/strategystore/adaptive/aggregator.go b/plugin/sampling/strategystore/adaptive/aggregator.go index 2bb8a5f767b..8140367b9c8 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator.go +++ b/plugin/sampling/strategystore/adaptive/aggregator.go @@ -145,8 +145,8 @@ func (a *aggregator) Close() error { return nil } -func RecordThroughput(agg strategystore.Aggregator, span *span_model.Span, logger *zap.Logger) { - // TODO simply checking parentId to determine if a span is a root span is not sufficient. However, +func (a *aggregator) HandleRootSpan(span *span_model.Span, logger *zap.Logger) { + // simply checking parentId to determine if a span is a root span is not sufficient. However, // we can be sure that only a root span will have sampler tags. if span.ParentSpanID() != span_model.NewSpanID(0) { return @@ -159,5 +159,5 @@ func RecordThroughput(agg strategystore.Aggregator, span *span_model.Span, logge if samplerType == span_model.SamplerTypeUnrecognized { return } - agg.RecordThroughput(service, span.OperationName, samplerType, samplerParam) + a.RecordThroughput(service, span.OperationName, samplerType, samplerParam) } diff --git a/plugin/sampling/strategystore/adaptive/aggregator_test.go b/plugin/sampling/strategystore/adaptive/aggregator_test.go index 48ff25f531c..22906a67039 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator_test.go +++ b/plugin/sampling/strategystore/adaptive/aggregator_test.go @@ -131,20 +131,20 @@ func TestRecordThroughput(t *testing.T) { // Testing non-root span span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}} - RecordThroughput(a, span, logger) - require.Nil(t, a.(*aggregator).currentThroughput["A"]["GET"]) + a.HandleRootSpan(span, logger) + require.Nil(t, a.(*aggregator).currentThroughput) // Testing span with service name but no operation span.References = []model.SpanRef{} span.Process = &model.Process{ ServiceName: "A", } - RecordThroughput(a, span, logger) + a.HandleRootSpan(span, logger) require.Nil(t, a.(*aggregator).currentThroughput["A"]["GET"]) // Testing span with service name and operation but no probabilistic sampling tags span.OperationName = "GET" - RecordThroughput(a, span, logger) + a.HandleRootSpan(span, logger) require.Nil(t, a.(*aggregator).currentThroughput["A"]["GET"]) // Testing span with service name, operation, and probabilistic sampling tags @@ -152,6 +152,6 @@ func TestRecordThroughput(t *testing.T) { model.String("sampler.type", "probabilistic"), model.String("sampler.param", "0.001"), } - RecordThroughput(a, span, logger) + a.HandleRootSpan(span, logger) assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count) } From 14eb631f2f05215e7716a98db3d6d70e4cebc08d Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Sat, 25 May 2024 22:49:59 +0530 Subject: [PATCH 5/6] fix Signed-off-by: pushkarm029 --- cmd/collector/app/root_span_handler_test.go | 1 + cmd/collector/app/sampling/strategystore/interface.go | 3 ++- plugin/sampling/strategystore/adaptive/aggregator_test.go | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/collector/app/root_span_handler_test.go b/cmd/collector/app/root_span_handler_test.go index c0178864446..7d47411f222 100644 --- a/cmd/collector/app/root_span_handler_test.go +++ b/cmd/collector/app/root_span_handler_test.go @@ -32,6 +32,7 @@ type mockAggregator struct { func (t *mockAggregator) RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) { t.callCount.Add(1) } + func (t *mockAggregator) HandleRootSpan(span *model.Span, logger *zap.Logger) { handleRootSpan(t, logger)(span, "") } diff --git a/cmd/collector/app/sampling/strategystore/interface.go b/cmd/collector/app/sampling/strategystore/interface.go index 51caec69d3d..9d2c3fb271b 100644 --- a/cmd/collector/app/sampling/strategystore/interface.go +++ b/cmd/collector/app/sampling/strategystore/interface.go @@ -18,9 +18,10 @@ import ( "context" "io" + "go.uber.org/zap" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" - "go.uber.org/zap" ) // StrategyStore keeps track of service specific sampling strategies. diff --git a/plugin/sampling/strategystore/adaptive/aggregator_test.go b/plugin/sampling/strategystore/adaptive/aggregator_test.go index 22906a67039..9b98643dd34 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator_test.go +++ b/plugin/sampling/strategystore/adaptive/aggregator_test.go @@ -132,7 +132,7 @@ func TestRecordThroughput(t *testing.T) { // Testing non-root span span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}} a.HandleRootSpan(span, logger) - require.Nil(t, a.(*aggregator).currentThroughput) + require.Empty(t, a.(*aggregator).currentThroughput) // Testing span with service name but no operation span.References = []model.SpanRef{} From 143fc0c9d0cdd310b3b1daa3ff61e0a133b14ed8 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Sun, 26 May 2024 12:50:49 +0530 Subject: [PATCH 6/6] fix Signed-off-by: pushkarm029 --- cmd/collector/app/collector.go | 5 +- cmd/collector/app/collector_test.go | 21 ++++++ cmd/collector/app/root_span_handler.go | 42 ----------- cmd/collector/app/root_span_handler_test.go | 74 ------------------- .../strategystore/adaptive/aggregator_test.go | 4 +- 5 files changed, 27 insertions(+), 119 deletions(-) delete mode 100644 cmd/collector/app/root_span_handler.go delete mode 100644 cmd/collector/app/root_span_handler_test.go diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index b8cfdc3b3cb..ee8371be75a 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -30,6 +30,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" "github.com/jaegertracing/jaeger/cmd/collector/app/server" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/tenancy" @@ -103,7 +104,9 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { var additionalProcessors []ProcessSpan if c.aggregator != nil { - additionalProcessors = append(additionalProcessors, handleRootSpan(c.aggregator, c.logger)) + additionalProcessors = append(additionalProcessors, func(span *model.Span, tenant string) { + c.aggregator.HandleRootSpan(span, c.logger) + }) } c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...) diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 8c92cd81e53..aabc1fb82e2 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -17,6 +17,7 @@ package app import ( "context" "io" + "sync/atomic" "testing" "time" @@ -47,6 +48,26 @@ func optionsForEphemeralPorts() *flags.CollectorOptions { return collectorOpts } +type mockAggregator struct { + callCount atomic.Int32 + closeCount atomic.Int32 +} + +func (t *mockAggregator) RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) { + t.callCount.Add(1) +} + +func (t *mockAggregator) HandleRootSpan(span *model.Span, logger *zap.Logger) { + t.callCount.Add(1) +} + +func (t *mockAggregator) Start() {} + +func (t *mockAggregator) Close() error { + t.closeCount.Add(1) + return nil +} + func TestNewCollector(t *testing.T) { // prepare hc := healthcheck.New() diff --git a/cmd/collector/app/root_span_handler.go b/cmd/collector/app/root_span_handler.go deleted file mode 100644 index c2759b7100a..00000000000 --- a/cmd/collector/app/root_span_handler.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright (c) 2021 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app - -import ( - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" - "github.com/jaegertracing/jaeger/model" -) - -// handleRootSpan returns a function that records throughput for root spans -func handleRootSpan(aggregator strategystore.Aggregator, logger *zap.Logger) ProcessSpan { - return func(span *model.Span, tenant string) { - // TODO simply checking parentId to determine if a span is a root span is not sufficient. However, - // we can be sure that only a root span will have sampler tags. - if span.ParentSpanID() != model.NewSpanID(0) { - return - } - service := span.Process.ServiceName - if service == "" || span.OperationName == "" { - return - } - samplerType, samplerParam := span.GetSamplerParams(logger) - if samplerType == model.SamplerTypeUnrecognized { - return - } - aggregator.RecordThroughput(service, span.OperationName, samplerType, samplerParam) - } -} diff --git a/cmd/collector/app/root_span_handler_test.go b/cmd/collector/app/root_span_handler_test.go deleted file mode 100644 index 7d47411f222..00000000000 --- a/cmd/collector/app/root_span_handler_test.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright (c) 2021 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app - -import ( - "sync/atomic" - "testing" - - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/model" -) - -type mockAggregator struct { - callCount atomic.Int32 - closeCount atomic.Int32 -} - -func (t *mockAggregator) RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) { - t.callCount.Add(1) -} - -func (t *mockAggregator) HandleRootSpan(span *model.Span, logger *zap.Logger) { - handleRootSpan(t, logger)(span, "") -} -func (t *mockAggregator) Start() {} -func (t *mockAggregator) Close() error { - t.closeCount.Add(1) - return nil -} - -func TestHandleRootSpan(t *testing.T) { - aggregator := &mockAggregator{} - processor := handleRootSpan(aggregator, zap.NewNop()) - - // Testing non-root span - span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}} - processor(span, "") - assert.EqualValues(t, 0, aggregator.callCount.Load()) - - // Testing span with service name but no operation - span.References = []model.SpanRef{} - span.Process = &model.Process{ - ServiceName: "service", - } - processor(span, "") - assert.EqualValues(t, 0, aggregator.callCount.Load()) - - // Testing span with service name and operation but no probabilistic sampling tags - span.OperationName = "GET" - processor(span, "") - assert.EqualValues(t, 0, aggregator.callCount.Load()) - - // Testing span with service name, operation, and probabilistic sampling tags - span.Tags = model.KeyValues{ - model.String("sampler.type", "probabilistic"), - model.String("sampler.param", "0.001"), - } - processor(span, "") - assert.EqualValues(t, 1, aggregator.callCount.Load()) -} diff --git a/plugin/sampling/strategystore/adaptive/aggregator_test.go b/plugin/sampling/strategystore/adaptive/aggregator_test.go index 9b98643dd34..bd34c3cb016 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator_test.go +++ b/plugin/sampling/strategystore/adaptive/aggregator_test.go @@ -140,12 +140,12 @@ func TestRecordThroughput(t *testing.T) { ServiceName: "A", } a.HandleRootSpan(span, logger) - require.Nil(t, a.(*aggregator).currentThroughput["A"]["GET"]) + require.Empty(t, a.(*aggregator).currentThroughput) // Testing span with service name and operation but no probabilistic sampling tags span.OperationName = "GET" a.HandleRootSpan(span, logger) - require.Nil(t, a.(*aggregator).currentThroughput["A"]["GET"]) + require.Empty(t, a.(*aggregator).currentThroughput) // Testing span with service name, operation, and probabilistic sampling tags span.Tags = model.KeyValues{