From 5e9b33fdb0b8002c92ae50e2141e6b4d02fc74f8 Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Wed, 28 Oct 2020 11:01:17 -0700 Subject: [PATCH] Cleanup retryer and retry options (#558) --- .../cadence/client/WorkflowClientOptions.java | 4 +- .../common/{Retryer.java => RpcRetryer.java} | 28 +++-- .../common/WorkflowExecutionUtils.java | 16 +-- .../GenericWorkflowClientExternalImpl.java | 38 +++---- .../ManualActivityCompletionClientImpl.java | 14 +-- .../internal/replay/ReplayDecider.java | 4 +- .../replay/ReplayDecisionTaskHandler.java | 6 +- .../sync/POJOActivityTaskHandler.java | 9 +- .../sync/TestWorkflowEnvironmentInternal.java | 3 - .../internal/worker/ActivityTaskHandler.java | 10 +- .../internal/worker/ActivityWorker.java | 37 +------ .../internal/worker/DecisionTaskHandler.java | 12 +- .../internal/worker/PollerOptions.java | 4 +- .../internal/worker/SingleWorkerOptions.java | 42 ------- .../internal/worker/WorkflowWorker.java | 25 +---- .../cadence/serviceclient/ClientOptions.java | 4 +- .../java/com/uber/cadence/worker/Worker.java | 16 +-- .../cadence/worker/WorkerFactoryOptions.java | 4 +- .../uber/cadence/worker/WorkerOptions.java | 104 +----------------- .../cadence/internal/common/RetryerTest.java | 10 +- 20 files changed, 79 insertions(+), 311 deletions(-) rename src/main/java/com/uber/cadence/internal/common/{Retryer.java => RpcRetryer.java} (91%) diff --git a/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java b/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java index d47ef8528..df59f25eb 100644 --- a/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java +++ b/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at diff --git a/src/main/java/com/uber/cadence/internal/common/Retryer.java b/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java similarity index 91% rename from src/main/java/com/uber/cadence/internal/common/Retryer.java rename to src/main/java/com/uber/cadence/internal/common/RpcRetryer.java index a2d7e0a9a..4f27f6f87 100644 --- a/src/main/java/com/uber/cadence/internal/common/Retryer.java +++ b/src/main/java/com/uber/cadence/internal/common/RpcRetryer.java @@ -1,8 +1,8 @@ /* - * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. - * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -19,7 +19,13 @@ import static com.uber.cadence.internal.common.CheckedExceptionWrapper.unwrap; -import com.uber.cadence.*; +import com.uber.cadence.BadRequestError; +import com.uber.cadence.CancellationAlreadyRequestedError; +import com.uber.cadence.DomainAlreadyExistsError; +import com.uber.cadence.DomainNotActiveError; +import com.uber.cadence.EntityNotExistsError; +import com.uber.cadence.QueryFailedError; +import com.uber.cadence.WorkflowExecutionAlreadyStartedError; import com.uber.cadence.common.RetryOptions; import java.time.Duration; import java.util.concurrent.CancellationException; @@ -31,8 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class Retryer { - public static final RetryOptions DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS; +public final class RpcRetryer { + public static final RetryOptions DEFAULT_RPC_RETRY_OPTIONS; private static final Duration RETRY_SERVICE_OPERATION_INITIAL_INTERVAL = Duration.ofMillis(20); private static final Duration RETRY_SERVICE_OPERATION_EXPIRATION_INTERVAL = Duration.ofMinutes(1); @@ -58,7 +64,7 @@ public final class Retryer { QueryFailedError.class, DomainNotActiveError.class, CancellationAlreadyRequestedError.class); - DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS = roBuilder.validateBuildWithDefaults(); + DEFAULT_RPC_RETRY_OPTIONS = roBuilder.validateBuildWithDefaults(); } public interface RetryableProc { @@ -81,7 +87,7 @@ private static class ValueExceptionPair { private final CompletableFuture value; private final Throwable exception; - public ValueExceptionPair(CompletableFuture value, Throwable exception) { + ValueExceptionPair(CompletableFuture value, Throwable exception) { this.value = value; this.exception = exception; } @@ -95,7 +101,7 @@ public Throwable getException() { } } - private static final Logger log = LoggerFactory.getLogger(Retryer.class); + private static final Logger log = LoggerFactory.getLogger(RpcRetryer.class); public static void retry(RetryOptions options, RetryableProc r) throws T { @@ -107,6 +113,10 @@ public static void retry(RetryOptions options, RetryablePr }); } + public static void retry(RetryableProc r) throws T { + retry(DEFAULT_RPC_RETRY_OPTIONS, r); + } + public static R retryWithResult( RetryOptions options, RetryableFunc r) throws T { int attempt = 0; @@ -266,5 +276,5 @@ private static void rethrow(Exception e) throws T { } /** Prohibits instantiation. */ - private Retryer() {} + private RpcRetryer() {} } diff --git a/src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java b/src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java index 02bf2cc24..5f37a16df 100644 --- a/src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java +++ b/src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java @@ -27,7 +27,6 @@ import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import com.uber.cadence.ActivityType; -import com.uber.cadence.BadRequestError; import com.uber.cadence.Decision; import com.uber.cadence.DecisionType; import com.uber.cadence.DescribeWorkflowExecutionRequest; @@ -91,15 +90,6 @@ public class WorkflowExecutionUtils { */ private static final String INDENTATION = " "; - private static RetryOptions retryParameters = - new RetryOptions.Builder() - .setBackoffCoefficient(2) - .setInitialInterval(Duration.ofMillis(500)) - .setMaximumInterval(Duration.ofSeconds(30)) - .setMaximumAttempts(Integer.MAX_VALUE) - .setDoNotRetry(BadRequestError.class, EntityNotExistsError.class) - .build(); - // Wait period for passive cluster to retry getting workflow result in case of replication delay. private static final long ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS = 500; @@ -216,7 +206,7 @@ private static HistoryEvent getInstanceCloseEvent( RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit); try { response = - Retryer.retryWithResult( + RpcRetryer.retryWithResult( retryOptions, () -> service.GetWorkflowExecutionHistoryWithTimeout(r, unit.toMillis(timeout))); } catch (EntityNotExistsError e) { @@ -343,7 +333,7 @@ private static CompletableFuture getInstanceCloseEventAsync( } private static RetryOptions getRetryOptionWithTimeout(long timeout, TimeUnit unit) { - return new RetryOptions.Builder(retryParameters) + return new RetryOptions.Builder(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS) .setExpiration(Duration.ofSeconds(unit.toSeconds(timeout))) .build(); } @@ -355,7 +345,7 @@ private static RetryOptions getRetryOptionWithTimeout(long timeout, TimeUnit uni long timeout, TimeUnit unit) { RetryOptions retryOptions = getRetryOptionWithTimeout(timeout, unit); - return Retryer.retryWithResultAsync( + return RpcRetryer.retryWithResultAsync( retryOptions, () -> { CompletableFuture result = new CompletableFuture<>(); diff --git a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java index 6d86d186a..ebcfc67da 100644 --- a/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java @@ -118,9 +118,8 @@ private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters StartWorkflowExecutionResponse result; try { result = - Retryer.retryWithResult( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.StartWorkflowExecution(request)); + RpcRetryer.retryWithResult( + RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.StartWorkflowExecution(request)); } catch (WorkflowExecutionAlreadyStartedError e) { throw e; } catch (TException e) { @@ -137,7 +136,7 @@ private RetryOptions getRetryOptionsWithExpiration(RetryOptions o, Long timeoutI if (timeoutInMillis == null || timeoutInMillis <= 0 || timeoutInMillis == Long.MAX_VALUE) { return o; } - return new RetryOptions.Builder(Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS) + return new RetryOptions.Builder(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS) .setExpiration(Duration.ofMillis((timeoutInMillis))) .build(); } @@ -146,9 +145,8 @@ private CompletableFuture startWorkflowAsyncInternal( StartWorkflowExecutionParameters startParameters, Long timeoutInMillis) { StartWorkflowExecutionRequest request = getStartRequest(startParameters); - return Retryer.retryWithResultAsync( - getRetryOptionsWithExpiration( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis), + return RpcRetryer.retryWithResultAsync( + getRetryOptionsWithExpiration(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, timeoutInMillis), () -> { CompletableFuture result = new CompletableFuture<>(); try { @@ -272,9 +270,7 @@ public void signalWorkflowExecution(SignalExternalWorkflowParameters signalParam SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.SignalWorkflowExecution(request)); + RpcRetryer.retry(() -> service.SignalWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); } @@ -290,9 +286,8 @@ public CompletableFuture signalWorkflowExecutionAsync( public CompletableFuture signalWorkflowExecutionAsync( SignalExternalWorkflowParameters signalParameters, Long timeoutInMillis) { SignalWorkflowExecutionRequest request = getSignalRequest(signalParameters); - return Retryer.retryWithResultAsync( - getRetryOptionsWithExpiration( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, timeoutInMillis), + return RpcRetryer.retryWithResultAsync( + getRetryOptionsWithExpiration(RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, timeoutInMillis), () -> { CompletableFuture result = new CompletableFuture<>(); try { @@ -387,8 +382,8 @@ private WorkflowExecution signalWithStartWorkflowInternal( StartWorkflowExecutionResponse result; try { result = - Retryer.retryWithResult( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, + RpcRetryer.retryWithResult( + RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.SignalWithStartWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); @@ -405,9 +400,7 @@ public void requestCancelWorkflowExecution(WorkflowExecution execution) { request.setDomain(domain); request.setWorkflowExecution(execution); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.RequestCancelWorkflowExecution(request)); + RpcRetryer.retry(() -> service.RequestCancelWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); } @@ -427,9 +420,8 @@ public QueryWorkflowResponse queryWorkflow(QueryWorkflowParameters queryParamete request.setQueryRejectCondition(queryParameters.getQueryRejectCondition()); try { QueryWorkflowResponse response = - Retryer.retryWithResult( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.QueryWorkflow(request)); + RpcRetryer.retryWithResult( + RpcRetryer.DEFAULT_RPC_RETRY_OPTIONS, () -> service.QueryWorkflow(request)); return response; } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); @@ -451,9 +443,7 @@ public void terminateWorkflowExecution(TerminateWorkflowExecutionParameters term request.setReason(terminateParameters.getReason()); // request.setChildPolicy(terminateParameters.getChildPolicy()); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.TerminateWorkflowExecution(request)); + RpcRetryer.retry(() -> service.TerminateWorkflowExecution(request)); } catch (TException e) { throw CheckedExceptionWrapper.wrap(e); } diff --git a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java index ab30159bc..3367e705a 100644 --- a/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java +++ b/src/main/java/com/uber/cadence/internal/external/ManualActivityCompletionClientImpl.java @@ -31,7 +31,7 @@ import com.uber.cadence.client.ActivityCompletionFailureException; import com.uber.cadence.client.ActivityNotExistsException; import com.uber.cadence.converter.DataConverter; -import com.uber.cadence.internal.common.Retryer; +import com.uber.cadence.internal.common.RpcRetryer; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.serviceclient.IWorkflowService; import com.uber.m3.tally.Scope; @@ -95,9 +95,7 @@ public void complete(Object result) { request.setResult(convertedResult); request.setTaskToken(taskToken); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.RespondActivityTaskCompleted(request)); + RpcRetryer.retry(() -> service.RespondActivityTaskCompleted(request)); metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); @@ -139,9 +137,7 @@ public void fail(Throwable failure) { request.setDetails(dataConverter.toData(failure)); request.setTaskToken(taskToken); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.RespondActivityTaskFailed(request)); + RpcRetryer.retry(() -> service.RespondActivityTaskFailed(request)); metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); @@ -156,9 +152,7 @@ public void fail(Throwable failure) { request.setWorkflowID(execution.getWorkflowId()); request.setRunID(execution.getRunId()); try { - Retryer.retry( - Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS, - () -> service.RespondActivityTaskFailedByID(request)); + RpcRetryer.retry(() -> service.RespondActivityTaskFailedByID(request)); metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_BY_ID_COUNTER).inc(1); } catch (EntityNotExistsError e) { throw new ActivityNotExistsException(e); diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java index d019005e0..b3ddcf760 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java @@ -32,7 +32,7 @@ import com.uber.cadence.WorkflowType; import com.uber.cadence.common.RetryOptions; import com.uber.cadence.internal.common.OptionsUtils; -import com.uber.cadence.internal.common.Retryer; +import com.uber.cadence.internal.common.RpcRetryer; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents; @@ -667,7 +667,7 @@ public HistoryEvent next() { try { GetWorkflowExecutionHistoryResponse r = - Retryer.retryWithResult( + RpcRetryer.retryWithResult( retryOptions, () -> service.GetWorkflowExecutionHistory(request)); current = r.getHistory().getEventsIterator(); nextPageToken = r.getNextPageToken(); diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java index d06a0c01b..c1c6d9b05 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java @@ -113,7 +113,7 @@ public DecisionTaskHandler.Result handleDecisionTask(PollForDecisionTaskResponse e.printStackTrace(pw); String stackTrace = sw.toString(); failedRequest.setDetails(stackTrace.getBytes(StandardCharsets.UTF_8)); - return new DecisionTaskHandler.Result(null, failedRequest, null, null); + return new DecisionTaskHandler.Result(null, failedRequest, null); } } @@ -236,7 +236,7 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) { cache.markProcessingDone(decisionTask); } } - return new Result(null, null, queryCompletedRequest, null); + return new Result(null, null, queryCompletedRequest); } private Result createCompletedRequest( @@ -254,7 +254,7 @@ private Result createCompletedRequest( (int) stickyTaskListScheduleToStartTimeout.getSeconds()); completedRequest.setStickyAttributes(attributes); } - return new Result(completedRequest, null, null, null); + return new Result(completedRequest, null, null); } @Override diff --git a/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java b/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java index ec49d1469..8f907875e 100644 --- a/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java @@ -147,8 +147,7 @@ private ActivityTaskHandler.Result mapToActivityFailure( failure = CheckedExceptionWrapper.unwrap(failure); result.setReason(failure.getClass().getName()); result.setDetails(dataConverter.toData(failure)); - return new ActivityTaskHandler.Result( - null, new Result.TaskFailedResult(result, failure), null, null); + return new ActivityTaskHandler.Result(null, new Result.TaskFailedResult(result, failure), null); } @Override @@ -214,12 +213,12 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc Object result = method.invoke(activity, args); RespondActivityTaskCompletedRequest request = new RespondActivityTaskCompletedRequest(); if (context.isDoNotCompleteOnReturn()) { - return new ActivityTaskHandler.Result(null, null, null, null); + return new ActivityTaskHandler.Result(null, null, null); } if (method.getReturnType() != Void.TYPE) { request.setResult(dataConverter.toData(result)); } - return new ActivityTaskHandler.Result(request, null, null, null); + return new ActivityTaskHandler.Result(request, null, null); } catch (RuntimeException | IllegalAccessException e) { return mapToActivityFailure(e, metricsScope, false); } catch (InvocationTargetException e) { @@ -252,7 +251,7 @@ public ActivityTaskHandler.Result execute(ActivityTaskImpl task, Scope metricsSc if (method.getReturnType() != Void.TYPE) { request.setResult(dataConverter.toData(result)); } - return new ActivityTaskHandler.Result(request, null, null, null); + return new ActivityTaskHandler.Result(request, null, null); } catch (RuntimeException | IllegalAccessException e) { return mapToActivityFailure(e, metricsScope, true); } catch (InvocationTargetException e) { diff --git a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java index 4e61afc6f..236aa3bca 100644 --- a/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java +++ b/src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java @@ -139,9 +139,6 @@ public Worker newWorker( WorkerOptions.Builder builder = WorkerOptions.newBuilder() .setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory()); - if (testEnvironmentOptions.getDataConverter() != null) { - builder.setDataConverter(testEnvironmentOptions.getDataConverter()); - } builder = overrideOptions.apply(builder); return workerFactory.newWorker(taskList, builder.build()); } diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java b/src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java index e49e5ff56..9d18c1ad9 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityTaskHandler.java @@ -21,7 +21,6 @@ import com.uber.cadence.RespondActivityTaskCanceledRequest; import com.uber.cadence.RespondActivityTaskCompletedRequest; import com.uber.cadence.RespondActivityTaskFailedRequest; -import com.uber.cadence.common.RetryOptions; import com.uber.m3.tally.Scope; import java.time.Duration; @@ -37,7 +36,6 @@ final class Result { private final RespondActivityTaskCompletedRequest taskCompleted; private final TaskFailedResult taskFailed; private final RespondActivityTaskCanceledRequest taskCancelled; - private final RetryOptions requestRetryOptions; private int attempt; private Duration backoff; @@ -68,12 +66,10 @@ public Throwable getFailure() { public Result( RespondActivityTaskCompletedRequest taskCompleted, TaskFailedResult taskFailed, - RespondActivityTaskCanceledRequest taskCancelled, - RetryOptions requestRetryOptions) { + RespondActivityTaskCanceledRequest taskCancelled) { this.taskCompleted = taskCompleted; this.taskFailed = taskFailed; this.taskCancelled = taskCancelled; - this.requestRetryOptions = requestRetryOptions; } public RespondActivityTaskCompletedRequest getTaskCompleted() { @@ -88,10 +84,6 @@ public RespondActivityTaskCanceledRequest getTaskCancelled() { return taskCancelled; } - public RetryOptions getRequestRetryOptions() { - return requestRetryOptions; - } - public void setAttempt(int attempt) { this.attempt = attempt; } diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index 01d65383b..a0a86014a 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -17,18 +17,14 @@ package com.uber.cadence.internal.worker; -import com.uber.cadence.BadRequestError; -import com.uber.cadence.DomainNotActiveError; -import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.Header; import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.RespondActivityTaskCanceledRequest; import com.uber.cadence.RespondActivityTaskCompletedRequest; import com.uber.cadence.RespondActivityTaskFailedRequest; import com.uber.cadence.WorkflowExecution; -import com.uber.cadence.common.RetryOptions; import com.uber.cadence.context.ContextPropagator; -import com.uber.cadence.internal.common.Retryer; +import com.uber.cadence.internal.common.RpcRetryer; import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; @@ -150,7 +146,7 @@ public void handle(PollForActivityTaskResponse task) throws Exception { cancelledRequest.setDetails( String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8)); Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start(); - sendReply(task, new Result(null, null, cancelledRequest, null), metricsScope); + sendReply(task, new Result(null, null, cancelledRequest), metricsScope); sw.stop(); } finally { MDC.remove(LoggerTag.ACTIVITY_ID); @@ -201,49 +197,26 @@ public Throwable wrapFailure(PollForActivityTaskResponse task, Throwable failure private void sendReply( PollForActivityTaskResponse task, ActivityTaskHandler.Result response, Scope metricsScope) throws TException { - RetryOptions ro = response.getRequestRetryOptions(); RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted(); if (taskCompleted != null) { - ro = - options - .getReportCompletionRetryOptions() - .merge(ro) - .addDoNotRetry( - BadRequestError.class, EntityNotExistsError.class, DomainNotActiveError.class); taskCompleted.setTaskToken(task.getTaskToken()); taskCompleted.setIdentity(options.getIdentity()); - Retryer.retry(ro, () -> service.RespondActivityTaskCompleted(taskCompleted)); + RpcRetryer.retry(() -> service.RespondActivityTaskCompleted(taskCompleted)); metricsScope.counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1); } else { if (response.getTaskFailedResult() != null) { RespondActivityTaskFailedRequest taskFailed = response.getTaskFailedResult().getTaskFailedRequest(); - ro = - options - .getReportFailureRetryOptions() - .merge(ro) - .addDoNotRetry( - BadRequestError.class, - EntityNotExistsError.class, - DomainNotActiveError.class); taskFailed.setTaskToken(task.getTaskToken()); taskFailed.setIdentity(options.getIdentity()); - Retryer.retry(ro, () -> service.RespondActivityTaskFailed(taskFailed)); + RpcRetryer.retry(() -> service.RespondActivityTaskFailed(taskFailed)); metricsScope.counter(MetricsType.ACTIVITY_TASK_FAILED_COUNTER).inc(1); } else { RespondActivityTaskCanceledRequest taskCancelled = response.getTaskCancelled(); if (taskCancelled != null) { taskCancelled.setTaskToken(task.getTaskToken()); taskCancelled.setIdentity(options.getIdentity()); - ro = - options - .getReportFailureRetryOptions() - .merge(ro) - .addDoNotRetry( - BadRequestError.class, - EntityNotExistsError.class, - DomainNotActiveError.class); - Retryer.retry(ro, () -> service.RespondActivityTaskCanceled(taskCancelled)); + RpcRetryer.retry(() -> service.RespondActivityTaskCanceled(taskCancelled)); metricsScope.counter(MetricsType.ACTIVITY_TASK_CANCELED_COUNTER).inc(1); } } diff --git a/src/main/java/com/uber/cadence/internal/worker/DecisionTaskHandler.java b/src/main/java/com/uber/cadence/internal/worker/DecisionTaskHandler.java index 2193e82d1..70c10f55f 100644 --- a/src/main/java/com/uber/cadence/internal/worker/DecisionTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/worker/DecisionTaskHandler.java @@ -21,7 +21,6 @@ import com.uber.cadence.RespondDecisionTaskCompletedRequest; import com.uber.cadence.RespondDecisionTaskFailedRequest; import com.uber.cadence.RespondQueryTaskCompletedRequest; -import com.uber.cadence.common.RetryOptions; /** * Interface of workflow task handlers. @@ -34,17 +33,14 @@ final class Result { private final RespondDecisionTaskCompletedRequest taskCompleted; private final RespondDecisionTaskFailedRequest taskFailed; private final RespondQueryTaskCompletedRequest queryCompleted; - private final RetryOptions requestRetryOptions; public Result( RespondDecisionTaskCompletedRequest taskCompleted, RespondDecisionTaskFailedRequest taskFailed, - RespondQueryTaskCompletedRequest queryCompleted, - RetryOptions requestRetryOptions) { + RespondQueryTaskCompletedRequest queryCompleted) { this.taskCompleted = taskCompleted; this.taskFailed = taskFailed; this.queryCompleted = queryCompleted; - this.requestRetryOptions = requestRetryOptions; } public RespondDecisionTaskCompletedRequest getTaskCompleted() { @@ -59,10 +55,6 @@ public RespondQueryTaskCompletedRequest getQueryCompleted() { return queryCompleted; } - public RetryOptions getRequestRetryOptions() { - return requestRetryOptions; - } - @Override public String toString() { return "Result{" @@ -72,8 +64,6 @@ public String toString() { + taskFailed + ", queryCompleted=" + queryCompleted - + ", requestRetryOptions=" - + requestRetryOptions + '}'; } } diff --git a/src/main/java/com/uber/cadence/internal/worker/PollerOptions.java b/src/main/java/com/uber/cadence/internal/worker/PollerOptions.java index 9b78546b1..23d43feea 100644 --- a/src/main/java/com/uber/cadence/internal/worker/PollerOptions.java +++ b/src/main/java/com/uber/cadence/internal/worker/PollerOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at diff --git a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java index 3a6d66916..3216a66e0 100644 --- a/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java +++ b/src/main/java/com/uber/cadence/internal/worker/SingleWorkerOptions.java @@ -17,11 +17,9 @@ package com.uber.cadence.internal.worker; -import com.uber.cadence.common.RetryOptions; import com.uber.cadence.context.ContextPropagator; import com.uber.cadence.converter.DataConverter; import com.uber.cadence.converter.JsonDataConverter; -import com.uber.cadence.internal.common.Retryer; import com.uber.cadence.internal.metrics.NoopScope; import com.uber.m3.tally.Scope; import java.time.Duration; @@ -43,10 +41,6 @@ public static final class Builder { private int taskExecutorThreadPoolSize = 100; private double taskListActivitiesPerSecond; private PollerOptions pollerOptions; - /** TODO: Dynamic expiration based on activity timeout */ - private RetryOptions reportCompletionRetryOptions; - - private RetryOptions reportFailureRetryOptions; private Scope metricsScope; private boolean enableLoggingInReplay; private List contextPropagators; @@ -59,8 +53,6 @@ public Builder(SingleWorkerOptions options) { this.pollerOptions = options.getPollerOptions(); this.taskListActivitiesPerSecond = options.getTaskListActivitiesPerSecond(); this.taskExecutorThreadPoolSize = options.getTaskExecutorThreadPoolSize(); - this.reportCompletionRetryOptions = options.getReportCompletionRetryOptions(); - this.reportFailureRetryOptions = options.getReportFailureRetryOptions(); this.metricsScope = options.getMetricsScope(); this.enableLoggingInReplay = options.getEnableLoggingInReplay(); this.contextPropagators = options.getContextPropagators(); @@ -101,16 +93,6 @@ public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond return this; } - public Builder setReportCompletionRetryOptions(RetryOptions reportCompletionRetryOptions) { - this.reportCompletionRetryOptions = reportCompletionRetryOptions; - return this; - } - - public Builder setReportFailureRetryOptions(RetryOptions reportFailureRetryOptions) { - this.reportFailureRetryOptions = reportFailureRetryOptions; - return this; - } - /** Specifies the list of context propagators to use during this workflow. */ public Builder setContextPropagators(List contextPropagators) { this.contextPropagators = contextPropagators; @@ -118,14 +100,6 @@ public Builder setContextPropagators(List contextPropagators) } public SingleWorkerOptions build() { - if (reportCompletionRetryOptions == null) { - reportCompletionRetryOptions = Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS; - } - - if (reportFailureRetryOptions == null) { - reportFailureRetryOptions = Retryer.DEFAULT_SERVICE_OPERATION_RETRY_OPTIONS; - } - if (pollerOptions == null) { pollerOptions = PollerOptions.newBuilder() @@ -149,8 +123,6 @@ public SingleWorkerOptions build() { taskExecutorThreadPoolSize, taskListActivitiesPerSecond, pollerOptions, - reportCompletionRetryOptions, - reportFailureRetryOptions, metricsScope, enableLoggingInReplay, contextPropagators); @@ -162,8 +134,6 @@ public SingleWorkerOptions build() { private final int taskExecutorThreadPoolSize; private final double taskListActivitiesPerSecond; private final PollerOptions pollerOptions; - private final RetryOptions reportCompletionRetryOptions; - private final RetryOptions reportFailureRetryOptions; private final Scope metricsScope; private final boolean enableLoggingInReplay; private List contextPropagators; @@ -174,8 +144,6 @@ private SingleWorkerOptions( int taskExecutorThreadPoolSize, double taskListActivitiesPerSecond, PollerOptions pollerOptions, - RetryOptions reportCompletionRetryOptions, - RetryOptions reportFailureRetryOptions, Scope metricsScope, boolean enableLoggingInReplay, List contextPropagators) { @@ -184,8 +152,6 @@ private SingleWorkerOptions( this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize; this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; this.pollerOptions = pollerOptions; - this.reportCompletionRetryOptions = reportCompletionRetryOptions; - this.reportFailureRetryOptions = reportFailureRetryOptions; this.metricsScope = metricsScope; this.enableLoggingInReplay = enableLoggingInReplay; this.contextPropagators = contextPropagators; @@ -207,14 +173,6 @@ PollerOptions getPollerOptions() { return pollerOptions; } - RetryOptions getReportCompletionRetryOptions() { - return reportCompletionRetryOptions; - } - - RetryOptions getReportFailureRetryOptions() { - return reportFailureRetryOptions; - } - double getTaskListActivitiesPerSecond() { return taskListActivitiesPerSecond; } diff --git a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java index 959024ea3..d6a7bbeee 100644 --- a/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java @@ -18,9 +18,6 @@ package com.uber.cadence.internal.worker; import com.google.common.base.Strings; -import com.uber.cadence.BadRequestError; -import com.uber.cadence.DomainNotActiveError; -import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; import com.uber.cadence.History; import com.uber.cadence.HistoryEvent; @@ -32,9 +29,8 @@ import com.uber.cadence.WorkflowExecutionStartedEventAttributes; import com.uber.cadence.WorkflowQuery; import com.uber.cadence.WorkflowType; -import com.uber.cadence.common.RetryOptions; import com.uber.cadence.common.WorkflowExecutionHistory; -import com.uber.cadence.internal.common.Retryer; +import com.uber.cadence.internal.common.RpcRetryer; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.internal.metrics.MetricsTag; @@ -237,32 +233,17 @@ public Throwable wrapFailure(PollForDecisionTaskResponse task, Throwable failure private void sendReply( IWorkflowService service, byte[] taskToken, DecisionTaskHandler.Result response) throws TException { - RetryOptions ro = response.getRequestRetryOptions(); RespondDecisionTaskCompletedRequest taskCompleted = response.getTaskCompleted(); if (taskCompleted != null) { - ro = - options - .getReportCompletionRetryOptions() - .merge(ro) - .addDoNotRetry( - BadRequestError.class, EntityNotExistsError.class, DomainNotActiveError.class); taskCompleted.setIdentity(options.getIdentity()); taskCompleted.setTaskToken(taskToken); - Retryer.retry(ro, () -> service.RespondDecisionTaskCompleted(taskCompleted)); + RpcRetryer.retry(() -> service.RespondDecisionTaskCompleted(taskCompleted)); } else { RespondDecisionTaskFailedRequest taskFailed = response.getTaskFailed(); if (taskFailed != null) { - ro = - options - .getReportFailureRetryOptions() - .merge(ro) - .addDoNotRetry( - BadRequestError.class, - EntityNotExistsError.class, - DomainNotActiveError.class); taskFailed.setIdentity(options.getIdentity()); taskFailed.setTaskToken(taskToken); - Retryer.retry(ro, () -> service.RespondDecisionTaskFailed(taskFailed)); + RpcRetryer.retry(() -> service.RespondDecisionTaskFailed(taskFailed)); } else { RespondQueryTaskCompletedRequest queryCompleted = response.getQueryCompleted(); if (queryCompleted != null) { diff --git a/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java b/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java index b8781f861..3cb9b08ce 100644 --- a/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java +++ b/src/main/java/com/uber/cadence/serviceclient/ClientOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017-2020 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 6fd167988..1dd9eb79e 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -84,11 +84,9 @@ public final class Worker implements Suspendable { SingleWorkerOptions activityOptions = SingleWorkerOptions.newBuilder() - .setDataConverter(options.getDataConverter()) + .setDataConverter(client.getOptions().getDataConverter()) .setIdentity(client.getOptions().getIdentity()) .setPollerOptions(options.getActivityPollerOptions()) - .setReportCompletionRetryOptions(options.getReportActivityCompletionRetryOptions()) - .setReportFailureRetryOptions(options.getReportActivityFailureRetryOptions()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentActivityExecutionSize()) .setMetricsScope(metricsScope) .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay()) @@ -100,11 +98,9 @@ public final class Worker implements Suspendable { SingleWorkerOptions workflowOptions = SingleWorkerOptions.newBuilder() - .setDataConverter(options.getDataConverter()) + .setDataConverter(client.getOptions().getDataConverter()) .setIdentity(client.getOptions().getIdentity()) .setPollerOptions(options.getWorkflowPollerOptions()) - .setReportCompletionRetryOptions(options.getReportWorkflowCompletionRetryOptions()) - .setReportFailureRetryOptions(options.getReportWorkflowFailureRetryOptions()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentWorkflowExecutionSize()) .setMetricsScope(metricsScope) .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay()) @@ -112,11 +108,9 @@ public final class Worker implements Suspendable { .build(); SingleWorkerOptions localActivityOptions = SingleWorkerOptions.newBuilder() - .setDataConverter(options.getDataConverter()) + .setDataConverter(client.getOptions().getDataConverter()) .setIdentity(client.getOptions().getIdentity()) .setPollerOptions(options.getWorkflowPollerOptions()) - .setReportCompletionRetryOptions(options.getReportWorkflowCompletionRetryOptions()) - .setReportFailureRetryOptions(options.getReportWorkflowFailureRetryOptions()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentLocalActivityExecutionSize()) .setMetricsScope(metricsScope) .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay()) diff --git a/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java b/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java index 5eb876d68..6d197a0c2 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java +++ b/src/main/java/com/uber/cadence/worker/WorkerFactoryOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at diff --git a/src/main/java/com/uber/cadence/worker/WorkerOptions.java b/src/main/java/com/uber/cadence/worker/WorkerOptions.java index 799b81e94..cb66b3ade 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerOptions.java +++ b/src/main/java/com/uber/cadence/worker/WorkerOptions.java @@ -1,8 +1,8 @@ /* + * Modifications Copyright (c) 2017-2020 Uber Technologies Inc. + * Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License is * located at @@ -17,9 +17,6 @@ package com.uber.cadence.worker; -import com.uber.cadence.common.RetryOptions; -import com.uber.cadence.converter.DataConverter; -import com.uber.cadence.converter.JsonDataConverter; import com.uber.cadence.internal.worker.PollerOptions; import com.uber.cadence.workflow.WorkflowInterceptor; import java.util.Objects; @@ -47,24 +44,18 @@ public static WorkerOptions defaultInstance() { public static final class Builder { private double workerActivitiesPerSecond; - private DataConverter dataConverter = JsonDataConverter.getInstance(); private int maxConcurrentActivityExecutionSize = 100; private int maxConcurrentWorkflowExecutionSize = 50; private int maxConcurrentLocalActivityExecutionSize = 100; private double taskListActivitiesPerSecond = 100000; private PollerOptions activityPollerOptions; private PollerOptions workflowPollerOptions; - private RetryOptions reportActivityCompletionRetryOptions; - private RetryOptions reportActivityFailureRetryOptions; - private RetryOptions reportWorkflowCompletionRetryOptions; - private RetryOptions reportWorkflowFailureRetryOptions; private Function interceptorFactory = (n) -> n; private Builder() {} private Builder(WorkerOptions options) { this.workerActivitiesPerSecond = options.workerActivitiesPerSecond; - this.dataConverter = options.dataConverter; this.maxConcurrentActivityExecutionSize = options.maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowExecutionSize = options.maxConcurrentWorkflowExecutionSize; this.maxConcurrentLocalActivityExecutionSize = @@ -72,22 +63,9 @@ private Builder(WorkerOptions options) { this.taskListActivitiesPerSecond = options.taskListActivitiesPerSecond; this.activityPollerOptions = options.activityPollerOptions; this.workflowPollerOptions = options.workflowPollerOptions; - this.reportActivityCompletionRetryOptions = options.reportActivityCompletionRetryOptions; - this.reportActivityFailureRetryOptions = options.reportActivityFailureRetryOptions; - this.reportWorkflowCompletionRetryOptions = options.reportWorkflowCompletionRetryOptions; - this.reportWorkflowFailureRetryOptions = options.reportWorkflowFailureRetryOptions; this.interceptorFactory = options.interceptorFactory; } - /** - * Override a data converter implementation used by workflows and activities executed by this - * worker. Default is {@link com.uber.cadence.converter.JsonDataConverter} data converter. - */ - public Builder setDataConverter(DataConverter dataConverter) { - this.dataConverter = Objects.requireNonNull(dataConverter); - return this; - } - /** Maximum number of activities started per second. Default is 0 which means unlimited. */ public Builder setWorkerActivitiesPerSecond(double workerActivitiesPerSecond) { if (workerActivitiesPerSecond <= 0) { @@ -138,34 +116,6 @@ public Builder setWorkflowPollerOptions(PollerOptions workflowPollerOptions) { return this; } - public Builder setReportActivityCompletionRetryOptions( - RetryOptions reportActivityCompletionRetryOptions) { - this.reportActivityCompletionRetryOptions = - Objects.requireNonNull(reportActivityCompletionRetryOptions); - return this; - } - - public Builder setReportActivityFailureRetryOptions( - RetryOptions reportActivityFailureRetryOptions) { - this.reportActivityFailureRetryOptions = - Objects.requireNonNull(reportActivityFailureRetryOptions); - return this; - } - - public Builder setReportWorkflowCompletionRetryOptions( - RetryOptions reportWorkflowCompletionRetryOptions) { - this.reportWorkflowCompletionRetryOptions = - Objects.requireNonNull(reportWorkflowCompletionRetryOptions); - return this; - } - - public Builder setReportWorkflowFailureRetryOptions( - RetryOptions reportWorkflowFailureRetryOptions) { - this.reportWorkflowFailureRetryOptions = - Objects.requireNonNull(reportWorkflowFailureRetryOptions); - return this; - } - public Builder setInterceptorFactory( Function interceptorFactory) { this.interceptorFactory = Objects.requireNonNull(interceptorFactory); @@ -188,61 +138,41 @@ public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond public WorkerOptions build() { return new WorkerOptions( workerActivitiesPerSecond, - dataConverter, maxConcurrentActivityExecutionSize, maxConcurrentWorkflowExecutionSize, maxConcurrentLocalActivityExecutionSize, taskListActivitiesPerSecond, activityPollerOptions, workflowPollerOptions, - reportActivityCompletionRetryOptions, - reportActivityFailureRetryOptions, - reportWorkflowCompletionRetryOptions, - reportWorkflowFailureRetryOptions, interceptorFactory); } } private final double workerActivitiesPerSecond; - private final DataConverter dataConverter; private final int maxConcurrentActivityExecutionSize; private final int maxConcurrentWorkflowExecutionSize; private final int maxConcurrentLocalActivityExecutionSize; private final double taskListActivitiesPerSecond; private final PollerOptions activityPollerOptions; private final PollerOptions workflowPollerOptions; - private final RetryOptions reportActivityCompletionRetryOptions; - private final RetryOptions reportActivityFailureRetryOptions; - private final RetryOptions reportWorkflowCompletionRetryOptions; - private final RetryOptions reportWorkflowFailureRetryOptions; private final Function interceptorFactory; private WorkerOptions( double workerActivitiesPerSecond, - DataConverter dataConverter, int maxConcurrentActivityExecutionSize, int maxConcurrentWorkflowExecutionSize, int maxConcurrentLocalActivityExecutionSize, double taskListActivitiesPerSecond, PollerOptions activityPollerOptions, PollerOptions workflowPollerOptions, - RetryOptions reportActivityCompletionRetryOptions, - RetryOptions reportActivityFailureRetryOptions, - RetryOptions reportWorkflowCompletionRetryOptions, - RetryOptions reportWorkflowFailureRetryOptions, Function interceptorFactory) { this.workerActivitiesPerSecond = workerActivitiesPerSecond; - this.dataConverter = dataConverter; this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowExecutionSize = maxConcurrentWorkflowExecutionSize; this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize; this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; this.activityPollerOptions = activityPollerOptions; this.workflowPollerOptions = workflowPollerOptions; - this.reportActivityCompletionRetryOptions = reportActivityCompletionRetryOptions; - this.reportActivityFailureRetryOptions = reportActivityFailureRetryOptions; - this.reportWorkflowCompletionRetryOptions = reportWorkflowCompletionRetryOptions; - this.reportWorkflowFailureRetryOptions = reportWorkflowFailureRetryOptions; this.interceptorFactory = interceptorFactory; } @@ -250,10 +180,6 @@ public double getWorkerActivitiesPerSecond() { return workerActivitiesPerSecond; } - public DataConverter getDataConverter() { - return dataConverter; - } - public int getMaxConcurrentActivityExecutionSize() { return maxConcurrentActivityExecutionSize; } @@ -274,22 +200,6 @@ public PollerOptions getWorkflowPollerOptions() { return workflowPollerOptions; } - public RetryOptions getReportActivityCompletionRetryOptions() { - return reportActivityCompletionRetryOptions; - } - - public RetryOptions getReportActivityFailureRetryOptions() { - return reportActivityFailureRetryOptions; - } - - public RetryOptions getReportWorkflowCompletionRetryOptions() { - return reportWorkflowCompletionRetryOptions; - } - - public RetryOptions getReportWorkflowFailureRetryOptions() { - return reportWorkflowFailureRetryOptions; - } - public Function getInterceptorFactory() { return interceptorFactory; } @@ -299,8 +209,6 @@ public String toString() { return "WorkerOptions{" + "workerActivitiesPerSecond=" + workerActivitiesPerSecond - + ", dataConverter=" - + dataConverter + ", maxConcurrentActivityExecutionSize=" + maxConcurrentActivityExecutionSize + ", maxConcurrentWorkflowExecutionSize=" @@ -313,14 +221,6 @@ public String toString() { + activityPollerOptions + ", workflowPollerOptions=" + workflowPollerOptions - + ", reportActivityCompletionRetryOptions=" - + reportActivityCompletionRetryOptions - + ", reportActivityFailureRetryOptions=" - + reportActivityFailureRetryOptions - + ", reportWorkflowCompletionRetryOptions=" - + reportWorkflowCompletionRetryOptions - + ", reportWorkflowFailureRetryOptions=" - + reportWorkflowFailureRetryOptions + '}'; } } diff --git a/src/test/java/com/uber/cadence/internal/common/RetryerTest.java b/src/test/java/com/uber/cadence/internal/common/RetryerTest.java index 1cbf543fe..612764e85 100644 --- a/src/test/java/com/uber/cadence/internal/common/RetryerTest.java +++ b/src/test/java/com/uber/cadence/internal/common/RetryerTest.java @@ -40,7 +40,7 @@ public void testExpiration() throws InterruptedException { .validateBuildWithDefaults(); long start = System.currentTimeMillis(); try { - Retryer.retryWithResultAsync( + RpcRetryer.retryWithResultAsync( options, () -> { throw new IllegalArgumentException("simulated"); @@ -64,7 +64,7 @@ public void testExpirationFuture() throws InterruptedException { .validateBuildWithDefaults(); long start = System.currentTimeMillis(); try { - Retryer.retryWithResultAsync( + RpcRetryer.retryWithResultAsync( options, () -> { CompletableFuture result = new CompletableFuture<>(); @@ -91,7 +91,7 @@ public void testInterruptedException() throws InterruptedException { .validateBuildWithDefaults(); long start = System.currentTimeMillis(); try { - Retryer.retryWithResultAsync( + RpcRetryer.retryWithResultAsync( options, () -> { CompletableFuture result = new CompletableFuture<>(); @@ -117,7 +117,7 @@ public void testAddDoNotRetry() throws InterruptedException { options = options.addDoNotRetry(InterruptedException.class); long start = System.currentTimeMillis(); try { - Retryer.retryWithResultAsync( + RpcRetryer.retryWithResultAsync( options, () -> { CompletableFuture result = new CompletableFuture<>(); @@ -144,7 +144,7 @@ public void testMaxAttempt() throws InterruptedException { .validateBuildWithDefaults(); long start = System.currentTimeMillis(); try { - Retryer.retryWithResultAsync( + RpcRetryer.retryWithResultAsync( options, () -> { throw new IllegalArgumentException("simulated");