Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide feature file event provider #678

Draft
wants to merge 33 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6f2d87a
Add new file callback configs
parisni Jul 11, 2023
b95f4e0
Add file callback interface/ dummy impl
parisni Jul 11, 2023
9b81126
Add first callback call
parisni Jul 11, 2023
06057ed
Init callback in constructor
parisni Jul 11, 2023
cdd4a5e
Add kafka configs
parisni Jul 11, 2023
f9c1201
WIP: Add kafka producer
parisni Jul 11, 2023
a9728db
Use MinioContainer as replacement of real S3
parisni Jul 26, 2023
14cfc09
Test callback on embedded kafka w/ string serializer
parisni Jul 26, 2023
e2ac130
Turn value into avro serde
parisni Jul 26, 2023
34ceb09
Build avsc from maven
parisni Jul 28, 2023
f8fa534
Allow to skip errors
parisni Jul 28, 2023
ea1b58e
Implement security
parisni Jul 28, 2023
71eb9f6
Fix style
parisni Jul 28, 2023
4dbe5f6
Add k-connect test and fix impl
parisni Jul 28, 2023
3010b54
Add timebasedpartitioner to get ts
parisni Jul 28, 2023
b1dc388
Document the avro record
parisni Jul 28, 2023
dae74e2
Use avro to serialize key
parisni Aug 17, 2023
cec27e7
Add event datetime
parisni Aug 17, 2023
48f8536
Add logs and fix it
parisni Aug 22, 2023
5379081
Rm useless imports
parisni Aug 25, 2023
a9288c7
Rename callback to file-event
parisni Aug 31, 2023
62ef7dd
Use snake case
parisni Sep 1, 2023
48da579
Separate config and security file event
parisni Sep 1, 2023
b3e0eaa
Revert "Separate config and security file event"
parisni Sep 1, 2023
b854cdf
Fix typo sasl mechanism
parisni Sep 7, 2023
f38d201
Use string serializer for key
parisni Sep 8, 2023
d46be93
Disable auto create topic/registry schema
parisni Sep 8, 2023
d22dc90
Allow to provide custom kafka configs
parisni Sep 11, 2023
3d1a180
Reuse kafka producer at task level
parisni Sep 12, 2023
b5de512
Fix the file event life cycle
parisni Sep 15, 2023
1fd0c65
Fix record counts.
parisni Feb 9, 2024
74c8f2d
Add db/tbl to the output topic
parisni Mar 13, 2024
961852b
Don't fail on unknown properties
parisni Mar 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@
files="(TopicPartitionWriter).java"
/>

<suppress
checks="JavaNCSS"
files="(S3SinkConnectorConfig).java"
/>

</suppressions>
20 changes: 19 additions & 1 deletion kafka-connect-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
<parquet.tools.version>1.11.1</parquet.tools.version>
<formatters.version>0.2.2</formatters.version>
<confluent.connect.testcontainers.version>1.0.1</confluent.connect.testcontainers.version>
<org.testcontainer.version>1.15.0</org.testcontainer.version>
<org.testcontainer.version>1.18.3</org.testcontainer.version>
<maven.failsafe.plugin.version>2.22.1</maven.failsafe.plugin.version>
<maven.avro.plugin.version>1.9.2</maven.avro.plugin.version>
<skipIntegrationTests>false</skipIntegrationTests>
<surefire-junit47.version>2.22.1</surefire-junit47.version>
<guava.version>30.1.1-jre</guava.version>
Expand Down Expand Up @@ -319,6 +320,23 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${maven.avro.plugin.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
<outputDirectory>${project.build.directory}/generated-main-avro-java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<resources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import io.confluent.connect.s3.file.FileEventProvider;
import io.confluent.connect.s3.file.KafkaFileEventProvider;
import io.confluent.connect.storage.common.util.StringUtils;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
Expand Down Expand Up @@ -194,6 +196,19 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
public static final String TOMBSTONE_ENCODED_PARTITION = "tombstone.encoded.partition";
public static final String TOMBSTONE_ENCODED_PARTITION_DEFAULT = "tombstone";

/**
* File event configs
*/
public static final String FILE_EVENT_ENABLE = "s3.file.event.enable";
public static final boolean FILE_EVENT_ENABLE_DEFAULT = false;
public static final String FILE_EVENT_SKIP_ERROR = "s3.file.event.skip.error";
public static final boolean FILE_EVENT_SKIP_ERROR_DEFAULT = false;
public static final String FILE_EVENT_CLASS = "s3.file.event.class";
public static final Class<? extends FileEventProvider> FILE_EVENT_CLASS_DEFAULT =
KafkaFileEventProvider.class;
public static final String FILE_EVENT_CONFIG_JSON = "s3.file.event.config.json";
public static final String FILE_EVENT_CONFIG_JSON_DEFAULT = "{}";

/**
* Append schema name in s3-path
*/
Expand Down Expand Up @@ -774,7 +789,67 @@ public static ConfigDef newConfigDef() {
Width.LONG,
"Elastic buffer initial capacity"
);
}

