Skip to content

Commit

Permalink
Merge pull request #567 from microsoft/dev
Browse files Browse the repository at this point in the history
Release 1.15.0
  • Loading branch information
tvaron3 committed Apr 18, 2024
2 parents cb8d95a + fd0cbfe commit 53ccdff
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 14 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
## Release History
### 1.15.0 (2024-04-18)
#### Key Bug Fixes
* Fixed an issue where using `CosmosDBSinkConnector` in bulk mode failed to write items for container with nested partition key path - [PR 565](https://github.com/microsoft/kafka-connect-cosmosdb/pull/565)

### 1.14.2 (2024-03-12)
#### Key Bug Fixes
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>com.azure.cosmos.kafka</groupId>
<artifactId>kafka-connect-cosmos</artifactId>
<version>1.14.2</version>
<version>1.15.0</version>

<name> kafka-connect-cosmos</name>
<url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
Expand Down Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>4.56.0</version>
<version>4.58.0</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
Expand Down
12 changes: 2 additions & 10 deletions src/main/java/com/azure/cosmos/kafka/connect/sink/BulkWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.LinkedHashMap;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -120,16 +120,8 @@ protected SinkWriteResponse writeCore(List<SinkRecord> sinkRecords) {
private PartitionKey getPartitionKeyValue(Object recordValue) {
checkArgument(recordValue instanceof Map, "Argument 'recordValue' is not valid map format.");

//TODO: examine the code here for sub-partition
String partitionKeyPath = StringUtils.join(this.partitionKeyDefinition.getPaths(), "");
Map<String, Object> recordMap = (Map<String, Object>) recordValue;
Object partitionKeyValue = recordMap.get(partitionKeyPath.substring(1));
PartitionKeyInternal partitionKeyInternal = PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), false);

return ImplementationBridgeHelpers
.PartitionKeyHelper
.getPartitionKeyAccessor()
.toPartitionKey(partitionKeyInternal);
return PartitionKey.fromItem(recordMap, this.partitionKeyDefinition);
}

BulkOperationFailedException handleErrorStatusCode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.RequestTimeoutException;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.models.CosmosBulkItemResponse;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkOperations;
Expand All @@ -16,6 +17,9 @@
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.models.PartitionKind;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -31,11 +35,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static junit.framework.Assert.assertNotNull;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
Expand All @@ -59,6 +65,7 @@ public void setup(){
PartitionKeyDefinition mockedPartitionKeyDefinition = Mockito.mock(PartitionKeyDefinition.class);
Mockito.when(mockedContainerProperties.getPartitionKeyDefinition()).thenReturn(mockedPartitionKeyDefinition);
Mockito.when(mockedPartitionKeyDefinition.getPaths()).thenReturn(Arrays.asList("/id"));
Mockito.when(mockedPartitionKeyDefinition.getKind()).thenReturn(PartitionKind.HASH);

bulkWriter = new BulkWriter(container, MAX_RETRY_COUNT, COMPRESSION_ENABLED);
}
Expand Down Expand Up @@ -207,6 +214,65 @@ public void testBulkWriteFailedWithTransientException() {
assertEquals(HttpConstants.StatusCodes.REQUEST_TIMEOUT, ((CosmosException)response.getFailedRecordResponses().get(0).getException()).getStatusCode());
}

@Test
public void testBulkWriteForContainerWithNestedPartitionKey() {
CosmosContainer containerWithNestedPartitionKey = Mockito.mock(CosmosContainer.class);

CosmosContainerResponse mockedContainerResponse = Mockito.mock(CosmosContainerResponse.class);
Mockito.when(containerWithNestedPartitionKey.read()).thenReturn(mockedContainerResponse);
CosmosContainerProperties mockedContainerProperties = Mockito.mock(CosmosContainerProperties.class);
Mockito.when(mockedContainerResponse.getProperties()).thenReturn(mockedContainerProperties);
PartitionKeyDefinition mockedPartitionKeyDefinition = Mockito.mock(PartitionKeyDefinition.class);
Mockito.when(mockedContainerProperties.getPartitionKeyDefinition()).thenReturn(mockedPartitionKeyDefinition);
Mockito.when(mockedPartitionKeyDefinition.getPaths()).thenReturn(Arrays.asList("/location/city/zipCode"));
Mockito.when(mockedPartitionKeyDefinition.getKind()).thenReturn(PartitionKind.HASH);

BulkWriter testWriter = new BulkWriter(containerWithNestedPartitionKey, MAX_RETRY_COUNT, COMPRESSION_ENABLED);

String itemId = UUID.randomUUID().toString();
String pkValue = "1234";

ObjectNode objectNode = Utils.getSimpleObjectMapper().createObjectNode();
objectNode.put("id", itemId);

ObjectNode locationNode = Utils.getSimpleObjectMapper().createObjectNode();
ObjectNode cityNode = Utils.getSimpleObjectMapper().createObjectNode();
cityNode.put("zipCode", pkValue);
locationNode.put("city", cityNode);
objectNode.put("location", locationNode);

SinkRecord sinkRecord =
new SinkRecord(
TOPIC_NAME,
1,
new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.STRING),
objectNode.get("id"),
new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.MAP),
Utils.getSimpleObjectMapper().convertValue(objectNode, new TypeReference<Map<String, Object>>() {}),
0L);

