Skip to content

Commit

Permalink
Encode decimal values as numeric rather than base64 (#46)
Browse files Browse the repository at this point in the history
Currently incoming `BigDecimal` values (e.g. from avro decoding) are
encoded as base64 strings (the default) by the JsonConverter. Instead,
it seems to make more sense to encode them as numeric values when
publishing to Datadog.

I considered making this configurable, but it seems unlikely that users
would want the current behavior since it makes the data much less
scrutable in Datadog. If we find this not to be the case (through an
issue report) we can add a configuration option later. As it stands,
I think it is a bug.

Fixes: #27

Signed-off-by: Jesse Szwedko <[email protected]>
  • Loading branch information
jszwedko committed Apr 24, 2024
1 parent f9ca534 commit 8ad004d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<url>https://www.datadoghq.com/</url>
</organization>
<name>datadog-kafka-connect-logs</name>
<version>1.1.2</version>
<version>1.1.3-SNAPSHOT</version>
<description>A Kafka Connect Connector that sends Kafka Connect records as logs to the Datadog API.</description>
<url>https://github.com/DataDog/datadog-kafka-connect-logs</url>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ public DatadogLogsApiWriter(DatadogLogsSinkConnectorConfig config) {
this.config = config;
this.batches = new HashMap<>();
this.jsonConverter = new JsonConverter();
jsonConverter.configure(Collections.singletonMap("schemas.enable", "false"), false);

Map<String,String> jsonConverterConfig = new HashMap<String,String>();
jsonConverterConfig.put("schemas.enable", "false");
jsonConverterConfig.put("decimal.format", "NUMERIC");

jsonConverter.configure(jsonConverterConfig, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
import com.datadoghq.connect.logs.util.Project;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -22,6 +24,8 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.math.BigDecimal;
import java.math.BigInteger;

public class DatadogLogsApiWriterTest {
private Map<String, String> props;
Expand Down Expand Up @@ -65,6 +69,22 @@ public void writer_givenConfigs_sendsPOSTToURL() throws IOException {
Assert.assertTrue(request.getHeaders().contains("User-Agent:datadog-kafka-connect-logs/" + Project.getVersion()));
}

@Test
public void writer_handles_bigDecimal() throws IOException {
// Verifies the fix for #27: a record whose value uses the Connect
// Decimal logical type must be serialized as a plain JSON number
// (1.56), not as the JsonConverter's default base64 string encoding.
DatadogLogsSinkConnectorConfig config = new DatadogLogsSinkConnectorConfig(false, 500, props);
DatadogLogsApiWriter writer = new DatadogLogsApiWriter(config);

// Decimal.schema(2) declares scale 2, so unscaled value 156 represents 1.56.
Schema schema = Decimal.schema(2);
BigDecimal value = new BigDecimal(new BigInteger("156"), 2);

records.add(new SinkRecord("someTopic", 0, null, "someKey", schema, value, 0));
writer.write(records);

// NOTE(review): restHelper appears to be a test-local HTTP capture fixture —
// exactly one request is expected for the single record written above.
Assert.assertEquals(1, restHelper.getCapturedRequests().size());
RequestInfo request = restHelper.getCapturedRequests().get(0);
// The message field carries the numeric 1.56, confirming the
// "decimal.format" = "NUMERIC" converter configuration took effect.
Assert.assertEquals("[{\"message\":1.56,\"ddsource\":\"kafka-connect\",\"ddtags\":\"topic:someTopic\"}]", request.getBody());
}

@Test
public void writer_batchAtMax_shouldSendBatched() throws IOException {
DatadogLogsSinkConnectorConfig config = new DatadogLogsSinkConnectorConfig(false, 2, props);
Expand Down

0 comments on commit 8ad004d

Please sign in to comment.