{
final String group = "File event";
int orderInGroup = 0;

configDef.define(
FILE_EVENT_ENABLE,
Type.BOOLEAN,
FILE_EVENT_ENABLE_DEFAULT,
Importance.LOW,
"Enables the file event to be specified and configured",
group,
++orderInGroup,
Width.LONG,
"Enable s3 file event"
);

configDef.define(
FILE_EVENT_SKIP_ERROR,
Type.BOOLEAN,
FILE_EVENT_SKIP_ERROR_DEFAULT,
Importance.LOW,
"In case of file event error, then raise or fail silently. Default raise an error.",
group,
++orderInGroup,
Width.LONG,
"Fail when s3 file event error"
);

configDef.define(
FILE_EVENT_CLASS,
Type.CLASS,
FILE_EVENT_CLASS_DEFAULT,
new FileEventProviderValidator(),
Importance.LOW,
"File event to push notification for each file written on s3. By default "
+ "the connector uses ``"
+ FILE_EVENT_CLASS_DEFAULT.getSimpleName()
+ "``.",

group,
++orderInGroup,
Width.LONG,
"File event class"
);

configDef.define(
FILE_EVENT_CONFIG_JSON,
Type.STRING,
FILE_EVENT_CONFIG_JSON_DEFAULT,
Importance.LOW,
"File event configuration as json format. "
+ "Mandatory Fields: bootstrap_servers, topic_name, schema_registry_url. "
+ "Custom fields can be added in the \"custom\" field as a map of attribute"
+ "By default an empty json.",
group,
++orderInGroup,
Width.LONG,
"File event config json"
);
}
return configDef;
}
Expand Down Expand Up @@ -953,6 +1028,22 @@ public int getElasticBufferInitCap() {
return getInt(ELASTIC_BUFFER_INIT_CAPACITY);
}

public boolean getFileEventEnable() {
return getBoolean(FILE_EVENT_ENABLE);
}

public boolean getFileEventSkipError() {
return getBoolean(FILE_EVENT_SKIP_ERROR);
}

public Class getFileEventClass() {
return getClass(FILE_EVENT_CLASS);
}

public String getFileEventConfigJson() {
return getString(FILE_EVENT_CONFIG_JSON);
}

public boolean isTombstoneWriteEnabled() {
return OutputWriteBehavior.WRITE.toString().equalsIgnoreCase(nullValueBehavior());
}
Expand Down Expand Up @@ -1189,12 +1280,33 @@ public void ensureValid(String name, Object provider) {
);
}


@Override
public String toString() {
return "Any class implementing: " + AWSCredentialsProvider.class;
}
}

private static class FileEventProviderValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object provider) {
if (provider != null && provider instanceof Class
&& FileEventProvider.class.isAssignableFrom((Class<?>) provider)) {
return;
}
throw new ConfigException(
name,
provider,
"Class must extend: " + FileEventProvider.class
);
}

@Override
public String toString() {
return "Any class implementing: " + FileEventProvider.class;
}
}

