From 104d18ddb9ca3615b4ef3916749f9c7d1d19a9fe Mon Sep 17 00:00:00 2001 From: Jim Wilson <86026167+jmw51798@users.noreply.github.com> Date: Thu, 27 Jun 2024 12:39:02 -0600 Subject: [PATCH] Add rdnsquerier config, rate limiter, resolver and internal telemetry. --- comp/netflow/server/server.go | 4 +- comp/rdnsquerier/impl/config.go | 103 +++++++++++++++ comp/rdnsquerier/impl/ratelimiter.go | 41 ++++++ comp/rdnsquerier/impl/rdnsquerier.go | 183 ++++++++++++++++++++------- comp/rdnsquerier/impl/resolver.go | 66 ++++++++++ pkg/config/setup/config.go | 12 ++ 6 files changed, 364 insertions(+), 45 deletions(-) create mode 100644 comp/rdnsquerier/impl/config.go create mode 100644 comp/rdnsquerier/impl/ratelimiter.go create mode 100644 comp/rdnsquerier/impl/resolver.go diff --git a/comp/netflow/server/server.go b/comp/netflow/server/server.go index 69dea041142bcb..dab01bf7758985 100644 --- a/comp/netflow/server/server.go +++ b/comp/netflow/server/server.go @@ -57,10 +57,10 @@ func newServer(lc fx.Lifecycle, deps dependencies) (provides, error) { // it with a noop implementation. rdnsQuerier := deps.RDNSQuerier if conf.ReverseDNSEnrichmentEnabled { - deps.Logger.Debugf("NetFlow Reverse DNS Enrichment enabled") + deps.Logger.Infof("Reverse DNS Enrichment is enabled for NDM NetFlow") } else { rdnsQuerier = rdnsquerierimplnone.NewNone().Comp - deps.Logger.Debugf("NetFlow Reverse DNS Enrichment disabled") + deps.Logger.Infof("Reverse DNS Enrichment is disabled for NDM NetFlow") } flowAgg := flowaggregator.NewFlowAggregator(sender, deps.Forwarder, conf, deps.Hostname.GetSafe(context.Background()), deps.Logger, rdnsQuerier) diff --git a/comp/rdnsquerier/impl/config.go b/comp/rdnsquerier/impl/config.go new file mode 100644 index 00000000000000..0fad2b23a9d2a0 --- /dev/null +++ b/comp/rdnsquerier/impl/config.go @@ -0,0 +1,103 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +package rdnsquerierimpl + +import ( + "github.com/DataDog/datadog-agent/comp/core/config" + "github.com/DataDog/datadog-agent/comp/core/log" +) + +type rdnsQuerierConfig struct { + enabled bool + workers int + chanSize int + + rateLimiterEnabled bool + rateLimiterLimit int + rateLimiterBurst int + + cacheEnabled bool + cacheEntryTTL int + cacheCleanInterval int + cachePersistInterval int + + // debug - TODO remove + fakeResolver bool + generateFakeQueriesPerSecond int + lookupDelayMs int +} + +func newConfig(agentConfig config.Component, logger log.Component) *rdnsQuerierConfig { + netflowRDNSEnrichmentEnabled := agentConfig.GetBool("network_devices.netflow.reverse_dns_enrichment_enabled") + + c := &rdnsQuerierConfig{ + enabled: netflowRDNSEnrichmentEnabled, + workers: agentConfig.GetInt("reverse_dns_enrichment.workers"), + chanSize: agentConfig.GetInt("reverse_dns_enrichment.chan_size"), + + rateLimiterEnabled: agentConfig.GetBool("reverse_dns_enrichment.rate_limiter.enabled"), + rateLimiterLimit: agentConfig.GetInt("reverse_dns_enrichment.rate_limiter.limit"), + rateLimiterBurst: agentConfig.GetInt("reverse_dns_enrichment.rate_limiter.burst"), + + cacheEnabled: agentConfig.GetBool("reverse_dns_enrichment.cache.enabled"), + cacheEntryTTL: agentConfig.GetInt("reverse_dns_enrichment.cache.entry_ttl"), + cacheCleanInterval: agentConfig.GetInt("reverse_dns_enrichment.cache.clean_interval"), + cachePersistInterval: agentConfig.GetInt("reverse_dns_enrichment.cache.persist_interval"), + + fakeResolver: agentConfig.GetBool("reverse_dns_enrichment.debug.fake_resolver"), + generateFakeQueriesPerSecond: agentConfig.GetInt("reverse_dns_enrichment.generate_fake_queries_per_second"), + lookupDelayMs: agentConfig.GetInt("reverse_dns_enrichment.lookup_delay_ms"), + } + + c.validateConfig(logger) + + return c +} + +func (c *rdnsQuerierConfig) 
validateConfig(logger log.Component) { + if c.enabled { + logger.Infof("Reverse DNS Enrichment component is enabled") + } else { + logger.Infof("Reverse DNS Enrichment component is disabled") + return + } + + if c.workers <= 0 { + logger.Warnf("Reverse DNS Enrichment: Invalid number of workers %d, setting to 1", c.workers) + c.workers = 1 + } + + if c.chanSize < 0 { + logger.Warnf("Reverse DNS Enrichment: Invalid channel size %d, setting to 0 (unbuffered)", c.chanSize) + c.chanSize = 0 + } + + if c.rateLimiterEnabled { + if c.rateLimiterLimit <= 0 { + logger.Warnf("Reverse DNS Enrichment: Invalid rate limiter limit %d, setting to 1000", c.rateLimiterLimit) + c.rateLimiterLimit = 1000 + } + if c.rateLimiterBurst < 0 { + logger.Warnf("Reverse DNS Enrichment: Invalid rate limiter burst %d, setting to 1", c.rateLimiterBurst) + c.rateLimiterBurst = 1 + } + } + + if c.cacheEnabled { + if c.cacheEntryTTL <= 0 { + logger.Warnf("Reverse DNS Enrichment: Invalid cache entry TTL, setting to 60 minutes") + c.cacheEntryTTL = 60 * 60 + } + if c.cacheCleanInterval <= 0 { + logger.Warnf("Reverse DNS Enrichment: Invalid cache clean interval, setting to 30 minutes") + c.cacheCleanInterval = 30 * 60 + } + if c.cachePersistInterval <= 0 { + logger.Warnf("Reverse DNS Enrichment: Invalid cache persist interval, setting to 30 minutes") + c.cachePersistInterval = 30 * 60 + } + } +} diff --git a/comp/rdnsquerier/impl/ratelimiter.go b/comp/rdnsquerier/impl/ratelimiter.go new file mode 100644 index 00000000000000..0247fe1fa8e89b --- /dev/null +++ b/comp/rdnsquerier/impl/ratelimiter.go @@ -0,0 +1,41 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. 
+ +package rdnsquerierimpl + +import ( + "context" + + "golang.org/x/time/rate" +) + +type rateLimiter interface { + wait(context.Context) error +} + +func newRateLimiter(config *rdnsQuerierConfig) rateLimiter { + if !config.rateLimiterEnabled { + return &rateLimiterNone{} + } + return &rateLimiterImpl{ + limiter: rate.NewLimiter(rate.Limit(config.rateLimiterLimit), config.rateLimiterBurst), + } +} + +// Rate limiter implementation for when rdnsquerier rate limiting is enabled +type rateLimiterImpl struct { + limiter *rate.Limiter +} + +func (r *rateLimiterImpl) wait(ctx context.Context) error { + return r.limiter.Wait(ctx) +} + +// No limit rate limiter for when rdnsquerier rate limiting is disabled +type rateLimiterNone struct{} + +func (r *rateLimiterNone) wait(_ context.Context) error { + return nil +} diff --git a/comp/rdnsquerier/impl/rdnsquerier.go b/comp/rdnsquerier/impl/rdnsquerier.go index d2c36252545ff1..ea0ae0b941391c 100644 --- a/comp/rdnsquerier/impl/rdnsquerier.go +++ b/comp/rdnsquerier/impl/rdnsquerier.go @@ -7,29 +7,26 @@ package rdnsquerierimpl import ( + "context" "net" "net/netip" "sync" + "time" - "context" "github.com/DataDog/datadog-agent/comp/core/config" "github.com/DataDog/datadog-agent/comp/core/log" + "github.com/DataDog/datadog-agent/comp/core/telemetry" compdef "github.com/DataDog/datadog-agent/comp/def" rdnsquerier "github.com/DataDog/datadog-agent/comp/rdnsquerier/def" rdnsquerierimplnone "github.com/DataDog/datadog-agent/comp/rdnsquerier/impl-none" ) -// TODO add config -const ( - numWorkers = 10 - queryChanSize = 1000 -) - // Requires defines the dependencies for the rdnsquerier component type Requires struct { - Lifecycle compdef.Lifecycle - Config config.Component - Logger log.Component + Lifecycle compdef.Lifecycle + AgentConfig config.Component + Logger log.Component + Telemetry telemetry.Component } // Provides defines the output of the rdnsquerier component @@ -42,30 +39,84 @@ type rdnsQuery struct { updateHostname 
// moduleName is the subsystem name passed to every telemetry counter created
// by NewComponent.
const moduleName = "reverse_dns_enrichment"

// rdnsQuerierTelemetry groups the internal counters of the querier.
// NewComponent fills this struct with an unkeyed (positional) literal, so the
// field order here must stay in sync with the order of the NewCounter calls.
// NOTE(review): this is declared as an alias of an anonymous struct rather
// than as a defined type - confirm that is intended.
type rdnsQuerierTelemetry = struct {
	total              telemetry.Counter // every GetHostnameAsync call
	private            telemetry.Counter // requests in the private address space
	chanAdded          telemetry.Counter // requests added to the query channel
	droppedChanFull    telemetry.Counter // requests dropped because the channel was full
	droppedRateLimiter telemetry.Counter // requests dropped because the rate limiter wait failed
	invalidIPAddress   telemetry.Counter // requests with an invalid IP address
	lookupErrNotFound  telemetry.Counter // lookups that returned a not found error
	lookupErrTimeout   telemetry.Counter // lookups that returned a timeout error
	lookupErrTemporary telemetry.Counter // lookups that returned a temporary error
	lookupErrOther     telemetry.Counter // lookups that returned any other error
	successful         telemetry.Counter // successful lookups
}

// rdnsQuerierImpl is the reverse DNS querier component implementation.
type rdnsQuerierImpl struct {
	logger            log.Component
	config            *rdnsQuerierConfig
	internalTelemetry *rdnsQuerierTelemetry

	// resolver performs the lookups; rateLimiter is consulted before each one.
	resolver    resolver
	rateLimiter rateLimiter

	// rdnsQueryChan feeds queries to the workers; wg tracks the goroutines
	// launched by start and cancel stops them.
	rdnsQueryChan chan *rdnsQuery
	wg            sync.WaitGroup
	cancel        context.CancelFunc
}
[]string{}, "Counter measuring the total number of rDNS requests made"), + reqs.Telemetry.NewCounter(moduleName, "private", []string{}, "Counter measuring the number of rDNS requests in the private address space"), + reqs.Telemetry.NewCounter(moduleName, "chan_added", []string{}, "Counter measuring the number of rDNS requests added to the channel"), + reqs.Telemetry.NewCounter(moduleName, "dropped_chan_full", []string{}, "Counter measuring the number of rDNS requests dropped because the channel was full"), + reqs.Telemetry.NewCounter(moduleName, "dropped_rate_limiter", []string{}, "Counter measuring the number of rDNS requests dropped because the rate limiter wait failed"), + reqs.Telemetry.NewCounter(moduleName, "invalid_ip_address", []string{}, "Counter measuring the number of rDNS requests with an invalid IP address"), + reqs.Telemetry.NewCounter(moduleName, "lookup_err_not_found", []string{}, "Counter measuring the number of rDNS lookups that returned a not found error"), + reqs.Telemetry.NewCounter(moduleName, "lookup_err_timeout", []string{}, "Counter measuring the number of rDNS lookups that returned a timeout error"), + reqs.Telemetry.NewCounter(moduleName, "lookup_err_temporary", []string{}, "Counter measuring the number of rDNS lookups that returned a temporary error"), + reqs.Telemetry.NewCounter(moduleName, "lookup_err_other", []string{}, "Counter measuring the number of rDNS lookups that returned error not otherwise classified"), + reqs.Telemetry.NewCounter(moduleName, "successful", []string{}, "Counter measuring the number of successful rDNS requests"), + } + q := &rdnsQuerierImpl{ - config: reqs.Config, - logger: reqs.Logger, - rdnsQueryChan: make(chan *rdnsQuery, queryChanSize), - stopChan: make(chan struct{}), + logger: reqs.Logger, + config: config, + internalTelemetry: internalTelemetry, + + resolver: newResolver(config, reqs.Logger), + rateLimiter: newRateLimiter(config), + + rdnsQueryChan: make(chan *rdnsQuery, config.chanSize), } 
reqs.Lifecycle.Append(compdef.Hook{ @@ -82,79 +133,125 @@ func NewComponent(reqs Requires) (Provides, error) { // space then a reverse DNS lookup is processed asynchronously. If the lookup is successful then the updateHostname function // will be called asynchronously with the hostname. func (q *rdnsQuerierImpl) GetHostnameAsync(ipAddr []byte, updateHostname func(string)) { + q.internalTelemetry.total.Inc() + ipaddr, ok := netip.AddrFromSlice(ipAddr) if !ok { - q.logger.Tracef("Reverse DNS Enrichment IP address %v is invalid", ipAddr) + q.internalTelemetry.invalidIPAddress.Inc() + q.logger.Debugf("Reverse DNS Enrichment IP address %v is invalid", ipAddr) return } if !ipaddr.IsPrivate() { return } + q.internalTelemetry.private.Inc() - q.rdnsQueryChan <- &rdnsQuery{ + query := &rdnsQuery{ addr: ipaddr.String(), updateHostname: updateHostname, } + + select { + case q.rdnsQueryChan <- query: + q.internalTelemetry.chanAdded.Inc() + default: + q.internalTelemetry.droppedChanFull.Inc() + q.logger.Debugf("Reverse DNS Enrichment channel is full, dropping query for IP address %s", query.addr) + } } -func (q *rdnsQuerierImpl) start(context.Context) error { - for i := 0; i < numWorkers; i++ { +func (q *rdnsQuerierImpl) start(_ context.Context) error { + var ctx context.Context + ctx, q.cancel = context.WithCancel(context.Background()) + for range q.config.workers { q.wg.Add(1) - go q.worker(i) + go q.worker(ctx) + } + q.logger.Infof("Reverse DNS Enrichment started %d rdnsquerier workers", q.config.workers) + + if q.config.generateFakeQueriesPerSecond > 0 { + q.wg.Add(1) + go q.generateFakeQueries(ctx) } - q.logger.Infof("Reverse DNS Enrichment started %d rdnsquerier workers", numWorkers) return nil } func (q *rdnsQuerierImpl) stop(context.Context) error { - close(q.stopChan) + q.cancel() q.wg.Wait() q.logger.Infof("Reverse DNS Enrichment stopped rdnsquerier workers") return nil } -func (q *rdnsQuerierImpl) worker(num int) { +func (q *rdnsQuerierImpl) worker(ctx 
context.Context) { defer q.wg.Done() for { select { case query := <-q.rdnsQueryChan: - q.logger.Tracef("Reverse DNS Enrichment worker[%d] processing rdnsQuery for IP address %v", num, query.addr) - q.getHostname(query) - case <-q.stopChan: + q.getHostname(ctx, query) + case <-ctx.Done(): + return + } + } +} + +func (q *rdnsQuerierImpl) generateFakeQueries(ctx context.Context) { + defer q.wg.Done() + for { + select { + case <-ctx.Done(): return + case <-time.After(time.Second): + q.logger.Debugf("Reverse DNS Enrichment generating %d fake queries", q.config.generateFakeQueriesPerSecond) + for i := range q.config.generateFakeQueriesPerSecond { + q.GetHostnameAsync( + []byte{192, 168, 1, byte(i)}, + func(hostname string) { + // noop + }, + ) + } } } } -func (q *rdnsQuerierImpl) getHostname(query *rdnsQuery) { - // net.LookupAddr() can return both a non-zero length slice of hostnames and an error, but when - // using the host C library resolver at most one result will be returned. So for now, since - // specifying other DNS resolvers is not supported, if we get an error we know that no valid - // hostname was returned. 
- hostnames, err := net.LookupAddr(query.addr) +func (q *rdnsQuerierImpl) getHostname(ctx context.Context, query *rdnsQuery) { + err := q.rateLimiter.wait(ctx) + if err != nil { + q.internalTelemetry.droppedRateLimiter.Inc() + q.logger.Debugf("Reverse DNS Enrichment rateLimiter.wait() returned error: %v - dropping query for IP address %s", err, query.addr) + return + } + + hostname, err := q.resolver.lookup(query.addr) if err != nil { if dnsErr, ok := err.(*net.DNSError); ok { if dnsErr.IsNotFound { - q.logger.Tracef("Reverse DNS Enrichment net.LookupAddr returned not found error '%v' for IP address %v", err, query.addr) + q.internalTelemetry.lookupErrNotFound.Inc() + q.logger.Debugf("Reverse DNS Enrichment net.LookupAddr returned not found error '%v' for IP address %v", err, query.addr) + // no match was found for the requested IP address, so call updateHostname() to make the caller aware of that fact + query.updateHostname(hostname) return } if dnsErr.IsTimeout { - q.logger.Tracef("Reverse DNS Enrichment net.LookupAddr returned timeout error '%v' for IP address %v", err, query.addr) + q.internalTelemetry.lookupErrTimeout.Inc() + q.logger.Debugf("Reverse DNS Enrichment net.LookupAddr returned timeout error '%v' for IP address %v", err, query.addr) return } if dnsErr.IsTemporary { - q.logger.Tracef("Reverse DNS Enrichment net.LookupAddr returned temporary error '%v' for IP address %v", err, query.addr) + q.internalTelemetry.lookupErrTemporary.Inc() + q.logger.Debugf("Reverse DNS Enrichment net.LookupAddr returned temporary error '%v' for IP address %v", err, query.addr) return } } - q.logger.Tracef("Reverse DNS Enrichment net.LookupAddr returned unknown error '%v' for IP address %v", err, query.addr) + q.internalTelemetry.lookupErrOther.Inc() + q.logger.Debugf("Reverse DNS Enrichment net.LookupAddr returned error '%v' for IP address %v", err, query.addr) return } - if len(hostnames) > 0 { // if !err then there should be at least one, but just to be safe - 
query.updateHostname(hostnames[0]) - } + q.internalTelemetry.successful.Inc() + query.updateHostname(hostname) } diff --git a/comp/rdnsquerier/impl/resolver.go b/comp/rdnsquerier/impl/resolver.go new file mode 100644 index 00000000000000..3613588e086f4b --- /dev/null +++ b/comp/rdnsquerier/impl/resolver.go @@ -0,0 +1,66 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +package rdnsquerierimpl + +import ( + "fmt" + "net" + "time" + + "github.com/DataDog/datadog-agent/comp/core/log" +) + +type resolver interface { + lookup(string) (string, error) +} + +func newResolver(config *rdnsQuerierConfig, logger log.Component) resolver { + if !config.fakeResolver { + return &resolverFake{ + config: config, + } + } + return &resolverImpl{ + logger: logger, + } +} + +// Resolver implementation for default resolver +type resolverImpl struct { + logger log.Component +} + +func (r *resolverImpl) lookup(addr string) (string, error) { + // net.LookupAddr() can return both a non-zero length slice of hostnames and an error, but when + // using the host C library resolver at most one result will be returned. So for now, since + // specifying other DNS resolvers is not supported, if we get an error we know that no valid + // hostname was returned. 
+ hostnames, err := net.LookupAddr(addr) + + if err != nil { + return "", err + } + + // if !err then there should be at least one, but just to be safe + if len(hostnames) == 0 { + return "", fmt.Errorf("net.LookupAddr returned no hostnames for IP address %v", addr) + } + + return hostnames[0], nil +} + +// Fake resolver for debug and test purposes +type resolverFake struct { + config *rdnsQuerierConfig +} + +func (r *resolverFake) lookup(addr string) (string, error) { + if r.config.lookupDelayMs > 0 { + time.Sleep(time.Duration(r.config.lookupDelayMs) * time.Millisecond) + } + + return "fakehostname-" + addr, nil +} diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index 860f133c67984a..2d5b0beef64e87 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -415,6 +415,7 @@ func InitConfig(config pkgconfigmodel.Config) { config.SetKnown("network_devices.netflow.aggregator_rollup_tracker_refresh_interval") config.BindEnvAndSetDefault("network_devices.netflow.enabled", "false") bindEnvAndSetLogsConfigKeys(config, "network_devices.netflow.forwarder.") + config.BindEnvAndSetDefault("network_devices.netflow.reverse_dns_enrichment_enabled", false) // Network Path config.BindEnvAndSetDefault("network_path.connections_monitoring.enabled", false) @@ -922,6 +923,17 @@ func InitConfig(config pkgconfigmodel.Config) { // Data Jobs Monitoring config config.BindEnvAndSetDefault("djm_config.enabled", false) + + // Reverse DNS Enrichment + config.BindEnvAndSetDefault("reverse_dns_enrichment.workers", 10) + config.BindEnvAndSetDefault("reverse_dns_enrichment.chan_size", 1000) + config.BindEnvAndSetDefault("reverse_dns_enrichment.rate_limiter.enabled", true) + config.BindEnvAndSetDefault("reverse_dns_enrichment.rate_limiter.limit", 1000) + config.BindEnvAndSetDefault("reverse_dns_enrichment.rate_limiter.burst", 1) + config.BindEnvAndSetDefault("reverse_dns_enrichment.cache.enabled", true) + 
config.BindEnvAndSetDefault("reverse_dns_enrichment.cache.entry_ttl", 60*60) + config.BindEnvAndSetDefault("reverse_dns_enrichment.cache.clean_interval", 30*60) + config.BindEnvAndSetDefault("reverse_dns_enrichment.cache.persist_interval", 30*60) } func agent(config pkgconfigmodel.Setup) {