Skip to content

Commit

Permalink
Limit alarm handler walltime to 15minutes
Browse files Browse the repository at this point in the history
  • Loading branch information
jqmmes committed Feb 5, 2024
1 parent 61e459a commit 2d5ce68
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 10 deletions.
39 changes: 31 additions & 8 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ void ServiceWorkerGlobalScope::startScheduled(

kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(
kj::Date scheduledTime,
uint32_t timeout,
Worker::Lock& lock, kj::Maybe<ExportedHandler&> exportedHandler) {

auto& context = IoContext::current();
Expand All @@ -386,21 +387,42 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(

auto& alarm = KJ_ASSERT_NONNULL(handler.alarm);

auto alarmResultPromise = context
.run([exportedHandler, &alarm,
return context
.run([exportedHandler, &context, timeoutMs = timeout, &alarm,
maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)]
(Worker::Lock& lock) mutable -> kj::Promise<WorkerInterface::AlarmResult> {
jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext);

// We want to limit alarm handler walltime to `timeoutMs` milliseconds (the caller falls back to
// 15 minutes when no valid limit is configured). If the timeout promise completes first, we want
// to cancel the alarm handler. If the alarm handler promise completes first, the timeout will be
// canceled.
auto timeoutPromise = context.afterLimitTimeout(timeoutMs*kj::MILLISECONDS).then([&context]() -> kj::Promise<WorkerInterface::AlarmResult> {
LOG_NOSENTRY(WARNING, "Alarm exceeded its allowed execution time");
// We don't want to delete the alarm since we have not successfully completed the alarm
// execution.
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();
// We don't want the handler to keep running after timeout.
context.abort(JSG_KJ_EXCEPTION(FAILED, Error, "Alarm exceeded its allowed execution time"));
// For now, we don't want to make the timeout destructive. As such, we will mark it as
// retriable, but let's not count retries against the limit. This behaviour will most likely
// change in the near future.
// TODO(soon): Make alarm timeouts not retriable.
return WorkerInterface::AlarmResult {
.retry = true,
.retryCountsAgainstLimit = false,
.outcome = EventOutcome::EXCEPTION
};
});

return alarm(lock).then([]() -> kj::Promise<WorkerInterface::AlarmResult> {
return WorkerInterface::AlarmResult {
.retry = false,
.outcome = EventOutcome::OK
};
});
});

return alarmResultPromise
.catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable {
}).exclusiveJoin(kj::mv(timeoutPromise));
}).catch_([&context, deferredDelete = kj::mv(deferredDelete)](kj::Exception&& e) mutable {
auto& actor = KJ_ASSERT_NONNULL(context.getActor());
auto& persistent = KJ_ASSERT_NONNULL(actor.getPersistent());
persistent.cancelDeferredAlarmDeletion();
Expand All @@ -419,7 +441,8 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(
.retryCountsAgainstLimit = !context.isOutputGateBroken(),
.outcome = outcome
};
}).then([&context](WorkerInterface::AlarmResult result) -> kj::Promise<WorkerInterface::AlarmResult> {
})
.then([&context](WorkerInterface::AlarmResult result) -> kj::Promise<WorkerInterface::AlarmResult> {
return context.waitForOutputLocks().then([result]() {
return kj::mv(result);
}, [](kj::Exception&& e) {
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
// Received runAlarm (called from C++, not JS).
kj::Promise<WorkerInterface::AlarmResult> runAlarm(
kj::Date scheduledTime,
uint32_t timeout,
Worker::Lock& lock, kj::Maybe<ExportedHandler&> exportedHandler);

// Received test() (called from C++, not JS). See WorkerInterface::test(). This version returns
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/io/limit-enforcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ class LimitEnforcer {
// Like limitDrain() but applies a time limit to scheduled event processing.
virtual kj::Promise<void> limitScheduled() = 0;

// Returns the time limit, in milliseconds, to apply to alarm event processing. Unlike
// limitDrain() and limitScheduled(), which return a promise, this returns the limit as a plain
// number. WorkerEntrypoint::runAlarmImpl treats a return value of 0 as an invalid timeout and
// substitutes its default of 15 minutes (900000 ms).
virtual uint32_t getAlarmLimitMs() = 0;

// Gets a byte size limit to apply to operations that will buffer a possibly large amount of
// data in C++ memory, such as reading an entire HTTP response into an `ArrayBuffer`.
virtual size_t getBufferingLimit() = 0;
Expand Down
10 changes: 8 additions & 2 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,17 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(

try {
auto result = co_await context.run(
[scheduledTime, entrypointName=entrypointName, &context](Worker::Lock& lock){
[scheduledTime, timeout=context.getLimitEnforcer().getAlarmLimitMs(), entrypointName=entrypointName, &context](Worker::Lock& lock){
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

// If we have an invalid timeout, set it to the default value of 15 minutes.
auto timeoutMs = timeout;
if (timeout == 0) {
timeoutMs = 900000;
}

auto handler = lock.getExportedHandler(entrypointName, context.getActor());
return lock.getGlobalScope().runAlarm(scheduledTime, lock, handler);
return lock.getGlobalScope().runAlarm(scheduledTime, timeoutMs, lock, handler);
});

// We succeeded, inform any other entrypoints that may be waiting upon us.
Expand Down
1 change: 1 addition & 0 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2090,6 +2090,7 @@ private:
void newAnalyticsEngineRequest() override {}
kj::Promise<void> limitDrain() override { return kj::NEVER_DONE; }
kj::Promise<void> limitScheduled() override { return kj::NEVER_DONE; }
// Returns 0, which WorkerEntrypoint::runAlarmImpl treats as an invalid timeout and replaces with
// its default of 15 minutes (900000 ms).
uint32_t getAlarmLimitMs() override { return 0; }
size_t getBufferingLimit() override { return kj::maxValue; }
kj::Maybe<EventOutcome> getLimitsExceeded() override { return kj::none; }
kj::Promise<void> onLimitsExceeded() override { return kj::NEVER_DONE; }
Expand Down
1 change: 1 addition & 0 deletions src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ struct MockLimitEnforcer final: public LimitEnforcer {
void newAnalyticsEngineRequest() override {}
kj::Promise<void> limitDrain() override { return kj::NEVER_DONE; }
kj::Promise<void> limitScheduled() override { return kj::NEVER_DONE; }
// Returns 0, which WorkerEntrypoint::runAlarmImpl treats as an invalid timeout and replaces with
// its default of 15 minutes (900000 ms).
uint32_t getAlarmLimitMs() override { return 0; }
size_t getBufferingLimit() override { return kj::maxValue; }
kj::Maybe<EventOutcome> getLimitsExceeded() override { return kj::none; }
kj::Promise<void> onLimitsExceeded() override { return kj::NEVER_DONE; }
Expand Down

0 comments on commit 2d5ce68

Please sign in to comment.