Rename Processor to PostAggregator (jaegertracing#5479)
## Which problem is this PR solving?
- jaegertracing#5441 (comment)

## Description of the changes
- Rename `Processor` to `PostAggregator`
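
A minimal before/after sketch of the renamed surface, for reviewers skimming the diff below (names are taken from this diff; all surrounding wiring is elided):

```go
// Before:
// processor, err := newProcessor(options, hostname, store, participant, metricsFactory, logger)

// After: same behavior and signature, renamed to reflect that it
// post-processes aggregated throughput into sampling probabilities.
postAggregator, err := newPostAggregator(options, hostname, store, participant, metricsFactory, logger)
if err != nil {
	return nil, err
}
```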

## How was this change tested?
- `make test`

## Checklist
- [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [ ] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

Signed-off-by: pushkarm029 <[email protected]>
Signed-off-by: Vamshi Maskuri <[email protected]>
Pushkarm029 authored and varshith257 committed Jun 2, 2024
1 parent ac7731c commit 655c398
Showing 3 changed files with 46 additions and 46 deletions.
10 changes: 5 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator.go
@@ -39,7 +39,7 @@ type aggregator struct {
operationsCounter metrics.Counter
servicesCounter metrics.Counter
currentThroughput serviceOperationThroughput
-processor *Processor
+postAggregator *PostAggregator
aggregationInterval time.Duration
storage samplingstore.Store
stop chan struct{}
@@ -55,7 +55,7 @@ func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.F
}
logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname))

-processor, err := newProcessor(options, hostname, store, participant, metricsFactory, logger)
+postAggregator, err := newPostAggregator(options, hostname, store, participant, metricsFactory, logger)
if err != nil {
return nil, err
}
@@ -65,7 +65,7 @@ func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.F
servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}),
currentThroughput: make(serviceOperationThroughput),
aggregationInterval: options.CalculationInterval,
-processor: processor,
+postAggregator: postAggregator,
storage: store,
stop: make(chan struct{}),
}, nil
@@ -79,7 +79,7 @@ func (a *aggregator) runAggregationLoop() {
a.Lock()
a.saveThroughput()
a.currentThroughput = make(serviceOperationThroughput)
-a.processor.runCalculation()
+a.postAggregator.runCalculation()
a.Unlock()
case <-a.stop:
ticker.Stop()
@@ -130,7 +130,7 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa
}

func (a *aggregator) Start() {
-a.processor.Start()
+a.postAggregator.Start()

a.bgFinished.Add(1)
go func() {
46 changes: 23 additions & 23 deletions plugin/sampling/strategystore/adaptive/processor.go
@@ -69,11 +69,11 @@ type throughputBucket struct {
endTime time.Time
}

-// Processor retrieves service throughput over a look back interval and calculates sampling probabilities
+// PostAggregator retrieves service throughput over a look back interval and calculates sampling probabilities
// per operation such that each operation is sampled at a specified target QPS. It achieves this by
// retrieving discrete buckets of operation throughput and doing a weighted average of the throughput
// and generating a probability to match the targetQPS.
-type Processor struct {
+type PostAggregator struct {
sync.RWMutex
Options

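Reviewer note: the comment above is the heart of the algorithm. A rough sketch of that calculation, illustrative only — `probabilityFor`, `weights`, and `targetQPS` are invented names for this sketch, not code from this package:

```go
// Weighted-average the observed per-bucket QPS (newest first, weighted
// heaviest), then scale the sampling probability toward the target rate.
func probabilityFor(bucketQPS, weights []float64, oldProbability, targetQPS float64) float64 {
	var qps float64
	for i := range bucketQPS {
		qps += bucketQPS[i] * weights[i]
	}
	if qps == 0 {
		return oldProbability // no traffic observed; keep the old probability
	}
	return oldProbability * (targetQPS / qps) // above target -> sample less
}
```
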
@@ -106,23 +106,23 @@ type Processor struct {
lastCheckedTime time.Time
}

-// newProcessor creates a new sampling processor that generates sampling rates for service operations.
-func newProcessor(
+// newPostAggregator creates a new sampling postAggregator that generates sampling rates for service operations.
+func newPostAggregator(
opts Options,
hostname string,
storage samplingstore.Store,
electionParticipant leaderelection.ElectionParticipant,
metricsFactory metrics.Factory,
logger *zap.Logger,
-) (*Processor, error) {
+) (*PostAggregator, error) {
if opts.CalculationInterval == 0 || opts.AggregationBuckets == 0 {
return nil, errNonZero
}
if opts.BucketsForCalculation < 1 {
return nil, errBucketsForCalculation
}
metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "adaptive_sampling_processor"})
-return &Processor{
+return &PostAggregator{
Options: opts,
storage: storage,
probabilities: make(model.ServiceOperationProbabilities),
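
Call sites construct the renamed type exactly as before; a minimal sketch mirroring `TestConstructorFailure` further down (the `nil` store and election participant are placeholders from that test, not a recommended configuration):

```go
cfg := Options{
	CalculationInterval:   time.Minute, // must be > 0
	AggregationBuckets:    3,           // must be > 0
	BucketsForCalculation: 1,           // must be >= 1
}
postAggregator, err := newPostAggregator(cfg, "host", nil, nil, metrics.NullFactory, logger)
if err != nil {
	return err
}
_ = postAggregator
```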
@@ -150,9 +150,9 @@ func (p *StrategyStore) GetSamplingStrategy(_ context.Context, service string) (
return p.generateDefaultSamplingStrategyResponse(), nil
}

-// Start initializes and starts the sampling processor which regularly calculates sampling probabilities.
-func (p *Processor) Start() error {
-p.logger.Info("starting adaptive sampling processor")
+// Start initializes and starts the sampling postAggregator which regularly calculates sampling probabilities.
+func (p *PostAggregator) Start() error {
+p.logger.Info("starting adaptive sampling postAggregator")
// NB: the first tick will be slightly delayed by the initializeThroughput call.
p.lastCheckedTime = time.Now().Add(p.Delay * -1)
p.initializeThroughput(p.lastCheckedTime)
@@ -186,7 +186,7 @@ func (p *StrategyStore) runUpdateProbabilitiesLoop() {
for {
select {
case <-ticker.C:
-// Only load probabilities if this processor doesn't hold the leader lock
+// Only load probabilities if this strategy_store doesn't hold the leader lock
if !p.isLeader() {
p.loadProbabilities()
p.generateStrategyResponses()
@@ -197,7 +197,7 @@ }
}
}

-func (p *Processor) isLeader() bool {
+func (p *PostAggregator) isLeader() bool {
return p.electionParticipant.IsLeader()
}

@@ -213,7 +213,7 @@ func addJitter(jitterAmount time.Duration) time.Duration {
return (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2)))
}

-func (p *Processor) runCalculation() {
+func (p *PostAggregator) runCalculation() {
endTime := time.Now().Add(p.Delay * -1)
startTime := p.lastCheckedTime
throughput, err := p.storage.GetThroughput(startTime, endTime)
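
A quick gloss on `addJitter` above: `rand.Int63n(int64(jitterAmount/2))` is uniform over [0, jitterAmount/2), so the result always lands in [jitterAmount/2, jitterAmount), staggering storage reads across instances that share a CalculationInterval. For example:

```go
d := addJitter(10 * time.Second)
// d is in [5s, 10s): half the amount is fixed, half is random.
ok := d >= 5*time.Second && d < 10*time.Second // always true
_ = ok
```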
@@ -251,15 +251,15 @@ func (p *Processor) runCalculation() {
}
}

-func (p *Processor) saveProbabilitiesAndQPS() {
+func (p *PostAggregator) saveProbabilitiesAndQPS() {
p.RLock()
defer p.RUnlock()
if err := p.storage.InsertProbabilitiesAndQPS(p.hostname, p.probabilities, p.qps); err != nil {
p.logger.Warn("could not save probabilities", zap.Error(err))
}
}

-func (p *Processor) prependThroughputBucket(bucket *throughputBucket) {
+func (p *PostAggregator) prependThroughputBucket(bucket *throughputBucket) {
p.throughputs = append([]*throughputBucket{bucket}, p.throughputs...)
if len(p.throughputs) > p.AggregationBuckets {
p.throughputs = p.throughputs[0:p.AggregationBuckets]
@@ -269,7 +269,7 @@ func (p *Processor) prependThroughputBucket(bucket *throughputBucket) {
// aggregateThroughput aggregates operation throughput from different buckets into one.
// All input buckets represent a single time range, but there are many of them because
// they are all independently generated by different collector instances from inbound span traffic.
-func (p *Processor) aggregateThroughput(throughputs []*model.Throughput) serviceOperationThroughput {
+func (p *PostAggregator) aggregateThroughput(throughputs []*model.Throughput) serviceOperationThroughput {
aggregatedThroughput := make(serviceOperationThroughput)
for _, throughput := range throughputs {
service := throughput.Service
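
The merge the comment above describes amounts to summing per-collector counts keyed by service and operation. A sketch, assuming `serviceOperationThroughput` is a service → operation → `*model.Throughput` map (consistent with its use in this file); probability-set merging is omitted, and inputs are copied so they stay immutable, as `TestAggregateThroughputInputsImmutability` below requires:

```go
aggregated := make(serviceOperationThroughput)
for _, t := range throughputs {
	ops, ok := aggregated[t.Service]
	if !ok {
		ops = make(map[string]*model.Throughput)
		aggregated[t.Service] = ops
	}
	if existing, ok := ops[t.Operation]; ok {
		existing.Count += t.Count // same window, another collector instance
	} else {
		copied := *t // shallow copy; never mutate the caller's input
		ops[t.Operation] = &copied
	}
}
```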
@@ -301,7 +301,7 @@ func copySet(in map[string]struct{}) map[string]struct{} {
return out
}

-func (p *Processor) initializeThroughput(endTime time.Time) {
+func (p *PostAggregator) initializeThroughput(endTime time.Time) {
for i := 0; i < p.AggregationBuckets; i++ {
startTime := endTime.Add(p.CalculationInterval * -1)
throughput, err := p.storage.GetThroughput(startTime, endTime)
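
The rest of the loop is collapsed in this view, but the mock expectations in `TestInitializeThroughput` below — windows (19m, 20m), (18m, 19m), (17m, 18m) — imply the window slides back one CalculationInterval per iteration. Schematically (an assumption drawn from those expectations):

```go
end := endTime
for i := 0; i < p.AggregationBuckets; i++ {
	start := end.Add(-p.CalculationInterval)
	// fetch throughput for [start, end) and prepend it as a bucket
	end = start
}
```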
@@ -323,7 +323,7 @@ func (p *Processor) initializeThroughput(endTime time.Time) {
}

// throughputToQPS converts raw throughput counts for all accumulated buckets to QPS values.
-func (p *Processor) throughputToQPS() serviceOperationQPS {
+func (p *PostAggregator) throughputToQPS() serviceOperationQPS {
// TODO previous qps buckets have already been calculated, just need to calculate latest batch
// and append them where necessary and throw out the oldest batch.
// Edge case #buckets < p.AggregationBuckets, then we shouldn't throw out
Expand Down Expand Up @@ -351,7 +351,7 @@ func calculateQPS(count int64, interval time.Duration) float64 {

// calculateWeightedQPS calculates the weighted qps of the slice allQPS where weights are biased
// towards more recent qps. This function assumes that the most recent qps is at the head of the slice.
-func (p *Processor) calculateWeightedQPS(allQPS []float64) float64 {
+func (p *PostAggregator) calculateWeightedQPS(allQPS []float64) float64 {
if len(allQPS) == 0 {
return 0
}
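
The weight vector itself comes from `weightVectorCache` and is not shown in this diff; whatever its exact values, the operation is a recency-biased dot product with the newest bucket at the head of the slice. The tests below pin concrete outputs (e.g. `[]float64{0.8, 1.2, 1.0}` → ≈0.86735). A sketch with the weights left abstract:

```go
// Assumes weights sum to 1 and decrease from newest to oldest.
func weightedQPS(allQPS, weights []float64) float64 {
	var qps float64
	for i := range allQPS {
		qps += allQPS[i] * weights[i]
	}
	return qps
}
```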
@@ -363,14 +363,14 @@ func (p *Processor) calculateWeightedQPS(allQPS []float64) float64 {
return qps
}

-func (p *Processor) prependServiceCache() {
+func (p *PostAggregator) prependServiceCache() {
p.serviceCache = append([]SamplingCache{make(SamplingCache)}, p.serviceCache...)
if len(p.serviceCache) > serviceCacheSize {
p.serviceCache = p.serviceCache[0:serviceCacheSize]
}
}

-func (p *Processor) calculateProbabilitiesAndQPS() (model.ServiceOperationProbabilities, model.ServiceOperationQPS) {
+func (p *PostAggregator) calculateProbabilitiesAndQPS() (model.ServiceOperationProbabilities, model.ServiceOperationQPS) {
p.prependServiceCache()
retProbabilities := make(model.ServiceOperationProbabilities)
retQPS := make(model.ServiceOperationQPS)
@@ -394,7 +394,7 @@ func (p *Processor) calculateProbabilitiesAndQPS() (model.ServiceOperationProbab
return retProbabilities, retQPS
}

-func (p *Processor) calculateProbability(service, operation string, qps float64) float64 {
+func (p *PostAggregator) calculateProbability(service, operation string, qps float64) float64 {
oldProbability := p.InitialSamplingProbability
// TODO: is this loop overly expensive?
p.RLock()
@@ -429,7 +429,7 @@ func (p *Processor) calculateProbability(service, operation string, qps float64)
}

// is actual value within p.DeltaTolerance percentage of expected value.
-func (p *Processor) withinTolerance(actual, expected float64) bool {
+func (p *PostAggregator) withinTolerance(actual, expected float64) bool {
return math.Abs(actual-expected)/expected < p.DeltaTolerance
}
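
Note the division by `expected`: DeltaTolerance is a relative tolerance. With the 0.2 setting used in the tests below, an observed value within ±20% of the target passes:

```go
p := &PostAggregator{Options: Options{DeltaTolerance: 0.2}}
_ = p.withinTolerance(0.9, 1.0) // true:  |0.9-1.0|/1.0 = 0.1 < 0.2
_ = p.withinTolerance(1.5, 1.0) // false: |1.5-1.0|/1.0 = 0.5 >= 0.2
```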

Expand All @@ -441,7 +441,7 @@ func merge(p1 map[string]struct{}, p2 map[string]struct{}) map[string]struct{} {
return p1
}

-func (p *Processor) isUsingAdaptiveSampling(
+func (p *PostAggregator) isUsingAdaptiveSampling(
probability float64,
service string,
operation string,
36 changes: 18 additions & 18 deletions plugin/sampling/strategystore/adaptive/processor_test.go
@@ -85,14 +85,14 @@ func testCalculator() calculationstrategy.ProbabilityCalculator {
}

func TestAggregateThroughputInputsImmutability(t *testing.T) {
-p := &Processor{}
+p := &PostAggregator{}
in := testThroughputs()
_ = p.aggregateThroughput(in)
assert.Equal(t, in, testThroughputs())
}

func TestAggregateThroughput(t *testing.T) {
-p := &Processor{}
+p := &PostAggregator{}
aggregatedThroughput := p.aggregateThroughput(testThroughputs())
require.Len(t, aggregatedThroughput, 2)

@@ -128,7 +128,7 @@ func TestInitializeThroughput(t *testing.T) {
Return([]*model.Throughput{{Service: "svcA", Operation: "GET", Count: 7}}, nil)
mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*17), time.Time{}.Add(time.Minute*18)).
Return([]*model.Throughput{}, nil)
-p := &Processor{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 3}}
+p := &PostAggregator{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 3}}
p.initializeThroughput(time.Time{}.Add(time.Minute * 20))

require.Len(t, p.throughputs, 2)
@@ -144,7 +144,7 @@ func TestInitializeThroughputFailure(t *testing.T) {
mockStorage := &smocks.Store{}
mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*19), time.Time{}.Add(time.Minute*20)).
Return(nil, errTestStorage())
-p := &Processor{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 1}}
+p := &PostAggregator{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 1}}
p.initializeThroughput(time.Time{}.Add(time.Minute * 20))

assert.Empty(t, p.throughputs)
@@ -159,7 +159,7 @@ func TestCalculateQPS(t *testing.T) {
}

func TestGenerateOperationQPS(t *testing.T) {
-p := &Processor{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 10, AggregationBuckets: 10}}
+p := &PostAggregator{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 10, AggregationBuckets: 10}}
svcOpQPS := p.throughputToQPS()
assert.Len(t, svcOpQPS, 2)

@@ -207,7 +207,7 @@ }
}

func TestGenerateOperationQPS_UseMostRecentBucketOnly(t *testing.T) {
-p := &Processor{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 1, AggregationBuckets: 10}}
+p := &PostAggregator{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 1, AggregationBuckets: 10}}
svcOpQPS := p.throughputToQPS()
assert.Len(t, svcOpQPS, 2)

@@ -241,7 +241,7 @@ }
}

func TestCalculateWeightedQPS(t *testing.T) {
-p := Processor{weightVectorCache: NewWeightVectorCache()}
+p := PostAggregator{weightVectorCache: NewWeightVectorCache()}
assert.InDelta(t, 0.86735, p.calculateWeightedQPS([]float64{0.8, 1.2, 1.0}), 0.001)
assert.InDelta(t, 0.95197, p.calculateWeightedQPS([]float64{1.0, 1.0, 0.0, 0.0}), 0.001)
assert.Equal(t, 0.0, p.calculateWeightedQPS([]float64{}))
@@ -268,7 +268,7 @@ func TestCalculateProbability(t *testing.T) {
InitialSamplingProbability: 0.001,
MinSamplingProbability: 0.00001,
}
-p := &Processor{
+p := &PostAggregator{
Options: cfg,
probabilities: probabilities,
probabilityCalculator: testCalculator(),
@@ -308,7 +308,7 @@ func TestCalculateProbabilitiesAndQPS(t *testing.T) {
},
}
mets := metricstest.NewFactory(0)
-p := &Processor{
+p := &PostAggregator{
Options: Options{
TargetSamplesPerSecond: 1.0,
DeltaTolerance: 0.2,
@@ -365,15 +365,15 @@ func TestRunCalculationLoop(t *testing.T) {

for i := 0; i < 1000; i++ {
agg.(*aggregator).Lock()
-probabilities := agg.(*aggregator).processor.probabilities
+probabilities := agg.(*aggregator).postAggregator.probabilities
agg.(*aggregator).Unlock()
if len(probabilities) != 0 {
break
}
time.Sleep(time.Millisecond)
}

-probabilities := agg.(*aggregator).processor.probabilities
+probabilities := agg.(*aggregator).postAggregator.probabilities
require.Len(t, probabilities["svcA"], 2)
}

@@ -521,7 +521,7 @@ func TestRealisticRunCalculationLoop(t *testing.T) {
}

func TestPrependBucket(t *testing.T) {
-p := &Processor{Options: Options{AggregationBuckets: 1}}
+p := &PostAggregator{Options: Options{AggregationBuckets: 1}}
p.prependThroughputBucket(&throughputBucket{interval: time.Minute})
require.Len(t, p.throughputs, 1)
assert.Equal(t, time.Minute, p.throughputs[0].interval)
@@ -541,17 +541,17 @@ func TestConstructorFailure(t *testing.T) {
CalculationInterval: time.Second * 5,
AggregationBuckets: 0,
}
-_, err := newProcessor(cfg, "host", nil, nil, metrics.NullFactory, logger)
+_, err := newPostAggregator(cfg, "host", nil, nil, metrics.NullFactory, logger)
require.EqualError(t, err, "CalculationInterval and AggregationBuckets must be greater than 0")

cfg.CalculationInterval = 0
-_, err = newProcessor(cfg, "host", nil, nil, metrics.NullFactory, logger)
+_, err = newPostAggregator(cfg, "host", nil, nil, metrics.NullFactory, logger)
require.EqualError(t, err, "CalculationInterval and AggregationBuckets must be greater than 0")

cfg.CalculationInterval = time.Millisecond
cfg.AggregationBuckets = 1
cfg.BucketsForCalculation = -1
-_, err = newProcessor(cfg, "host", nil, nil, metrics.NullFactory, logger)
+_, err = newPostAggregator(cfg, "host", nil, nil, metrics.NullFactory, logger)
require.EqualError(t, err, "BucketsForCalculation cannot be less than 1")
}

@@ -591,7 +591,7 @@ func TestGenerateStrategyResponses(t *testing.T) {
}

func TestUsingAdaptiveSampling(t *testing.T) {
-p := &Processor{}
+p := &PostAggregator{}
throughput := serviceOperationThroughput{
"svc": map[string]*model.Throughput{
"op": {Probabilities: map[string]struct{}{"0.010000": {}}},
@@ -617,7 +617,7 @@ }
}

func TestPrependServiceCache(t *testing.T) {
-p := &Processor{}
+p := &PostAggregator{}
for i := 0; i < serviceCacheSize*2; i++ {
p.prependServiceCache()
}
@@ -640,7 +640,7 @@ func TestCalculateProbabilitiesAndQPSMultiple(t *testing.T) {
},
}

-p := &Processor{
+p := &PostAggregator{
Options: Options{
TargetSamplesPerSecond: 1.0,
DeltaTolerance: 0.002,
