Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Controller #155

Merged
merged 42 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
f571f2c
messagechannel.send() implementation
kmrdhruv Feb 13, 2024
fc704e9
Merge branch 'master' into controller
kmrdhruv Feb 27, 2024
b021412
moving cluster related details to server module
kmrdhruv Feb 27, 2024
edcd600
merge
kmrdhruv Feb 28, 2024
59cc57a
Merge branch 'master' into controller
kmrdhruv Mar 1, 2024
f5d484c
- refactoring needed for controller bootstrapping
kmrdhruv Mar 7, 2024
f243139
Merge branch 'master' into controller
kmrdhruv Mar 7, 2024
f498775
add string override to channel method
kmrdhruv Mar 7, 2024
c0d9a5e
fix commented test and remove obsolete message handler
kmrdhruv Mar 7, 2024
0c7b269
Merge branch 'master' into controller
kmrdhruv Mar 18, 2024
1fd93f8
review comments
kmrdhruv Mar 19, 2024
1f43d2e
avoid http port conflict for server deployment in UTs
kmrdhruv Mar 19, 2024
597df03
Merge branch 'master' into controller
kmrdhruv Mar 19, 2024
ccfed68
cluster manager and refactoring
kmrdhruv Mar 21, 2024
249d957
fixing TODOs, Router tests and implementation of getAllMembers() in c…
kmrdhruv Mar 23, 2024
8d849a3
Merge branch 'master' into controller
kmrdhruv Apr 3, 2024
48d28b2
Merge branch 'master' into controller
kmrdhruv Apr 3, 2024
4caa516
replacing Component with Verticle
kmrdhruv Apr 3, 2024
08ad83f
removing hard coded 127.0.0.1
kmrdhruv Apr 4, 2024
a97b531
class name fix in gradle file
kmrdhruv Apr 18, 2024
61b1175
Merge branch 'master' into controller
kmrdhruv Apr 20, 2024
615c422
StartSubscription operation and Consumer verticle bootstrap code flow.
kmrdhruv May 9, 2024
f978900
Merge branch 'master' into controller
kmrdhruv May 9, 2024
9fa7a9f
adding persistence to shard assignment and operations.
kmrdhruv May 17, 2024
50c96b1
Merge branch 'master' into controller
kmrdhruv May 17, 2024
19a1363
pr fixes and improvements
kmrdhruv May 18, 2024
68bc1b3
fixing subscription failing test
kmrdhruv May 18, 2024
caf9082
Merge branch 'master' into controller
kmrdhruv May 27, 2024
094a279
Merge branch 'master' into controller
kmrdhruv May 27, 2024
16211de
serializing the shard assignment for all, subscription operation exe…
kmrdhruv May 30, 2024
c210b37
Merge branch 'master' into controller
kmrdhruv May 31, 2024
ae4e310
Merge branch 'master' into controller
kmrdhruv Jun 10, 2024
778c57e
subscription shard provision and deprovisioning in create/delete subs…
kmrdhruv Jun 12, 2024
26088d9
Refactoring for
kmrdhruv Jun 13, 2024
60e3cc6
Merge branch 'master' into controller
kmrdhruv Jun 13, 2024
a562715
Adding noarg constructor to make vertx deserialization work.
kmrdhruv Jun 14, 2024
c303e17
minor fixes and add info traces for operation progress.
kmrdhruv Jun 14, 2024
c447933
consumer node restart handling
kmrdhruv Jun 19, 2024
621db35
Merge branch 'master' into controller
kmrdhruv Jun 19, 2024
abffc3d
adding exception handling to operation and update executions
kmrdhruv Jun 20, 2024
5685d37
adding controller config
kmrdhruv Jun 20, 2024
7d88e6a
fixes and refactoring
kmrdhruv Jun 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/main/java/com/flipkart/varadhi/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class Constants {
public static final String USER_ID_HEADER = "x_user_id";

public static final TopicCapacityPolicy DefaultTopicCapacity = new TopicCapacityPolicy(100, 400, 2);
// Identity string for operations requested with this constant instead of a caller-supplied
// requester. Declared final so this shared constant cannot be reassigned at runtime.
public static final String SYSTEM_IDENTITY = "System";

public static class PathParams {
public static final String PATH_PARAM_ORG = "org";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@
@Slf4j
public class ConsumerApiMgr implements ConsumerApi {
private final ConsumersManager consumersManager;
private final ConsumerInfo consumerInfo;

public ConsumerApiMgr(ConsumersManager consumersManager) {

public ConsumerApiMgr(ConsumersManager consumersManager, ConsumerInfo consumerInfo) {
this.consumersManager = consumersManager;
this.consumerInfo = consumerInfo;
}

@Override
public CompletableFuture<Void> start(ShardOperation.StartData operation) {
log.info("Consumer: Starting shard {}", operation);
VaradhiSubscription subscription = operation.getSubscription();
SubscriptionUnitShard shard = operation.getShard();
consumerInfo.recordShardAssignment(subscription.getName(), shard.getShardId(), shard.getCapacityRequest());
StorageSubscription<StorageTopic> mainSub = shard.getMainSubscription().getSubscriptionToConsume();
ConsumptionFailurePolicy failurePolicy =
new ConsumptionFailurePolicy(subscription.getRetryPolicy(), shard.getRetrySubscription(),
Expand All @@ -47,6 +51,8 @@ public CompletableFuture<Void> start(ShardOperation.StartData operation) {
@Override
public CompletableFuture<Void> stop(ShardOperation.StopData operation) {
VaradhiSubscription subscription = operation.getSubscription();
SubscriptionUnitShard shard = operation.getShard();
consumerInfo.purgeShardAssignment(subscription.getName(), shard.getShardId(), shard.getCapacityRequest());
return consumersManager.stopSubscription(
subscription.getName(),
operation.getShardId()
Expand All @@ -60,7 +66,6 @@ public CompletableFuture<ShardStatus> getShardStatus(String subscriptionId, int

@Override
public CompletableFuture<ConsumerInfo> getConsumerInfo() {
//TODO::Return assignments as well.
return CompletableFuture.completedFuture(consumersManager.getInfo());
return CompletableFuture.completedFuture(consumerInfo);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.*;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;

import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -37,6 +36,4 @@ CompletableFuture<Void> startSubscription(
ConsumerState getConsumerState(String subscription, int shardId);

// TODO likely need status on the starting / stopping as well; as the above status is for a running consumer..

ConsumerInfo getInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,10 @@
import com.flipkart.varadhi.consumer.ConsumersManager;
import com.flipkart.varadhi.consumer.ConsumptionFailurePolicy;
import com.flipkart.varadhi.entities.*;
import com.flipkart.varadhi.entities.cluster.ConsumerInfo;
import com.flipkart.varadhi.entities.TopicPartitions;

import java.util.concurrent.CompletableFuture;

public class ConsumersManagerImpl implements ConsumersManager {
private final ConsumerInfo consumerInfo;

public ConsumersManagerImpl(ConsumerInfo consumerInfo) {
this.consumerInfo = consumerInfo;
}

@Override
public CompletableFuture<Void> startSubscription(
Expand Down Expand Up @@ -44,9 +37,4 @@ public void resumeSubscription(String subscription, int shardId) {
public ConsumerState getConsumerState(String subscription, int shardId) {
// No state lookup is performed: both arguments are ignored and null is always returned.
// NOTE(review): looks like a placeholder implementation — confirm callers tolerate null.
return null;
}

@Override
public ConsumerInfo getInfo() {
return consumerInfo;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.flipkart.varadhi.controller;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.flipkart.varadhi.controller.config.ControllerConfig;
import com.flipkart.varadhi.entities.cluster.*;
import com.flipkart.varadhi.core.cluster.*;
import com.flipkart.varadhi.entities.SubscriptionShards;
Expand All @@ -15,6 +17,8 @@
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;

import static com.flipkart.varadhi.Constants.SYSTEM_IDENTITY;

@Slf4j
public class ControllerApiMgr implements ControllerApi {
private final ShardAssigner shardAssigner;
Expand All @@ -23,28 +27,29 @@ public class ControllerApiMgr implements ControllerApi {
private final OperationMgr operationMgr;

public ControllerApiMgr(
ConsumerClientFactory consumerClientFactory, MetaStoreProvider metaStoreProvider,
ControllerConfig config, ConsumerClientFactory consumerClientFactory, MetaStoreProvider metaStoreProvider,
MeterRegistry meterRegistry
) {
this.consumerClientFactory = consumerClientFactory;
this.shardAssigner = new ShardAssigner(metaStoreProvider.getAssignmentStore(), meterRegistry);
this.metaStore = metaStoreProvider.getMetaStore();
this.operationMgr = new OperationMgr(metaStoreProvider.getOpStore());
this.operationMgr = new OperationMgr(config, metaStoreProvider.getOpStore());
}

public CompletableFuture<Void> addConsumerNodes(List<ConsumerNode> clusterConsumers) {
return CompletableFuture.allOf(clusterConsumers.stream()
.map(cc -> getConsumerInfo(cc.getConsumerId()).thenAccept(
cc::updateWithConsumerInfo)).toArray(CompletableFuture[]::new))
.thenAccept(v -> shardAssigner.addConsumerNodes(clusterConsumers));
cc::initFromConsumerInfo)).toArray(CompletableFuture[]::new))
.thenCompose(v -> shardAssigner.addConsumerNodes(clusterConsumers));
}

@Override
public CompletableFuture<SubscriptionStatus> getSubscriptionStatus(String subscriptionId, String requestedBy) {
VaradhiSubscription subscription = metaStore.getSubscription(subscriptionId);
return getSubscriptionStatus(subscription).exceptionally(t -> {
throw new IllegalStateException(
String.format("Failure in getting subscription status, try again after sometime. %s",
String.format(
"Failure in getting subscription status, try again after sometime. %s",
t.getMessage()
));
});
Expand Down Expand Up @@ -84,32 +89,31 @@ private SubscriptionStatus getSubscriptionStatusFromShardStatus(
public CompletableFuture<SubscriptionOperation> startSubscription(
String subscriptionId, String requestedBy
) {
//TODO:: Fix it -assignment failure is not failing the start op. Task failure in the operation mgr queue.
VaradhiSubscription subscription = metaStore.getSubscription(subscriptionId);
return getSubscriptionStatus(subscription).exceptionally(t -> {
// If not temporary, then alternate needs to be provided to allow recovery from this.
throw new IllegalStateException(
String.format("Failure in getting subscription status, try again after sometime. %s",
String.format(
"Failure in getting subscription status, try again after sometime. %s",
t.getMessage()
));
}).thenApply(ss -> {
if (ss.getState() == SubscriptionState.RUNNING || ss.getState() == SubscriptionState.STARTING) {
throw new InvalidOperationForResourceException("Subscription is already running or starting.");
}
log.info("Starting the Subscription: {}", subscriptionId);
// operationMgr is not expected to create a subOp and throw, so failure is not handled here.
// TODO:: fix this w.r.to failure in getOrCreateShardAssignment or its chain
return operationMgr.requestSubStart(
subscriptionId, requestedBy, subOp -> getOrCreateShardAssignment(subscription).thenCompose(
assignments -> startShards(subOp, subscription, assignments)));
SubscriptionOperation operation = SubscriptionOperation.startOp(subscriptionId, requestedBy);
operationMgr.createAndEnqueue(operation, subOp -> getOrCreateShardAssignment(subscription).thenCompose(
assignments -> startShards((SubscriptionOperation) subOp, subscription, assignments)));
return operation;
});
}

private CompletableFuture<List<Assignment>> getOrCreateShardAssignment(VaradhiSubscription subscription) {
List<Assignment> assignedShards = shardAssigner.getSubscriptionAssignment(subscription.getName());
if (assignedShards.isEmpty()) {
List<SubscriptionUnitShard> unAssigned = getSubscriptionShards(subscription.getShards());
return shardAssigner.assignShard(unAssigned, subscription);
return shardAssigner.assignShard(unAssigned, subscription, new HashSet<>());
} else {
log.info(
"{} Shards for Subscription {} are already assigned.", assignedShards.size(),
Expand Down Expand Up @@ -145,13 +149,16 @@ private CompletableFuture<Void> startShard(
// IsAssigned is started|starting|errored.
// Stopping isn't considered as assigned, as start/stop shouldn't be running in parallel.
if (!shardStatus.isAssigned()) {
ShardOperation shardOp = operationMgr.requestShardStart(subOpId, shard, subscription);
return consumer.start((ShardOperation.StartData) shardOp.getOpData()).whenComplete((v, t) -> {
if (t != null) {
markShardOpFailed(shardOp, t);
} else {
log.info("Scheduled shard start({}).", shardOp);
}
ShardOperation startOp = ShardOperation.startOp(subOpId, shard, subscription);
return operationMgr.createAndExecute(startOp, op -> {
ShardOperation shardOp = (ShardOperation) op;
return consumer.start((ShardOperation.StartData) shardOp.getOpData()).whenComplete((v, t) -> {
if (t != null) {
markShardOpFailed(shardOp, t);
} else {
log.info("Scheduled shard start({}).", shardOp);
}
});
});
} else {
log.info("Subscription:{} Shard:{} is already started. Skipping.", subId, shardId);
Expand All @@ -167,17 +174,18 @@ public CompletableFuture<SubscriptionOperation> stopSubscription(
VaradhiSubscription subscription = metaStore.getSubscription(subscriptionId);
return getSubscriptionStatus(subscription).exceptionally(t -> {
throw new IllegalStateException(
String.format("Failure in getting subscription status, try again after sometime. %s",
String.format(
"Failure in getting subscription status, try again after sometime. %s",
t.getMessage()
));
}).thenApply(ss -> {
if (ss.getState() == SubscriptionState.STOPPED) {
throw new InvalidOperationForResourceException("Subscription is already stopped.");
}
log.info("Stopping the Subscription: {}", subscriptionId);
// operationMgr is not expected to create a subOp and throw, so failure is not handled here.
// TODO:: fix this w.r.to failure in stopShards
return operationMgr.requestSubStop(subscriptionId, requestedBy, subOp -> stopShards(subOp, subscription));
SubscriptionOperation operation = SubscriptionOperation.stopOp(subscriptionId, requestedBy);
operationMgr.createAndEnqueue(operation, subOp -> stopShards((SubscriptionOperation) subOp, subscription));
return operation;
});
}

Expand All @@ -187,7 +195,7 @@ private CompletableFuture<Void> stopShards(SubscriptionOperation subOp, VaradhiS
SubscriptionShards shards = subscription.getShards();
List<Assignment> assignments = shardAssigner.getSubscriptionAssignment(subId);
log.info(
"Found {} assigned Shards for the Subscription:{} with Shards:{}.", assignments.size(),
"Found {} assigned Shards for the Subscription:{} with total {} Shards.", assignments.size(),
subId, shards.getShardCount()
);
CompletableFuture<Void> future = CompletableFuture.allOf(assignments.stream()
Expand All @@ -196,7 +204,7 @@ private CompletableFuture<Void> stopShards(SubscriptionOperation subOp, VaradhiS
)).toArray(CompletableFuture[]::new)).exceptionally(t -> {
markSubOpFailed(subOp, t);
return null;
}).thenCompose(v -> shardAssigner.unAssignShard(assignments, subscription));
}).thenCompose(v -> shardAssigner.unAssignShard(assignments, subscription, true));
log.info("Scheduled Stop on {} shards for SubOp({}).", shards.getShardCount(), subOp.getData());
return future;
}
Expand All @@ -209,13 +217,16 @@ private CompletableFuture<Void> stopShard(
ConsumerApi consumer = getAssignedConsumer(assignment);
return consumer.getShardStatus(subId, shardId).thenCompose(shardStatus -> {
if (!shardStatus.isAssigned()) {
ShardOperation shardOp = operationMgr.requestShardStop(subOpId, shard, subscription);
return consumer.stop((ShardOperation.StopData) shardOp.getOpData()).whenComplete((v, t) -> {
if (t != null) {
markShardOpFailed(shardOp, t);
} else {
log.info("Scheduled shard stop({}).", shardOp);
}
ShardOperation stopOp = ShardOperation.stopOp(subOpId, shard, subscription);
return operationMgr.createAndExecute(stopOp, op -> {
ShardOperation shardOp = (ShardOperation) op;
return consumer.stop((ShardOperation.StopData) shardOp.getOpData()).whenComplete((v, t) -> {
if (t != null) {
markShardOpFailed(shardOp, t);
} else {
log.info("Scheduled shard stop({}).", shardOp);
}
});
});
} else {
log.info("Subscription:{} Shard:{} is already Stopped. Skipping.", subId, shardId);
Expand Down Expand Up @@ -262,14 +273,31 @@ public CompletableFuture<Void> update(ShardOperation.OpData opData) {
}

public CompletableFuture<Void> consumerNodeLeft(String consumerNodeId) {
shardAssigner.consumerNodeLeft(consumerNodeId);
return CompletableFuture.completedFuture(null);
log.info("ConsumerNode {} left the cluster.", consumerNodeId);
return shardAssigner.consumerNodeLeft(consumerNodeId).thenAccept((v) -> {
List<Assignment> assignments = shardAssigner.getConsumerNodeAssignment(consumerNodeId);
assignments.forEach(assignment -> {
log.info("Assignment {} needs to be re-assigned", assignment);
SubscriptionOperation operation = SubscriptionOperation.reAssignShardOp(assignment, SYSTEM_IDENTITY);
operationMgr.createAndEnqueue(operation, subOp -> reAssignShard((SubscriptionOperation) subOp));
});
});
}

/**
 * Executes a shard re-assignment operation.
 *
 * Reads the current assignment from the operation's {@code ReassignShardData}, loads the owning
 * subscription from the meta store, asks the shard assigner for a new assignment and, once that
 * completes, starts the shard against the newly returned assignment.
 *
 * @param subOp subscription operation whose data is expected to be a {@code ReassignShardData}
 *              (the cast fails otherwise).
 * @return future that completes when the chained {@code startShard} call completes.
 */
private CompletableFuture<Void> reAssignShard(SubscriptionOperation subOp) {
    //TODO:: failure recovery of re-assign needs to be taken care of.
    Assignment staleAssignment =
            ((SubscriptionOperation.ReassignShardData) subOp.getData()).getAssignment();
    VaradhiSubscription owningSub = metaStore.getSubscription(staleAssignment.getSubscriptionId());
    SubscriptionUnitShard targetShard = owningSub.getShards().getShard(staleAssignment.getShardId());
    // NOTE(review): meaning of the trailing 'false' flag is defined by ShardAssigner — confirm there.
    return shardAssigner
            .reAssignShard(staleAssignment, owningSub, false)
            .thenCompose(freshAssignment ->
                    startShard(subOp.getId(), freshAssignment, targetShard, owningSub));
}

public CompletableFuture<Void> consumerNodeJoined(ConsumerNode consumerNode) {
return getConsumerInfo(consumerNode.getConsumerId()).thenAccept(ci -> {
consumerNode.updateWithConsumerInfo(ci);
shardAssigner.consumerNodeJoined(consumerNode);
return getConsumerInfo(consumerNode.getConsumerId()).thenCompose(ci -> {
consumerNode.initFromConsumerInfo(ci);
return shardAssigner.consumerNodeJoined(consumerNode);
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.flipkart.varadhi.controller;

import com.flipkart.varadhi.entities.cluster.Operation;

import java.util.concurrent.CompletableFuture;

/**
 * Callback that carries out the work of a single {@link Operation} asynchronously.
 *
 * The interface has exactly one abstract method, so it can be supplied as a lambda or method
 * reference; {@code @FunctionalInterface} makes the compiler enforce that this stays true.
 *
 * @param <T> concrete operation type this executor accepts.
 */
@FunctionalInterface
public interface OpExecutor<T extends Operation> {
    /**
     * Executes the given operation.
     *
     * @param operation the operation to execute.
     * @return a future representing the asynchronous execution; it carries no result value.
     */
    CompletableFuture<Void> execute(T operation);
}
Loading
Loading