Skip to content

Commit

Permalink
fix races in complete and renew lock span checks
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Feb 9, 2024
1 parent bf5ebb1 commit 0804573
Showing 1 changed file with 69 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,36 +151,35 @@ public void sendAndReceive() throws InterruptedException {

List<ReadableSpan> completed = findSpans(spans, "ServiceBus.complete");
assertClientSpan(completed.get(0), Collections.singletonList(received.get(0)), "ServiceBus.complete", "settle");
assertParentFound(completed.get(0), processed, true);

assertClientSpan(completed.get(1), Collections.singletonList(received.get(1)), "ServiceBus.complete", "settle");
assertParentFound(completed.get(1), processed, true);
assertSettledVsProcessed(completed, processed, messages.size());
}

@Test
public void receiveAndRenewLockWithDuration() throws InterruptedException {
ServiceBusMessage message = new ServiceBusMessage(CONTENTS_BYTES);
StepVerifier.create(sender.sendMessage(message)).expectComplete().verify(TIMEOUT);

CountDownLatch processedFound = new CountDownLatch(1);
spanProcessor.notifyIfCondition(processedFound, s -> s.getName().equals("ServiceBus.process"));
CountDownLatch processedFound = new CountDownLatch(2);
spanProcessor.notifyIfCondition(processedFound, s -> s.getName().equals("ServiceBus.process") || s.getName().equals("ServiceBus.renewMessageLock"));

AtomicReference<ServiceBusReceivedMessage> received = new AtomicReference<>();
StepVerifier.create(receiver.receiveMessages()
.next()
.flatMap(msg -> receiver.renewMessageLock(msg, Duration.ofSeconds(10))
.flatMap(msg -> receiver.renewMessageLock(msg, Duration.ofSeconds(2))
.thenReturn(msg)))
.assertNext(msg -> {
List<ReadableSpan> spans = spanProcessor.getEndedSpans();

List<ReadableSpan> processed = findSpans(spans, "ServiceBus.process");
assertConsumerSpan(processed.get(0), msg, "ServiceBus.process");

List<ReadableSpan> renewLock = findSpans(spans, "ServiceBus.renewMessageLock");
assertClientSpan(renewLock.get(0), Collections.singletonList(msg), "ServiceBus.renewMessageLock", null);
})
.assertNext(msg -> received.compareAndSet(null, msg))
.expectComplete()
.verify(TIMEOUT);
assertTrue(processedFound.await(20, TimeUnit.SECONDS));
assertTrue(processedFound.await(30, TimeUnit.SECONDS));

List<ReadableSpan> spans = spanProcessor.getEndedSpans();

List<ReadableSpan> processed = findSpans(spans, "ServiceBus.process");
assertConsumerSpan(processed.get(0), received.get(), "ServiceBus.process");

List<ReadableSpan> renewLock = findSpans(spans, "ServiceBus.renewMessageLock");
assertClientSpan(renewLock.get(0), Collections.singletonList(received.get()), "ServiceBus.renewMessageLock", null);
}

@Test
Expand All @@ -207,8 +206,8 @@ public void receiveAndRenewSessionLockWithDuration() throws InterruptedException

StepVerifier.create(sender.sendMessage(message)).expectComplete().verify(TIMEOUT);

CountDownLatch processedFound = new CountDownLatch(1);
spanProcessor.notifyIfCondition(processedFound, s -> s.getName().equals("ServiceBus.process"));
CountDownLatch processedFound = new CountDownLatch(2);
spanProcessor.notifyIfCondition(processedFound, s -> s.getName().equals("ServiceBus.process") || s.getName().equals("ServiceBus.renewSessionLock"));

AtomicReference<ServiceBusReceivedMessage> received = new AtomicReference<>();
StepVerifier.create(
Expand All @@ -227,7 +226,7 @@ public void receiveAndRenewSessionLockWithDuration() throws InterruptedException
.expectComplete()
.verify(TIMEOUT);

assertTrue(processedFound.await(20, TimeUnit.SECONDS));
assertTrue(processedFound.await(30, TimeUnit.SECONDS));

List<ReadableSpan> spans = spanProcessor.getEndedSpans();

Expand All @@ -253,11 +252,9 @@ public void receiveCheckSubscribe() throws InterruptedException {
CountDownLatch processedFound = new CountDownLatch(2);
spanProcessor.notifyIfCondition(processedFound, s -> s.getName().equals("ServiceBus.process"));

List<ServiceBusReceivedMessage> received = new ArrayList<>();
Disposable subscription = receiver.receiveMessages()
.take(2)
.take(messages.size())
.subscribe(msg -> {
received.add(msg);
String traceparent = (String) msg.getApplicationProperties().get("traceparent");
String traceId = Span.current().getSpanContext().getTraceId();

Expand All @@ -270,11 +267,7 @@ public void receiveCheckSubscribe() throws InterruptedException {
assertTrue(processedFound.await(20, TimeUnit.SECONDS));

List<ReadableSpan> spans = spanProcessor.getEndedSpans();

List<ReadableSpan> processed = findSpans(spans, "ServiceBus.process");
List<ReadableSpan> completed = findSpans(spans, "ServiceBus.complete");
assertParentFound(completed.get(0), processed, true);
assertParentFound(completed.get(1), processed, true);
assertSettledVsProcessed(findSpans(spans, "ServiceBus.complete"), findSpans(spans, "ServiceBus.process"), messages.size());
}

@Test
Expand Down Expand Up @@ -324,24 +317,18 @@ public void sendAndReceiveParallelNoAutoCompleteAndLockRenewal() throws Interrup
assertTrue(processedFound.await(20, TimeUnit.SECONDS));

List<ReadableSpan> spans = spanProcessor.getEndedSpans();
List<ReadableSpan> processed = findSpans(spans, "ServiceBus.process");
List<ReadableSpan> completed = findSpans(spans, "ServiceBus.complete");

assertEquals(messageCount, processed.size());
assertEquals(messageCount, completed.size());
for (ReadableSpan c : completed) {
// all completed spans should have a parent, but complete call may start after the parent span has ended
// the last part heavily depends on how above code is written wrt parallelization.
// in the current form (receive -> doonnext (complete) -> parallel)
// complete is called happens synchronously from the receive perspective
// so complete will finish before processing is completed.
//
// if complete call is done differently (e.g. receive -> parallel -> runon -> doonnext (complete))
// then doonnext callbacks become async and may happen after receiver call has completed
// then complete will be a child of process, but will last longer than processing
// TODO (limolkova): this is another good reason to rename receiver's span to delivery instead of processing.
assertParentFound(c, processed, true);
}
// all completed spans should have a parent, but complete call may start after the parent span has ended
// the last part heavily depends on how above code is written wrt parallelization.
// in the current form (receive -> doonnext (complete) -> parallel)
// complete is happens synchronously from the receive perspective
// so complete will finish before processing is completed.
//
// if complete call is done differently (e.g. receive -> parallel -> runon -> doonnext (complete))
// then doonnext callbacks become async and may happen after receiver call has completed
// then complete will be a child of process, but will last longer than processing
// TODO (limolkova): this is another good reason to rename receiver's span to delivery instead of processing.
assertSettledVsProcessed(findSpans(spans, "ServiceBus.complete"), findSpans(spans, "ServiceBus.process"), messageCount);
}

@Test
Expand Down Expand Up @@ -385,18 +372,10 @@ public void sendAndReceiveParallelAutoComplete() throws InterruptedException {
.expectNextCount(messageCount)
.expectComplete()
.verify(TIMEOUT);

assertTrue(processedFound.await(20, TimeUnit.SECONDS));

List<ReadableSpan> spans = spanProcessor.getEndedSpans();
List<ReadableSpan> processed = findSpans(spans, "ServiceBus.process");
List<ReadableSpan> completed = findSpans(spans, "ServiceBus.complete");

assertEquals(messageCount, processed.size());
assertEquals(messageCount, completed.size());
for (ReadableSpan c : completed) {
assertParentFound(c, processed, true);
}
assertSettledVsProcessed(findSpans(spans, "ServiceBus.complete"), findSpans(spans, "ServiceBus.process"), messageCount);
}

@Test
Expand All @@ -409,8 +388,8 @@ public void sendReceiveRenewLockAndDefer() throws InterruptedException {

StepVerifier.create(sender.sendMessage(message)).expectComplete().verify(TIMEOUT);

CountDownLatch latch = new CountDownLatch(2);
spanProcessor.notifyIfCondition(latch, s -> s.getName().equals("ServiceBus.process") && s.getSpanContext().getTraceId().equals(traceId));
CountDownLatch latch = new CountDownLatch(3);
spanProcessor.notifyIfCondition(latch, s -> (s.getName().equals("ServiceBus.process") || s.getName().equals("ServiceBus.renewMessageLock")) && s.getSpanContext().getTraceId().equals(traceId));
toClose(receiver.receiveMessages()
.skipUntil(m -> traceparent.equals(m.getApplicationProperties().get("traceparent")))
.flatMap(m -> receiver.renewMessageLock(m).thenReturn(m))
Expand Down Expand Up @@ -605,7 +584,8 @@ public void sendAndProcessNoAutoComplete() throws InterruptedException {
.collect(Collectors.toList());
assertEquals(1, completed.size());
assertClientProducerSpan(completed.get(0), Collections.singletonList(message), "ServiceBus.complete", "settle");
assertParentFound(completed.get(0), processed, true);

assertSettledVsProcessed(completed, processed, 1);
}

@Test
Expand Down Expand Up @@ -651,14 +631,7 @@ public void sendAndProcessParallel() throws InterruptedException {
processor.stop();

List<ReadableSpan> spans = spanProcessor.getEndedSpans();
List<ReadableSpan> processed = findSpans(spans, "ServiceBus.process");
List<ReadableSpan> completed = findSpans(spans, "ServiceBus.complete");

assertEquals(messageCount, processed.size());
assertEquals(messageCount, completed.size());
for (ReadableSpan c : completed) {
assertParentFound(c, processed, false);
}
assertSettledVsProcessed(findSpans(spans, "ServiceBus.complete"), findSpans(spans, "ServiceBus.process"), messageCount);
}

@Test
Expand Down Expand Up @@ -700,14 +673,7 @@ public void sendAndProcessParallelNoAutoComplete() throws InterruptedException {
processor.stop();

List<ReadableSpan> spans = spanProcessor.getEndedSpans();
List<ReadableSpan> processed = findSpans(spans, "ServiceBus.process");
List<ReadableSpan> completed = findSpans(spans, "ServiceBus.complete");

assertEquals(messageCount, processed.size());
assertEquals(messageCount, completed.size());
for (ReadableSpan c : completed) {
assertParentFound(c, processed, true);
}
assertSettledVsProcessed(findSpans(spans, "ServiceBus.complete"), findSpans(spans, "ServiceBus.process"), messageCount);
}

@Test
Expand Down Expand Up @@ -758,7 +724,7 @@ public void sendProcessAndFail() throws InterruptedException {
.collect(Collectors.toList());
assertEquals(1, abandoned.size());
assertClientProducerSpan(abandoned.get(0), Collections.singletonList(message), "ServiceBus.abandon", "settle");
assertParentFound(abandoned.get(0), processed, true);
assertSettledVsProcessed(abandoned, processed, 1);
}

@Test
Expand Down Expand Up @@ -836,21 +802,40 @@ private void assertParent(ReadableSpan child, ReadableSpan parent) {
assertEquals(child.getParentSpanContext().getSpanId(), parent.getSpanContext().getSpanId());
}

private void assertParentFound(ReadableSpan child, List<ReadableSpan> possibleParents, boolean childEndsBeforeParent) {
boolean hasParentInProcessed = false;
private void assertSettledVsProcessed(List<ReadableSpan> settled, List<ReadableSpan> processed, int expectedCount) {
assertTrue(settled.size() >= expectedCount,
String.format("Expected at least %d completed spans, but found %d", expectedCount, settled.size()));
assertTrue(processed.size() >= expectedCount,
String.format("Expected at least %d processed spans, but found %d", expectedCount, processed.size()));

// there could be more completed spans than processed because completion happens before processing, and we can have more messages in the queue than we expected.
assertTrue(settled.size() >= processed.size(),
String.format("Expected at least as many completed spans as processed spans, but found %d completed and %d processed", settled.size(), processed.size()));

int processedFound = 0;
for (int i = 0; i < settled.size(); i ++) {
ReadableSpan parent = findParent(settled.get(i), processed);
if (parent != null) {
processedFound++;
assertTrue(settled.get(i).getLatencyNanos() <= parent.getLatencyNanos(),
String.format("Expected settled span to have less latency than processed span, but found %d vs %d", settled.get(i).getLatencyNanos(), parent.getLatencyNanos()));
}
}

assertEquals(expectedCount, processedFound);
}

private ReadableSpan findParent(ReadableSpan child, List<ReadableSpan> possibleParents) {
String traceId = child.getParentSpanContext().getTraceId();
String parentId = child.getParentSpanContext().getSpanId();
for (ReadableSpan p : possibleParents) {
hasParentInProcessed |=
child.getParentSpanContext().getTraceId().equals(p.getSpanContext().getTraceId())
&& child.getParentSpanContext().getSpanId().equals(p.getSpanContext().getSpanId());
if (hasParentInProcessed) {
if (childEndsBeforeParent) {
assertTrue(p.getLatencyNanos() >= child.getLatencyNanos());
}
break;
if (traceId.equals(p.getSpanContext().getTraceId())
&& parentId.equals(p.getSpanContext().getSpanId())) {
return p;
}
}

assertTrue(hasParentInProcessed);
return null;
}

private List<ReadableSpan> findSpans(List<ReadableSpan> spans, String spanName) {
Expand Down

0 comments on commit 0804573

Please sign in to comment.