From c9fe8911ffc82d2fba28c3b1fdf96c0723dedbda Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Wed, 22 May 2024 00:36:42 +0530 Subject: [PATCH] Refactor Adaptive Sampling Aggregator & Strategy Store (#5441) ## Which problem is this PR solving? - part of #5389 ## Description of the changes - processor is co-located in strategy_store and aggregator. - In aggregator to run `generateStrategyResponses`, `runCalculationLoop`. - In strategy_store to run `loadProbabilities`, `runUpdateProbabilitiesLoop` ## How was this change tested? - `make test` ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Pushkar Mishra Signed-off-by: pushkarm029 --- .../app/sampling/strategystore/factory.go | 3 + cmd/collector/main.go | 3 + .../strategystore/adaptive/aggregator.go | 36 ++++- .../strategystore/adaptive/aggregator_test.go | 40 ++++- .../strategystore/adaptive/factory.go | 25 ++- .../strategystore/adaptive/factory_test.go | 2 + .../strategystore/adaptive/processor.go | 147 +++++++----------- .../strategystore/adaptive/processor_test.go | 91 ++++------- .../strategystore/adaptive/strategy_store.go | 81 +++++++--- plugin/sampling/strategystore/factory.go | 12 ++ plugin/sampling/strategystore/factory_test.go | 9 ++ .../sampling/strategystore/static/factory.go | 5 + .../strategystore/static/factory_test.go | 1 + 13 files changed, 262 insertions(+), 193 deletions(-) diff --git a/cmd/collector/app/sampling/strategystore/factory.go b/cmd/collector/app/sampling/strategystore/factory.go index 5fab77b62bd..4a4059c4f5f 100644 --- a/cmd/collector/app/sampling/strategystore/factory.go +++ b/cmd/collector/app/sampling/strategystore/factory.go @@ -33,4 +33,7 @@ type Factory interface { // CreateStrategyStore initializes the StrategyStore and returns it. CreateStrategyStore() (StrategyStore, Aggregator, error) + + // Close closes the factory + Close() error } diff --git a/cmd/collector/main.go b/cmd/collector/main.go index e8f1ca4db95..fdf85102e90 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -134,6 +134,9 @@ func main() { if err := storageFactory.Close(); err != nil { logger.Error("Failed to close storage factory", zap.Error(err)) } + if err := strategyStoreFactory.Close(); err != nil { + logger.Error("Failed to close sampling strategy store factory", zap.Error(err)) + } }) return nil }, diff --git a/plugin/sampling/strategystore/adaptive/aggregator.go b/plugin/sampling/strategystore/adaptive/aggregator.go index 44d8c9906b9..f41bbd99ada 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator.go +++ b/plugin/sampling/strategystore/adaptive/aggregator.go @@ -18,10 +18,14 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" span_model "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/hostname" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection" "github.com/jaegertracing/jaeger/storage/samplingstore" ) @@ -35,22 +39,36 @@ type aggregator struct { operationsCounter metrics.Counter servicesCounter metrics.Counter currentThroughput serviceOperationThroughput + processor *Processor aggregationInterval time.Duration storage samplingstore.Store stop chan struct{} + bgFinished sync.WaitGroup } // NewAggregator creates a throughput aggregator that simply emits metrics // about the number of operations seen over the aggregationInterval. -func NewAggregator(metricsFactory metrics.Factory, interval time.Duration, storage samplingstore.Store) strategystore.Aggregator { +func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.Factory, participant leaderelection.ElectionParticipant, store samplingstore.Store) (strategystore.Aggregator, error) { + hostname, err := hostname.AsIdentifier() + if err != nil { + return nil, err + } + logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname)) + + processor, err := newProcessor(options, hostname, store, participant, metricsFactory, logger) + if err != nil { + return nil, err + } + return &aggregator{ operationsCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_operations"}), servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}), currentThroughput: make(serviceOperationThroughput), - aggregationInterval: interval, - storage: storage, + aggregationInterval: options.CalculationInterval, + processor: processor, + storage: store, stop: make(chan struct{}), - } + }, nil } func (a *aggregator) runAggregationLoop() { @@ -61,6 +79,7 @@ func (a *aggregator) runAggregationLoop() { a.Lock() a.saveThroughput() a.currentThroughput = make(serviceOperationThroughput) + a.processor.runCalculation() a.Unlock() case <-a.stop: ticker.Stop() @@ -111,10 +130,17 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa } func (a *aggregator) Start() { - go a.runAggregationLoop() + a.processor.Start() + + a.bgFinished.Add(1) + go func() { + a.runAggregationLoop() + a.bgFinished.Done() + }() } func (a *aggregator) Close() error { close(a.stop) + a.bgFinished.Wait() return nil } diff --git a/plugin/sampling/strategystore/adaptive/aggregator_test.go b/plugin/sampling/strategystore/adaptive/aggregator_test.go index 781c51e72b0..84f3f66f170 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator_test.go +++ b/plugin/sampling/strategystore/adaptive/aggregator_test.go @@ -20,9 +20,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" + epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks" "github.com/jaegertracing/jaeger/storage/samplingstore/mocks" ) @@ -32,8 +35,19 @@ func TestAggregator(t *testing.T) { mockStorage := &mocks.Store{} mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(nil) + mockEP := &epmocks.ElectionParticipant{} + mockEP.On("Start").Return(nil) + mockEP.On("Close").Return(nil) + mockEP.On("IsLeader").Return(true) + testOpts := Options{ + CalculationInterval: 1 * time.Second, + AggregationBuckets: 1, + BucketsForCalculation: 1, + } + logger := zap.NewNop() - a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage) + a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) + require.NoError(t, err) a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001) a.RecordThroughput("B", "POST", model.SamplerTypeProbabilistic, 0.001) a.RecordThroughput("C", "GET", model.SamplerTypeProbabilistic, 0.001) @@ -60,15 +74,23 @@ func TestAggregator(t *testing.T) { func TestIncrementThroughput(t *testing.T) { metricsFactory := metricstest.NewFactory(0) mockStorage := &mocks.Store{} - - a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage) + mockEP := &epmocks.ElectionParticipant{} + testOpts := Options{ + CalculationInterval: 1 * time.Second, + AggregationBuckets: 1, + BucketsForCalculation: 1, + } + logger := zap.NewNop() + a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) + require.NoError(t, err) // 20 different probabilities for i := 0; i < 20; i++ { a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001*float64(i)) } assert.Len(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities, 10) - a = NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage) + a, err = NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) + require.NoError(t, err) // 20 of the same probabilities for i := 0; i < 20; i++ { a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001) @@ -79,8 +101,16 @@ func TestIncrementThroughput(t *testing.T) { func TestLowerboundThroughput(t *testing.T) { metricsFactory := metricstest.NewFactory(0) mockStorage := &mocks.Store{} + mockEP := &epmocks.ElectionParticipant{} + testOpts := Options{ + CalculationInterval: 1 * time.Second, + AggregationBuckets: 1, + BucketsForCalculation: 1, + } + logger := zap.NewNop() - a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage) + a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) + require.NoError(t, err) a.RecordThroughput("A", "GET", model.SamplerTypeLowerBound, 0.001) assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count) assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"]) diff --git a/plugin/sampling/strategystore/adaptive/factory.go b/plugin/sampling/strategystore/adaptive/factory.go index d45f8e61f9b..ef875e2dcee 100644 --- a/plugin/sampling/strategystore/adaptive/factory.go +++ b/plugin/sampling/strategystore/adaptive/factory.go @@ -25,6 +25,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/distributedlock" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin" + "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/samplingstore" ) @@ -38,6 +39,7 @@ type Factory struct { metricsFactory metrics.Factory lock distributedlock.Lock store samplingstore.Store + participant *leaderelection.DistributedElectionParticipant } // NewFactory creates a new Factory. @@ -66,7 +68,6 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S if ssFactory == nil { return errors.New("sampling store factory is nil. Please configure a backend that supports adaptive sampling") } - var err error f.logger = logger f.metricsFactory = metricsFactory @@ -78,17 +79,31 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S if err != nil { return err } + f.participant = leaderelection.NewElectionParticipant(f.lock, defaultResourceName, leaderelection.ElectionParticipantOptions{ + FollowerLeaseRefreshInterval: f.options.FollowerLeaseRefreshInterval, + LeaderLeaseRefreshInterval: f.options.LeaderLeaseRefreshInterval, + Logger: f.logger, + }) + f.participant.Start() + return nil } // CreateStrategyStore implements strategystore.Factory func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error) { - p, err := NewStrategyStore(*f.options, f.metricsFactory, f.logger, f.lock, f.store) + s := NewStrategyStore(*f.options, f.logger, f.participant, f.store) + a, err := NewAggregator(*f.options, f.logger, f.metricsFactory, f.participant, f.store) if err != nil { return nil, nil, err } - p.Start() - a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store) + + s.Start() a.Start() - return p, a, nil + + return s, a, nil +} + +// Closes the factory +func (f *Factory) Close() error { + return f.participant.Close() } diff --git a/plugin/sampling/strategystore/adaptive/factory_test.go b/plugin/sampling/strategystore/adaptive/factory_test.go index 99c02b8c1f2..39157f1f39c 100644 --- a/plugin/sampling/strategystore/adaptive/factory_test.go +++ b/plugin/sampling/strategystore/adaptive/factory_test.go @@ -76,6 +76,7 @@ func TestFactory(t *testing.T) { require.NoError(t, err) require.NoError(t, store.Close()) require.NoError(t, aggregator.Close()) + require.NoError(t, f.Close()) } func TestBadConfigFail(t *testing.T) { @@ -97,6 +98,7 @@ func TestBadConfigFail(t *testing.T) { require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop())) _, _, err := f.CreateStrategyStore() require.Error(t, err) + require.NoError(t, f.Close()) } } diff --git a/plugin/sampling/strategystore/adaptive/processor.go b/plugin/sampling/strategystore/adaptive/processor.go index 7ac5c1834fa..6fe13d4ba88 100644 --- a/plugin/sampling/strategystore/adaptive/processor.go +++ b/plugin/sampling/strategystore/adaptive/processor.go @@ -37,8 +37,6 @@ const ( getThroughputErrMsg = "failed to get throughput from storage" - defaultFollowerProbabilityInterval = 20 * time.Second - // The number of past entries for samplingCache the leader keeps in memory serviceCacheSize = 25 @@ -95,26 +93,17 @@ type Processor struct { // The latest throughput is stored at the head of the slice. throughputs []*throughputBucket - // strategyResponses is the cache of the sampling strategies for every service, in Thrift format. - // TODO change this to work with protobuf model instead, to support gRPC endpoint. - strategyResponses map[string]*api_v2.SamplingStrategyResponse - weightVectorCache *WeightVectorCache probabilityCalculator calculationstrategy.ProbabilityCalculator - // followerRefreshInterval determines how often the follower processor updates its probabilities. - // Given only the leader writes probabilities, the followers need to fetch the probabilities into - // cache. - followerRefreshInterval time.Duration - serviceCache []SamplingCache - shutdown chan struct{} - bgFinished sync.WaitGroup + shutdown chan struct{} operationsCalculatedGauge metrics.Gauge calculateProbabilitiesLatency metrics.Timer + lastCheckedTime time.Time } // newProcessor creates a new sampling processor that generates sampling rates for service operations. @@ -139,21 +128,20 @@ func newProcessor( probabilities: make(model.ServiceOperationProbabilities), qps: make(model.ServiceOperationQPS), hostname: hostname, - strategyResponses: make(map[string]*api_v2.SamplingStrategyResponse), logger: logger, electionParticipant: electionParticipant, // TODO make weightsCache and probabilityCalculator configurable weightVectorCache: NewWeightVectorCache(), probabilityCalculator: calculationstrategy.NewPercentageIncreaseCappedCalculator(1.0), - followerRefreshInterval: defaultFollowerProbabilityInterval, serviceCache: []SamplingCache{}, operationsCalculatedGauge: metricsFactory.Gauge(metrics.Options{Name: "operations_calculated"}), calculateProbabilitiesLatency: metricsFactory.Timer(metrics.TimerOptions{Name: "calculate_probabilities"}), + shutdown: make(chan struct{}), }, nil } -// GetSamplingStrategy implements Thrift endpoint for retrieving sampling strategy for a service. -func (p *Processor) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error) { +// GetSamplingStrategy implements protobuf endpoint for retrieving sampling strategy for a service. +func (p *StrategyStore) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error) { p.RLock() defer p.RUnlock() if strategy, ok := p.strategyResponses[service]; ok { @@ -165,37 +153,13 @@ func (p *Processor) GetSamplingStrategy(_ context.Context, service string) (*api // Start initializes and starts the sampling processor which regularly calculates sampling probabilities. func (p *Processor) Start() error { p.logger.Info("starting adaptive sampling processor") - if err := p.electionParticipant.Start(); err != nil { - return err - } - p.shutdown = make(chan struct{}) - p.loadProbabilities() - p.generateStrategyResponses() - p.runBackground(p.runCalculationLoop) - p.runBackground(p.runUpdateProbabilitiesLoop) + // NB: the first tick will be slightly delayed by the initializeThroughput call. + p.lastCheckedTime = time.Now().Add(p.Delay * -1) + p.initializeThroughput(p.lastCheckedTime) return nil } -func (p *Processor) runBackground(f func()) { - p.bgFinished.Add(1) - go func() { - f() - p.bgFinished.Done() - }() -} - -// Close stops the processor from calculating probabilities. -func (p *Processor) Close() error { - p.logger.Info("stopping adaptive sampling processor") - err := p.electionParticipant.Close() - if p.shutdown != nil { - close(p.shutdown) - } - p.bgFinished.Wait() - return err -} - -func (p *Processor) loadProbabilities() { +func (p *StrategyStore) loadProbabilities() { // TODO GetLatestProbabilities API can be changed to return the latest measured qps for initialization probabilities, err := p.storage.GetLatestProbabilities() if err != nil { @@ -209,9 +173,10 @@ func (p *Processor) loadProbabilities() { // runUpdateProbabilitiesLoop is a loop that reads probabilities from storage. // The follower updates its local cache with the latest probabilities and serves them. -func (p *Processor) runUpdateProbabilitiesLoop() { +func (p *StrategyStore) runUpdateProbabilitiesLoop() { select { case <-time.After(addJitter(p.followerRefreshInterval)): + // continue after jitter delay case <-p.shutdown: return } @@ -236,6 +201,10 @@ func (p *Processor) isLeader() bool { return p.electionParticipant.IsLeader() } +func (p *StrategyStore) isLeader() bool { + return p.electionParticipant.IsLeader() +} + // addJitter adds a random amount of time. Without jitter, if the host holding the leader // lock were to die, then all other collectors can potentially wait for a full cycle before // trying to acquire the lock. With jitter, we can reduce the average amount of time before a @@ -244,53 +213,41 @@ func addJitter(jitterAmount time.Duration) time.Duration { return (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2))) } -func (p *Processor) runCalculationLoop() { - lastCheckedTime := time.Now().Add(p.Delay * -1) - p.initializeThroughput(lastCheckedTime) - // NB: the first tick will be slightly delayed by the initializeThroughput call. - ticker := time.NewTicker(p.CalculationInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - endTime := time.Now().Add(p.Delay * -1) - startTime := lastCheckedTime - throughput, err := p.storage.GetThroughput(startTime, endTime) - if err != nil { - p.logger.Error(getThroughputErrMsg, zap.Error(err)) - break - } - aggregatedThroughput := p.aggregateThroughput(throughput) - p.prependThroughputBucket(&throughputBucket{ - throughput: aggregatedThroughput, - interval: endTime.Sub(startTime), - endTime: endTime, - }) - lastCheckedTime = endTime - // Load the latest throughput so that if this host ever becomes leader, it - // has the throughput ready in memory. However, only run the actual calculations - // if this host becomes leader. - // TODO fill the throughput buffer only when we're leader - if p.isLeader() { - startTime := time.Now() - probabilities, qps := p.calculateProbabilitiesAndQPS() - p.Lock() - p.probabilities = probabilities - p.qps = qps - p.Unlock() - // NB: This has the potential of running into a race condition if the CalculationInterval - // is set to an extremely low value. The worst case scenario is that probabilities is calculated - // and swapped more than once before generateStrategyResponses() and saveProbabilities() are called. - // This will result in one or more batches of probabilities not being saved which is completely - // fine. This race condition should not ever occur anyway since the calculation interval will - // be way longer than the time to run the calculations. - p.generateStrategyResponses() - p.calculateProbabilitiesLatency.Record(time.Since(startTime)) - p.runBackground(p.saveProbabilitiesAndQPS) - } - case <-p.shutdown: - return - } +func (p *Processor) runCalculation() { + endTime := time.Now().Add(p.Delay * -1) + startTime := p.lastCheckedTime + throughput, err := p.storage.GetThroughput(startTime, endTime) + if err != nil { + p.logger.Error(getThroughputErrMsg, zap.Error(err)) + return + } + aggregatedThroughput := p.aggregateThroughput(throughput) + p.prependThroughputBucket(&throughputBucket{ + throughput: aggregatedThroughput, + interval: endTime.Sub(startTime), + endTime: endTime, + }) + p.lastCheckedTime = endTime + // Load the latest throughput so that if this host ever becomes leader, it + // has the throughput ready in memory. However, only run the actual calculations + // if this host becomes leader. + // TODO fill the throughput buffer only when we're leader + if p.isLeader() { + startTime := time.Now() + probabilities, qps := p.calculateProbabilitiesAndQPS() + p.Lock() + p.probabilities = probabilities + p.qps = qps + p.Unlock() + // NB: This has the potential of running into a race condition if the CalculationInterval + // is set to an extremely low value. The worst case scenario is that probabilities is calculated + // and swapped more than once before generateStrategyResponses() and saveProbabilities() are called. + // This will result in one or more batches of probabilities not being saved which is completely + // fine. This race condition should not ever occur anyway since the calculation interval will + // be way longer than the time to run the calculations. + + p.calculateProbabilitiesLatency.Record(time.Since(startTime)) + p.saveProbabilitiesAndQPS() } } @@ -513,7 +470,7 @@ func (p *Processor) isUsingAdaptiveSampling( } // generateStrategyResponses generates and caches SamplingStrategyResponse from the calculated sampling probabilities. -func (p *Processor) generateStrategyResponses() { +func (p *StrategyStore) generateStrategyResponses() { p.RLock() strategies := make(map[string]*api_v2.SamplingStrategyResponse) for svc, opProbabilities := range p.probabilities { @@ -539,7 +496,7 @@ func (p *Processor) generateStrategyResponses() { p.strategyResponses = strategies } -func (p *Processor) generateDefaultSamplingStrategyResponse() *api_v2.SamplingStrategyResponse { +func (p *StrategyStore) generateDefaultSamplingStrategyResponse() *api_v2.SamplingStrategyResponse { return &api_v2.SamplingStrategyResponse{ StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, OperationSampling: &api_v2.PerOperationSamplingStrategies{ diff --git a/plugin/sampling/strategystore/adaptive/processor_test.go b/plugin/sampling/strategystore/adaptive/processor_test.go index 03ae6b7b7bb..1ff7b402de8 100644 --- a/plugin/sampling/strategystore/adaptive/processor_test.go +++ b/plugin/sampling/strategystore/adaptive/processor_test.go @@ -339,8 +339,9 @@ func TestRunCalculationLoop(t *testing.T) { mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")). Return(testThroughputs(), nil) mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage()) - mockStorage.On("InsertProbabilitiesAndQPS", "host", mock.AnythingOfType("model.ServiceOperationProbabilities"), + mockStorage.On("InsertProbabilitiesAndQPS", mock.AnythingOfType("string"), mock.AnythingOfType("model.ServiceOperationProbabilities"), mock.AnythingOfType("model.ServiceOperationQPS")).Return(errTestStorage()) + mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(errTestStorage()) mockEP := &epmocks.ElectionParticipant{} mockEP.On("Start").Return(nil) mockEP.On("Close").Return(nil) @@ -357,22 +358,23 @@ func TestRunCalculationLoop(t *testing.T) { FollowerLeaseRefreshInterval: time.Second, BucketsForCalculation: 10, } - p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) + agg, err := NewAggregator(cfg, logger, metrics.NullFactory, mockEP, mockStorage) require.NoError(t, err) - p.Start() + agg.Start() + defer agg.Close() for i := 0; i < 1000; i++ { - strategy, _ := p.GetSamplingStrategy(context.Background(), "svcA") - if len(strategy.OperationSampling.PerOperationStrategies) != 0 { + agg.(*aggregator).Lock() + probabilities := agg.(*aggregator).processor.probabilities + agg.(*aggregator).Unlock() + if len(probabilities) != 0 { break } time.Sleep(time.Millisecond) } - p.Close() - strategy, err := p.GetSamplingStrategy(context.Background(), "svcA") - require.NoError(t, err) - assert.Len(t, strategy.OperationSampling.PerOperationStrategies, 2) + probabilities := agg.(*aggregator).processor.probabilities + require.Len(t, probabilities["svcA"], 2) } func TestRunCalculationLoop_GetThroughputError(t *testing.T) { @@ -380,6 +382,11 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) { mockStorage := &smocks.Store{} mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")). Return(nil, errTestStorage()) + mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage()) + mockStorage.On("InsertProbabilitiesAndQPS", mock.AnythingOfType("string"), mock.AnythingOfType("model.ServiceOperationProbabilities"), + mock.AnythingOfType("model.ServiceOperationQPS")).Return(errTestStorage()) + mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(errTestStorage()) + mockEP := &epmocks.ElectionParticipant{} mockEP.On("Start").Return(nil) mockEP.On("Close").Return(nil) @@ -390,12 +397,9 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) { AggregationBuckets: 2, BucketsForCalculation: 10, } - p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) + agg, err := NewAggregator(cfg, logger, metrics.NullFactory, mockEP, mockStorage) require.NoError(t, err) - p.shutdown = make(chan struct{}) - defer close(p.shutdown) - go p.runCalculationLoop() - + agg.Start() for i := 0; i < 1000; i++ { // match logs specific to getThroughputErrMsg. We expect to see more than 2, once during // initialization and one or more times during the loop. @@ -406,13 +410,14 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) { } match, errMsg := testutils.LogMatcher(2, getThroughputErrMsg, logBuffer.Lines()) assert.True(t, match, errMsg) + require.NoError(t, agg.Close()) } func TestLoadProbabilities(t *testing.T) { mockStorage := &smocks.Store{} mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil) - p := &Processor{storage: mockStorage} + p := &StrategyStore{storage: mockStorage} require.Nil(t, p.probabilities) p.loadProbabilities() require.NotNil(t, p.probabilities) @@ -426,7 +431,7 @@ func TestRunUpdateProbabilitiesLoop(t *testing.T) { mockEP.On("Close").Return(nil) mockEP.On("IsLeader").Return(false) - p := &Processor{ + p := &StrategyStore{ storage: mockStorage, shutdown: make(chan struct{}), followerRefreshInterval: time.Millisecond, @@ -480,20 +485,19 @@ func TestRealisticRunCalculationLoop(t *testing.T) { AggregationBuckets: 1, Delay: time.Second * 10, } - p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, logger) - require.NoError(t, err) - p.Start() + s := NewStrategyStore(cfg, logger, mockEP, mockStorage) + s.Start() for i := 0; i < 100; i++ { - strategy, _ := p.GetSamplingStrategy(context.Background(), "svcA") + strategy, _ := s.GetSamplingStrategy(context.Background(), "svcA") if len(strategy.OperationSampling.PerOperationStrategies) != 0 { break } time.Sleep(250 * time.Millisecond) } - p.Close() + s.Close() - strategy, err := p.GetSamplingStrategy(context.Background(), "svcA") + strategy, err := s.GetSamplingStrategy(context.Background(), "svcA") require.NoError(t, err) require.Len(t, strategy.OperationSampling.PerOperationStrategies, 4) strategies := strategy.OperationSampling.PerOperationStrategies @@ -557,7 +561,7 @@ func TestGenerateStrategyResponses(t *testing.T) { "GET": 0.5, }, } - p := &Processor{ + p := &StrategyStore{ probabilities: probabilities, Options: Options{ InitialSamplingProbability: 0.001, @@ -853,44 +857,3 @@ func TestCalculateProbabilitiesAndQPSMultiple(t *testing.T) { p.probabilities = probabilities p.qps = qps } - -func TestErrors(t *testing.T) { - mockStorage := &smocks.Store{} - mockStorage.On("GetLatestProbabilities").Return(model.ServiceOperationProbabilities{}, errTestStorage()) - mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")). - Return(nil, nil) - - cfg := Options{ - TargetSamplesPerSecond: 1.0, - DeltaTolerance: 0.1, - InitialSamplingProbability: 0.001, - CalculationInterval: time.Millisecond * 5, - AggregationBuckets: 2, - Delay: time.Millisecond * 5, - LeaderLeaseRefreshInterval: time.Millisecond, - FollowerLeaseRefreshInterval: time.Second, - BucketsForCalculation: 10, - } - - // start errors - mockEP := &epmocks.ElectionParticipant{} - mockEP.On("Start").Return(errors.New("bad")) - mockEP.On("Close").Return(errors.New("also bad")) - mockEP.On("IsLeader").Return(false) - - p, err := newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop()) - require.NoError(t, err) - require.Error(t, p.Start()) - require.Error(t, p.Close()) - - // close errors - mockEP = &epmocks.ElectionParticipant{} - mockEP.On("Start").Return(nil) - mockEP.On("Close").Return(errors.New("still bad")) - mockEP.On("IsLeader").Return(false) - - p, err = newProcessor(cfg, "host", mockStorage, mockEP, metrics.NullFactory, zap.NewNop()) - require.NoError(t, err) - require.NoError(t, p.Start()) - require.Error(t, p.Close()) -} diff --git a/plugin/sampling/strategystore/adaptive/strategy_store.go b/plugin/sampling/strategystore/adaptive/strategy_store.go index 76d2b9f5d5e..6a8749c9dc8 100644 --- a/plugin/sampling/strategystore/adaptive/strategy_store.go +++ b/plugin/sampling/strategystore/adaptive/strategy_store.go @@ -15,32 +15,75 @@ package adaptive import ( + "sync" + "time" + "go.uber.org/zap" - "github.com/jaegertracing/jaeger/pkg/distributedlock" - "github.com/jaegertracing/jaeger/pkg/hostname" - "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model" "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/storage/samplingstore" ) +const defaultFollowerProbabilityInterval = 20 * time.Second + +type StrategyStore struct { + sync.RWMutex + Options + + electionParticipant leaderelection.ElectionParticipant + storage samplingstore.Store + logger *zap.Logger + + // probabilities contains the latest calculated sampling probabilities for service operations. + probabilities model.ServiceOperationProbabilities + + // strategyResponses is the cache of the sampling strategies for every service, in protobuf format. + strategyResponses map[string]*api_v2.SamplingStrategyResponse + + // followerRefreshInterval determines how often the follower processor updates its probabilities. + // Given only the leader writes probabilities, the followers need to fetch the probabilities into + // cache. + followerRefreshInterval time.Duration + + shutdown chan struct{} + bgFinished sync.WaitGroup +} + // NewStrategyStore creates a strategy store that holds adaptive sampling strategies. -func NewStrategyStore(options Options, metricsFactory metrics.Factory, logger *zap.Logger, lock distributedlock.Lock, store samplingstore.Store) (*Processor, error) { - hostname, err := hostname.AsIdentifier() - if err != nil { - return nil, err - } - logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname)) - - participant := leaderelection.NewElectionParticipant(lock, defaultResourceName, leaderelection.ElectionParticipantOptions{ - FollowerLeaseRefreshInterval: options.FollowerLeaseRefreshInterval, - LeaderLeaseRefreshInterval: options.LeaderLeaseRefreshInterval, - Logger: logger, - }) - p, err := newProcessor(options, hostname, store, participant, metricsFactory, logger) - if err != nil { - return nil, err +func NewStrategyStore(options Options, logger *zap.Logger, participant leaderelection.ElectionParticipant, store samplingstore.Store) *StrategyStore { + return &StrategyStore{ + Options: options, + storage: store, + probabilities: make(model.ServiceOperationProbabilities), + strategyResponses: make(map[string]*api_v2.SamplingStrategyResponse), + logger: logger, + electionParticipant: participant, + followerRefreshInterval: defaultFollowerProbabilityInterval, + shutdown: make(chan struct{}), } +} + +// Start initializes and starts the sampling service which regularly loads sampling probabilities and generates strategies. +func (ss *StrategyStore) Start() error { + ss.logger.Info("starting adaptive sampling service") + ss.loadProbabilities() + ss.generateStrategyResponses() + + ss.bgFinished.Add(1) + go func() { + ss.runUpdateProbabilitiesLoop() + ss.bgFinished.Done() + }() + + return nil +} - return p, nil +// Close stops the service from loading probabilities and generating strategies. +func (ss *StrategyStore) Close() error { + ss.logger.Info("stopping adaptive sampling service") + close(ss.shutdown) + ss.bgFinished.Wait() + return nil } diff --git a/plugin/sampling/strategystore/factory.go b/plugin/sampling/strategystore/factory.go index 9cc2693e7bf..d1506ed712d 100644 --- a/plugin/sampling/strategystore/factory.go +++ b/plugin/sampling/strategystore/factory.go @@ -15,6 +15,7 @@ package strategystore import ( + "errors" "flag" "fmt" @@ -113,3 +114,14 @@ func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategyst } return factory.CreateStrategyStore() } + +// Close closes all factories. +func (f *Factory) Close() error { + var errs []error + for _, factory := range f.factories { + if err := factory.Close(); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} diff --git a/plugin/sampling/strategystore/factory_test.go b/plugin/sampling/strategystore/factory_test.go index f42a7a70861..75493a10b9e 100644 --- a/plugin/sampling/strategystore/factory_test.go +++ b/plugin/sampling/strategystore/factory_test.go @@ -79,12 +79,14 @@ func TestNewFactory(t *testing.T) { require.NoError(t, f.Initialize(metrics.NullFactory, mockSSFactory, zap.NewNop())) _, _, err = f.CreateStrategyStore() require.NoError(t, err) + require.NoError(t, f.Close()) // force the mock to return errors mock.retError = true require.EqualError(t, f.Initialize(metrics.NullFactory, mockSSFactory, zap.NewNop()), "error initializing store") _, _, err = f.CreateStrategyStore() require.EqualError(t, err, "error creating store") + require.EqualError(t, f.Close(), "error closing store") // request something that doesn't exist f.StrategyStoreType = "doesntexist" @@ -144,6 +146,13 @@ func (f *mockFactory) Initialize(metricsFactory metrics.Factory, ssFactory stora return nil } +func (f *mockFactory) Close() error { + if f.retError { + return errors.New("error closing store") + } + return nil +} + type mockSamplingStoreFactory struct{} func (m *mockSamplingStoreFactory) CreateLock() (distributedlock.Lock, error) { diff --git a/plugin/sampling/strategystore/static/factory.go b/plugin/sampling/strategystore/static/factory.go index 91aa46e6d9c..16097860b6c 100644 --- a/plugin/sampling/strategystore/static/factory.go +++ b/plugin/sampling/strategystore/static/factory.go @@ -67,3 +67,8 @@ func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategyst return s, nil, nil } + +// Close closes the factory. +func (f *Factory) Close() error { + return nil +} diff --git a/plugin/sampling/strategystore/static/factory_test.go b/plugin/sampling/strategystore/static/factory_test.go index f5d7b1880a3..11b169c4818 100644 --- a/plugin/sampling/strategystore/static/factory_test.go +++ b/plugin/sampling/strategystore/static/factory_test.go @@ -40,4 +40,5 @@ func TestFactory(t *testing.T) { require.NoError(t, f.Initialize(metrics.NullFactory, nil, zap.NewNop())) _, _, err := f.CreateStrategyStore() require.NoError(t, err) + require.NoError(t, f.Close()) }