From e4ab1253ac3752aab87a72954500aebf25c17c8a Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Thu, 6 Jun 2024 23:41:36 +0530 Subject: [PATCH 01/12] rebase Signed-off-by: pushkarm029 --- cmd/jaeger/config-badger.yaml | 17 +- cmd/jaeger/config-cassandra.yaml | 17 +- cmd/jaeger/config-elasticsearch.yaml | 16 +- cmd/jaeger/config-opensearch.yaml | 16 +- cmd/jaeger/config.yaml | 16 +- cmd/jaeger/internal/components.go | 6 +- .../extension/remotesampling/config.go | 80 ++++++ .../extension/remotesampling/extension.go | 259 ++++++++++++++++++ .../remotesampling/extension_test.go | 33 +++ .../extension/remotesampling/factory.go | 70 +++++ .../extension/remotesampling/factory_test.go | 28 ++ .../extension/remotesampling/package_test.go | 14 + .../processors/adaptivesampling/config.go | 47 ++++ .../processors/adaptivesampling/factory.go | 66 +++++ .../adaptivesampling/package_test.go | 14 + .../processors/adaptivesampling/processor.go | 95 +++++++ cmd/jaeger/sampling-strategies.json | 18 ++ pkg/clientcfg/clientcfghttp/handler.go | 11 + .../strategystore/adaptive/aggregator.go | 17 ++ 19 files changed, 828 insertions(+), 12 deletions(-) create mode 100644 cmd/jaeger/internal/extension/remotesampling/config.go create mode 100644 cmd/jaeger/internal/extension/remotesampling/extension.go create mode 100644 cmd/jaeger/internal/extension/remotesampling/extension_test.go create mode 100644 cmd/jaeger/internal/extension/remotesampling/factory.go create mode 100644 cmd/jaeger/internal/extension/remotesampling/factory_test.go create mode 100644 cmd/jaeger/internal/extension/remotesampling/package_test.go create mode 100644 cmd/jaeger/internal/processors/adaptivesampling/config.go create mode 100644 cmd/jaeger/internal/processors/adaptivesampling/factory.go create mode 100644 cmd/jaeger/internal/processors/adaptivesampling/package_test.go create mode 100644 cmd/jaeger/internal/processors/adaptivesampling/processor.go create mode 100644 cmd/jaeger/sampling-strategies.json diff --git a/cmd/jaeger/config-badger.yaml b/cmd/jaeger/config-badger.yaml index 4643c9cc75a..a683eb2a52f 100644 --- a/cmd/jaeger/config-badger.yaml +++ b/cmd/jaeger/config-badger.yaml @@ -1,12 +1,23 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + initial_sampling_probability: 0.1 + strategy_store: badger_main + http: + grpc: + + jaeger_query: trace_storage: badger_main trace_storage_archive: badger_archive @@ -37,6 +48,8 @@ receivers: processors: batch: + adaptive_sampling: + strategy_store: badger_main exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 39cfb319489..26b7485c89e 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -1,12 +1,22 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + initial_sampling_probability: 0.1 + strategy_store: cassandra_main + http: + grpc: + jaeger_query: trace_storage: cassandra_main trace_storage_archive: cassandra_archive @@ -47,6 +57,9 @@ receivers: processors: batch: + adaptive_sampling: + strategy_store: cassandra_main + exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index 21257f920d6..028b4b46084 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -1,12 +1,22 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + initial_sampling_probability: 0.1 + strategy_store: es_main + http: + grpc: + jaeger_query: trace_storage: es_main trace_storage_archive: es_archive @@ -35,6 +45,8 @@ receivers: processors: batch: + adaptive_sampling: + strategy_store: es_main exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config-opensearch.yaml b/cmd/jaeger/config-opensearch.yaml index c7acf7d8f53..453f638bedf 100644 --- a/cmd/jaeger/config-opensearch.yaml +++ b/cmd/jaeger/config-opensearch.yaml @@ -1,12 +1,22 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + initial_sampling_probability: 0.1 + strategy_store: os_main + http: + grpc: + jaeger_query: trace_storage: os_main trace_storage_archive: os_archive @@ -36,6 +46,8 @@ receivers: processors: batch: + adaptive_sampling: + strategy_store: os_main exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config.yaml b/cmd/jaeger/config.yaml index 3be3a37af5d..7d76ea0d02f 100644 --- a/cmd/jaeger/config.yaml +++ b/cmd/jaeger/config.yaml @@ -1,9 +1,9 @@ service: - extensions: [jaeger_storage, jaeger_query] + extensions: [jaeger_storage, jaeger_query, remote_sampling] pipelines: traces: receivers: [otlp, jaeger, zipkin] - processors: [batch] + processors: [batch, adaptive_sampling] exporters: [jaeger_storage_exporter] extensions: @@ -13,6 +13,16 @@ extensions: # zpages: # endpoint: 0.0.0.0:55679 + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + initial_sampling_probability: 0.1 + strategy_store: memstore + http: + grpc: + jaeger_query: trace_storage: memstore trace_storage_archive: memstore_archive @@ -42,6 +52,8 @@ receivers: processors: batch: + adaptive_sampling: + strategy_store: memstore exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/internal/components.go b/cmd/jaeger/internal/components.go index c7b9957dbd8..f7161ffef22 100644 --- a/cmd/jaeger/internal/components.go +++ b/cmd/jaeger/internal/components.go @@ -29,7 +29,9 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/processors/adaptivesampling" ) type builders struct { @@ -62,7 +64,7 @@ func (b builders) build() (otelcol.Factories, error) { jaegerquery.NewFactory(), jaegerstorage.NewFactory(), storagecleaner.NewFactory(), - // TODO add adaptive sampling + remotesampling.NewFactory(), ) if err != nil { return otelcol.Factories{}, err @@ -99,7 +101,7 @@ func (b builders) build() (otelcol.Factories, error) { batchprocessor.NewFactory(), memorylimiterprocessor.NewFactory(), // add-ons - // TODO add adaptive sampling + adaptivesampling.NewFactory(), ) if err != nil { return otelcol.Factories{}, err diff --git a/cmd/jaeger/internal/extension/remotesampling/config.go b/cmd/jaeger/internal/extension/remotesampling/config.go new file mode 100644 index 00000000000..97536bf3398 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/config.go @@ -0,0 +1,80 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "errors" + "reflect" + "time" + + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" +) + +var ( + errNoSource = errors.New("no sampling strategy specified, has to be either 'adaptive' or 'file'") + errMultipleSource = errors.New("only one sampling strategy can be specified, has to be either 'adaptive' or 'file'") +) + +type FileConfig struct { + // File specifies a local file as the strategies source + Path string `mapstructure:"path"` +} + +type AdaptiveConfig struct { + // name of the strategy storage defined in the jaegerstorage extension + StrategyStore string `mapstructure:"strategy_store"` + + // InitialSamplingProbability is the initial sampling probability for all new operations. + InitialSamplingProbability float64 `mapstructure:"initial_sampling_probability"` + + // AggregationBuckets is the total number of aggregated throughput buckets kept in memory, ie. if + // the CalculationInterval is 1 minute (each bucket contains 1 minute of thoughput data) and the + // AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for + // all operations. + // TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice. + AggregationBuckets int `mapstructure:"aggregation_buckets"` + + // MinSamplesPerSecond determines the min number of traces that are sampled per second. + // For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do + // its best to sample at least one trace a minute for an operation. This is useful for low QPS operations + // that may never be sampled by the probabilistic sampler. + MinSamplesPerSecond float64 `mapstructure:"min_samples_per_second"` + + // LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before + // attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval + // to reduce lock thrashing. + LeaderLeaseRefreshInterval time.Duration `mapstructure:"leader_lease_refresh_interval"` + + // FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower + // (ie. failed to gain the leader lock). + FollowerLeaseRefreshInterval time.Duration `mapstructure:"follower_lease_refresh_interval"` +} + +type Config struct { + File FileConfig `mapstructure:"file"` + Adaptive AdaptiveConfig `mapstructure:"adaptive"` + HTTP HTTPConfig `mapstructure:"http"` + GRPC GRPCConfig `mapstructure:"grpc"` +} + +type HTTPConfig struct { + confighttp.ServerConfig `mapstructure:",squash"` +} + +type GRPCConfig struct { + configgrpc.ServerConfig `mapstructure:",squash"` +} + +func (cfg *Config) Validate() error { + emptyCfg := createDefaultConfig().(*Config) + if reflect.DeepEqual(*cfg, *emptyCfg) { + return errNoSource + } + + if cfg.File.Path != "" && cfg.Adaptive.StrategyStore != "" { + return errMultipleSource + } + return nil +} diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go new file mode 100644 index 00000000000..fc272e0e85b --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -0,0 +1,259 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "errors" + "fmt" + "net" + "net/http" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp" + "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection" + "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive" + "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage/samplingstore" +) + +var _ extension.Extension = (*rsExtension)(nil) + +const defaultResourceName = "sampling_store_leader" + +type rsExtension struct { + cfg *Config + telemetry component.TelemetrySettings + httpServer *http.Server + grpcServer *grpc.Server + strategyStore strategystore.StrategyStore + shutdownWG sync.WaitGroup +} + +func newExtension(cfg *Config, telemetry component.TelemetrySettings) *rsExtension { + return &rsExtension{ + cfg: cfg, + telemetry: telemetry, + } +} + +func GetExtensionConfig(host component.Host) (AdaptiveConfig, error) { + var comp component.Component + for id, ext := range host.GetExtensions() { + if id.Type() == componentType { + comp = ext + break + } + } + if comp == nil { + return AdaptiveConfig{}, fmt.Errorf( + "cannot find extension '%s' (make sure it's defined earlier in the config)", + componentType, + ) + } + return comp.(*rsExtension).cfg.Adaptive, nil +} + +// GetSamplingStorage retrieves a storage factory from `jaegerstorage` extension +// and uses it to create a sampling store and a loader/follower implementation. +func GetSamplingStorage( + samplingStorage string, + host component.Host, + opts adaptive.Options, + logger *zap.Logger, +) (samplingstore.Store, *leaderelection.DistributedElectionParticipant, error) { + f, err := jaegerstorage.GetStorageFactory(samplingStorage, host) + if err != nil { + return nil, nil, fmt.Errorf("cannot find storage factory: %w", err) + } + + ssStore, ok := f.(storage.SamplingStoreFactory) + if !ok { + return nil, nil, fmt.Errorf("storage factory of type %s does not support sampling store", samplingStorage) + } + + lock, err := ssStore.CreateLock() + if err != nil { + return nil, nil, err + } + + store, err := ssStore.CreateSamplingStore(opts.AggregationBuckets) + if err != nil { + return nil, nil, err + } + + ep := leaderelection.NewElectionParticipant(lock, defaultResourceName, leaderelection.ElectionParticipantOptions{ + FollowerLeaseRefreshInterval: opts.FollowerLeaseRefreshInterval, + LeaderLeaseRefreshInterval: opts.LeaderLeaseRefreshInterval, + Logger: logger, + }) + + return store, ep, nil +} + +func (ext *rsExtension) Start(ctx context.Context, host component.Host) error { + if ext.cfg.File.Path != "" { + // TODO contextcheck linter complains about next line that context is not passed. It is not wrong. + //nolint + if err := ext.startFileStrategyStore(); err != nil { + return err + } + } + + if ext.cfg.Adaptive.StrategyStore != "" { + if err := ext.startAdaptiveStrategyStore(host); err != nil { + return err + } + } + + if err := ext.startHTTPServer(ctx, host); err != nil { + return fmt.Errorf("failed to start sampling http server: %w", err) + } + + if err := ext.startGRPCServer(ctx, host); err != nil { + return fmt.Errorf("failed to start sampling gRPC server: %w", err) + } + + return nil +} + +func (ext *rsExtension) Shutdown(ctx context.Context) error { + var errs []error + + if ext.httpServer != nil { + if err := ext.httpServer.Shutdown(ctx); err != nil { + errs = append(errs, fmt.Errorf("failed to stop the sampling HTTP server: %w", err)) + } + } + + if ext.grpcServer != nil { + ext.grpcServer.GracefulStop() + } + + if ext.strategyStore != nil { + if err := ext.strategyStore.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to stop strategy store: %w", err)) + } + } + return errors.Join(errs...) +} + +func (ext *rsExtension) startFileStrategyStore() error { + opts := static.Options{ + StrategiesFile: ext.cfg.File.Path, + } + + // TODO contextcheck linter complains about next line that context is not passed. It is not wrong. + //nolint + ss, err := static.NewStrategyStore(opts, ext.telemetry.Logger) + if err != nil { + return fmt.Errorf("failed to create the local file strategy store: %w", err) + } + + ext.strategyStore = ss + return nil +} + +func (ext *rsExtension) startAdaptiveStrategyStore(host component.Host) error { + opts := adaptive.Options{ + InitialSamplingProbability: ext.cfg.Adaptive.InitialSamplingProbability, + MinSamplesPerSecond: ext.cfg.Adaptive.MinSamplesPerSecond, + LeaderLeaseRefreshInterval: ext.cfg.Adaptive.LeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: ext.cfg.Adaptive.FollowerLeaseRefreshInterval, + AggregationBuckets: ext.cfg.Adaptive.AggregationBuckets, + } + + store, ep, err := GetSamplingStorage(ext.cfg.Adaptive.StrategyStore, host, opts, ext.telemetry.Logger) + if err != nil { + return err + } + + ss := adaptive.NewStrategyStore(opts, ext.telemetry.Logger, ep, store) + ss.Start() + ext.strategyStore = ss + return nil +} + +func (ext *rsExtension) startGRPCServer(ctx context.Context, host component.Host) error { + var err error + if ext.grpcServer, err = ext.cfg.GRPC.ToServer(ctx, host, ext.telemetry); err != nil { + return err + } + + healthServer := health.NewServer() + api_v2.RegisterSamplingManagerServer(ext.grpcServer, sampling.NewGRPCHandler(ext.strategyStore)) + healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(ext.grpcServer, healthServer) + ext.telemetry.Logger.Info("Starting GRPC server", zap.String("endpoint", ext.cfg.GRPC.NetAddr.Endpoint)) + + var gln net.Listener + if gln, err = ext.cfg.GRPC.NetAddr.Listen(ctx); err != nil { + return err + } + + ext.shutdownWG.Add(1) + go func() { + defer ext.shutdownWG.Done() + + if errGrpc := ext.grpcServer.Serve(gln); errGrpc != nil && !errors.Is(errGrpc, grpc.ErrServerStopped) { + ext.telemetry.ReportStatus(component.NewFatalErrorEvent(errGrpc)) + } + }() + + return nil +} + +func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host) error { + httpMux := http.NewServeMux() + + handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{ + ConfigManager: &clientcfghttp.ConfigManager{ + SamplingStrategyStore: ext.strategyStore, + }, + MetricsFactory: metrics.NullFactory, + BasePath: "/api", + }) + + handler.RegisterRoutesWithHTTP(httpMux) + + var err error + if ext.httpServer, err = ext.cfg.HTTP.ToServer(ctx, host, ext.telemetry, httpMux); err != nil { + return err + } + + ext.telemetry.Logger.Info("Starting HTTP server", zap.String("endpoint", ext.cfg.HTTP.ServerConfig.Endpoint)) + var hln net.Listener + if hln, err = ext.cfg.HTTP.ServerConfig.ToListener(ctx); err != nil { + return err + } + + ext.shutdownWG.Add(1) + go func() { + defer ext.shutdownWG.Done() + + err := ext.httpServer.Serve(hln) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + ext.telemetry.ReportStatus(component.NewFatalErrorEvent(err)) + } + }() + + return nil +} + +func (*rsExtension) Dependencies() []component.ID { + return []component.ID{jaegerstorage.ID} +} diff --git a/cmd/jaeger/internal/extension/remotesampling/extension_test.go b/cmd/jaeger/internal/extension/remotesampling/extension_test.go new file mode 100644 index 00000000000..e5b60eb7cb1 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/extension_test.go @@ -0,0 +1,33 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestNewExtension(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + e := newExtension(cfg, componenttest.NewNopTelemetrySettings()) + + assert.NotNil(t, e) +} + +func TestStartAndShutdownLocalFile(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + + e := newExtension(cfg, componenttest.NewNopTelemetrySettings()) + require.NotNil(t, e) + require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) + + require.NoError(t, e.Shutdown(context.Background())) +} diff --git a/cmd/jaeger/internal/extension/remotesampling/factory.go b/cmd/jaeger/internal/extension/remotesampling/factory.go new file mode 100644 index 00000000000..d65d2646e2b --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/factory.go @@ -0,0 +1,70 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/extension" + + "github.com/jaegertracing/jaeger/ports" +) + +// componentType is the name of this extension in configuration. +var componentType = component.MustNewType("remote_sampling") + +const ( + defaultAggregationBuckets = 10 + defaultInitialSamplingProbability = 0.001 + defaultMinSamplesPerSecond = 1.0 / float64(time.Minute/time.Second) // once every 1 minute + defaultLeaderLeaseRefreshInterval = 5 * time.Second + defaultFollowerLeaseRefreshInterval = 60 * time.Second +) + +// NewFactory creates a factory for the jaeger remote sampling extension. +func NewFactory() extension.Factory { + return extension.NewFactory( + componentType, + createDefaultConfig, + createExtension, + component.StabilityLevelBeta, + ) +} + +func createDefaultConfig() component.Config { + return &Config{ + HTTP: HTTPConfig{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorHTTP), + }, + }, + GRPC: GRPCConfig{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorGRPC), + Transport: confignet.TransportTypeTCP, + }, + }, + }, + File: FileConfig{ + Path: "", + }, + Adaptive: AdaptiveConfig{ + InitialSamplingProbability: defaultInitialSamplingProbability, + MinSamplesPerSecond: defaultMinSamplesPerSecond, + LeaderLeaseRefreshInterval: defaultLeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: defaultFollowerLeaseRefreshInterval, + AggregationBuckets: defaultAggregationBuckets, + }, + } +} + +func createExtension(_ context.Context, set extension.CreateSettings, cfg component.Config) (extension.Extension, error) { + return newExtension(cfg.(*Config), set.TelemetrySettings), nil +} diff --git a/cmd/jaeger/internal/extension/remotesampling/factory_test.go b/cmd/jaeger/internal/extension/remotesampling/factory_test.go new file mode 100644 index 00000000000..706edf437ed --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/factory_test.go @@ -0,0 +1,28 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/extension/extensiontest" +) + +func TestCreateDefaultConfig(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg, "failed to create default config") + require.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestCreateExtension(t *testing.T) { + cfg := createDefaultConfig().(*Config) + f := NewFactory() + r, err := f.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + assert.NotNil(t, r) +} diff --git a/cmd/jaeger/internal/extension/remotesampling/package_test.go b/cmd/jaeger/internal/extension/remotesampling/package_test.go new file mode 100644 index 00000000000..5bd9ea71735 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package remotesampling + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/config.go b/cmd/jaeger/internal/processors/adaptivesampling/config.go new file mode 100644 index 00000000000..3abd0df99c4 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/config.go @@ -0,0 +1,47 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import "time" + +type Config struct { + // name of the strategy storage defined in the jaegerstorage extension + StrategyStore string `mapstructure:"strategy_store"` + + // TargetSamplesPerSecond is the global target rate of samples per operation. + // TODO implement manual overrides per service/operation. + TargetSamplesPerSecond float64 `mapstructure:"target_samples_per_second"` + + // DeltaTolerance is the acceptable amount of deviation between the observed and the desired (target) + // throughput for an operation, expressed as a ratio. For example, the value of 0.3 (30% deviation) + // means that if abs((actual-expected) / expected) < 0.3, then the actual sampling rate is "close enough" + // and the system does not need to send an updated sampling probability (the "control signal" u(t) + // in the PID Controller terminology) to the sampler in the application. + // + // Increase this to reduce the amount of fluctuation in the calculated probabilities. + DeltaTolerance float64 `mapstructure:"delta_tolerance"` + + // CalculationInterval determines how often new probabilities are calculated. E.g. if it is 1 minute, + // new sampling probabilities are calculated once a minute and each bucket will contain 1 minute worth + // of aggregated throughput data. + CalculationInterval time.Duration `mapstructure:"calculation_interval"` + + // BucketsForCalculation determines how many previous buckets used in calculating the weighted QPS, + // ie. if BucketsForCalculation is 1, only the most recent bucket will be used in calculating the weighted QPS. + BucketsForCalculation int `mapstructure:"buckets_for_calculation"` + + // Delay is the amount of time to delay probability generation by, ie. if the CalculationInterval + // is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time + // we'll have [now()-12m,now()-2m] range of throughput data in memory to base the calculations + // off of. This delay is necessary to counteract the rate at which the SDKs poll for + // the latest sampling probabilities. The default client poll rate is 1 minute, which means that + // during any 1 minute interval, the clients will be fetching new probabilities in a uniformly + // distributed manner throughout the 1 minute window. By setting the delay to 2 minutes, we can + // guarantee that all clients can use the latest calculated probabilities for at least 1 minute. + Delay time.Duration `mapstructure:"delay"` + + // MinSamplingProbability is the minimum sampling probability for all operations. ie. the calculated sampling + // probability will be in the range [MinSamplingProbability, 1.0]. + MinSamplingProbability float64 `mapstructure:"min_sampling_probability"` +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/factory.go b/cmd/jaeger/internal/processors/adaptivesampling/factory.go new file mode 100644 index 00000000000..cf872ad322c --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/factory.go @@ -0,0 +1,66 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorhelper" +) + +// componentType is the name of this extension in configuration. +var componentType = component.MustNewType("adaptive_sampling") + +const ( + defaultTargetSamplesPerSecond = 1 + defaultDeltaTolerance = 0.3 + defaultBucketsForCalculation = 1 + defaultCalculationInterval = time.Minute + defaultAggregationBuckets = 10 + defaultDelay = time.Minute * 2 + defaultMinSamplingProbability = 1e-5 // one in 100k requests +) + +// NewFactory creates a factory for the jaeger remote sampling extension. +func NewFactory() processor.Factory { + return processor.NewFactory( + componentType, + createDefaultConfig, + processor.WithTraces(createTracesProcessor, component.StabilityLevelBeta), + ) +} + +func createDefaultConfig() component.Config { + return &Config{ + TargetSamplesPerSecond: defaultTargetSamplesPerSecond, + DeltaTolerance: defaultDeltaTolerance, + CalculationInterval: defaultCalculationInterval, + BucketsForCalculation: defaultBucketsForCalculation, + Delay: defaultDelay, + MinSamplingProbability: defaultMinSamplingProbability, + } +} + +func createTracesProcessor( + ctx context.Context, + set processor.CreateSettings, + cfg component.Config, + nextConsumer consumer.Traces, +) (processor.Traces, error) { + oCfg := cfg.(*Config) + sp := newTraceProcessor(*oCfg, set.TelemetrySettings) + return processorhelper.NewTracesProcessor( + ctx, + set, + cfg, + nextConsumer, + sp.processTraces, + processorhelper.WithStart(sp.start), + processorhelper.WithShutdown(sp.close), + ) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/package_test.go b/cmd/jaeger/internal/processors/adaptivesampling/package_test.go new file mode 100644 index 00000000000..10d464704eb --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go new file mode 100644 index 00000000000..df654559472 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -0,0 +1,95 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "fmt" + + otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" + "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive" +) + +type traceProcessor struct { + config *Config + logger *zap.Logger + aggregator strategystore.Aggregator +} + +func newTraceProcessor(cfg Config, otel component.TelemetrySettings) *traceProcessor { + return &traceProcessor{ + config: &cfg, + logger: otel.Logger, + } +} + +func (tp *traceProcessor) start(_ context.Context, host component.Host) error { + extCfg, err := remotesampling.GetExtensionConfig(host) + + tp.logger.Info("Adaptive sampling extension config", zap.Any("config", extCfg)) + + if err != nil { + return err + } + opts := adaptive.Options{ + InitialSamplingProbability: extCfg.InitialSamplingProbability, + TargetSamplesPerSecond: tp.config.TargetSamplesPerSecond, + DeltaTolerance: tp.config.DeltaTolerance, + CalculationInterval: tp.config.CalculationInterval, + AggregationBuckets: extCfg.AggregationBuckets, + BucketsForCalculation: tp.config.BucketsForCalculation, + Delay: tp.config.Delay, + MinSamplingProbability: tp.config.MinSamplingProbability, + LeaderLeaseRefreshInterval: extCfg.LeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: extCfg.FollowerLeaseRefreshInterval, + } + + store, ep, err := remotesampling.GetSamplingStorage(tp.config.StrategyStore, host, opts, tp.logger) + if err != nil { + return err + } + + agg, err := adaptive.NewAggregator(opts, tp.logger, metrics.NullFactory, ep, store) + if err != nil { + return err + } + + agg.Start() + tp.aggregator = agg + + return nil +} + +func (tp *traceProcessor) close(context.Context) error { + if tp.aggregator != nil { + if err := tp.aggregator.Close(); err != nil { + return fmt.Errorf("failed to stop the adpative sampling aggregator : %w", err) + } + } + return nil +} + +func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { + batches, err := otlp2jaeger.ProtoFromTraces(td) + if err != nil { + return td, fmt.Errorf("cannot transform OTLP traces to Jaeger format: %w", err) + } + + for _, batch := range batches { + for _, span := range batch.Spans { + if span.Process == nil { + span.Process = batch.Process + } + adaptive.RecordThroughput(tp.aggregator, span, tp.logger) + } + } + return td, nil +} diff --git a/cmd/jaeger/sampling-strategies.json b/cmd/jaeger/sampling-strategies.json new file mode 100644 index 00000000000..6928e6d0436 --- /dev/null +++ b/cmd/jaeger/sampling-strategies.json @@ -0,0 +1,18 @@ +{ + "default_strategy": { + "type": "probabilistic", + "param": 0.1 + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": 0.8 + }, + { + "service": "bar", + "type": "ratelimiting", + "param": 1 + } + ] +} diff --git a/pkg/clientcfg/clientcfghttp/handler.go b/pkg/clientcfg/clientcfghttp/handler.go index ab2cef08c5d..7bc22be9a4e 100644 --- a/pkg/clientcfg/clientcfghttp/handler.go +++ b/pkg/clientcfg/clientcfghttp/handler.go @@ -109,6 +109,17 @@ func (h *HTTPHandler) RegisterRoutes(router *mux.Router) { }).Methods(http.MethodGet) } +// RegisterRoutes registers configuration handlers with HTTP Router. +func (h *HTTPHandler) RegisterRoutesWithHTTP(router *http.ServeMux) { + prefix := h.params.BasePath + router.HandleFunc( + prefix+"/", + func(w http.ResponseWriter, r *http.Request) { + h.serveSamplingHTTP(w, r, h.encodeThriftLegacy) + }, + ) +} + func (h *HTTPHandler) serviceFromRequest(w http.ResponseWriter, r *http.Request) (string, error) { services := r.URL.Query()["service"] if len(services) != 1 { diff --git a/plugin/sampling/strategystore/adaptive/aggregator.go b/plugin/sampling/strategystore/adaptive/aggregator.go index f933be71cf6..33b64b3a615 100644 --- a/plugin/sampling/strategystore/adaptive/aggregator.go +++ b/plugin/sampling/strategystore/adaptive/aggregator.go @@ -129,6 +129,23 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa } } +func RecordThroughput(agg strategystore.Aggregator, span *span_model.Span, logger *zap.Logger) { + // TODO simply checking parentId to determine if a span is a root span is not sufficient. However, + // we can be sure that only a root span will have sampler tags. + if span.ParentSpanID() != span_model.NewSpanID(0) { + return + } + service := span.Process.ServiceName + if service == "" || span.OperationName == "" { + return + } + samplerType, samplerParam := span.GetSamplerParams(logger) + if samplerType == span_model.SamplerTypeUnrecognized { + return + } + agg.RecordThroughput(service, span.OperationName, samplerType, samplerParam) +} + func (a *aggregator) Start() { a.postAggregator.Start() From 0da465582bca96e1d3ef5c3e589cb59bf428547e Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 15 Jun 2024 20:17:08 -0400 Subject: [PATCH 02/12] fix lint issues Signed-off-by: Yuri Shkuro --- cmd/jaeger/internal/extension/remotesampling/extension.go | 8 +++----- .../internal/processors/adaptivesampling/processor.go | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go index fc272e0e85b..8fdd9de4d24 100644 --- a/cmd/jaeger/internal/extension/remotesampling/extension.go +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -107,9 +107,7 @@ func GetSamplingStorage( func (ext *rsExtension) Start(ctx context.Context, host component.Host) error { if ext.cfg.File.Path != "" { - // TODO contextcheck linter complains about next line that context is not passed. It is not wrong. - //nolint - if err := ext.startFileStrategyStore(); err != nil { + if err := ext.startFileStrategyStore(ctx); err != nil { return err } } @@ -152,12 +150,12 @@ func (ext *rsExtension) Shutdown(ctx context.Context) error { return errors.Join(errs...) } -func (ext *rsExtension) startFileStrategyStore() error { +func (ext *rsExtension) startFileStrategyStore(_ context.Context) error { opts := static.Options{ StrategiesFile: ext.cfg.File.Path, } - // TODO contextcheck linter complains about next line that context is not passed. It is not wrong. + // contextcheck linter complains about next line that context is not passed. //nolint ss, err := static.NewStrategyStore(opts, ext.telemetry.Logger) if err != nil { diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go index df654559472..b5ff86c7ad7 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/processor.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -77,7 +77,7 @@ func (tp *traceProcessor) close(context.Context) error { return nil } -func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { +func (tp *traceProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { batches, err := otlp2jaeger.ProtoFromTraces(td) if err != nil { return td, fmt.Errorf("cannot transform OTLP traces to Jaeger format: %w", err) From aff467e60c5665a7a430ebdcdc43dc8ed308a4d0 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 15 Jun 2024 21:05:14 -0400 Subject: [PATCH 03/12] fix Signed-off-by: Yuri Shkuro --- .../extension/jaegerstorage/extension.go | 2 +- .../extension/remotesampling/extension.go | 133 ++++++++++-------- .../processors/adaptivesampling/processor.go | 30 ++-- 3 files changed, 86 insertions(+), 79 deletions(-) diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index 3bb1073ddf7..bc0301dc6ad 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -55,7 +55,7 @@ func GetStorageFactory(name string, host component.Host) (storage.Factory, error f, ok := comp.(Extension).Factory(name) if !ok { return nil, fmt.Errorf( - "cannot find storage '%s' declared by '%s' extension", + "cannot find definition of storage '%s' in the configuration for extension '%s'", name, componentType, ) } diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go index 8fdd9de4d24..2d4a93ee719 100644 --- a/cmd/jaeger/internal/extension/remotesampling/extension.go +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -36,12 +36,14 @@ var _ extension.Extension = (*rsExtension)(nil) const defaultResourceName = "sampling_store_leader" type rsExtension struct { - cfg *Config - telemetry component.TelemetrySettings - httpServer *http.Server - grpcServer *grpc.Server - strategyStore strategystore.StrategyStore - shutdownWG sync.WaitGroup + cfg *Config + telemetry component.TelemetrySettings + httpServer *http.Server + grpcServer *grpc.Server + strategyProvider strategystore.StrategyStore // TODO we should rename this to Provider, not "store" + adaptiveStore samplingstore.Store + distLock *leaderelection.DistributedElectionParticipant + shutdownWG sync.WaitGroup } func newExtension(cfg *Config, telemetry component.TelemetrySettings) *rsExtension { @@ -51,58 +53,35 @@ func newExtension(cfg *Config, telemetry component.TelemetrySettings) *rsExtensi } } -func GetExtensionConfig(host component.Host) (AdaptiveConfig, error) { +// GetAdaptiveSamplingStore locates the `remotesampling` extension in Host +// and returns the sampling store and a loader/follower implementation, provided +// that the extension is configured with adaptive sampling (vs. file-based config). +func GetAdaptiveSamplingStore( + host component.Host, +) (samplingstore.Store, *leaderelection.DistributedElectionParticipant, error) { var comp component.Component + var compID component.ID for id, ext := range host.GetExtensions() { if id.Type() == componentType { comp = ext + compID = id break } } if comp == nil { - return AdaptiveConfig{}, fmt.Errorf( + return nil, nil, fmt.Errorf( "cannot find extension '%s' (make sure it's defined earlier in the config)", componentType, ) } - return comp.(*rsExtension).cfg.Adaptive, nil -} - -// GetSamplingStorage retrieves a storage factory from `jaegerstorage` extension -// and uses it to create a sampling store and a loader/follower implementation. -func GetSamplingStorage( - samplingStorage string, - host component.Host, - opts adaptive.Options, - logger *zap.Logger, -) (samplingstore.Store, *leaderelection.DistributedElectionParticipant, error) { - f, err := jaegerstorage.GetStorageFactory(samplingStorage, host) - if err != nil { - return nil, nil, fmt.Errorf("cannot find storage factory: %w", err) - } - - ssStore, ok := f.(storage.SamplingStoreFactory) + ext, ok := comp.(*rsExtension) if !ok { - return nil, nil, fmt.Errorf("storage factory of type %s does not support sampling store", samplingStorage) + return nil, nil, fmt.Errorf("extension '%s' is not of type '%s'", compID, componentType) } - - lock, err := ssStore.CreateLock() - if err != nil { - return nil, nil, err + if ext.adaptiveStore == nil || ext.distLock == nil { + return nil, nil, fmt.Errorf("extension '%s' is not configured for adaptive sampling", compID) } - - store, err := ssStore.CreateSamplingStore(opts.AggregationBuckets) - if err != nil { - return nil, nil, err - } - - ep := leaderelection.NewElectionParticipant(lock, defaultResourceName, leaderelection.ElectionParticipantOptions{ - FollowerLeaseRefreshInterval: opts.FollowerLeaseRefreshInterval, - LeaderLeaseRefreshInterval: opts.LeaderLeaseRefreshInterval, - Logger: logger, - }) - - return store, ep, nil + return ext.adaptiveStore, ext.distLock, nil } func (ext *rsExtension) Start(ctx context.Context, host component.Host) error { @@ -142,9 +121,15 @@ func (ext *rsExtension) Shutdown(ctx context.Context) error { ext.grpcServer.GracefulStop() } - if ext.strategyStore != nil { - if err := ext.strategyStore.Close(); err != nil { - errs = append(errs, fmt.Errorf("failed to stop strategy store: %w", err)) + if ext.distLock != nil { + if err := ext.distLock.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to stop the distributed lock: %w", err)) + } + } + + if ext.strategyProvider != nil { + if err := ext.strategyProvider.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to stop strategy provider: %w", err)) } } return errors.Join(errs...) @@ -157,16 +142,52 @@ func (ext *rsExtension) startFileStrategyStore(_ context.Context) error { // contextcheck linter complains about next line that context is not passed. //nolint - ss, err := static.NewStrategyStore(opts, ext.telemetry.Logger) + provider, err := static.NewStrategyStore(opts, ext.telemetry.Logger) if err != nil { return fmt.Errorf("failed to create the local file strategy store: %w", err) } - ext.strategyStore = ss + ext.strategyProvider = provider return nil } func (ext *rsExtension) startAdaptiveStrategyStore(host component.Host) error { + storageName := ext.cfg.Adaptive.StrategyStore + f, err := jaegerstorage.GetStorageFactory(storageName, host) + if err != nil { + return fmt.Errorf("cannot find storage factory: %w", err) + } + + storeFactory, ok := f.(storage.SamplingStoreFactory) + if !ok { + return fmt.Errorf("storage '%s' does not support sampling store", storageName) + } + + store, err := storeFactory.CreateSamplingStore(ext.cfg.Adaptive.AggregationBuckets) + if err != nil { + return fmt.Errorf("failed to create the sampling store: %w", err) + } + ext.adaptiveStore = store + + { + lock, err := storeFactory.CreateLock() + if err != nil { + return fmt.Errorf("failed to create the distributed lock: %w", err) + } + + ep := leaderelection.NewElectionParticipant(lock, defaultResourceName, + leaderelection.ElectionParticipantOptions{ + LeaderLeaseRefreshInterval: ext.cfg.Adaptive.LeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: ext.cfg.Adaptive.FollowerLeaseRefreshInterval, + Logger: ext.telemetry.Logger, + }) + if err := ep.Start(); err != nil { + return fmt.Errorf("failed to start the leader election participant: %w", err) + } + ext.distLock = ep + } + + // TODO it is unlikely all these options are needed, we should refactor more opts := adaptive.Options{ InitialSamplingProbability: ext.cfg.Adaptive.InitialSamplingProbability, MinSamplesPerSecond: ext.cfg.Adaptive.MinSamplesPerSecond, @@ -174,15 +195,11 @@ func (ext *rsExtension) startAdaptiveStrategyStore(host component.Host) error { FollowerLeaseRefreshInterval: ext.cfg.Adaptive.FollowerLeaseRefreshInterval, AggregationBuckets: ext.cfg.Adaptive.AggregationBuckets, } - - store, ep, err := GetSamplingStorage(ext.cfg.Adaptive.StrategyStore, host, opts, ext.telemetry.Logger) - if err != nil { - return err + provider := adaptive.NewStrategyStore(opts, ext.telemetry.Logger, ext.distLock, store) + if err := provider.Start(); err != nil { + return fmt.Errorf("failed to start the adaptive strategy store: %w", err) } - - ss := adaptive.NewStrategyStore(opts, ext.telemetry.Logger, ep, store) - ss.Start() - ext.strategyStore = ss + ext.strategyProvider = provider return nil } @@ -193,7 +210,7 @@ func (ext *rsExtension) startGRPCServer(ctx context.Context, host component.Host } healthServer := health.NewServer() - api_v2.RegisterSamplingManagerServer(ext.grpcServer, sampling.NewGRPCHandler(ext.strategyStore)) + api_v2.RegisterSamplingManagerServer(ext.grpcServer, sampling.NewGRPCHandler(ext.strategyProvider)) healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING) grpc_health_v1.RegisterHealthServer(ext.grpcServer, healthServer) ext.telemetry.Logger.Info("Starting GRPC server", zap.String("endpoint", ext.cfg.GRPC.NetAddr.Endpoint)) @@ -220,7 +237,7 @@ func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{ ConfigManager: &clientcfghttp.ConfigManager{ - SamplingStrategyStore: ext.strategyStore, + SamplingStrategyStore: ext.strategyProvider, }, MetricsFactory: metrics.NullFactory, BasePath: "/api", diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go index b5ff86c7ad7..c1912f4bc0a 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/processor.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -32,34 +32,24 @@ func newTraceProcessor(cfg Config, otel component.TelemetrySettings) *traceProce } func (tp *traceProcessor) start(_ context.Context, host component.Host) error { - extCfg, err := remotesampling.GetExtensionConfig(host) - - tp.logger.Info("Adaptive sampling extension config", zap.Any("config", extCfg)) - + store, ep, err := remotesampling.GetAdaptiveSamplingStore(host) if err != nil { return err } - opts := adaptive.Options{ - InitialSamplingProbability: extCfg.InitialSamplingProbability, - TargetSamplesPerSecond: tp.config.TargetSamplesPerSecond, - DeltaTolerance: tp.config.DeltaTolerance, - CalculationInterval: tp.config.CalculationInterval, - AggregationBuckets: extCfg.AggregationBuckets, - BucketsForCalculation: tp.config.BucketsForCalculation, - Delay: tp.config.Delay, - MinSamplingProbability: tp.config.MinSamplingProbability, - LeaderLeaseRefreshInterval: extCfg.LeaderLeaseRefreshInterval, - FollowerLeaseRefreshInterval: extCfg.FollowerLeaseRefreshInterval, - } - store, ep, err := remotesampling.GetSamplingStorage(tp.config.StrategyStore, host, opts, tp.logger) - if err != nil { - return err + opts := adaptive.Options{ + TargetSamplesPerSecond: tp.config.TargetSamplesPerSecond, + DeltaTolerance: tp.config.DeltaTolerance, + CalculationInterval: tp.config.CalculationInterval, + BucketsForCalculation: tp.config.BucketsForCalculation, + Delay: tp.config.Delay, + MinSamplingProbability: tp.config.MinSamplingProbability, } + // TODO it is unlikely that aggregator needs the full Options object, we need to refactor. agg, err := adaptive.NewAggregator(opts, tp.logger, metrics.NullFactory, ep, store) if err != nil { - return err + return fmt.Errorf("failed to create the adpative sampling aggregator : %w", err) } agg.Start() From 75f0ad119865573f12a9010047860d540bfa6290 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Mon, 17 Jun 2024 21:13:23 -0400 Subject: [PATCH 04/12] refactor Signed-off-by: Yuri Shkuro --- .../extension/remotesampling/config.go | 66 ++++++------------- .../extension/remotesampling/extension.go | 59 +++++++++-------- .../extension/remotesampling/factory.go | 35 +++------- .../processors/adaptivesampling/config.go | 49 ++++---------- .../processors/adaptivesampling/factory.go | 9 +-- .../processors/adaptivesampling/processor.go | 25 ++++--- .../strategyprovider/adaptive/aggregator.go | 2 +- .../strategyprovider/adaptive/options.go | 38 +++++++---- 8 files changed, 111 insertions(+), 172 deletions(-) diff --git a/cmd/jaeger/internal/extension/remotesampling/config.go b/cmd/jaeger/internal/extension/remotesampling/config.go index 97536bf3398..31da11ec03d 100644 --- a/cmd/jaeger/internal/extension/remotesampling/config.go +++ b/cmd/jaeger/internal/extension/remotesampling/config.go @@ -6,65 +6,38 @@ package remotesampling import ( "errors" "reflect" - "time" + "github.com/asaskevich/govalidator" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" ) var ( - errNoSource = errors.New("no sampling strategy specified, has to be either 'adaptive' or 'file'") - errMultipleSource = errors.New("only one sampling strategy can be specified, has to be either 'adaptive' or 'file'") + errNoSource = errors.New("no sampling strategy provider specified, expecting 'adaptive' or 'file'") + errMultipleSource = errors.New("only one sampling strategy provider can be specified, 'adaptive' or 'file'") ) -type FileConfig struct { - // File specifies a local file as the strategies source - Path string `mapstructure:"path"` -} - -type AdaptiveConfig struct { - // name of the strategy storage defined in the jaegerstorage extension - StrategyStore string `mapstructure:"strategy_store"` - - // InitialSamplingProbability is the initial sampling probability for all new operations. - InitialSamplingProbability float64 `mapstructure:"initial_sampling_probability"` - - // AggregationBuckets is the total number of aggregated throughput buckets kept in memory, ie. if - // the CalculationInterval is 1 minute (each bucket contains 1 minute of thoughput data) and the - // AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for - // all operations. - // TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice. - AggregationBuckets int `mapstructure:"aggregation_buckets"` - - // MinSamplesPerSecond determines the min number of traces that are sampled per second. - // For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do - // its best to sample at least one trace a minute for an operation. This is useful for low QPS operations - // that may never be sampled by the probabilistic sampler. - MinSamplesPerSecond float64 `mapstructure:"min_samples_per_second"` - - // LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before - // attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval - // to reduce lock thrashing. - LeaderLeaseRefreshInterval time.Duration `mapstructure:"leader_lease_refresh_interval"` - - // FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower - // (ie. failed to gain the leader lock). - FollowerLeaseRefreshInterval time.Duration `mapstructure:"follower_lease_refresh_interval"` -} +var _ component.ConfigValidator = (*Config)(nil) type Config struct { - File FileConfig `mapstructure:"file"` - Adaptive AdaptiveConfig `mapstructure:"adaptive"` - HTTP HTTPConfig `mapstructure:"http"` - GRPC GRPCConfig `mapstructure:"grpc"` + File FileConfig `mapstructure:"file"` + Adaptive AdaptiveConfig `mapstructure:"adaptive"` + HTTP confighttp.ServerConfig `mapstructure:"http"` + GRPC configgrpc.ServerConfig `mapstructure:"grpc"` } -type HTTPConfig struct { - confighttp.ServerConfig `mapstructure:",squash"` +type FileConfig struct { + // File specifies a local file as the source of sampling strategies. + Path string `valid:"required" mapstructure:"path"` } -type GRPCConfig struct { - configgrpc.ServerConfig `mapstructure:",squash"` +type AdaptiveConfig struct { + // StrategyStore is the name of the strategy storage defined in the jaegerstorage extension. + StrategyStore string `valid:"required" mapstructure:"strategy_store"` + + adaptive.Options `mapstructure:",squash"` } func (cfg *Config) Validate() error { @@ -76,5 +49,6 @@ func (cfg *Config) Validate() error { if cfg.File.Path != "" && cfg.Adaptive.StrategyStore != "" { return errMultipleSource } - return nil + _, err := govalidator.ValidateStruct(cfg) + return err } diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go index 2d4a93ee719..3aeb4e02064 100644 --- a/cmd/jaeger/internal/extension/remotesampling/extension.go +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -19,13 +19,13 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection" - "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive" - "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/static" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/samplingstore" @@ -40,7 +40,7 @@ type rsExtension struct { telemetry component.TelemetrySettings httpServer *http.Server grpcServer *grpc.Server - strategyProvider strategystore.StrategyStore // TODO we should rename this to Provider, not "store" + strategyProvider samplingstrategy.Provider // TODO we should rename this to Provider, not "store" adaptiveStore samplingstore.Store distLock *leaderelection.DistributedElectionParticipant shutdownWG sync.WaitGroup @@ -53,12 +53,17 @@ func newExtension(cfg *Config, telemetry component.TelemetrySettings) *rsExtensi } } -// GetAdaptiveSamplingStore locates the `remotesampling` extension in Host +// AdaptiveSamplingComponents is a struct that holds the components needed for adaptive sampling. +type AdaptiveSamplingComponents struct { + SamplingStore samplingstore.Store + DistLock *leaderelection.DistributedElectionParticipant + Options *adaptive.Options +} + +// GetAdaptiveSamplingComponents locates the `remotesampling` extension in Host // and returns the sampling store and a loader/follower implementation, provided // that the extension is configured with adaptive sampling (vs. file-based config). -func GetAdaptiveSamplingStore( - host component.Host, -) (samplingstore.Store, *leaderelection.DistributedElectionParticipant, error) { +func GetAdaptiveSamplingComponents(host component.Host) (*AdaptiveSamplingComponents, error) { var comp component.Component var compID component.ID for id, ext := range host.GetExtensions() { @@ -69,30 +74,34 @@ func GetAdaptiveSamplingStore( } } if comp == nil { - return nil, nil, fmt.Errorf( + return nil, fmt.Errorf( "cannot find extension '%s' (make sure it's defined earlier in the config)", componentType, ) } ext, ok := comp.(*rsExtension) if !ok { - return nil, nil, fmt.Errorf("extension '%s' is not of type '%s'", compID, componentType) + return nil, fmt.Errorf("extension '%s' is not of type '%s'", compID, componentType) } if ext.adaptiveStore == nil || ext.distLock == nil { - return nil, nil, fmt.Errorf("extension '%s' is not configured for adaptive sampling", compID) + return nil, fmt.Errorf("extension '%s' is not configured for adaptive sampling", compID) } - return ext.adaptiveStore, ext.distLock, nil + return &AdaptiveSamplingComponents{ + SamplingStore: ext.adaptiveStore, + DistLock: ext.distLock, + Options: &ext.cfg.Adaptive.Options, + }, nil } func (ext *rsExtension) Start(ctx context.Context, host component.Host) error { if ext.cfg.File.Path != "" { - if err := ext.startFileStrategyStore(ctx); err != nil { + if err := ext.startFileBasedStrategyProvider(ctx); err != nil { return err } } if ext.cfg.Adaptive.StrategyStore != "" { - if err := ext.startAdaptiveStrategyStore(host); err != nil { + if err := ext.startAdaptiveStrategyProvider(host); err != nil { return err } } @@ -135,14 +144,14 @@ func (ext *rsExtension) Shutdown(ctx context.Context) error { return errors.Join(errs...) } -func (ext *rsExtension) startFileStrategyStore(_ context.Context) error { +func (ext *rsExtension) startFileBasedStrategyProvider(_ context.Context) error { opts := static.Options{ StrategiesFile: ext.cfg.File.Path, } // contextcheck linter complains about next line that context is not passed. //nolint - provider, err := static.NewStrategyStore(opts, ext.telemetry.Logger) + provider, err := static.NewProvider(opts, ext.telemetry.Logger) if err != nil { return fmt.Errorf("failed to create the local file strategy store: %w", err) } @@ -151,7 +160,7 @@ func (ext *rsExtension) startFileStrategyStore(_ context.Context) error { return nil } -func (ext *rsExtension) startAdaptiveStrategyStore(host component.Host) error { +func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error { storageName := ext.cfg.Adaptive.StrategyStore f, err := jaegerstorage.GetStorageFactory(storageName, host) if err != nil { @@ -187,15 +196,7 @@ func (ext *rsExtension) startAdaptiveStrategyStore(host component.Host) error { ext.distLock = ep } - // TODO it is unlikely all these options are needed, we should refactor more - opts := adaptive.Options{ - InitialSamplingProbability: ext.cfg.Adaptive.InitialSamplingProbability, - MinSamplesPerSecond: ext.cfg.Adaptive.MinSamplesPerSecond, - LeaderLeaseRefreshInterval: ext.cfg.Adaptive.LeaderLeaseRefreshInterval, - FollowerLeaseRefreshInterval: ext.cfg.Adaptive.FollowerLeaseRefreshInterval, - AggregationBuckets: ext.cfg.Adaptive.AggregationBuckets, - } - provider := adaptive.NewStrategyStore(opts, ext.telemetry.Logger, ext.distLock, store) + provider := adaptive.NewProvider(ext.cfg.Adaptive.Options, ext.telemetry.Logger, ext.distLock, store) if err := provider.Start(); err != nil { return fmt.Errorf("failed to start the adaptive strategy store: %w", err) } @@ -237,7 +238,7 @@ func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{ ConfigManager: &clientcfghttp.ConfigManager{ - SamplingStrategyStore: ext.strategyProvider, + SamplingProvider: ext.strategyProvider, }, MetricsFactory: metrics.NullFactory, BasePath: "/api", @@ -250,9 +251,9 @@ func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host return err } - ext.telemetry.Logger.Info("Starting HTTP server", zap.String("endpoint", ext.cfg.HTTP.ServerConfig.Endpoint)) + ext.telemetry.Logger.Info("Starting HTTP server", zap.String("endpoint", ext.cfg.HTTP.Endpoint)) var hln net.Listener - if hln, err = ext.cfg.HTTP.ServerConfig.ToListener(ctx); err != nil { + if hln, err = ext.cfg.HTTP.ToListener(ctx); err != nil { return err } diff --git a/cmd/jaeger/internal/extension/remotesampling/factory.go b/cmd/jaeger/internal/extension/remotesampling/factory.go index d65d2646e2b..ac24a9adf31 100644 --- a/cmd/jaeger/internal/extension/remotesampling/factory.go +++ b/cmd/jaeger/internal/extension/remotesampling/factory.go @@ -5,7 +5,6 @@ package remotesampling import ( "context" - "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -13,20 +12,13 @@ import ( "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/extension" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" "github.com/jaegertracing/jaeger/ports" ) // componentType is the name of this extension in configuration. var componentType = component.MustNewType("remote_sampling") -const ( - defaultAggregationBuckets = 10 - defaultInitialSamplingProbability = 0.001 - defaultMinSamplesPerSecond = 1.0 / float64(time.Minute/time.Second) // once every 1 minute - defaultLeaderLeaseRefreshInterval = 5 * time.Second - defaultFollowerLeaseRefreshInterval = 60 * time.Second -) - // NewFactory creates a factory for the jaeger remote sampling extension. func NewFactory() extension.Factory { return extension.NewFactory( @@ -39,28 +31,21 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - HTTP: HTTPConfig{ - ServerConfig: confighttp.ServerConfig{ - Endpoint: ports.PortToHostPort(ports.CollectorHTTP), - }, + HTTP: confighttp.ServerConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorHTTP), }, - GRPC: GRPCConfig{ - ServerConfig: configgrpc.ServerConfig{ - NetAddr: confignet.AddrConfig{ - Endpoint: ports.PortToHostPort(ports.CollectorGRPC), - Transport: confignet.TransportTypeTCP, - }, + GRPC: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorGRPC), + Transport: confignet.TransportTypeTCP, }, }, File: FileConfig{ - Path: "", + Path: "", // path needs to be specified }, Adaptive: AdaptiveConfig{ - InitialSamplingProbability: defaultInitialSamplingProbability, - MinSamplesPerSecond: defaultMinSamplesPerSecond, - LeaderLeaseRefreshInterval: defaultLeaderLeaseRefreshInterval, - FollowerLeaseRefreshInterval: defaultFollowerLeaseRefreshInterval, - AggregationBuckets: defaultAggregationBuckets, + StrategyStore: "", // storage name needs to be specified + Options: adaptive.DefaultOptions(), }, } } diff --git a/cmd/jaeger/internal/processors/adaptivesampling/config.go b/cmd/jaeger/internal/processors/adaptivesampling/config.go index 3abd0df99c4..0ac0cefd219 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/config.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/config.go @@ -3,45 +3,18 @@ package adaptivesampling -import "time" +import ( + "github.com/asaskevich/govalidator" + "go.opentelemetry.io/collector/component" +) -type Config struct { - // name of the strategy storage defined in the jaegerstorage extension - StrategyStore string `mapstructure:"strategy_store"` - - // TargetSamplesPerSecond is the global target rate of samples per operation. - // TODO implement manual overrides per service/operation. - TargetSamplesPerSecond float64 `mapstructure:"target_samples_per_second"` - - // DeltaTolerance is the acceptable amount of deviation between the observed and the desired (target) - // throughput for an operation, expressed as a ratio. For example, the value of 0.3 (30% deviation) - // means that if abs((actual-expected) / expected) < 0.3, then the actual sampling rate is "close enough" - // and the system does not need to send an updated sampling probability (the "control signal" u(t) - // in the PID Controller terminology) to the sampler in the application. - // - // Increase this to reduce the amount of fluctuation in the calculated probabilities. - DeltaTolerance float64 `mapstructure:"delta_tolerance"` - - // CalculationInterval determines how often new probabilities are calculated. E.g. if it is 1 minute, - // new sampling probabilities are calculated once a minute and each bucket will contain 1 minute worth - // of aggregated throughput data. - CalculationInterval time.Duration `mapstructure:"calculation_interval"` +var _ component.ConfigValidator = (*Config)(nil) - // BucketsForCalculation determines how many previous buckets used in calculating the weighted QPS, - // ie. if BucketsForCalculation is 1, only the most recent bucket will be used in calculating the weighted QPS. - BucketsForCalculation int `mapstructure:"buckets_for_calculation"` - - // Delay is the amount of time to delay probability generation by, ie. if the CalculationInterval - // is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time - // we'll have [now()-12m,now()-2m] range of throughput data in memory to base the calculations - // off of. This delay is necessary to counteract the rate at which the SDKs poll for - // the latest sampling probabilities. The default client poll rate is 1 minute, which means that - // during any 1 minute interval, the clients will be fetching new probabilities in a uniformly - // distributed manner throughout the 1 minute window. By setting the delay to 2 minutes, we can - // guarantee that all clients can use the latest calculated probabilities for at least 1 minute. - Delay time.Duration `mapstructure:"delay"` +type Config struct { + // all configuration for the processor is in the remotesampling extension +} - // MinSamplingProbability is the minimum sampling probability for all operations. ie. the calculated sampling - // probability will be in the range [MinSamplingProbability, 1.0]. - MinSamplingProbability float64 `mapstructure:"min_sampling_probability"` +func (cfg *Config) Validate() error { + _, err := govalidator.ValidateStruct(cfg) + return err } diff --git a/cmd/jaeger/internal/processors/adaptivesampling/factory.go b/cmd/jaeger/internal/processors/adaptivesampling/factory.go index cf872ad322c..4c3c8ec127c 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/factory.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/factory.go @@ -36,14 +36,7 @@ func NewFactory() processor.Factory { } func createDefaultConfig() component.Config { - return &Config{ - TargetSamplesPerSecond: defaultTargetSamplesPerSecond, - DeltaTolerance: defaultDeltaTolerance, - CalculationInterval: defaultCalculationInterval, - BucketsForCalculation: defaultBucketsForCalculation, - Delay: defaultDelay, - MinSamplingProbability: defaultMinSamplingProbability, - } + return &Config{} } func createTracesProcessor( diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go index c1912f4bc0a..f1853912aaa 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/processor.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -12,16 +12,16 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" "github.com/jaegertracing/jaeger/pkg/metrics" - "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive" + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" ) type traceProcessor struct { config *Config logger *zap.Logger - aggregator strategystore.Aggregator + aggregator samplingstrategy.Aggregator } func newTraceProcessor(cfg Config, otel component.TelemetrySettings) *traceProcessor { @@ -32,22 +32,19 @@ func newTraceProcessor(cfg Config, otel component.TelemetrySettings) *traceProce } func (tp *traceProcessor) start(_ context.Context, host component.Host) error { - store, ep, err := remotesampling.GetAdaptiveSamplingStore(host) + parts, err := remotesampling.GetAdaptiveSamplingComponents(host) if err != nil { return err } - opts := adaptive.Options{ - TargetSamplesPerSecond: tp.config.TargetSamplesPerSecond, - DeltaTolerance: tp.config.DeltaTolerance, - CalculationInterval: tp.config.CalculationInterval, - BucketsForCalculation: tp.config.BucketsForCalculation, - Delay: tp.config.Delay, - MinSamplingProbability: tp.config.MinSamplingProbability, - } - // TODO it is unlikely that aggregator needs the full Options object, we need to refactor. - agg, err := adaptive.NewAggregator(opts, tp.logger, metrics.NullFactory, ep, store) + agg, err := adaptive.NewAggregator( + *parts.Options, + tp.logger, + metrics.NullFactory, + parts.DistLock, + parts.SamplingStore, + ) if err != nil { return fmt.Errorf("failed to create the adpative sampling aggregator : %w", err) } diff --git a/plugin/sampling/strategyprovider/adaptive/aggregator.go b/plugin/sampling/strategyprovider/adaptive/aggregator.go index 749ec1fbbc7..5fe4fcd59dd 100644 --- a/plugin/sampling/strategyprovider/adaptive/aggregator.go +++ b/plugin/sampling/strategyprovider/adaptive/aggregator.go @@ -129,7 +129,7 @@ func (a *aggregator) RecordThroughput(service, operation string, samplerType spa } } -func RecordThroughput(agg strategystore.Aggregator, span *span_model.Span, logger *zap.Logger) { +func RecordThroughput(agg samplingstrategy.Aggregator, span *span_model.Span, logger *zap.Logger) { // TODO simply checking parentId to determine if a span is a root span is not sufficient. However, // we can be sure that only a root span will have sampler tags. if span.ParentSpanID() != span_model.NewSpanID(0) { diff --git a/plugin/sampling/strategyprovider/adaptive/options.go b/plugin/sampling/strategyprovider/adaptive/options.go index 64288632dbf..4eb13361443 100644 --- a/plugin/sampling/strategyprovider/adaptive/options.go +++ b/plugin/sampling/strategyprovider/adaptive/options.go @@ -53,7 +53,7 @@ const ( type Options struct { // TargetSamplesPerSecond is the global target rate of samples per operation. // TODO implement manual overrides per service/operation. - TargetSamplesPerSecond float64 + TargetSamplesPerSecond float64 `mapstructure:"target_samples_per_second"` // DeltaTolerance is the acceptable amount of deviation between the observed and the desired (target) // throughput for an operation, expressed as a ratio. For example, the value of 0.3 (30% deviation) @@ -62,23 +62,23 @@ type Options struct { // in the PID Controller terminology) to the sampler in the application. // // Increase this to reduce the amount of fluctuation in the calculated probabilities. - DeltaTolerance float64 + DeltaTolerance float64 `mapstructure:"delta_tolerance"` // CalculationInterval determines how often new probabilities are calculated. E.g. if it is 1 minute, // new sampling probabilities are calculated once a minute and each bucket will contain 1 minute worth // of aggregated throughput data. - CalculationInterval time.Duration + CalculationInterval time.Duration `mapstructure:"calculation_interval"` // AggregationBuckets is the total number of aggregated throughput buckets kept in memory, ie. if // the CalculationInterval is 1 minute (each bucket contains 1 minute of thoughput data) and the // AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for // all operations. // TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice. - AggregationBuckets int + AggregationBuckets int `mapstructure:"aggregation_buckets"` // BucketsForCalculation determines how many previous buckets used in calculating the weighted QPS, // ie. if BucketsForCalculation is 1, only the most recent bucket will be used in calculating the weighted QPS. - BucketsForCalculation int + BucketsForCalculation int `mapstructure:"calculation_buckets"` // Delay is the amount of time to delay probability generation by, ie. if the CalculationInterval // is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time @@ -88,29 +88,45 @@ type Options struct { // during any 1 minute interval, the clients will be fetching new probabilities in a uniformly // distributed manner throughout the 1 minute window. By setting the delay to 2 minutes, we can // guarantee that all clients can use the latest calculated probabilities for at least 1 minute. - Delay time.Duration + Delay time.Duration `mapstructure:"calculation_delay"` // InitialSamplingProbability is the initial sampling probability for all new operations. - InitialSamplingProbability float64 + InitialSamplingProbability float64 `mapstructure:"initial_sampling_probability"` // MinSamplingProbability is the minimum sampling probability for all operations. ie. the calculated sampling // probability will be in the range [MinSamplingProbability, 1.0]. - MinSamplingProbability float64 + MinSamplingProbability float64 `mapstructure:"min_sampling_probability"` // MinSamplesPerSecond determines the min number of traces that are sampled per second. // For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do // its best to sample at least one trace a minute for an operation. This is useful for low QPS operations // that may never be sampled by the probabilistic sampler. - MinSamplesPerSecond float64 + MinSamplesPerSecond float64 `mapstructure:"min_samples_per_second"` // LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before // attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval // to reduce lock thrashing. - LeaderLeaseRefreshInterval time.Duration + LeaderLeaseRefreshInterval time.Duration `mapstructure:"leader_lease_refresh_interval"` // FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower // (ie. failed to gain the leader lock). - FollowerLeaseRefreshInterval time.Duration + FollowerLeaseRefreshInterval time.Duration `mapstructure:"follower_lease_refresh_interval"` +} + +func DefaultOptions() Options { + return Options{ + TargetSamplesPerSecond: defaultTargetSamplesPerSecond, + DeltaTolerance: defaultDeltaTolerance, + BucketsForCalculation: defaultBucketsForCalculation, + CalculationInterval: defaultCalculationInterval, + AggregationBuckets: defaultAggregationBuckets, + Delay: defaultDelay, + InitialSamplingProbability: defaultInitialSamplingProbability, + MinSamplingProbability: defaultMinSamplingProbability, + MinSamplesPerSecond: defaultMinSamplesPerSecond, + LeaderLeaseRefreshInterval: defaultLeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: defaultFollowerLeaseRefreshInterval, + } } // AddFlags adds flags for Options From adda2b26dffd0f56df71271989945331a94e3c9b Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Mon, 17 Jun 2024 21:14:31 -0400 Subject: [PATCH 05/12] restore most configs Signed-off-by: Yuri Shkuro --- cmd/jaeger/config-badger.yaml | 17 ++--------------- cmd/jaeger/config-cassandra.yaml | 17 ++--------------- cmd/jaeger/config-elasticsearch.yaml | 16 ++-------------- cmd/jaeger/config-opensearch.yaml | 16 ++-------------- 4 files changed, 8 insertions(+), 58 deletions(-) diff --git a/cmd/jaeger/config-badger.yaml b/cmd/jaeger/config-badger.yaml index a683eb2a52f..4643c9cc75a 100644 --- a/cmd/jaeger/config-badger.yaml +++ b/cmd/jaeger/config-badger.yaml @@ -1,23 +1,12 @@ service: - extensions: [jaeger_storage, jaeger_query, remote_sampling] + extensions: [jaeger_storage, jaeger_query] pipelines: traces: receivers: [otlp] - processors: [batch, adaptive_sampling] + processors: [batch] exporters: [jaeger_storage_exporter] extensions: - remote_sampling: - # You can either use file or adaptive sampling strategy in remote_sampling - # file: - # path: ./cmd/jaeger/sampling-strategies.json - adaptive: - initial_sampling_probability: 0.1 - strategy_store: badger_main - http: - grpc: - - jaeger_query: trace_storage: badger_main trace_storage_archive: badger_archive @@ -48,8 +37,6 @@ receivers: processors: batch: - adaptive_sampling: - strategy_store: badger_main exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 26b7485c89e..39cfb319489 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -1,22 +1,12 @@ service: - extensions: [jaeger_storage, jaeger_query, remote_sampling] + extensions: [jaeger_storage, jaeger_query] pipelines: traces: receivers: [otlp] - processors: [batch, adaptive_sampling] + processors: [batch] exporters: [jaeger_storage_exporter] extensions: - remote_sampling: - # You can either use file or adaptive sampling strategy in remote_sampling - # file: - # path: ./cmd/jaeger/sampling-strategies.json - adaptive: - initial_sampling_probability: 0.1 - strategy_store: cassandra_main - http: - grpc: - jaeger_query: trace_storage: cassandra_main trace_storage_archive: cassandra_archive @@ -57,9 +47,6 @@ receivers: processors: batch: - adaptive_sampling: - strategy_store: cassandra_main - exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config-elasticsearch.yaml b/cmd/jaeger/config-elasticsearch.yaml index 028b4b46084..21257f920d6 100644 --- a/cmd/jaeger/config-elasticsearch.yaml +++ b/cmd/jaeger/config-elasticsearch.yaml @@ -1,22 +1,12 @@ service: - extensions: [jaeger_storage, jaeger_query, remote_sampling] + extensions: [jaeger_storage, jaeger_query] pipelines: traces: receivers: [otlp] - processors: [batch, adaptive_sampling] + processors: [batch] exporters: [jaeger_storage_exporter] extensions: - remote_sampling: - # You can either use file or adaptive sampling strategy in remote_sampling - # file: - # path: ./cmd/jaeger/sampling-strategies.json - adaptive: - initial_sampling_probability: 0.1 - strategy_store: es_main - http: - grpc: - jaeger_query: trace_storage: es_main trace_storage_archive: es_archive @@ -45,8 +35,6 @@ receivers: processors: batch: - adaptive_sampling: - strategy_store: es_main exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/config-opensearch.yaml b/cmd/jaeger/config-opensearch.yaml index 453f638bedf..c7acf7d8f53 100644 --- a/cmd/jaeger/config-opensearch.yaml +++ b/cmd/jaeger/config-opensearch.yaml @@ -1,22 +1,12 @@ service: - extensions: [jaeger_storage, jaeger_query, remote_sampling] + extensions: [jaeger_storage, jaeger_query] pipelines: traces: receivers: [otlp] - processors: [batch, adaptive_sampling] + processors: [batch] exporters: [jaeger_storage_exporter] extensions: - remote_sampling: - # You can either use file or adaptive sampling strategy in remote_sampling - # file: - # path: ./cmd/jaeger/sampling-strategies.json - adaptive: - initial_sampling_probability: 0.1 - strategy_store: os_main - http: - grpc: - jaeger_query: trace_storage: os_main trace_storage_archive: os_archive @@ -46,8 +36,6 @@ receivers: processors: batch: - adaptive_sampling: - strategy_store: os_main exporters: jaeger_storage_exporter: From fe93d6e61e7c33cb368e6fc9e0e1c4c67539de82 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Mon, 17 Jun 2024 22:04:25 -0400 Subject: [PATCH 06/12] make config sections optional Signed-off-by: Yuri Shkuro --- cmd/jaeger/config.yaml | 21 ++--- .../extension/remotesampling/config.go | 61 +++++++++--- .../extension/remotesampling/extension.go | 94 +++++++++++-------- .../extension/remotesampling/factory.go | 14 +-- 4 files changed, 119 insertions(+), 71 deletions(-) diff --git a/cmd/jaeger/config.yaml b/cmd/jaeger/config.yaml index 7d76ea0d02f..82677c5d890 100644 --- a/cmd/jaeger/config.yaml +++ b/cmd/jaeger/config.yaml @@ -13,16 +13,6 @@ extensions: # zpages: # endpoint: 0.0.0.0:55679 - remote_sampling: - # You can either use file or adaptive sampling strategy in remote_sampling - # file: - # path: ./cmd/jaeger/sampling-strategies.json - adaptive: - initial_sampling_probability: 0.1 - strategy_store: memstore - http: - grpc: - jaeger_query: trace_storage: memstore trace_storage_archive: memstore_archive @@ -35,6 +25,16 @@ extensions: memstore_archive: max_traces: 100000 + remote_sampling: + # You can either use file or adaptive sampling strategy in remote_sampling + # file: + # path: ./cmd/jaeger/sampling-strategies.json + adaptive: + sampling_store: memstore + initial_sampling_probability: 0.1 + http: + grpc: + receivers: otlp: protocols: @@ -53,7 +53,6 @@ receivers: processors: batch: adaptive_sampling: - strategy_store: memstore exporters: jaeger_storage_exporter: diff --git a/cmd/jaeger/internal/extension/remotesampling/config.go b/cmd/jaeger/internal/extension/remotesampling/config.go index 31da11ec03d..8eba9d4f612 100644 --- a/cmd/jaeger/internal/extension/remotesampling/config.go +++ b/cmd/jaeger/internal/extension/remotesampling/config.go @@ -5,27 +5,28 @@ package remotesampling import ( "errors" - "reflect" "github.com/asaskevich/govalidator" "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/confmap" ) var ( - errNoSource = errors.New("no sampling strategy provider specified, expecting 'adaptive' or 'file'") - errMultipleSource = errors.New("only one sampling strategy provider can be specified, 'adaptive' or 'file'") + errNoProvider = errors.New("no sampling strategy provider specified, expecting 'adaptive' or 'file'") + errMultipleProviders = errors.New("only one sampling strategy provider can be specified, 'adaptive' or 'file'") ) var _ component.ConfigValidator = (*Config)(nil) +var _ confmap.Unmarshaler = (*Config)(nil) type Config struct { - File FileConfig `mapstructure:"file"` - Adaptive AdaptiveConfig `mapstructure:"adaptive"` - HTTP confighttp.ServerConfig `mapstructure:"http"` - GRPC configgrpc.ServerConfig `mapstructure:"grpc"` + File *FileConfig `mapstructure:"file"` + Adaptive *AdaptiveConfig `mapstructure:"adaptive"` + HTTP *confighttp.ServerConfig `mapstructure:"http"` + GRPC *configgrpc.ServerConfig `mapstructure:"grpc"` } type FileConfig struct { @@ -34,21 +35,53 @@ type FileConfig struct { } type AdaptiveConfig struct { - // StrategyStore is the name of the strategy storage defined in the jaegerstorage extension. - StrategyStore string `valid:"required" mapstructure:"strategy_store"` + // SamplingStore is the name of the storage defined in the jaegerstorage extension. + SamplingStore string `valid:"required" mapstructure:"sampling_store"` adaptive.Options `mapstructure:",squash"` } +// Unmarshal is a custom unmarshaler that allows the factory to provide default values +// for nested configs (like GRPC endpoint) yes still reset the pointers to nil if the +// config did not contain the corresponding sections. +// This is a workaround for the lack of opional fields support in OTEL confmap. +// Issue: https://github.com/open-telemetry/opentelemetry-collector/issues/10266 +func (cfg *Config) Unmarshal(conf *confmap.Conf) error { + // first load the config normally + err := conf.Unmarshal(cfg) + if err != nil { + return err + } + + // use string names of fields to see if they are set in the confmap + if !conf.IsSet("file") { + cfg.File = nil + } + + if !conf.IsSet("adaptive") { + cfg.Adaptive = nil + } + + if !conf.IsSet("grpc") { + cfg.GRPC = nil + } + + if !conf.IsSet("http") { + cfg.HTTP = nil + } + + return nil +} + func (cfg *Config) Validate() error { - emptyCfg := createDefaultConfig().(*Config) - if reflect.DeepEqual(*cfg, *emptyCfg) { - return errNoSource + if cfg.File == nil && cfg.Adaptive == nil { + return errNoProvider } - if cfg.File.Path != "" && cfg.Adaptive.StrategyStore != "" { - return errMultipleSource + if cfg.File != nil && cfg.Adaptive != nil { + return errMultipleProviders } + _, err := govalidator.ValidateStruct(cfg) return err } diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go index 3aeb4e02064..6d772ed57e0 100644 --- a/cmd/jaeger/internal/extension/remotesampling/extension.go +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -94,24 +94,36 @@ func GetAdaptiveSamplingComponents(host component.Host) (*AdaptiveSamplingCompon } func (ext *rsExtension) Start(ctx context.Context, host component.Host) error { - if ext.cfg.File.Path != "" { + if ext.cfg.File != nil { + ext.telemetry.Logger.Info( + "Starting file-based sampling strategy provider", + zap.String("path", ext.cfg.File.Path), + ) if err := ext.startFileBasedStrategyProvider(ctx); err != nil { return err } } - if ext.cfg.Adaptive.StrategyStore != "" { + if ext.cfg.Adaptive != nil { + ext.telemetry.Logger.Info( + "Starting adaptive sampling strategy provider", + zap.String("sampling_store", ext.cfg.Adaptive.SamplingStore), + ) if err := ext.startAdaptiveStrategyProvider(host); err != nil { return err } } - if err := ext.startHTTPServer(ctx, host); err != nil { - return fmt.Errorf("failed to start sampling http server: %w", err) + if ext.cfg.HTTP != nil { + if err := ext.startHTTPServer(ctx, host); err != nil { + return fmt.Errorf("failed to start sampling http server: %w", err) + } } - if err := ext.startGRPCServer(ctx, host); err != nil { - return fmt.Errorf("failed to start sampling gRPC server: %w", err) + if ext.cfg.GRPC != nil { + if err := ext.startGRPCServer(ctx, host); err != nil { + return fmt.Errorf("failed to start sampling gRPC server: %w", err) + } } return nil @@ -161,7 +173,7 @@ func (ext *rsExtension) startFileBasedStrategyProvider(_ context.Context) error } func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error { - storageName := ext.cfg.Adaptive.StrategyStore + storageName := ext.cfg.Adaptive.SamplingStore f, err := jaegerstorage.GetStorageFactory(storageName, host) if err != nil { return fmt.Errorf("cannot find storage factory: %w", err) @@ -204,20 +216,28 @@ func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error return nil } -func (ext *rsExtension) startGRPCServer(ctx context.Context, host component.Host) error { +func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host) error { + handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{ + ConfigManager: &clientcfghttp.ConfigManager{ + SamplingProvider: ext.strategyProvider, + }, + MetricsFactory: metrics.NullFactory, + BasePath: "/api", // TODO is /api correct? + }) + httpMux := http.NewServeMux() + handler.RegisterRoutesWithHTTP(httpMux) + var err error - if ext.grpcServer, err = ext.cfg.GRPC.ToServer(ctx, host, ext.telemetry); err != nil { + if ext.httpServer, err = ext.cfg.HTTP.ToServer(ctx, host, ext.telemetry, httpMux); err != nil { return err } - healthServer := health.NewServer() - api_v2.RegisterSamplingManagerServer(ext.grpcServer, sampling.NewGRPCHandler(ext.strategyProvider)) - healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING) - grpc_health_v1.RegisterHealthServer(ext.grpcServer, healthServer) - ext.telemetry.Logger.Info("Starting GRPC server", zap.String("endpoint", ext.cfg.GRPC.NetAddr.Endpoint)) - - var gln net.Listener - if gln, err = ext.cfg.GRPC.NetAddr.Listen(ctx); err != nil { + ext.telemetry.Logger.Info( + "Starting remote sampling HTTP server", + zap.String("endpoint", ext.cfg.HTTP.Endpoint), + ) + var hln net.Listener + if hln, err = ext.cfg.HTTP.ToListener(ctx); err != nil { return err } @@ -225,44 +245,40 @@ func (ext *rsExtension) startGRPCServer(ctx context.Context, host component.Host go func() { defer ext.shutdownWG.Done() - if errGrpc := ext.grpcServer.Serve(gln); errGrpc != nil && !errors.Is(errGrpc, grpc.ErrServerStopped) { - ext.telemetry.ReportStatus(component.NewFatalErrorEvent(errGrpc)) + err := ext.httpServer.Serve(hln) + if err != nil && !errors.Is(err, http.ErrServerClosed) { + ext.telemetry.ReportStatus(component.NewFatalErrorEvent(err)) } }() return nil } -func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host) error { - httpMux := http.NewServeMux() - - handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{ - ConfigManager: &clientcfghttp.ConfigManager{ - SamplingProvider: ext.strategyProvider, - }, - MetricsFactory: metrics.NullFactory, - BasePath: "/api", - }) - - handler.RegisterRoutesWithHTTP(httpMux) - +func (ext *rsExtension) startGRPCServer(ctx context.Context, host component.Host) error { var err error - if ext.httpServer, err = ext.cfg.HTTP.ToServer(ctx, host, ext.telemetry, httpMux); err != nil { + if ext.grpcServer, err = ext.cfg.GRPC.ToServer(ctx, host, ext.telemetry); err != nil { return err } - ext.telemetry.Logger.Info("Starting HTTP server", zap.String("endpoint", ext.cfg.HTTP.Endpoint)) - var hln net.Listener - if hln, err = ext.cfg.HTTP.ToListener(ctx); err != nil { + api_v2.RegisterSamplingManagerServer(ext.grpcServer, sampling.NewGRPCHandler(ext.strategyProvider)) + + healthServer := health.NewServer() // support health checks on the gRPC server + healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(ext.grpcServer, healthServer) + + ext.telemetry.Logger.Info( + "Starting remote sampling GRPC server", + zap.String("endpoint", ext.cfg.GRPC.NetAddr.Endpoint), + ) + var gln net.Listener + if gln, err = ext.cfg.GRPC.NetAddr.Listen(ctx); err != nil { return err } ext.shutdownWG.Add(1) go func() { defer ext.shutdownWG.Done() - - err := ext.httpServer.Serve(hln) - if err != nil && !errors.Is(err, http.ErrServerClosed) { + if err := ext.grpcServer.Serve(gln); err != nil && !errors.Is(err, grpc.ErrServerStopped) { ext.telemetry.ReportStatus(component.NewFatalErrorEvent(err)) } }() diff --git a/cmd/jaeger/internal/extension/remotesampling/factory.go b/cmd/jaeger/internal/extension/remotesampling/factory.go index ac24a9adf31..100244237cd 100644 --- a/cmd/jaeger/internal/extension/remotesampling/factory.go +++ b/cmd/jaeger/internal/extension/remotesampling/factory.go @@ -31,20 +31,20 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - HTTP: confighttp.ServerConfig{ - Endpoint: ports.PortToHostPort(ports.CollectorHTTP), + HTTP: &confighttp.ServerConfig{ + Endpoint: ports.PortToHostPort(ports.CollectorHTTP + 100), }, - GRPC: configgrpc.ServerConfig{ + GRPC: &configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: ports.PortToHostPort(ports.CollectorGRPC), + Endpoint: ports.PortToHostPort(ports.CollectorGRPC + 100), Transport: confignet.TransportTypeTCP, }, }, - File: FileConfig{ + File: &FileConfig{ Path: "", // path needs to be specified }, - Adaptive: AdaptiveConfig{ - StrategyStore: "", // storage name needs to be specified + Adaptive: &AdaptiveConfig{ + SamplingStore: "", // storage name needs to be specified Options: adaptive.DefaultOptions(), }, } From ae73998b19bda02fa4ea3d0e0299077f3624e01b Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Mon, 17 Jun 2024 22:06:07 -0400 Subject: [PATCH 07/12] fmt Signed-off-by: Yuri Shkuro --- .../internal/extension/remotesampling/config.go | 9 ++++++--- .../internal/processors/adaptivesampling/factory.go | 11 ----------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/cmd/jaeger/internal/extension/remotesampling/config.go b/cmd/jaeger/internal/extension/remotesampling/config.go index 8eba9d4f612..c19f3ab48e0 100644 --- a/cmd/jaeger/internal/extension/remotesampling/config.go +++ b/cmd/jaeger/internal/extension/remotesampling/config.go @@ -7,11 +7,12 @@ import ( "errors" "github.com/asaskevich/govalidator" - "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/confmap" + + "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" ) var ( @@ -19,8 +20,10 @@ var ( errMultipleProviders = errors.New("only one sampling strategy provider can be specified, 'adaptive' or 'file'") ) -var _ component.ConfigValidator = (*Config)(nil) -var _ confmap.Unmarshaler = (*Config)(nil) +var ( + _ component.ConfigValidator = (*Config)(nil) + _ confmap.Unmarshaler = (*Config)(nil) +) type Config struct { File *FileConfig `mapstructure:"file"` diff --git a/cmd/jaeger/internal/processors/adaptivesampling/factory.go b/cmd/jaeger/internal/processors/adaptivesampling/factory.go index 4c3c8ec127c..fd13682800a 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/factory.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/factory.go @@ -5,7 +5,6 @@ package adaptivesampling import ( "context" - "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -16,16 +15,6 @@ import ( // componentType is the name of this extension in configuration. var componentType = component.MustNewType("adaptive_sampling") -const ( - defaultTargetSamplesPerSecond = 1 - defaultDeltaTolerance = 0.3 - defaultBucketsForCalculation = 1 - defaultCalculationInterval = time.Minute - defaultAggregationBuckets = 10 - defaultDelay = time.Minute * 2 - defaultMinSamplingProbability = 1e-5 // one in 100k requests -) - // NewFactory creates a factory for the jaeger remote sampling extension. func NewFactory() processor.Factory { return processor.NewFactory( From c5f95444d6f780b7da34420b0b61a0bbd620a1be Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Mon, 17 Jun 2024 22:23:43 -0400 Subject: [PATCH 08/12] better error logging Signed-off-by: Yuri Shkuro --- cmd/jaeger/config.yaml | 2 ++ cmd/jaeger/internal/processors/adaptivesampling/processor.go | 5 ++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/jaeger/config.yaml b/cmd/jaeger/config.yaml index 82677c5d890..e34fb2ff7d6 100644 --- a/cmd/jaeger/config.yaml +++ b/cmd/jaeger/config.yaml @@ -52,6 +52,8 @@ receivers: processors: batch: + # Adaptive Sampling Processor is required to support adaptive sampling. + # It expects remote_sampling extension with `adaptive:` config to be enabled. adaptive_sampling: exporters: diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go index f1853912aaa..d556d731e44 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/processor.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -34,10 +34,9 @@ func newTraceProcessor(cfg Config, otel component.TelemetrySettings) *traceProce func (tp *traceProcessor) start(_ context.Context, host component.Host) error { parts, err := remotesampling.GetAdaptiveSamplingComponents(host) if err != nil { - return err + return fmt.Errorf("cannot start the '%s' processor: %w", componentType, err) } - // TODO it is unlikely that aggregator needs the full Options object, we need to refactor. agg, err := adaptive.NewAggregator( *parts.Options, tp.logger, @@ -46,7 +45,7 @@ func (tp *traceProcessor) start(_ context.Context, host component.Host) error { parts.SamplingStore, ) if err != nil { - return fmt.Errorf("failed to create the adpative sampling aggregator : %w", err) + return fmt.Errorf("failed to create the adaptive sampling aggregator: %w", err) } agg.Start() From 88ec66f4ca0a74d3fc12218c84f31e96ee795c81 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Mon, 17 Jun 2024 23:11:54 -0400 Subject: [PATCH 09/12] add tests Signed-off-by: Yuri Shkuro --- .../remotesampling/extension_test.go | 89 ++++++++++++++++--- 1 file changed, 77 insertions(+), 12 deletions(-) diff --git a/cmd/jaeger/internal/extension/remotesampling/extension_test.go b/cmd/jaeger/internal/extension/remotesampling/extension_test.go index e5b60eb7cb1..8a805696ee8 100644 --- a/cmd/jaeger/internal/extension/remotesampling/extension_test.go +++ b/cmd/jaeger/internal/extension/remotesampling/extension_test.go @@ -8,26 +8,91 @@ import ( "path/filepath" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/extension" + nooptrace "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" ) -func TestNewExtension(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") - e := newExtension(cfg, componenttest.NewNopTelemetrySettings()) +type storageHost struct { + t *testing.T + storageExtension component.Component +} + +func (host storageHost) GetExtensions() map[component.ID]component.Component { + return map[component.ID]component.Component{ + jaegerstorage.ID: host.storageExtension, + } +} + +func (host storageHost) ReportFatalError(err error) { + host.t.Fatal(err) +} + +func (storageHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } +func (storageHost) GetExporters() map[component.DataType]map[component.ID]component.Component { + return nil +} - assert.NotNil(t, e) +func makeStorageExtension(t *testing.T, memstoreName string) storageHost { + extensionFactory := jaegerstorage.NewFactory() + storageExtension, err := extensionFactory.CreateExtension( + context.Background(), + extension.CreateSettings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.L(), + TracerProvider: nooptrace.NewTracerProvider(), + }, + }, + &jaegerstorage.Config{Memory: map[string]memoryCfg.Configuration{ + memstoreName: {MaxTraces: 10000}, + }}) + require.NoError(t, err) + host := storageHost{t: t, storageExtension: storageExtension} + + err = storageExtension.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, storageExtension.Shutdown(context.Background())) }) + return host } -func TestStartAndShutdownLocalFile(t *testing.T) { - cfg := createDefaultConfig().(*Config) +func TestStartFileBasedProvider(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) cfg.File.Path = filepath.Join("..", "..", "..", "sampling-strategies.json") + cfg.Adaptive = nil + cfg.HTTP = nil + cfg.GRPC = nil + require.NoError(t, cfg.Validate()) + + ext, err := factory.CreateExtension(context.Background(), extension.CreateSettings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + host := makeStorageExtension(t, "foobar") + require.NoError(t, ext.Start(context.Background(), host)) + require.NoError(t, ext.Shutdown(context.Background())) +} - e := newExtension(cfg, componenttest.NewNopTelemetrySettings()) - require.NotNil(t, e) - require.NoError(t, e.Start(context.Background(), componenttest.NewNopHost())) +func TestStartAdaptiveProvider(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.File = nil + cfg.Adaptive.SamplingStore = "foobar" + cfg.HTTP = nil + cfg.GRPC = nil + require.NoError(t, cfg.Validate()) - require.NoError(t, e.Shutdown(context.Background())) + ext, err := factory.CreateExtension(context.Background(), extension.CreateSettings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + }, cfg) + require.NoError(t, err) + host := makeStorageExtension(t, "foobar") + require.NoError(t, ext.Start(context.Background(), host)) + require.NoError(t, ext.Shutdown(context.Background())) } From ff73c573481a10214b48bc96b372f8a60d483181 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Mon, 17 Jun 2024 23:21:24 -0400 Subject: [PATCH 10/12] fix Signed-off-by: Yuri Shkuro --- cmd/jaeger/internal/extension/jaegerstorage/extension_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go index d5a6c6c29bf..595dad7223d 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go @@ -99,8 +99,8 @@ func TestStorageFactoryBadHostError(t *testing.T) { func TestStorageFactoryBadNameError(t *testing.T) { host := storageHost{t: t, storageExtension: startStorageExtension(t, "foo")} - _, err := GetStorageFactory("bar", host) - require.ErrorContains(t, err, "cannot find storage 'bar'") + _, err := GetStorageFactory("foobar", host) + require.ErrorContains(t, err, "'foobar'") } func TestStorageFactoryBadShutdownError(t *testing.T) { From 1a2c505707e6abc84804b8ae7a9e0bd039669460 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Sun, 23 Jun 2024 11:14:28 +0530 Subject: [PATCH 11/12] fix lint Signed-off-by: pushkarm029 --- .../internal/extension/remotesampling/extension_test.go | 6 +++--- cmd/jaeger/internal/extension/remotesampling/factory.go | 2 +- .../internal/extension/remotesampling/factory_test.go | 2 +- cmd/jaeger/internal/processors/adaptivesampling/factory.go | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/jaeger/internal/extension/remotesampling/extension_test.go b/cmd/jaeger/internal/extension/remotesampling/extension_test.go index 8a805696ee8..0d646a4e4f7 100644 --- a/cmd/jaeger/internal/extension/remotesampling/extension_test.go +++ b/cmd/jaeger/internal/extension/remotesampling/extension_test.go @@ -43,7 +43,7 @@ func makeStorageExtension(t *testing.T, memstoreName string) storageHost { extensionFactory := jaegerstorage.NewFactory() storageExtension, err := extensionFactory.CreateExtension( context.Background(), - extension.CreateSettings{ + extension.Settings{ TelemetrySettings: component.TelemetrySettings{ Logger: zap.L(), TracerProvider: nooptrace.NewTracerProvider(), @@ -70,7 +70,7 @@ func TestStartFileBasedProvider(t *testing.T) { cfg.GRPC = nil require.NoError(t, cfg.Validate()) - ext, err := factory.CreateExtension(context.Background(), extension.CreateSettings{ + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ TelemetrySettings: componenttest.NewNopTelemetrySettings(), }, cfg) require.NoError(t, err) @@ -88,7 +88,7 @@ func TestStartAdaptiveProvider(t *testing.T) { cfg.GRPC = nil require.NoError(t, cfg.Validate()) - ext, err := factory.CreateExtension(context.Background(), extension.CreateSettings{ + ext, err := factory.CreateExtension(context.Background(), extension.Settings{ TelemetrySettings: componenttest.NewNopTelemetrySettings(), }, cfg) require.NoError(t, err) diff --git a/cmd/jaeger/internal/extension/remotesampling/factory.go b/cmd/jaeger/internal/extension/remotesampling/factory.go index 100244237cd..b6c8fd82d64 100644 --- a/cmd/jaeger/internal/extension/remotesampling/factory.go +++ b/cmd/jaeger/internal/extension/remotesampling/factory.go @@ -50,6 +50,6 @@ func createDefaultConfig() component.Config { } } -func createExtension(_ context.Context, set extension.CreateSettings, cfg component.Config) (extension.Extension, error) { +func createExtension(_ context.Context, set extension.Settings, cfg component.Config) (extension.Extension, error) { return newExtension(cfg.(*Config), set.TelemetrySettings), nil } diff --git a/cmd/jaeger/internal/extension/remotesampling/factory_test.go b/cmd/jaeger/internal/extension/remotesampling/factory_test.go index 706edf437ed..a6de996e02e 100644 --- a/cmd/jaeger/internal/extension/remotesampling/factory_test.go +++ b/cmd/jaeger/internal/extension/remotesampling/factory_test.go @@ -22,7 +22,7 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateExtension(t *testing.T) { cfg := createDefaultConfig().(*Config) f := NewFactory() - r, err := f.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg) + r, err := f.CreateExtension(context.Background(), extensiontest.NewNopSettings(), cfg) require.NoError(t, err) assert.NotNil(t, r) } diff --git a/cmd/jaeger/internal/processors/adaptivesampling/factory.go b/cmd/jaeger/internal/processors/adaptivesampling/factory.go index fd13682800a..601dcd60317 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/factory.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/factory.go @@ -30,7 +30,7 @@ func createDefaultConfig() component.Config { func createTracesProcessor( ctx context.Context, - set processor.CreateSettings, + set processor.Settings, cfg component.Config, nextConsumer consumer.Traces, ) (processor.Traces, error) { From 4c54c2efb06f169fc1b0b2b721fe2a15ae82e984 Mon Sep 17 00:00:00 2001 From: pushkarm029 Date: Sun, 7 Jul 2024 21:02:48 +0530 Subject: [PATCH 12/12] some prog Signed-off-by: pushkarm029 --- .../extension/remotesampling/config_test.go | 129 +++++++++++++++ .../extension/remotesampling/extension.go | 5 + .../remotesampling/extension_test.go | 134 ++++++++++++++++ .../extension/remotesampling/factory.go | 2 + .../processors/adaptivesampling/config.go | 4 +- .../adaptivesampling/factory_test.go | 35 ++++ .../adaptivesampling/processor_test.go | 89 +++++++++++ go.mod | 4 +- pkg/clientcfg/clientcfghttp/handler_test.go | 151 ++++++++++++------ .../adaptive/aggregator_test.go | 41 +++++ .../strategyprovider/adaptive/options_test.go | 15 ++ 11 files changed, 560 insertions(+), 49 deletions(-) create mode 100644 cmd/jaeger/internal/extension/remotesampling/config_test.go create mode 100644 cmd/jaeger/internal/processors/adaptivesampling/factory_test.go create mode 100644 cmd/jaeger/internal/processors/adaptivesampling/processor_test.go diff --git a/cmd/jaeger/internal/extension/remotesampling/config_test.go b/cmd/jaeger/internal/extension/remotesampling/config_test.go new file mode 100644 index 00000000000..5127153a6e5 --- /dev/null +++ b/cmd/jaeger/internal/extension/remotesampling/config_test.go @@ -0,0 +1,129 @@ +package remotesampling + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" +) + +func Test_Validate(t *testing.T) { + tests := []struct { + name string + config *Config + expectedErr string + }{ + { + name: "No provider specified", + config: &Config{}, + expectedErr: "no sampling strategy provider specified, expecting 'adaptive' or 'file'", + }, + { + name: "Both providers specified", + config: &Config{ + File: &FileConfig{Path: "test-path"}, + Adaptive: &AdaptiveConfig{SamplingStore: "test-store"}, + }, + expectedErr: "only one sampling strategy provider can be specified, 'adaptive' or 'file'", + }, + { + name: "Only File provider specified", + config: &Config{ + File: &FileConfig{Path: "test-path"}, + }, + expectedErr: "", + }, + { + name: "Only Adaptive provider specified", + config: &Config{ + Adaptive: &AdaptiveConfig{SamplingStore: "test-store"}, + }, + expectedErr: "", + }, + { + name: "Invalid File provider", + config: &Config{ + File: &FileConfig{Path: ""}, + }, + expectedErr: "File.Path: non zero value required", + }, + { + name: "Invalid Adaptive provider", + config: &Config{ + Adaptive: &AdaptiveConfig{SamplingStore: ""}, + }, + expectedErr: "Adaptive.SamplingStore: non zero value required", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.Validate() + if tt.expectedErr == "" { + require.NoError(t, err) + } else { + assert.Equal(t, tt.expectedErr, err.Error()) + } + }) + } +} + +func Test_Unmarshal(t *testing.T) { + tests := []struct { + name string + input map[string]interface{} + expectedCfg *Config + expectedErr string + }{ + { + name: "Valid config with File", + input: map[string]interface{}{ + "file": map[string]interface{}{ + "path": "test-path", + }, + }, + expectedCfg: &Config{ + File: &FileConfig{Path: "test-path"}, + }, + expectedErr: "", + }, + { + name: "Valid config with Adaptive", + input: map[string]interface{}{ + "adaptive": map[string]interface{}{ + "sampling_store": "test-store", + }, + }, + expectedCfg: &Config{ + Adaptive: &AdaptiveConfig{SamplingStore: "test-store"}, + }, + expectedErr: "", + }, + { + name: "Empty config", + input: map[string]interface{}{}, + expectedCfg: &Config{ + File: nil, + Adaptive: nil, + HTTP: nil, + GRPC: nil, + }, + expectedErr: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := confmap.NewFromStringMap(tt.input) + var cfg Config + err := cfg.Unmarshal(conf) + if tt.expectedErr == "" { + require.NoError(t, err) + assert.Equal(t, tt.expectedCfg, &cfg) + } else { + assert.EqualError(t, err, tt.expectedErr) + } + }) + } +} diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go index 6d772ed57e0..1e243901b35 100644 --- a/cmd/jaeger/internal/extension/remotesampling/extension.go +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -33,6 +33,11 @@ import ( var _ extension.Extension = (*rsExtension)(nil) +// type Extension interface { +// extension.Extension +// // rs *rsExtension +// } + const defaultResourceName = "sampling_store_leader" type rsExtension struct { diff --git a/cmd/jaeger/internal/extension/remotesampling/extension_test.go b/cmd/jaeger/internal/extension/remotesampling/extension_test.go index 0d646a4e4f7..b9c9e9c185f 100644 --- a/cmd/jaeger/internal/extension/remotesampling/extension_test.go +++ b/cmd/jaeger/internal/extension/remotesampling/extension_test.go @@ -39,6 +39,26 @@ func (storageHost) GetExporters() map[component.DataType]map[component.ID]compon return nil } +type samplingHost struct { + t *testing.T + samplingExtension component.Component +} + +func (host samplingHost) GetExtensions() map[component.ID]component.Component { + return map[component.ID]component.Component{ + ID: host.samplingExtension, + } +} + +func (host samplingHost) ReportFatalError(err error) { + host.t.Fatal(err) +} + +func (samplingHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } +func (samplingHost) GetExporters() map[component.DataType]map[component.ID]component.Component { + return nil +} + func makeStorageExtension(t *testing.T, memstoreName string) storageHost { extensionFactory := jaegerstorage.NewFactory() storageExtension, err := extensionFactory.CreateExtension( @@ -61,6 +81,27 @@ func makeStorageExtension(t *testing.T, memstoreName string) storageHost { return host } +func makeRemoteSamplingExtension(t *testing.T, cfg component.Config) samplingHost { + extensionFactory := NewFactory() + samplingExtension, err := extensionFactory.CreateExtension( + context.Background(), + extension.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.L(), + TracerProvider: nooptrace.NewTracerProvider(), + }, + }, + cfg, + ) + require.NoError(t, err) + host := samplingHost{t: t, samplingExtension: samplingExtension} + + err = samplingExtension.Start(context.Background(), host) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, samplingExtension.Shutdown(context.Background())) }) + return host +} + func TestStartFileBasedProvider(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) @@ -96,3 +137,96 @@ func TestStartAdaptiveProvider(t *testing.T) { require.NoError(t, ext.Start(context.Background(), host)) require.NoError(t, ext.Shutdown(context.Background())) } + +// storageExtension, err := extensionFactory.CreateExtension( +// context.Background(), +// extension.Settings{ +// TelemetrySettings: component.TelemetrySettings{ +// Logger: zap.L(), +// TracerProvider: nooptrace.NewTracerProvider(), +// }, +// }, +// &jaegerstorage.Config{Memory: map[string]memoryCfg.Configuration{ +// memstoreName: {MaxTraces: 10000}, +// }}) + +func TestGetAdaptiveSamplingComponents(t *testing.T) { + host := makeStorageExtension(t, "foobar") + _, err := GetAdaptiveSamplingComponents(host) + require.Error(t, err) + + samplingG := makeRemoteSamplingExtension(t, &Config{}) + _, err = GetAdaptiveSamplingComponents(samplingG) + require.NoError(t, err) + + // host = makeRemoteSamplingExtension(t, &Config{}) + // t.Log(host.GetExtensions()) + // _, err = GetAdaptiveSamplingComponents(host) + // require.NoError(t, err) + + // tests := []struct { + // name string + // setupHost func(*testing.T) component.Host + // expectedError string + // }{ + // { + // name: "successful retrieval", + // setupHost: func(t *testing.T) component.Host { + // factory := NewFactory() + // cfg := factory.CreateDefaultConfig().(*Config) + // cfg.File = nil + // cfg.Adaptive.SamplingStore = "foobar" + // cfg.HTTP = nil + // cfg.GRPC = nil + // require.NoError(t, cfg.Validate()) + + // ext, err := factory.CreateExtension(context.Background(), extension.Settings{ + // TelemetrySettings: componenttest.NewNopTelemetrySettings(), + // }, cfg) + // require.NoError(t, err) + // host := makeStorageExtension(t, "foobar") + // require.NoError(t, ext.Start(context.Background(), host)) + // t.Cleanup(func() { + // require.NoError(t, ext.Shutdown(context.Background())) + // }) + // return host + // }, + // expectedError: "", + // }, + // { + // name: "extension not found", + // setupHost: func(t *testing.T) component.Host { + // return storageHost{t: t} + // }, + // expectedError: "cannot find extension 'jaeger.remote_sampling' (make sure it's defined earlier in the config)", + // }, + // // { + // // name: "incorrect extension type", + // // setupHost: func(t *testing.T) component.Host { + // // host := storageHost{t: t} + // // host.storageExtension = componenttest.NewNopExtension() + // // return host + // // }, + // // expectedError: "extension 'jaeger.remote_sampling' is not of type 'jaeger.remote_sampling'", + // // }, + // } + + // for _, tt := range tests { + // t.Run(tt.name, func(t *testing.T) { + // host := tt.setupHost(t) + // components, err := GetAdaptiveSamplingComponents(host) + + // if tt.expectedError != "" { + // require.Error(t, err) + // require.Contains(t, err.Error(), tt.expectedError) + // require.Nil(t, components) + // } else { + // require.NoError(t, err) + // require.NotNil(t, components) + // require.NotNil(t, components.SamplingStore) + // require.NotNil(t, components.DistLock) + // require.NotNil(t, components.Options) + // } + // }) + // } +} diff --git a/cmd/jaeger/internal/extension/remotesampling/factory.go b/cmd/jaeger/internal/extension/remotesampling/factory.go index b6c8fd82d64..b1a51028c08 100644 --- a/cmd/jaeger/internal/extension/remotesampling/factory.go +++ b/cmd/jaeger/internal/extension/remotesampling/factory.go @@ -19,6 +19,8 @@ import ( // componentType is the name of this extension in configuration. var componentType = component.MustNewType("remote_sampling") +var ID = component.NewID(componentType) + // NewFactory creates a factory for the jaeger remote sampling extension. func NewFactory() extension.Factory { return extension.NewFactory( diff --git a/cmd/jaeger/internal/processors/adaptivesampling/config.go b/cmd/jaeger/internal/processors/adaptivesampling/config.go index 0ac0cefd219..6bd5db7a9ab 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/config.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/config.go @@ -4,7 +4,6 @@ package adaptivesampling import ( - "github.com/asaskevich/govalidator" "go.opentelemetry.io/collector/component" ) @@ -15,6 +14,5 @@ type Config struct { } func (cfg *Config) Validate() error { - _, err := govalidator.ValidateStruct(cfg) - return err + return nil } diff --git a/cmd/jaeger/internal/processors/adaptivesampling/factory_test.go b/cmd/jaeger/internal/processors/adaptivesampling/factory_test.go new file mode 100644 index 00000000000..3b2e3423143 --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/factory_test.go @@ -0,0 +1,35 @@ +package adaptivesampling + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor/processortest" +) + +func TestCreateDefaultConfig(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NotNil(t, cfg, "failed to create default config") + require.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestCreateTracesProcessor(t *testing.T) { + ctx := context.Background() + cfg := createDefaultConfig().(*Config) + + nextConsumer := consumertest.NewNop() + set := processortest.NewNopSettings() + + tracesProcessor, err := createTracesProcessor(ctx, set, cfg, nextConsumer) + require.NoError(t, err) + assert.NotNil(t, tracesProcessor) +} + +func TestFactoryType(t *testing.T) { + factory := NewFactory() + assert.Equal(t, componentType, factory.Type()) +} diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor_test.go b/cmd/jaeger/internal/processors/adaptivesampling/processor_test.go new file mode 100644 index 00000000000..aeecfd6bdef --- /dev/null +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor_test.go @@ -0,0 +1,89 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adaptivesampling + +import ( + "context" + "testing" + + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" +) + +type storageHost struct { + extension component.Component +} + +func (storageHost) ReportFatalError(error) { +} + +func (host storageHost) GetExtensions() map[component.ID]component.Component { + return map[component.ID]component.Component{ + remotesampling.ID: host.extension, + } +} + +func (storageHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { + return nil +} + +func (storageHost) GetExporters() map[component.DataType]map[component.ID]component.Component { + return nil +} + +// type fakeRSExtension struct{} + +// var _ remotesampling.Extension = (*fakeRSExtension)(nil) + +// func (f *fakeRSExtension) Start(ctx context.Context, host component.Host) error { +// return nil +// } + +// func (f *fakeRSExtension) Shutdown(ctx context.Context) error { +// return nil +// } + +func TestNewTraceProcessor(t *testing.T) { + telemetrySettings := component.TelemetrySettings{ + Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), + } + config, ok := createDefaultConfig().(*Config) + require.True(t, ok) + newTraceProcessor := newTraceProcessor(*config, telemetrySettings) + require.NotNil(t, newTraceProcessor) +} + +func TestTraceProcessorStart(t *testing.T) { + telemetrySettings := component.TelemetrySettings{ + Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), + } + config, ok := createDefaultConfig().(*Config) + require.True(t, ok) + traceProcessor := newTraceProcessor(*config, telemetrySettings) + err := traceProcessor.start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) +} + +// func TestTraceProcessorStarttw(t *testing.T) { +// telemetrySettings := component.TelemetrySettings{ +// Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), +// } +// traceProcessor := newTraceProcessor(Config{}, telemetrySettings) +// getAdaptiveSamplingComponents := remotesampling.AdaptiveSamplingComponents{ +// SamplingStore: , +// } +// testHost := storageHost{ +// extension: &remotesampling.RsExtension{ +// AdaptiveStore: getAdaptiveSamplingComponents.SamplingStore, +// DistLock: getAdaptiveSamplingComponents.DistLock, +// }, +// } + +// err := traceProcessor.start(context.Background(), testHost) +// require.NoError(t, err) +// } diff --git a/go.mod b/go.mod index d45b4b4711d..ebd2bd4fcf2 100644 --- a/go.mod +++ b/go.mod @@ -87,6 +87,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) +require go.opentelemetry.io/collector/pdata/testdata v0.103.0 // indirect + require ( github.com/IBM/sarama v1.43.2 // indirect github.com/aws/aws-sdk-go v1.53.11 // indirect @@ -188,7 +190,7 @@ require ( go.opentelemetry.io/collector v0.103.0 // indirect go.opentelemetry.io/collector/config/configauth v0.103.0 go.opentelemetry.io/collector/config/configcompression v1.10.0 // indirect - go.opentelemetry.io/collector/config/confignet v0.103.0 // indirect + go.opentelemetry.io/collector/config/confignet v0.103.0 go.opentelemetry.io/collector/config/configopaque v1.10.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.103.0 // indirect go.opentelemetry.io/collector/config/internal v0.103.0 // indirect diff --git a/pkg/clientcfg/clientcfghttp/handler_test.go b/pkg/clientcfg/clientcfghttp/handler_test.go index 6afc30d61b2..723047c1ed3 100644 --- a/pkg/clientcfg/clientcfghttp/handler_test.go +++ b/pkg/clientcfg/clientcfghttp/handler_test.go @@ -47,6 +47,7 @@ func withServer( basePath string, mockSamplingResponse *api_v2.SamplingStrategyResponse, mockBaggageResponse []*baggage.BaggageRestriction, + withGorilla bool, testFn func(server *testServer), ) { metricsFactory := metricstest.NewFactory(0) @@ -62,9 +63,18 @@ func withServer( BasePath: basePath, LegacySamplingEndpoint: true, }) - r := mux.NewRouter() - handler.RegisterRoutes(r) - server := httptest.NewServer(r) + + var server *httptest.Server + if withGorilla { + r := mux.NewRouter() + handler.RegisterRoutes(r) + server = httptest.NewServer(r) + } else { + mux := http.NewServeMux() + handler.RegisterRoutesWithHTTP(mux) + server = httptest.NewServer(mux) + } + defer server.Close() testFn(&testServer{ metricsFactory: metricsFactory, @@ -76,15 +86,17 @@ func withServer( } func TestHTTPHandler(t *testing.T) { + testGorillaHTTPHandler(t, "") testHTTPHandler(t, "") } func TestHTTPHandlerWithBasePath(t *testing.T) { + testGorillaHTTPHandler(t, "/foo") testHTTPHandler(t, "/foo") } -func testHTTPHandler(t *testing.T, basePath string) { - withServer(basePath, rateLimiting(42), restrictions("luggage", 10), func(ts *testServer) { +func testGorillaHTTPHandler(t *testing.T, basePath string) { + withServer(basePath, rateLimiting(42), restrictions("luggage", 10), true, func(ts *testServer) { tests := []struct { endpoint string expOutput string @@ -146,6 +158,49 @@ func testHTTPHandler(t *testing.T, basePath string) { }) } +func testHTTPHandler(t *testing.T, basePath string) { + withServer(basePath, rateLimiting(42), restrictions("luggage", 10), false, func(ts *testServer) { + tests := []struct { + endpoint string + expOutput string + }{ + { + endpoint: "/", + expOutput: `{"strategyType":1,"rateLimitingSampling":{"maxTracesPerSecond":42}}`, + }, + } + for _, test := range tests { + t.Run("endpoint="+test.endpoint, func(t *testing.T) { + resp, err := http.Get(ts.server.URL + basePath + test.endpoint + "?service=Y") + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + err = resp.Body.Close() + require.NoError(t, err) + assert.Equal(t, test.expOutput, string(body)) + if test.endpoint == "/" { + objResp := &tSampling092.SamplingStrategyResponse{} + require.NoError(t, json.Unmarshal(body, objResp)) + assert.EqualValues(t, + ts.samplingProvider.samplingResponse.GetStrategyType(), + objResp.GetStrategyType()) + assert.EqualValues(t, + ts.samplingProvider.samplingResponse.GetRateLimitingSampling().GetMaxTracesPerSecond(), + objResp.GetRateLimitingSampling().GetMaxTracesPerSecond()) + } else { + objResp, err := p2json.SamplingStrategyResponseFromJSON(body) + require.NoError(t, err) + assert.EqualValues(t, ts.samplingProvider.samplingResponse, objResp) + } + }) + } + + // handler must emit metrics + ts.metricsFactory.AssertCounterMetrics(t, metricstest.ExpectedMetric{Name: "http-server.requests", Tags: map[string]string{"type": "sampling-legacy"}, Value: 1}) + }) +} + func TestHTTPHandlerErrors(t *testing.T) { testCases := []struct { description string @@ -215,61 +270,67 @@ func TestHTTPHandlerErrors(t *testing.T) { for _, tc := range testCases { testCase := tc // capture loop var t.Run(testCase.description, func(t *testing.T) { - withServer("", testCase.mockSamplingResponse, testCase.mockBaggageResponse, func(ts *testServer) { - resp, err := http.Get(ts.server.URL + testCase.url) - require.NoError(t, err) - assert.Equal(t, testCase.statusCode, resp.StatusCode) - if testCase.body != "" { - body, err := io.ReadAll(resp.Body) + for _, withGorilla := range []bool{true, false} { + withServer("", testCase.mockSamplingResponse, testCase.mockBaggageResponse, withGorilla, func(ts *testServer) { + resp, err := http.Get(ts.server.URL + testCase.url) require.NoError(t, err) - assert.Equal(t, testCase.body, string(body)) - } + assert.Equal(t, testCase.statusCode, resp.StatusCode) + if testCase.body != "" { + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, testCase.body, string(body)) + } - if len(testCase.metrics) > 0 { - ts.metricsFactory.AssertCounterMetrics(t, testCase.metrics...) - } - }) + if len(testCase.metrics) > 0 { + ts.metricsFactory.AssertCounterMetrics(t, testCase.metrics...) + } + }) + } }) } t.Run("failure to write a response", func(t *testing.T) { - withServer("", probabilistic(0.001), restrictions("luggage", 10), func(ts *testServer) { - handler := ts.handler + for _, withGorilla := range []bool{true, false} { + withServer("", probabilistic(0.001), restrictions("luggage", 10), withGorilla, func(ts *testServer) { + handler := ts.handler - req := httptest.NewRequest("GET", "http://localhost:80/?service=X", nil) - w := &mockWriter{header: make(http.Header)} - handler.serveSamplingHTTP(w, req, handler.encodeThriftLegacy) + req := httptest.NewRequest("GET", "http://localhost:80/?service=X", nil) + w := &mockWriter{header: make(http.Header)} + handler.serveSamplingHTTP(w, req, handler.encodeThriftLegacy) - ts.metricsFactory.AssertCounterMetrics(t, - metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 1}) + ts.metricsFactory.AssertCounterMetrics(t, + metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 1}) - req = httptest.NewRequest("GET", "http://localhost:80/baggageRestrictions?service=X", nil) - handler.serveBaggageHTTP(w, req) + req = httptest.NewRequest("GET", "http://localhost:80/baggageRestrictions?service=X", nil) + handler.serveBaggageHTTP(w, req) - ts.metricsFactory.AssertCounterMetrics(t, - metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 2}) - }) + ts.metricsFactory.AssertCounterMetrics(t, + metricstest.ExpectedMetric{Name: "http-server.errors", Tags: map[string]string{"source": "write", "status": "5xx"}, Value: 2}) + }) + } }) } func TestEncodeErrors(t *testing.T) { - withServer("", nil, nil, func(server *testServer) { - _, err := server.handler.encodeThriftLegacy(&api_v2.SamplingStrategyResponse{ - StrategyType: -1, - }) - require.Error(t, err) - assert.Contains(t, err.Error(), "ConvertSamplingResponseFromDomain failed") - server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ - {Name: "http-server.errors", Tags: map[string]string{"source": "thrift", "status": "5xx"}, Value: 1}, - }...) + for _, withGorilla := range []bool{true, false} { + withServer("", nil, nil, withGorilla, func(server *testServer) { + _, err := server.handler.encodeThriftLegacy(&api_v2.SamplingStrategyResponse{ + StrategyType: -1, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "ConvertSamplingResponseFromDomain failed") + server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ + {Name: "http-server.errors", Tags: map[string]string{"source": "thrift", "status": "5xx"}, Value: 1}, + }...) - _, err = server.handler.encodeProto(nil) - require.Error(t, err) - assert.Contains(t, err.Error(), "SamplingStrategyResponseToJSON failed") - server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ - {Name: "http-server.errors", Tags: map[string]string{"source": "proto", "status": "5xx"}, Value: 1}, - }...) - }) + _, err = server.handler.encodeProto(nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "SamplingStrategyResponseToJSON failed") + server.metricsFactory.AssertCounterMetrics(t, []metricstest.ExpectedMetric{ + {Name: "http-server.errors", Tags: map[string]string{"source": "proto", "status": "5xx"}, Value: 1}, + }...) + }) + } } func rateLimiting(rate int32) *api_v2.SamplingStrategyResponse { diff --git a/plugin/sampling/strategyprovider/adaptive/aggregator_test.go b/plugin/sampling/strategyprovider/adaptive/aggregator_test.go index bd34c3cb016..7e5a1313c03 100644 --- a/plugin/sampling/strategyprovider/adaptive/aggregator_test.go +++ b/plugin/sampling/strategyprovider/adaptive/aggregator_test.go @@ -155,3 +155,44 @@ func TestRecordThroughput(t *testing.T) { a.HandleRootSpan(span, logger) assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count) } + +// func TestRecordThroughputFunc(t *testing.T) { +// metricsFactory := metricstest.NewFactory(0) +// mockStorage := &mocks.Store{} +// mockEP := &epmocks.ElectionParticipant{} +// logger := zap.NewNop() +// testOpts := Options{ +// CalculationInterval: 1 * time.Second, +// AggregationBuckets: 1, +// BucketsForCalculation: 1, +// } + +// a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage) +// require.NoError(t, err) + +// // Testing non-root span +// span := &span_model.Span{References: []span_model.SpanRef{{SpanID: span_model.NewSpanID(1), RefType: span_model.ChildOf}}} +// RecordThroughput(a, span, logger) +// require.Empty(t, a.(*aggregator).currentThroughput) + +// // Testing span with service name but no operation +// span.References = []span_model.SpanRef{} +// span.Process = &span_model.Process{ +// ServiceName: "A", +// } +// RecordThroughput(a, span, logger) +// require.Empty(t, a.(*aggregator).currentThroughput) + +// // Testing span with service name and operation but no probabilistic sampling tags +// span.OperationName = "GET" +// RecordThroughput(a, span, logger) +// require.Empty(t, a.(*aggregator).currentThroughput) + +// // Testing span with service name, operation, and probabilistic sampling tags +// span.Tags = span_model.KeyValues{ +// span_model.String("sampler.type", "probabilistic"), +// span_model.String("sampler.param", "0.001"), +// } +// RecordThroughput(a, span, logger) +// assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count) +// } diff --git a/plugin/sampling/strategyprovider/adaptive/options_test.go b/plugin/sampling/strategyprovider/adaptive/options_test.go index 1ab64c0589c..e441ad1472b 100644 --- a/plugin/sampling/strategyprovider/adaptive/options_test.go +++ b/plugin/sampling/strategyprovider/adaptive/options_test.go @@ -43,3 +43,18 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, time.Duration(5000000000), opts.LeaderLeaseRefreshInterval) assert.Equal(t, time.Duration(60000000000), opts.FollowerLeaseRefreshInterval) } + +func TestDefaultOptions(t *testing.T) { + options := DefaultOptions() + assert.Equal(t, float64(defaultTargetSamplesPerSecond), options.TargetSamplesPerSecond) + assert.Equal(t, defaultDeltaTolerance, options.DeltaTolerance) + assert.Equal(t, defaultBucketsForCalculation, options.BucketsForCalculation) + assert.Equal(t, defaultCalculationInterval, options.CalculationInterval) + assert.Equal(t, defaultAggregationBuckets, options.AggregationBuckets) + assert.Equal(t, defaultDelay, options.Delay) + assert.Equal(t, defaultInitialSamplingProbability, options.InitialSamplingProbability) + assert.Equal(t, defaultMinSamplingProbability, options.MinSamplingProbability) + assert.Equal(t, defaultMinSamplesPerSecond, options.MinSamplesPerSecond) + assert.Equal(t, defaultLeaderLeaseRefreshInterval, options.LeaderLeaseRefreshInterval) + assert.Equal(t, defaultFollowerLeaseRefreshInterval, options.FollowerLeaseRefreshInterval) +}