Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
Signed-off-by: Pushkar Mishra <[email protected]>
  • Loading branch information
Pushkarm029 committed May 11, 2024
1 parent 5ddcaa9 commit bdf3e5a
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 31 deletions.
48 changes: 33 additions & 15 deletions plugin/sampling/strategystore/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
span_model "github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

Expand All @@ -31,30 +34,38 @@ const (

type aggregator struct {
sync.Mutex
Options

operationsCounter metrics.Counter
servicesCounter metrics.Counter
currentThroughput serviceOperationThroughput
aggregationInterval time.Duration
storage samplingstore.Store
stop chan struct{}
operationsCounter metrics.Counter
servicesCounter metrics.Counter
currentThroughput serviceOperationThroughput
storage samplingstore.Store
processor *Processor
stop chan struct{}
}

// NewAggregator creates a throughput aggregator that simply emits metrics
// about the number of operations seen over the aggregationInterval.
func NewAggregator(metricsFactory metrics.Factory, interval time.Duration, storage samplingstore.Store) strategystore.Aggregator {
return &aggregator{
operationsCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_operations"}),
servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}),
currentThroughput: make(serviceOperationThroughput),
aggregationInterval: interval,
storage: storage,
stop: make(chan struct{}),
func NewAggregator(options Options, metricsFactory metrics.Factory, logger *zap.Logger, participant leaderelection.ElectionParticipant, store samplingstore.Store) (strategystore.Aggregator, error) {
// this processor doesn't uses hostname, that's why it's empty
processor, err := newProcessor(options, "", store, participant, metricsFactory, logger)
if err != nil {
return nil, err
}

return &aggregator{
operationsCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_operations"}),
servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}),
currentThroughput: make(serviceOperationThroughput),
Options: options,
processor: processor,
storage: store,
stop: make(chan struct{}),
}, nil
}

func (a *aggregator) runAggregationLoop() {
ticker := time.NewTicker(a.aggregationInterval)
ticker := time.NewTicker(a.CalculationInterval)
for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -111,10 +122,17 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa
}

func (a *aggregator) Start() {
if err := a.processor.Start(); err != nil {
a.processor.logger.Error("Failed to start adaptive sampling processor", zap.Error(err))
}

go a.runAggregationLoop()
}

func (a *aggregator) Close() error {
if err := a.processor.Close(); err != nil {
return err
}
close(a.stop)
return nil
}
39 changes: 35 additions & 4 deletions plugin/sampling/strategystore/adaptive/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,34 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks"
"github.com/jaegertracing/jaeger/storage/samplingstore/mocks"
)

func TestAggregator(t *testing.T) {
t.Skip("Skipping flaky unit test")
metricsFactory := metricstest.NewFactory(0)

logger := zap.NewNop()
mockStorage := &mocks.Store{}
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(nil)
mockEP := &epmocks.ElectionParticipant{}
// mockEP.On("Start").Return(nil)
// mockEP.On("Close").Return(nil)
// mockEP.On("IsLeader").Return(true)
cfg := Options{
CalculationInterval: time.Second * 1,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(cfg, metricsFactory, logger, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("B", "POST", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("C", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -60,15 +74,24 @@ func TestAggregator(t *testing.T) {
func TestIncrementThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}
logger := zap.NewNop()
mockEP := &epmocks.ElectionParticipant{}
cfg := Options{
CalculationInterval: time.Second * 10,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(cfg, metricsFactory, logger, mockEP, mockStorage)
require.NoError(t, err)
// 20 different probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001*float64(i))
}
assert.Len(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities, 10)

a = NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err = NewAggregator(cfg, metricsFactory, logger, mockEP, mockStorage)
require.NoError(t, err)
// 20 of the same probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -79,8 +102,16 @@ func TestIncrementThroughput(t *testing.T) {
func TestLowerboundThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}
logger := zap.NewNop()
mockEP := &epmocks.ElectionParticipant{}
cfg := Options{
CalculationInterval: time.Second * 10,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(cfg, metricsFactory, logger, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeLowerBound, 0.001)
assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count)
assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"])
Expand Down
17 changes: 14 additions & 3 deletions plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)
Expand Down Expand Up @@ -83,12 +84,22 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S

// CreateStrategyStore implements strategystore.Factory
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error) {
p, err := NewStrategyStore(*f.options, f.metricsFactory, f.logger, f.lock, f.store)
participant := leaderelection.NewElectionParticipant(f.lock, defaultResourceName, leaderelection.ElectionParticipantOptions{
FollowerLeaseRefreshInterval: f.options.FollowerLeaseRefreshInterval,
LeaderLeaseRefreshInterval: f.options.LeaderLeaseRefreshInterval,
Logger: f.logger,
})

p, err := NewStrategyStore(*f.options, f.metricsFactory, f.logger, participant, f.store)
if err != nil {
return nil, nil, err
}

a, err := NewAggregator(*f.options, f.metricsFactory, f.logger, participant, f.store)
if err != nil {
return nil, nil, err
}
p.Start()
a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store)

a.Start()
return p, a, nil
}
2 changes: 0 additions & 2 deletions plugin/sampling/strategystore/adaptive/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,8 @@ func (p *Processor) Start() error {
return err
}
p.shutdown = make(chan struct{})
p.loadProbabilities()
p.generateStrategyResponses()
p.runBackground(p.runCalculationLoop)
p.runBackground(p.runUpdateProbabilitiesLoop)
return nil
}

Expand Down
11 changes: 4 additions & 7 deletions plugin/sampling/strategystore/adaptive/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,27 @@ package adaptive
import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/hostname"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

// NewStrategyStore creates a strategy store that holds adaptive sampling strategies.
func NewStrategyStore(options Options, metricsFactory metrics.Factory, logger *zap.Logger, lock distributedlock.Lock, store samplingstore.Store) (*Processor, error) {
func NewStrategyStore(options Options, metricsFactory metrics.Factory, logger *zap.Logger, participant leaderelection.ElectionParticipant, store samplingstore.Store) (*Processor, error) {
hostname, err := hostname.AsIdentifier()
if err != nil {
return nil, err
}
logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname))

participant := leaderelection.NewElectionParticipant(lock, defaultResourceName, leaderelection.ElectionParticipantOptions{
FollowerLeaseRefreshInterval: options.FollowerLeaseRefreshInterval,
LeaderLeaseRefreshInterval: options.LeaderLeaseRefreshInterval,
Logger: logger,
})
p, err := newProcessor(options, hostname, store, participant, metricsFactory, logger)
if err != nil {
return nil, err
}

p.loadProbabilities()
p.runBackground(p.runUpdateProbabilitiesLoop)

return p, nil
}

0 comments on commit bdf3e5a

Please sign in to comment.