From 655c3983752fbb6bba936038402c6b8b2bf94d8f Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Fri, 24 May 2024 22:21:59 +0530 Subject: [PATCH] Rename `Processor` to `PostAggregator` (#5479) ## Which problem is this PR solving? - https://github.com/jaegertracing/jaeger/pull/5441#discussion_r1602373549 ## Description of the changes - Rename `Processor` to `PostAggregator` ## How was this change tested? - `make test` ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [ ] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` Signed-off-by: pushkarm029 Signed-off-by: Vamshi Maskuri --- .../strategystore/adaptive/aggregator.go | 10 ++-- .../strategystore/adaptive/processor.go | 46 +++++++++---------- .../strategystore/adaptive/processor_test.go | 36 +++++++-------- 3 files changed, 46 insertions(+), 46 deletions(-) diff --git a/plugin/sampling/strategystore/adaptive/aggregator.go b/plugin/sampling/strategystore/adaptive/aggregator.go index f41bbd99ada..0afb4d3d261 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator.go +++ b/plugin/sampling/strategystore/adaptive/aggregator.go @@ -39,7 +39,7 @@ type aggregator struct { operationsCounter metrics.Counter servicesCounter metrics.Counter currentThroughput serviceOperationThroughput - processor *Processor + postAggregator *PostAggregator aggregationInterval time.Duration storage samplingstore.Store stop chan struct{} @@ -55,7 +55,7 @@ func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.F } logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname)) - processor, err := newProcessor(options, hostname, store, participant, metricsFactory, logger) + postAggregator, err := newPostAggregator(options, hostname, store, participant, metricsFactory, logger) if err != nil { return nil, err } @@ -65,7 +65,7 @@ func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.F servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}), currentThroughput: make(serviceOperationThroughput), aggregationInterval: options.CalculationInterval, - processor: processor, + postAggregator: postAggregator, storage: store, stop: make(chan struct{}), }, nil @@ -79,7 +79,7 @@ func (a *aggregator) runAggregationLoop() { a.Lock() a.saveThroughput() a.currentThroughput = make(serviceOperationThroughput) - a.processor.runCalculation() + a.postAggregator.runCalculation() a.Unlock() case <-a.stop: ticker.Stop() @@ -130,7 +130,7 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa } func (a *aggregator) Start() { - a.processor.Start() + a.postAggregator.Start() a.bgFinished.Add(1) go func() { diff --git a/plugin/sampling/strategystore/adaptive/processor.go b/plugin/sampling/strategystore/adaptive/processor.go index 6fe13d4ba88..b62be138249 100644 --- a/plugin/sampling/strategystore/adaptive/processor.go +++ b/plugin/sampling/strategystore/adaptive/processor.go @@ -69,11 +69,11 @@ type throughputBucket struct { endTime time.Time } -// Processor retrieves service throughput over a look back interval and calculates sampling probabilities +// PostAggregator retrieves service throughput over a look back interval and calculates sampling probabilities // per operation such that each operation is sampled at a specified target QPS. It achieves this by // retrieving discrete buckets of operation throughput and doing a weighted average of the throughput // and generating a probability to match the targetQPS. -type Processor struct { +type PostAggregator struct { sync.RWMutex Options @@ -106,15 +106,15 @@ type Processor struct { lastCheckedTime time.Time } -// newProcessor creates a new sampling processor that generates sampling rates for service operations. -func newProcessor( +// newPostAggregator creates a new sampling postAggregator that generates sampling rates for service operations. +func newPostAggregator( opts Options, hostname string, storage samplingstore.Store, electionParticipant leaderelection.ElectionParticipant, metricsFactory metrics.Factory, logger *zap.Logger, -) (*Processor, error) { +) (*PostAggregator, error) { if opts.CalculationInterval == 0 || opts.AggregationBuckets == 0 { return nil, errNonZero } @@ -122,7 +122,7 @@ func newProcessor( return nil, errBucketsForCalculation } metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "adaptive_sampling_processor"}) - return &Processor{ + return &PostAggregator{ Options: opts, storage: storage, probabilities: make(model.ServiceOperationProbabilities), @@ -150,9 +150,9 @@ func (p *StrategyStore) GetSamplingStrategy(_ context.Context, service string) ( return p.generateDefaultSamplingStrategyResponse(), nil } -// Start initializes and starts the sampling processor which regularly calculates sampling probabilities. -func (p *Processor) Start() error { - p.logger.Info("starting adaptive sampling processor") +// Start initializes and starts the sampling postAggregator which regularly calculates sampling probabilities. +func (p *PostAggregator) Start() error { + p.logger.Info("starting adaptive sampling postAggregator") // NB: the first tick will be slightly delayed by the initializeThroughput call. p.lastCheckedTime = time.Now().Add(p.Delay * -1) p.initializeThroughput(p.lastCheckedTime) @@ -186,7 +186,7 @@ func (p *StrategyStore) runUpdateProbabilitiesLoop() { for { select { case <-ticker.C: - // Only load probabilities if this processor doesn't hold the leader lock + // Only load probabilities if this strategy_store doesn't hold the leader lock if !p.isLeader() { p.loadProbabilities() p.generateStrategyResponses() @@ -197,7 +197,7 @@ func (p *StrategyStore) runUpdateProbabilitiesLoop() { } } -func (p *Processor) isLeader() bool { +func (p *PostAggregator) isLeader() bool { return p.electionParticipant.IsLeader() } @@ -213,7 +213,7 @@ func addJitter(jitterAmount time.Duration) time.Duration { return (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2))) } -func (p *Processor) runCalculation() { +func (p *PostAggregator) runCalculation() { endTime := time.Now().Add(p.Delay * -1) startTime := p.lastCheckedTime throughput, err := p.storage.GetThroughput(startTime, endTime) @@ -251,7 +251,7 @@ func (p *Processor) runCalculation() { } } -func (p *Processor) saveProbabilitiesAndQPS() { +func (p *PostAggregator) saveProbabilitiesAndQPS() { p.RLock() defer p.RUnlock() if err := p.storage.InsertProbabilitiesAndQPS(p.hostname, p.probabilities, p.qps); err != nil { @@ -259,7 +259,7 @@ func (p *Processor) saveProbabilitiesAndQPS() { } } -func (p *Processor) prependThroughputBucket(bucket *throughputBucket) { +func (p *PostAggregator) prependThroughputBucket(bucket *throughputBucket) { p.throughputs = append([]*throughputBucket{bucket}, p.throughputs...) if len(p.throughputs) > p.AggregationBuckets { p.throughputs = p.throughputs[0:p.AggregationBuckets] @@ -269,7 +269,7 @@ func (p *Processor) prependThroughputBucket(bucket *throughputBucket) { // aggregateThroughput aggregates operation throughput from different buckets into one. // All input buckets represent a single time range, but there are many of them because // they are all independently generated by different collector instances from inbound span traffic. -func (p *Processor) aggregateThroughput(throughputs []*model.Throughput) serviceOperationThroughput { +func (p *PostAggregator) aggregateThroughput(throughputs []*model.Throughput) serviceOperationThroughput { aggregatedThroughput := make(serviceOperationThroughput) for _, throughput := range throughputs { service := throughput.Service @@ -301,7 +301,7 @@ func copySet(in map[string]struct{}) map[string]struct{} { return out } -func (p *Processor) initializeThroughput(endTime time.Time) { +func (p *PostAggregator) initializeThroughput(endTime time.Time) { for i := 0; i < p.AggregationBuckets; i++ { startTime := endTime.Add(p.CalculationInterval * -1) throughput, err := p.storage.GetThroughput(startTime, endTime) @@ -323,7 +323,7 @@ func (p *Processor) initializeThroughput(endTime time.Time) { } // throughputToQPS converts raw throughput counts for all accumulated buckets to QPS values. -func (p *Processor) throughputToQPS() serviceOperationQPS { +func (p *PostAggregator) throughputToQPS() serviceOperationQPS { // TODO previous qps buckets have already been calculated, just need to calculate latest batch // and append them where necessary and throw out the oldest batch. // Edge case #buckets < p.AggregationBuckets, then we shouldn't throw out @@ -351,7 +351,7 @@ func calculateQPS(count int64, interval time.Duration) float64 { // calculateWeightedQPS calculates the weighted qps of the slice allQPS where weights are biased // towards more recent qps. This function assumes that the most recent qps is at the head of the slice. -func (p *Processor) calculateWeightedQPS(allQPS []float64) float64 { +func (p *PostAggregator) calculateWeightedQPS(allQPS []float64) float64 { if len(allQPS) == 0 { return 0 } @@ -363,14 +363,14 @@ func (p *Processor) calculateWeightedQPS(allQPS []float64) float64 { return qps } -func (p *Processor) prependServiceCache() { +func (p *PostAggregator) prependServiceCache() { p.serviceCache = append([]SamplingCache{make(SamplingCache)}, p.serviceCache...) if len(p.serviceCache) > serviceCacheSize { p.serviceCache = p.serviceCache[0:serviceCacheSize] } } -func (p *Processor) calculateProbabilitiesAndQPS() (model.ServiceOperationProbabilities, model.ServiceOperationQPS) { +func (p *PostAggregator) calculateProbabilitiesAndQPS() (model.ServiceOperationProbabilities, model.ServiceOperationQPS) { p.prependServiceCache() retProbabilities := make(model.ServiceOperationProbabilities) retQPS := make(model.ServiceOperationQPS) @@ -394,7 +394,7 @@ func (p *Processor) calculateProbabilitiesAndQPS() (model.ServiceOperationProbab return retProbabilities, retQPS } -func (p *Processor) calculateProbability(service, operation string, qps float64) float64 { +func (p *PostAggregator) calculateProbability(service, operation string, qps float64) float64 { oldProbability := p.InitialSamplingProbability // TODO: is this loop overly expensive? p.RLock() @@ -429,7 +429,7 @@ func (p *Processor) calculateProbability(service, operation string, qps float64) } // is actual value within p.DeltaTolerance percentage of expected value. -func (p *Processor) withinTolerance(actual, expected float64) bool { +func (p *PostAggregator) withinTolerance(actual, expected float64) bool { return math.Abs(actual-expected)/expected < p.DeltaTolerance } @@ -441,7 +441,7 @@ func merge(p1 map[string]struct{}, p2 map[string]struct{}) map[string]struct{} { return p1 } -func (p *Processor) isUsingAdaptiveSampling( +func (p *PostAggregator) isUsingAdaptiveSampling( probability float64, service string, operation string, diff --git a/plugin/sampling/strategystore/adaptive/processor_test.go b/plugin/sampling/strategystore/adaptive/processor_test.go index 1ff7b402de8..565773b8ec1 100644 --- a/plugin/sampling/strategystore/adaptive/processor_test.go +++ b/plugin/sampling/strategystore/adaptive/processor_test.go @@ -85,14 +85,14 @@ func testCalculator() calculationstrategy.ProbabilityCalculator { } func TestAggregateThroughputInputsImmutability(t *testing.T) { - p := &Processor{} + p := &PostAggregator{} in := testThroughputs() _ = p.aggregateThroughput(in) assert.Equal(t, in, testThroughputs()) } func TestAggregateThroughput(t *testing.T) { - p := &Processor{} + p := &PostAggregator{} aggregatedThroughput := p.aggregateThroughput(testThroughputs()) require.Len(t, aggregatedThroughput, 2) @@ -128,7 +128,7 @@ func TestInitializeThroughput(t *testing.T) { Return([]*model.Throughput{{Service: "svcA", Operation: "GET", Count: 7}}, nil) mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*17), time.Time{}.Add(time.Minute*18)). Return([]*model.Throughput{}, nil) - p := &Processor{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 3}} + p := &PostAggregator{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 3}} p.initializeThroughput(time.Time{}.Add(time.Minute * 20)) require.Len(t, p.throughputs, 2) @@ -144,7 +144,7 @@ func TestInitializeThroughputFailure(t *testing.T) { mockStorage := &smocks.Store{} mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*19), time.Time{}.Add(time.Minute*20)). Return(nil, errTestStorage()) - p := &Processor{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 1}} + p := &PostAggregator{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 1}} p.initializeThroughput(time.Time{}.Add(time.Minute * 20)) assert.Empty(t, p.throughputs) @@ -159,7 +159,7 @@ func TestCalculateQPS(t *testing.T) { } func TestGenerateOperationQPS(t *testing.T) { - p := &Processor{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 10, AggregationBuckets: 10}} + p := &PostAggregator{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 10, AggregationBuckets: 10}} svcOpQPS := p.throughputToQPS() assert.Len(t, svcOpQPS, 2) @@ -207,7 +207,7 @@ func TestGenerateOperationQPS(t *testing.T) { } func TestGenerateOperationQPS_UseMostRecentBucketOnly(t *testing.T) { - p := &Processor{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 1, AggregationBuckets: 10}} + p := &PostAggregator{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 1, AggregationBuckets: 10}} svcOpQPS := p.throughputToQPS() assert.Len(t, svcOpQPS, 2) @@ -241,7 +241,7 @@ func TestGenerateOperationQPS_UseMostRecentBucketOnly(t *testing.T) { } func TestCalculateWeightedQPS(t *testing.T) { - p := Processor{weightVectorCache: NewWeightVectorCache()} + p := PostAggregator{weightVectorCache: NewWeightVectorCache()} assert.InDelta(t, 0.86735, p.calculateWeightedQPS([]float64{0.8, 1.2, 1.0}), 0.001) assert.InDelta(t, 0.95197, p.calculateWeightedQPS([]float64{1.0, 1.0, 0.0, 0.0}), 0.001) assert.Equal(t, 0.0, p.calculateWeightedQPS([]float64{})) @@ -268,7 +268,7 @@ func TestCalculateProbability(t *testing.T) { InitialSamplingProbability: 0.001, MinSamplingProbability: 0.00001, } - p := &Processor{ + p := &PostAggregator{ Options: cfg, probabilities: probabilities, probabilityCalculator: testCalculator(), @@ -308,7 +308,7 @@ func TestCalculateProbabilitiesAndQPS(t *testing.T) { }, } mets := metricstest.NewFactory(0) - p := &Processor{ + p := &PostAggregator{ Options: Options{ TargetSamplesPerSecond: 1.0, DeltaTolerance: 0.2, @@ -365,7 +365,7 @@ func TestRunCalculationLoop(t *testing.T) { for i := 0; i < 1000; i++ { agg.(*aggregator).Lock() - probabilities := agg.(*aggregator).processor.probabilities + probabilities := agg.(*aggregator).postAggregator.probabilities agg.(*aggregator).Unlock() if len(probabilities) != 0 { break @@ -373,7 +373,7 @@ func TestRunCalculationLoop(t *testing.T) { time.Sleep(time.Millisecond) } - probabilities := agg.(*aggregator).processor.probabilities + probabilities := agg.(*aggregator).postAggregator.probabilities require.Len(t, probabilities["svcA"], 2) } @@ -521,7 +521,7 @@ func TestRealisticRunCalculationLoop(t *testing.T) { } func TestPrependBucket(t *testing.T) { - p := &Processor{Options: Options{AggregationBuckets: 1}} + p := &PostAggregator{Options: Options{AggregationBuckets: 1}} p.prependThroughputBucket(&throughputBucket{interval: time.Minute}) require.Len(t, p.throughputs, 1) assert.Equal(t, time.Minute, p.throughputs[0].interval) @@ -541,17 +541,17 @@ func TestConstructorFailure(t *testing.T) { CalculationInterval: time.Second * 5, AggregationBuckets: 0, } - _, err := newProcessor(cfg, "host", nil, nil, metrics.NullFactory, logger) + _, err := newPostAggregator(cfg, "host", nil, nil, metrics.NullFactory, logger) require.EqualError(t, err, "CalculationInterval and AggregationBuckets must be greater than 0") cfg.CalculationInterval = 0 - _, err = newProcessor(cfg, "host", nil, nil, metrics.NullFactory, logger) + _, err = newPostAggregator(cfg, "host", nil, nil, metrics.NullFactory, logger) require.EqualError(t, err, "CalculationInterval and AggregationBuckets must be greater than 0") cfg.CalculationInterval = time.Millisecond cfg.AggregationBuckets = 1 cfg.BucketsForCalculation = -1 - _, err = newProcessor(cfg, "host", nil, nil, metrics.NullFactory, logger) + _, err = newPostAggregator(cfg, "host", nil, nil, metrics.NullFactory, logger) require.EqualError(t, err, "BucketsForCalculation cannot be less than 1") } @@ -591,7 +591,7 @@ func TestGenerateStrategyResponses(t *testing.T) { } func TestUsingAdaptiveSampling(t *testing.T) { - p := &Processor{} + p := &PostAggregator{} throughput := serviceOperationThroughput{ "svc": map[string]*model.Throughput{ "op": {Probabilities: map[string]struct{}{"0.010000": {}}}, @@ -617,7 +617,7 @@ func TestUsingAdaptiveSampling(t *testing.T) { } func TestPrependServiceCache(t *testing.T) { - p := &Processor{} + p := &PostAggregator{} for i := 0; i < serviceCacheSize*2; i++ { p.prependServiceCache() } @@ -640,7 +640,7 @@ func TestCalculateProbabilitiesAndQPSMultiple(t *testing.T) { }, } - p := &Processor{ + p := &PostAggregator{ Options: Options{ TargetSamplesPerSecond: 1.0, DeltaTolerance: 0.002,