Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Adaptive Sampling Aggregator & Strategy Store #5441

Merged
merged 19 commits into from
May 21, 2024
2 changes: 2 additions & 0 deletions cmd/collector/app/sampling/strategystore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ type Factory interface {

// CreateStrategyStore initializes the StrategyStore and returns it.
CreateStrategyStore() (StrategyStore, Aggregator, error)

Close() error
}
3 changes: 3 additions & 0 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func main() {
if err := storageFactory.Close(); err != nil {
logger.Error("Failed to close storage factory", zap.Error(err))
}
if err := strategyStoreFactory.Close(); err != nil {
logger.Error("Failed to close sampling strategy store factory", zap.Error(err))
}
})
return nil
},
Expand Down
36 changes: 31 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
span_model "github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/hostname"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

Expand All @@ -35,22 +39,36 @@ type aggregator struct {
operationsCounter metrics.Counter
servicesCounter metrics.Counter
currentThroughput serviceOperationThroughput
processor *Processor
aggregationInterval time.Duration
storage samplingstore.Store
stop chan struct{}
bgFinished sync.WaitGroup
}

// NewAggregator creates a throughput aggregator that simply emits metrics
// about the number of operations seen over the aggregationInterval.
func NewAggregator(metricsFactory metrics.Factory, interval time.Duration, storage samplingstore.Store) strategystore.Aggregator {
func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.Factory, participant leaderelection.ElectionParticipant, store samplingstore.Store) (strategystore.Aggregator, error) {
hostname, err := hostname.AsIdentifier()
if err != nil {
return nil, err
}
logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname))

processor, err := newProcessor(options, hostname, store, participant, metricsFactory, logger)
if err != nil {
return nil, err
}

return &aggregator{
operationsCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_operations"}),
servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}),
currentThroughput: make(serviceOperationThroughput),
aggregationInterval: interval,
storage: storage,
aggregationInterval: options.CalculationInterval,
processor: processor,
storage: store,
stop: make(chan struct{}),
}
}, nil
}

func (a *aggregator) runAggregationLoop() {
Expand All @@ -61,6 +79,7 @@ func (a *aggregator) runAggregationLoop() {
a.Lock()
a.saveThroughput()
a.currentThroughput = make(serviceOperationThroughput)
a.processor.runCalculation()
a.Unlock()
case <-a.stop:
ticker.Stop()
Expand Down Expand Up @@ -111,10 +130,17 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa
}

func (a *aggregator) Start() {
go a.runAggregationLoop()
a.processor.Start()

a.bgFinished.Add(1)
go func() {
a.runAggregationLoop()
a.bgFinished.Done()
}()
}

func (a *aggregator) Close() error {
close(a.stop)
a.bgFinished.Wait()
return nil
}
40 changes: 35 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks"
"github.com/jaegertracing/jaeger/storage/samplingstore/mocks"
)

Expand All @@ -32,8 +35,19 @@ func TestAggregator(t *testing.T) {

mockStorage := &mocks.Store{}
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(nil)
mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(nil)
mockEP.On("IsLeader").Return(true)
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("B", "POST", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("C", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -60,15 +74,23 @@ func TestAggregator(t *testing.T) {
func TestIncrementThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
// 20 different probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001*float64(i))
}
assert.Len(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities, 10)

a = NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err = NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
// 20 of the same probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -79,8 +101,16 @@ func TestIncrementThroughput(t *testing.T) {
func TestLowerboundThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeLowerBound, 0.001)
assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count)
assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"])
Expand Down
21 changes: 19 additions & 2 deletions plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)
Expand All @@ -38,6 +39,7 @@ type Factory struct {
metricsFactory metrics.Factory
lock distributedlock.Lock
store samplingstore.Store
participant *leaderelection.DistributedElectionParticipant
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -70,6 +72,11 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S
var err error
f.logger = logger
f.metricsFactory = metricsFactory
f.participant = leaderelection.NewElectionParticipant(f.lock, defaultResourceName, leaderelection.ElectionParticipantOptions{
FollowerLeaseRefreshInterval: f.options.FollowerLeaseRefreshInterval,
LeaderLeaseRefreshInterval: f.options.LeaderLeaseRefreshInterval,
Logger: f.logger,
})
f.lock, err = ssFactory.CreateLock()
if err != nil {
return err
Expand All @@ -83,12 +90,22 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, ssFactory storage.S

// CreateStrategyStore implements strategystore.Factory
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error) {
p, err := NewStrategyStore(*f.options, f.metricsFactory, f.logger, f.lock, f.store)
f.participant.Start()
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
p, err := NewStrategyStore(*f.options, f.logger, f.participant, f.store)
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, err
}
p.Start()
a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store)
a, err := NewAggregator(*f.options, f.logger, f.metricsFactory, f.participant, f.store)
if err != nil {
return nil, nil, err
}
a.Start()

return p, a, nil
}

// Closes the factory
func (f *Factory) Close() error {
return f.participant.Close()
}
Loading
Loading