Skip to content

Commit

Permalink
Configurable local v8 heap limits
Browse files Browse the repository at this point in the history
This is intended to provide a mechanism for testing the memory heap
limits of a worker locally. It is intentionally more flexible than
the production limits in order to give more flexibility in local
testing. Heap snapshots will be generated whenever the limit is
reached if the heapSnapshotNearHeapLimit is set to a positive,
non-zero value. No more than the number of snapshots specified will
be generated.

```
const helloWorld :Workerd.Worker = (
  modules = [
    (name = "worker", esModule = embed "worker.js")
  ],
  compatibilityDate = "2023-02-28",
  limits = (
    # Emit a warning if the worker's heap size exceeds 128MB.
    heapSoftLimitMb = 128,

    # Generate a heap snapshot when the limit is reached.
    # Generate no more than this number of snapshots...
    heapSnapshotNearHeapLimit = 10,

    # Whenever the heap limit is reached, multiply the limit
    # by this factor and continue.
    heapLimitMultiplier = 2,

    # The number of times the heap limit can be exceeded before
    # the isolate is terminated.
    heapLimitExceedsMax = 1,
  )
);
```
  • Loading branch information
jasnell committed Feb 6, 2024
1 parent 6b63c70 commit 28ac0ba
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 45 deletions.
2 changes: 2 additions & 0 deletions src/workerd/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ wd_cc_library(
"server.c++",
"v8-platform-impl.c++",
"workerd-api.c++",
"isolate-limit-enforcer.c++",
],
hdrs = [
"server.h",
"v8-platform-impl.h",
"workerd-api.h",
"isolate-limit-enforcer.h",
],
defines = select({
"//src/workerd/io:set_enable_experimental_webgpu": ["WORKERD_EXPERIMENTAL_ENABLE_WEBGPU"],
Expand Down
239 changes: 239 additions & 0 deletions src/workerd/server/isolate-limit-enforcer.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
#include "isolate-limit-enforcer.h"
#include <workerd/io/actor-cache.h>
#include <workerd/jsg/memory.h>
#include <workerd/jsg/setup.h>
#include <kj/filesystem.h>
#include <fcntl.h>

namespace workerd {
namespace {

class NullIsolateLimitEnforcer final: public IsolateLimitEnforcer {
public:
v8::Isolate::CreateParams getCreateParams() override { return {}; }
void customizeIsolate(v8::Isolate* isolate) override {}
ActorCacheSharedLruOptions getActorCacheLruOptions() override {
// TODO(someday): Make this configurable?
return {
.softLimit = 16 * (1ull << 20), // 16 MiB
.hardLimit = 128 * (1ull << 20), // 128 MiB
.staleTimeout = 30 * kj::SECONDS,
.dirtyListByteLimit = 8 * (1ull << 20), // 8 MiB
.maxKeysPerRpc = 128,

// For now, we use `neverFlush` to implement in-memory-only actors.
// See WorkerService::getActor().
.neverFlush = true
};
}
kj::Own<void> enterStartupJs(
jsg::Lock& lock, kj::Maybe<kj::Exception>& error) const override {
return {};
}
kj::Own<void> enterDynamicImportJs(
jsg::Lock& lock, kj::Maybe<kj::Exception>& error) const override {
return {};
}
kj::Own<void> enterLoggingJs(
jsg::Lock& lock, kj::Maybe<kj::Exception>& error) const override {
return {};
}
kj::Own<void> enterInspectorJs(
jsg::Lock& loc, kj::Maybe<kj::Exception>& error) const override {
return {};
}
void completedRequest(kj::StringPtr id) const override {}
bool exitJs(jsg::Lock& lock) const override { return false; }
void reportMetrics(IsolateObserver& isolateMetrics) const override {}
kj::Maybe<size_t> checkPbkdfIterations(jsg::Lock& lock, size_t iterations) const override {
// No limit on the number of iterations in workerd
return kj::none;
}
};

class LimitedArrayBufferAllocator final: public v8::ArrayBuffer::Allocator {
public:
LimitedArrayBufferAllocator(size_t limit): limit(limit) {}
~LimitedArrayBufferAllocator() {}

void* Allocate(size_t length) override {
if (length > limit) return nullptr;
return calloc(length, 1);
}
void* AllocateUninitialized(size_t length) override {
if (length > limit) return nullptr;
return malloc(length);
}
void Free(void* data, size_t length) override {
free(data);
}

private:
size_t limit;
};

class ConfiguredIsolateLimitEnforcer final: public IsolateLimitEnforcer {
public:
ConfiguredIsolateLimitEnforcer(server::config::Worker::Limits::Reader limits)
: softHeapLimitMb(limits.getHeapSoftLimitMb()),
heapSnapshotNearHeapLimit(limits.getHeapSnapshotNearHeapLimit()),
heapLimitMultiplier(limits.getHeapLimitMultiplier()),
heapLimitExceedsMax(limits.getHeapLimitExceedsMax()) {}

v8::Isolate::CreateParams getCreateParams() override {
v8::Isolate::CreateParams params;
uint64_t softLimit = softHeapLimitMb * 1024 * 1024;
if (softLimit > 0) {
params.constraints.set_max_young_generation_size_in_bytes(
kj::min(softLimit, 2 * 1024 * 1024));
params.constraints.set_max_old_generation_size_in_bytes(softLimit);
params.array_buffer_allocator_shared =
std::make_shared<LimitedArrayBufferAllocator>(softLimit);
}
return params;
}

static size_t nearHeapLimit(void* data, size_t currentHeapLimit, size_t initialHeapLimit) {
auto& self = *static_cast<ConfiguredIsolateLimitEnforcer*>(data);

// We can hit this again when taking the heapsnapshot... just increase the limit
// and continue.
if (self.inNearLimitCallback) return currentHeapLimit * self.heapLimitMultiplier;
self.inNearLimitCallback = true;
KJ_DEFER(self.inNearLimitCallback = false);

if (self.exceededCounter >= self.heapLimitExceedsMax) {
KJ_LOG(ERROR, "Exceeded the configured hard heap limit.",
currentHeapLimit,
self.exceededCounter);
self.maybeGenerateHeapshot();
self.v8Isolate->TerminateExecution();
} else {
size_t newLimit = currentHeapLimit * self.heapLimitMultiplier;
KJ_LOG(WARNING, "Exceeded the configured soft heap limit. Setting new limit",
currentHeapLimit,
newLimit,
self.exceededCounter++);
self.maybeGenerateHeapshot();
}
return currentHeapLimit * self.heapLimitMultiplier;
}

void customizeIsolate(v8::Isolate* isolate) override {
KJ_REQUIRE(v8Isolate == nullptr, "one IsolateLimitEnforcer can only be used by one isolate");
v8Isolate = isolate;

isolate->AddNearHeapLimitCallback(&nearHeapLimit, this);

lastMemoryNotification.timestamp.store(
(kj::systemPreciseMonotonicClock().now() - kj::origin<kj::TimePoint>()) / kj::NANOSECONDS,
std::memory_order_relaxed);

}

ActorCacheSharedLruOptions getActorCacheLruOptions() override {
// TODO(someday): Make this configurable?
return {
.softLimit = 16 * (1ull << 20), // 16 MiB
.hardLimit = 128 * (1ull << 20), // 128 MiB
.staleTimeout = 30 * kj::SECONDS,
.dirtyListByteLimit = 8 * (1ull << 20), // 8 MiB
.maxKeysPerRpc = 128,

// For now, we use `neverFlush` to implement in-memory-only actors.
// See WorkerService::getActor().
.neverFlush = true
};
}
kj::Own<void> enterStartupJs(
jsg::Lock& lock, kj::Maybe<kj::Exception>& error) const override {
return {};
}
kj::Own<void> enterDynamicImportJs(
jsg::Lock& lock, kj::Maybe<kj::Exception>& error) const override {
return {};
}
kj::Own<void> enterLoggingJs(
jsg::Lock& lock, kj::Maybe<kj::Exception>& error) const override {
return {};
}
kj::Own<void> enterInspectorJs(
jsg::Lock& loc, kj::Maybe<kj::Exception>& error) const override {
return {};
}
void completedRequest(kj::StringPtr id) const override {}
bool exitJs(jsg::Lock& lock) const override { return false; }
void reportMetrics(IsolateObserver& isolateMetrics) const override {}
kj::Maybe<size_t> checkPbkdfIterations(jsg::Lock& lock, size_t iterations) const override {
// No limit on the number of iterations in workerd
return kj::none;
}

void maybeGenerateHeapshot() {
if (heapSnapshotCounter >= heapSnapshotNearHeapLimit || v8Isolate == nullptr) return;

static jsg::HeapSnapshotActivity activity([](auto, auto) {
return true;
});
static jsg::HeapSnapshotDeleter deleter;

auto snapshot = kj::Own<const v8::HeapSnapshot>(
v8Isolate->GetHeapProfiler()->TakeHeapSnapshot(&activity, nullptr, true, true),
deleter);

jsg::IsolateBase& base = jsg::IsolateBase::from(v8Isolate);
kj::String filename = kj::str("heapshot-", base.getUuid(), "-",
heapSnapshotCounter++, ".heapsnapshot");

KJ_LOG(WARNING, kj::str("Generating heap snapshot: ", filename));

auto fd = open(filename.cStr(), O_CREAT | O_WRONLY | O_TRUNC, 0644);
KJ_REQUIRE(fd >= 0, "Unable to open heap snapshot file for writing");
kj::AutoCloseFd autoFd(fd);
kj::FdOutputStream out(autoFd.get());

jsg::HeapSnapshotWriter writer([&](kj::Maybe<kj::ArrayPtr<char>> maybeChunk) {
KJ_IF_SOME(chunk, maybeChunk) {
out.write(chunk.begin(), chunk.size());
} else {
out.write(nullptr, 0);
}
return true;
});

snapshot->Serialize(&writer);
}

private:
uint64_t softHeapLimitMb = 0;
uint32_t heapSnapshotNearHeapLimit = 0;
uint32_t heapLimitMultiplier = 2;
uint32_t heapLimitExceedsMax = 3;
v8::Isolate* v8Isolate = nullptr;

// Indicates that we've hit the soft limit. When this happens we will double
// avaliable limit and if we hit it again, we'll terminate.
uint32_t exceededCounter = 0;
uint32_t heapSnapshotCounter = 0;
bool inNearLimitCallback = false;

struct MemoryNotificationMetadataImpl {
mutable std::atomic<uint64_t> timestamp{0};
mutable std::atomic<size_t> memoryUsage{0};
mutable std::atomic<uint> lockCount{0};
};
MemoryNotificationMetadataImpl lastMemoryNotification;
};

} // namespace

kj::Own<IsolateLimitEnforcer> newNullIsolateLimitEnforcer() {
return kj::heap<NullIsolateLimitEnforcer>();
}

kj::Own<workerd::IsolateLimitEnforcer> newConfiguredIsolateLimitEnforcer(
server::config::Worker::Limits::Reader configuredLimits) {
return kj::heap<ConfiguredIsolateLimitEnforcer>(configuredLimits);
}

} // namespace workerd
14 changes: 14 additions & 0 deletions src/workerd/server/isolate-limit-enforcer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <workerd/io/limit-enforcer.h>
#include <workerd/server/workerd.capnp.h>

namespace workerd {

// IsolateLimitEnforcer that enforces no limits.
kj::Own<workerd::IsolateLimitEnforcer> newNullIsolateLimitEnforcer();

kj::Own<workerd::IsolateLimitEnforcer> newConfiguredIsolateLimitEnforcer(
server::config::Worker::Limits::Reader configuredLimits);

} // namespace workerd
51 changes: 6 additions & 45 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// https://opensource.org/licenses/Apache-2.0

#include "server.h"
#include "isolate-limit-enforcer.h"
#include <kj/debug.h>
#include <kj/glob-filter.h>
#include <kj/compat/http.h>
Expand Down Expand Up @@ -2492,52 +2493,12 @@ kj::Own<Server::Service> Server::makeWorker(kj::StringPtr name, config::Worker::
errorReporter.addError(kj::str("Worker must specify compatibilityDate."));
}

// IsolateLimitEnforcer that enforces no limits.
class NullIsolateLimitEnforcer final: public IsolateLimitEnforcer {
public:
v8::Isolate::CreateParams getCreateParams() override { return {}; }
void customizeIsolate(v8::Isolate* isolate) override {}
ActorCacheSharedLruOptions getActorCacheLruOptions() override {
// TODO(someday): Make this configurable?
return {
.softLimit = 16 * (1ull << 20), // 16 MiB
.hardLimit = 128 * (1ull << 20), // 128 MiB
.staleTimeout = 30 * kj::SECONDS,
.dirtyListByteLimit = 8 * (1ull << 20), // 8 MiB
.maxKeysPerRpc = 128,

// For now, we use `neverFlush` to implement in-memory-only actors.
// See WorkerService::getActor().
.neverFlush = true
};
}
kj::Own<void> enterStartupJs(
jsg::Lock& lock, kj::Maybe<kj::Exception>& error) const override {
return {};
}
kj::Own<void> enterDynamicImportJs(
jsg::Lock& lock, kj::Maybe<kj::Exception>& error) const override {
return {};
}
kj::Own<void> enterLoggingJs(
jsg::Lock& lock, kj::Maybe<kj::Exception>& error) const override {
return {};
}
kj::Own<void> enterInspectorJs(
jsg::Lock& loc, kj::Maybe<kj::Exception>& error) const override {
return {};
}
void completedRequest(kj::StringPtr id) const override {}
bool exitJs(jsg::Lock& lock) const override { return false; }
void reportMetrics(IsolateObserver& isolateMetrics) const override {}
kj::Maybe<size_t> checkPbkdfIterations(jsg::Lock& lock, size_t iterations) const override {
// No limit on the number of iterations in workerd
return kj::none;
}
};

auto observer = kj::atomicRefcounted<IsolateObserver>();
auto limitEnforcer = kj::heap<NullIsolateLimitEnforcer>();

auto limitEnforcer = conf.hasLimits() ?
newConfiguredIsolateLimitEnforcer(conf.getLimits()) :
newNullIsolateLimitEnforcer();

auto api = kj::heap<WorkerdApi>(globalContext->v8System,
featureFlags.asReader(),
*limitEnforcer,
Expand Down
25 changes: 25 additions & 0 deletions src/workerd/server/workerd.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,31 @@ struct Worker {

moduleFallback @13 :Text;

struct Limits {
# Limits optionally applied to the worker. If not specified, no limits are applied.

heapSoftLimitMb @0 : UInt64;
# When set, if the isolate heap size exceeds this limit, a warning will be emitted
# to the console. This limit is advisory only and does not prevent the worker from
# exceeding it. If heapSnapshotNearHeapLimit is set to a value > 0, and the maximum
# number of snapshots has not yet been taken, a snapshot will be taken when this
# limit is exceeded.

heapSnapshotNearHeapLimit @1 :UInt32;
# When specified with a value > 0, instructs the memory limit enforcer to generate a
# heapsnapshot when the v8 heap approaches the memory limits. The value indicates the
# maximum number of heapsnapshots to generate.

heapLimitMultiplier @2 :UInt32 = 2;
# When specified, specifies the multiplier to apply to the heapSoftLimitMb when hit.

heapLimitExceedsMax @3 :UInt32 = 1;
# The maximum number of times the near heap limit can be exceeded before the isolate is
# terminated.
}

limits @14 :Limits;

}

struct ExternalServer {
Expand Down

0 comments on commit 28ac0ba

Please sign in to comment.