Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
Signed-off-by: pushkarm029 <[email protected]>
  • Loading branch information
Pushkarm029 committed Jun 6, 2024
1 parent 1460eca commit e4ab125
Show file tree
Hide file tree
Showing 19 changed files with 828 additions and 12 deletions.
17 changes: 15 additions & 2 deletions cmd/jaeger/config-badger.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
service:
extensions: [jaeger_storage, jaeger_query]
extensions: [jaeger_storage, jaeger_query, remote_sampling]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
processors: [batch, adaptive_sampling]
exporters: [jaeger_storage_exporter]

extensions:
remote_sampling:
# You can either use file or adaptive sampling strategy in remote_sampling
# file:
# path: ./cmd/jaeger/sampling-strategies.json
adaptive:
initial_sampling_probability: 0.1
strategy_store: badger_main
http:
grpc:


jaeger_query:
trace_storage: badger_main
trace_storage_archive: badger_archive
Expand Down Expand Up @@ -37,6 +48,8 @@ receivers:

processors:
batch:
adaptive_sampling:
strategy_store: badger_main

exporters:
jaeger_storage_exporter:
Expand Down
17 changes: 15 additions & 2 deletions cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
service:
extensions: [jaeger_storage, jaeger_query]
extensions: [jaeger_storage, jaeger_query, remote_sampling]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
processors: [batch, adaptive_sampling]
exporters: [jaeger_storage_exporter]

extensions:
remote_sampling:
# You can either use file or adaptive sampling strategy in remote_sampling
# file:
# path: ./cmd/jaeger/sampling-strategies.json
adaptive:
initial_sampling_probability: 0.1
strategy_store: cassandra_main
http:
grpc:

jaeger_query:
trace_storage: cassandra_main
trace_storage_archive: cassandra_archive
Expand Down Expand Up @@ -47,6 +57,9 @@ receivers:

processors:
batch:
adaptive_sampling:
strategy_store: cassandra_main


exporters:
jaeger_storage_exporter:
Expand Down
16 changes: 14 additions & 2 deletions cmd/jaeger/config-elasticsearch.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
service:
extensions: [jaeger_storage, jaeger_query]
extensions: [jaeger_storage, jaeger_query, remote_sampling]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
processors: [batch, adaptive_sampling]
exporters: [jaeger_storage_exporter]

extensions:
remote_sampling:
# You can either use file or adaptive sampling strategy in remote_sampling
# file:
# path: ./cmd/jaeger/sampling-strategies.json
adaptive:
initial_sampling_probability: 0.1
strategy_store: es_main
http:
grpc:

jaeger_query:
trace_storage: es_main
trace_storage_archive: es_archive
Expand Down Expand Up @@ -35,6 +45,8 @@ receivers:

processors:
batch:
adaptive_sampling:
strategy_store: es_main

exporters:
jaeger_storage_exporter:
Expand Down
16 changes: 14 additions & 2 deletions cmd/jaeger/config-opensearch.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
service:
extensions: [jaeger_storage, jaeger_query]
extensions: [jaeger_storage, jaeger_query, remote_sampling]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
processors: [batch, adaptive_sampling]
exporters: [jaeger_storage_exporter]

extensions:
remote_sampling:
# You can either use file or adaptive sampling strategy in remote_sampling
# file:
# path: ./cmd/jaeger/sampling-strategies.json
adaptive:
initial_sampling_probability: 0.1
strategy_store: os_main
http:
grpc:

jaeger_query:
trace_storage: os_main
trace_storage_archive: os_archive
Expand Down Expand Up @@ -36,6 +46,8 @@ receivers:

processors:
batch:
adaptive_sampling:
strategy_store: os_main

exporters:
jaeger_storage_exporter:
Expand Down
16 changes: 14 additions & 2 deletions cmd/jaeger/config.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
service:
extensions: [jaeger_storage, jaeger_query]
extensions: [jaeger_storage, jaeger_query, remote_sampling]
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch]
processors: [batch, adaptive_sampling]
exporters: [jaeger_storage_exporter]

