Allow providing custom Kafka configs
parisni committed Sep 12, 2023
1 parent d46be93 commit d22dc90
Showing 3 changed files with 79 additions and 81 deletions.
@@ -843,7 +843,7 @@ public static ConfigDef newConfigDef() {
Importance.LOW,
"File event configuration as json format. "
+ "Mandatory Fields: bootstrap_servers, topic_name, schema_registry_url. "
+ "Optional fields: sasl_mechanism, security_protocol, sasl_jaas_config. "
+ "Custom fields can be added in the \"custom\" field as a map of attribute"
+ "By default an empty json.",
group,
++orderInGroup,
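For illustration, a file event configuration following the description above might look like the JSON below. The endpoints and topic name are placeholders; keys under "custom" are passed through verbatim as producer properties:

{
  "topic_name": "file-event-topic",
  "bootstrap_servers": "localhost:9092",
  "schema_registry_url": "http://localhost:8081",
  "custom": {
    "auto.register.schemas": "true",
    "security.protocol": "SASL_SSL"
  }
}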
KafkaFileEventConfig.java
@@ -15,39 +15,33 @@

package io.confluent.connect.s3.file;

import java.util.Map;
import java.util.Properties;

public class KafkaFileEventConfig extends AbstractFileEventConfig {

Map<String, Object> custom;
private static final String KEY_SERIALIZER =
"org.apache.kafka.common.serialization.StringSerializer";
private static final String VALUE_SERIALIZER =
"io.confluent.kafka.serializers.KafkaAvroSerializer";

private String topicName;
private String bootstrapServers;
private String securityProtocol;
private String schemaRegistryUrl;
private String saslMechanism;
private String saslJaasConfig;

/** empty constructor for jackson */
public KafkaFileEventConfig() {
}
public KafkaFileEventConfig() {}

public KafkaFileEventConfig(
String topicName,
String bootstrapServers,
String schemaRegistryUrl,
String securityProtocol,
String saslMechanism,
String saslJaasConfig) {
Map<String, Object> custom) {
this.topicName = topicName;
this.bootstrapServers = bootstrapServers;
this.schemaRegistryUrl = schemaRegistryUrl;
this.securityProtocol = securityProtocol;
this.saslMechanism = saslMechanism;
this.saslJaasConfig = saslJaasConfig;
this.custom = custom;
}

@Override
@@ -64,16 +58,15 @@ public String toJson() {
sb.append("\"topic_name\": \"").append(topicName).append('"');
sb.append(", \"bootstrap_servers\": \"").append(bootstrapServers).append('"');
sb.append(", \"schema_registry_url\": \"").append(schemaRegistryUrl).append('"');
if (securityProtocol != null) {
sb.append(", \"security_protocol\": \"").append(securityProtocol).append('"');
}
if (saslMechanism != null) {
sb.append(", \"sasl_mechanism\": \"").append(saslMechanism).append('"');
}
if (saslJaasConfig != null) {
sb.append(", \"sasl_jaas_config\": \"").append(saslJaasConfig).append('"');
sb.append(", \"custom\": {");
String customIncrement = "";
for (Map.Entry<String, Object> custom : custom.entrySet()) {
sb.append(
String.format(
"%s \"%s\": \"%s\"", customIncrement, custom.getKey(), custom.getValue().toString()));
customIncrement = ",";
}
sb.append('}');
sb.append("}}");
return sb.toString();
}

@@ -82,21 +75,13 @@ public Properties toProps() {
Properties prop = new Properties();
prop.setProperty("key.serializer", KEY_SERIALIZER);
prop.setProperty("value.serializer", VALUE_SERIALIZER);
prop.setProperty("use.latest.version", "true");
prop.setProperty("auto.register.schemas", "false");
// mandatory
prop.setProperty("bootstrap.servers", bootstrapServers);
prop.setProperty("topic.name", topicName);
prop.setProperty("schema.registry.url", schemaRegistryUrl);
// optional
if (saslMechanism != null) {
prop.setProperty("sasl.mechanism", saslMechanism);
}
if (securityProtocol != null) {
prop.setProperty("security.protocol", securityProtocol);
}
if (saslJaasConfig != null) {
prop.setProperty("sasl.jaas.config", saslJaasConfig);
// custom
for (Map.Entry<String, Object> custom : custom.entrySet()) {
prop.setProperty(custom.getKey(), custom.getValue().toString());
}
return prop;
}
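A minimal usage sketch of this class after the change, assuming the connector classes are on the classpath and using made-up connection values: entries of the custom map land directly in the returned Properties, which is how settings such as security.protocol or sasl.mechanism (formerly dedicated fields) can still be supplied.

import io.confluent.connect.s3.file.KafkaFileEventConfig;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaFileEventConfigExample {
  public static void main(String[] args) {
    // Custom entries are copied verbatim into the producer Properties,
    // so they use plain Kafka property names (the SASL values here are made up).
    Map<String, Object> custom = new HashMap<>();
    custom.put("security.protocol", "SASL_SSL");
    custom.put("sasl.mechanism", "PLAIN");

    KafkaFileEventConfig config =
        new KafkaFileEventConfig(
            "file-event-topic",       // topic_name
            "localhost:9092",         // bootstrap_servers
            "http://localhost:8081",  // schema_registry_url
            custom);

    Properties props = config.toProps();
    System.out.println(props.getProperty("security.protocol")); // SASL_SSL
    System.out.println(config.toJson()); // JSON with a nested "custom" object
  }
}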
@@ -113,15 +98,7 @@ public String getBootstrapServers() {
return bootstrapServers;
}

public String getSecurityProtocol() {
return securityProtocol;
}

public String getSaslMechanism() {
return saslMechanism;
}

public String getSaslJaasConfig() {
return saslJaasConfig;
public Map<String, Object> getCustom() {
return custom;
}
}
S3SinkFileEventIT.java
@@ -70,19 +70,28 @@ public class S3SinkFileEventIT extends BaseConnectorIT {
private static final String CONNECTOR_NAME = "s3-sink";
private static final String DEFAULT_TEST_TOPIC_NAME = "TestTopic";

private static final List<String> KAFKA_TOPICS = Collections.singletonList(DEFAULT_TEST_TOPIC_NAME);
private static final List<String> KAFKA_TOPICS =
Collections.singletonList(DEFAULT_TEST_TOPIC_NAME);

private JsonConverter jsonConverter;
// custom producer to enable sending records with headers
private Producer<byte[], byte[]> producer;
private Map<String, Object> autoCreate =
new HashMap<String, Object>() {
{
put("auto.register.schemas", "true");
put("auto.create.topics.enable", "true");
}
};
;

@Before
public void before() throws InterruptedException {
initializeJsonConverter();
initializeCustomProducer();
setupProperties();
waitForSchemaRegistryToStart();
//add class specific props
// add class specific props
props.put(SinkConnectorConfig.TOPICS_CONFIG, String.join(",", KAFKA_TOPICS));
props.put(FLUSH_SIZE_CONFIG, Integer.toString(FLUSH_SIZE_STANDARD));
props.put(FORMAT_CLASS_CONFIG, AvroFormat.class.getName());
@@ -94,7 +103,9 @@ public void before() throws InterruptedException {
// file event
props.put(FILE_EVENT_ENABLE, "true");
// TimeBasedPartitioner
props.put(PartitionerConfig.PARTITIONER_CLASS_CONFIG, "io.confluent.connect.storage.partitioner.TimeBasedPartitioner");
props.put(
PartitionerConfig.PARTITIONER_CLASS_CONFIG,
"io.confluent.connect.storage.partitioner.TimeBasedPartitioner");
props.put(PartitionerConfig.PARTITION_DURATION_MS_CONFIG, "100");
props.put(PartitionerConfig.PATH_FORMAT_CONFIG, "'event_date'=YYYY-MM-dd/'event_hour'=HH");
props.put(PartitionerConfig.LOCALE_CONFIG, "FR_fr");
@@ -113,21 +124,18 @@ public void after() throws Exception {
waitForFilesInBucket(TEST_BUCKET_NAME, 0);
}


@Test
public void testBasicRecordsWrittenParquetAndRelatedFileEvents() throws Throwable {
// add test specific props
props.put(FORMAT_CLASS_CONFIG, ParquetFormat.class.getName());
String topicFileEvent = "TopicFileEvent";
props.put(
FILE_EVENT_CONFIG_JSON,
FILE_EVENT_CONFIG_JSON,
new KafkaFileEventConfig(
topicFileEvent,
connect.kafka().bootstrapServers(),
restApp.restServer.getURI().toString(),
null,
null,
null)
this.autoCreate)
.toJson());
connect.kafka().createTopic(topicFileEvent);
testBasicRecordsWrittenAndRelatedFileEvents(PARQUET_EXTENSION, topicFileEvent);
@@ -139,51 +147,62 @@ public void testFileEventPartition() {
String fileEventTopic = "file_event_topic";
connect.kafka().createTopic(fileEventTopic);
KafkaFileEventConfig kafkaFileEventConfig =
new KafkaFileEventConfig(
fileEventTopic,
bootstrapServers,
restApp.restServer.getURI().toString(),
null,
null,
null);
new KafkaFileEventConfig(
fileEventTopic,
bootstrapServers,
restApp.restServer.getURI().toString(),
this.autoCreate);
KafkaFileEventProvider fileEvent =
new KafkaFileEventProvider(kafkaFileEventConfig.toJson(), false);
fileEvent.call("baz-topic", "version/event/hour", "file1.avro", 12,
new DateTime(1234L), new DateTime(123L),
34, new DateTime(1234L).withZone(DateTimeZone.UTC));
fileEvent.call("foo-topic", "version/event/hour", "fil2.avro", 8,
new DateTime(12345L), new DateTime(1234L), 12, new DateTime(12345L));
new KafkaFileEventProvider(kafkaFileEventConfig.toJson(), false);
fileEvent.call(
"baz-topic",
"version/event/hour",
"file1.avro",
12,
new DateTime(1234L),
new DateTime(123L),
34,
new DateTime(1234L).withZone(DateTimeZone.UTC));
fileEvent.call(
"foo-topic",
"version/event/hour",
"fil2.avro",
8,
new DateTime(12345L),
new DateTime(1234L),
12,
new DateTime(12345L));

// fails if two records are not present in kafka within 1s
connect.kafka().consume(2, 1000L, fileEventTopic);
}
/**
* Test that the expected records are written for a given file extension
* Optionally, test that topics which have "*.{expectedFileExtension}*" in them are processed
* and written.
* Test that the expected records are written for a given file extension. Optionally, test that
* topics which have "*.{expectedFileExtension}*" in them are processed and written.
*
* @param expectedFileExtension The file extension to test against
* @param fileEventTopic The fileEvent topic name
* @throws Throwable
*/
private void testBasicRecordsWrittenAndRelatedFileEvents(
String expectedFileExtension,
String fileEventTopic
) throws Throwable {
String expectedFileExtension, String fileEventTopic) throws Throwable {
// Add an extra topic with this extension inside of the name
// Use a TreeSet for test determinism
Set<String> topicNames = new TreeSet<>(KAFKA_TOPICS);

// start sink connector
connect.configureConnector(CONNECTOR_NAME, props);
// wait for tasks to spin up
EmbeddedConnectUtils.waitForConnectorToStart(connect, CONNECTOR_NAME, Math.min(topicNames.size(), MAX_TASKS));
EmbeddedConnectUtils.waitForConnectorToStart(
connect, CONNECTOR_NAME, Math.min(topicNames.size(), MAX_TASKS));

Schema recordValueSchema = getSampleStructSchema();
Struct recordValueStruct = getSampleStructVal(recordValueSchema);

for (String thisTopicName : topicNames) {
// Create and send records to Kafka using the topic name in the current 'thisTopicName'
SinkRecord sampleRecord = getSampleTopicRecord(thisTopicName, recordValueSchema, recordValueStruct);
SinkRecord sampleRecord =
getSampleTopicRecord(thisTopicName, recordValueSchema, recordValueStruct);
produceRecordsNoHeaders(NUM_RECORDS_INSERT, sampleRecord);
}

@@ -194,13 +213,13 @@ private void testBasicRecordsWrittenAndRelatedFileEvents(

Set<String> expectedTopicFilenames = new TreeSet<>();
for (String thisTopicName : topicNames) {
List<String> theseFiles = getExpectedFilenames(
List<String> theseFiles =
getExpectedFilenames(
thisTopicName,
TOPIC_PARTITION,
FLUSH_SIZE_STANDARD,
NUM_RECORDS_INSERT,
expectedFileExtension
);
expectedFileExtension);
assertEquals(theseFiles.size(), countPerTopic);
expectedTopicFilenames.addAll(theseFiles);
}
@@ -221,21 +240,22 @@ private void produceRecords(
SinkRecord record,
boolean withKey,
boolean withValue,
boolean withHeaders
) throws ExecutionException, InterruptedException {
boolean withHeaders)
throws ExecutionException, InterruptedException {
byte[] kafkaKey = null;
byte[] kafkaValue = null;
Iterable<Header> headers = Collections.emptyList();
if (withKey) {
kafkaKey = jsonConverter.fromConnectData(topic, Schema.STRING_SCHEMA, record.key());
}
if (withValue) {
kafkaValue = jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
kafkaValue =
jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
}
if (withHeaders) {
headers = sampleHeaders();
}
ProducerRecord<byte[],byte[]> producerRecord =
ProducerRecord<byte[], byte[]> producerRecord =
new ProducerRecord<>(topic, TOPIC_PARTITION, kafkaKey, kafkaValue, headers);
for (long i = 0; i < recordCount; i++) {
producer.send(producerRecord).get();
@@ -253,9 +273,11 @@ private void initializeJsonConverter() {
private void initializeCustomProducer() {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connect.kafka().bootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
producerProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
producerProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
producer = new KafkaProducer<>(producerProps);
}
@@ -270,5 +292,4 @@ private void setupProperties() {
// aws credential if exists
props.putAll(getAWSCredentialFromPath());
}

}
