Skip to content

Commit

Permalink
Merge pull request #1608 from cloudflare/jsnell/more-meory-tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell authored Feb 6, 2024
2 parents 18eea99 + 13de7bd commit ec3a570
Show file tree
Hide file tree
Showing 12 changed files with 645 additions and 1 deletion.
16 changes: 16 additions & 0 deletions src/workerd/api/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ class Blob: public jsg::Object {
JSG_METHOD(stream);
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
KJ_SWITCH_ONEOF(ownData) {
KJ_CASE_ONEOF(data, kj::Array<byte>) {
tracker.trackField("ownData", data);
}
KJ_CASE_ONEOF(data, jsg::Ref<Blob>) {
tracker.trackField("ownData", data);
}
}
tracker.trackField("type", type);
}

private:
kj::OneOf<kj::Array<byte>, jsg::Ref<Blob>> ownData;
kj::ArrayPtr<const byte> data;
Expand Down Expand Up @@ -106,6 +118,10 @@ class File: public Blob {
}
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
tracker.trackField("name", name);
}

private:
kj::String name;
double lastModified;
Expand Down
26 changes: 26 additions & 0 deletions src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ class ReadableStreamController {
// Used by sockets to signal that the ReadableStream shouldn't allow reads due to pending
// closure.
virtual void setPendingClosure() = 0;

virtual kj::StringPtr jsgGetMemoryName() const = 0;
virtual size_t jsgGetMemorySelfSize() const = 0;
virtual void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const = 0;
};

kj::Own<ReadableStreamController> newReadableStreamJsController();
Expand Down Expand Up @@ -624,6 +628,12 @@ class WritableStreamController {
}

static kj::Maybe<PendingAbort> dequeue(kj::Maybe<PendingAbort>& maybePendingAbort);

JSG_MEMORY_INFO(PendingAbort) {
tracker.trackField("resolver", resolver);
tracker.trackField("promise", promise);
tracker.trackField("reason", reason);
}
};

virtual ~WritableStreamController() noexcept(false) {}
Expand Down Expand Up @@ -696,6 +706,11 @@ class WritableStreamController {
// Used by sockets to signal that the WritableStream shouldn't allow writes due to pending
// closure.
virtual void setPendingClosure() = 0;

// For menmory tracking
virtual kj::StringPtr jsgGetMemoryName() const = 0;
virtual size_t jsgGetMemorySelfSize() const = 0;
virtual void jsgGetMemoryInfo(jsg::MemoryTracker& info) const = 0;
};

kj::Own<WritableStreamController> newWritableStreamJsController();
Expand Down Expand Up @@ -748,6 +763,12 @@ class ReaderLocked {
canceler = kj::none;
}

JSG_MEMORY_INFO(ReaderLocked) {
tracker.trackField("closedFulfiller", closedFulfiller);
tracker.trackFieldWithSize("IoOwn<kj::Canceler>",
sizeof(IoOwn<kj::Canceler>));
}

private:
kj::Maybe<ReadableStreamController::Reader&> reader;
kj::Maybe<jsg::Promise<void>::Resolver> closedFulfiller;
Expand Down Expand Up @@ -800,6 +821,11 @@ class WriterLocked {
readyFulfiller = kj::none;
}

JSG_MEMORY_INFO(WriterLocked) {
tracker.trackField("closedFulfiller", closedFulfiller);
tracker.trackField("readyFulfiller", readyFulfiller);
}

private:
kj::Maybe<WritableStreamController::Writer&> writer;
kj::Maybe<jsg::Promise<void>::Resolver> closedFulfiller;
Expand Down
78 changes: 78 additions & 0 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2287,4 +2287,82 @@ kj::Own<WritableStreamController> newWritableStreamInternalController(
kj::mv(maybeClosureWaitable));
}

kj::StringPtr WritableStreamInternalController::jsgGetMemoryName() const {
return "WritableStreamInternalController"_kjc;
}

size_t WritableStreamInternalController::jsgGetMemorySelfSize() const {
return sizeof(WritableStreamInternalController);
}
void WritableStreamInternalController::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {}
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
tracker.trackField("error", errored);
}
KJ_CASE_ONEOF(_, Writable) {
// Ideally we'd be able to track the size of any pending writes held in the sink's
// queue but since it is behind an IoOwn and we won't be holding the IoContext here,
// we can't.
tracker.trackFieldWithSize("IoOwn<WritableStreamSink>",
sizeof(IoOwn<WritableStreamSink>));
}
}
KJ_IF_SOME(writerLocked, writeState.tryGet<WriterLocked>()) {
tracker.trackField("writerLocked", writerLocked);
}
tracker.trackField("pendingAbort", maybePendingAbort);
tracker.trackField("maybeClosureWaitable", maybeClosureWaitable);

for (auto& event : queue) {
tracker.trackField("event", event);
}
}

kj::StringPtr ReadableStreamInternalController::PipeLocked::jsgGetMemoryName() const {
return "ReadableStreamInternalController::PipeLocked"_kjc;
}
size_t ReadableStreamInternalController::PipeLocked::jsgGetMemorySelfSize() const {
return sizeof(PipeLocked);
}

