Skip to content

Commit

Permalink
Limit alarm handler walltime to 15minutes
Browse files Browse the repository at this point in the history
  • Loading branch information
jqmmes committed Feb 6, 2024
1 parent 15f4640 commit 2a7afc0
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 9 deletions.
38 changes: 30 additions & 8 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ void ServiceWorkerGlobalScope::startScheduled(

kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(
kj::Date scheduledTime,
kj::Duration timeout,
Worker::Lock& lock, kj::Maybe<ExportedHandler&> exportedHandler) {

auto& context = IoContext::current();
Expand All @@ -386,21 +387,41 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(

auto& alarm = KJ_ASSERT_NONNULL(handler.alarm);

auto alarmResultPromise = context
.run([exportedHandler, &alarm,
return context
.run([exportedHandler, &context, timeout, &alarm,
maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)]
(Worker::Lock& lock) mutable -> kj::Promise<WorkerInterface::AlarmResult> {
jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext);
// We want to limit alarm handler walltime to 15 minutes at most. If the timeout promise
// completes we want to cancel the alarm handler. If the alarm handler promise completes first
// timeout will be canceled.
auto timeoutPromise = context.afterLimitTimeout(timeout).then([&context]() -> kj::Promise<WorkerInterface::AlarmResult> {
LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time");
// We don't want to delete the alarm since we have not successfully completed the alarm
// execution.
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();
// We don't want the handler to keep running after timeout.
context.abort(JSG_KJ_EXCEPTION(FAILED, Error, "Alarm exceeded its allowed execution time"));
// We want timed out alarms to be treated as user errors. As such, we'll mark them as
// retriable, and we'll count the retries against the alarm retries limit. This will ensure
// that the handler will attempt to run for a number of times before giving up and deleting
// the alarm.
return WorkerInterface::AlarmResult {
.retry = true,
.retryCountsAgainstLimit = true,
.outcome = EventOutcome::EXCEEDED_CPU
};
});

return alarm(lock).then([]() -> kj::Promise<WorkerInterface::AlarmResult> {
return WorkerInterface::AlarmResult {
.retry = false,
.outcome = EventOutcome::OK
};
});
});

return alarmResultPromise
.catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable {
}).exclusiveJoin(kj::mv(timeoutPromise));
}).catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();
Expand All @@ -419,7 +440,8 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(
.retryCountsAgainstLimit = !context.isOutputGateBroken(),
.outcome = outcome
};
}).then([&context](WorkerInterface::AlarmResult result) -> kj::Promise<WorkerInterface::AlarmResult> {
})
.then([&context](WorkerInterface::AlarmResult result) -> kj::Promise<WorkerInterface::AlarmResult> {
return context.waitForOutputLocks().then([result]() {
return kj::mv(result);
}, [](kj::Exception&& e) {
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
// Received runAlarm (called from C++, not JS).
kj::Promise<WorkerInterface::AlarmResult> runAlarm(
kj::Date scheduledTime,
kj::Duration timeout,
Worker::Lock& lock, kj::Maybe<ExportedHandler&> exportedHandler);

// Received test() (called from C++, not JS). See WorkerInterface::test(). This version returns
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/io/limit-enforcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ class LimitEnforcer {
// Like limitDrain() but applies a time limit to scheduled event processing.
virtual kj::Promise<void> limitScheduled() = 0;

// Like limitDrain() and limitScheduled() but applies a time limit to alarm event processing.
virtual kj::Duration getAlarmLimitMs() = 0;

// Gets a byte size limit to apply to operations that will buffer a possibly large amount of
// data in C++ memory, such as reading an entire HTTP response into an `ArrayBuffer`.
virtual size_t getBufferingLimit() = 0;
Expand Down
9 changes: 8 additions & 1 deletion src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,15 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
[scheduledTime, entrypointName=entrypointName, &context](Worker::Lock& lock){
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

// If we have an invalid timeout, set it to the default value of 15 minutes.
auto timeout = context.getLimitEnforcer().getAlarmLimitMs();
if (timeout == 0 * kj::MILLISECONDS) {
LOG_NOSENTRY(WARNING, "Invalid alarm timeout value. Using 15 minutes", timeout);
timeout = 15 * kj::MINUTES;
}

auto handler = lock.getExportedHandler(entrypointName, context.getActor());
return lock.getGlobalScope().runAlarm(scheduledTime, lock, handler);
return lock.getGlobalScope().runAlarm(scheduledTime, timeout, lock, handler);
});

// The alarm handler was successfully complete. We must guarantee this same alarm does not
Expand Down
1 change: 1 addition & 0 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,7 @@ private:
void newAnalyticsEngineRequest() override {}
kj::Promise<void> limitDrain() override { return kj::NEVER_DONE; }
kj::Promise<void> limitScheduled() override { return kj::NEVER_DONE; }
kj::Duration getAlarmLimitMs() override { return 0 * kj::MILLISECONDS; }
size_t getBufferingLimit() override { return kj::maxValue; }
kj::Maybe<EventOutcome> getLimitsExceeded() override { return kj::none; }
kj::Promise<void> onLimitsExceeded() override { return kj::NEVER_DONE; }
Expand Down
1 change: 1 addition & 0 deletions src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ struct MockLimitEnforcer final: public LimitEnforcer {
void newAnalyticsEngineRequest() override {}
kj::Promise<void> limitDrain() override { return kj::NEVER_DONE; }
kj::Promise<void> limitScheduled() override { return kj::NEVER_DONE; }
kj::Duration getAlarmLimitMs() override { return 0 * kj::MILLISECONDS; }
size_t getBufferingLimit() override { return kj::maxValue; }
kj::Maybe<EventOutcome> getLimitsExceeded() override { return kj::none; }
kj::Promise<void> onLimitsExceeded() override { return kj::NEVER_DONE; }
Expand Down

0 comments on commit 2a7afc0

Please sign in to comment.