Adding first barebones impl for Varadhi consumer. #147

Draft
wants to merge 8 commits into master
@@ -1,5 +1,6 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.consumer.concurrent.Context;
import com.flipkart.varadhi.entities.InternalQueueType;

import java.util.Collection;
@@ -8,7 +9,20 @@

public interface ConcurrencyControl<T> {

Collection<CompletableFuture<T>> enqueueTasks(InternalQueueType type, Collection<Supplier<CompletableFuture<T>>> tasks);
// TODO: maybe evaluate per task enqueue for CC as well.
Collection<CompletableFuture<T>> enqueueTasks(
InternalQueueType type, Iterable<Supplier<CompletableFuture<T>>> tasks
);

void executePendingTasks();
/**
* @return true if there are free concurrency slots, i.e. some tasks could be executed immediately if enqueued.
*/
boolean isFree();

/**
* A one-time task registration; the task is invoked when a concurrency slot frees up. After execution, the task
* is removed from the registration. Only one task can be registered at a time.
*
* @param task the task to run once a concurrency slot becomes free
*/
void onFree(Context.Task task);
}
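
For orientation, a minimal usage sketch of this interface follows. It is not part of the diff; `cc`, `queueType`, and `retryTask` are hypothetical stand-ins for whatever the consumer wiring provides.

// Hypothetical caller of ConcurrencyControl<String>; names and setup are illustrative only.
List<Supplier<CompletableFuture<String>>> work = List.of(
        () -> CompletableFuture.completedFuture("task-1"),
        () -> CompletableFuture.completedFuture("task-2")
);
Collection<CompletableFuture<String>> futures = cc.enqueueTasks(queueType, work);
if (!cc.isFree()) {
    // No free slots right now: register a one-shot callback for when a slot frees up.
    cc.onFree(retryTask);
}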
@@ -14,10 +14,10 @@

@Slf4j
public class ConsumerApiMgr implements ConsumerApi {

private final ConsumersManager consumersManager;
private final ConsumerInfo consumerInfo;


public ConsumerApiMgr(ConsumersManager consumersManager, ConsumerInfo consumerInfo) {
this.consumersManager = consumersManager;
this.consumerInfo = consumerInfo;
@@ -28,7 +28,7 @@ public CompletableFuture<Void> start(ShardOperation.StartData operation) {
log.info("Consumer: Starting shard {}", operation);
SubscriptionUnitShard shard = operation.getShard();
consumerInfo.addShardCapacity(operation.getSubscriptionId(), shard.getShardId(), shard.getCapacityRequest());
StorageSubscription<StorageTopic> mainSub = shard.getMainSubscription().getSubscriptionToConsume();
StorageSubscription<StorageTopic> mainSub = shard.getMainSubscription().getSubscriptionForConsume();
ConsumptionFailurePolicy failurePolicy =
new ConsumptionFailurePolicy(operation.getRetryPolicy(), shard.getRetrySubscription(),
shard.getDeadLetterSubscription()
@@ -0,0 +1,18 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.entities.StorageTopic;
import com.flipkart.varadhi.spi.services.ConsumerFactory;
import com.flipkart.varadhi.spi.services.ProducerFactory;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.net.http.HttpClient;

@RequiredArgsConstructor
@Getter
public class ConsumerEnvironment {
private final ProducerFactory<StorageTopic> producerFactory;
private final ConsumerFactory<StorageTopic, Offset> consumerFactory;
private final HttpClient httpClient;
}
@@ -14,7 +14,7 @@ public interface ConsumersManager {
* details.
* `shardName` identifies the different shards within the subscription.
*
* @return
* @return Future that will be completed when the consumer is started & ready to consume messages.
*/
CompletableFuture<Void> startSubscription(
String project,
@@ -0,0 +1,193 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.consumer.concurrent.Context;
import com.flipkart.varadhi.consumer.concurrent.FutureExtensions;
import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.spi.services.Consumer;
import com.flipkart.varadhi.spi.services.PolledMessage;
import com.flipkart.varadhi.spi.services.PolledMessages;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.ExtensionMethod;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* A specialized consumer for RQs (retry queues). It applies a constant delay to all messages based on their produce time.
* The delay can be changed at runtime.
*
* TODO: allow passing a filter that recognizes some messages as follow-through and does not apply the delay to them.
*
* @param <O> the offset type
*/
@ExtensionMethod({FutureExtensions.class})
public class DelayedConsumer<O extends Offset> implements Consumer<O> {

private final Consumer<O> delegate;
private final Context context;
@Getter
private long delayMs;

private final Batch<O> polledMessages = Batch.emptyBatch();

public DelayedConsumer(Consumer<O> delegate, Context context, long delayMs) {
this.delegate = delegate;
this.context = context;
this.delayMs = delayMs;
}


public void setDelayMsAsync(long delayMs) {
context.runOnContext(() -> {
this.delayMs = delayMs;
});
}

/**
* Only to be called from the context.
*
* @return future completed with the messages that have satisfied the configured delay
*/
@Override
public CompletableFuture<PolledMessages<O>> receiveAsync() {
assert context.isInContext();

if (polledMessages.isEmpty()) {
CompletableFuture<PolledMessages<O>> promise = new CompletableFuture<>();
delegate.receiveAsync().whenComplete((messages, t) -> {
if (t != null) {
promise.completeExceptionally(t);
} else {
context.runOnContext(() -> {
polledMessages.add(messages);
getConsumableMsgs().handleCompletion(promise);
});
}
});
return promise;
} else {
return getConsumableMsgs();
}
}

@Override
public CompletableFuture<Void> commitIndividualAsync(PolledMessage<O> message) {
return delegate.commitIndividualAsync(message);
}

@Override
public CompletableFuture<Void> commitCumulativeAsync(PolledMessage<O> message) {
return delegate.commitCumulativeAsync(message);
}

@Override
public void close() throws IOException {
delegate.close();
}

/**
* To be run on the context.
*
* @return future completed with the messages whose delay has already elapsed
*/
private CompletableFuture<PolledMessages<O>> getConsumableMsgs() {
assert context.isInContext();

long now = System.currentTimeMillis();
long earliestMessageTs = polledMessages.earliestMessageTimestamp();
if (earliestMessageTs == Long.MAX_VALUE) {
throw new IllegalStateException("getConsumableMsgs() called while the batch has no pending messages");
}

long timeLeft = Math.max(0, delayMs - (now - earliestMessageTs));

CompletableFuture<PolledMessages<O>> delayedMsgs = new CompletableFuture<>();
if (timeLeft == 0) {
delayedMsgs.complete(polledMessages.getConsumableMsgs(delayMs));
} else {
context.scheduleOnContext(
() -> delayedMsgs.complete(polledMessages.getConsumableMsgs(delayMs)),
timeLeft,
TimeUnit.MILLISECONDS
);
}
return delayedMsgs;
}

@AllArgsConstructor
static final class DelayedMessages<O extends Offset> {
int arrOffset;
final ArrayList<PolledMessage<O>> messages;

long earliestMessageTimestamp() {
if (arrOffset < messages.size()) {
return messages.get(arrOffset).getProducedTimestampMs();
} else {
return Long.MAX_VALUE;
}
}
}

/**
* Batch of messages per partition.
*/
@AllArgsConstructor
static class Batch<O extends Offset> {
private int total;
private int consumed;
private final Map<Integer, DelayedMessages<O>> delayedMsgs;

static <O extends Offset> Batch<O> emptyBatch() {
return new Batch<>(0, 0, new HashMap<>());
}

void add(PolledMessages<O> newMsgs) {
if (isEmpty()) {
// previous batch is fully consumed: reset the counters and clear the per-partition buffers.
clear();
}
for (PolledMessage<O> message : newMsgs) {
delayedMsgs.computeIfAbsent(
message.getPartition(), partition -> new DelayedMessages<>(0, new ArrayList<>())
)
.messages.add(message);
}
total += newMsgs.getCount();
}

void clear() {
total = 0;
consumed = 0;
// also reset each partition's read offset so that newly added messages are not skipped
delayedMsgs.values().forEach(a -> { a.arrOffset = 0; a.messages.clear(); });
}

boolean isEmpty() {
return consumed >= total;
}

long earliestMessageTimestamp() {
return delayedMsgs.values().stream()
.mapToLong(DelayedMessages::earliestMessageTimestamp)
.min()
.orElse(Long.MAX_VALUE);
}

PolledMessages<O> getConsumableMsgs(long delayMillis) {
long now = System.currentTimeMillis();
ArrayList<PolledMessage<O>> msgs = new ArrayList<>();
for (var m : delayedMsgs.values()) {
while (m.arrOffset < m.messages.size() &&
(m.messages.get(m.arrOffset).getProducedTimestampMs() + delayMillis) <= now) {
msgs.add(m.messages.get(m.arrOffset));
++m.arrOffset;
// track consumption so that isEmpty() reflects the messages already handed out
++consumed;
}
}
return new PolledMessages.ArrayBacked<>(msgs);
}
}
}
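
To make the intended usage concrete, a hedged sketch follows. It assumes the delegate consumer, the `Context`, and a `handle` callback are provided by the surrounding consumer wiring; `MyOffset` is a placeholder offset type.

// Illustrative only: wrap a delegate consumer with a constant 5s delay.
DelayedConsumer<MyOffset> delayed = new DelayedConsumer<>(delegateConsumer, context, 5_000);
// receiveAsync() must be invoked on the context (see the assert above).
context.runOnContext(() ->
        delayed.receiveAsync().thenAccept(msgs -> msgs.forEach(this::handle))
);
// The delay can be changed at runtime without recreating the consumer.
delayed.setDelayMsAsync(10_000);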
@@ -0,0 +1,40 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.InternalQueueType;
import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.spi.services.Producer;
import lombok.RequiredArgsConstructor;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* Convenience wrapper over a producer for producing "failed" messages to RQs & DLQs. It also allows producing
* follow-through messages, i.e. messages that are simply following a failure of a previous message in the same group.
*/
@RequiredArgsConstructor
public class FailedMsgProducer implements Producer {

public static final String FOLLOW_THROUGH_MSG_HEADER = "varadhi.msg.followed";

private final Producer delegate;

public static boolean isFollowThroughMsg(Message message) {
return message.hasHeader(FOLLOW_THROUGH_MSG_HEADER);
}

@Override
public CompletableFuture<Offset> produceAsync(Message message) {
return delegate.produceAsync(message.withoutHeader(FOLLOW_THROUGH_MSG_HEADER));
}

public CompletableFuture<Offset> produceFollowThroughMsgAsync(Message msg, String previousOffset) {
return delegate.produceAsync(msg.withHeader(FOLLOW_THROUGH_MSG_HEADER, previousOffset));
}

@Override
public void close() throws IOException {
Producer.super.close();
}
}
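
As a usage reference, a hedged sketch (the retry-topic producer and the message objects are assumed to come from the failure-handling flow; names are illustrative):

// Illustrative only: route failed messages through the wrapper.
FailedMsgProducer rqProducer = new FailedMsgProducer(retryTopicProducer);
rqProducer.produceAsync(failedMessage);
// A message that merely follows an earlier failure of its group carries the marker header,
// with the offset of the failed message as the header value.
rqProducer.produceFollowThroughMsgAsync(followUpMessage, failedMsgOffset.toString());
if (FailedMsgProducer.isFollowThroughMsg(incomingMessage)) {
    // downstream logic can treat follow-through messages specially (e.g. skip the delay)
}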
@@ -1,6 +1,5 @@
package com.flipkart.varadhi.consumer;

import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.entities.Offset;
import com.flipkart.varadhi.spi.services.Consumer;
import com.flipkart.varadhi.spi.services.PolledMessage;
@@ -22,8 +21,8 @@
/**
* Message source that maintains ordering among messages of the same groupId.
*/
@RequiredArgsConstructor
@Slf4j
@RequiredArgsConstructor
public class GroupedMessageSrc<O extends Offset> implements MessageSrc {

private final ConcurrentHashMap<String, GroupTracker> allGroupedMessages = new ConcurrentHashMap<>();
@@ -91,6 +90,7 @@ public CompletableFuture<Integer> nextMessages(MessageTracker[] messages) {
private void tryCompletePendingRequest() {
NextMsgsRequest request;
if ((request = pendingRequest.getAndSet(null)) != null) {
// TODO: does it need to be done on the context?
request.result.complete(nextMessagesInternal(request.messages));
}
}
@@ -196,7 +196,7 @@ private class GroupedMessageTracker implements MessageTracker {
private final MessageTracker messageTracker;

@Override
public Message getMessage() {
public PolledMessage<? extends Offset> getMessage() {
return messageTracker.getMessage();
}

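
For context on the changed `nextMessages` signature above, a hedged polling sketch (the buffer size and the `process` callback are illustrative assumptions):

// Illustrative only: fill a fixed-size buffer with the next available message trackers.
MessageTracker[] buffer = new MessageTracker[32];
groupedMessageSrc.nextMessages(buffer).thenAccept(count -> {
    for (int i = 0; i < count; i++) {
        process(buffer[i]);
    }
});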