Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Processor to PostAggregator #5479

Merged
merged 1 commit into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type aggregator struct {
operationsCounter metrics.Counter
servicesCounter metrics.Counter
currentThroughput serviceOperationThroughput
processor *Processor
postAggregator *PostAggregator
aggregationInterval time.Duration
storage samplingstore.Store
stop chan struct{}
Expand All @@ -55,7 +55,7 @@ func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.F
}
logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname))

processor, err := newProcessor(options, hostname, store, participant, metricsFactory, logger)
postAggregator, err := newPostAggregator(options, hostname, store, participant, metricsFactory, logger)
if err != nil {
return nil, err
}
Expand All @@ -65,7 +65,7 @@ func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.F
servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}),
currentThroughput: make(serviceOperationThroughput),
aggregationInterval: options.CalculationInterval,
processor: processor,
postAggregator: postAggregator,
storage: store,
stop: make(chan struct{}),
}, nil
Expand All @@ -79,7 +79,7 @@ func (a *aggregator) runAggregationLoop() {
a.Lock()
a.saveThroughput()
a.currentThroughput = make(serviceOperationThroughput)
a.processor.runCalculation()
a.postAggregator.runCalculation()
a.Unlock()
case <-a.stop:
ticker.Stop()
Expand Down Expand Up @@ -130,7 +130,7 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa
}

