Skip to content

Commit

Permalink
adding first barebone impl for varadhi consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
gauravAshok committed May 31, 2024
1 parent 16e60ef commit 2078352
Show file tree
Hide file tree
Showing 22 changed files with 598 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

public interface ConcurrencyControl<T> {

Collection<CompletableFuture<T>> enqueueTasks(InternalQueueType type, Collection<Supplier<CompletableFuture<T>>> tasks);
// TODO: maybe evaluate per task enqueue for CC as well.
Collection<CompletableFuture<T>> enqueueTasks(
InternalQueueType type, Collection<Supplier<CompletableFuture<T>>> tasks
);

void executePendingTasks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface ConsumersManager {
* details.
* `shardName` identifies the different shards within the subscription.
*
* @return
* @return Future that will be completed when the consumer is started & ready to consume messages.
*/
CompletableFuture<Void> startSubscription(
Project project,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.spi.services.Consumer;
import com.flipkart.varadhi.spi.services.PolledMessage;
Expand All @@ -22,8 +21,8 @@
/**
* Message source that maintains ordering among messages of the same groupId.
*/
@RequiredArgsConstructor
@Slf4j
@RequiredArgsConstructor
public class GroupedMessageSrc<O extends Offset> implements MessageSrc {

private final ConcurrentHashMap<String, GroupTracker> allGroupedMessages = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -91,6 +90,7 @@ public CompletableFuture<Integer> nextMessages(MessageTracker[] messages) {
private void tryCompletePendingRequest() {
NextMsgsRequest request;
if ((request = pendingRequest.getAndSet(null)) != null) {
// TODO: does it need to be done on the context?
request.result.complete(nextMessagesInternal(request.messages));
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@ private class GroupedMessageTracker implements MessageTracker {
private final MessageTracker messageTracker;

@Override
public Message getMessage() {
public PolledMessage<? extends Offset> getMessage() {
return messageTracker.getMessage();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,98 @@

public abstract sealed class InternalQueueType {

abstract InternalQueueCategory getCategory();
public abstract InternalQueueCategory getCategory();


final public static class Main extends InternalQueueType {

public static final Main MAIN = new Main();

@Override
InternalQueueCategory getCategory() {
public InternalQueueCategory getCategory() {
return InternalQueueCategory.MAIN;
}

public boolean equals(final Object o) {
if (o == this) {
return true;
}
return o instanceof Main;
}

public int hashCode() {
return Main.class.hashCode();
}
}

@Getter
@RequiredArgsConstructor
final public static class Retry extends InternalQueueType {

public static final Retry[] RETRY = new Retry[]{
new Retry(1), new Retry(2), new Retry(3),
new Retry(4), new Retry(5), new Retry(6),
new Retry(7), new Retry(8), new Retry(9),
};

private final int retryCount;

@Override
InternalQueueCategory getCategory() {
public InternalQueueCategory getCategory() {
return InternalQueueCategory.RETRY;
}

public boolean equals(final Object o) {
if (o == this) {
return true;
}
if (!(o instanceof Retry other)) {
return false;
}
return this.getRetryCount() == other.getRetryCount();
}

public int hashCode() {
final int PRIME = 59;
int result = Retry.class.hashCode();
result = result * PRIME + this.getRetryCount();
return result;
}
}

final public static class DeadLetter extends InternalQueueType {

public static final DeadLetter DEADLETTER = new DeadLetter();

@Override
InternalQueueCategory getCategory() {
public InternalQueueCategory getCategory() {
return InternalQueueCategory.DEAD_LETTER;
}

public boolean equals(final Object o) {
if (o == this) {
return true;
}
return o instanceof DeadLetter;
}

public int hashCode() {
return DeadLetter.class.hashCode();
}
}

public static Main mainType() {
return Main.MAIN;
}

public static DeadLetter deadLetterType() {
return DeadLetter.DEADLETTER;
}

public static Retry retryType(int retryCount) {
if (retryCount < 1 || retryCount > 9) {
throw new IllegalArgumentException("Invalid retry count: " + retryCount);
}
return Retry.RETRY[retryCount - 1];
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.spi.services.PolledMessage;

public interface MessageTracker {

Message getMessage();
PolledMessage<? extends Offset> getMessage();

default String getGroupId() {
Message msg = getMessage();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.spi.services.Consumer;
import com.flipkart.varadhi.spi.services.PolledMessage;
Expand All @@ -18,7 +17,7 @@ public PolledMessageTracker(Consumer<O> committer, PolledMessage<O> message) {
}

@Override
public Message getMessage() {
public PolledMessage<O> getMessage() {
return message;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
public class Context {

private static final FastThreadLocal<Context> currentThreadCtx = new FastThreadLocal<>();

private final EventExecutor executor;
@Getter(lombok.AccessLevel.PACKAGE)
final EventExecutor executor;

public interface Task extends Runnable {
Context getContext();
}

/**
* Update the current thread's context to this context.
*/
public void updateCurrentThreadContext() {
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CustomThread) {
Expand All @@ -25,12 +28,48 @@ public void updateCurrentThreadContext() {
}
}

public static Context getCurrentTheadContext() {
public static Context getCurrentThreadContext() {
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CustomThread) {
return ((CustomThread) currentThread).getContext();
} else {
return currentThreadCtx.get();
}
}

public boolean isInContext() {
return Thread.currentThread() == executor.getThread();
}

public Task wrap(Runnable runnable) {
return new Task() {
@Override
public Context getContext() {
return Context.this;
}

@Override
public void run() {
runnable.run();
}
};
}

/**
* It makes sure that the task is run on the thread bound to the context. If already on the context thread, it
* runs the task directly.
*
* @param runnable
*/
public void executeOnContext(Runnable runnable) {
if (isInContext()) {
runnable.run();
} else {
executor.execute(wrap(runnable));
}
}

public void execute(Task task) {
executor.execute(task);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,28 @@ EventExecutorGroup getParent() {
return parent;
}

CustomThread getThread() {
return thread;
}

@Override
public void execute(Runnable command) {
// TODO: review all cases here. As of now, here we are assuming that all tasks are running from custom threads,
// which is most likely false.

if (command instanceof Context.Task) {
EventExecutor boundExecutor = ((Context.Task) command).getContext().getExecutor();
if (command instanceof Context.Task task) {

// TODO: maybe move this check to context.execute()
EventExecutor boundExecutor = task.getContext().getExecutor();
if (boundExecutor != null && boundExecutor != this) {
throw new IllegalStateException(
"task is tied to an executor:" + boundExecutor + ", but is being executed on:" + this);
}
taskQueue.add((Context.Task) command);
taskQueue.add(task);
return;
}

taskQueue.add(new WrappedTask(Context.getCurrentTheadContext(), command));
taskQueue.add(new WrappedTask(Context.getCurrentThreadContext(), command));
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -126,6 +129,8 @@ public int getPendingCount() {
* Otherwise, pending tasks may sit idle forever.
* 3. If there is no task running at the moment to schedule any pending task, then we should schedule it regardless.
*
* This method can run on any arbitrary thread.
*
* @param result
* @param ex
*/
Expand All @@ -141,7 +146,7 @@ private void onTaskCompletion(T result, Throwable ex) {
}

if (scheduleRequired) {
context.getExecutor().execute(() -> {
context.executeOnContext(() -> {
executePendingTasks();
schedulePendingTaskCounter.decrementAndGet();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
import com.flipkart.varadhi.consumer.ConsumerState;
import com.flipkart.varadhi.consumer.ConsumersManager;
import com.flipkart.varadhi.consumer.ConsumptionFailurePolicy;
import com.flipkart.varadhi.consumer.VaradhiConsumer;
import com.flipkart.varadhi.entities.ConsumptionPolicy;
import com.flipkart.varadhi.entities.Endpoint;
import com.flipkart.varadhi.entities.Project;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.spi.services.TopicPartitions;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class ConsumersManagerImpl implements ConsumersManager {

private final Map<ShardId, ConsumerHolder> consumers = new ConcurrentHashMap<>();


public ConsumersManagerImpl() {
}
Expand All @@ -23,7 +28,16 @@ public CompletableFuture<Void> startSubscription(
boolean grouped, Endpoint endpoint, ConsumptionPolicy consumptionPolicy,
ConsumptionFailurePolicy failurePolicy
) {
return CompletableFuture.completedFuture(null);
ShardId id = new ShardId(subscription, shardName);
ConsumerHolder prev = consumers.putIfAbsent(id, new ConsumerHolder());
if (prev != null) {
throw new IllegalArgumentException(
"Consumer already exists for subscription: " + subscription + " shard: " + shardName);
}
ConsumerHolder newConsumer = consumers.get(id);
newConsumer.consumer = null;

return null;
}

@Override
Expand All @@ -45,4 +59,11 @@ public void resumeSubscription(String subscription, String shardName) {
public ConsumerState getConsumerState(String subscription, String shardName) {
return null;
}

record ShardId(String subscription, String shard) {
}

static class ConsumerHolder {
private VaradhiConsumer consumer;
}
}
Loading

0 comments on commit 2078352

Please sign in to comment.