// Patch summary:
// - ServiceWorkerGlobalScope::runAlarm() gains a `kj::Duration timeout` parameter and joins the
//   alarm handler promise with a timeout promise via exclusiveJoin(); on timeout it cancels the
//   deferred alarm deletion, calls context.abort() and returns a retriable AlarmResult.
// - LimitEnforcer gains a pure-virtual getAlarmLimit(); the overrides in server.c++ and
//   test-fixture.c++ return 0 ms.
// - WorkerEntrypoint::runAlarmImpl() reads getAlarmLimit() and substitutes 30 minutes when it is 0.
diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index d96da0082f6..2b2fc54b615 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -364,6 +364,7 @@ void ServiceWorkerGlobalScope::startScheduled( kj::Promise ServiceWorkerGlobalScope::runAlarm( kj::Date scheduledTime, + kj::Duration timeout, Worker::Lock& lock, kj::Maybe exportedHandler) { auto& context = IoContext::current(); @@ -386,21 +387,41 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm( auto& alarm = KJ_ASSERT_NONNULL(handler.alarm); - auto alarmResultPromise = context - .run([exportedHandler, &alarm, + return context + .run([exportedHandler, &context, timeout, &alarm, maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)] (Worker::Lock& lock) mutable -> kj::Promise { jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext); + // We want to limit alarm handler walltime to `timeout` (chosen by the caller). If the timeout + // promise completes first we want to cancel the alarm handler. If the alarm handler promise + // completes first, the timeout will be canceled (see the exclusiveJoin() below). + auto timeoutPromise = context.afterLimitTimeout(timeout).then([&context]() -> kj::Promise { + LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time"); + // We don't want to delete the alarm since we have not successfully completed the alarm + // execution. + auto& actor = KJ_ASSERT_NONNULL(context.getActor()); + auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent()); + persistent.cancelDeferredAlarmDeletion(); + // We don't want the handler to keep running after timeout. + context.abort(JSG_KJ_EXCEPTION(FAILED, Error, "Alarm exceeded its allowed execution time")); + // We want timed out alarms to be treated as user errors. As such, we'll mark them as + // retriable, and we'll count the retries against the alarm retries limit. This will ensure + // that the handler will attempt to run a number of times before giving up and deleting + // the alarm. NOTE(review): outcome is EXCEEDED_CPU for a walltime limit; confirm. 
+ return WorkerInterface::AlarmResult { + .retry = true, + .retryCountsAgainstLimit = true, + .outcome = EventOutcome::EXCEEDED_CPU + }; + }); + return alarm(lock).then([]() -> kj::Promise { return WorkerInterface::AlarmResult { .retry = false, .outcome = EventOutcome::OK }; - }); - }); - - return alarmResultPromise - .catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable { + }).exclusiveJoin(kj::mv(timeoutPromise)); + }).catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable { auto& actor = KJ_ASSERT_NONNULL(context.getActor()); auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent()); persistent.cancelDeferredAlarmDeletion(); @@ -419,7 +440,8 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm( .retryCountsAgainstLimit = !context.isOutputGateBroken(), .outcome = outcome }; - }).then([&context](WorkerInterface::AlarmResult result) -> kj::Promise { + }) + .then([&context](WorkerInterface::AlarmResult result) -> kj::Promise { return context.waitForOutputLocks().then([result]() { return kj::mv(result); }, [](kj::Exception&& e) { diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index 7e525dae2ea..f04a517253b 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -303,6 +303,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { // Received runAlarm (called from C++, not JS). kj::Promise runAlarm( kj::Date scheduledTime, + kj::Duration timeout, Worker::Lock& lock, kj::Maybe exportedHandler); // Received test() (called from C++, not JS). See WorkerInterface::test(). This version returns diff --git a/src/workerd/io/limit-enforcer.h b/src/workerd/io/limit-enforcer.h index a182cbb2923..40091556f7e 100644 --- a/src/workerd/io/limit-enforcer.h +++ b/src/workerd/io/limit-enforcer.h @@ -120,6 +120,9 @@ class LimitEnforcer { // Like limitDrain() but applies a time limit to scheduled event processing. 
virtual kj::Promise limitScheduled() = 0; + // Like limitScheduled(), but returns the alarm time limit as a duration instead of a promise. + virtual kj::Duration getAlarmLimit() = 0; + // Gets a byte size limit to apply to operations that will buffer a possibly large amount of // data in C++ memory, such as reading an entire HTTP response into an `ArrayBuffer`. virtual size_t getBufferingLimit() = 0; diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 602ca01d82a..0bb9b50d50d 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -554,8 +554,16 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( [scheduledTime, entrypointName=entrypointName, &context](Worker::Lock& lock){ jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); + // TODO(soon): Set the alarm timeout to 15 minutes. + // If the timeout is zero (treated as invalid), fall back to a default of 30 minutes. + auto timeout = context.getLimitEnforcer().getAlarmLimit(); + if (timeout == 0 * kj::MILLISECONDS) { + LOG_NOSENTRY(WARNING, "Invalid alarm timeout value. Using 30 minutes", timeout); + timeout = 30 * kj::MINUTES; + } + auto handler = lock.getExportedHandler(entrypointName, context.getActor()); - return lock.getGlobalScope().runAlarm(scheduledTime, lock, handler); + return lock.getGlobalScope().runAlarm(scheduledTime, timeout, lock, handler); }); // The alarm handler was successfully complete. 
We must guarantee this same alarm does not diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 108fbddf032..798142da361 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -2091,6 +2091,7 @@ private: void newAnalyticsEngineRequest() override {} kj::Promise limitDrain() override { return kj::NEVER_DONE; } kj::Promise limitScheduled() override { return kj::NEVER_DONE; } + kj::Duration getAlarmLimit() override { return 0 * kj::MILLISECONDS; } size_t getBufferingLimit() override { return kj::maxValue; } kj::Maybe getLimitsExceeded() override { return kj::none; } kj::Promise onLimitsExceeded() override { return kj::NEVER_DONE; } diff --git a/src/workerd/tests/test-fixture.c++ b/src/workerd/tests/test-fixture.c++ index 0b8becd8299..419671ec3cb 100644 --- a/src/workerd/tests/test-fixture.c++ +++ b/src/workerd/tests/test-fixture.c++ @@ -130,6 +130,7 @@ struct MockLimitEnforcer final: public LimitEnforcer { void newAnalyticsEngineRequest() override {} kj::Promise limitDrain() override { return kj::NEVER_DONE; } kj::Promise limitScheduled() override { return kj::NEVER_DONE; } + kj::Duration getAlarmLimit() override { return 0 * kj::MILLISECONDS; } size_t getBufferingLimit() override { return kj::maxValue; } kj::Maybe getLimitsExceeded() override { return kj::none; } kj::Promise onLimitsExceeded() override { return kj::NEVER_DONE; }