Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Adaptive Sampling Aggregator & Strategy Store #5441

Merged
merged 19 commits into from
May 21, 2024
3 changes: 3 additions & 0 deletions cmd/collector/app/sampling/strategystore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ type Factory interface {

// CreateStrategyStore initializes the StrategyStore and returns it.
CreateStrategyStore() (StrategyStore, Aggregator, error)

// Close closes the factory
Close() error
}
3 changes: 3 additions & 0 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func main() {
if err := storageFactory.Close(); err != nil {
logger.Error("Failed to close storage factory", zap.Error(err))
}
if err := strategyStoreFactory.Close(); err != nil {
logger.Error("Failed to close sampling strategy store factory", zap.Error(err))
}
})
return nil
},
Expand Down
36 changes: 31 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
"sync"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
span_model "github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/hostname"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

Expand All @@ -35,22 +39,36 @@
operationsCounter metrics.Counter
servicesCounter metrics.Counter
currentThroughput serviceOperationThroughput
processor *Processor
aggregationInterval time.Duration
storage samplingstore.Store
stop chan struct{}
bgFinished sync.WaitGroup
}

// NewAggregator creates a throughput aggregator that simply emits metrics
// about the number of operations seen over the aggregationInterval.
func NewAggregator(metricsFactory metrics.Factory, interval time.Duration, storage samplingstore.Store) strategystore.Aggregator {
func NewAggregator(options Options, logger *zap.Logger, metricsFactory metrics.Factory, participant leaderelection.ElectionParticipant, store samplingstore.Store) (strategystore.Aggregator, error) {
hostname, err := hostname.AsIdentifier()
if err != nil {
return nil, err

Check warning on line 54 in plugin/sampling/strategystore/adaptive/aggregator.go

View check run for this annotation

Codecov / codecov/patch

plugin/sampling/strategystore/adaptive/aggregator.go#L54

Added line #L54 was not covered by tests
}
logger.Info("Using unique participantName in adaptive sampling", zap.String("participantName", hostname))

processor, err := newProcessor(options, hostname, store, participant, metricsFactory, logger)
if err != nil {
return nil, err
}

return &aggregator{
operationsCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_operations"}),
servicesCounter: metricsFactory.Counter(metrics.Options{Name: "sampling_services"}),
currentThroughput: make(serviceOperationThroughput),
aggregationInterval: interval,
storage: storage,
aggregationInterval: options.CalculationInterval,
processor: processor,
storage: store,
stop: make(chan struct{}),
}
}, nil
}

func (a *aggregator) runAggregationLoop() {
Expand All @@ -61,6 +79,7 @@
a.Lock()
a.saveThroughput()
a.currentThroughput = make(serviceOperationThroughput)
a.processor.runCalculation()
a.Unlock()
case <-a.stop:
ticker.Stop()
Expand Down Expand Up @@ -111,10 +130,17 @@
}

func (a *aggregator) Start() {
go a.runAggregationLoop()
a.processor.Start()

a.bgFinished.Add(1)
go func() {
a.runAggregationLoop()
a.bgFinished.Done()
}()
}

func (a *aggregator) Close() error {
close(a.stop)
a.bgFinished.Wait()
return nil
}
40 changes: 35 additions & 5 deletions plugin/sampling/strategystore/adaptive/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks"
"github.com/jaegertracing/jaeger/storage/samplingstore/mocks"
)

Expand All @@ -32,8 +35,19 @@ func TestAggregator(t *testing.T) {

mockStorage := &mocks.Store{}
mockStorage.On("InsertThroughput", mock.AnythingOfType("[]*model.Throughput")).Return(nil)
mockEP := &epmocks.ElectionParticipant{}
mockEP.On("Start").Return(nil)
mockEP.On("Close").Return(nil)
mockEP.On("IsLeader").Return(true)
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("B", "POST", model.SamplerTypeProbabilistic, 0.001)
a.RecordThroughput("C", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -60,15 +74,23 @@ func TestAggregator(t *testing.T) {
func TestIncrementThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
// 20 different probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001*float64(i))
}
assert.Len(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities, 10)

a = NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err = NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
// 20 of the same probabilities
for i := 0; i < 20; i++ {
a.RecordThroughput("A", "GET", model.SamplerTypeProbabilistic, 0.001)
Expand All @@ -79,8 +101,16 @@ func TestIncrementThroughput(t *testing.T) {
func TestLowerboundThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()

a := NewAggregator(metricsFactory, 5*time.Millisecond, mockStorage)
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)
a.RecordThroughput("A", "GET", model.SamplerTypeLowerBound, 0.001)
assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count)
assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"])
Expand Down
24 changes: 21 additions & 3 deletions plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)
Expand All @@ -38,6 +39,7 @@
metricsFactory metrics.Factory
lock distributedlock.Lock
store samplingstore.Store
participant *leaderelection.DistributedElectionParticipant
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -66,7 +68,6 @@
if ssFactory == nil {
return errors.New("sampling store factory is nil. Please configure a backend that supports adaptive sampling")
}

var err error
f.logger = logger
f.metricsFactory = metricsFactory
Expand All @@ -78,17 +79,34 @@
if err != nil {
return err
}
f.participant = leaderelection.NewElectionParticipant(f.lock, defaultResourceName, leaderelection.ElectionParticipantOptions{
FollowerLeaseRefreshInterval: f.options.FollowerLeaseRefreshInterval,
LeaderLeaseRefreshInterval: f.options.LeaderLeaseRefreshInterval,
Logger: f.logger,
})
f.participant.Start()

return nil
}

// CreateStrategyStore implements strategystore.Factory
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, strategystore.Aggregator, error) {
p, err := NewStrategyStore(*f.options, f.metricsFactory, f.logger, f.lock, f.store)
p, err := NewStrategyStore(*f.options, f.logger, f.participant, f.store)
Pushkarm029 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, err

Check warning on line 96 in plugin/sampling/strategystore/adaptive/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/sampling/strategystore/adaptive/factory.go#L96

Added line #L96 was not covered by tests
}
a, err := NewAggregator(*f.options, f.logger, f.metricsFactory, f.participant, f.store)
if err != nil {
return nil, nil, err
}

p.Start()
a := NewAggregator(f.metricsFactory, f.options.CalculationInterval, f.store)
a.Start()

return p, a, nil
}

// Closes the factory
func (f *Factory) Close() error {
return f.participant.Close()
}
2 changes: 2 additions & 0 deletions plugin/sampling/strategystore/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestFactory(t *testing.T) {
require.NoError(t, err)
require.NoError(t, store.Close())
require.NoError(t, aggregator.Close())
require.NoError(t, f.Close())
}

func TestBadConfigFail(t *testing.T) {
Expand All @@ -97,6 +98,7 @@ func TestBadConfigFail(t *testing.T) {
require.NoError(t, f.Initialize(metrics.NullFactory, &mockSamplingStoreFactory{}, zap.NewNop()))
_, _, err := f.CreateStrategyStore()
require.Error(t, err)
require.NoError(t, f.Close())
}
}

Expand Down
Loading
Loading