Skip to content

Commit

Permalink
Add rdnsquurier config, rate limiter, resolver and internal telemetry.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmw51798 committed Jul 1, 2024
1 parent 1248ec4 commit feb20d0
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 45 deletions.
4 changes: 2 additions & 2 deletions comp/netflow/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func newServer(lc fx.Lifecycle, deps dependencies) (provides, error) {
// it with a noop implementation.
rdnsQuerier := deps.RDNSQuerier
if conf.ReverseDNSEnrichmentEnabled {
deps.Logger.Debugf("NetFlow Reverse DNS Enrichment enabled")
deps.Logger.Infof("Reverse DNS Enrichment is enabled for NDM NetFlow")
} else {
rdnsQuerier = rdnsquerierimplnone.NewNone().Comp
deps.Logger.Debugf("NetFlow Reverse DNS Enrichment disabled")
deps.Logger.Infof("Reverse DNS Enrichment is disabled for NDM NetFlow")
}

flowAgg := flowaggregator.NewFlowAggregator(sender, deps.Forwarder, conf, deps.Hostname.GetSafe(context.Background()), deps.Logger, rdnsQuerier)
Expand Down
103 changes: 103 additions & 0 deletions comp/rdnsquerier/impl/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

package rdnsquerierimpl

import (
"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/log"
)

type rdnsQuerierConfig struct {
enabled bool
workers int
chanSize int

rateLimiterEnabled bool
rateLimiterLimit int
rateLimiterBurst int

cacheEnabled bool
cacheEntryTTL int
cacheCleanInterval int
cachePersistInterval int

// debug - TODO remove
fakeResolver bool
generateFakeQueriesPerSecond int
lookupDelayMs int
}

func newConfig(agentConfig config.Component, logger log.Component) *rdnsQuerierConfig {
netflowRDNSEnrichmentEnabled := agentConfig.GetBool("network_devices.netflow.reverse_dns_enrichment_enabled")

c := &rdnsQuerierConfig{
enabled: netflowRDNSEnrichmentEnabled,
workers: agentConfig.GetInt("reverse_dns_enrichment.workers"),
chanSize: agentConfig.GetInt("reverse_dns_enrichment.chan_size"),

rateLimiterEnabled: agentConfig.GetBool("reverse_dns_enrichment.rate_limiter.enabled"),
rateLimiterLimit: agentConfig.GetInt("reverse_dns_enrichment.rate_limiter.limit"),
rateLimiterBurst: agentConfig.GetInt("reverse_dns_enrichment.rate_limiter.burst"),

cacheEnabled: agentConfig.GetBool("reverse_dns_enrichment.cache.enabled"),
cacheEntryTTL: agentConfig.GetInt("reverse_dns_enrichment.cache.entry_ttl"),
cacheCleanInterval: agentConfig.GetInt("reverse_dns_enrichment.cache.clean_interval"),
cachePersistInterval: agentConfig.GetInt("reverse_dns_enrichment.cache.persist_interval"),

fakeResolver: agentConfig.GetBool("reverse_dns_enrichment.debug.fake_resolver"),
generateFakeQueriesPerSecond: agentConfig.GetInt("reverse_dns_enrichment.generate_fake_queries_per_second"),
lookupDelayMs: agentConfig.GetInt("reverse_dns_enrichment.lookup_delay_ms"),
}

c.validateConfig(logger)

return c
}

func (c *rdnsQuerierConfig) validateConfig(logger log.Component) {
if c.enabled {
logger.Infof("Reverse DNS Enrichment component is enabled")
} else {
logger.Infof("Reverse DNS Enrichment component is disabled")
return
}

if c.workers <= 0 {
logger.Warnf("Reverse DNS Enrichment: Invalid number of workers %d, setting to 1", c.workers)
c.workers = 1
}

if c.chanSize < 0 {
logger.Warnf("Reverse DNS Enrichment: Invalid channel size %d, setting to 0 (unbuffered)", c.chanSize)
c.chanSize = 0
}

if c.rateLimiterEnabled {
if c.rateLimiterLimit <= 0 {
logger.Warnf("Reverse DNS Enrichment: Invalid rate limiter limit %d, setting to 1000", c.rateLimiterLimit)
c.rateLimiterLimit = 1000
}
if c.rateLimiterBurst < 0 {
logger.Warnf("Reverse DNS Enrichment: Invalid rate limiter burst %d, setting to 1", c.rateLimiterBurst)
c.rateLimiterBurst = 1
}
}

if c.cacheEnabled {
if c.cacheEntryTTL <= 0 {
logger.Warnf("Reverse DNS Enrichment: Invalid cache entry TTL, setting to 60 minutes")
c.cacheEntryTTL = 60 * 60
}
if c.cacheCleanInterval <= 0 {
logger.Warnf("Reverse DNS Enrichment: Invalid cache clean interval, setting to 30 minutes")
c.cacheCleanInterval = 30 * 60
}
if c.cachePersistInterval <= 0 {
logger.Warnf("Reverse DNS Enrichment: Invalid cache persist interval, setting to 30 minutes")
c.cachePersistInterval = 30 * 60
}
}
}
41 changes: 41 additions & 0 deletions comp/rdnsquerier/impl/ratelimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

package rdnsquerierimpl

import (
"context"

"golang.org/x/time/rate"
)

type rateLimiter interface {
wait(context.Context) error
}

func newRateLimiter(config *rdnsQuerierConfig) rateLimiter {
if !config.rateLimiterEnabled {
return &rateLimiterNone{}
}
return &rateLimiterImpl{
limiter: rate.NewLimiter(rate.Limit(config.rateLimiterLimit), config.rateLimiterBurst),
}
}

// Rate limiter implementation for when rdnsquerier rate limiting is enabled
type rateLimiterImpl struct {
limiter *rate.Limiter
}

func (r *rateLimiterImpl) wait(ctx context.Context) error {
return r.limiter.Wait(ctx)
}

// No limit rate limiter for when rdnsquerier rate limiting is disabled
type rateLimiterNone struct{}

func (r *rateLimiterNone) wait(_ context.Context) error {
return nil
}
Loading

0 comments on commit feb20d0

Please sign in to comment.