Skip to content

Commit

Permalink
subscription/shard (de)provision (#153)
Browse files Browse the repository at this point in the history
* subscription shard provision and deprovisioning in create/delete subscriptions. This includes
- main sub, retry topic & sub, dlt topic & sub
- basic shard/partitioning based on qps/throughput limits
- subscription running status controller API
- subscription provisioning status in rest API
Moved internal queue type and topic partition related entities to entities module.
  • Loading branch information
kmrdhruv committed Jun 13, 2024
1 parent 55fda41 commit fc87a1e
Show file tree
Hide file tree
Showing 80 changed files with 1,622 additions and 700 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.InternalQueueType;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.flipkart.varadhi.entities.Project;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.spi.services.TopicPartitions;
import com.flipkart.varadhi.entities.TopicPartitions;

import java.util.concurrent.CompletableFuture;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.InternalQueueType;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.flipkart.varadhi.CircularQueue;
import com.flipkart.varadhi.consumer.ConcurrencyControl;
import com.flipkart.varadhi.consumer.InternalQueueType;
import com.flipkart.varadhi.entities.InternalQueueType;
import com.flipkart.varadhi.consumer.concurrent.Context;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.flipkart.varadhi.consumer.ConsumptionFailurePolicy;
import com.flipkart.varadhi.entities.*;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.spi.services.TopicPartitions;
import com.flipkart.varadhi.entities.TopicPartitions;

import java.util.concurrent.CompletableFuture;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.flipkart.varadhi.consumer.impl;

import com.flipkart.varadhi.consumer.InternalQueueType;
import com.flipkart.varadhi.entities.InternalQueueType;
import com.flipkart.varadhi.consumer.ThresholdProvider;
import com.flipkart.varadhi.consumer.Throttler;
import com.google.common.base.Ticker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.flipkart.varadhi.consumer.impl.ConcurrencyControlImpl;
import com.flipkart.varadhi.consumer.impl.SlidingWindowThresholdProvider;
import com.flipkart.varadhi.consumer.impl.SlidingWindowThrottler;
import com.flipkart.varadhi.entities.InternalQueueType;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.RateLimiter;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.flipkart.varadhi.consumer.impl;

import com.flipkart.varadhi.consumer.InternalQueueType;
import com.flipkart.varadhi.entities.InternalQueueType;
import com.flipkart.varadhi.consumer.concurrent.Context;
import com.flipkart.varadhi.consumer.concurrent.CustomThread;
import com.flipkart.varadhi.consumer.concurrent.EventExecutor;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.flipkart.varadhi.consumer.impl;

import com.flipkart.varadhi.MockTicker;
import com.flipkart.varadhi.consumer.InternalQueueType;
import com.flipkart.varadhi.entities.InternalQueueType;
import com.google.common.base.Ticker;
import io.vertx.core.impl.ConcurrentHashSet;
import lombok.extern.slf4j.Slf4j;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ public CompletableFuture<Void> addConsumerNodes(List<ConsumerNode> clusterConsum
.thenAccept(v -> shardAssigner.addConsumerNodes(clusterConsumers));
}

@Override
public CompletableFuture<SubscriptionStatus> getSubscriptionStatus(String subscriptionId, String requestedBy) {
VaradhiSubscription subscription = metaStore.getSubscription(subscriptionId);
return getSubscriptionStatus(subscription).exceptionally(t -> {
throw new IllegalStateException(
String.format("Failure in getting subscription status, try again after sometime. %s",
t.getMessage()
));
});
}

private CompletableFuture<SubscriptionStatus> getSubscriptionStatus(VaradhiSubscription subscription) {
String subId = subscription.getName();
List<Assignment> assignments = shardAssigner.getSubscriptionAssignment(subId);
Expand All @@ -62,6 +73,7 @@ private SubscriptionStatus getSubscriptionStatusFromShardStatus(
return new SubscriptionStatus(subscription.getName(), state);
}


/**
* Start the subscription
* TODO::It makes single attempt to start the Subscription, on failure Operation is marked as failed and no
Expand All @@ -75,7 +87,10 @@ public CompletableFuture<SubscriptionOperation> startSubscription(
VaradhiSubscription subscription = metaStore.getSubscription(subscriptionId);
return getSubscriptionStatus(subscription).exceptionally(t -> {
// If not temporary, then alternate needs to be provided to allow recovery from this.
throw new IllegalStateException("Unable to determine subscription status, try again after sometime.");
throw new IllegalStateException(
String.format("Failure in getting subscription status, try again after sometime. %s",
t.getMessage()
));
}).thenApply(ss -> {
if (ss.getState() == SubscriptionState.RUNNING || ss.getState() == SubscriptionState.STARTING) {
throw new InvalidOperationForResourceException("Subscription is already running or starting.");
Expand Down Expand Up @@ -150,7 +165,10 @@ public CompletableFuture<SubscriptionOperation> stopSubscription(
) {
VaradhiSubscription subscription = metaStore.getSubscription(subscriptionId);
return getSubscriptionStatus(subscription).exceptionally(t -> {
throw new IllegalStateException("Unable to determine subscription status, try again after sometime.");
throw new IllegalStateException(
String.format("Failure in getting subscription status, try again after sometime. %s",
t.getMessage()
));
}).thenApply(ss -> {
if (ss.getState() == SubscriptionState.STOPPED) {
throw new InvalidOperationForResourceException("Subscription is already stopped.");
Expand Down
14 changes: 0 additions & 14 deletions core/src/main/java/com/flipkart/varadhi/core/TopicService.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ public VaradhiTopic get(Project project, TopicResource topicResource) {
return vt;
}


private void planDeployment(Project project, VaradhiTopic varadhiTopic) {
StorageTopic storageTopic =
topicFactory.getTopic(varadhiTopic.getName(), project, varadhiTopic.getCapacityPolicy());
topicFactory.getTopic(
varadhiTopic.getName(), project, varadhiTopic.getCapacity(), InternalQueueCategory.MAIN);
InternalCompositeTopic internalTopic = new InternalCompositeTopic(
deploymentRegion,
TopicState.Producing,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,51 @@

import java.util.List;


// TODO::This need to move to server.services
@Slf4j
public class VaradhiTopicService implements TopicService<VaradhiTopic> {
public class VaradhiTopicService {

private final StorageTopicService topicService;
private final StorageTopicService<StorageTopic> topicService;
private final MetaStore metaStore;

public VaradhiTopicService(
StorageTopicService serviceFactory,
StorageTopicService<StorageTopic> storageTopicService,
MetaStore metaStore
) {
this.topicService = serviceFactory;
this.topicService = storageTopicService;
this.metaStore = metaStore;
}

@Override
public void create(VaradhiTopic varadhiTopic, Project project) {
log.info("Creating Varadhi topic {}", varadhiTopic.getName());
varadhiTopic.getInternalTopics().forEach((kind, internalTopic) ->
{
StorageTopic storageTopic = internalTopic.getStorageTopic();
if (topicService.exists(storageTopic.getName())) {
log.warn("Specified StorageTopic({}:{}) already exists.", project.getName(), storageTopic.getName());
} else {
topicService.create(storageTopic, project);
}
// StorageTopicService.create() to ensure if pre-existing topi can be re-used.
// i.e. topic creation at storage level need to be idempotent.
topicService.create(storageTopic, project);
}
);
metaStore.createTopic(varadhiTopic);
}

@Override
public VaradhiTopic get(String topicName) {
return metaStore.getTopic(topicName);
}

@Override
public void delete(String varadhiTopicName) {
log.info("Deleting Varadhi topic {}", varadhiTopicName);
/*TODO : delete namespace, tenant also if the only Topic in the namespace+tenant is deleted / cleanup independent of delete
* check for existing subscriptions before deleting the topic
/* TODO : delete namespace, tenant also if the only Topic in the namespace+tenant is deleted / cleanup independent of delete
*/
VaradhiTopic varadhiTopic = metaStore.getTopic(varadhiTopicName);
String projectName = varadhiTopic.getProjectName();
Project project = metaStore.getProject(projectName);
validateDelete(varadhiTopicName);
varadhiTopic.getInternalTopics().forEach((kind, internalTopic) ->
{
StorageTopic storageTopic = internalTopic.getStorageTopic();
if (topicService.exists(storageTopic.getName())) {
topicService.delete(storageTopic.getName());
topicService.delete(storageTopic.getName(), project);
} else {
log.warn("Specified StorageTopic({}) does not exist.", storageTopic.getName());
}
Expand All @@ -80,7 +76,6 @@ private void validateDelete(String varadhiTopicName) {
});
}

@Override
public boolean exists(String topicName) {
return metaStore.checkTopicExists(topicName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.flipkart.varadhi.core.cluster;

import com.flipkart.varadhi.entities.VaradhiSubscription;
import com.flipkart.varadhi.entities.cluster.ShardOperation;
import com.flipkart.varadhi.entities.cluster.SubscriptionOperation;
import com.flipkart.varadhi.entities.cluster.SubscriptionStatus;

import java.util.concurrent.CompletableFuture;

Expand All @@ -14,6 +16,8 @@
public interface ControllerApi {
String ROUTE_CONTROLLER = "controller";

CompletableFuture<SubscriptionStatus> getSubscriptionStatus(String subscriptionId, String requestedBy);

CompletableFuture<SubscriptionOperation> startSubscription(String subscriptionId, String requestedBy);

CompletableFuture<SubscriptionOperation> stopSubscription(String subscriptionId, String requestedBy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import static org.mockito.Mockito.*;

public class VaradhiTopicFactoryTest {
private final String region = "local";
private final String topicName = "testTopic";
private VaradhiTopicFactory varadhiTopicFactory;
private StorageTopicFactory<StorageTopic> storageTopicFactory;
private Project project;
private String region = "local";
private String vTopicName;
private String topicName = "testTopic";

@BeforeEach
public void setUp() {
Expand All @@ -26,14 +26,15 @@ public void setUp() {
vTopicName = String.format("%s.%s", project.getName(), topicName);
String pTopicName =
String.format("persistent://%s/%s", project.getOrg(), vTopicName);
CapacityPolicy capacityPolicy = CapacityPolicy.getDefault();
PulsarStorageTopic pTopic = PulsarStorageTopic.from(pTopicName, capacityPolicy);
doReturn(pTopic).when(storageTopicFactory).getTopic(vTopicName, project, capacityPolicy);
TopicCapacityPolicy capacityPolicy = TopicCapacityPolicy.getDefault();
PulsarStorageTopic pTopic = PulsarStorageTopic.from(pTopicName, 1, capacityPolicy);
doReturn(pTopic).when(storageTopicFactory)
.getTopic(vTopicName, project, capacityPolicy, InternalQueueCategory.MAIN);
}

@Test
public void getTopic() {
CapacityPolicy capacityPolicy = CapacityPolicy.getDefault();
TopicCapacityPolicy capacityPolicy = TopicCapacityPolicy.getDefault();
TopicResource topicResource = new TopicResource(
topicName,
1,
Expand All @@ -48,12 +49,12 @@ public void getTopic() {
Assertions.assertEquals(it.getTopicState(), TopicState.Producing);
Assertions.assertEquals(it.getTopicRegion(), region);
Assertions.assertNotNull(st);
verify(storageTopicFactory, times(1)).getTopic(vTopicName, project, capacityPolicy);
verify(storageTopicFactory, times(1)).getTopic(vTopicName, project, capacityPolicy, InternalQueueCategory.MAIN);
}

@Test
public void getTopicWithDefaultCapacity() {
CapacityPolicy capacityPolicy = CapacityPolicy.getDefault();
TopicCapacityPolicy capacityPolicy = TopicCapacityPolicy.getDefault();
TopicResource topicResource = new TopicResource(
topicName,
1,
Expand All @@ -64,7 +65,7 @@ public void getTopicWithDefaultCapacity() {
VaradhiTopic varadhiTopic = varadhiTopicFactory.get(project, topicResource);
InternalCompositeTopic it = varadhiTopic.getProduceTopicForRegion(region);
PulsarStorageTopic pt = (PulsarStorageTopic) it.getStorageTopic();
Assertions.assertEquals(capacityPolicy.getMaxThroughputKBps(), pt.getMaxThroughputKBps());
Assertions.assertEquals(capacityPolicy.getMaxQPS(), pt.getMaxQPS());
Assertions.assertEquals(capacityPolicy.getThroughputKBps(), pt.getCapacity().getThroughputKBps());
Assertions.assertEquals(capacityPolicy.getQps(), pt.getCapacity().getQps());
}
}
Loading

0 comments on commit fc87a1e

Please sign in to comment.