From fd61adbeaad0bc69bad3196b75ab0b89d2887524 Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Fri, 23 Oct 2020 21:49:36 -0700 Subject: [PATCH 01/10] Cleanup retryer and retry options --- .../common/{Retryer.java => RpcRetryer.java} | 27 ++++++++---- .../common/WorkflowExecutionUtils.java | 16 ++----- .../GenericWorkflowClientExternalImpl.java | 38 +++++++---------- .../ManualActivityCompletionClientImpl.java | 14 ++----- .../internal/replay/ReplayDecider.java | 4 +- .../replay/ReplayDecisionTaskHandler.java | 6 +-- .../sync/POJOActivityTaskHandler.java | 9 ++-- .../internal/worker/ActivityTaskHandler.java | 10 +---- .../internal/worker/ActivityWorker.java | 39 ++++------------- .../internal/worker/DecisionTaskHandler.java | 12 +----- .../internal/worker/SingleWorkerOptions.java | 42 ------------------- .../internal/worker/WorkflowWorker.java | 26 ++---------- .../java/com/uber/cadence/worker/Worker.java | 6 --- .../uber/cadence/worker/WorkerOptions.java | 28 ------------- .../cadence/internal/common/RetryerTest.java | 10 ++--- 15 files changed, 67 insertions(+), 220 deletions(-) rename src/main/java/com/uber/cadence/internal/common/{Retryer.java => RpcRetryer.java} (90%) diff --git a/src/main/java/com/uber/cadence/internal/common/Retryer.java b/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java similarity index 90% rename from src/main/java/com/uber/cadence/internal/common/Retryer.java rename to src/main/java/com/uber/cadence/internal/common/RpcRetryer.java index a2d7e0a9a..e75ce3b32 100644 --- a/src/main/java/com/uber/cadence/internal/common/Retryer.java +++ b/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java @@ -19,7 +19,13 @@ import static com.uber.cadence.internal.common.CheckedExceptionWrapper.unwrap; -import com.uber.cadence.*; +import com.uber.cadence.BadRequestError; +import com.uber.cadence.CancellationAlreadyRequestedError; +import com.uber.cadence.DomainAlreadyExistsError; +import com.uber.cadence.DomainNotActiveError; +import com.uber.cadence.EntityNotExistsError; +import com.uber.cadence.QueryFailedError; +import com.uber.cadence.WorkflowExecutionAlreadyStartedError; import com.uber.cadence.common.RetryOptions; import java.time.Duration; import java.util.concurrent.CancellationException; @@ -31,8 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class Retryer { - public static final RetryOptions DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS; +public final class RpcRetryer { + public static final RetryOptions DEFAULT_RPC_RETRY_OPTIONS; private static final Duration RETRY_SERVICE_OPERATION_INITIAL_INTERVAL = Duration.ofMillis(20); private static final Duration RETRY_SERVICE_OPERATION_EXPIRATION_INTERVAL = Duration.ofMinutes(1); @@ -57,8 +63,9 @@ public final class Retryer { DomainAlreadyExistsError.class, QueryFailedError.class, DomainNotActiveError.class, - CancellationAlreadyRequestedError.class); - DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS = roBuilder.validateBuildWithDefaults(); + CancellationAlreadyRequestedError.class, + DomainNotActiveError.class); + DEFAULT_RPC_RETRY_OPTIONS = roBuilder.validateBuildWithDefaults(); } public interface RetryableProc { @@ -81,7 +88,7 @@ private static class ValueExceptionPair { private final CompletableFuture value; private final Throwable exception; - public ValueExceptionPair(CompletableFuture value, Throwable exception) { + ValueExceptionPair(CompletableFuture value, Throwable exception) { this.value = value; this.exception = exception; } @@ -95,7 +102,7 @@ public Throwable getException() { } } - private static final Logger log = LoggerFactory.getLogger(Retryer.class); + private static final Logger log = LoggerFactory.getLogger(RpcRetryer.class); public static void retry(RetryOptions options, RetryableProc r) throws T { @@ -107,6 +114,10 @@ public static void retry(RetryOptions options, RetryablePr }); } + public static void retryWithDefaultOption(RetryableProc r) throws T { + retry(DEFAULT_RPC_RETRY_OPTIONS, r); + } + public static R retryWithResult( RetryOptions options, RetryableFunc r) throws T { int attempt = 0; @@ -266,5 +277,5 @@ private static void rethrow(Exception e) throws T { } /** Prohibits instantiation. */ - private Retryer() {} + private RpcRetryer() {} } diff --git a/src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java b/src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java index 02bf2cc24..5f37a16df 100644 --- a/src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java +++ b/src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java @@ -27,7 +27,6 @@ import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import com.uber.cadence.ActivityType; -import com.uber.cadence.BadRequestError; import com.uber.cadence.Decision; import com.uber.cadence.DecisionType; import com.uber.cadence.DescribeWorkflowExecutionRequest; @@ -91,15 +90,6 @@ public class WorkflowExecutionUtils { */ private static final String INDENTATION = " "; - private static RetryOptions retryParameters = - new RetryOptions.Builder() - .setBackoffCoefficient(2) - .setInitialInterval(Duration.ofMillis(500)) - .setMaximumInterval(Duration.ofSeconds(30)) - .setMaximumAttempts(Integer.MAX_VALUE) - .setDoNotRetry(BadRequestError.class, EntityNotExistsError.class) - .build(); - // Wait period for passive cluster to retry getting workflow result in case of replication delay. private static final long ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS = 500; @@ -216,7 +206,7 @@ private static HistoryEvent getInstanceCloseEvent( RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit); try { response = - Retryer.retryWithResult( + RpcRetryer.retryWithResult( retryOptions, () -> service.GetWorkflowExecutionHistoryWithTimeout(r, unit.toMillis(timeout))); } catch (EntityNotExistsError e) { @@ -343,7 +333,7 @@ private static CompletableFuture getInstanceCloseEventAsync( } private static RetryOptions getRetryOptionWithTimeout(long timeout, TimeUnit unit) { - return new RetryOptions.Builder(retryParameters) + return new RetryOptions.Builder(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS) .setExpiration(Duration.ofSeconds(unit.toSeconds(timeout))) .build(); } @@ -355,7 +345,7 @@ private static RetryOptions getRetryOptionWithTimeout(long timeout, TimeUnit uni long timeout, TimeUnit unit) { RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit); - return Retryer.retryWithResultAsync( + return RpcRetryer.retryWithResultAsync( retryOptions, () -> { CompletableFuture result = new CompletableFuture<>(); diff --git a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java index 6d86d186a..7180673d1 100644 --- a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java @@ -118,9 +118,8 @@ private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters StartWorkflowExecutionResponse result; try { result = - Retryer.retryWithResult( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.StartWorkflowExecution(request)); + RpcRetryer.retryWithResult( + RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.StartWorkflowExecution(request)); } catch (WorkflowExecutionAlreadyStartedError e) { throw e; } catch (TException e) { @@ -137,7 +136,7 @@ private RetryOptions getRetryOptionsWithExpiration(RetryOptions o, Long timeoutI if (timeoutInMillis == null || timeoutInMillis <= 0 || timeoutInMillis == Long.MAX_VALUE) { return o; } - return new RetryOptions.Builder(Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS) + return new RetryOptions.Builder(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS) .setExpiration(Duration.ofMillis((timeoutInMillis))) .build(); } @@ -146,9 +145,8 @@ private CompletableFuture startWorkflowAsyncInternal( StartWorkflowExecutionParameters startParameters, Long timeoutInMillis) { StartWorkflowExecutionRequest request = getStartRequest(startParameters); - return Retryer.retryWithResultAsync( - getRetryOptionsWithExpiration( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis), + return RpcRetryer.retryWithResultAsync( + getRetryOptionsWithExpiration(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, timeoutInMillis), () -> { CompletableFuture result = new CompletableFuture<>(); try { @@ -272,9 +270,7 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.SignalWorkflowExecution(request)); + RpcRetryer.retryWithDefaultOption(() -> service.SignalWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); } @@ -290,9 +286,8 @@ public CompletableFuture signalWorkflowExecutionAsync( public CompletableFuture signalWorkflowExecutionAsync( SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis) { SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters); - return Retryer.retryWithResultAsync( - getRetryOptionsWithExpiration( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis), + return RpcRetryer.retryWithResultAsync( + getRetryOptionsWithExpiration(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, timeoutInMillis), () -> { CompletableFuture result = new CompletableFuture<>(); try { @@ -387,8 +382,8 @@ private WorkflowExecution signalWithStartWorkflowInternal( StartWorkflowExecutionResponse result; try { result = - Retryer.retryWithResult( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, + RpcRetryer.retryWithResult( + RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.SignalWithStartWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); @@ -405,9 +400,7 @@ public void requestCancelWorkflowExecution(WorkflowExecution execution) { request.setDomain(domain); request.setWorkflowExecution(execution); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.RequestCancelWorkflowExecution(request)); + RpcRetryer.retryWithDefaultOption(() -> service.RequestCancelWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); } @@ -427,9 +420,8 @@ public QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParamete request.setQueryRejectCondition(queryParameters.getQueryRejectCondition()); try { QueryWorkflowResponse response = - Retryer.retryWithResult( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.QueryWorkflow(request)); + RpcRetryer.retryWithResult( + RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.QueryWorkflow(request)); return response; } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); @@ -451,9 +443,7 @@ public void terminateWorkflowExecution(TerminateWorkflowExecutionParameters term request.setReason(terminateParameters.getReason()); // request.setChildPolicy(terminateParameters.getChildPolicy()); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.TerminateWorkflowExecution(request)); + RpcRetryer.retryWithDefaultOption(() -> service.TerminateWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); } diff --git a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java index ab30159bc..51af90856 100644 --- a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java @@ -31,7 +31,7 @@ import com.uber.cadence.client.ActivityCompletionFailureException; import com.uber.cadence.client.ActivityNotExistsException; import com.uber.cadence.converter.DataConverter; -import com.uber.cadence.internal.common.Retryer; +import com.uber.cadence.internal.common.RpcRetryer; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.m3.tally.Scope; @@ -95,9 +95,7 @@ public void complete(Object result) { request.setResult(convertedResult); request.setTaskToken(taskToken); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.RespondActivityTaskCompleted(request)); + RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskCompleted(request)); metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); @@ -139,9 +137,7 @@ public void fail(Throwable failure) { request.setDetails(dataConverter.toData(failure)); request.setTaskToken(taskToken); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.RespondActivityTaskFailed(request)); + RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskFailed(request)); metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); @@ -156,9 +152,7 @@ public void fail(Throwable failure) { request.setWorkflowID(execution.getWorkflowId()); request.setRunID(execution.getRunId()); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.RespondActivityTaskFailedByID(request)); + RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskFailedByID(request)); metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_BY_ID_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java index d019005e0..b3ddcf760 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java @@ -32,7 +32,7 @@ import com.uber.cadence.WorkflowType; import com.uber.cadence.common.RetryOptions; import com.uber.cadence.internal.common.OptionsUtils; -import com.uber.cadence.internal.common.Retryer; +import com.uber.cadence.internal.common.RpcRetryer; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents; @@ -667,7 +667,7 @@ public HistoryEvent next() { try { GetWorkflowExecutionHistoryResponse r = - Retryer.retryWithResult( + RpcRetryer.retryWithResult( retryOptions, () -> service.GetWorkflowExecutionHistory(request)); current = r.getHistory().getEventsIterator(); nextPageToken = r.getNextPageToken(); diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java index d06a0c01b..c1c6d9b05 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java @@ -113,7 +113,7 @@ public DecisionTaskHandler.Result handleDecisionTask(PollForDecisionTaskResponse e.printStackTrace(pw); String stackTrace = sw.toString(); failedRequest.setDetails(stackTrace.getBytes(StandardCharsets.UTF_8)); - return new DecisionTaskHandler.Result(null, failedRequest, null, null); + return new DecisionTaskHandler.Result(null, failedRequest, null); } } @@ -236,7 +236,7 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) { cache.markProcessingDone(decisionTask); } } - return new Result(null, null, queryCompletedRequest, null); + return new Result(null, null, queryCompletedRequest); } private Result createCompletedRequest( @@ -254,7 +254,7 @@ private Result createCompletedRequest( (int) stickyTaskListScheduleToStartTimeout.getSeconds()); completedRequest.setStickyAttributes(attributes); } - return new Result(completedRequest, null, null, null); + return new Result(completedRequest, null, null); } @Override diff --git a/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java b/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java index ec49d1469..8f907875e 100644 --- a/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java @@ -147,8 +147,7 @@ private ActivityTaskHandler.Result mapToActivityFailure( failure = CheckedExceptionWrapper.unwrap(failure); result.setReason(failure.getClass().getName()); result.setDetails(dataConverter.toData(failure)); - return new ActivityTaskHandler.Result( - null, new Result.TaskFailedResult(result, failure), null, null); + return new ActivityTaskHandler.Result(null, new Result.TaskFailedResult(result, failure), null); } @Override @@ -214,12 +213,12 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc Object result = method.invoke(activity, args); RespondActivityTaskCompletedRequest request = new RespondActivityTaskCompletedRequest(); if (context.isDoNotCompleteOnReturn()) { - return new ActivityTaskHandler.Result(null, null, null, null); + return new ActivityTaskHandler.Result(null, null, null); } if (method.getReturnType() != Void.TYPE) { request.setResult(dataConverter.toData(result)); } - return new ActivityTaskHandler.Result(request, null, null, null); + return new ActivityTaskHandler.Result(request, null, null); } catch (RuntimeException | IllegalAccessException e) { return mapToActivityFailure(e, metricsScope, false); } catch (InvocationTargetException e) { @@ -252,7 +251,7 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc if (method.getReturnType() != Void.TYPE) { request.setResult(dataConverter.toData(result)); } - return new ActivityTaskHandler.Result(request, null, null, null); + return new ActivityTaskHandler.Result(request, null, null); } catch (RuntimeException | IllegalAccessException e) { return mapToActivityFailure(e, metricsScope, true); } catch (InvocationTargetException e) { diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java b/src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java index e49e5ff56..9d18c1ad9 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java @@ -21,7 +21,6 @@ import com.uber.cadence.RespondActivityTaskCanceledRequest; import com.uber.cadence.RespondActivityTaskCompletedRequest; import com.uber.cadence.RespondActivityTaskFailedRequest; -import com.uber.cadence.common.RetryOptions; import com.uber.m3.tally.Scope; import java.time.Duration; @@ -37,7 +36,6 @@ final class Result { private final RespondActivityTaskCompletedRequest taskCompleted; private final TaskFailedResult taskFailed; private final RespondActivityTaskCanceledRequest taskCancelled; - private final RetryOptions requestRetryOptions; private int attempt; private Duration backoff; @@ -68,12 +66,10 @@ public Throwable getFailure() { public Result( RespondActivityTaskCompletedRequest taskCompleted, TaskFailedResult taskFailed, - RespondActivityTaskCanceledRequest taskCancelled, - RetryOptions requestRetryOptions) { + RespondActivityTaskCanceledRequest taskCancelled) { this.taskCompleted = taskCompleted; this.taskFailed = taskFailed; this.taskCancelled = taskCancelled; - this.requestRetryOptions = requestRetryOptions; } public RespondActivityTaskCompletedRequest getTaskCompleted() { @@ -88,10 +84,6 @@ public RespondActivityTaskCanceledRequest getTaskCancelled() { return taskCancelled; } - public RetryOptions getRequestRetryOptions() { - return requestRetryOptions; - } - public void setAttempt(int attempt) { this.attempt = attempt; } diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index 01d65383b..0902000e2 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -17,18 +17,14 @@ package com.uber.cadence.internal.worker; -import com.uber.cadence.BadRequestError; -import com.uber.cadence.DomainNotActiveError; -import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.Header; import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.RespondActivityTaskCanceledRequest; import com.uber.cadence.RespondActivityTaskCompletedRequest; import com.uber.cadence.RespondActivityTaskFailedRequest; import com.uber.cadence.WorkflowExecution; -import com.uber.cadence.common.RetryOptions; import com.uber.cadence.context.ContextPropagator; -import com.uber.cadence.internal.common.Retryer; +import com.uber.cadence.internal.common.RpcRetryer; import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; @@ -150,7 +146,7 @@ public void handle(PollForActivityTaskResponse task) throws Exception { cancelledRequest.setDetails( String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8)); Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start(); - sendReply(task, new Result(null, null, cancelledRequest, null), metricsScope); + sendReply(task, new Result(null, null, cancelledRequest), metricsScope); sw.stop(); } finally { MDC.remove(LoggerTag.ACTIVITY_ID); @@ -201,49 +197,28 @@ public Throwable wrapFailure(PollForActivityTaskResponse task, Throwable failure private void sendReply( PollForActivityTaskResponse task, ActivityTaskHandler.Result response, Scope metricsScope) throws TException { - RetryOptions ro = response.getRequestRetryOptions(); RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted(); if (taskCompleted != null) { - ro = - options - .getReportCompletionRetryOptions() - .merge(ro) - .addDoNotRetry( - BadRequestError.class, EntityNotExistsError.class, DomainNotActiveError.class); taskCompleted.setTaskToken(task.getTaskToken()); taskCompleted.setIdentity(options.getIdentity()); - Retryer.retry(ro, () -> service.RespondActivityTaskCompleted(taskCompleted)); + RpcRetryer.retryWithDefaultOption( + () -> service.RespondActivityTaskCompleted(taskCompleted)); metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1); } else { if (response.getTaskFailedResult() != null) { RespondActivityTaskFailedRequest taskFailed = response.getTaskFailedResult().getTaskFailedRequest(); - ro = - options - .getReportFailureRetryOptions() - .merge(ro) - .addDoNotRetry( - BadRequestError.class, - EntityNotExistsError.class, - DomainNotActiveError.class); taskFailed.setTaskToken(task.getTaskToken()); taskFailed.setIdentity(options.getIdentity()); - Retryer.retry(ro, () -> service.RespondActivityTaskFailed(taskFailed)); + RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskFailed(taskFailed)); metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1); } else { RespondActivityTaskCanceledRequest taskCancelled = response.getTaskCancelled(); if (taskCancelled != null) { taskCancelled.setTaskToken(task.getTaskToken()); taskCancelled.setIdentity(options.getIdentity()); - ro = - options - .getReportFailureRetryOptions() - .merge(ro) - .addDoNotRetry( - BadRequestError.class, - EntityNotExistsError.class, - DomainNotActiveError.class); - Retryer.retry(ro, () -> service.RespondActivityTaskCanceled(taskCancelled)); + RpcRetryer.retryWithDefaultOption( + () -> service.RespondActivityTaskCanceled(taskCancelled)); metricsScope.counter(MetricsType.ACTIVITY_TASK_CANCELED_COUNTER).inc(1); } } diff --git a/src/main/java/com/uber/cadence/internal/worker/DecisionTaskHandler.java b/src/main/java/com/uber/cadence/internal/worker/DecisionTaskHandler.java index 2193e82d1..70c10f55f 100644 --- a/src/main/java/com/uber/cadence/internal/worker/DecisionTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/worker/DecisionTaskHandler.java @@ -21,7 +21,6 @@ import com.uber.cadence.RespondDecisionTaskCompletedRequest; import com.uber.cadence.RespondDecisionTaskFailedRequest; import com.uber.cadence.RespondQueryTaskCompletedRequest; -import com.uber.cadence.common.RetryOptions; /** * Interface of workflow task handlers. @@ -34,17 +33,14 @@ final class Result { private final RespondDecisionTaskCompletedRequest taskCompleted; private final RespondDecisionTaskFailedRequest taskFailed; private final RespondQueryTaskCompletedRequest queryCompleted; - private final RetryOptions requestRetryOptions; public Result( RespondDecisionTaskCompletedRequest taskCompleted, RespondDecisionTaskFailedRequest taskFailed, - RespondQueryTaskCompletedRequest queryCompleted, - RetryOptions requestRetryOptions) { + RespondQueryTaskCompletedRequest queryCompleted) { this.taskCompleted = taskCompleted; this.taskFailed = taskFailed; this.queryCompleted = queryCompleted; - this.requestRetryOptions = requestRetryOptions; } public RespondDecisionTaskCompletedRequest getTaskCompleted() { @@ -59,10 +55,6 @@ public RespondQueryTaskCompletedRequest getQueryCompleted() { return queryCompleted; } - public RetryOptions getRequestRetryOptions() { - return requestRetryOptions; - } - @Override public String toString() { return "Result{" @@ -72,8 +64,6 @@ public String toString() { + taskFailed + ", queryCompleted=" + queryCompleted - + ", requestRetryOptions=" - + requestRetryOptions + '}'; } } diff --git a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java index 3a6d66916..3216a66e0 100644 --- a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java +++ b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java @@ -17,11 +17,9 @@ package com.uber.cadence.internal.worker; -import com.uber.cadence.common.RetryOptions; import com.uber.cadence.context.ContextPropagator; import com.uber.cadence.converter.DataConverter; import com.uber.cadence.converter.JsonDataConverter; -import com.uber.cadence.internal.common.Retryer; import com.uber.cadence.internal.metrics.NoopScope; import com.uber.m3.tally.Scope; import java.time.Duration; @@ -43,10 +41,6 @@ public static final class Builder { private int taskExecutorThreadPoolSize = 100; private double taskListActivitiesPerSecond; private PollerOptions pollerOptions; - /** TODO: Dynamic expiration based on activity timeout */ - private RetryOptions reportCompletionRetryOptions; - - private RetryOptions reportFailureRetryOptions; private Scope metricsScope; private boolean enableLoggingInReplay; private List contextPropagators; @@ -59,8 +53,6 @@ public Builder(SingleWorkerOptions options) { this.pollerOptions = options.getPollerOptions(); this.taskListActivitiesPerSecond = options.getTaskListActivitiesPerSecond(); this.taskExecutorThreadPoolSize = options.getTaskExecutorThreadPoolSize(); - this.reportCompletionRetryOptions = options.getReportCompletionRetryOptions(); - this.reportFailureRetryOptions = options.getReportFailureRetryOptions(); this.metricsScope = options.getMetricsScope(); this.enableLoggingInReplay = options.getEnableLoggingInReplay(); this.contextPropagators = options.getContextPropagators(); @@ -101,16 +93,6 @@ public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond return this; } - public Builder setReportCompletionRetryOptions(RetryOptions reportCompletionRetryOptions) { - this.reportCompletionRetryOptions = reportCompletionRetryOptions; - return this; - } - - public Builder setReportFailureRetryOptions(RetryOptions reportFailureRetryOptions) { - this.reportFailureRetryOptions = reportFailureRetryOptions; - return this; - } - /** Specifies the list of context propagators to use during this workflow. */ public Builder setContextPropagators(List contextPropagators) { this.contextPropagators = contextPropagators; @@ -118,14 +100,6 @@ public Builder setContextPropagators(List contextPropagators) } public SingleWorkerOptions build() { - if (reportCompletionRetryOptions == null) { - reportCompletionRetryOptions = Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS; - } - - if (reportFailureRetryOptions == null) { - reportFailureRetryOptions = Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS; - } - if (pollerOptions == null) { pollerOptions = PollerOptions.newBuilder() @@ -149,8 +123,6 @@ public SingleWorkerOptions build() { taskExecutorThreadPoolSize, taskListActivitiesPerSecond, pollerOptions, - reportCompletionRetryOptions, - reportFailureRetryOptions, metricsScope, enableLoggingInReplay, contextPropagators); @@ -162,8 +134,6 @@ public SingleWorkerOptions build() { private final int taskExecutorThreadPoolSize; private final double taskListActivitiesPerSecond; private final PollerOptions pollerOptions; - private final RetryOptions reportCompletionRetryOptions; - private final RetryOptions reportFailureRetryOptions; private final Scope metricsScope; private final boolean enableLoggingInReplay; private List contextPropagators; @@ -174,8 +144,6 @@ private SingleWorkerOptions( int taskExecutorThreadPoolSize, double taskListActivitiesPerSecond, PollerOptions pollerOptions, - RetryOptions reportCompletionRetryOptions, - RetryOptions reportFailureRetryOptions, Scope metricsScope, boolean enableLoggingInReplay, List contextPropagators) { @@ -184,8 +152,6 @@ private SingleWorkerOptions( this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize; this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; this.pollerOptions = pollerOptions; - this.reportCompletionRetryOptions = reportCompletionRetryOptions; - this.reportFailureRetryOptions = reportFailureRetryOptions; this.metricsScope = metricsScope; this.enableLoggingInReplay = enableLoggingInReplay; this.contextPropagators = contextPropagators; @@ -207,14 +173,6 @@ PollerOptions getPollerOptions() { return pollerOptions; } - RetryOptions getReportCompletionRetryOptions() { - return reportCompletionRetryOptions; - } - - RetryOptions getReportFailureRetryOptions() { - return reportFailureRetryOptions; - } - double getTaskListActivitiesPerSecond() { return taskListActivitiesPerSecond; } diff --git a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java index 959024ea3..eb09bbf50 100644 --- a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java @@ -18,9 +18,6 @@ package com.uber.cadence.internal.worker; import com.google.common.base.Strings; -import com.uber.cadence.BadRequestError; -import com.uber.cadence.DomainNotActiveError; -import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; import com.uber.cadence.History; import com.uber.cadence.HistoryEvent; @@ -32,9 +29,8 @@ import com.uber.cadence.WorkflowExecutionStartedEventAttributes; import com.uber.cadence.WorkflowQuery; import com.uber.cadence.WorkflowType; -import com.uber.cadence.common.RetryOptions; import com.uber.cadence.common.WorkflowExecutionHistory; -import com.uber.cadence.internal.common.Retryer; +import com.uber.cadence.internal.common.RpcRetryer; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.internal.metrics.MetricsTag; @@ -237,32 +233,18 @@ public Throwable wrapFailure(PollForDecisionTaskResponse task, Throwable failure private void sendReply( IWorkflowService service, byte[] taskToken, DecisionTaskHandler.Result response) throws TException { - RetryOptions ro = response.getRequestRetryOptions(); RespondDecisionTaskCompletedRequest taskCompleted = response.getTaskCompleted(); if (taskCompleted != null) { - ro = - options - .getReportCompletionRetryOptions() - .merge(ro) - .addDoNotRetry( - BadRequestError.class, EntityNotExistsError.class, DomainNotActiveError.class); taskCompleted.setIdentity(options.getIdentity()); taskCompleted.setTaskToken(taskToken); - Retryer.retry(ro, () -> service.RespondDecisionTaskCompleted(taskCompleted)); + RpcRetryer.retryWithDefaultOption( + () -> service.RespondDecisionTaskCompleted(taskCompleted)); } else { RespondDecisionTaskFailedRequest taskFailed = response.getTaskFailed(); if (taskFailed != null) { - ro = - options - .getReportFailureRetryOptions() - .merge(ro) - .addDoNotRetry( - BadRequestError.class, - EntityNotExistsError.class, - DomainNotActiveError.class); taskFailed.setIdentity(options.getIdentity()); taskFailed.setTaskToken(taskToken); - Retryer.retry(ro, () -> service.RespondDecisionTaskFailed(taskFailed)); + RpcRetryer.retryWithDefaultOption(() -> service.RespondDecisionTaskFailed(taskFailed)); } else { RespondQueryTaskCompletedRequest queryCompleted = response.getQueryCompleted(); if (queryCompleted != null) { diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 6fd167988..d965a241a 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -87,8 +87,6 @@ public final class Worker implements Suspendable { .setDataConverter(options.getDataConverter()) .setIdentity(client.getOptions().getIdentity()) .setPollerOptions(options.getActivityPollerOptions()) - .setReportCompletionRetryOptions(options.getReportActivityCompletionRetryOptions()) - .setReportFailureRetryOptions(options.getReportActivityFailureRetryOptions()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentActivityExecutionSize()) .setMetricsScope(metricsScope) .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay()) @@ -103,8 +101,6 @@ public final class Worker implements Suspendable { .setDataConverter(options.getDataConverter()) .setIdentity(client.getOptions().getIdentity()) .setPollerOptions(options.getWorkflowPollerOptions()) - .setReportCompletionRetryOptions(options.getReportWorkflowCompletionRetryOptions()) - .setReportFailureRetryOptions(options.getReportWorkflowFailureRetryOptions()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentWorkflowExecutionSize()) .setMetricsScope(metricsScope) .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay()) @@ -115,8 +111,6 @@ public final class Worker implements Suspendable { .setDataConverter(options.getDataConverter()) .setIdentity(client.getOptions().getIdentity()) .setPollerOptions(options.getWorkflowPollerOptions()) - .setReportCompletionRetryOptions(options.getReportWorkflowCompletionRetryOptions()) - .setReportFailureRetryOptions(options.getReportWorkflowFailureRetryOptions()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentLocalActivityExecutionSize()) .setMetricsScope(metricsScope) .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay()) diff --git a/src/main/java/com/uber/cadence/worker/WorkerOptions.java b/src/main/java/com/uber/cadence/worker/WorkerOptions.java index 799b81e94..5fe92c3da 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerOptions.java +++ b/src/main/java/com/uber/cadence/worker/WorkerOptions.java @@ -138,34 +138,6 @@ public Builder setWorkflowPollerOptions(PollerOptions workflowPollerOptions) { return this; } - public Builder setReportActivityCompletionRetryOptions( - RetryOptions reportActivityCompletionRetryOptions) { - this.reportActivityCompletionRetryOptions = - Objects.requireNonNull(reportActivityCompletionRetryOptions); - return this; - } - - public Builder setReportActivityFailureRetryOptions( - RetryOptions reportActivityFailureRetryOptions) { - this.reportActivityFailureRetryOptions = - Objects.requireNonNull(reportActivityFailureRetryOptions); - return this; - } - - public Builder setReportWorkflowCompletionRetryOptions( - RetryOptions reportWorkflowCompletionRetryOptions) { - this.reportWorkflowCompletionRetryOptions = - Objects.requireNonNull(reportWorkflowCompletionRetryOptions); - return this; - } - - public Builder setReportWorkflowFailureRetryOptions( - RetryOptions reportWorkflowFailureRetryOptions) { - this.reportWorkflowFailureRetryOptions = - Objects.requireNonNull(reportWorkflowFailureRetryOptions); - return this; - } - public Builder setInterceptorFactory( Function interceptorFactory) { this.interceptorFactory = Objects.requireNonNull(interceptorFactory); diff --git a/src/test/java/com/uber/cadence/internal/common/RetryerTest.java b/src/test/java/com/uber/cadence/internal/common/RetryerTest.java index 1cbf543fe..612764e85 100644 --- a/src/test/java/com/uber/cadence/internal/common/RetryerTest.java +++ b/src/test/java/com/uber/cadence/internal/common/RetryerTest.java @@ -40,7 +40,7 @@ public void testExpiration() throws InterruptedException { .validateBuildWithDefaults(); long start = System.currentTimeMillis(); try { - Retryer.retryWithResultAsync( + RpcRetryer.retryWithResultAsync( options, () -> { throw new IllegalArgumentException("simulated"); @@ -64,7 +64,7 @@ public void testExpirationFuture() throws InterruptedException { .validateBuildWithDefaults(); long start = System.currentTimeMillis(); try { - Retryer.retryWithResultAsync( + RpcRetryer.retryWithResultAsync( options, () -> { CompletableFuture result = new CompletableFuture<>(); @@ -91,7 +91,7 @@ public void testInterruptedException() throws InterruptedException { .validateBuildWithDefaults(); long start = System.currentTimeMillis(); try { - Retryer.retryWithResultAsync( + RpcRetryer.retryWithResultAsync( options, () -> { CompletableFuture result = new CompletableFuture<>(); @@ -117,7 +117,7 @@ public void testAddDoNotRetry() throws InterruptedException { options = options.addDoNotRetry(InterruptedException.class); long start = System.currentTimeMillis(); try { - Retryer.retryWithResultAsync( + RpcRetryer.retryWithResultAsync( options, () -> { CompletableFuture result = new CompletableFuture<>(); @@ -144,7 +144,7 @@ public void testMaxAttempt() throws InterruptedException { .validateBuildWithDefaults(); long start = System.currentTimeMillis(); try { - Retryer.retryWithResultAsync( + RpcRetryer.retryWithResultAsync( options, () -> { throw new IllegalArgumentException("simulated"); From 45ebe1354252a6027fc79e8f59935ff5af8d0c99 Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Fri, 23 Oct 2020 22:26:37 -0700 Subject: [PATCH 02/10] Remove unused retry options from worker options --- .../sync/TestWorkflowEnvironmentInternal.java | 3 - .../java/com/uber/cadence/worker/Worker.java | 6 +- .../uber/cadence/worker/WorkerOptions.java | 72 ------------------- 3 files changed, 3 insertions(+), 78 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java index 4e61afc6f..236aa3bca 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java @@ -139,9 +139,6 @@ public Worker newWorker( WorkerOptions.Builder builder = WorkerOptions.newBuilder() .setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory()); - if (testEnvironmentOptions.getDataConverter() != null) { - builder.setDataConverter(testEnvironmentOptions.getDataConverter()); - } builder = overrideOptions.apply(builder); return workerFactory.newWorker(taskList, builder.build()); } diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index d965a241a..d0bac31c2 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -84,7 +84,7 @@ public final class Worker implements Suspendable { SingleWorkerOptions activityOptions = SingleWorkerOptions.newBuilder() - .setDataConverter(options.getDataConverter()) + .setDataConverter(client.getOptions().getDataConverter()) .setIdentity(client.getOptions().getIdentity()) .setPollerOptions(options.getActivityPollerOptions()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentActivityExecutionSize()) @@ -98,7 +98,7 @@ public final class Worker implements Suspendable { SingleWorkerOptions workflowOptions = SingleWorkerOptions.newBuilder() - .setDataConverter(options.getDataConverter()) + .setDataConverter(client.getOptions().getDataConverter()) .setIdentity(client.getOptions().getIdentity()) .setPollerOptions(options.getWorkflowPollerOptions()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentWorkflowExecutionSize()) @@ -108,7 +108,7 @@ public final class Worker implements Suspendable { .build(); SingleWorkerOptions localActivityOptions = SingleWorkerOptions.newBuilder() - .setDataConverter(options.getDataConverter()) + .setDataConverter(client.getOptions().getDataConverter()) .setIdentity(client.getOptions().getIdentity()) .setPollerOptions(options.getWorkflowPollerOptions()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentLocalActivityExecutionSize()) diff --git a/src/main/java/com/uber/cadence/worker/WorkerOptions.java b/src/main/java/com/uber/cadence/worker/WorkerOptions.java index 5fe92c3da..d0bdc11ce 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerOptions.java +++ b/src/main/java/com/uber/cadence/worker/WorkerOptions.java @@ -17,9 +17,6 @@ package com.uber.cadence.worker; -import com.uber.cadence.common.RetryOptions; -import com.uber.cadence.converter.DataConverter; -import com.uber.cadence.converter.JsonDataConverter; import com.uber.cadence.internal.worker.PollerOptions; import com.uber.cadence.workflow.WorkflowInterceptor; import java.util.Objects; @@ -47,24 +44,18 @@ public static WorkerOptions defaultInstance() { public static final class Builder { private double workerActivitiesPerSecond; - private DataConverter dataConverter = JsonDataConverter.getInstance(); private int maxConcurrentActivityExecutionSize = 100; private int maxConcurrentWorkflowExecutionSize = 50; private int maxConcurrentLocalActivityExecutionSize = 100; private double taskListActivitiesPerSecond = 100000; private PollerOptions activityPollerOptions; private PollerOptions workflowPollerOptions; - private RetryOptions reportActivityCompletionRetryOptions; - private RetryOptions reportActivityFailureRetryOptions; - private RetryOptions reportWorkflowCompletionRetryOptions; - private RetryOptions reportWorkflowFailureRetryOptions; private Function interceptorFactory = (n) -> n; private Builder() {} private Builder(WorkerOptions options) { this.workerActivitiesPerSecond = options.workerActivitiesPerSecond; - this.dataConverter = options.dataConverter; this.maxConcurrentActivityExecutionSize = options.maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowExecutionSize = options.maxConcurrentWorkflowExecutionSize; this.maxConcurrentLocalActivityExecutionSize = @@ -72,22 +63,9 @@ private Builder(WorkerOptions options) { this.taskListActivitiesPerSecond = options.taskListActivitiesPerSecond; this.activityPollerOptions = options.activityPollerOptions; this.workflowPollerOptions = options.workflowPollerOptions; - this.reportActivityCompletionRetryOptions = options.reportActivityCompletionRetryOptions; - this.reportActivityFailureRetryOptions = options.reportActivityFailureRetryOptions; - this.reportWorkflowCompletionRetryOptions = options.reportWorkflowCompletionRetryOptions; - this.reportWorkflowFailureRetryOptions = options.reportWorkflowFailureRetryOptions; this.interceptorFactory = options.interceptorFactory; } - /** - * Override a data converter implementation used by workflows and activities executed by this - * worker. Default is {@link com.uber.cadence.converter.JsonDataConverter} data converter. - */ - public Builder setDataConverter(DataConverter dataConverter) { - this.dataConverter = Objects.requireNonNull(dataConverter); - return this; - } - /** Maximum number of activities started per second. Default is 0 which means unlimited. */ public Builder setWorkerActivitiesPerSecond(double workerActivitiesPerSecond) { if (workerActivitiesPerSecond <= 0) { @@ -160,61 +138,41 @@ public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond public WorkerOptions build() { return new WorkerOptions( workerActivitiesPerSecond, - dataConverter, maxConcurrentActivityExecutionSize, maxConcurrentWorkflowExecutionSize, maxConcurrentLocalActivityExecutionSize, taskListActivitiesPerSecond, activityPollerOptions, workflowPollerOptions, - reportActivityCompletionRetryOptions, - reportActivityFailureRetryOptions, - reportWorkflowCompletionRetryOptions, - reportWorkflowFailureRetryOptions, interceptorFactory); } } private final double workerActivitiesPerSecond; - private final DataConverter dataConverter; private final int maxConcurrentActivityExecutionSize; private final int maxConcurrentWorkflowExecutionSize; private final int maxConcurrentLocalActivityExecutionSize; private final double taskListActivitiesPerSecond; private final PollerOptions activityPollerOptions; private final PollerOptions workflowPollerOptions; - private final RetryOptions reportActivityCompletionRetryOptions; - private final RetryOptions reportActivityFailureRetryOptions; - private final RetryOptions reportWorkflowCompletionRetryOptions; - private final RetryOptions reportWorkflowFailureRetryOptions; private final Function interceptorFactory; private WorkerOptions( double workerActivitiesPerSecond, - DataConverter dataConverter, int maxConcurrentActivityExecutionSize, int maxConcurrentWorkflowExecutionSize, int maxConcurrentLocalActivityExecutionSize, double taskListActivitiesPerSecond, PollerOptions activityPollerOptions, PollerOptions workflowPollerOptions, - RetryOptions reportActivityCompletionRetryOptions, - RetryOptions reportActivityFailureRetryOptions, - RetryOptions reportWorkflowCompletionRetryOptions, - RetryOptions reportWorkflowFailureRetryOptions, Function interceptorFactory) { this.workerActivitiesPerSecond = workerActivitiesPerSecond; - this.dataConverter = dataConverter; this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowExecutionSize = maxConcurrentWorkflowExecutionSize; this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize; this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; this.activityPollerOptions = activityPollerOptions; this.workflowPollerOptions = workflowPollerOptions; - this.reportActivityCompletionRetryOptions = reportActivityCompletionRetryOptions; - this.reportActivityFailureRetryOptions = reportActivityFailureRetryOptions; - this.reportWorkflowCompletionRetryOptions = reportWorkflowCompletionRetryOptions; - this.reportWorkflowFailureRetryOptions = reportWorkflowFailureRetryOptions; this.interceptorFactory = interceptorFactory; } @@ -222,10 +180,6 @@ public double getWorkerActivitiesPerSecond() { return workerActivitiesPerSecond; } - public DataConverter getDataConverter() { - return dataConverter; - } - public int getMaxConcurrentActivityExecutionSize() { return maxConcurrentActivityExecutionSize; } @@ -246,22 +200,6 @@ public PollerOptions getWorkflowPollerOptions() { return workflowPollerOptions; } - public RetryOptions getReportActivityCompletionRetryOptions() { - return reportActivityCompletionRetryOptions; - } - - public RetryOptions getReportActivityFailureRetryOptions() { - return reportActivityFailureRetryOptions; - } - - public RetryOptions getReportWorkflowCompletionRetryOptions() { - return reportWorkflowCompletionRetryOptions; - } - - public RetryOptions getReportWorkflowFailureRetryOptions() { - return reportWorkflowFailureRetryOptions; - } - public Function getInterceptorFactory() { return interceptorFactory; } @@ -271,8 +209,6 @@ public String toString() { return "WorkerOptions{" + "workerActivitiesPerSecond=" + workerActivitiesPerSecond - + ", dataConverter=" - + dataConverter + ", maxConcurrentActivityExecutionSize=" + maxConcurrentActivityExecutionSize + ", maxConcurrentWorkflowExecutionSize=" @@ -285,14 +221,6 @@ public String toString() { + activityPollerOptions + ", workflowPollerOptions=" + workflowPollerOptions - + ", reportActivityCompletionRetryOptions=" - + reportActivityCompletionRetryOptions - + ", reportActivityFailureRetryOptions=" - + reportActivityFailureRetryOptions - + ", reportWorkflowCompletionRetryOptions=" - + reportWorkflowCompletionRetryOptions - + ", reportWorkflowFailureRetryOptions=" - + reportWorkflowFailureRetryOptions + '}'; } } From 75d121d8891b1ea3770acc46c551f577ee8c20ec Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Fri, 23 Oct 2020 22:35:24 -0700 Subject: [PATCH 03/10] Fix license --- .../java/com/uber/cadence/client/WorkflowClientOptions.java | 4 ++-- .../java/com/uber/cadence/internal/common/RpcRetryer.java | 4 ++-- .../java/com/uber/cadence/internal/worker/PollerOptions.java | 4 ++-- .../java/com/uber/cadence/serviceclient/ClientOptions.java | 4 ++-- src/main/java/com/uber/cadence/worker/Worker.java | 4 ++-- .../java/com/uber/cadence/worker/WorkerFactoryOptions.java | 4 ++-- src/main/java/com/uber/cadence/worker/WorkerOptions.java | 4 ++-- 7 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java b/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java index d47ef8528..df59f25eb 100644 --- a/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java +++ b/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at diff --git a/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java b/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java index e75ce3b32..8949f24c2 100644 --- a/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java +++ b/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java @@ -1,8 +1,8 @@ /* - * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. - * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at diff --git a/src/main/java/com/uber/cadence/internal/worker/PollerOptions.java b/src/main/java/com/uber/cadence/internal/worker/PollerOptions.java index 9b78546b1..23d43feea 100644 --- a/src/main/java/com/uber/cadence/internal/worker/PollerOptions.java +++ b/src/main/java/com/uber/cadence/internal/worker/PollerOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at diff --git a/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java b/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java index b8781f861..3cb9b08ce 100644 --- a/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java +++ b/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017-2020 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index d0bac31c2..1dd9eb79e 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at diff --git a/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java b/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java index 5eb876d68..6d197a0c2 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java +++ b/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at diff --git a/src/main/java/com/uber/cadence/worker/WorkerOptions.java b/src/main/java/com/uber/cadence/worker/WorkerOptions.java index d0bdc11ce..cb66b3ade 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerOptions.java +++ b/src/main/java/com/uber/cadence/worker/WorkerOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at From 95bbabad75a0bf99c60765648cd89931f55ad064 Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Tue, 27 Oct 2020 19:52:06 -0700 Subject: [PATCH 04/10] Review comments --- .../java/com/uber/cadence/internal/common/RpcRetryer.java | 5 ++--- .../external/GenericWorkflowClientExternalImpl.java | 6 +++--- .../external/ManualActivityCompletionClientImpl.java | 6 +++--- .../com/uber/cadence/internal/worker/ActivityWorker.java | 8 +++----- .../com/uber/cadence/internal/worker/WorkflowWorker.java | 5 ++--- 5 files changed, 13 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java b/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java index 8949f24c2..4f27f6f87 100644 --- a/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java +++ b/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java @@ -63,8 +63,7 @@ public final class RpcRetryer { DomainAlreadyExistsError.class, QueryFailedError.class, DomainNotActiveError.class, - CancellationAlreadyRequestedError.class, - DomainNotActiveError.class); + CancellationAlreadyRequestedError.class); DEFAULT_RPC_RETRY_OPTIONS = roBuilder.validateBuildWithDefaults(); } @@ -114,7 +113,7 @@ public static void retry(RetryOptions options, RetryablePr }); } - public static void retryWithDefaultOption(RetryableProc r) throws T { + public static void retry(RetryableProc r) throws T { retry(DEFAULT_RPC_RETRY_OPTIONS, r); } diff --git a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java index 7180673d1..ebcfc67da 100644 --- a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java @@ -270,7 +270,7 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters); try { - RpcRetryer.retryWithDefaultOption(() -> service.SignalWorkflowExecution(request)); + RpcRetryer.retry(() -> service.SignalWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); } @@ -400,7 +400,7 @@ public void requestCancelWorkflowExecution(WorkflowExecution execution) { request.setDomain(domain); request.setWorkflowExecution(execution); try { - RpcRetryer.retryWithDefaultOption(() -> service.RequestCancelWorkflowExecution(request)); + RpcRetryer.retry(() -> service.RequestCancelWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); } @@ -443,7 +443,7 @@ public void terminateWorkflowExecution(TerminateWorkflowExecutionParameters term request.setReason(terminateParameters.getReason()); // request.setChildPolicy(terminateParameters.getChildPolicy()); try { - RpcRetryer.retryWithDefaultOption(() -> service.TerminateWorkflowExecution(request)); + RpcRetryer.retry(() -> service.TerminateWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); } diff --git a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java index 51af90856..3367e705a 100644 --- a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java @@ -95,7 +95,7 @@ public void complete(Object result) { request.setResult(convertedResult); request.setTaskToken(taskToken); try { - RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskCompleted(request)); + RpcRetryer.retry(() -> service.RespondActivityTaskCompleted(request)); metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); @@ -137,7 +137,7 @@ public void fail(Throwable failure) { request.setDetails(dataConverter.toData(failure)); request.setTaskToken(taskToken); try { - RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskFailed(request)); + RpcRetryer.retry(() -> service.RespondActivityTaskFailed(request)); metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); @@ -152,7 +152,7 @@ public void fail(Throwable failure) { request.setWorkflowID(execution.getWorkflowId()); request.setRunID(execution.getRunId()); try { - RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskFailedByID(request)); + RpcRetryer.retry(() -> service.RespondActivityTaskFailedByID(request)); metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_BY_ID_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index 0902000e2..a0a86014a 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -201,8 +201,7 @@ private void sendReply( if (taskCompleted != null) { taskCompleted.setTaskToken(task.getTaskToken()); taskCompleted.setIdentity(options.getIdentity()); - RpcRetryer.retryWithDefaultOption( - () -> service.RespondActivityTaskCompleted(taskCompleted)); + RpcRetryer.retry(() -> service.RespondActivityTaskCompleted(taskCompleted)); metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1); } else { if (response.getTaskFailedResult() != null) { @@ -210,15 +209,14 @@ private void sendReply( response.getTaskFailedResult().getTaskFailedRequest(); taskFailed.setTaskToken(task.getTaskToken()); taskFailed.setIdentity(options.getIdentity()); - RpcRetryer.retryWithDefaultOption(() -> service.RespondActivityTaskFailed(taskFailed)); + RpcRetryer.retry(() -> service.RespondActivityTaskFailed(taskFailed)); metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1); } else { RespondActivityTaskCanceledRequest taskCancelled = response.getTaskCancelled(); if (taskCancelled != null) { taskCancelled.setTaskToken(task.getTaskToken()); taskCancelled.setIdentity(options.getIdentity()); - RpcRetryer.retryWithDefaultOption( - () -> service.RespondActivityTaskCanceled(taskCancelled)); + RpcRetryer.retry(() -> service.RespondActivityTaskCanceled(taskCancelled)); metricsScope.counter(MetricsType.ACTIVITY_TASK_CANCELED_COUNTER).inc(1); } } diff --git a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java index eb09bbf50..d6a7bbeee 100644 --- a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java @@ -237,14 +237,13 @@ private void sendReply( if (taskCompleted != null) { taskCompleted.setIdentity(options.getIdentity()); taskCompleted.setTaskToken(taskToken); - RpcRetryer.retryWithDefaultOption( - () -> service.RespondDecisionTaskCompleted(taskCompleted)); + RpcRetryer.retry(() -> service.RespondDecisionTaskCompleted(taskCompleted)); } else { RespondDecisionTaskFailedRequest taskFailed = response.getTaskFailed(); if (taskFailed != null) { taskFailed.setIdentity(options.getIdentity()); taskFailed.setTaskToken(taskToken); - RpcRetryer.retryWithDefaultOption(() -> service.RespondDecisionTaskFailed(taskFailed)); + RpcRetryer.retry(() -> service.RespondDecisionTaskFailed(taskFailed)); } else { RespondQueryTaskCompletedRequest queryCompleted = response.getQueryCompleted(); if (queryCompleted != null) { From 7b4b3d21a881cf45541706fcce9949c832988126 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Wed, 1 Apr 2020 15:49:33 -0700 Subject: [PATCH 05/10] Added ActivityInterface. Removed timeout properties from ActivityMethod --- .../cadence/activity/ActivityInterface.java | 36 ++++++ .../uber/cadence/activity/ActivityMethod.java | 40 +------ .../cadence/activity/ActivityOptions.java | 69 +++++------ .../activity/LocalActivityOptions.java | 57 +++++----- .../sync/ActivityInvocationHandler.java | 10 +- .../sync/ActivityInvocationHandlerBase.java | 4 +- .../sync/LocalActivityInvocationHandler.java | 11 +- .../sync/TestActivityEnvironmentInternal.java | 81 ++++++++++++- .../cadence/activity/ActivityOptionsTest.java | 107 +----------------- .../internal/testing/ActivityTestingTest.java | 7 +- .../internal/testing/WorkflowTestingTest.java | 33 ++++-- .../worker/CleanWorkerShutdownTest.java | 15 ++- .../uber/cadence/workflow/WorkflowTest.java | 43 ++++--- 13 files changed, 249 insertions(+), 264 deletions(-) create mode 100644 src/main/java/com/uber/cadence/activity/ActivityInterface.java diff --git a/src/main/java/com/uber/cadence/activity/ActivityInterface.java b/src/main/java/com/uber/cadence/activity/ActivityInterface.java new file mode 100644 index 000000000..f29f28c93 --- /dev/null +++ b/src/main/java/com/uber/cadence/activity/ActivityInterface.java @@ -0,0 +1,36 @@ +/* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.activity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Indicates that the interface is an activity interface. Only interfaces annotated with this + * annotation can be used as parameters to {@link + * com.uber.cadence.workflow.Workflow#newActivityStub(Class)} methods. + * + *

Each method of the interface annotated with ActivityInterface including inherited + * from interfaces is a separate activity. By default the name of an activity type is "short + * interface name"_"method name". + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface ActivityInterface {} diff --git a/src/main/java/com/uber/cadence/activity/ActivityMethod.java b/src/main/java/com/uber/cadence/activity/ActivityMethod.java index 239492a45..878821ebf 100644 --- a/src/main/java/com/uber/cadence/activity/ActivityMethod.java +++ b/src/main/java/com/uber/cadence/activity/ActivityMethod.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -24,9 +24,7 @@ /** * Indicates that the method is an activity method. This annotation applies only to activity - * interface methods. Not required. Use it to override default activity type name or other options. - * When both {@link ActivityOptions} and {@link ActivityMethod} have non default value for some - * parameter the {@link ActivityOptions} one takes precedence. + * interface methods. Not required. Use it to override default activity type name. */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @@ -34,36 +32,4 @@ /** Name of the workflow type. Default is {short class name}::{method name} */ String name() default ""; - - /** - * Overall timeout workflow is willing to wait for activity to complete. It includes time in a - * task list (use {@link #scheduleToStartTimeoutSeconds()} to limit it) plus activity execution - * time (use {@link #startToCloseTimeoutSeconds()} to limit it). Either this option or both - * schedule to start and start to close are required. - */ - int scheduleToCloseTimeoutSeconds() default 0; - - /** - * Time activity can stay in task list before it is picked up by a worker. If schedule to close is - * not provided then both this and start to close are required. - */ - int scheduleToStartTimeoutSeconds() default 0; - - /** - * Maximum activity execution time after it was sent to a worker. If schedule to close is not - * provided then both this and schedule to start are required. - */ - int startToCloseTimeoutSeconds() default 0; - - /** - * Heartbeat interval. Activity must heartbeat before this interval passes after a last heartbeat - * or activity start. - */ - int heartbeatTimeoutSeconds() default 0; - - /** - * Task list to use when dispatching activity task to a worker. By default it is the same task - * list name the workflow was started with. - */ - String taskList() default ""; } diff --git a/src/main/java/com/uber/cadence/activity/ActivityOptions.java b/src/main/java/com/uber/cadence/activity/ActivityOptions.java index da3062c18..d0b6016a4 100644 --- a/src/main/java/com/uber/cadence/activity/ActivityOptions.java +++ b/src/main/java/com/uber/cadence/activity/ActivityOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -29,36 +29,22 @@ /** Options used to configure how an activity is invoked. */ public final class ActivityOptions { - /** - * Used to merge annotation and options. Options takes precedence. Returns options with all - * defaults filled in. - */ - public static ActivityOptions merge(ActivityMethod a, MethodRetry r, ActivityOptions o) { - if (a == null) { - if (r == null) { - return new ActivityOptions.Builder(o).validateAndBuildWithDefaults(); - } - RetryOptions mergedR = RetryOptions.merge(r, o.getRetryOptions()); - return new ActivityOptions.Builder().setRetryOptions(mergedR).validateAndBuildWithDefaults(); - } - if (o == null) { - o = new ActivityOptions.Builder().build(); - } - return new ActivityOptions.Builder() - .setScheduleToCloseTimeout( - mergeDuration(a.scheduleToCloseTimeoutSeconds(), o.getScheduleToCloseTimeout())) - .setScheduleToStartTimeout( - mergeDuration(a.scheduleToStartTimeoutSeconds(), o.getScheduleToStartTimeout())) - .setStartToCloseTimeout( - mergeDuration(a.startToCloseTimeoutSeconds(), o.getStartToCloseTimeout())) - .setHeartbeatTimeout(mergeDuration(a.heartbeatTimeoutSeconds(), o.getHeartbeatTimeout())) - .setTaskList( - o.getTaskList() != null - ? o.getTaskList() - : (a.taskList().isEmpty() ? null : a.taskList())) - .setRetryOptions(RetryOptions.merge(r, o.getRetryOptions())) - .setContextPropagators(o.getContextPropagators()) - .validateAndBuildWithDefaults(); + public static Builder newBuilder() { + return new Builder(); + } + + public static Builder newBuilder(ActivityOptions options) { + return new Builder(options); + } + + public static ActivityOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final ActivityOptions DEFAULT_INSTANCE; + + static { + DEFAULT_INSTANCE = ActivityOptions.newBuilder().build(); } public static final class Builder { @@ -155,6 +141,14 @@ public Builder setContextPropagators(List contextPropagators) return this; } + /** + * Properties that are set on this builder take precedence over ones found in the annotation. + */ + public Builder setMethodRetry(MethodRetry r) { + retryOptions = RetryOptions.merge(r, retryOptions); + return this; + } + public ActivityOptions build() { return new ActivityOptions( heartbeatTimeout, @@ -324,15 +318,4 @@ public int hashCode() { retryOptions, contextPropagators); } - - static Duration mergeDuration(int annotationSeconds, Duration options) { - if (options == null) { - if (annotationSeconds == 0) { - return null; - } - return Duration.ofSeconds(annotationSeconds); - } else { - return options; - } - } } diff --git a/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java b/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java index 9534af9a2..dde8334b8 100644 --- a/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java +++ b/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -29,31 +29,23 @@ /** Options used to configure how an local activity is invoked. */ public final class LocalActivityOptions { - /** - * Used to merge annotation and options. Options takes precedence. Returns options with all - * defaults filled in. - */ - public static LocalActivityOptions merge( - ActivityMethod a, MethodRetry r, LocalActivityOptions o) { - if (a == null) { - if (r == null) { - return new LocalActivityOptions.Builder(o).validateAndBuildWithDefaults(); - } - RetryOptions mergedR = RetryOptions.merge(r, o.getRetryOptions()); - return new LocalActivityOptions.Builder() - .setRetryOptions(mergedR) - .validateAndBuildWithDefaults(); - } - if (o == null) { - o = new LocalActivityOptions.Builder().build(); - } - return new LocalActivityOptions.Builder() - .setScheduleToCloseTimeout( - ActivityOptions.mergeDuration( - a.scheduleToCloseTimeoutSeconds(), o.getScheduleToCloseTimeout())) - .setRetryOptions(RetryOptions.merge(r, o.getRetryOptions())) - .setContextPropagators(o.getContextPropagators()) - .validateAndBuildWithDefaults(); + public static Builder newBuilder() { + return new Builder(null); + } + + /** @param o null is allowed */ + public static Builder newBuilder(LocalActivityOptions o) { + return new Builder(o); + } + + public static LocalActivityOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final LocalActivityOptions DEFAULT_INSTANCE; + + static { + DEFAULT_INSTANCE = LocalActivityOptions.newBuilder().build(); } public static final class Builder { @@ -92,6 +84,17 @@ public Builder setContextPropagators(List contextPropagators) return this; } + /** + * Merges MethodRetry annotation. The values of this builder take precedence over annotation + * ones. + */ + public Builder setMethodRetry(MethodRetry r) { + if (r != null) { + this.retryOptions = RetryOptions.merge(r, retryOptions); + } + return this; + } + public LocalActivityOptions build() { return new LocalActivityOptions(scheduleToCloseTimeout, retryOptions, contextPropagators); } diff --git a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java index 17836bb5f..3131787da 100644 --- a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -17,7 +17,6 @@ package com.uber.cadence.internal.sync; -import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.common.MethodRetry; import com.uber.cadence.workflow.ActivityStub; @@ -42,9 +41,10 @@ private ActivityInvocationHandler(ActivityOptions options, WorkflowInterceptor a @Override protected Function getActivityFunc( - Method method, MethodRetry methodRetry, ActivityMethod activityMethod, String activityName) { + Method method, MethodRetry methodRetry, String activityName) { Function function; - ActivityOptions mergedOptions = ActivityOptions.merge(activityMethod, methodRetry, options); + ActivityOptions mergedOptions = + ActivityOptions.newBuilder(options).setMethodRetry(methodRetry).build(); ActivityStub stub = ActivityStubImpl.newInstance(mergedOptions, activityExecutor); function = diff --git a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java index 5b1e9ada8..36659fe5b 100644 --- a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java +++ b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java @@ -67,7 +67,7 @@ public Object invoke(Object proxy, Method method, Object[] args) { activityName = activityMethod.name(); } - function = getActivityFunc(method, methodRetry, activityMethod, activityName); + function = getActivityFunc(method, methodRetry, activityName); methodFunctions.put(method, function); } catch (NoSuchMethodException e) { throw Workflow.wrap(e); @@ -77,5 +77,5 @@ public Object invoke(Object proxy, Method method, Object[] args) { } protected abstract Function getActivityFunc( - Method method, MethodRetry methodRetry, ActivityMethod activityMethod, String activityName); + Method method, MethodRetry methodRetry, String activityName); } diff --git a/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java b/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java index 53888b81d..5f0c74c30 100644 --- a/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -17,7 +17,6 @@ package com.uber.cadence.internal.sync; -import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.LocalActivityOptions; import com.uber.cadence.common.MethodRetry; import com.uber.cadence.workflow.ActivityStub; @@ -43,10 +42,12 @@ private LocalActivityInvocationHandler( @Override protected Function getActivityFunc( - Method method, MethodRetry methodRetry, ActivityMethod activityMethod, String activityName) { + Method method, MethodRetry methodRetry, String activityName) { Function function; LocalActivityOptions mergedOptions = - LocalActivityOptions.merge(activityMethod, methodRetry, options); + LocalActivityOptions.newBuilder(options) + .setMethodRetry(methodRetry) + .validateAndBuildWithDefaults(); ActivityStub stub = LocalActivityStubImpl.newInstance(mergedOptions, activityExecutor); function = (a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a); diff --git a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java index 30dcb206d..9bd1cbb83 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -18,7 +18,77 @@ package com.uber.cadence.internal.sync; import com.google.common.base.Defaults; -import com.uber.cadence.*; +import com.uber.cadence.ActivityType; +import com.uber.cadence.BadRequestError; +import com.uber.cadence.CancellationAlreadyRequestedError; +import com.uber.cadence.ClientVersionNotSupportedError; +import com.uber.cadence.ClusterInfo; +import com.uber.cadence.CountWorkflowExecutionsRequest; +import com.uber.cadence.CountWorkflowExecutionsResponse; +import com.uber.cadence.DeprecateDomainRequest; +import com.uber.cadence.DescribeDomainRequest; +import com.uber.cadence.DescribeDomainResponse; +import com.uber.cadence.DescribeTaskListRequest; +import com.uber.cadence.DescribeTaskListResponse; +import com.uber.cadence.DescribeWorkflowExecutionRequest; +import com.uber.cadence.DescribeWorkflowExecutionResponse; +import com.uber.cadence.DomainAlreadyExistsError; +import com.uber.cadence.DomainNotActiveError; +import com.uber.cadence.EntityNotExistsError; +import com.uber.cadence.GetSearchAttributesResponse; +import com.uber.cadence.GetWorkflowExecutionHistoryRequest; +import com.uber.cadence.GetWorkflowExecutionHistoryResponse; +import com.uber.cadence.InternalServiceError; +import com.uber.cadence.LimitExceededError; +import com.uber.cadence.ListArchivedWorkflowExecutionsRequest; +import com.uber.cadence.ListArchivedWorkflowExecutionsResponse; +import com.uber.cadence.ListClosedWorkflowExecutionsRequest; +import com.uber.cadence.ListClosedWorkflowExecutionsResponse; +import com.uber.cadence.ListDomainsRequest; +import com.uber.cadence.ListDomainsResponse; +import com.uber.cadence.ListOpenWorkflowExecutionsRequest; +import com.uber.cadence.ListOpenWorkflowExecutionsResponse; +import com.uber.cadence.ListTaskListPartitionsRequest; +import com.uber.cadence.ListTaskListPartitionsResponse; +import com.uber.cadence.ListWorkflowExecutionsRequest; +import com.uber.cadence.ListWorkflowExecutionsResponse; +import com.uber.cadence.PollForActivityTaskRequest; +import com.uber.cadence.PollForActivityTaskResponse; +import com.uber.cadence.PollForDecisionTaskRequest; +import com.uber.cadence.PollForDecisionTaskResponse; +import com.uber.cadence.QueryFailedError; +import com.uber.cadence.QueryWorkflowRequest; +import com.uber.cadence.QueryWorkflowResponse; +import com.uber.cadence.RecordActivityTaskHeartbeatByIDRequest; +import com.uber.cadence.RecordActivityTaskHeartbeatRequest; +import com.uber.cadence.RecordActivityTaskHeartbeatResponse; +import com.uber.cadence.RegisterDomainRequest; +import com.uber.cadence.RequestCancelWorkflowExecutionRequest; +import com.uber.cadence.ResetStickyTaskListRequest; +import com.uber.cadence.ResetStickyTaskListResponse; +import com.uber.cadence.ResetWorkflowExecutionRequest; +import com.uber.cadence.ResetWorkflowExecutionResponse; +import com.uber.cadence.RespondActivityTaskCanceledByIDRequest; +import com.uber.cadence.RespondActivityTaskCanceledRequest; +import com.uber.cadence.RespondActivityTaskCompletedByIDRequest; +import com.uber.cadence.RespondActivityTaskCompletedRequest; +import com.uber.cadence.RespondActivityTaskFailedByIDRequest; +import com.uber.cadence.RespondActivityTaskFailedRequest; +import com.uber.cadence.RespondDecisionTaskCompletedRequest; +import com.uber.cadence.RespondDecisionTaskCompletedResponse; +import com.uber.cadence.RespondDecisionTaskFailedRequest; +import com.uber.cadence.RespondQueryTaskCompletedRequest; +import com.uber.cadence.ServiceBusyError; +import com.uber.cadence.SignalWithStartWorkflowExecutionRequest; +import com.uber.cadence.SignalWorkflowExecutionRequest; +import com.uber.cadence.StartWorkflowExecutionRequest; +import com.uber.cadence.StartWorkflowExecutionResponse; +import com.uber.cadence.TerminateWorkflowExecutionRequest; +import com.uber.cadence.UpdateDomainRequest; +import com.uber.cadence.UpdateDomainResponse; +import com.uber.cadence.WorkflowExecution; +import com.uber.cadence.WorkflowExecutionAlreadyStartedError; +import com.uber.cadence.WorkflowType; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.activity.LocalActivityOptions; import com.uber.cadence.internal.metrics.NoopScope; @@ -102,7 +172,10 @@ public void registerActivitiesImplementations(Object... activityImplementations) @Override public T newActivityStub(Class activityInterface) { ActivityOptions options = - new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofDays(1)).build(); + ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofDays(1)) + .setHeartbeatTimeout(Duration.ofSeconds(1)) + .build(); InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance(options, new TestActivityExecutor(workflowService)); invocationHandler = new DeterministicRunnerWrapper(invocationHandler); diff --git a/src/test/java/com/uber/cadence/activity/ActivityOptionsTest.java b/src/test/java/com/uber/cadence/activity/ActivityOptionsTest.java index bed1e5fc7..2e2a54ecd 100644 --- a/src/test/java/com/uber/cadence/activity/ActivityOptionsTest.java +++ b/src/test/java/com/uber/cadence/activity/ActivityOptionsTest.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -27,65 +27,6 @@ public class ActivityOptionsTest { - @ActivityMethod - public void defaultActivityOptions() {} - - @Test - public void testOnlyOptionsPresent() throws NoSuchMethodException { - ActivityOptions o = - new ActivityOptions.Builder() - .setTaskList("foo") - .setHeartbeatTimeout(Duration.ofSeconds(123)) - .setScheduleToCloseTimeout(Duration.ofSeconds(321)) - .setScheduleToStartTimeout(Duration.ofSeconds(333)) - .setStartToCloseTimeout(Duration.ofSeconds(345)) - .setRetryOptions( - new RetryOptions.Builder() - .setDoNotRetry(IllegalArgumentException.class) - .setMaximumAttempts(11111) - .setBackoffCoefficient(1.55) - .setMaximumInterval(Duration.ofDays(3)) - .setExpiration(Duration.ofDays(365)) - .setInitialInterval(Duration.ofMinutes(12)) - .build()) - .build(); - ActivityMethod a = - ActivityOptionsTest.class - .getMethod("defaultActivityOptions") - .getAnnotation(ActivityMethod.class); - Assert.assertEquals(o, ActivityOptions.merge(a, null, o)); - } - - @MethodRetry(initialIntervalSeconds = 3) - @ActivityMethod - public void defaultActivityAndRetryOptions() {} - - @Test - public void testOnlyOptionsAndEmptyAnnotationsPresent() throws NoSuchMethodException { - ActivityOptions o = - new ActivityOptions.Builder() - .setTaskList("foo") - .setHeartbeatTimeout(Duration.ofSeconds(123)) - .setScheduleToCloseTimeout(Duration.ofSeconds(321)) - .setScheduleToStartTimeout(Duration.ofSeconds(333)) - .setStartToCloseTimeout(Duration.ofSeconds(345)) - .setRetryOptions( - new RetryOptions.Builder() - .setDoNotRetry(IllegalArgumentException.class) - .setMaximumAttempts(11111) - .setBackoffCoefficient(1.55) - .setMaximumInterval(Duration.ofDays(3)) - .setExpiration(Duration.ofDays(365)) - .setInitialInterval(Duration.ofMinutes(12)) - .build()) - .build(); - ActivityMethod a = - ActivityOptionsTest.class - .getMethod("defaultActivityAndRetryOptions") - .getAnnotation(ActivityMethod.class); - Assert.assertEquals(o, ActivityOptions.merge(a, null, o)); - } - @MethodRetry( initialIntervalSeconds = 12, backoffCoefficient = 1.97, @@ -94,13 +35,6 @@ public void testOnlyOptionsAndEmptyAnnotationsPresent() throws NoSuchMethodExcep maximumIntervalSeconds = 22, doNotRetry = {NullPointerException.class, UnsupportedOperationException.class} ) - @ActivityMethod( - startToCloseTimeoutSeconds = 1135, - taskList = "bar", - heartbeatTimeoutSeconds = 4567, - scheduleToCloseTimeoutSeconds = 2342, - scheduleToStartTimeoutSeconds = 9879 - ) public void activityAndRetryOptions() {} @Test @@ -108,16 +42,8 @@ public void testOnlyAnnotationsPresent() throws NoSuchMethodException { Method method = ActivityOptionsTest.class.getMethod("activityAndRetryOptions"); ActivityMethod a = method.getAnnotation(ActivityMethod.class); MethodRetry r = method.getAnnotation(MethodRetry.class); - ActivityOptions o = new ActivityOptions.Builder().build(); - ActivityOptions merged = ActivityOptions.merge(a, r, o); - Assert.assertEquals(a.taskList(), merged.getTaskList()); - Assert.assertEquals(a.heartbeatTimeoutSeconds(), merged.getHeartbeatTimeout().getSeconds()); - Assert.assertEquals( - a.scheduleToCloseTimeoutSeconds(), merged.getScheduleToCloseTimeout().getSeconds()); - Assert.assertEquals( - a.scheduleToStartTimeoutSeconds(), merged.getScheduleToStartTimeout().getSeconds()); - Assert.assertEquals( - a.startToCloseTimeoutSeconds(), merged.getStartToCloseTimeout().getSeconds()); + ActivityOptions o = ActivityOptions.newBuilder().build(); + ActivityOptions merged = ActivityOptions.newBuilder(o).setMethodRetry(r).build(); RetryOptions rMerged = merged.getRetryOptions(); Assert.assertEquals(r.maximumAttempts(), rMerged.getMaximumAttempts()); @@ -129,29 +55,4 @@ public void testOnlyAnnotationsPresent() throws NoSuchMethodException { Duration.ofSeconds(r.maximumIntervalSeconds()), rMerged.getMaximumInterval()); Assert.assertEquals(Arrays.asList(r.doNotRetry()), rMerged.getDoNotRetry()); } - - @Test - public void testBothPresent() throws NoSuchMethodException { - ActivityOptions o = - new ActivityOptions.Builder() - .setTaskList("foo") - .setHeartbeatTimeout(Duration.ofSeconds(123)) - .setScheduleToCloseTimeout(Duration.ofSeconds(321)) - .setScheduleToStartTimeout(Duration.ofSeconds(333)) - .setStartToCloseTimeout(Duration.ofSeconds(345)) - .setRetryOptions( - new RetryOptions.Builder() - .setDoNotRetry(IllegalArgumentException.class) - .setMaximumAttempts(11111) - .setBackoffCoefficient(1.55) - .setMaximumInterval(Duration.ofDays(3)) - .setExpiration(Duration.ofDays(365)) - .setInitialInterval(Duration.ofMinutes(12)) - .build()) - .build(); - Method method = ActivityOptionsTest.class.getMethod("activityAndRetryOptions"); - ActivityMethod a = method.getAnnotation(ActivityMethod.class); - MethodRetry r = method.getAnnotation(MethodRetry.class); - Assert.assertEquals(o, ActivityOptions.merge(a, r, o)); - } } diff --git a/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java index 9603f9e4c..7575ca10b 100644 --- a/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -26,7 +26,6 @@ import com.uber.cadence.RecordActivityTaskHeartbeatResponse; import com.uber.cadence.activity.Activity; -import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.client.ActivityCancelledException; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.cadence.testing.TestActivityEnvironment; @@ -112,8 +111,6 @@ public void testHeartbeat() { } public interface InterruptibleTestActivity { - - @ActivityMethod(scheduleToCloseTimeoutSeconds = 1000, heartbeatTimeoutSeconds = 1) void activity1() throws InterruptedException; } diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java index e22de1bdd..3566541c5 100644 --- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -36,7 +36,6 @@ import com.uber.cadence.WorkflowExecution; import com.uber.cadence.WorkflowExecutionInfo; import com.uber.cadence.activity.Activity; -import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.*; import com.uber.cadence.context.ContextPropagator; @@ -163,8 +162,6 @@ public void testFailure() { } public interface TestActivity { - - @ActivityMethod(scheduleToCloseTimeoutSeconds = 3600) String activity1(String input); } @@ -178,7 +175,10 @@ public String activity1(String input) { public static class ActivityWorkflow implements TestWorkflow { - private final TestActivity activity = Workflow.newActivityStub(TestActivity.class); + private final TestActivity activity = + Workflow.newActivityStub( + TestActivity.class, + ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofHours(1)).build()); @Override public String workflow1(String input) { @@ -487,8 +487,6 @@ public void testConcurrentDecision() throws ExecutionException, InterruptedExcep } public interface TestCancellationActivity { - - @ActivityMethod(scheduleToCloseTimeoutSeconds = 1000, heartbeatTimeoutSeconds = 2) String activity1(String input); } @@ -506,7 +504,12 @@ public String activity1(String input) { public static class TestCancellationWorkflow implements TestWorkflow { private final TestCancellationActivity activity = - Workflow.newActivityStub(TestCancellationActivity.class); + Workflow.newActivityStub( + TestCancellationActivity.class, + ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(1000)) + .setHeartbeatTimeout(Duration.ofSeconds(2)) + .build()); @Override public String workflow1(String input) { @@ -542,7 +545,11 @@ public static class TestTimerCancellationWorkflow implements TestWorkflow { public String workflow1(String input) { long startTime = Workflow.currentTimeMillis(); Promise s = Async.procedure(() -> Workflow.sleep(Duration.ofHours(3))); - TestActivity activity = Workflow.newActivityStub(TestActivity.class); + TestActivity activity = + Workflow.newActivityStub( + TestActivity.class, + ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofHours(1)).build()); + activity.activity1("input"); Workflow.sleep(Duration.ofHours(1)); s.get(); @@ -894,7 +901,11 @@ public String workflow1(String input) { .setScheduleToCloseTimeout(Duration.ofSeconds(5)) .setContextPropagators(Collections.singletonList(new TestContextPropagator())) .build(); - TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); + TestActivity activity = + Workflow.newActivityStub( + TestActivity.class, + ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofHours(1)).build()); + return activity.activity1("foo"); } } diff --git a/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java b/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java index 936370cd1..5ba1d011b 100644 --- a/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java +++ b/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -27,7 +27,7 @@ import com.uber.cadence.HistoryEvent; import com.uber.cadence.WorkflowExecution; import com.uber.cadence.activity.Activity; -import com.uber.cadence.activity.ActivityMethod; +import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.ActivityWorkerShutdownException; import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.client.WorkflowClientOptions; @@ -40,6 +40,7 @@ import com.uber.cadence.workflow.Workflow; import com.uber.cadence.workflow.WorkflowMethod; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -97,7 +98,12 @@ public interface TestWorkflow { public static class TestWorkflowImpl implements TestWorkflow { - private final Activities activities = Workflow.newActivityStub(Activities.class); + private final Activities activities = + Workflow.newActivityStub( + Activities.class, + ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(100)) + .build()); @Override public String execute() { @@ -106,7 +112,6 @@ public String execute() { } public interface Activities { - @ActivityMethod(scheduleToCloseTimeoutSeconds = 100) String execute(); } diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index aa24f0cd7..13f16d5d4 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -771,7 +771,15 @@ public static class TestActivityRetryAnnotated implements TestWorkflow1 { private final TestActivities activities; public TestActivityRetryAnnotated() { - this.activities = Workflow.newActivityStub(TestActivities.class); + this.activities = + Workflow.newActivityStub( + TestActivities.class, + ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .setScheduleToStartTimeout(Duration.ofSeconds(5)) + .setHeartbeatTimeout(Duration.ofSeconds(5)) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .build()); } @Override @@ -2935,7 +2943,7 @@ public String execute(String taskList) { public interface AngryChildActivity { - @ActivityMethod(scheduleToCloseTimeoutSeconds = 5) + @ActivityMethod void execute(); } @@ -2960,7 +2968,10 @@ public String execute(String taskList, int delay) { AngryChildActivity activity = Workflow.newActivityStub( AngryChildActivity.class, - new ActivityOptions.Builder().setTaskList(taskList).build()); + ActivityOptions.newBuilder() + .setTaskList(taskList) + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .build()); activity.execute(); throw new UnsupportedOperationException("simulated failure"); } @@ -3704,12 +3715,6 @@ public interface TestActivities { void neverComplete(); - @ActivityMethod( - scheduleToStartTimeoutSeconds = 5, - scheduleToCloseTimeoutSeconds = 5, - heartbeatTimeoutSeconds = 5, - startToCloseTimeoutSeconds = 10 - ) @MethodRetry( initialIntervalSeconds = 1, maximumIntervalSeconds = 1, @@ -4859,8 +4864,6 @@ public NonSerializableException() { } public interface NonSerializableExceptionActivity { - - @ActivityMethod(scheduleToCloseTimeoutSeconds = 5) void execute(); } @@ -4878,7 +4881,11 @@ public static class TestNonSerializableExceptionInActivityWorkflow implements Te @Override public String execute(String taskList) { NonSerializableExceptionActivity activity = - Workflow.newActivityStub(NonSerializableExceptionActivity.class); + Workflow.newActivityStub( + NonSerializableExceptionActivity.class, + ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .build()); try { activity.execute(); } catch (ActivityFailureException e) { @@ -4902,7 +4909,7 @@ public void testNonSerializableExceptionInActivity() { public interface NonDeserializableArgumentsActivity { - @ActivityMethod(scheduleToCloseTimeoutSeconds = 5) + @ActivityMethod void execute(int arg); } @@ -5432,7 +5439,7 @@ public interface GreetingWorkflow { } public interface GreetingActivities { - @ActivityMethod(scheduleToCloseTimeoutSeconds = 60) + @ActivityMethod String composeGreeting(String string); } @@ -5452,7 +5459,9 @@ public String composeGreeting(String string) { public static class TimerFiringWorkflowImpl implements GreetingWorkflow { private final GreetingActivities activities = - Workflow.newActivityStub(GreetingActivities.class); + Workflow.newActivityStub( + GreetingActivities.class, + ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build()); @Override public void createGreeting(String name) { From 606086681f12d1ec8e4d5fa1d17961d52add1ff8 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Thu, 2 Apr 2020 11:51:14 -0700 Subject: [PATCH 06/10] Added @ActivityInterface implementation to POJOActivityTaskHandler --- .../internal/common/InternalUtils.java | 10 +- .../sync/POJOActivityTaskHandler.java | 155 +++++++++++--- .../LocalActivityContextPropagationTest.java | 1 + .../internal/testing/ActivityTestingTest.java | 3 + .../internal/testing/WorkflowTestingTest.java | 3 + .../cadence/worker/ActivityInterfaceTest.java | 192 ++++++++++++++++++ .../worker/CleanWorkerShutdownTest.java | 2 + .../uber/cadence/worker/StickyWorkerTest.java | 9 +- .../cadence/worker/WorkerStressTests.java | 9 +- .../uber/cadence/workflow/MetricsTest.java | 6 +- .../workflow/TestEnvironmentCloseTest.java | 8 +- .../uber/cadence/workflow/WorkflowTest.java | 19 +- 12 files changed, 366 insertions(+), 51 deletions(-) create mode 100644 src/test/java/com/uber/cadence/worker/ActivityInterfaceTest.java diff --git a/src/main/java/com/uber/cadence/internal/common/InternalUtils.java b/src/main/java/com/uber/cadence/internal/common/InternalUtils.java index 73498885a..5b1d0a1c5 100644 --- a/src/main/java/com/uber/cadence/internal/common/InternalUtils.java +++ b/src/main/java/com/uber/cadence/internal/common/InternalUtils.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -51,7 +51,11 @@ public final class InternalUtils { * @return "Simple class name"::"methodName" */ public static String getSimpleName(Method method) { - return method.getDeclaringClass().getSimpleName() + "::" + method.getName(); + return getSimpleName(method.getDeclaringClass(), method); + } + + public static String getSimpleName(Class type, Method method) { + return type.getSimpleName() + "::" + method.getName(); } public static String getWorkflowType(Method method, WorkflowMethod workflowMethod) { diff --git a/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java b/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java index 8f907875e..2cbadf144 100644 --- a/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -18,10 +18,11 @@ package com.uber.cadence.internal.sync; import com.google.common.base.Joiner; -import com.google.common.reflect.TypeToken; +import com.google.common.base.Objects; import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.RespondActivityTaskCompletedRequest; import com.uber.cadence.RespondActivityTaskFailedRequest; +import com.uber.cadence.activity.ActivityInterface; import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.client.ActivityCancelledException; import com.uber.cadence.common.MethodRetry; @@ -33,11 +34,14 @@ import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.cadence.testing.SimulatedTimeoutException; import com.uber.m3.tally.Scope; +import java.lang.annotation.Annotation; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; @@ -82,30 +86,29 @@ private void addActivityImplementation( + "\" This annotation can be used only on the interface method it implements."); } } - TypeToken.TypeSet interfaces = TypeToken.of(cls).getTypes().interfaces(); - if (interfaces.isEmpty()) { - throw new IllegalArgumentException("Activity must implement at least one interface"); + Set activityMethods = + getAnnotatedInterfaceMethods(cls, ActivityInterface.class); + if (activityMethods.isEmpty()) { + throw new IllegalArgumentException( + "Class doesn't implement any non empty interface annotated with @ActivityInterface: " + + cls.getName()); } - for (TypeToken i : interfaces) { - if (i.getType().getTypeName().startsWith("org.mockito")) { - continue; + for (MethodInterfacePair pair : activityMethods) { + Method method = pair.getMethod(); + ActivityMethod annotation = method.getAnnotation(ActivityMethod.class); + String activityType; + if (annotation != null && !annotation.name().isEmpty()) { + activityType = annotation.name(); + } else { + activityType = InternalUtils.getSimpleName(pair.getType(), method); } - for (Method method : i.getRawType().getMethods()) { - ActivityMethod annotation = method.getAnnotation(ActivityMethod.class); - String activityType; - if (annotation != null && !annotation.name().isEmpty()) { - activityType = annotation.name(); - } else { - activityType = InternalUtils.getSimpleName(method); - } - if (activities.containsKey(activityType)) { - throw new IllegalStateException( - activityType + " activity type is already registered with the worker"); - } - - ActivityTaskExecutor implementation = newTaskExecutor.apply(method, activity); - activities.put(activityType, implementation); + if (activities.containsKey(activityType)) { + throw new IllegalStateException( + activityType + " activity type is already registered with the worker"); } + + ActivityTaskExecutor implementation = newTaskExecutor.apply(method, activity); + activities.put(activityType, implementation); } } @@ -266,4 +269,108 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc void setWorkflowService(IWorkflowService service) { this.service = service; } + + static class MethodInterfacePair { + private final Method method; + private final Class type; + + MethodInterfacePair(Method method, Class type) { + this.method = method; + this.type = type; + } + + public Method getMethod() { + return method; + } + + public Class getType() { + return type; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MethodInterfacePair that = (MethodInterfacePair) o; + return Objects.equal(method, that.method) && Objects.equal(type, that.type); + } + + @Override + public int hashCode() { + return Objects.hashCode(method, type); + } + + @Override + public String toString() { + return "MethodInterfacePair{" + "method=" + method + ", type=" + type + '}'; + } + } + + /** Used to override equals and hashCode of Method to ensure deduping by method name in a set. */ + static class MethodWrapper { + private final Method method; + + MethodWrapper(Method method) { + this.method = method; + } + + public Method getMethod() { + return method; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MethodWrapper that = (MethodWrapper) o; + return Objects.equal(method.getName(), that.method.getName()); + } + + @Override + public int hashCode() { + return Objects.hashCode(method.getName()); + } + } + + Set getAnnotatedInterfaceMethods( + Class implementationClass, Class annotationClass) { + if (implementationClass.isInterface()) { + throw new IllegalArgumentException( + "Concrete class expected. Found interface: " + implementationClass.getSimpleName()); + } + Set pairs = new HashSet<>(); + // Methods inherited from interfaces that are not annotated with @ActivityInterface + Set ignored = new HashSet<>(); + getAnnotatedInterfaceMethods(implementationClass, annotationClass, ignored, pairs); + return pairs; + } + + private void getAnnotatedInterfaceMethods( + Class current, + Class annotationClass, + Set methods, + Set result) { + // Using set to dedupe methods which are defined in both non activity parent and current + Set ourMethods = new HashSet<>(); + if (current.isInterface()) { + Method[] declaredMethods = current.getDeclaredMethods(); + for (int i = 0; i < declaredMethods.length; i++) { + Method declaredMethod = declaredMethods[i]; + ourMethods.add(new MethodWrapper(declaredMethod)); + } + } + Class[] interfaces = current.getInterfaces(); + for (int i = 0; i < interfaces.length; i++) { + Class anInterface = interfaces[i]; + getAnnotatedInterfaceMethods(anInterface, annotationClass, ourMethods, result); + } + Annotation annotation = current.getAnnotation(annotationClass); + if (annotation == null) { + methods.addAll(ourMethods); + return; + } + for (MethodWrapper method : ourMethods) { + result.add(new MethodInterfacePair(method.getMethod(), current)); + } + } } diff --git a/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java b/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java index 6b8817b9e..3eced394c 100644 --- a/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java +++ b/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java @@ -155,6 +155,7 @@ public String foo() { } } + @ActivityInterface public interface LocalActivityContextPropagation { @ActivityMethod diff --git a/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java index 7575ca10b..ba4199a82 100644 --- a/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java @@ -26,6 +26,7 @@ import com.uber.cadence.RecordActivityTaskHeartbeatResponse; import com.uber.cadence.activity.Activity; +import com.uber.cadence.activity.ActivityInterface; import com.uber.cadence.client.ActivityCancelledException; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.cadence.testing.TestActivityEnvironment; @@ -47,6 +48,7 @@ public void setUp() { testEnvironment = TestActivityEnvironment.newInstance(); } + @ActivityInterface public interface TestActivity { String activity1(String input); @@ -110,6 +112,7 @@ public void testHeartbeat() { assertEquals("details1", details.get()); } + @ActivityInterface public interface InterruptibleTestActivity { void activity1() throws InterruptedException; } diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java index 3566541c5..27d8ac4eb 100644 --- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java @@ -36,6 +36,7 @@ import com.uber.cadence.WorkflowExecution; import com.uber.cadence.WorkflowExecutionInfo; import com.uber.cadence.activity.Activity; +import com.uber.cadence.activity.ActivityInterface; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.*; import com.uber.cadence.context.ContextPropagator; @@ -161,6 +162,7 @@ public void testFailure() { } } + @ActivityInterface public interface TestActivity { String activity1(String input); } @@ -486,6 +488,7 @@ public void testConcurrentDecision() throws ExecutionException, InterruptedExcep log.info(testEnvironment.getDiagnostics()); } + @ActivityInterface public interface TestCancellationActivity { String activity1(String input); } diff --git a/src/test/java/com/uber/cadence/worker/ActivityInterfaceTest.java b/src/test/java/com/uber/cadence/worker/ActivityInterfaceTest.java new file mode 100644 index 000000000..de2d1e626 --- /dev/null +++ b/src/test/java/com/uber/cadence/worker/ActivityInterfaceTest.java @@ -0,0 +1,192 @@ +/* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.worker; + +import com.google.common.base.Objects; +import com.uber.cadence.activity.ActivityInterface; +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import org.junit.Test; + +public class ActivityInterfaceTest { + + public interface I0 { + void baz(); + } + + public interface I1 extends I0 { + void foo(); + } + + @ActivityInterface + public interface I2 extends I1, I0 { + void bar(); + } + + @ActivityInterface + public interface I3 extends I1 { + void foo(); + } + + @ActivityInterface + public interface I4 extends I3 { + void foo(); + + void bar(); + } + + @ActivityInterface + public interface I5 extends I4, I3 {} + + public interface NonActivity { + void foobar(); + } + + public static class Impl implements I2, I4, NonActivity, I5 { + + @Override + public void foo() {} + + @Override + public void bar() {} + + @Override + public void foobar() {} + + @Override + public void baz() {} + } + + @Test + public void test() { + Set activityMethods = + getAnnotatedInterfaceMethods(Impl.class, ActivityInterface.class); + for (Iterator iterator = activityMethods.iterator(); + iterator.hasNext(); ) { + MethodInterfacePair next = iterator.next(); + System.out.println(next); + } + } + + static class MethodInterfacePair { + private final Method method; + private final Class type; + + MethodInterfacePair(Method method, Class type) { + this.method = method; + this.type = type; + } + + public Method getMethod() { + return method; + } + + public Class getType() { + return type; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MethodInterfacePair that = (MethodInterfacePair) o; + return Objects.equal(method, that.method) && Objects.equal(type, that.type); + } + + @Override + public int hashCode() { + return Objects.hashCode(method, type); + } + + @Override + public String toString() { + return "MethodInterfacePair{" + "method=" + method + ", type=" + type + '}'; + } + } + + /** Used to override equals and hashCode of Method to ensure deduping by method name in a set. */ + static class MethodWrapper { + private final Method method; + + MethodWrapper(Method method) { + this.method = method; + } + + public Method getMethod() { + return method; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MethodWrapper that = (MethodWrapper) o; + return Objects.equal(method.getName(), that.method.getName()); + } + + @Override + public int hashCode() { + return Objects.hashCode(method.getName()); + } + } + + Set getAnnotatedInterfaceMethods( + Class implementationClass, Class annotationClass) { + if (implementationClass.isInterface()) { + throw new IllegalArgumentException( + "Concrete class expected. Found interface: " + implementationClass.getSimpleName()); + } + Set pairs = new HashSet<>(); + // Methods inherited from interfaces that are not annotated with @ActivityInterface + Set ignored = new HashSet<>(); + getAnnotatedInterfaceMethods(implementationClass, annotationClass, ignored, pairs); + return pairs; + } + + private void getAnnotatedInterfaceMethods( + Class current, + Class annotationClass, + Set methods, + Set result) { + // Using set to dedupe methods which are defined in both non activity parent and current + Set ourMethods = new HashSet<>(); + if (current.isInterface()) { + Method[] declaredMethods = current.getDeclaredMethods(); + for (int i = 0; i < declaredMethods.length; i++) { + Method declaredMethod = declaredMethods[i]; + ourMethods.add(new MethodWrapper(declaredMethod)); + } + } + Class[] interfaces = current.getInterfaces(); + for (int i = 0; i < interfaces.length; i++) { + Class anInterface = interfaces[i]; + getAnnotatedInterfaceMethods(anInterface, annotationClass, ourMethods, result); + } + Annotation annotation = current.getAnnotation(annotationClass); + if (annotation == null) { + methods.addAll(ourMethods); + return; + } + for (MethodWrapper method : ourMethods) { + result.add(new MethodInterfacePair(method.getMethod(), current)); + } + } +} diff --git a/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java b/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java index 5ba1d011b..6235b53e9 100644 --- a/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java +++ b/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java @@ -27,6 +27,7 @@ import com.uber.cadence.HistoryEvent; import com.uber.cadence.WorkflowExecution; import com.uber.cadence.activity.Activity; +import com.uber.cadence.activity.ActivityInterface; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.ActivityWorkerShutdownException; import com.uber.cadence.client.WorkflowClient; @@ -111,6 +112,7 @@ public String execute() { } } + @ActivityInterface public interface Activities { String execute(); } diff --git a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java index 8a2ecbf5e..b07b7ef1a 100644 --- a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java +++ b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -28,7 +28,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import com.uber.cadence.activity.ActivityMethod; +import com.uber.cadence.activity.ActivityInterface; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.client.WorkflowClientOptions; @@ -709,9 +709,8 @@ public void execute(WorkflowParams params) { } } + @ActivityInterface public interface SleepActivity { - - @ActivityMethod() void sleep(int chain, int concurrency, byte[] bytes); } diff --git a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java index 30508fbda..7ecdf5fad 100644 --- a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java +++ b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -20,7 +20,7 @@ import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; import static org.junit.Assert.assertNotNull; -import com.uber.cadence.activity.ActivityMethod; +import com.uber.cadence.activity.ActivityInterface; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.client.WorkflowClientOptions; @@ -260,9 +260,8 @@ public String execute(WorkflowParams params) { } } + @ActivityInterface public interface SleepActivity { - - @ActivityMethod() void sleep(int chain, int concurrency, byte[] bytes); } diff --git a/src/test/java/com/uber/cadence/workflow/MetricsTest.java b/src/test/java/com/uber/cadence/workflow/MetricsTest.java index 21417847a..9010d2dfa 100644 --- a/src/test/java/com/uber/cadence/workflow/MetricsTest.java +++ b/src/test/java/com/uber/cadence/workflow/MetricsTest.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -22,6 +22,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.*; +import com.uber.cadence.activity.ActivityInterface; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.client.WorkflowClientOptions; @@ -101,6 +102,7 @@ public void execute() { } } + @ActivityInterface public interface TestActivity { int runActivity(int input); } diff --git a/src/test/java/com/uber/cadence/workflow/TestEnvironmentCloseTest.java b/src/test/java/com/uber/cadence/workflow/TestEnvironmentCloseTest.java index 7c5692554..54d5fba21 100644 --- a/src/test/java/com/uber/cadence/workflow/TestEnvironmentCloseTest.java +++ b/src/test/java/com/uber/cadence/workflow/TestEnvironmentCloseTest.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -19,7 +19,7 @@ import static junit.framework.TestCase.assertTrue; -import com.uber.cadence.activity.ActivityMethod; +import com.uber.cadence.activity.ActivityInterface; import com.uber.cadence.testing.TestWorkflowEnvironment; import com.uber.cadence.worker.Worker; import org.junit.Test; @@ -43,8 +43,8 @@ public void foo() {} public void signal() {} } + @ActivityInterface public interface A { - @ActivityMethod void bar(); } diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 13f16d5d4..0ac22d84e 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -41,11 +41,7 @@ import com.uber.cadence.WorkflowExecutionAlreadyStartedError; import com.uber.cadence.WorkflowExecutionCloseStatus; import com.uber.cadence.WorkflowIdReusePolicy; -import com.uber.cadence.activity.Activity; -import com.uber.cadence.activity.ActivityMethod; -import com.uber.cadence.activity.ActivityOptions; -import com.uber.cadence.activity.ActivityTask; -import com.uber.cadence.activity.LocalActivityOptions; +import com.uber.cadence.activity.*; import com.uber.cadence.client.ActivityCancelledException; import com.uber.cadence.client.ActivityCompletionClient; import com.uber.cadence.client.ActivityNotExistsException; @@ -73,7 +69,11 @@ import com.uber.cadence.testing.TestEnvironmentOptions; import com.uber.cadence.testing.TestWorkflowEnvironment; import com.uber.cadence.testing.WorkflowReplayer; -import com.uber.cadence.worker.*; +import com.uber.cadence.worker.Worker; +import com.uber.cadence.worker.WorkerFactory; +import com.uber.cadence.worker.WorkerFactoryOptions; +import com.uber.cadence.worker.WorkerOptions; +import com.uber.cadence.worker.WorkflowImplementationOptions; import com.uber.cadence.workflow.Functions.Func; import com.uber.cadence.workflow.Functions.Func1; import java.io.File; @@ -2941,6 +2941,7 @@ public String execute(String taskList) { } } + @ActivityInterface public interface AngryChildActivity { @ActivityMethod @@ -3674,6 +3675,7 @@ public void testChildWorkflowWithCronSchedule() { Assert.assertEquals("run 2", lastCompletionResult); } + @ActivityInterface public interface TestActivities { String sleepActivity(long milliseconds, int input); @@ -4755,6 +4757,7 @@ public void testUUIDAndRandom() { tracer.setExpected("sideEffect", "sideEffect", "executeActivity TestActivities::activity2"); } + @ActivityInterface public interface GenericParametersActivity { List execute(List arg1, Set arg2); @@ -4863,6 +4866,7 @@ public NonSerializableException() { } } + @ActivityInterface public interface NonSerializableExceptionActivity { void execute(); } @@ -4907,9 +4911,8 @@ public void testNonSerializableExceptionInActivity() { assertTrue(result.contains("NonSerializableException")); } + @ActivityInterface public interface NonDeserializableArgumentsActivity { - - @ActivityMethod void execute(int arg); } From acb797fa33782cf267e75697061a6f282c5bbbdf Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Thu, 2 Apr 2020 12:50:17 -0700 Subject: [PATCH 07/10] Added ActivityInterface hierarchy test --- .../uber/cadence/internal/sync/POJOActivityTaskHandler.java | 6 ++++++ .../{worker => internal/sync}/ActivityInterfaceTest.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) rename src/test/java/com/uber/cadence/{worker => internal/sync}/ActivityInterfaceTest.java (99%) diff --git a/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java b/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java index 2cbadf144..3088edbfe 100644 --- a/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java @@ -17,6 +17,7 @@ package com.uber.cadence.internal.sync; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.uber.cadence.PollForActivityTaskResponse; @@ -158,6 +159,11 @@ public boolean isAnyTypeSupported() { return !activities.isEmpty(); } + @VisibleForTesting + public Set getRegisteredActivityTypes() { + return activities.keySet(); + } + void setActivitiesImplementation(Object[] activitiesImplementation) { activities.clear(); for (Object activity : activitiesImplementation) { diff --git a/src/test/java/com/uber/cadence/worker/ActivityInterfaceTest.java b/src/test/java/com/uber/cadence/internal/sync/ActivityInterfaceTest.java similarity index 99% rename from src/test/java/com/uber/cadence/worker/ActivityInterfaceTest.java rename to src/test/java/com/uber/cadence/internal/sync/ActivityInterfaceTest.java index de2d1e626..693f540f9 100644 --- a/src/test/java/com/uber/cadence/worker/ActivityInterfaceTest.java +++ b/src/test/java/com/uber/cadence/internal/sync/ActivityInterfaceTest.java @@ -15,7 +15,7 @@ * permissions and limitations under the License. */ -package com.uber.cadence.worker; +package com.uber.cadence.internal.sync; import com.google.common.base.Objects; import com.uber.cadence.activity.ActivityInterface; From 68767aeefc98ca55292549843f02fe9c2d930fd1 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Thu, 2 Apr 2020 16:09:21 -0700 Subject: [PATCH 08/10] Added support for calling polymorphic activities --- .../sync/ActivityInvocationHandler.java | 8 ++- .../sync/ActivityInvocationHandlerBase.java | 52 ++++++++-------- .../sync/LocalActivityInvocationHandler.java | 11 +++- .../sync/POJOActivityTaskHandler.java | 33 ++++++++-- .../sync/TestActivityEnvironmentInternal.java | 3 +- .../internal/sync/WorkflowInternal.java | 4 +- .../internal/testing/ActivityTestingTest.java | 61 +++++++++++++++++++ .../uber/cadence/workflow/WorkflowTest.java | 1 + 8 files changed, 135 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java index 3131787da..e15ed4f94 100644 --- a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java @@ -30,13 +30,15 @@ class ActivityInvocationHandler extends ActivityInvocationHandlerBase { private final WorkflowInterceptor activityExecutor; static InvocationHandler newInstance( - ActivityOptions options, WorkflowInterceptor activityExecutor) { - return new ActivityInvocationHandler(options, activityExecutor); + Class activityInterface, ActivityOptions options, WorkflowInterceptor activityExecutor) { + return new ActivityInvocationHandler(activityInterface, activityExecutor, options); } - private ActivityInvocationHandler(ActivityOptions options, WorkflowInterceptor activityExecutor) { + private ActivityInvocationHandler( + Class activityInterface, WorkflowInterceptor activityExecutor, ActivityOptions options) { this.options = options; this.activityExecutor = activityExecutor; + init(activityInterface); } @Override diff --git a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java index 36659fe5b..001d6d86e 100644 --- a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java +++ b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java @@ -18,17 +18,19 @@ package com.uber.cadence.internal.sync; import static com.uber.cadence.internal.common.InternalUtils.getValueOrDefault; +import static com.uber.cadence.internal.sync.POJOActivityTaskHandler.getAnnotatedInterfaceMethodsFromInterface; +import com.uber.cadence.activity.ActivityInterface; import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.common.MethodRetry; import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.internal.sync.AsyncInternal.AsyncMarker; -import com.uber.cadence.workflow.Workflow; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.function.Function; /** Dynamic implementation of a strongly typed child workflow interface. */ @@ -45,33 +47,35 @@ static T newProxy(Class activityInterface, InvocationHandler invocationHa invocationHandler); } + protected void init(Class activityInterface) { + Set activityMethods = + getAnnotatedInterfaceMethodsFromInterface(activityInterface, ActivityInterface.class); + if (activityMethods.isEmpty()) { + throw new IllegalArgumentException( + "Class doesn't implement any non empty interface annotated with @ActivityInterface: " + + activityInterface.getName()); + } + for (POJOActivityTaskHandler.MethodInterfacePair pair : activityMethods) { + Method method = pair.getMethod(); + ActivityMethod activityMethod = method.getAnnotation(ActivityMethod.class); + String activityType; + if (activityMethod != null && !activityMethod.name().isEmpty()) { + activityType = activityMethod.name(); + } else { + activityType = InternalUtils.getSimpleName(pair.getType(), method); + } + + MethodRetry methodRetry = method.getAnnotation(MethodRetry.class); + Function function = getActivityFunc(method, methodRetry, activityType); + methodFunctions.put(method, function); + } + } + @Override public Object invoke(Object proxy, Method method, Object[] args) { Function function = methodFunctions.get(method); if (function == null) { - try { - if (method.equals(Object.class.getMethod("toString"))) { - // TODO: activity info - return "ActivityInvocationHandlerBase"; - } - if (!method.getDeclaringClass().isInterface()) { - throw new IllegalArgumentException( - "Interface type is expected: " + method.getDeclaringClass()); - } - MethodRetry methodRetry = method.getAnnotation(MethodRetry.class); - ActivityMethod activityMethod = method.getAnnotation(ActivityMethod.class); - String activityName; - if (activityMethod == null || activityMethod.name().isEmpty()) { - activityName = InternalUtils.getSimpleName(method); - } else { - activityName = activityMethod.name(); - } - - function = getActivityFunc(method, methodRetry, activityName); - methodFunctions.put(method, function); - } catch (NoSuchMethodException e) { - throw Workflow.wrap(e); - } + throw new IllegalArgumentException("Unexpected method: " + method); } return getValueOrDefault(function.apply(args), method.getReturnType()); } diff --git a/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java b/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java index 5f0c74c30..cc468fa07 100644 --- a/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java @@ -30,14 +30,19 @@ class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase { private final WorkflowInterceptor activityExecutor; static InvocationHandler newInstance( - LocalActivityOptions options, WorkflowInterceptor activityExecutor) { - return new LocalActivityInvocationHandler(options, activityExecutor); + Class activityInterface, + LocalActivityOptions options, + WorkflowInterceptor activityExecutor) { + return new LocalActivityInvocationHandler(activityInterface, activityExecutor, options); } private LocalActivityInvocationHandler( - LocalActivityOptions options, WorkflowInterceptor activityExecutor) { + Class activityInterface, + WorkflowInterceptor activityExecutor, + LocalActivityOptions options) { this.options = options; this.activityExecutor = activityExecutor; + init(activityInterface); } @Override diff --git a/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java b/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java index 3088edbfe..ba660b680 100644 --- a/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java @@ -88,7 +88,7 @@ private void addActivityImplementation( } } Set activityMethods = - getAnnotatedInterfaceMethods(cls, ActivityInterface.class); + getAnnotatedInterfaceMethodsFromImplementation(cls, ActivityInterface.class); if (activityMethods.isEmpty()) { throw new IllegalArgumentException( "Class doesn't implement any non empty interface annotated with @ActivityInterface: " @@ -338,7 +338,7 @@ public int hashCode() { } } - Set getAnnotatedInterfaceMethods( + static Set getAnnotatedInterfaceMethodsFromImplementation( Class implementationClass, Class annotationClass) { if (implementationClass.isInterface()) { throw new IllegalArgumentException( @@ -347,11 +347,33 @@ Set getAnnotatedInterfaceMethods( Set pairs = new HashSet<>(); // Methods inherited from interfaces that are not annotated with @ActivityInterface Set ignored = new HashSet<>(); - getAnnotatedInterfaceMethods(implementationClass, annotationClass, ignored, pairs); + getAnnotatedInterfaceMethodsFromImplementation( + implementationClass, annotationClass, ignored, pairs); return pairs; } - private void getAnnotatedInterfaceMethods( + static Set getAnnotatedInterfaceMethodsFromInterface( + Class iClass, Class annotationClass) { + if (!iClass.isInterface()) { + throw new IllegalArgumentException("Interface expected. Found: " + iClass.getSimpleName()); + } + Annotation annotation = iClass.getAnnotation(annotationClass); + if (annotation == null) { + throw new IllegalArgumentException( + "@ActivityInterface annotation is required on the stub interface: " + + iClass.getSimpleName()); + } + Set pairs = new HashSet<>(); + // Methods inherited from interfaces that are not annotated with @ActivityInterface + Set ignored = new HashSet<>(); + getAnnotatedInterfaceMethodsFromImplementation(iClass, annotationClass, ignored, pairs); + if (!ignored.isEmpty()) { + throw new IllegalStateException("Not empty ignored: " + ignored); + } + return pairs; + } + + private static void getAnnotatedInterfaceMethodsFromImplementation( Class current, Class annotationClass, Set methods, @@ -368,7 +390,8 @@ private void getAnnotatedInterfaceMethods( Class[] interfaces = current.getInterfaces(); for (int i = 0; i < interfaces.length; i++) { Class anInterface = interfaces[i]; - getAnnotatedInterfaceMethods(anInterface, annotationClass, ourMethods, result); + getAnnotatedInterfaceMethodsFromImplementation( + anInterface, annotationClass, ourMethods, result); } Annotation annotation = current.getAnnotation(annotationClass); if (annotation == null) { diff --git a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java index 9bd1cbb83..95d065a8b 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java @@ -177,7 +177,8 @@ public T newActivityStub(Class activityInterface) { .setHeartbeatTimeout(Duration.ofSeconds(1)) .build(); InvocationHandler invocationHandler = - ActivityInvocationHandler.newInstance(options, new TestActivityExecutor(workflowService)); + ActivityInvocationHandler.newInstance( + activityInterface, options, new TestActivityExecutor(workflowService)); invocationHandler = new DeterministicRunnerWrapper(invocationHandler); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java index 9ada6ad1f..6140e4de0 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java @@ -149,7 +149,7 @@ public static long currentTimeMillis() { public static T newActivityStub(Class activityInterface, ActivityOptions options) { WorkflowInterceptor decisionContext = WorkflowInternal.getWorkflowInterceptor(); InvocationHandler invocationHandler = - ActivityInvocationHandler.newInstance(options, decisionContext); + ActivityInvocationHandler.newInstance(activityInterface, options, decisionContext); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } @@ -162,7 +162,7 @@ public static T newLocalActivityStub( Class activityInterface, LocalActivityOptions options) { WorkflowInterceptor decisionContext = WorkflowInternal.getWorkflowInterceptor(); InvocationHandler invocationHandler = - LocalActivityInvocationHandler.newInstance(options, decisionContext); + LocalActivityInvocationHandler.newInstance(activityInterface, options, decisionContext); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } diff --git a/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java index ba4199a82..f1ff72caa 100644 --- a/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/ActivityTestingTest.java @@ -33,6 +33,8 @@ import com.uber.cadence.workflow.ActivityFailureException; import io.netty.util.internal.ConcurrentSet; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.thrift.TException; @@ -252,4 +254,63 @@ public void testHeartbeatIntermittentError() throws TException, InterruptedExcep activity.activity1(); assertEquals(3, count.get()); } + + public interface A { + void a(); + } + + @ActivityInterface + public interface B extends A { + void b(); + } + + @ActivityInterface + public interface C extends B, A { + void c(); + } + + public class CImpl implements C { + private List invocations = new ArrayList<>(); + + @Override + public void a() { + invocations.add("a"); + } + + @Override + public void b() { + invocations.add("b"); + } + + @Override + public void c() { + invocations.add("c"); + } + } + + @Test + public void testInvokingActivityByBaseInterface() { + CImpl impl = new CImpl(); + testEnvironment.registerActivitiesImplementations(impl); + try { + testEnvironment.newActivityStub(A.class); + fail("A doesn't implement activity"); + } catch (IllegalArgumentException e) { + // expected as A doesn't implement any activity + } + B b = testEnvironment.newActivityStub(B.class); + b.a(); + b.b(); + C c = testEnvironment.newActivityStub(C.class); + c.a(); + c.b(); + c.c(); + List expected = new ArrayList<>(); + expected.add("a"); + expected.add("b"); + expected.add("a"); + expected.add("b"); + expected.add("c"); + assertEquals(expected, impl.invocations); + } } diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 0ac22d84e..63bc010e4 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -5441,6 +5441,7 @@ public interface GreetingWorkflow { void createGreeting(String name); } + @ActivityInterface public interface GreetingActivities { @ActivityMethod String composeGreeting(String string); From b4db3856bdd53ec76c10a48b94262164a1bbc645 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Thu, 2 Apr 2020 16:20:59 -0700 Subject: [PATCH 09/10] Added JavaDoc of ActivityInterface --- .../cadence/activity/ActivityInterface.java | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/uber/cadence/activity/ActivityInterface.java b/src/main/java/com/uber/cadence/activity/ActivityInterface.java index f29f28c93..9a0dd45c5 100644 --- a/src/main/java/com/uber/cadence/activity/ActivityInterface.java +++ b/src/main/java/com/uber/cadence/activity/ActivityInterface.java @@ -17,6 +17,7 @@ package com.uber.cadence.activity; +import com.uber.cadence.workflow.Workflow; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @@ -24,12 +25,51 @@ /** * Indicates that the interface is an activity interface. Only interfaces annotated with this - * annotation can be used as parameters to {@link - * com.uber.cadence.workflow.Workflow#newActivityStub(Class)} methods. + * annotation can be used as parameters to {@link Workflow#newActivityStub(Class)} methods. * *

Each method of the interface annotated with ActivityInterface including inherited * from interfaces is a separate activity. By default the name of an activity type is "short * interface name"_"method name". + * + *

Example: + * + *


+ *  public interface A {
+ *      a();
+ *  }
+ *
+ * {@literal @}ActivityInterface
+ *  public interface B extends A {
+ *     b();
+ *  }
+ *
+ * {@literal @}ActivityInterface
+ *  public interface C extends B {
+ *     c();
+ *  }
+ *
+ *  public class CImpl implements C {
+ *      public void a() {}
+ *      public void b() {}
+ *      public void c() {}
+ *  }
+ * 
+ * + * When CImpl instance is registered with the {@link com.uber.cadence.worker.Worker} the + * following activities are registered: + * + *

+ * + *

    + *
  • B_a + *
  • B_b + *
  • C_c + *
+ * + * Note that method a() is registered as "B_a" because interface A lacks + * ActivityInterface annotation. The workflow code can call activities through stubs to B + * and C interfaces. A call to crate stub to A interface will fail + * as A is not annotated with ActivityInterface. */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) From 4a32bb1485bb2738cae2c5ce49ea0a84837407ab Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Wed, 28 Oct 2020 20:51:52 -0700 Subject: [PATCH 10/10] Add ActivityMethod fields back for backward compatibility --- .../cadence/activity/ActivityInterface.java | 4 +- .../uber/cadence/activity/ActivityMethod.java | 43 +++++++++++++++++++ .../sync/ActivityInvocationHandler.java | 34 +++++++++++++-- .../sync/ActivityInvocationHandlerBase.java | 7 +-- .../sync/LocalActivityInvocationHandler.java | 19 +++++--- .../cadence/activity/ActivityOptionsTest.java | 1 - 6 files changed, 94 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/uber/cadence/activity/ActivityInterface.java b/src/main/java/com/uber/cadence/activity/ActivityInterface.java index 9a0dd45c5..81b6cc6f8 100644 --- a/src/main/java/com/uber/cadence/activity/ActivityInterface.java +++ b/src/main/java/com/uber/cadence/activity/ActivityInterface.java @@ -55,8 +55,8 @@ * } * * - * When CImpl instance is registered with the {@link com.uber.cadence.worker.Worker} the - * following activities are registered: + * When CImpl instance is registered with the {@link com.uber.cadence.worker.Worker} + * the following activities are registered: * *

* diff --git a/src/main/java/com/uber/cadence/activity/ActivityMethod.java b/src/main/java/com/uber/cadence/activity/ActivityMethod.java index 878821ebf..00118830a 100644 --- a/src/main/java/com/uber/cadence/activity/ActivityMethod.java +++ b/src/main/java/com/uber/cadence/activity/ActivityMethod.java @@ -21,6 +21,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.time.Duration; /** * Indicates that the method is an activity method. This annotation applies only to activity @@ -32,4 +33,46 @@ /** Name of the workflow type. Default is {short class name}::{method name} */ String name() default ""; + + /** + * Overall timeout workflow is willing to wait for activity to complete. It includes time in a + * task list (use {@link #scheduleToStartTimeoutSeconds()} to limit it) plus activity execution + * time (use {@link #startToCloseTimeoutSeconds()} to limit it). Either this option or both + * schedule to start and start to close are required. + * + * @deprecated use {@link ActivityOptions.Builder#setScheduleToCloseTimeout(Duration)} instead. + */ + int scheduleToCloseTimeoutSeconds() default 0; + + /** + * Time activity can stay in task list before it is picked up by a worker. If schedule to close is + * not provided then both this and start to close are required. + * + * @deprecated use {@link ActivityOptions.Builder#setScheduleToStartTimeout(Duration)} instead. + */ + int scheduleToStartTimeoutSeconds() default 0; + + /** + * Maximum activity execution time after it was sent to a worker. If schedule to close is not + * provided then both this and schedule to start are required. + * + * @deprecated use {@link ActivityOptions.Builder#setStartToCloseTimeout(Duration)} instead. + */ + int startToCloseTimeoutSeconds() default 0; + + /** + * Heartbeat interval. Activity must heartbeat before this interval passes after a last heartbeat + * or activity start. + * + * @deprecated use {@link ActivityOptions.Builder#setHeartbeatTimeout(Duration)} instead. + */ + int heartbeatTimeoutSeconds() default 0; + + /** + * Task list to use when dispatching activity task to a worker. By default it is the same task + * list name the workflow was started with. + * + * @deprecated use {@link ActivityOptions.Builder#setTaskList(String)} instead. + */ + String taskList() default ""; } diff --git a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java index e15ed4f94..2bf87603e 100644 --- a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java @@ -17,12 +17,15 @@ package com.uber.cadence.internal.sync; +import com.google.common.base.Strings; +import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.common.MethodRetry; import com.uber.cadence.workflow.ActivityStub; import com.uber.cadence.workflow.WorkflowInterceptor; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.time.Duration; import java.util.function.Function; class ActivityInvocationHandler extends ActivityInvocationHandlerBase { @@ -41,12 +44,37 @@ private ActivityInvocationHandler( init(activityInterface); } + @SuppressWarnings("deprecation") @Override protected Function getActivityFunc( - Method method, MethodRetry methodRetry, String activityName) { + Method method, MethodRetry methodRetry, ActivityMethod activityMethod, String activityName) { Function function; - ActivityOptions mergedOptions = - ActivityOptions.newBuilder(options).setMethodRetry(methodRetry).build(); + ActivityOptions.Builder optionsBuilder = + ActivityOptions.newBuilder(options).setMethodRetry(methodRetry); + if (activityMethod != null) { + // options always take precedence over activity method annotation. + if (options.getStartToCloseTimeout() == null) { + optionsBuilder.setStartToCloseTimeout( + Duration.ofSeconds(activityMethod.startToCloseTimeoutSeconds())); + } + if (options.getScheduleToStartTimeout() == null) { + optionsBuilder.setScheduleToStartTimeout( + Duration.ofSeconds(activityMethod.scheduleToStartTimeoutSeconds())); + } + if (options.getScheduleToCloseTimeout() == null) { + optionsBuilder.setScheduleToCloseTimeout( + Duration.ofSeconds(activityMethod.scheduleToCloseTimeoutSeconds())); + } + if (options.getHeartbeatTimeout() == null) { + optionsBuilder.setHeartbeatTimeout( + Duration.ofSeconds(activityMethod.heartbeatTimeoutSeconds())); + } + if (Strings.isNullOrEmpty(options.getTaskList()) + && !Strings.isNullOrEmpty(activityMethod.taskList())) { + optionsBuilder.setTaskList(activityMethod.taskList()); + } + } + ActivityOptions mergedOptions = optionsBuilder.build(); ActivityStub stub = ActivityStubImpl.newInstance(mergedOptions, activityExecutor); function = diff --git a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java index 001d6d86e..f1f2bf9ba 100644 --- a/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java +++ b/src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandlerBase.java @@ -47,7 +47,7 @@ static T newProxy(Class activityInterface, InvocationHandler invocationHa invocationHandler); } - protected void init(Class activityInterface) { + void init(Class activityInterface) { Set activityMethods = getAnnotatedInterfaceMethodsFromInterface(activityInterface, ActivityInterface.class); if (activityMethods.isEmpty()) { @@ -66,7 +66,8 @@ protected void init(Class activityInterface) { } MethodRetry methodRetry = method.getAnnotation(MethodRetry.class); - Function function = getActivityFunc(method, methodRetry, activityType); + Function function = + getActivityFunc(method, methodRetry, activityMethod, activityType); methodFunctions.put(method, function); } } @@ -81,5 +82,5 @@ public Object invoke(Object proxy, Method method, Object[] args) { } protected abstract Function getActivityFunc( - Method method, MethodRetry methodRetry, String activityName); + Method method, MethodRetry methodRetry, ActivityMethod activityMethod, String activityName); } diff --git a/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java b/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java index cc468fa07..df4ebc085 100644 --- a/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/LocalActivityInvocationHandler.java @@ -17,12 +17,14 @@ package com.uber.cadence.internal.sync; +import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.LocalActivityOptions; import com.uber.cadence.common.MethodRetry; import com.uber.cadence.workflow.ActivityStub; import com.uber.cadence.workflow.WorkflowInterceptor; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.time.Duration; import java.util.function.Function; class LocalActivityInvocationHandler extends ActivityInvocationHandlerBase { @@ -45,14 +47,21 @@ private LocalActivityInvocationHandler( init(activityInterface); } + @SuppressWarnings("deprecation") @Override protected Function getActivityFunc( - Method method, MethodRetry methodRetry, String activityName) { + Method method, MethodRetry methodRetry, ActivityMethod activityMethod, String activityName) { Function function; - LocalActivityOptions mergedOptions = - LocalActivityOptions.newBuilder(options) - .setMethodRetry(methodRetry) - .validateAndBuildWithDefaults(); + LocalActivityOptions.Builder optionsBuilder = + LocalActivityOptions.newBuilder(options).setMethodRetry(methodRetry); + if (activityMethod != null) { + // options always take precedence over activity method annotation. + if (options.getScheduleToCloseTimeout() == null) { + optionsBuilder.setScheduleToCloseTimeout( + Duration.ofSeconds(activityMethod.scheduleToCloseTimeoutSeconds())); + } + } + LocalActivityOptions mergedOptions = optionsBuilder.validateAndBuildWithDefaults(); ActivityStub stub = LocalActivityStubImpl.newInstance(mergedOptions, activityExecutor); function = (a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a); diff --git a/src/test/java/com/uber/cadence/activity/ActivityOptionsTest.java b/src/test/java/com/uber/cadence/activity/ActivityOptionsTest.java index 2e2a54ecd..654110715 100644 --- a/src/test/java/com/uber/cadence/activity/ActivityOptionsTest.java +++ b/src/test/java/com/uber/cadence/activity/ActivityOptionsTest.java @@ -40,7 +40,6 @@ public void activityAndRetryOptions() {} @Test public void testOnlyAnnotationsPresent() throws NoSuchMethodException { Method method = ActivityOptionsTest.class.getMethod("activityAndRetryOptions"); - ActivityMethod a = method.getAnnotation(ActivityMethod.class); MethodRetry r = method.getAnnotation(MethodRetry.class); ActivityOptions o = ActivityOptions.newBuilder().build(); ActivityOptions merged = ActivityOptions.newBuilder(o).setMethodRetry(r).build();