From 2a7afc061098c4080b8eff392b67989a18ddf7cb Mon Sep 17 00:00:00 2001 From: Joaquim Silva Date: Mon, 29 Jan 2024 13:58:44 +0000 Subject: [PATCH] Limit alarm handler walltime to 15minutes --- src/workerd/api/global-scope.c++ | 38 ++++++++++++++++++++++------ src/workerd/api/global-scope.h | 1 + src/workerd/io/limit-enforcer.h | 3 +++ src/workerd/io/worker-entrypoint.c++ | 9 ++++++- src/workerd/server/server.c++ | 1 + src/workerd/tests/test-fixture.c++ | 1 + 6 files changed, 44 insertions(+), 9 deletions(-) diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index d96da0082f6..2b2fc54b615 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -364,6 +364,7 @@ void ServiceWorkerGlobalScope::startScheduled( kj::Promise ServiceWorkerGlobalScope::runAlarm( kj::Date scheduledTime, + kj::Duration timeout, Worker::Lock& lock, kj::Maybe exportedHandler) { auto& context = IoContext::current(); @@ -386,21 +387,41 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm( auto& alarm = KJ_ASSERT_NONNULL(handler.alarm); - auto alarmResultPromise = context - .run([exportedHandler, &alarm, + return context + .run([exportedHandler, &context, timeout, &alarm, maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)] (Worker::Lock& lock) mutable -> kj::Promise { jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext); + // We want to limit alarm handler walltime to 15 minutes at most. If the timeout promise + // completes we want to cancel the alarm handler. If the alarm handler promise completes first + // timeout will be canceled. + auto timeoutPromise = context.afterLimitTimeout(timeout).then([&context]() -> kj::Promise { + LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time"); + // We don't want to delete the alarm since we have not successfully completed the alarm + // execution. + auto& actor = KJ_ASSERT_NONNULL(context.getActor()); + auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent()); + persistent.cancelDeferredAlarmDeletion(); + // We don't want the handler to keep running after timeout. + context.abort(JSG_KJ_EXCEPTION(FAILED, Error, "Alarm exceeded its allowed execution time")); + // We want timed out alarms to be treated as user errors. As such, we'll mark them as + // retriable, and we'll count the retries against the alarm retries limit. This will ensure + // that the handler will attempt to run for a number of times before giving up and deleting + // the alarm. + return WorkerInterface::AlarmResult { + .retry = true, + .retryCountsAgainstLimit = true, + .outcome = EventOutcome::EXCEEDED_CPU + }; + }); + return alarm(lock).then([]() -> kj::Promise { return WorkerInterface::AlarmResult { .retry = false, .outcome = EventOutcome::OK }; - }); - }); - - return alarmResultPromise - .catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable { + }).exclusiveJoin(kj::mv(timeoutPromise)); + }).catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable { auto& actor = KJ_ASSERT_NONNULL(context.getActor()); auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent()); persistent.cancelDeferredAlarmDeletion(); @@ -419,7 +440,8 @@ kj::Promise ServiceWorkerGlobalScope::runAlarm( .retryCountsAgainstLimit = !context.isOutputGateBroken(), .outcome = outcome }; - }).then([&context](WorkerInterface::AlarmResult result) -> kj::Promise { + }) + .then([&context](WorkerInterface::AlarmResult result) -> kj::Promise { return context.waitForOutputLocks().then([result]() { return kj::mv(result); }, [](kj::Exception&& e) { diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index 7e525dae2ea..f04a517253b 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -303,6 +303,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { // Received runAlarm (called from C++, not JS). kj::Promise runAlarm( kj::Date scheduledTime, + kj::Duration timeout, Worker::Lock& lock, kj::Maybe exportedHandler); // Received test() (called from C++, not JS). See WorkerInterface::test(). This version returns diff --git a/src/workerd/io/limit-enforcer.h b/src/workerd/io/limit-enforcer.h index a182cbb2923..c33fc5ef03e 100644 --- a/src/workerd/io/limit-enforcer.h +++ b/src/workerd/io/limit-enforcer.h @@ -120,6 +120,9 @@ class LimitEnforcer { // Like limitDrain() but applies a time limit to scheduled event processing. virtual kj::Promise limitScheduled() = 0; + // Like limitDrain() and limitScheduled() but applies a time limit to alarm event processing. + virtual kj::Duration getAlarmLimitMs() = 0; + // Gets a byte size limit to apply to operations that will buffer a possibly large amount of // data in C++ memory, such as reading an entire HTTP response into an `ArrayBuffer`. virtual size_t getBufferingLimit() = 0; diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 602ca01d82a..d5a68bd9e53 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -554,8 +554,15 @@ kj::Promise WorkerEntrypoint::runAlarmImpl( [scheduledTime, entrypointName=entrypointName, &context](Worker::Lock& lock){ jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); + // If we have an invalid timeout, set it to the default value of 15 minutes. + auto timeout = context.getLimitEnforcer().getAlarmLimitMs(); + if (timeout == 0 * kj::MILLISECONDS) { + LOG_NOSENTRY(WARNING, "Invalid alarm timeout value. Using 15 minutes", timeout); + timeout = 15 * kj::MINUTES; + } + auto handler = lock.getExportedHandler(entrypointName, context.getActor()); - return lock.getGlobalScope().runAlarm(scheduledTime, lock, handler); + return lock.getGlobalScope().runAlarm(scheduledTime, timeout, lock, handler); }); // The alarm handler was successfully complete. We must guarantee this same alarm does not diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 108fbddf032..387ef7c63d0 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -2091,6 +2091,7 @@ private: void newAnalyticsEngineRequest() override {} kj::Promise limitDrain() override { return kj::NEVER_DONE; } kj::Promise limitScheduled() override { return kj::NEVER_DONE; } + kj::Duration getAlarmLimitMs() override { return 0 * kj::MILLISECONDS; } size_t getBufferingLimit() override { return kj::maxValue; } kj::Maybe getLimitsExceeded() override { return kj::none; } kj::Promise onLimitsExceeded() override { return kj::NEVER_DONE; } diff --git a/src/workerd/tests/test-fixture.c++ b/src/workerd/tests/test-fixture.c++ index e1b1ca776bd..5f73f5e6df4 100644 --- a/src/workerd/tests/test-fixture.c++ +++ b/src/workerd/tests/test-fixture.c++ @@ -111,6 +111,7 @@ struct MockLimitEnforcer final: public LimitEnforcer { void newAnalyticsEngineRequest() override {} kj::Promise limitDrain() override { return kj::NEVER_DONE; } kj::Promise limitScheduled() override { return kj::NEVER_DONE; } + kj::Duration getAlarmLimitMs() override { return 0 * kj::MILLISECONDS; } size_t getBufferingLimit() override { return kj::maxValue; } kj::Maybe getLimitsExceeded() override { return kj::none; } kj::Promise onLimitsExceeded() override { return kj::NEVER_DONE; }