From d1fd3d0478d1e715a49432dd5cbe3f077f808439 Mon Sep 17 00:00:00 2001 From: Dan Lapid Date: Mon, 7 Oct 2024 15:19:06 +0000 Subject: [PATCH] Change WorkerInterface so prewarm returns a promise. --- src/workerd/io/worker-entrypoint.c++ | 5 ++-- src/workerd/io/worker-interface.c++ | 37 +++++++++++----------------- src/workerd/io/worker-interface.h | 9 +++---- src/workerd/io/worker.c++ | 6 ++--- src/workerd/server/server.c++ | 11 ++++----- 5 files changed, 28 insertions(+), 40 deletions(-) diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index e5c309ee139..f82aa42208a 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -64,7 +64,7 @@ public: kj::AsyncIoStream& connection, ConnectResponse& response, kj::HttpConnectSettings settings) override; - void prewarm(kj::StringPtr url) override; + kj::Promise prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise test() override; @@ -446,7 +446,7 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); } -void WorkerEntrypoint::prewarm(kj::StringPtr url) { +kj::Promise WorkerEntrypoint::prewarm(kj::StringPtr url) { // Nothing to do, the worker is already loaded. TRACE_EVENT("workerd", "WorkerEntrypoint::prewarm()", "url", url.cStr()); auto incomingRequest = @@ -458,6 +458,7 @@ void WorkerEntrypoint::prewarm(kj::StringPtr url) { // TODO(someday): Ideally, middleware workers would forward prewarm() to the next stage. At // present we don't have a good way to decide what stage that is, especially given that we'll // be switching to `next` being a binding in the future. + return kj::READY_NOW; } kj::Promise WorkerEntrypoint::runScheduled( diff --git a/src/workerd/io/worker-interface.c++ b/src/workerd/io/worker-interface.c++ index 8c8a6fde4e6..c0ae2f45122 100644 --- a/src/workerd/io/worker-interface.c++ +++ b/src/workerd/io/worker-interface.c++ @@ -16,10 +16,8 @@ namespace { // interface the promise resolved to. class PromisedWorkerInterface final: public kj::Refcounted, public WorkerInterface { public: - PromisedWorkerInterface( - kj::TaskSet& waitUntilTasks, kj::Promise> promise) - : waitUntilTasks(waitUntilTasks), - promise(promise.then([this](kj::Own result) { worker = kj::mv(result); }) + PromisedWorkerInterface(kj::Promise> promise) + : promise(promise.then([this](kj::Own result) { worker = kj::mv(result); }) .fork()) {} kj::Promise request(kj::HttpMethod method, @@ -49,18 +47,12 @@ public: } } - void prewarm(kj::StringPtr url) override { + kj::Promise prewarm(kj::StringPtr url) override { KJ_IF_SOME(w, worker) { - w.get()->prewarm(url); + co_return co_await w.get()->prewarm(url); } else { - static auto constexpr handlePrewarm = - [](kj::Promise promise, kj::String url, - kj::Own self) -> kj::Promise { co_await promise; - KJ_ASSERT_NONNULL(self->worker)->prewarm(url); - }; - - waitUntilTasks.add(handlePrewarm(promise.addBranch(), kj::str(url), kj::addRef(*this))); + co_return co_await KJ_ASSERT_NONNULL(worker)->prewarm(url); } } @@ -92,15 +84,13 @@ public: } private: - kj::TaskSet& waitUntilTasks; kj::ForkedPromise promise; kj::Maybe> worker; }; } // namespace -kj::Own newPromisedWorkerInterface( - kj::TaskSet& waitUntilTasks, kj::Promise> promise) { - return kj::refcounted(waitUntilTasks, kj::mv(promise)); +kj::Own newPromisedWorkerInterface(kj::Promise> promise) { + return kj::refcounted(kj::mv(promise)); } kj::Own asHttpClient(kj::Own workerInterface) { @@ -238,7 +228,7 @@ public: kj::AsyncIoStream& connection, ConnectResponse& response, kj::HttpConnectSettings settings) override; - void prewarm(kj::StringPtr url) override; + kj::Promise prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise customEvent(kj::Own event) override; @@ -273,8 +263,8 @@ RevocableWebSocketWorkerInterface::RevocableWebSocketWorkerInterface( : worker(worker), revokeProm(revokeProm.fork()) {} -void RevocableWebSocketWorkerInterface::prewarm(kj::StringPtr url) { - worker.prewarm(url); +kj::Promise RevocableWebSocketWorkerInterface::prewarm(kj::StringPtr url) { + return worker.prewarm(url); } kj::Promise RevocableWebSocketWorkerInterface::runScheduled( @@ -324,8 +314,9 @@ public: kj::throwFatalException(kj::mv(exception)); } - void prewarm(kj::StringPtr url) override { + kj::Promise prewarm(kj::StringPtr url) override { // ignore + return kj::READY_NOW; } kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { @@ -381,10 +372,10 @@ kj::Promise RpcWorkerInterface::connect(kj::StringPtr host, return promise.attach(kj::mv(inner)); } -void RpcWorkerInterface::prewarm(kj::StringPtr url) { +kj::Promise RpcWorkerInterface::prewarm(kj::StringPtr url) { auto req = dispatcher.prewarmRequest(capnp::MessageSize{url.size() / sizeof(capnp::word) + 4, 0}); req.setUrl(url); - waitUntilTasks.add(req.send().ignoreResult()); + return req.send().ignoreResult(); } kj::Promise RpcWorkerInterface::runScheduled( diff --git a/src/workerd/io/worker-interface.h b/src/workerd/io/worker-interface.h index ff7ca2db705..8645c4997cc 100644 --- a/src/workerd/io/worker-interface.h +++ b/src/workerd/io/worker-interface.h @@ -46,7 +46,7 @@ class WorkerInterface: public kj::HttpService { // to be invoked. // // If prewarm() has to do anything asynchronous, it should use "waitUntil" tasks. - virtual void prewarm(kj::StringPtr url) = 0; + virtual kj::Promise prewarm(kj::StringPtr url) = 0; struct ScheduledResult { bool retry = true; @@ -147,10 +147,7 @@ class WorkerInterface: public kj::HttpService { // Given a Promise for a WorkerInterface, return a WorkerInterface whose methods will first wait // for the promise, then invoke the destination object. -kj::Own newPromisedWorkerInterface( - kj::TaskSet& waitUntilTasks, kj::Promise> promise); -// TODO(cleanup): `waitUntilTasks` is only needed to handle `prewarm` since they -// don't return promises. We should maybe change them to return promises? +kj::Own newPromisedWorkerInterface(kj::Promise> promise); // Adapts WorkerInterface to HttpClient, including taking ownership. // @@ -184,7 +181,7 @@ class RpcWorkerInterface: public WorkerInterface { ConnectResponse& tunnel, kj::HttpConnectSettings settings) override; - void prewarm(kj::StringPtr url) override; + kj::Promise prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise customEvent(kj::Own event) override; diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 9111eb48f08..dc7f62173ef 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -3968,7 +3968,7 @@ public: kj::AsyncIoStream& connection, kj::HttpService::ConnectResponse& tunnel, kj::HttpConnectSettings settings) override; - void prewarm(kj::StringPtr url) override; + kj::Promise prewarm(kj::StringPtr url) override; kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override; kj::Promise runAlarm(kj::Date scheduledTime, uint32_t retryCount) override; kj::Promise customEvent(kj::Own event) override; @@ -4189,8 +4189,8 @@ kj::Promise Worker::Isolate::SubrequestClient::connect(kj::StringPtr host, } // TODO(someday): Log other kinds of subrequests? -void Worker::Isolate::SubrequestClient::prewarm(kj::StringPtr url) { - inner->prewarm(url); +kj::Promise Worker::Isolate::SubrequestClient::prewarm(kj::StringPtr url) { + return inner->prewarm(url); } kj::Promise Worker::Isolate::SubrequestClient::runScheduled( kj::Date scheduledTime, kj::StringPtr cron) { diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index b0f05243b57..189474ece61 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -550,7 +550,7 @@ private: co_await kj::joinPromisesFailFast(promises.finish()).attach(kj::mv(io_stream)); } - void prewarm(kj::StringPtr url) override {} + kj::Promise prewarm(kj::StringPtr url) override {} kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } @@ -687,7 +687,7 @@ private: return parent.serviceAdapter->connect(host, headers, connection, tunnel, kj::mv(settings)); } - void prewarm(kj::StringPtr url) override {} + kj::Promise prewarm(kj::StringPtr url) override {} kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } @@ -862,7 +862,7 @@ private: return serviceAdapter->connect(host, headers, connection, tunnel, kj::mv(settings)); } - void prewarm(kj::StringPtr url) override {} + kj::Promise prewarm(kj::StringPtr url) override {} kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } @@ -1140,7 +1140,7 @@ private: kj::HttpConnectSettings settings) override { throwUnsupported(); } - void prewarm(kj::StringPtr url) override {} + kj::Promise prewarm(kj::StringPtr url) override {} kj::Promise runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override { throwUnsupported(); } @@ -1575,8 +1575,7 @@ public: kj::Own getActor( kj::String id, IoChannelFactory::SubrequestMetadata metadata) { - return newPromisedWorkerInterface( - service.waitUntilTasks, getActorThenStartRequest(kj::mv(id), kj::mv(metadata))); + return newPromisedWorkerInterface(getActorThenStartRequest(kj::mv(id), kj::mv(metadata))); } kj::Own getActorChannel(Worker::Actor::Id id) {