void ReadableStreamInternalController::PipeLocked::jsgGetMemoryInfo(
jsg::MemoryTracker& tracker) const {
tracker.trackField("ref", ref);
}

kj::StringPtr ReadableStreamInternalController::jsgGetMemoryName() const {
return "ReadableStreamInternalController"_kjc;
}

size_t ReadableStreamInternalController::jsgGetMemorySelfSize() const {
return sizeof(ReadableStreamInternalController);
}

void ReadableStreamInternalController::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {}
KJ_CASE_ONEOF(error, StreamStates::Errored) {
tracker.trackField("error", error);
}
KJ_CASE_ONEOF(readable, Readable) {
// Ideally we'd be able to track the size of any pending reads held in the source's
// queue but since it is behind an IoOwn and we won't be holding the IoContext here,
// we can't.
tracker.trackFieldWithSize("IoOwn<ReadableStreamSource>",
sizeof(IoOwn<ReadableStreamSource>));
}
}
KJ_SWITCH_ONEOF(readState) {
KJ_CASE_ONEOF(unlocked, Unlocked) {}
KJ_CASE_ONEOF(locked, Locked) {}
KJ_CASE_ONEOF(pipeLocked, PipeLocked) {
tracker.trackField("pipeLocked", pipeLocked);
}
KJ_CASE_ONEOF(readerLocked, ReaderLocked) {
tracker.trackField("readerLocked", readerLocked);
}
}
}

} // namespace workerd::api
43 changes: 43 additions & 0 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ class ReadableStreamInternalController: public ReadableStreamController {
isPendingClosure = true;
}

kj::StringPtr jsgGetMemoryName() const override;
size_t jsgGetMemorySelfSize() const override;
void jsgGetMemoryInfo(jsg::MemoryTracker& info) const override;

private:
void doCancel(jsg::Lock& js, jsg::Optional<v8::Local<v8::Value>> reason);
void doClose(jsg::Lock& js);
Expand Down Expand Up @@ -136,6 +140,10 @@ class ReadableStreamInternalController: public ReadableStreamController {

void visitForGc(jsg::GcVisitor& visitor) { visitor.visit(ref); }

kj::StringPtr jsgGetMemoryName() const;
size_t jsgGetMemorySelfSize() const;
void jsgGetMemoryInfo(jsg::MemoryTracker& info) const;

private:
ReadableStreamInternalController& inner;
jsg::Ref<WritableStream> ref;
Expand Down Expand Up @@ -224,6 +232,10 @@ class WritableStreamInternalController: public WritableStreamController {
void setPendingClosure() override {
isPendingClosure = true;
}

kj::StringPtr jsgGetMemoryName() const override;
size_t jsgGetMemorySelfSize() const override;
void jsgGetMemoryInfo(jsg::MemoryTracker& info) const override;
private:

struct AbortOptions {
Expand Down Expand Up @@ -283,12 +295,25 @@ class WritableStreamInternalController: public WritableStreamController {
kj::Maybe<jsg::Promise<void>::Resolver> promise;
std::shared_ptr<v8::BackingStore> ownBytes;
kj::ArrayPtr<const kj::byte> bytes;

JSG_MEMORY_INFO(Write) {
tracker.trackField("resolver", promise);
if (ownBytes != nullptr) {
tracker.trackFieldWithSize("backing", ownBytes->ByteLength());
}
}
};
struct Close {
kj::Maybe<jsg::Promise<void>::Resolver> promise;
JSG_MEMORY_INFO(Close) {
tracker.trackField("promise", promise);
}
};
struct Flush {
kj::Maybe<jsg::Promise<void>::Resolver> promise;
JSG_MEMORY_INFO(Flush) {
tracker.trackField("promise", promise);
}
};
struct Pipe {
WritableStreamInternalController& parent;
Expand All @@ -302,10 +327,28 @@ class WritableStreamInternalController: public WritableStreamController {
bool checkSignal(jsg::Lock& js);
jsg::Promise<void> pipeLoop(jsg::Lock& js);
jsg::Promise<void> write(v8::Local<v8::Value> value);

JSG_MEMORY_INFO(Pipe) {
tracker.trackField("resolver", promise);
tracker.trackField("signal", maybeSignal);
}
};
struct WriteEvent {
kj::Maybe<IoOwn<kj::Promise<void>>> outputLock; // must wait for this before actually writing
kj::OneOf<Write, Pipe, Close, Flush> event;

JSG_MEMORY_INFO(WriteEvent) {
if (outputLock != kj::none) {
tracker.trackFieldWithSize("outputLock",
sizeof(IoOwn<kj::Promise<void>>));
}
KJ_SWITCH_ONEOF(event) {
KJ_CASE_ONEOF(w, Write) { tracker.trackField("inner", w); }
KJ_CASE_ONEOF(p, Pipe) { tracker.trackField("inner", p); }
KJ_CASE_ONEOF(c, Close) { tracker.trackField("inner", c); }
KJ_CASE_ONEOF(f, Flush) { tracker.trackField("inner", f); }
}
}
};

std::deque<WriteEvent> queue;
Expand Down
Loading

0 comments on commit ec3a570

Please sign in to comment.