func (a *aggregator) Start() {
a.processor.Start()
a.postAggregator.Start()

a.bgFinished.Add(1)
go func() {
Expand Down
46 changes: 23 additions & 23 deletions plugin/sampling/strategystore/adaptive/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ type throughputBucket struct {
endTime time.Time
}

// Processor retrieves service throughput over a look back interval and calculates sampling probabilities
// PostAggregator retrieves service throughput over a look back interval and calculates sampling probabilities
// per operation such that each operation is sampled at a specified target QPS. It achieves this by
// retrieving discrete buckets of operation throughput and doing a weighted average of the throughput
// and generating a probability to match the targetQPS.
type Processor struct {
type PostAggregator struct {
sync.RWMutex
Options

Expand Down Expand Up @@ -106,23 +106,23 @@ type Processor struct {
lastCheckedTime time.Time
}

// newProcessor creates a new sampling processor that generates sampling rates for service operations.
func newProcessor(
// newPostAggregator creates a new sampling postAggregator that generates sampling rates for service operations.
func newPostAggregator(
opts Options,
hostname string,
storage samplingstore.Store,
electionParticipant leaderelection.ElectionParticipant,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Processor, error) {
) (*PostAggregator, error) {
if opts.CalculationInterval == 0 || opts.AggregationBuckets == 0 {
return nil, errNonZero
}
if opts.BucketsForCalculation < 1 {
return nil, errBucketsForCalculation
}
metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "adaptive_sampling_processor"})
return &Processor{
return &PostAggregator{
Options: opts,
storage: storage,
probabilities: make(model.ServiceOperationProbabilities),
Expand Down Expand Up @@ -150,9 +150,9 @@ func (p *StrategyStore) GetSamplingStrategy(_ context.Context, service string) (
return p.generateDefaultSamplingStrategyResponse(), nil
}

// Start initializes and starts the sampling processor which regularly calculates sampling probabilities.
func (p *Processor) Start() error {
p.logger.Info("starting adaptive sampling processor")
// Start initializes and starts the sampling postAggregator which regularly calculates sampling probabilities.
func (p *PostAggregator) Start() error {
p.logger.Info("starting adaptive sampling postAggregator")
// NB: the first tick will be slightly delayed by the initializeThroughput call.
p.lastCheckedTime = time.Now().Add(p.Delay * -1)
p.initializeThroughput(p.lastCheckedTime)
Expand Down Expand Up @@ -186,7 +186,7 @@ func (p *StrategyStore) runUpdateProbabilitiesLoop() {
for {
select {
case <-ticker.C:
// Only load probabilities if this processor doesn't hold the leader lock
// Only load probabilities if this strategy_store doesn't hold the leader lock
if !p.isLeader() {
p.loadProbabilities()
p.generateStrategyResponses()
Expand All @@ -197,7 +197,7 @@ func (p *StrategyStore) runUpdateProbabilitiesLoop() {
}
}

func (p *Processor) isLeader() bool {
func (p *PostAggregator) isLeader() bool {
return p.electionParticipant.IsLeader()
}

Expand All @@ -213,7 +213,7 @@ func addJitter(jitterAmount time.Duration) time.Duration {
return (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2)))
}

func (p *Processor) runCalculation() {
func (p *PostAggregator) runCalculation() {
endTime := time.Now().Add(p.Delay * -1)
startTime := p.lastCheckedTime
throughput, err := p.storage.GetThroughput(startTime, endTime)
Expand Down Expand Up @@ -251,15 +251,15 @@ func (p *Processor) runCalculation() {
}
}

func (p *Processor) saveProbabilitiesAndQPS() {
func (p *PostAggregator) saveProbabilitiesAndQPS() {
p.RLock()
defer p.RUnlock()
if err := p.storage.InsertProbabilitiesAndQPS(p.hostname, p.probabilities, p.qps); err != nil {
p.logger.Warn("could not save probabilities", zap.Error(err))
}
}

func (p *Processor) prependThroughputBucket(bucket *throughputBucket) {
func (p *PostAggregator) prependThroughputBucket(bucket *throughputBucket) {
p.throughputs = append([]*throughputBucket{bucket}, p.throughputs...)
if len(p.throughputs) > p.AggregationBuckets {
p.throughputs = p.throughputs[0:p.AggregationBuckets]
Expand All @@ -269,7 +269,7 @@ func (p *Processor) prependThroughputBucket(bucket *throughputBucket) {
// aggregateThroughput aggregates operation throughput from different buckets into one.
// All input buckets represent a single time range, but there are many of them because
// they are all independently generated by different collector instances from inbound span traffic.
func (p *Processor) aggregateThroughput(throughputs []*model.Throughput) serviceOperationThroughput {
func (p *PostAggregator) aggregateThroughput(throughputs []*model.Throughput) serviceOperationThroughput {
aggregatedThroughput := make(serviceOperationThroughput)
for _, throughput := range throughputs {
service := throughput.Service
Expand Down Expand Up @@ -301,7 +301,7 @@ func copySet(in map[string]struct{}) map[string]struct{} {
return out
}

func (p *Processor) initializeThroughput(endTime time.Time) {
func (p *PostAggregator) initializeThroughput(endTime time.Time) {
for i := 0; i < p.AggregationBuckets; i++ {
startTime := endTime.Add(p.CalculationInterval * -1)
throughput, err := p.storage.GetThroughput(startTime, endTime)
Expand All @@ -323,7 +323,7 @@ func (p *Processor) initializeThroughput(endTime time.Time) {
}

// throughputToQPS converts raw throughput counts for all accumulated buckets to QPS values.
func (p *Processor) throughputToQPS() serviceOperationQPS {
func (p *PostAggregator) throughputToQPS() serviceOperationQPS {
// TODO previous qps buckets have already been calculated, just need to calculate latest batch
// and append them where necessary and throw out the oldest batch.
// Edge case #buckets < p.AggregationBuckets, then we shouldn't throw out
Expand Down Expand Up @@ -351,7 +351,7 @@ func calculateQPS(count int64, interval time.Duration) float64 {

// calculateWeightedQPS calculates the weighted qps of the slice allQPS where weights are biased
// towards more recent qps. This function assumes that the most recent qps is at the head of the slice.
func (p *Processor) calculateWeightedQPS(allQPS []float64) float64 {
func (p *PostAggregator) calculateWeightedQPS(allQPS []float64) float64 {
if len(allQPS) == 0 {
return 0
}
Expand All @@ -363,14 +363,14 @@ func (p *Processor) calculateWeightedQPS(allQPS []float64) float64 {
return qps
}

func (p *Processor) prependServiceCache() {
func (p *PostAggregator) prependServiceCache() {
p.serviceCache = append([]SamplingCache{make(SamplingCache)}, p.serviceCache...)
if len(p.serviceCache) > serviceCacheSize {
p.serviceCache = p.serviceCache[0:serviceCacheSize]
}
}

func (p *Processor) calculateProbabilitiesAndQPS() (model.ServiceOperationProbabilities, model.ServiceOperationQPS) {
func (p *PostAggregator) calculateProbabilitiesAndQPS() (model.ServiceOperationProbabilities, model.ServiceOperationQPS) {
p.prependServiceCache()
retProbabilities := make(model.ServiceOperationProbabilities)
retQPS := make(model.ServiceOperationQPS)
Expand All @@ -394,7 +394,7 @@ func (p *Processor) calculateProbabilitiesAndQPS() (model.ServiceOperationProbab
return retProbabilities, retQPS
}

func (p *Processor) calculateProbability(service, operation string, qps float64) float64 {
func (p *PostAggregator) calculateProbability(service, operation string, qps float64) float64 {
oldProbability := p.InitialSamplingProbability
// TODO: is this loop overly expensive?
p.RLock()
Expand Down Expand Up @@ -429,7 +429,7 @@ func (p *Processor) calculateProbability(service, operation string, qps float64)
}

// is actual value within p.DeltaTolerance percentage of expected value.
func (p *Processor) withinTolerance(actual, expected float64) bool {
func (p *PostAggregator) withinTolerance(actual, expected float64) bool {
return math.Abs(actual-expected)/expected < p.DeltaTolerance
}

Expand All @@ -441,7 +441,7 @@ func merge(p1 map[string]struct{}, p2 map[string]struct{}) map[string]struct{} {
return p1
}

func (p *Processor) isUsingAdaptiveSampling(
func (p *PostAggregator) isUsingAdaptiveSampling(
probability float64,
service string,
operation string,
Expand Down
36 changes: 18 additions & 18 deletions plugin/sampling/strategystore/adaptive/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ func testCalculator() calculationstrategy.ProbabilityCalculator {
}

func TestAggregateThroughputInputsImmutability(t *testing.T) {
p := &Processor{}
p := &PostAggregator{}
in := testThroughputs()
_ = p.aggregateThroughput(in)
assert.Equal(t, in, testThroughputs())
}

func TestAggregateThroughput(t *testing.T) {
p := &Processor{}
p := &PostAggregator{}
aggregatedThroughput := p.aggregateThroughput(testThroughputs())
require.Len(t, aggregatedThroughput, 2)

Expand Down Expand Up @@ -128,7 +128,7 @@ func TestInitializeThroughput(t *testing.T) {
Return([]*model.Throughput{{Service: "svcA", Operation: "GET", Count: 7}}, nil)
mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*17), time.Time{}.Add(time.Minute*18)).
Return([]*model.Throughput{}, nil)
p := &Processor{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 3}}
p := &PostAggregator{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 3}}
p.initializeThroughput(time.Time{}.Add(time.Minute * 20))

require.Len(t, p.throughputs, 2)
Expand All @@ -144,7 +144,7 @@ func TestInitializeThroughputFailure(t *testing.T) {
mockStorage := &smocks.Store{}
mockStorage.On("GetThroughput", time.Time{}.Add(time.Minute*19), time.Time{}.Add(time.Minute*20)).
Return(nil, errTestStorage())
p := &Processor{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 1}}
p := &PostAggregator{storage: mockStorage, Options: Options{CalculationInterval: time.Minute, AggregationBuckets: 1}}
p.initializeThroughput(time.Time{}.Add(time.Minute * 20))

assert.Empty(t, p.throughputs)
Expand All @@ -159,7 +159,7 @@ func TestCalculateQPS(t *testing.T) {
}

func TestGenerateOperationQPS(t *testing.T) {
p := &Processor{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 10, AggregationBuckets: 10}}
p := &PostAggregator{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 10, AggregationBuckets: 10}}
svcOpQPS := p.throughputToQPS()
assert.Len(t, svcOpQPS, 2)

Expand Down Expand Up @@ -207,7 +207,7 @@ func TestGenerateOperationQPS(t *testing.T) {
}

func TestGenerateOperationQPS_UseMostRecentBucketOnly(t *testing.T) {
p := &Processor{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 1, AggregationBuckets: 10}}
p := &PostAggregator{throughputs: testThroughputBuckets(), Options: Options{BucketsForCalculation: 1, AggregationBuckets: 10}}
svcOpQPS := p.throughputToQPS()
assert.Len(t, svcOpQPS, 2)

Expand Down Expand Up @@ -241,7 +241,7 @@ func TestGenerateOperationQPS_UseMostRecentBucketOnly(t *testing.T) {
}

func TestCalculateWeightedQPS(t *testing.T) {
p := Processor{weightVectorCache: NewWeightVectorCache()}
p := PostAggregator{weightVectorCache: NewWeightVectorCache()}
assert.InDelta(t, 0.86735, p.calculateWeightedQPS([]float64{0.8, 1.2, 1.0}), 0.001)
assert.InDelta(t, 0.95197, p.calculateWeightedQPS([]float64{1.0, 1.0, 0.0, 0.0}), 0.001)
assert.Equal(t, 0.0, p.calculateWeightedQPS([]float64{}))
Expand All @@ -268,7 +268,7 @@ func TestCalculateProbability(t *testing.T) {
InitialSamplingProbability: 0.001,
MinSamplingProbability: 0.00001,
}
p := &Processor{
p := &PostAggregator{
Options: cfg,
probabilities: probabilities,
probabilityCalculator: testCalculator(),
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestCalculateProbabilitiesAndQPS(t *testing.T) {
},
}
mets := metricstest.NewFactory(0)
p := &Processor{
p := &PostAggregator{
Options: Options{
TargetSamplesPerSecond: 1.0,
DeltaTolerance: 0.2,
Expand Down Expand Up @@ -365,15 +365,15 @@ func TestRunCalculationLoop(t *testing.T) {

for i := 0; i < 1000; i++ {
agg.(*aggregator).Lock()
probabilities := agg.(*aggregator).processor.probabilities
probabilities := agg.(*aggregator).postAggregator.probabilities
agg.(*aggregator).Unlock()
if len(probabilities) != 0 {
break
}
time.Sleep(time.Millisecond)
}

probabilities := agg.(*aggregator).processor.probabilities
probabilities := agg.(*aggregator).postAggregator.probabilities
require.Len(t, probabilities["svcA"], 2)
}

Expand Down Expand Up @@ -521,7 +521,7 @@ func TestRealisticRunCalculationLoop(t *testing.T) {
}

func TestPrependBucket(t *testing.T) {
p := &Processor{Options: Options{AggregationBuckets: 1}}
p := &PostAggregator{Options: Options{AggregationBuckets: 1}}
p.prependThroughputBucket(&throughputBucket{interval: time.Minute})
require.Len(t, p.throughputs, 1)
assert.Equal(t, time.Minute, p.throughputs[0].interval)
Expand All @@ -541,17 +541,17 @@ func TestConstructorFailure(t *testing.T) {
CalculationInterval: time.Second * 5,
AggregationBuckets: 0,
}
_, err := newProcessor(cfg, "host", nil, nil, metrics.NullFactory, logger)
_, err := newPostAggregator(cfg, "host", nil, nil, metrics.NullFactory, logger)
require.EqualError(t, err, "CalculationInterval and AggregationBuckets must be greater than 0")

cfg.CalculationInterval = 0
_, err = newProcessor(cfg, "host", nil, nil, metrics.NullFactory, logger)
_, err = newPostAggregator(cfg, "host", nil, nil, metrics.NullFactory, logger)
require.EqualError(t, err, "CalculationInterval and AggregationBuckets must be greater than 0")

cfg.CalculationInterval = time.Millisecond
cfg.AggregationBuckets = 1
cfg.BucketsForCalculation = -1
_, err = newProcessor(cfg, "host", nil, nil, metrics.NullFactory, logger)
_, err = newPostAggregator(cfg, "host", nil, nil, metrics.NullFactory, logger)
require.EqualError(t, err, "BucketsForCalculation cannot be less than 1")
}

Expand Down Expand Up @@ -591,7 +591,7 @@ func TestGenerateStrategyResponses(t *testing.T) {
}

func TestUsingAdaptiveSampling(t *testing.T) {
p := &Processor{}
p := &PostAggregator{}
throughput := serviceOperationThroughput{
"svc": map[string]*model.Throughput{
"op": {Probabilities: map[string]struct{}{"0.010000": {}}},
Expand All @@ -617,7 +617,7 @@ func TestUsingAdaptiveSampling(t *testing.T) {
}

func TestPrependServiceCache(t *testing.T) {
p := &Processor{}
p := &PostAggregator{}
for i := 0; i < serviceCacheSize*2; i++ {
p.prependServiceCache()
}
Expand All @@ -640,7 +640,7 @@ func TestCalculateProbabilitiesAndQPSMultiple(t *testing.T) {
},
}

p := &Processor{
p := &PostAggregator{
Options: Options{
TargetSamplesPerSecond: 1.0,
DeltaTolerance: 0.002,
Expand Down
Loading