Skip to content

Commit

Permalink
Change WorkerInterface so prewarm returns a promise.
Browse files Browse the repository at this point in the history
  • Loading branch information
danlapid committed Oct 8, 2024
1 parent 8e7faed commit d1fd3d0
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 40 deletions.
5 changes: 3 additions & 2 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public:
kj::AsyncIoStream& connection,
ConnectResponse& response,
kj::HttpConnectSettings settings) override;
void prewarm(kj::StringPtr url) override;
kj::Promise<void> prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override;
kj::Promise<bool> test() override;
Expand Down Expand Up @@ -446,7 +446,7 @@ kj::Promise<void> WorkerEntrypoint::connect(kj::StringPtr host,
JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported");
}

void WorkerEntrypoint::prewarm(kj::StringPtr url) {
kj::Promise<void> WorkerEntrypoint::prewarm(kj::StringPtr url) {
// Nothing to do, the worker is already loaded.
TRACE_EVENT("workerd", "WorkerEntrypoint::prewarm()", "url", url.cStr());
auto incomingRequest =
Expand All @@ -458,6 +458,7 @@ void WorkerEntrypoint::prewarm(kj::StringPtr url) {
// TODO(someday): Ideally, middleware workers would forward prewarm() to the next stage. At
// present we don't have a good way to decide what stage that is, especially given that we'll
// be switching to `next` being a binding in the future.
return kj::READY_NOW;
}

kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
Expand Down
37 changes: 14 additions & 23 deletions src/workerd/io/worker-interface.c++
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ namespace {
// interface the promise resolved to.
class PromisedWorkerInterface final: public kj::Refcounted, public WorkerInterface {
public:
PromisedWorkerInterface(
kj::TaskSet& waitUntilTasks, kj::Promise<kj::Own<WorkerInterface>> promise)
: waitUntilTasks(waitUntilTasks),
promise(promise.then([this](kj::Own<WorkerInterface> result) { worker = kj::mv(result); })
PromisedWorkerInterface(kj::Promise<kj::Own<WorkerInterface>> promise)
: promise(promise.then([this](kj::Own<WorkerInterface> result) { worker = kj::mv(result); })
.fork()) {}

kj::Promise<void> request(kj::HttpMethod method,
Expand Down Expand Up @@ -49,18 +47,12 @@ public:
}
}

void prewarm(kj::StringPtr url) override {
kj::Promise<void> prewarm(kj::StringPtr url) override {
KJ_IF_SOME(w, worker) {
w.get()->prewarm(url);
co_return co_await w.get()->prewarm(url);
} else {
static auto constexpr handlePrewarm =
[](kj::Promise<void> promise, kj::String url,
kj::Own<PromisedWorkerInterface> self) -> kj::Promise<void> {
co_await promise;
KJ_ASSERT_NONNULL(self->worker)->prewarm(url);
};

waitUntilTasks.add(handlePrewarm(promise.addBranch(), kj::str(url), kj::addRef(*this)));
co_return co_await KJ_ASSERT_NONNULL(worker)->prewarm(url);
}
}

Expand Down Expand Up @@ -92,15 +84,13 @@ public:
}

private:
kj::TaskSet& waitUntilTasks;
kj::ForkedPromise<void> promise;
kj::Maybe<kj::Own<WorkerInterface>> worker;
};
} // namespace

kj::Own<WorkerInterface> newPromisedWorkerInterface(
kj::TaskSet& waitUntilTasks, kj::Promise<kj::Own<WorkerInterface>> promise) {
return kj::refcounted<PromisedWorkerInterface>(waitUntilTasks, kj::mv(promise));
kj::Own<WorkerInterface> newPromisedWorkerInterface(kj::Promise<kj::Own<WorkerInterface>> promise) {
return kj::refcounted<PromisedWorkerInterface>(kj::mv(promise));
}

kj::Own<kj::HttpClient> asHttpClient(kj::Own<WorkerInterface> workerInterface) {
Expand Down Expand Up @@ -238,7 +228,7 @@ public:
kj::AsyncIoStream& connection,
ConnectResponse& response,
kj::HttpConnectSettings settings) override;
void prewarm(kj::StringPtr url) override;
kj::Promise<void> prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;
Expand Down Expand Up @@ -273,8 +263,8 @@ RevocableWebSocketWorkerInterface::RevocableWebSocketWorkerInterface(
: worker(worker),
revokeProm(revokeProm.fork()) {}

void RevocableWebSocketWorkerInterface::prewarm(kj::StringPtr url) {
worker.prewarm(url);
kj::Promise<void> RevocableWebSocketWorkerInterface::prewarm(kj::StringPtr url) {
return worker.prewarm(url);
}

kj::Promise<WorkerInterface::ScheduledResult> RevocableWebSocketWorkerInterface::runScheduled(
Expand Down Expand Up @@ -324,8 +314,9 @@ public:
kj::throwFatalException(kj::mv(exception));
}

void prewarm(kj::StringPtr url) override {
kj::Promise<void> prewarm(kj::StringPtr url) override {
// ignore
return kj::READY_NOW;
}

kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
Expand Down Expand Up @@ -381,10 +372,10 @@ kj::Promise<void> RpcWorkerInterface::connect(kj::StringPtr host,
return promise.attach(kj::mv(inner));
}

void RpcWorkerInterface::prewarm(kj::StringPtr url) {
kj::Promise<void> RpcWorkerInterface::prewarm(kj::StringPtr url) {
auto req = dispatcher.prewarmRequest(capnp::MessageSize{url.size() / sizeof(capnp::word) + 4, 0});
req.setUrl(url);
waitUntilTasks.add(req.send().ignoreResult());
return req.send().ignoreResult();
}

kj::Promise<WorkerInterface::ScheduledResult> RpcWorkerInterface::runScheduled(
Expand Down
9 changes: 3 additions & 6 deletions src/workerd/io/worker-interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class WorkerInterface: public kj::HttpService {
// to be invoked.
//
// If prewarm() has to do anything asynchronous, it should use "waitUntil" tasks.
virtual void prewarm(kj::StringPtr url) = 0;
virtual kj::Promise<void> prewarm(kj::StringPtr url) = 0;

struct ScheduledResult {
bool retry = true;
Expand Down Expand Up @@ -147,10 +147,7 @@ class WorkerInterface: public kj::HttpService {

// Given a Promise for a WorkerInterface, return a WorkerInterface whose methods will first wait
// for the promise, then invoke the destination object.
kj::Own<WorkerInterface> newPromisedWorkerInterface(
kj::TaskSet& waitUntilTasks, kj::Promise<kj::Own<WorkerInterface>> promise);
// TODO(cleanup): `waitUntilTasks` is only needed to handle `prewarm` since they
// don't return promises. We should maybe change them to return promises?
kj::Own<WorkerInterface> newPromisedWorkerInterface(kj::Promise<kj::Own<WorkerInterface>> promise);

// Adapts WorkerInterface to HttpClient, including taking ownership.
//
Expand Down Expand Up @@ -184,7 +181,7 @@ class RpcWorkerInterface: public WorkerInterface {
ConnectResponse& tunnel,
kj::HttpConnectSettings settings) override;

void prewarm(kj::StringPtr url) override;
kj::Promise<void> prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3968,7 +3968,7 @@ public:
kj::AsyncIoStream& connection,
kj::HttpService::ConnectResponse& tunnel,
kj::HttpConnectSettings settings) override;
void prewarm(kj::StringPtr url) override;
kj::Promise<void> prewarm(kj::StringPtr url) override;
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override;
kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override;
kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override;
Expand Down Expand Up @@ -4189,8 +4189,8 @@ kj::Promise<void> Worker::Isolate::SubrequestClient::connect(kj::StringPtr host,
}

// TODO(someday): Log other kinds of subrequests?
void Worker::Isolate::SubrequestClient::prewarm(kj::StringPtr url) {
inner->prewarm(url);
kj::Promise<void> Worker::Isolate::SubrequestClient::prewarm(kj::StringPtr url) {
return inner->prewarm(url);
}
kj::Promise<WorkerInterface::ScheduledResult> Worker::Isolate::SubrequestClient::runScheduled(
kj::Date scheduledTime, kj::StringPtr cron) {
Expand Down
11 changes: 5 additions & 6 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ private:
co_await kj::joinPromisesFailFast(promises.finish()).attach(kj::mv(io_stream));
}

void prewarm(kj::StringPtr url) override {}
kj::Promise<void> prewarm(kj::StringPtr url) override {}
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
throwUnsupported();
}
Expand Down Expand Up @@ -687,7 +687,7 @@ private:
return parent.serviceAdapter->connect(host, headers, connection, tunnel, kj::mv(settings));
}

void prewarm(kj::StringPtr url) override {}
kj::Promise<void> prewarm(kj::StringPtr url) override {}
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
throwUnsupported();
}
Expand Down Expand Up @@ -862,7 +862,7 @@ private:
return serviceAdapter->connect(host, headers, connection, tunnel, kj::mv(settings));
}

void prewarm(kj::StringPtr url) override {}
kj::Promise<void> prewarm(kj::StringPtr url) override {}
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
throwUnsupported();
}
Expand Down Expand Up @@ -1140,7 +1140,7 @@ private:
kj::HttpConnectSettings settings) override {
throwUnsupported();
}
void prewarm(kj::StringPtr url) override {}
kj::Promise<void> prewarm(kj::StringPtr url) override {}
kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
throwUnsupported();
}
Expand Down Expand Up @@ -1575,8 +1575,7 @@ public:

kj::Own<WorkerInterface> getActor(
kj::String id, IoChannelFactory::SubrequestMetadata metadata) {
return newPromisedWorkerInterface(
service.waitUntilTasks, getActorThenStartRequest(kj::mv(id), kj::mv(metadata)));
return newPromisedWorkerInterface(getActorThenStartRequest(kj::mv(id), kj::mv(metadata)));
}

kj::Own<IoChannelFactory::ActorChannel> getActorChannel(Worker::Actor::Id id) {
Expand Down

0 comments on commit d1fd3d0

Please sign in to comment.