extensions:
Expand All @@ -13,6 +13,16 @@ extensions:
# zpages:
# endpoint: 0.0.0.0:55679

remote_sampling:
# You can either use file or adaptive sampling strategy in remote_sampling
# file:
# path: ./cmd/jaeger/sampling-strategies.json
adaptive:
initial_sampling_probability: 0.1
strategy_store: memstore
http:
grpc:

jaeger_query:
trace_storage: memstore
trace_storage_archive: memstore_archive
Expand Down Expand Up @@ -42,6 +52,8 @@ receivers:

processors:
batch:
adaptive_sampling:
strategy_store: memstore

exporters:
jaeger_storage_exporter:
Expand Down
6 changes: 4 additions & 2 deletions cmd/jaeger/internal/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerquery"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/processors/adaptivesampling"
)

type builders struct {
Expand Down Expand Up @@ -62,7 +64,7 @@ func (b builders) build() (otelcol.Factories, error) {
jaegerquery.NewFactory(),
jaegerstorage.NewFactory(),
storagecleaner.NewFactory(),
// TODO add adaptive sampling
remotesampling.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
Expand Down Expand Up @@ -99,7 +101,7 @@ func (b builders) build() (otelcol.Factories, error) {
batchprocessor.NewFactory(),
memorylimiterprocessor.NewFactory(),
// add-ons
// TODO add adaptive sampling
adaptivesampling.NewFactory(),
)
if err != nil {
return otelcol.Factories{}, err
Expand Down
80 changes: 80 additions & 0 deletions cmd/jaeger/internal/extension/remotesampling/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package remotesampling

import (
"errors"
"reflect"
"time"

"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
)

var (
errNoSource = errors.New("no sampling strategy specified, has to be either 'adaptive' or 'file'")
errMultipleSource = errors.New("only one sampling strategy can be specified, has to be either 'adaptive' or 'file'")
)

type FileConfig struct {
// File specifies a local file as the strategies source
Path string `mapstructure:"path"`
}

type AdaptiveConfig struct {
// name of the strategy storage defined in the jaegerstorage extension
StrategyStore string `mapstructure:"strategy_store"`

// InitialSamplingProbability is the initial sampling probability for all new operations.
InitialSamplingProbability float64 `mapstructure:"initial_sampling_probability"`

// AggregationBuckets is the total number of aggregated throughput buckets kept in memory, ie. if
// the CalculationInterval is 1 minute (each bucket contains 1 minute of thoughput data) and the
// AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for
// all operations.
// TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice.
AggregationBuckets int `mapstructure:"aggregation_buckets"`

// MinSamplesPerSecond determines the min number of traces that are sampled per second.
// For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do
// its best to sample at least one trace a minute for an operation. This is useful for low QPS operations
// that may never be sampled by the probabilistic sampler.
MinSamplesPerSecond float64 `mapstructure:"min_samples_per_second"`

// LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before
// attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval
// to reduce lock thrashing.
LeaderLeaseRefreshInterval time.Duration `mapstructure:"leader_lease_refresh_interval"`

// FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower
// (ie. failed to gain the leader lock).
FollowerLeaseRefreshInterval time.Duration `mapstructure:"follower_lease_refresh_interval"`
}

type Config struct {
File FileConfig `mapstructure:"file"`
Adaptive AdaptiveConfig `mapstructure:"adaptive"`
HTTP HTTPConfig `mapstructure:"http"`
GRPC GRPCConfig `mapstructure:"grpc"`
}

type HTTPConfig struct {
confighttp.ServerConfig `mapstructure:",squash"`
}

type GRPCConfig struct {
configgrpc.ServerConfig `mapstructure:",squash"`
}

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
if reflect.DeepEqual(*cfg, *emptyCfg) {
return errNoSource
}

if cfg.File.Path != "" && cfg.Adaptive.StrategyStore != "" {
return errMultipleSource
}
return nil
}
Loading

0 comments on commit e4ab125

Please sign in to comment.