Skip to content

Commit

Permalink
Implement spans exporting in ClickHouse storage
Browse files Browse the repository at this point in the history
Signed-off-by: haanhvu <[email protected]>
  • Loading branch information
haanhvu committed Jan 2, 2024
1 parent d489e3a commit fd9c731
Show file tree
Hide file tree
Showing 14 changed files with 599 additions and 10 deletions.
23 changes: 20 additions & 3 deletions cmd/jaeger/internal/exporters/storageexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type storageExporter struct {
config *Config
logger *zap.Logger
spanWriter spanstore.Writer
clickhouse bool
// Separate traces exporting function for ClickHouse storage.
// This is temporary until we have v2 storage API.
chExportTraces func(ctx context.Context, td ptrace.Traces) error
}

func newExporter(config *Config, otel component.TelemetrySettings) *storageExporter {
Expand All @@ -30,14 +35,22 @@ func newExporter(config *Config, otel component.TelemetrySettings) *storageExpor
}
}

func (exp *storageExporter) start(_ context.Context, host component.Host) error {
func (exp *storageExporter) start(ctx context.Context, host component.Host) error {
f, err := jaegerstorage.GetStorageFactory(exp.config.TraceStorage, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
}

if exp.spanWriter, err = f.CreateSpanWriter(); err != nil {
return fmt.Errorf("cannot create span writer: %w", err)
switch t := f.(type) {
case *ch.Factory:
exp.clickhouse = true
t.CreateSpansTable(ctx)
exp.chExportTraces = t.ExportSpans
default:
exp.clickhouse = false
if exp.spanWriter, err = f.CreateSpanWriter(); err != nil {
return fmt.Errorf("cannot create span writer: %w", err)
}
}

return nil
Expand All @@ -49,6 +62,10 @@ func (exp *storageExporter) close(_ context.Context) error {
}

func (exp *storageExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
if exp.clickhouse {
return exp.chExportTraces(ctx, td)
}

batches, err := otlp2jaeger.ProtoFromTraces(td)
if err != nil {
return fmt.Errorf("cannot transform OTLP traces to Jaeger format: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/jaeger/internal/exporters/storageexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func createTracesExporter(ctx context.Context, set exporter.CreateSettings, conf
// Disable Timeout/RetryOnFailure and SendingQueue
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}),
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}),
// Enable queue settings for Clickhouse only
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: ex.clickhouse}),
exporterhelper.WithStart(ex.start),
exporterhelper.WithShutdown(ex.close),
)
Expand Down
6 changes: 2 additions & 4 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package jaegerstorage

import (
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
)

// Config has the configuration for jaeger-query,
Expand All @@ -13,9 +14,6 @@ type Config struct {
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
}

type MemoryStorage struct {
Name string `mapstructure:"name"`
memoryCfg.Configuration
ClickHouse map[string]ch.Config `mapstructure:"clickhouse"`
}
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
)
Expand Down Expand Up @@ -71,6 +72,14 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
)
}
// TODO add support for other backends

for name, chCfg := range s.config.ClickHouse {
if _, ok := s.factories[name]; ok {
return fmt.Errorf("duplicate clickhouse storage name %s", name)
}
s.factories[name] = clickhouse.NewFactory(ctx, chCfg, s.logger.With(zap.String("storage_name", name)))
}

return nil
}

Expand Down
51 changes: 51 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
# health_check:
# pprof:
# endpoint: 0.0.0.0:1777
# zpages:
# endpoint: 0.0.0.0:55679

jaeger_query:
trace_storage: ch_store
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
memory:
memstore:
max_traces: 100000
memstore_archive:
max_traces: 100000
clickhouse:
ch_store:
endpoint: tcp://127.0.0.1:9000?dial_timeout=10s&compress=lz4
spans_table_name: jaeger_spans

receivers:
otlp:
protocols:
grpc:
http:

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

zipkin:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: ch_store
10 changes: 9 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ require (

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/ClickHouse/ch-go v0.58.2 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.15.0 // indirect
github.com/IBM/sarama v1.42.1 // indirect
github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/andybalholm/brotli v1.0.6 // indirect
github.com/aws/aws-sdk-go v1.48.14 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand All @@ -93,13 +96,15 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.3.0 // indirect
Expand Down Expand Up @@ -157,6 +162,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.91.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openzipkin/zipkin-go v0.4.2 // indirect
github.com/paulmach/orb v0.10.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
Expand All @@ -172,8 +178,10 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.11 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
Expand Down
Loading

0 comments on commit fd9c731

Please sign in to comment.