From 4608c163319b6a5534400a0b0f1544a60e77b65e Mon Sep 17 00:00:00 2001 From: haanhvu Date: Sun, 12 Nov 2023 22:08:10 +0700 Subject: [PATCH] Implement spans exporting for ClickHouse storage Signed-off-by: haanhvu --- .../exporters/storageexporter/exporter.go | 24 +++- .../exporters/storageexporter/factory.go | 15 +- .../extension/jaegerstorage/config.go | 8 ++ .../extension/jaegerstorage/extension.go | 9 ++ go.mod | 10 +- go.sum | 23 +++ plugin/storage/clickhouse/config.go | 83 +++++++++++ plugin/storage/clickhouse/factory.go | 71 ++++++++++ .../storage/clickhouse/spanstore/exporter.go | 131 ++++++++++++++++++ plugin/storage/clickhouse/spanstore/schema.go | 42 ++++++ 10 files changed, 408 insertions(+), 8 deletions(-) create mode 100644 plugin/storage/clickhouse/config.go create mode 100644 plugin/storage/clickhouse/factory.go create mode 100644 plugin/storage/clickhouse/spanstore/exporter.go create mode 100644 plugin/storage/clickhouse/spanstore/schema.go diff --git a/cmd/jaeger/internal/exporters/storageexporter/exporter.go b/cmd/jaeger/internal/exporters/storageexporter/exporter.go index 68aa330a8f4..b9277ab1d36 100644 --- a/cmd/jaeger/internal/exporters/storageexporter/exporter.go +++ b/cmd/jaeger/internal/exporters/storageexporter/exporter.go @@ -14,13 +14,16 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse" "github.com/jaegertracing/jaeger/storage/spanstore" ) type storageExporter struct { - config *Config - logger *zap.Logger - spanWriter spanstore.Writer + config *Config + logger *zap.Logger + spanWriter spanstore.Writer + exportTraces func(ctx context.Context, td ptrace.Traces) error + requireBatchInsert bool } func newExporter(config *Config, otel component.TelemetrySettings) *storageExporter { @@ -30,14 +33,23 @@ func newExporter(config *Config, otel component.TelemetrySettings) *storageExpor } } -func (exp *storageExporter) start(_ context.Context, host component.Host) error { +func (exp *storageExporter) start(ctx context.Context, host component.Host) error { f, err := jaegerstorage.GetStorageFactory(exp.config.TraceStorage, host) if err != nil { return fmt.Errorf("cannot find storage factory: %w", err) } - if exp.spanWriter, err = f.CreateSpanWriter(); err != nil { - return fmt.Errorf("cannot create span writer: %w", err) + switch t := f.(type) { + case *ch.Factory: + t.CreateSpansTable(ctx) + exp.requireBatchInsert = true + exp.exportTraces = t.ExportSpans + default: + if exp.spanWriter, err = f.CreateSpanWriter(); err != nil { + return fmt.Errorf("cannot create span writer: %w", err) + } + exp.requireBatchInsert = false + exp.exportTraces = exp.pushTraces } return nil diff --git a/cmd/jaeger/internal/exporters/storageexporter/factory.go b/cmd/jaeger/internal/exporters/storageexporter/factory.go index 5721c45c4e4..b60937d50c3 100644 --- a/cmd/jaeger/internal/exporters/storageexporter/factory.go +++ b/cmd/jaeger/internal/exporters/storageexporter/factory.go @@ -34,8 +34,21 @@ func createDefaultConfig() component.Config { func createTracesExporter(ctx context.Context, set exporter.CreateSettings, config component.Config) (exporter.Traces, error) { cfg := config.(*Config) ex := newExporter(cfg, set.TelemetrySettings) + + if ex.requireBatchInsert == true { + return exporterhelper.NewTracesExporter(ctx, set, cfg, + ex.exportTraces, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), + exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}), + //Enable queue settings for batch inserts + exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: true}), + exporterhelper.WithStart(ex.start), + exporterhelper.WithShutdown(ex.close), + ) + } return exporterhelper.NewTracesExporter(ctx, set, cfg, - ex.pushTraces, + ex.exportTraces, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // Disable Timeout/RetryOnFailure and SendingQueue exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), diff --git a/cmd/jaeger/internal/extension/jaegerstorage/config.go b/cmd/jaeger/internal/extension/jaegerstorage/config.go index 69f339926ec..951f958e3e5 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/config.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/config.go @@ -5,6 +5,7 @@ package jaegerstorage import ( memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config" + ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse" ) // Config has the configuration for jaeger-query, @@ -13,9 +14,16 @@ type Config struct { // TODO add other storage types here // TODO how will this work with 3rd party storage implementations? // Option: instead of looking for specific name, check interface. + + ClickHouse map[string]ch.Config `mapstructure:"clickhouse"` } type MemoryStorage struct { Name string `mapstructure:"name"` memoryCfg.Configuration } + +type ClickHouseStorage struct { + Name string `mapstructure:"name"` + ch.Config +} diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index 60db1d9ec9e..4dd4924a5df 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -14,6 +14,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/plugin/storage/clickhouse" "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage" ) @@ -71,6 +72,14 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error { ) } // TODO add support for other backends + + for name, chCfg := range s.config.ClickHouse { + if _, ok := s.factories[name]; ok { + return fmt.Errorf("duplicate clickhouse storage name %s", name) + } + s.factories[name] = clickhouse.NewFactory(ctx, chCfg, s.logger.With(zap.String("storage_name", name))) + } + return nil } diff --git a/go.mod b/go.mod index 6567015a552..f140e14004d 100644 --- a/go.mod +++ b/go.mod @@ -95,8 +95,11 @@ require ( require ( contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect + github.com/ClickHouse/ch-go v0.58.2 // indirect + github.com/ClickHouse/clickhouse-go/v2 v2.15.0 // indirect github.com/IBM/sarama v1.42.1 // indirect github.com/VividCortex/gohistogram v1.0.0 // indirect + github.com/andybalholm/brotli v1.0.6 // indirect github.com/aws/aws-sdk-go v1.47.10 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect @@ -105,12 +108,14 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/eapache/go-resiliency v1.4.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/fatih/color v1.14.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-faster/city v1.0.1 // indirect + github.com/go-faster/errors v0.6.1 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logr/logr v1.3.0 // indirect @@ -170,6 +175,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.89.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/openzipkin/zipkin-go v0.4.2 // indirect + github.com/paulmach/orb v0.10.0 // indirect github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect @@ -183,8 +189,10 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sagikazarmark/locafero v0.3.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/segmentio/asm v1.2.0 // indirect github.com/shirou/gopsutil/v3 v3.23.10 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/shopspring/decimal v1.3.1 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.10.0 // indirect github.com/spf13/cast v1.5.1 // indirect diff --git a/go.sum b/go.sum index fb82f747a31..d0c458bbff6 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,12 @@ contrib.go.opencensus.io/exporter/prometheus v0.4.2/go.mod h1:dvEHbiKmgvbr5pjaF9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/ClickHouse/ch-go v0.58.2 h1:jSm2szHbT9MCAB1rJ3WuCJqmGLi5UTjlNu+f530UTS0= +github.com/ClickHouse/ch-go v0.58.2/go.mod h1:Ap/0bEmiLa14gYjCiRkYGbXvbe8vwdrfTYWhsuQ99aw= +github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0= +github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= +github.com/ClickHouse/clickhouse-go/v2 v2.15.0 h1:G0hTKyO8fXXR1bGnZ0DY3vTG01xYfOGW76zgjg5tmC4= +github.com/ClickHouse/clickhouse-go/v2 v2.15.0/go.mod h1:kXt1SRq0PIRa6aKZD7TnFnY9PQKmc2b13sHtOYcK6cQ= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ= @@ -64,6 +70,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= +github.com/andybalholm/brotli v1.0.6 h1:Yf9fFpf49Zrxb9NlQaluyE92/+X7UVHlhMNJN2sxfOI= +github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/thrift v0.19.0 h1:sOqkWPzMj7w6XaYbJQG7m4sGqVolaW/0D28Ln7yPzMk= github.com/apache/thrift v0.19.0/go.mod h1:SUALL216IiaOw2Oy+5Vs9lboJ/t9g40C+G07Dc0QC1I= @@ -124,6 +132,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0= github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= @@ -155,6 +165,10 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4 github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= +github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= +github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI= +github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -529,6 +543,9 @@ github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/openzipkin/zipkin-go v0.4.2 h1:zjqfqHjUpPmB3c1GlCvvgsM1G4LkvqQbBDueDOCg/jA= github.com/openzipkin/zipkin-go v0.4.2/go.mod h1:ZeVkFjuuBiSy13y8vpSDCjMi9GoI3hPpCJSBx/EYFhY= +github.com/paulmach/orb v0.10.0 h1:guVYVqzxHE/CQ1KpfGO077TR0ATHSNjp4s6XGLn3W9s= +github.com/paulmach/orb v0.10.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= +github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= @@ -602,12 +619,16 @@ github.com/sagikazarmark/locafero v0.3.0 h1:zT7VEGWC2DTflmccN/5T1etyKvxSxpHsjb9c github.com/sagikazarmark/locafero v0.3.0/go.mod h1:w+v7UsPNFwzF1cHuOajOOzoq4U7v/ig1mpRjqV+Bu1U= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/shirou/gopsutil/v3 v3.23.10 h1:/N42opWlYzegYaVkWejXWJpbzKv2JDy3mrgGzKsh9hM= github.com/shirou/gopsutil/v3 v3.23.10/go.mod h1:JIE26kpucQi+innVlAUnIEOSBhBUkirr5b44yr55+WE= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= @@ -695,6 +716,7 @@ github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ go.mongodb.org/mongo-driver v1.7.3/go.mod h1:NqaYOwnXWr5Pm7AOpO5QFxKJ503nbMse/R79oO62zWg= go.mongodb.org/mongo-driver v1.7.5/go.mod h1:VXEWRZ6URJIkUq2SCAyapmhH0ZLRBP+FT4xhp5Zvxng= go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8= +go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.mongodb.org/mongo-driver v1.11.6 h1:XM7G6PjiGAO5betLF13BIa5TlLUUE3uJ/2Ox3Lz1K+o= go.mongodb.org/mongo-driver v1.11.6/go.mod h1:G9TgswdsWjX4tmDA5zfs2+6AEPpYJwqblyjsfuh8oXY= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -1220,6 +1242,7 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= diff --git a/plugin/storage/clickhouse/config.go b/plugin/storage/clickhouse/config.go new file mode 100644 index 00000000000..fb1f798d7a3 --- /dev/null +++ b/plugin/storage/clickhouse/config.go @@ -0,0 +1,83 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net/url" + + "github.com/ClickHouse/clickhouse-go/v2" +) + +type Config struct { + Endpoint string `mapstructure:"endpoint"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + Database string `mapstructure:"database"` + SpansTableName string `mapstructure:"spans_table_name"` + //materialized views' names? +} + +const ( + defaultDatabase = "default" + defaultUsername = "default" + defaultPassword = "" +) + +func (cfg *Config) NewClient(ctx context.Context) (*sql.DB, error) { + if cfg.Endpoint == "" { + return nil, errors.New("no endpoints specified") + } + + dsnURL, err := url.Parse(cfg.Endpoint) + if err != nil { + return nil, err + } + + queryParams := dsnURL.Query() + + if dsnURL.Scheme == "https" { + queryParams.Set("secure", "true") + } + + if cfg.Database != "" { + dsnURL.Path = cfg.Database + } else { + dsnURL.Path = defaultDatabase + } + + if cfg.Username != "" { + cfg.Username = defaultUsername + } + + if cfg.Password != "" { + cfg.Password = defaultPassword + } + + dsnURL.User = url.UserPassword(cfg.Username, cfg.Password) + + dsnURL.RawQuery = queryParams.Encode() + + dsn := dsnURL.String() + + if _, err = clickhouse.ParseDSN(dsn); err != nil { + return nil, err + } + + db, err := sql.Open("clickhouse", dsn) + if err != nil { + return nil, err + } + + createDBquery := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database) + _, err = db.ExecContext(ctx, createDBquery) + if err != nil { + return nil, err + } + + return db, nil +} diff --git a/plugin/storage/clickhouse/factory.go b/plugin/storage/clickhouse/factory.go new file mode 100644 index 00000000000..c14ea652533 --- /dev/null +++ b/plugin/storage/clickhouse/factory.go @@ -0,0 +1,71 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package clickhouse + +import ( + "context" + "database/sql" + + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/metrics" + chSpanStore "github.com/jaegertracing/jaeger/plugin/storage/clickhouse/spanstore" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +type Factory struct { + client *sql.DB + spansTableName string + logger *zap.Logger +} + +func NewFactory(ctx context.Context, cfg Config, logger *zap.Logger) *Factory { + f := &Factory{ + logger: logger, + spansTableName: cfg.SpansTableName, + } + + client, err := cfg.NewClient(ctx) + if err != nil { + f.logger.Error("failed to create ClickHouse client", zap.Error(err)) + } else { + f.client = client + } + + return f + + //TODO: Move some steps to Initialize() +} + +func (f *Factory) CreateSpansTable(ctx context.Context) { + if err := chSpanStore.CreateSpansTable(ctx, f.client, f.spansTableName); err != nil { + f.logger.Error("failed to create spans table", zap.Error(err)) + } +} + +func (f *Factory) ExportSpans(ctx context.Context, td ptrace.Traces) error { + if err := chSpanStore.ExportSpans(ctx, f.client, f.spansTableName, td); err != nil { + return err + } + + return nil +} + +func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { + return nil +} + +func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { + return nil, nil +} + +func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { + return nil, nil +} + +func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { + return nil, nil +} diff --git a/plugin/storage/clickhouse/spanstore/exporter.go b/plugin/storage/clickhouse/spanstore/exporter.go new file mode 100644 index 00000000000..10a3f588002 --- /dev/null +++ b/plugin/storage/clickhouse/spanstore/exporter.go @@ -0,0 +1,131 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package spanstore + +import ( + "context" + "database/sql" + "encoding/hex" + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + conventions "go.opentelemetry.io/collector/semconv/v1.18.0" +) + +const ( + insertSpansSQL = ` + INSERT INTO %s ( + Timestamp, + TraceId, + SpanId, + ParentSpanId, + Operation, + Service, + Tags.keys, + Tags.values, + Duration + ) VALUES ( + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ?, + ? + ) + ` +) + +func ExportSpans(ctx context.Context, db *sql.DB, tableName string, td ptrace.Traces) error { + tx, err := db.Begin() + if err != nil { + return err + } + + defer func() { + tx.Rollback() + }() + + if err = insertSpans(ctx, tx, tableName, td); err != nil { + return err + } + + return tx.Commit() +} + +func insertSpans(ctx context.Context, tx *sql.Tx, tableName string, td ptrace.Traces) error { + statement, err := tx.PrepareContext(ctx, fmt.Sprintf(insertSpansSQL, tableName)) + if err != nil { + return err + } + + defer func() { + _ = statement.Close() + }() + + for i := 0; i < td.ResourceSpans().Len(); i++ { + spans := td.ResourceSpans().At(i) + res := spans.Resource() + var serviceName string + if v, ok := res.Attributes().Get(conventions.AttributeServiceName); ok { + serviceName = v.Str() + } + for j := 0; j < spans.ScopeSpans().Len(); j++ { + rs := spans.ScopeSpans().At(j).Spans() + for k := 0; k < rs.Len(); k++ { + r := rs.At(k) + tagKeys, tagValues := attributesToArrays(r.Attributes()) + _, err = statement.ExecContext(ctx, + r.StartTimestamp().AsTime(), + traceIDToHexOrEmptyString(r.TraceID()), + spanIDToHexOrEmptyString(r.SpanID()), + spanIDToHexOrEmptyString(r.ParentSpanID()), + r.Name(), + serviceName, + tagKeys, + tagValues, + uint64(r.EndTimestamp().AsTime().Sub(r.StartTimestamp().AsTime()).Nanoseconds()), + ) + if err != nil { + return err + } + } + } + } + + return nil +} + +func attributesToArrays(attributes pcommon.Map) ([]string, []string) { + keys := make([]string, 0) + values := make([]string, 0) + + attributes.Range(func(k string, v pcommon.Value) bool { + keys = append(keys, k) + values = append(values, v.AsString()) + return true + }) + return keys, values +} + +// spanIDToHexOrEmptyString returns a hex string from SpanID. +// An empty string is returned, if SpanID is empty. +func spanIDToHexOrEmptyString(id pcommon.SpanID) string { + if id.IsEmpty() { + return "" + } + return hex.EncodeToString(id[:]) +} + +// traceIDToHexOrEmptyString returns a hex string from TraceID. +// An empty string is returned, if TraceID is empty. +func traceIDToHexOrEmptyString(id pcommon.TraceID) string { + if id.IsEmpty() { + return "" + } + return hex.EncodeToString(id[:]) +} diff --git a/plugin/storage/clickhouse/spanstore/schema.go b/plugin/storage/clickhouse/spanstore/schema.go new file mode 100644 index 00000000000..3cc3641637a --- /dev/null +++ b/plugin/storage/clickhouse/spanstore/schema.go @@ -0,0 +1,42 @@ +// Copyright (c) 2023 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package spanstore + +import ( + "context" + "database/sql" + "fmt" +) + +const ( + createSpansTableSQL = ` + CREATE TABLE IF NOT EXISTS %s ( + Timestamp DateTime64(9) CODEC(Delta, ZSTD(1)), + TraceId String CODEC(ZSTD(1)), + SpanId String CODEC(ZSTD(1)), + ParentSpanId String CODEC(ZSTD(1)), + Operation LowCardinality(String) CODEC(ZSTD(1)), + Service LowCardinality(String) CODEC(ZSTD(1)), + Tags Nested + ( + keys LowCardinality(String), + values String + ) CODEC (ZSTD(1)), + Duration UInt64 CODEC(ZSTD(1)), + INDEX idx_trace_id TraceId TYPE bloom_filter(0.001) GRANULARITY 1, + INDEX idx_tags_keys Tags.keys TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_tags_values Tags.values TYPE bloom_filter(0.01) GRANULARITY 1, + INDEX idx_duration Duration TYPE minmax GRANULARITY 1 + ) ENGINE MergeTree() + + PARTITION BY toDate(Timestamp) + ORDER BY (Service, Operation, toUnixTimestamp(Timestamp), Duration, TraceId) + SETTINGS index_granularity=8192, ttl_only_drop_parts = 1; + ` +) + +func CreateSpansTable(ctx context.Context, db *sql.DB, tableName string) error { + _, err := db.ExecContext(ctx, fmt.Sprintf(createSpansTableSQL, tableName)) + return err +}