diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index d96da0082f6..112576f7b74 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -364,6 +364,7 @@ void ServiceWorkerGlobalScope::startScheduled( kj::Promise ServiceWorkerGlobalScope::runAlarm( kj::Date scheduledTime, + uint32_t timeout, Worker::Lock& lock, kj::Maybe exportedHandler) { auto& context = IoContext::current(); @@ -386,21 +387,42 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm( auto& alarm = KJ_ASSERT_NONNULL(handler.alarm); - auto alarmResultPromise = context - .run([exportedHandler, &alarm, + return context + .run([exportedHandler, &context, timeoutMs = timeout, &alarm, maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)] (Worker::Lock& lock) mutable -> kj::Promise { jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext); + + // We want to limit alarm handler walltime to 15 minutes at most. If the timeout promise + // completes we want to cancel the alarm handler. If the alarm handler promise completes first + // timeout will be canceled. + auto timeoutPromise = context.afterLimitTimeout(timeoutMs*kj::MILLISECONDS).then([&context]() -> kj::Promise { + LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time"); + // We don't want to delete the alarm sicne we have not successfully completed the alarm + // execution. + auto& actor = KJ_ASSERT_NONNULL(context.getActor()); + auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent()); + persistent.cancelDeferredAlarmDeletion(); + // We don't want the handler to keep running after timeout. + context.abort(JSG_KJ_EXCEPTION(FAILED, Error, "Alarm exceeded its allowed execution time")); + // For now, we don't want to make the timeout destructive. As such, we will mark it as + // retriable, but let's now count retries against the limit. This behaviour will most likely + // change in the near future. + // TODO(soon): Make alarm timeouts not retriable. + return WorkerInterface::AlarmResult { + .retry = true, + .retryCountsAgainstLimit = false, + .outcome = EventOutcome::EXCEPTION + }; + }); + return alarm(lock).then([]() -> kj::Promise { return WorkerInterface::AlarmResult { .retry = false, .outcome = EventOutcome::OK }; - }); - }); - - return alarmResultPromise - .catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable { + }).exclusiveJoin(kj::mv(timeoutPromise)); + }).catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable { auto& actor = KJ_ASSERT_NONNULL(context.getActor()); auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent()); persistent.cancelDeferredAlarmDeletion(); @@ -419,7 +441,8 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm( .retryCountsAgainstLimit = !context.isOutputGateBroken(), .outcome = outcome }; - }).then([&context](WorkerInterface::AlarmResult result) -> kj::Promise { + }) + .then([&context](WorkerInterface::AlarmResult result) -> kj::Promise { return context.waitForOutputLocks().then([result]() { return kj::mv(result); }, [](kj::Exception&& e) { diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index ccd66c8639b..e8f1e316a40 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -298,6 +298,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { // Received runAlarm (called from C++, not JS). kj::Promise runAlarm( kj::Date scheduledTime, + uint32_t timeout, Worker::Lock& lock, kj::Maybe exportedHandler); // Received test() (called from C++, not JS). See WorkerInterface::test(). This version returns diff --git a/src/workerd/io/limit-enforcer.h b/src/workerd/io/limit-enforcer.h index a182cbb2923..44e8b4df21e 100644 --- a/src/workerd/io/limit-enforcer.h +++ b/src/workerd/io/limit-enforcer.h @@ -120,6 +120,9 @@ class LimitEnforcer { // Like limitDrain() but applies a time limit to scheduled event processing. virtual kj::Promise limitScheduled() = 0; + // Like limitDrain() and limitScheduled() but applies a time limit to alarm event processing. + virtual uint32_t getAlarmLimitMs() = 0; + // Gets a byte size limit to apply to operations that will buffer a possibly large amount of // data in C++ memory, such as reading an entire HTTP response into an `ArrayBuffer`. virtual size_t getBufferingLimit() = 0; diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index f6e060a275b..cece1969328 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -551,11 +551,17 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( try { auto result = co_await context.run( - [scheduledTime, entrypointName=entrypointName, &context](Worker::Lock& lock){ + [scheduledTime, timeout=context.getLimitEnforcer().getAlarmLimitMs(), entrypointName=entrypointName, &context](Worker::Lock& lock){ jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); + // If we have an invalid timeout, set it to the default value of 15 minutes. + auto timeoutMs = timeout; + if (timeout == 0) { + timeoutMs = 900000; + } + auto handler = lock.getExportedHandler(entrypointName, context.getActor()); - return lock.getGlobalScope().runAlarm(scheduledTime, lock, handler); + return lock.getGlobalScope().runAlarm(scheduledTime, timeoutMs, lock, handler); }); // We succeeded, inform any other entrypoints that may be waiting upon us. diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 4fe5910b30e..22dd873448c 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -2090,6 +2090,7 @@ private: void newAnalyticsEngineRequest() override {} kj::Promise limitDrain() override { return kj::NEVER_DONE; } kj::Promise limitScheduled() override { return kj::NEVER_DONE; } + uint32_t getAlarmLimitMs() override { return 0; } size_t getBufferingLimit() override { return kj::maxValue; } kj::Maybe getLimitsExceeded() override { return kj::none; } kj::Promise onLimitsExceeded() override { return kj::NEVER_DONE; } diff --git a/src/workerd/tests/test-fixture.c++ b/src/workerd/tests/test-fixture.c++ index 24a6cde607a..3aa046ccbdc 100644 --- a/src/workerd/tests/test-fixture.c++ +++ b/src/workerd/tests/test-fixture.c++ @@ -130,6 +130,7 @@ struct MockLimitEnforcer final: public LimitEnforcer { void newAnalyticsEngineRequest() override {} kj::Promise limitDrain() override { return kj::NEVER_DONE; } kj::Promise limitScheduled() override { return kj::NEVER_DONE; } + uint32_t getAlarmLimitMs() override { return 0; } size_t getBufferingLimit() override { return kj::maxValue; } kj::Maybe getLimitsExceeded() override { return kj::none; } kj::Promise onLimitsExceeded() override { return kj::NEVER_DONE; }