From e4ab1253ac3752aab87a72954500aebf25c17c8a Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Thu, 6 Jun 2024 23:41:36 +0530 Subject: [PATCH] rebase Signed-off-by: pushkarm029 --- cmd/jaeger/config-badger.yaml | 17 +- cmd/jaeger/config-cassandra.yaml | 17 +- cmd/jaeger/config-elasticsearch.yaml | 16 +- cmd/jaeger/config-opensearch.yaml | 16 +- cmd/jaeger/config.yaml | 16 +- cmd/jaeger/internal/components.go | 6 +- .../extension/remotesampling/config.go | 80 ++++++ .../extension/remotesampling/extension.go | 259 ++++++++++++++++++ .../remotesampling/extension_test.go | 33 +++ .../extension/remotesampling/factory.go | 70 +++++ .../extension/remotesampling/factory_test.go | 28 ++ .../extension/remotesampling/package_test.go | 14 + .../processors/adaptivesampling/config.go | 47 ++++ .../processors/adaptivesampling/factory.go | 66 +++++ .../adaptivesampling/package_test.go | 14 + .../processors/adaptivesampling/processor.go | 95 +++++++ cmd/jaeger/sampling-strategies.json | 18 ++ pkg/clientcfg/clientcfghttp/handler.go | 11 + .../strategystore/adaptive/aggregator.go | 17 ++ 19 files changed, 828 insertions(+), 12 deletions(-) create mode 100644 cmd/jaeger/internal/extension/remotesampling/config.go create mode 100644 cmd/jaeger/internal/extension/remotesampling/extension.go create mode 100644 cmd/jaeger/internal/extension/remotesampling/extension_test.go create mode 100644 cmd/jaeger/internal/extension/remotesampling/factory.go create mode 100644 cmd/jaeger/internal/extension/remotesampling/factory_test.go create mode 100644 cmd/jaeger/internal/extension/remotesampling/package_test.go create mode 100644 cmd/jaeger/internal/processors/adaptivesampling/config.go create mode 100644 cmd/jaeger/internal/processors/adaptivesampling/factory.go create mode 100644 cmd/jaeger/internal/processors/adaptivesampling/package_test.go create mode 100644 cmd/jaeger/internal/processors/adaptivesampling/processor.go create mode 100644 cmd/jaeger/sampling-strategies.json diff --git 
a/cmd/jaeger/config-badger.yaml b/cmd/jaeger/config-badger.yaml index 4643c9cc75a..a683eb2a52f 100644 --- a/cmd/jaeger/config-badger.yaml +++ b/cmd/jaeger/config-badger.yaml @@ -1,12 +1,23 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + initial_sampling_probability: 0.1 + strategy_store: badger_main + http: + grpc: + + jaeger_query: trace_storage: badger_main trace_storage_archive: badger_archive @@ -37,6 +48,8 @@ receivers: processors: batch: + adaptive_sampling: + strategy_store: badger_main exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 39cfb319489..26b7485c89e 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -1,12 +1,22 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + initial_sampling_probability: 0.1 + strategy_store: cassandra_main + http: + grpc: + jaeger_query: trace_storage: cassandra_main trace_storage_archive: cassandra_archive @@ -47,6 +57,9 @@ receivers: processors: batch: + adaptive_sampling: + strategy_store: cassandra_main + exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index 
21257f920d6..028b4b46084 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -1,12 +1,22 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + initial_sampling_probability: 0.1 + strategy_store: es_main + http: + grpc: + jaeger_query: trace_storage: es_main trace_storage_archive: es_archive @@ -35,6 +45,8 @@ receivers: processors: batch: + adaptive_sampling: + strategy_store: es_main exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config-opensearch.yaml b/cmd/jaeger/config-opensearch.yaml index c7acf7d8f53..453f638bedf 100644 --- a/cmd/jaeger/config-opensearch.yaml +++ b/cmd/jaeger/config-opensearch.yaml @@ -1,12 +1,22 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + initial_sampling_probability: 0.1 + strategy_store: os_main + http: + grpc: + jaeger_query: trace_storage: os_main trace_storage_archive: os_archive @@ -36,6 +46,8 @@ receivers: processors: batch: + adaptive_sampling: + strategy_store: os_main exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config.yaml b/cmd/jaeger/config.yaml index 3be3a37af5d..7d76ea0d02f 100644 --- a/cmd/jaeger/config.yaml +++ b/cmd/jaeger/config.yaml @@ -1,9 +1,9 @@ service: - extensions: [jaeger_storage, 
jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp, jaeger, zipkin] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: @@ -13,6 +13,16 @@ extensions: # zpages: # endpoint: 0.0.0.0:55679 + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + initial_sampling_probability: 0.1 + strategy_store: memstore + http: + grpc: + jaeger_query: trace_storage: memstore trace_storage_archive: memstore_archive @@ -42,6 +52,8 @@ receivers: processors: batch: + adaptive_sampling: + strategy_store: memstore exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/internal/components.go b/cmd/jaeger/internal/components.go index c7b9957dbd8..f7161ffef22 100644 --- a/cmd/jaeger/internal/components.go +++ b/cmd/jaeger/internal/components.go @@ -29,7 +29,9 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/processors/adaptivesampling" ) type builders struct { @@ -62,7 +64,7 @@ func (b builders) build() (otelcol.Factories, error) { jaegerquery.NewFactory(), jaegerstorage.NewFactory(), storagecleaner.NewFactory(), - // TODO add adaptive sampling + remotesampling.NewFactory(), ) if err != nil { return otelcol.Factories{}, err @@ -99,7 +101,7 @@ func (b builders) build() (otelcol.Factories, error) { batchprocessor.NewFactory(), memorylimiterprocessor.NewFactory(), // add-ons - // TODO add adaptive sampling + adaptivesampling.NewFactory(), ) if err != nil 
{ return otelcol.Factories{}, err diff --git a/cmd/jaeger/internal/extension/remotesampling/config.go b/cmd/jaeger/internal/extension/remotesampling/config.go new file mode 100644 index 00000000000..97536bf3398 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/config.go @@ -0,0 +1,80 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "errors" + "reflect" + "time" + + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" +) + +var ( + errNoSource = errors.New("no sampling strategy specified, has to be either 'adaptive' or 'file'") + errMultipleSource = errors.New("only one sampling strategy can be specified, has to be either 'adaptive' or 'file'") +) + +type FileConfig struct { + // File specifies a local file as the strategies source + Path string `mapstructure:"path"` +} + +type AdaptiveConfig struct { + // name of the strategy storage defined in the jaegerstorage extension + StrategyStore string `mapstructure:"strategy_store"` + + // InitialSamplingProbability is the initial sampling probability for all new operations. + InitialSamplingProbability float64 `mapstructure:"initial_sampling_probability"` + + // AggregationBuckets is the total number of aggregated throughput buckets kept in memory, ie. if + // the CalculationInterval is 1 minute (each bucket contains 1 minute of throughput data) and the + // AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for + // all operations. + // TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice. + AggregationBuckets int `mapstructure:"aggregation_buckets"` + + // MinSamplesPerSecond determines the min number of traces that are sampled per second. 
+ // For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do + // its best to sample at least one trace a minute for an operation. This is useful for low QPS operations + // that may never be sampled by the probabilistic sampler. + MinSamplesPerSecond float64 `mapstructure:"min_samples_per_second"` + + // LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before + // attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval + // to reduce lock thrashing. + LeaderLeaseRefreshInterval time.Duration `mapstructure:"leader_lease_refresh_interval"` + + // FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower + // (ie. failed to gain the leader lock). + FollowerLeaseRefreshInterval time.Duration `mapstructure:"follower_lease_refresh_interval"` +} + +type Config struct { + File FileConfig `mapstructure:"file"` + Adaptive AdaptiveConfig `mapstructure:"adaptive"` + HTTP HTTPConfig `mapstructure:"http"` + GRPC GRPCConfig `mapstructure:"grpc"` +} + +type HTTPConfig struct { + confighttp.ServerConfig `mapstructure:",squash"` +} + +type GRPCConfig struct { + configgrpc.ServerConfig `mapstructure:",squash"` +} + +func (cfg *Config) Validate() error { + emptyCfg := createDefaultConfig().(*Config) + if reflect.DeepEqual(*cfg, *emptyCfg) { + return errNoSource + } + + if cfg.File.Path != "" && cfg.Adaptive.StrategyStore != "" { + return errMultipleSource + } + return nil +} diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go new file mode 100644 index 00000000000..fc272e0e85b --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -0,0 +1,259 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "errors" + "fmt" + "net" + "net/http" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp" + "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection" + "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive" + "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage/samplingstore" +) + +var _ extension.Extension = (*rsExtension)(nil) + +const defaultResourceName = "sampling_store_leader" + +type rsExtension struct { + cfg *Config + telemetry component.TelemetrySettings + httpServer *http.Server + grpcServer *grpc.Server + strategyStore strategystore.StrategyStore + shutdownWG sync.WaitGroup +} + +func newExtension(cfg *Config, telemetry component.TelemetrySettings) *rsExtension { + return &rsExtension{ + cfg: cfg, + telemetry: telemetry, + } +} + +func GetExtensionConfig(host component.Host) (AdaptiveConfig, error) { + var comp component.Component + for id, ext := range host.GetExtensions() { + if id.Type() == componentType { + comp = ext + break + } + } + if comp == nil { + return AdaptiveConfig{}, fmt.Errorf( + "cannot find extension '%s' (make sure it's defined earlier in the config)", + componentType, + ) + } + return comp.(*rsExtension).cfg.Adaptive, nil +} + +// GetSamplingStorage retrieves a 
storage factory from `jaegerstorage` extension +// and uses it to create a sampling store and a leader/follower implementation. +func GetSamplingStorage( + samplingStorage string, + host component.Host, + opts adaptive.Options, + logger *zap.Logger, +) (samplingstore.Store, *leaderelection.DistributedElectionParticipant, error) { + f, err := jaegerstorage.GetStorageFactory(samplingStorage, host) + if err != nil { + return nil, nil, fmt.Errorf("cannot find storage factory: %w", err) + } + + ssStore, ok := f.(storage.SamplingStoreFactory) + if !ok { + return nil, nil, fmt.Errorf("storage factory of type %s does not support sampling store", samplingStorage) + } + + lock, err := ssStore.CreateLock() + if err != nil { + return nil, nil, err + } + + store, err := ssStore.CreateSamplingStore(opts.AggregationBuckets) + if err != nil { + return nil, nil, err + } + + ep := leaderelection.NewElectionParticipant(lock, defaultResourceName, leaderelection.ElectionParticipantOptions{ + FollowerLeaseRefreshInterval: opts.FollowerLeaseRefreshInterval, + LeaderLeaseRefreshInterval: opts.LeaderLeaseRefreshInterval, + Logger: logger, + }) + + return store, ep, nil +} + +func (ext *rsExtension) Start(ctx context.Context, host component.Host) error { + if ext.cfg.File.Path != "" { + // TODO contextcheck linter complains about next line that context is not passed. It is not wrong. 
+ //nolint + if err := ext.startFileStrategyStore(); err != nil { + return err + } + } + + if ext.cfg.Adaptive.StrategyStore != "" { + if err := ext.startAdaptiveStrategyStore(host); err != nil { + return err + } + } + + if err := ext.startHTTPServer(ctx, host); err != nil { + return fmt.Errorf("failed to start sampling http server: %w", err) + } + + if err := ext.startGRPCServer(ctx, host); err != nil { + return fmt.Errorf("failed to start sampling gRPC server: %w", err) + } + + return nil +} + +func (ext *rsExtension) Shutdown(ctx context.Context) error { + var errs []error + + if ext.httpServer != nil { + if err := ext.httpServer.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("failed to stop the sampling HTTP server: %w", err)) + } + } + + if ext.grpcServer != nil { + ext.grpcServer.GracefulStop() + } + + if ext.strategyStore != nil { + if err := ext.strategyStore.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to stop strategy store: %w", err)) + } + } + return errors.Join(errs...) +} + +func (ext *rsExtension) startFileStrategyStore() error { + opts := static.Options{ + StrategiesFile: ext.cfg.File.Path, + } + + // TODO contextcheck linter complains about next line that context is not passed. It is not wrong. 
+ //nolint + ss, err := static.NewStrategyStore(opts, ext.telemetry.Logger) + if err != nil { + return fmt.Errorf("failed to create the local file strategy store: %w", err) + } + + ext.strategyStore = ss + return nil +} + +func (ext *rsExtension) startAdaptiveStrategyStore(host component.Host) error { + opts := adaptive.Options{ + InitialSamplingProbability: ext.cfg.Adaptive.InitialSamplingProbability, + MinSamplesPerSecond: ext.cfg.Adaptive.MinSamplesPerSecond, + LeaderLeaseRefreshInterval: ext.cfg.Adaptive.LeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: ext.cfg.Adaptive.FollowerLeaseRefreshInterval, + AggregationBuckets: ext.cfg.Adaptive.AggregationBuckets, + } + + store, ep, err := GetSamplingStorage(ext.cfg.Adaptive.StrategyStore, host, opts, ext.telemetry.Logger) + if err != nil { + return err + } + + ss := adaptive.NewStrategyStore(opts, ext.telemetry.Logger, ep, store) + ss.Start() + ext.strategyStore = ss + return nil +} + +func (ext *rsExtension) startGRPCServer(ctx context.Context, host component.Host) error { + var err error + if ext.grpcServer, err = ext.cfg.GRPC.ToServer(ctx, host, ext.telemetry); err != nil { + return err + } + + healthServer := health.NewServer() + api_v2.RegisterSamplingManagerServer(ext.grpcServer, sampling.NewGRPCHandler(ext.strategyStore)) + healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(ext.grpcServer, healthServer) + ext.telemetry.Logger.Info("Starting GRPC server", zap.String("endpoint", ext.cfg.GRPC.NetAddr.Endpoint)) + + var gln net.Listener + if gln, err = ext.cfg.GRPC.NetAddr.Listen(ctx); err != nil { + return err + } + + ext.shutdownWG.Add(1) + go func() { + defer ext.shutdownWG.Done() + + if errGrpc := ext.grpcServer.Serve(gln); errGrpc != nil && !errors.Is(errGrpc, grpc.ErrServerStopped) { + ext.telemetry.ReportStatus(component.NewFatalErrorEvent(errGrpc)) + } + }() + + return nil +} + +func (ext *rsExtension) 
startHTTPServer(ctx context.Context, host component.Host) error { + httpMux := http.NewServeMux() + + handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{ + ConfigManager: &clientcfghttp.ConfigManager{ + SamplingStrategyStore: ext.strategyStore, + }, + MetricsFactory: metrics.NullFactory, + BasePath: "/api", + }) + + handler.RegisterRoutesWithHTTP(httpMux) + + var err error + if ext.httpServer, err = ext.cfg.HTTP.ToServer(ctx, host, ext.telemetry, httpMux); err != nil { + return err + } + + ext.telemetry.Logger.Info("Starting HTTP server", zap.String("endpoint", ext.cfg.HTTP.ServerConfig.Endpoint)) + var hln net.Listener + if hln, err = ext.cfg.HTTP.ServerConfig.ToListener(ctx); err != nil { + return err + } + + ext.shutdownWG.Add(1) + go func() { + defer ext.shutdownWG.Done() + + err := ext.httpServer.Serve(hln) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + ext.telemetry.ReportStatus(component.NewFatalErrorEvent(err)) + } + }() + + return nil +} + +func (*rsExtension) Dependencies() []component.ID { + return []component.ID{jaegerstorage.ID} +} diff --git a/cmd/jaeger/internal/extension/remotesampling/extension_test.go b/cmd/jaeger/internal/extension/remotesampling/extension_test.go new file mode 100644 index 00000000000..e5b60eb7cb1 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/extension_test.go @@ -0,0 +1,33 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestNewExtension(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + e := newExtension(cfg, componenttest.NewNopTelemetrySettings()) + + assert.NotNil(t, e) +} + +func TestStartAndShutdownLocalFile(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + + e := newExtension(cfg, componenttest.NewNopTelemetrySettings()) + require.NotNil(t, e) + require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) + + require.NoError(t, e.Shutdown(context.Background())) +} diff --git a/cmd/jaeger/internal/extension/remotesampling/factory.go b/cmd/jaeger/internal/extension/remotesampling/factory.go new file mode 100644 index 00000000000..d65d2646e2b --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/factory.go @@ -0,0 +1,70 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/extension" + + "github.com/jaegertracing/jaeger/ports" +) + +// componentType is the name of this extension in configuration. 
+var componentType = component.MustNewType("remote_sampling") + +const ( + defaultAggregationBuckets = 10 + defaultInitialSamplingProbability = 0.001 + defaultMinSamplesPerSecond = 1.0 / float64(time.Minute/time.Second) // once every 1 minute + defaultLeaderLeaseRefreshInterval = 5 * time.Second + defaultFollowerLeaseRefreshInterval = 60 * time.Second +) + +// NewFactory creates a factory for the jaeger remote sampling extension. +func NewFactory() extension.Factory { + return extension.NewFactory( + componentType, + createDefaultConfig, + createExtension, + component.StabilityLevelBeta, + ) +} + +func createDefaultConfig() component.Config { + return &Config{ + HTTP: HTTPConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorHTTP), + }, + }, + GRPC: GRPCConfig{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorGRPC), + Transport: confignet.TransportTypeTCP, + }, + }, + }, + File: FileConfig{ + Path: "", + }, + Adaptive: AdaptiveConfig{ + InitialSamplingProbability: defaultInitialSamplingProbability, + MinSamplesPerSecond: defaultMinSamplesPerSecond, + LeaderLeaseRefreshInterval: defaultLeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: defaultFollowerLeaseRefreshInterval, + AggregationBuckets: defaultAggregationBuckets, + }, + } +} + +func createExtension(_ context.Context, set extension.CreateSettings, cfg component.Config) (extension.Extension, error) { + return newExtension(cfg.(*Config), set.TelemetrySettings), nil +} diff --git a/cmd/jaeger/internal/extension/remotesampling/factory_test.go b/cmd/jaeger/internal/extension/remotesampling/factory_test.go new file mode 100644 index 00000000000..706edf437ed --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/factory_test.go @@ -0,0 +1,28 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/extension/extensiontest" +) + +func TestCreateDefaultConfig(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg, "failed to create default config") + require.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestCreateExtension(t *testing.T) { + cfg := createDefaultConfig().(*Config) + f := NewFactory() + r, err := f.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + assert.NotNil(t, r) +} diff --git a/cmd/jaeger/internal/extension/remotesampling/package_test.go b/cmd/jaeger/internal/extension/remotesampling/package_test.go new file mode 100644 index 00000000000..5bd9ea71735 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/config.go b/cmd/jaeger/internal/processors/adaptivesampling/config.go new file mode 100644 index 00000000000..3abd0df99c4 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/config.go @@ -0,0 +1,47 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import "time" + +type Config struct { + // name of the strategy storage defined in the jaegerstorage extension + StrategyStore string `mapstructure:"strategy_store"` + + // TargetSamplesPerSecond is the global target rate of samples per operation. 
+ // TODO implement manual overrides per service/operation. + TargetSamplesPerSecond float64 `mapstructure:"target_samples_per_second"` + + // DeltaTolerance is the acceptable amount of deviation between the observed and the desired (target) + // throughput for an operation, expressed as a ratio. For example, the value of 0.3 (30% deviation) + // means that if abs((actual-expected) / expected) < 0.3, then the actual sampling rate is "close enough" + // and the system does not need to send an updated sampling probability (the "control signal" u(t) + // in the PID Controller terminology) to the sampler in the application. + // + // Increase this to reduce the amount of fluctuation in the calculated probabilities. + DeltaTolerance float64 `mapstructure:"delta_tolerance"` + + // CalculationInterval determines how often new probabilities are calculated. E.g. if it is 1 minute, + // new sampling probabilities are calculated once a minute and each bucket will contain 1 minute worth + // of aggregated throughput data. + CalculationInterval time.Duration `mapstructure:"calculation_interval"` + + // BucketsForCalculation determines how many previous buckets used in calculating the weighted QPS, + // ie. if BucketsForCalculation is 1, only the most recent bucket will be used in calculating the weighted QPS. + BucketsForCalculation int `mapstructure:"buckets_for_calculation"` + + // Delay is the amount of time to delay probability generation by, ie. if the CalculationInterval + // is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time + // we'll have [now()-12m,now()-2m] range of throughput data in memory to base the calculations + // off of. This delay is necessary to counteract the rate at which the SDKs poll for + // the latest sampling probabilities. 
The default client poll rate is 1 minute, which means that + // during any 1 minute interval, the clients will be fetching new probabilities in a uniformly + // distributed manner throughout the 1 minute window. By setting the delay to 2 minutes, we can + // guarantee that all clients can use the latest calculated probabilities for at least 1 minute. + Delay time.Duration `mapstructure:"delay"` + + // MinSamplingProbability is the minimum sampling probability for all operations. ie. the calculated sampling + // probability will be in the range [MinSamplingProbability, 1.0]. + MinSamplingProbability float64 `mapstructure:"min_sampling_probability"` +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/factory.go b/cmd/jaeger/internal/processors/adaptivesampling/factory.go new file mode 100644 index 00000000000..cf872ad322c --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/factory.go @@ -0,0 +1,66 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorhelper" +) + +// componentType is the name of this processor in configuration. +var componentType = component.MustNewType("adaptive_sampling") + +const ( + defaultTargetSamplesPerSecond = 1 + defaultDeltaTolerance = 0.3 + defaultBucketsForCalculation = 1 + defaultCalculationInterval = time.Minute + defaultAggregationBuckets = 10 + defaultDelay = time.Minute * 2 + defaultMinSamplingProbability = 1e-5 // one in 100k requests +) + +// NewFactory creates a factory for the jaeger adaptive sampling processor. 
+func NewFactory() processor.Factory { + return processor.NewFactory( + componentType, + createDefaultConfig, + processor.WithTraces(createTracesProcessor, component.StabilityLevelBeta), + ) +} + +func createDefaultConfig() component.Config { + return &Config{ + TargetSamplesPerSecond: defaultTargetSamplesPerSecond, + DeltaTolerance: defaultDeltaTolerance, + CalculationInterval: defaultCalculationInterval, + BucketsForCalculation: defaultBucketsForCalculation, + Delay: defaultDelay, + MinSamplingProbability: defaultMinSamplingProbability, + } +} + +func createTracesProcessor( + ctx context.Context, + set processor.CreateSettings, + cfg component.Config, + nextConsumer consumer.Traces, +) (processor.Traces, error) { + oCfg := cfg.(*Config) + sp := newTraceProcessor(*oCfg, set.TelemetrySettings) + return processorhelper.NewTracesProcessor( + ctx, + set, + cfg, + nextConsumer, + sp.processTraces, + processorhelper.WithStart(sp.start), + processorhelper.WithShutdown(sp.close), + ) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/package_test.go b/cmd/jaeger/internal/processors/adaptivesampling/package_test.go new file mode 100644 index 00000000000..10d464704eb --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go new file mode 100644 index 00000000000..df654559472 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -0,0 +1,95 @@ +// Copyright (c) 2024 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "fmt" + + otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" + "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive" +) + +type traceProcessor struct { + config *Config + logger *zap.Logger + aggregator strategystore.Aggregator +} + +func newTraceProcessor(cfg Config, otel component.TelemetrySettings) *traceProcessor { + return &traceProcessor{ + config: &cfg, + logger: otel.Logger, + } +} + +func (tp *traceProcessor) start(_ context.Context, host component.Host) error { + extCfg, err := remotesampling.GetExtensionConfig(host) + + tp.logger.Info("Adaptive sampling extension config", zap.Any("config", extCfg)) + + if err != nil { + return err + } + opts := adaptive.Options{ + InitialSamplingProbability: extCfg.InitialSamplingProbability, + TargetSamplesPerSecond: tp.config.TargetSamplesPerSecond, + DeltaTolerance: tp.config.DeltaTolerance, + CalculationInterval: tp.config.CalculationInterval, + AggregationBuckets: extCfg.AggregationBuckets, + BucketsForCalculation: tp.config.BucketsForCalculation, + Delay: tp.config.Delay, + MinSamplingProbability: tp.config.MinSamplingProbability, + LeaderLeaseRefreshInterval: extCfg.LeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: extCfg.FollowerLeaseRefreshInterval, + } + + store, ep, err := remotesampling.GetSamplingStorage(tp.config.StrategyStore, host, opts, tp.logger) + if err != nil { + return err + } + + agg, err := adaptive.NewAggregator(opts, tp.logger, metrics.NullFactory, ep, store) + if err != nil { + return err + } + + agg.Start() + 
tp.aggregator = agg + + return nil +} + +func (tp *traceProcessor) close(context.Context) error { + if tp.aggregator != nil { + if err := tp.aggregator.Close(); err != nil { + return fmt.Errorf("failed to stop the adpative sampling aggregator : %w", err) + } + } + return nil +} + +func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { + batches, err := otlp2jaeger.ProtoFromTraces(td) + if err != nil { + return td, fmt.Errorf("cannot transform OTLP traces to Jaeger format: %w", err) + } + + for _, batch := range batches { + for _, span := range batch.Spans { + if span.Process == nil { + span.Process = batch.Process + } + adaptive.RecordThroughput(tp.aggregator, span, tp.logger) + } + } + return td, nil +} diff --git a/cmd/jaeger/sampling-strategies.json b/cmd/jaeger/sampling-strategies.json new file mode 100644 index 00000000000..6928e6d0436 --- /dev/null +++ b/cmd/jaeger/sampling-strategies.json @@ -0,0 +1,18 @@ +{ + "default_strategy": { + "type": "probabilistic", + "param": 0.1 + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": 0.8 + }, + { + "service": "bar", + "type": "ratelimiting", + "param": 1 + } + ] +} diff --git a/pkg/clientcfg/clientcfghttp/handler.go b/pkg/clientcfg/clientcfghttp/handler.go index ab2cef08c5d..7bc22be9a4e 100644 --- a/pkg/clientcfg/clientcfghttp/handler.go +++ b/pkg/clientcfg/clientcfghttp/handler.go @@ -109,6 +109,17 @@ func (h *HTTPHandler) RegisterRoutes(router *mux.Router) { }).Methods(http.MethodGet) } +// RegisterRoutesWithHTTP registers configuration handlers with HTTP Router. 
+func (h *HTTPHandler) RegisterRoutesWithHTTP(router *http.ServeMux) { + prefix := h.params.BasePath + router.HandleFunc( + prefix+"/", + func(w http.ResponseWriter, r *http.Request) { + h.serveSamplingHTTP(w, r, h.encodeThriftLegacy) + }, + ) +} + func (h *HTTPHandler) serviceFromRequest(w http.ResponseWriter, r *http.Request) (string, error) { services := r.URL.Query()["service"] if len(services) != 1 { diff --git a/plugin/sampling/strategystore/adaptive/aggregator.go b/plugin/sampling/strategystore/adaptive/aggregator.go index f933be71cf6..33b64b3a615 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator.go +++ b/plugin/sampling/strategystore/adaptive/aggregator.go @@ -129,6 +129,23 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa } } +func RecordThroughput(agg strategystore.Aggregator, span *span_model.Span, logger *zap.Logger) { + // TODO simply checking parentId to determine if a span is a root span is not sufficient. However, + // we can be sure that only a root span will have sampler tags. + if span.ParentSpanID() != span_model.NewSpanID(0) { + return + } + service := span.Process.ServiceName + if service == "" || span.OperationName == "" { + return + } + samplerType, samplerParam := span.GetSamplerParams(logger) + if samplerType == span_model.SamplerTypeUnrecognized { + return + } + agg.RecordThroughput(service, span.OperationName, samplerType, samplerParam) +} + func (a *aggregator) Start() { a.postAggregator.Start()