Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Adaptive Sampling Aggregator & Strategy Store #5441

Merged
merged 19 commits into from
May 21, 2024
41 changes: 36 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
span_model "github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/hostname"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

Expand All @@ -35,22 +39,35 @@ type aggregator struct {
operationsCounter metrics.Counter
servicesCounter metrics.Counter
currentThroughput serviceOperationThroughput
postAggregator *PostAggregator
aggregationInterval time.Duration
storage samplingstore.Store
stop chan struct{}
}

// NewAggregator creates a throughput aggregator that simply emits metrics
// about the number of operations seen over the aggregationInterval.
func NewAggregator(metricsFactory metrics.Factory, interval time.Duration, storage samplingstore.Store) strategystore.Aggregator {
func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.Factory, participant leaderelection.ElectionParticipant, store samplingstore.Store) (strategystore.Aggregator, error) {
hostname, err := hostname.AsIdentifier()
if err != nil {
return nil, err
}
logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname))

postAgg, err := newPostAggregator(options, hostname, store, participant, metricsFactory, logger)
if err != nil {
return nil, err
}

return &aggregator{
operationsCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_operations"}),
servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}),
currentThroughput: make(serviceOperationThroughput),
aggregationInterval: interval,
storage: storage,
aggregationInterval: options.CalculationInterval,
postAggregator: postAgg,
storage: store,
stop: make(chan struct{}),
}
}, nil
}

func (a *aggregator) runAggregationLoop() {
Expand Down Expand Up @@ -112,9 +129,23 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa

func (a *aggregator) Start() {
go a.runAggregationLoop()
a.postAggregator.Start()
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}

func (a *aggregator) Close() error {
close(a.stop)
a.Lock()
defer a.Unlock()

if err := a.postAggregator.Close(); err != nil {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return err
}

select {
case <-a.stop:
// a.stop is already closed, do nothing
default:
close(a.stop)
}

return nil
}
40 changes: 35 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks"
"github.com/jaegertracing/jaeger/storage/samplingstore/mocks"
)

Expand All @@ -32,8 +35,19 @@ func TestAggregator(t *testing.T) {

mockStorage := &mocks.Store{}
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(nil)
mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(nil)
mockEP.On("IsLeader").Return(true)
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("B", "POST", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("C", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -60,15 +74,23 @@ func TestAggregator(t *testing.T) {
func TestIncrementThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
// 20 different probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001*float64(i))
}
assert.Len(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities, 10)

a = NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err = NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
// 20 of the same probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -79,8 +101,16 @@ func TestIncrementThroughput(t *testing.T) {
func TestLowerboundThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeLowerBound, 0.001)
assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count)
assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"])
Expand Down
20 changes: 16 additions & 4 deletions plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)
Expand Down Expand Up @@ -83,12 +84,23 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S

// CreateStrategyStore implements strategystore.Factory
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error) {
p, err := NewStrategyStore(*f.options, f.metricsFactory, f.logger, f.lock, f.store)
participant := leaderelection.NewElectionParticipant(f.lock, defaultResourceName, leaderelection.ElectionParticipantOptions{
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
FollowerLeaseRefreshInterval: f.options.FollowerLeaseRefreshInterval,
LeaderLeaseRefreshInterval: f.options.LeaderLeaseRefreshInterval,
Logger: f.logger,
})

ss, err := NewStrategyStore(*f.options, f.logger, participant, f.store)
if err != nil {
return nil, nil, err
}
ss.Start()

a, err := NewAggregator(*f.options, f.logger, f.metricsFactory, participant, f.store)
if err != nil {
return nil, nil, err
}
p.Start()
a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store)
a.Start()
return p, a, nil

return ss, a, nil
}
Loading
Loading