// setup successful item response
List<CosmosBulkOperationResponse<Object>> mockedBulkOperationResponseList = new ArrayList<>();
mockedBulkOperationResponseList.add(mockSuccessfulBulkOperationResponse(sinkRecord, itemId));

ArgumentCaptor<Iterable<CosmosItemOperation>> parameters = ArgumentCaptor.forClass(Iterable.class);
Mockito
.when(containerWithNestedPartitionKey.executeBulkOperations(parameters.capture()))
.thenReturn(() -> mockedBulkOperationResponseList.iterator());

testWriter.write(Arrays.asList(sinkRecord));

Iterator<CosmosItemOperation> bulkExecutionParameters = parameters.getValue().iterator();

assertTrue(bulkExecutionParameters.hasNext());
CosmosItemOperation bulkItemOperation = bulkExecutionParameters.next();
assertNotNull(bulkItemOperation.getPartitionKeyValue());
assertEquals(bulkItemOperation.getPartitionKeyValue(), new PartitionKey(pkValue));

// there should only be 1 operation
assertFalse(bulkExecutionParameters.hasNext());
}

private SinkRecord createSinkRecord(String id) {
Schema stringSchema = new ConnectSchema(Schema.Type.STRING);
Schema mapSchema = new ConnectSchema(Schema.Type.MAP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,24 @@
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosDatabase;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.kafka.connect.ConnectorTestConfigurations;
import com.azure.cosmos.kafka.connect.sink.BulkWriter;
import com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInKeyStrategy;
import com.azure.cosmos.kafka.connect.sink.id.strategy.ProvidedInValueStrategy;
import com.azure.cosmos.kafka.connect.sink.id.strategy.TemplateStrategy;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.ThroughputProperties;
import com.azure.cosmos.util.CosmosPagedIterable;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -29,7 +34,9 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.connect.sink.SinkRecord;
import org.sourcelab.kafka.connect.apiclient.Configuration;
import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient;
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
Expand All @@ -48,10 +55,14 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

import static junit.framework.Assert.fail;
import static org.apache.kafka.common.utils.Utils.sleep;
import static org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition.Builder;

Expand Down Expand Up @@ -86,7 +97,7 @@ public class SinkConnectorIT {
* Create an embedded Kafka Connect cluster.
*/
@Before
public void before() throws URISyntaxException, IOException {
public void before() throws IOException {

// Load the sink.config.json config file
URL configFileUrl = SinkConnectorIT.class.getClassLoader().getResource("sink.config.json");
Expand Down Expand Up @@ -590,6 +601,60 @@ public void testPostJsonMessageWithTTL() throws InterruptedException, ExecutionE
Assert.assertFalse("Record still in DB", retrievedPerson.isPresent());
}

@Test
public void testBulkWriteForContainerWithNestedPartitionKey() {
// verify bulk writer can create records successfully for container with nested partition key path
CosmosDatabase database = null;
try {
// create a container with nested partition key
database = cosmosClient.getDatabase(UUID.randomUUID().toString());
cosmosClient.createDatabaseIfNotExists(database.getId());

String containerWithNestedPartitionKey = UUID.randomUUID().toString();
cosmosClient
.getDatabase(database.getId())
.createContainerIfNotExists(containerWithNestedPartitionKey, "/location/city/zipCode");
CosmosContainer testContainer = cosmosClient.getDatabase(database.getId()).getContainer(containerWithNestedPartitionKey);

String itemId = UUID.randomUUID().toString();
String pkValue = "1234";

ObjectNode objectNode = Utils.getSimpleObjectMapper().createObjectNode();
objectNode.put("id", itemId);
ObjectNode locationNode = Utils.getSimpleObjectMapper().createObjectNode();
ObjectNode cityNode = Utils.getSimpleObjectMapper().createObjectNode();
cityNode.put("zipCode", pkValue);
locationNode.put("city", cityNode);
objectNode.put("location", locationNode);

SinkRecord sinkRecord =
new SinkRecord(
kafkaTopicJson,
1,
new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.STRING),
objectNode.get("id"),
new ConnectSchema(org.apache.kafka.connect.data.Schema.Type.MAP),
Utils.getSimpleObjectMapper().convertValue(objectNode, new TypeReference<Map<String, Object>>() {}),
0L);

BulkWriter testWriter = new BulkWriter(testContainer, 1, false);
testWriter.write(Arrays.asList(sinkRecord));

// verify the item is created successfully
try {
testContainer.readItem(itemId, new PartitionKey(pkValue), ObjectNode.class).getItem();
} catch (Exception e) {
fail("Should be able to read item " + e.getMessage());
}
} finally {
if (cosmosClient != null) {
if (database != null) {
database.delete();
}
}
}
}

/**
* A simple entity to serialize to/deserialize from JSON in tests.
*/
Expand Down

0 comments on commit 53ccdff

Please sign in to comment.