private static class SseAlgorithmRecommender implements ConfigDef.Recommender {
@Override
public List<Object> validValues(String name, Map<String, Object> connectorConfigs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.amazonaws.AmazonClientException;
import io.confluent.connect.s3.S3SinkConnectorConfig.OutputWriteBehavior;
import io.confluent.connect.s3.file.FileEventProvider;
import io.confluent.connect.s3.util.TombstoneSupportedPartitioner;
import io.confluent.connect.s3.util.SchemaPartitioner;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
Expand All @@ -31,10 +32,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
Expand Down Expand Up @@ -64,6 +67,7 @@ public class S3SinkTask extends SinkTask {
private RecordWriterProvider<S3SinkConnectorConfig> writerProvider;
private final Time time;
private ErrantRecordReporter reporter;
private Optional<FileEventProvider> fileEventProvider;

/**
* No-arg constructor. Used by Connect framework.
Expand Down Expand Up @@ -150,10 +154,36 @@ public String version() {

@Override
public void open(Collection<TopicPartition> partitions) {
initFileEventProvider();
for (TopicPartition tp : partitions) {
topicPartitionWriters.put(tp, newTopicPartitionWriter(tp));
}
}
private void initFileEventProvider() {
// Initialize fileEvent if enabled
if (this.connectorConfig.getFileEventEnable()) {
try {
log.info("File event enabled");
if (this.fileEventProvider == null) // only if not yet instanciated
this.fileEventProvider =
Optional.of(
(FileEventProvider)
this.connectorConfig
.getFileEventClass()
.getConstructor(String.class, boolean.class)
.newInstance(
connectorConfig.getFileEventConfigJson(),
connectorConfig.getFileEventSkipError()));
} catch (InstantiationException
| IllegalAccessException
| InvocationTargetException
| NoSuchMethodException e) {
throw new RuntimeException(e);
}
} else {
this.fileEventProvider = Optional.empty();
}
}

@SuppressWarnings("unchecked")
private Format<S3SinkConnectorConfig, String> newFormat(String formatClassConfig)
Expand Down Expand Up @@ -321,6 +351,15 @@ public void close(Collection<TopicPartition> partitions) {
log.error("Error closing writer for {}. Error: {}", tp, e.getMessage());
}
}
this.fileEventProvider.ifPresent(
fc -> {
try {
fc.close();
this.fileEventProvider = null;
} catch (IOException e) {
throw new RuntimeException(e);
}
});
topicPartitionWriters.clear();
}

Expand Down Expand Up @@ -350,6 +389,6 @@ private TopicPartitionWriter newTopicPartitionWriter(TopicPartition tp) {
context,
time,
reporter
);
).withFileEventProvider(fileEventProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.connect.s3;

import com.amazonaws.SdkClientException;
import io.confluent.connect.s3.file.FileEventProvider;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.s3.util.FileRotationTracker;
import io.confluent.connect.s3.util.RetryUtil;
Expand Down Expand Up @@ -105,6 +106,7 @@ public class TopicPartitionWriter {
private ErrantRecordReporter reporter;

private final FileRotationTracker fileRotationTracker;
private Optional<FileEventProvider> fileCallback = Optional.empty();

public TopicPartitionWriter(TopicPartition tp,
S3Storage storage,
Expand Down Expand Up @@ -189,6 +191,10 @@ public TopicPartitionWriter(TopicPartition tp,
// Initialize scheduled rotation timer if applicable
setNextScheduledRotation();
}
public TopicPartitionWriter withFileEventProvider(Optional<FileEventProvider> fileEventProvider){
this.fileCallback = fileEventProvider;
return this;
}

private enum State {
WRITE_STARTED,
Expand Down Expand Up @@ -636,6 +642,7 @@ private void commitFiles() {
for (Map.Entry<String, String> entry : commitFiles.entrySet()) {
String encodedPartition = entry.getKey();
commitFile(encodedPartition);
callbackFile(encodedPartition);
if (isTaggingEnabled) {
RetryUtil.exponentialBackoffRetry(() -> tagFile(encodedPartition, entry.getValue()),
ConnectException.class,
Expand Down Expand Up @@ -672,6 +679,14 @@ private void commitFile(String encodedPartition) {
}
}

private void callbackFile(String encodedPartition) {
fileCallback.ifPresent(fs -> fs.call(tp.topic(), encodedPartition,
commitFiles.get(encodedPartition), tp.partition(),
new DateTime(baseRecordTimestamp).withZone(timeZone),
new DateTime(currentTimestamp).withZone(timeZone), (recordCounts.get(encodedPartition)).intValue(),
new DateTime(time.milliseconds()).withZone(timeZone)));
}

private void tagFile(String encodedPartition, String s3ObjectPath) {
Long startOffset = startOffsets.get(encodedPartition);
Long endOffset = endOffsets.get(encodedPartition);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.s3.file;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import java.io.IOException;
import java.util.Properties;

public abstract class AbstractFileEventConfig {
public static <T extends AbstractFileEventConfig> T fromJsonString(
String jsonContent, Class<T> clazz) {
try {
if (jsonContent == null) {
return clazz.newInstance();
}
ObjectMapper instanceMapper = new ObjectMapper();
instanceMapper.setPropertyNamingStrategy(
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
instanceMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
instanceMapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
instanceMapper.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
T value = instanceMapper.readValue(jsonContent, clazz);
value.validateFields();
return value;
} catch (IllegalAccessException | InstantiationException | IOException e) {
throw new RuntimeException(e);
}
}

protected abstract void validateFields() ;

public abstract Properties toProps();

public abstract String toJson();
}
Loading