diff --git a/src/workerd/api/blob.h b/src/workerd/api/blob.h index c3ad6b57dfc..44b858a5d6d 100644 --- a/src/workerd/api/blob.h +++ b/src/workerd/api/blob.h @@ -60,6 +60,18 @@ class Blob: public jsg::Object { JSG_METHOD(stream); } + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_SWITCH_ONEOF(ownData) { + KJ_CASE_ONEOF(data, kj::Array) { + tracker.trackField("ownData", data); + } + KJ_CASE_ONEOF(data, jsg::Ref) { + tracker.trackField("ownData", data); + } + } + tracker.trackField("type", type); + } + private: kj::OneOf, jsg::Ref> ownData; kj::ArrayPtr data; @@ -106,6 +118,10 @@ class File: public Blob { } } + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("name", name); + } + private: kj::String name; double lastModified; diff --git a/src/workerd/api/streams/common.h b/src/workerd/api/streams/common.h index 59e9014f4b9..edcb7907480 100644 --- a/src/workerd/api/streams/common.h +++ b/src/workerd/api/streams/common.h @@ -523,6 +523,10 @@ class ReadableStreamController { // Used by sockets to signal that the ReadableStream shouldn't allow reads due to pending // closure. virtual void setPendingClosure() = 0; + + virtual kj::StringPtr jsgGetMemoryName() const = 0; + virtual size_t jsgGetMemorySelfSize() const = 0; + virtual void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const = 0; }; kj::Own newReadableStreamJsController(); @@ -624,6 +628,12 @@ class WritableStreamController { } static kj::Maybe dequeue(kj::Maybe& maybePendingAbort); + + JSG_MEMORY_INFO(PendingAbort) { + tracker.trackField("resolver", resolver); + tracker.trackField("promise", promise); + tracker.trackField("reason", reason); + } }; virtual ~WritableStreamController() noexcept(false) {} @@ -696,6 +706,11 @@ class WritableStreamController { // Used by sockets to signal that the WritableStream shouldn't allow writes due to pending // closure. virtual void setPendingClosure() = 0; + + // For menmory tracking + virtual kj::StringPtr jsgGetMemoryName() const = 0; + virtual size_t jsgGetMemorySelfSize() const = 0; + virtual void jsgGetMemoryInfo(jsg::MemoryTracker& info) const = 0; }; kj::Own newWritableStreamJsController(); @@ -748,6 +763,12 @@ class ReaderLocked { canceler = kj::none; } + JSG_MEMORY_INFO(ReaderLocked) { + tracker.trackField("closedFulfiller", closedFulfiller); + tracker.trackFieldWithSize("IoOwn", + sizeof(IoOwn)); + } + private: kj::Maybe reader; kj::Maybe::Resolver> closedFulfiller; @@ -800,6 +821,11 @@ class WriterLocked { readyFulfiller = kj::none; } + JSG_MEMORY_INFO(WriterLocked) { + tracker.trackField("closedFulfiller", closedFulfiller); + tracker.trackField("readyFulfiller", readyFulfiller); + } + private: kj::Maybe writer; kj::Maybe::Resolver> closedFulfiller; diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++ index 025c3eba130..a42074c902d 100644 --- a/src/workerd/api/streams/internal.c++ +++ b/src/workerd/api/streams/internal.c++ @@ -2287,4 +2287,82 @@ kj::Own newWritableStreamInternalController( kj::mv(maybeClosureWaitable)); } +kj::StringPtr WritableStreamInternalController::jsgGetMemoryName() const { + return "WritableStreamInternalController"_kjc; +} + +size_t WritableStreamInternalController::jsgGetMemorySelfSize() const { + return sizeof(WritableStreamInternalController); +} +void WritableStreamInternalController::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) {} + KJ_CASE_ONEOF(errored, StreamStates::Errored) { + tracker.trackField("error", errored); + } + KJ_CASE_ONEOF(_, Writable) { + // Ideally we'd be able to track the size of any pending writes held in the sink's + // queue but since it is behind an IoOwn and we won't be holding the IoContext here, + // we can't. + tracker.trackFieldWithSize("IoOwn", + sizeof(IoOwn)); + } + } + KJ_IF_SOME(writerLocked, writeState.tryGet()) { + tracker.trackField("writerLocked", writerLocked); + } + tracker.trackField("pendingAbort", maybePendingAbort); + tracker.trackField("maybeClosureWaitable", maybeClosureWaitable); + + for (auto& event : queue) { + tracker.trackField("event", event); + } +} + +kj::StringPtr ReadableStreamInternalController::PipeLocked::jsgGetMemoryName() const { + return "ReadableStreamInternalController::PipeLocked"_kjc; +} +size_t ReadableStreamInternalController::PipeLocked::jsgGetMemorySelfSize() const { + return sizeof(PipeLocked); +} + +void ReadableStreamInternalController::PipeLocked::jsgGetMemoryInfo( + jsg::MemoryTracker& tracker) const { + tracker.trackField("ref", ref); +} + +kj::StringPtr ReadableStreamInternalController::jsgGetMemoryName() const { + return "ReadableStreamInternalController"_kjc; +} + +size_t ReadableStreamInternalController::jsgGetMemorySelfSize() const { + return sizeof(ReadableStreamInternalController); +} + +void ReadableStreamInternalController::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) {} + KJ_CASE_ONEOF(error, StreamStates::Errored) { + tracker.trackField("error", error); + } + KJ_CASE_ONEOF(readable, Readable) { + // Ideally we'd be able to track the size of any pending reads held in the source's + // queue but since it is behind an IoOwn and we won't be holding the IoContext here, + // we can't. + tracker.trackFieldWithSize("IoOwn", + sizeof(IoOwn)); + } + } + KJ_SWITCH_ONEOF(readState) { + KJ_CASE_ONEOF(unlocked, Unlocked) {} + KJ_CASE_ONEOF(locked, Locked) {} + KJ_CASE_ONEOF(pipeLocked, PipeLocked) { + tracker.trackField("pipeLocked", pipeLocked); + } + KJ_CASE_ONEOF(readerLocked, ReaderLocked) { + tracker.trackField("readerLocked", readerLocked); + } + } +} + } // namespace workerd::api diff --git a/src/workerd/api/streams/internal.h b/src/workerd/api/streams/internal.h index 89bcab17821..fbbbe71bb24 100644 --- a/src/workerd/api/streams/internal.h +++ b/src/workerd/api/streams/internal.h @@ -105,6 +105,10 @@ class ReadableStreamInternalController: public ReadableStreamController { isPendingClosure = true; } + kj::StringPtr jsgGetMemoryName() const override; + size_t jsgGetMemorySelfSize() const override; + void jsgGetMemoryInfo(jsg::MemoryTracker& info) const override; + private: void doCancel(jsg::Lock& js, jsg::Optional> reason); void doClose(jsg::Lock& js); @@ -136,6 +140,10 @@ class ReadableStreamInternalController: public ReadableStreamController { void visitForGc(jsg::GcVisitor& visitor) { visitor.visit(ref); } + kj::StringPtr jsgGetMemoryName() const; + size_t jsgGetMemorySelfSize() const; + void jsgGetMemoryInfo(jsg::MemoryTracker& info) const; + private: ReadableStreamInternalController& inner; jsg::Ref ref; @@ -224,6 +232,10 @@ class WritableStreamInternalController: public WritableStreamController { void setPendingClosure() override { isPendingClosure = true; } + + kj::StringPtr jsgGetMemoryName() const override; + size_t jsgGetMemorySelfSize() const override; + void jsgGetMemoryInfo(jsg::MemoryTracker& info) const override; private: struct AbortOptions { @@ -283,12 +295,25 @@ class WritableStreamInternalController: public WritableStreamController { kj::Maybe::Resolver> promise; std::shared_ptr ownBytes; kj::ArrayPtr bytes; + + JSG_MEMORY_INFO(Write) { + tracker.trackField("resolver", promise); + if (ownBytes != nullptr) { + tracker.trackFieldWithSize("backing", ownBytes->ByteLength()); + } + } }; struct Close { kj::Maybe::Resolver> promise; + JSG_MEMORY_INFO(Close) { + tracker.trackField("promise", promise); + } }; struct Flush { kj::Maybe::Resolver> promise; + JSG_MEMORY_INFO(Flush) { + tracker.trackField("promise", promise); + } }; struct Pipe { WritableStreamInternalController& parent; @@ -302,10 +327,28 @@ class WritableStreamInternalController: public WritableStreamController { bool checkSignal(jsg::Lock& js); jsg::Promise pipeLoop(jsg::Lock& js); jsg::Promise write(v8::Local value); + + JSG_MEMORY_INFO(Pipe) { + tracker.trackField("resolver", promise); + tracker.trackField("signal", maybeSignal); + } }; struct WriteEvent { kj::Maybe>> outputLock; // must wait for this before actually writing kj::OneOf event; + + JSG_MEMORY_INFO(WriteEvent) { + if (outputLock != kj::none) { + tracker.trackFieldWithSize("outputLock", + sizeof(IoOwn>)); + } + KJ_SWITCH_ONEOF(event) { + KJ_CASE_ONEOF(w, Write) { tracker.trackField("inner", w); } + KJ_CASE_ONEOF(p, Pipe) { tracker.trackField("inner", p); } + KJ_CASE_ONEOF(c, Close) { tracker.trackField("inner", c); } + KJ_CASE_ONEOF(f, Flush) { tracker.trackField("inner", f); } + } + } }; std::deque queue; diff --git a/src/workerd/api/streams/queue.h b/src/workerd/api/streams/queue.h index d870bb97ccb..0c2f02ab3ab 100644 --- a/src/workerd/api/streams/queue.h +++ b/src/workerd/api/streams/queue.h @@ -263,6 +263,10 @@ class QueueImpl final { return kj::none; } + inline kj::StringPtr jsgGetMemoryName() const; + inline size_t jsgGetMemorySelfSize() const; + inline void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const; + private: struct Closed {}; using Errored = jsg::Value; @@ -492,6 +496,10 @@ class ConsumerImpl final { } } + inline kj::StringPtr jsgGetMemoryName() const; + inline size_t jsgGetMemorySelfSize() const; + inline void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const; + private: // A sentinel used in the buffer to signal that close() has been called. struct Close {}; @@ -502,6 +510,10 @@ class ConsumerImpl final { std::deque> buffer; std::deque readRequests; size_t queueTotalSize = 0; + + inline kj::StringPtr jsgGetMemoryName() const; + inline size_t jsgGetMemorySelfSize() const; + inline void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const; }; QueueImpl& queue; @@ -583,7 +595,9 @@ class ValueQueue final { using ConsumerImpl = ConsumerImpl; using QueueImpl = QueueImpl; - struct State {}; + struct State { + JSG_MEMORY_INFO(ValueQueue::State) {} + }; struct ReadRequest { jsg::Promise::Resolver resolver; @@ -591,6 +605,10 @@ class ValueQueue final { void resolveAsDone(jsg::Lock& js); void resolve(jsg::Lock& js, jsg::Value value); void reject(jsg::Lock& js, jsg::Value& value); + + JSG_MEMORY_INFO(ValueQueue::ReadRequest) { + tracker.trackField("resolver", resolver); + } }; // A value queue entry consists of an arbitrary JavaScript value and a size that is @@ -608,6 +626,10 @@ class ValueQueue final { kj::Own clone(jsg::Lock& js); + JSG_MEMORY_INFO(ValueQueue::Entry) { + tracker.trackField("value", value); + } + private: jsg::Value value; size_t size; @@ -616,6 +638,10 @@ class ValueQueue final { struct QueueEntry { kj::Own entry; QueueEntry clone(jsg::Lock& js); + + JSG_MEMORY_INFO(ValueQueue::QueueEntry) { + tracker.trackField("entry", entry); + } }; class Consumer final { @@ -650,6 +676,10 @@ class ValueQueue final { void visitForGc(jsg::GcVisitor& visitor); + inline kj::StringPtr jsgGetMemoryName() const; + inline size_t jsgGetMemorySelfSize() const; + inline void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const; + private: ConsumerImpl impl; @@ -678,6 +708,10 @@ class ValueQueue final { void visitForGc(jsg::GcVisitor& visitor); + inline kj::StringPtr jsgGetMemoryName() const; + inline size_t jsgGetMemorySelfSize() const; + inline void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const; + private: QueueImpl impl; @@ -722,6 +756,10 @@ class ByteQueue final { size_t filled = 0; size_t atLeast = 1; Type type = Type::DEFAULT; + + JSG_MEMORY_INFO(ByteQueue::ReadRequest::PullInto) { + tracker.trackField("store", store); + } } pullInto; ReadRequest(jsg::Promise::Resolver resolver, @@ -734,6 +772,11 @@ class ByteQueue final { void reject(jsg::Lock& js, jsg::Value& value); kj::Own makeByobReadRequest(ConsumerImpl& consumer, QueueImpl& queue); + + JSG_MEMORY_INFO(ByteQueue::ReadRequest) { + tracker.trackField("resolver", resolver); + tracker.trackField("pullInto", pullInto); + } }; // The ByobRequest is essentially a handle to the ByteQueue::ReadRequest that can be given to a @@ -774,6 +817,8 @@ class ByteQueue final { v8::Local getView(jsg::Lock& js); + JSG_MEMORY_INFO(ByteQueue::ByobRequest) {} + private: kj::Maybe request; ConsumerImpl& consumer; @@ -782,6 +827,12 @@ class ByteQueue final { struct State { std::deque> pendingByobReadRequests; + + JSG_MEMORY_INFO(ByteQueue::State) { + for (auto& request : pendingByobReadRequests) { + tracker.trackField("pendingByobReadRequest", request); + } + } }; // A byte queue entry consists of a jsg::BackingStore containing a non-zero-length @@ -798,6 +849,10 @@ class ByteQueue final { kj::Own clone(jsg::Lock& js); + JSG_MEMORY_INFO(ByteQueue::Entry) { + tracker.trackField("store", store); + } + private: jsg::BackingStore store; }; @@ -807,6 +862,10 @@ class ByteQueue final { size_t offset; QueueEntry clone(jsg::Lock& js); + + JSG_MEMORY_INFO(ByteQueue::QueueEntry) { + tracker.trackField("entry", entry); + } }; class Consumer { @@ -840,6 +899,10 @@ class ByteQueue final { void visitForGc(jsg::GcVisitor& visitor); + inline kj::StringPtr jsgGetMemoryName() const; + inline size_t jsgGetMemorySelfSize() const; + inline void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const; + private: ConsumerImpl impl; }; @@ -876,6 +939,10 @@ class ByteQueue final { void visitForGc(jsg::GcVisitor& visitor); + inline kj::StringPtr jsgGetMemoryName() const; + inline size_t jsgGetMemorySelfSize() const; + inline void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const; + private: QueueImpl impl; @@ -897,4 +964,124 @@ class ByteQueue final { friend class Consumer; }; +template +kj::StringPtr QueueImpl::jsgGetMemoryName() const { + return "QueueImpl"_kjc; +} + +template +size_t QueueImpl::jsgGetMemorySelfSize() const { + return sizeof(QueueImpl); +} + +template +void QueueImpl::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(ready, Ready) {} + KJ_CASE_ONEOF(closed, Closed) {} + KJ_CASE_ONEOF(errored, Errored) { + tracker.trackField("error", errored); + } + } +} + +template +kj::StringPtr ConsumerImpl::jsgGetMemoryName() const { + return "ConsumerImpl"_kjc; +} + +template +size_t ConsumerImpl::jsgGetMemorySelfSize() const { + return sizeof(ConsumerImpl); +} + +template +void ConsumerImpl::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(close, Closed) {} + KJ_CASE_ONEOF(error, Errored) { + tracker.trackField("error", error); + } + KJ_CASE_ONEOF(ready, Ready) { + tracker.trackField("inner", ready); + } + } +} + +template +kj::StringPtr ConsumerImpl::Ready::jsgGetMemoryName() const { + return "ConsumerImpl::Ready"_kjc; +} + +template +size_t ConsumerImpl::Ready::jsgGetMemorySelfSize() const { + return sizeof(Ready); +} + +template +void ConsumerImpl::Ready::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + for (auto& entry : buffer) { + KJ_SWITCH_ONEOF(entry) { + KJ_CASE_ONEOF(c, Close) { + tracker.trackFieldWithSize("pendingClose", sizeof(Close)); + } + KJ_CASE_ONEOF(e, QueueEntry) { + tracker.trackField("entry", e); + } + } + } + + for (auto& request : readRequests) { + tracker.trackField("pendingRead", request); + } +} + +kj::StringPtr ValueQueue::Consumer::jsgGetMemoryName() const { + return "ValueQueue::Consumer"_kjc; +} + +size_t ValueQueue::Consumer::jsgGetMemorySelfSize() const { + return sizeof(ValueQueue::Consumer); +} + +void ValueQueue::Consumer::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("impl", impl); +} + +kj::StringPtr ValueQueue::jsgGetMemoryName() const { + return "ValueQueue"_kjc; +} + +size_t ValueQueue::jsgGetMemorySelfSize() const { + return sizeof(ValueQueue); +} + +void ValueQueue::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("impl", impl); +} + +kj::StringPtr ByteQueue::Consumer::jsgGetMemoryName() const { + return "ByteQueue::Consumer"_kjc; +} + +size_t ByteQueue::Consumer::jsgGetMemorySelfSize() const { + return sizeof(ByteQueue::Consumer); +} + +void ByteQueue::Consumer::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("impl", impl); +} + +kj::StringPtr ByteQueue::jsgGetMemoryName() const { + return "ByteQueue"_kjc; +} + +size_t ByteQueue::jsgGetMemorySelfSize() const { + return sizeof(ByteQueue); +} + +void ByteQueue::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("impl", impl); +} + } // workerd::api diff --git a/src/workerd/api/streams/readable.c++ b/src/workerd/api/streams/readable.c++ index 5df9ca5708c..5584f74311a 100644 --- a/src/workerd/api/streams/readable.c++ +++ b/src/workerd/api/streams/readable.c++ @@ -524,4 +524,20 @@ jsg::Optional ByteLengthQueuingStrategy::size( return kj::none; } +kj::StringPtr ReaderImpl::jsgGetMemoryName() const { return "ReaderImpl"_kjc; } + +size_t ReaderImpl::jsgGetMemorySelfSize() const { return sizeof(ReaderImpl); } + +void ReaderImpl::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_IF_SOME(stream, state.tryGet()) { + tracker.trackField("stream", stream); + } + tracker.trackField("closedPromise", closedPromise); +} + +void ReadableStream::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("controller", controller); + tracker.trackField("eofResolverPair", eofResolverPair); +} + } // namespace workerd::api diff --git a/src/workerd/api/streams/readable.h b/src/workerd/api/streams/readable.h index 6f1c8a1d71b..42f9594e598 100644 --- a/src/workerd/api/streams/readable.h +++ b/src/workerd/api/streams/readable.h @@ -35,6 +35,10 @@ class ReaderImpl { void visitForGc(jsg::GcVisitor& visitor); + kj::StringPtr jsgGetMemoryName() const; + size_t jsgGetMemorySelfSize() const; + void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const; + private: struct Initial {}; // While a Reader is attached to a ReadableStream, it holds a strong reference to the @@ -97,6 +101,10 @@ class ReadableStreamDefaultReader : public jsg::Object, inline bool isByteOriented() const override { return false; } + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("impl", impl); + } + private: ReaderImpl impl; @@ -162,6 +170,10 @@ class ReadableStreamBYOBReader: public jsg::Object, inline bool isByteOriented() const override { return true; } + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("impl", impl); + } + private: ReaderImpl impl; @@ -369,6 +381,9 @@ class ReadableStream: public jsg::Object { // Used by ReadableStreamInternalController to signal EOF being reached. Can be called even if // `onEof` wasn't called. void signalEof(jsg::Lock& js); + + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const; + private: kj::Maybe ioContext; kj::Own controller; diff --git a/src/workerd/api/streams/standard.c++ b/src/workerd/api/streams/standard.c++ index 59d02ecfc36..3c4d8a84979 100644 --- a/src/workerd/api/streams/standard.c++ +++ b/src/workerd/api/streams/standard.c++ @@ -61,6 +61,25 @@ public: void visitForGc(jsg::GcVisitor& visitor); + kj::StringPtr jsgGetMemoryName() const { + return "ReadableLockImpl"_kjc; + } + size_t jsgGetMemorySelfSize() const { + return sizeof(ReadableLockImpl); + } + void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(locked, Locked) {} + KJ_CASE_ONEOF(unlocked, Unlocked) {} + KJ_CASE_ONEOF(pipeLocked, PipeLocked) { + tracker.trackField("pipeLocked", pipeLocked); + } + KJ_CASE_ONEOF(readerLocked, ReaderLocked) { + tracker.trackField("readerLocked", readerLocked); + } + } + } + private: class PipeLocked: public PipeController { public: @@ -104,6 +123,10 @@ private: void visitForGc(jsg::GcVisitor& visitor) ; + JSG_MEMORY_INFO(PipeLocked) { + tracker.trackField("writableStreamRef", writableStreamRef); + } + private: Controller& inner; jsg::Ref writableStreamRef; @@ -137,6 +160,19 @@ public: PipeToOptions& options); void releasePipeLock(); + JSG_MEMORY_INFO(WritableLockImpl) { + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(unlocked, Unlocked) {} + KJ_CASE_ONEOF(locked, Locked) {} + KJ_CASE_ONEOF(writerLocked, WriterLocked) { + tracker.trackField("writerLocked", writerLocked); + } + KJ_CASE_ONEOF(pipeLocked, PipeLocked) { + tracker.trackField("pipeLocked", pipeLocked); + } + } + } + private: struct PipeLocked { ReadableStreamController::PipeController& source; @@ -150,6 +186,11 @@ private: kj::Maybe> checkSignal( jsg::Lock& js, Controller& self); + + JSG_MEMORY_INFO(PipeLocked) { + tracker.trackField("readableStreamRef", readableStreamRef); + tracker.trackField("signal", maybeSignal); + } }; kj::OneOf state = Unlocked(); @@ -689,6 +730,10 @@ public: KJ_UNIMPLEMENTED("only implemented for WritableStreamInternalController"); } + kj::StringPtr jsgGetMemoryName() const override; + size_t jsgGetMemorySelfSize() const override; + void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const override; + private: bool hasPendingReadRequests(); @@ -827,6 +872,10 @@ public: KJ_UNIMPLEMENTED("only implemented for WritableStreamInternalController"); } + kj::StringPtr jsgGetMemoryName() const override; + size_t jsgGetMemorySelfSize() const override; + void jsgGetMemoryInfo(jsg::MemoryTracker& info) const override; + private: jsg::Promise pipeLoop(jsg::Lock& js); @@ -1609,6 +1658,11 @@ struct ReadableState { kj::Maybe> owner; kj::Own consumer; + JSG_MEMORY_INFO(ReadableState) { + tracker.trackField("controller", controller); + tracker.trackField("consumer", consumer); + } + ReadableState( jsg::Ref controller, auto owner, auto stateListener) : controller(kj::mv(controller)), @@ -1697,6 +1751,12 @@ struct ValueReadable final: public api::ValueQueue::ConsumerImpl::StateListener using State = ReadableState; kj::Maybe state; + JSG_MEMORY_INFO(ValueReadable) { + KJ_IF_SOME(s, state) { + tracker.trackField("state", s); + } + } + ValueReadable(jsg::Ref controller, auto owner) : state(State(kj::mv(controller), owner, this)) {} @@ -1795,6 +1855,12 @@ struct ByteReadable final: public api::ByteQueue::ConsumerImpl::StateListener { kj::Maybe state; int autoAllocateChunkSize; + JSG_MEMORY_INFO(ByteReadable) { + KJ_IF_SOME(s, state) { + tracker.trackField("state", s); + } + } + ByteReadable( jsg::Ref controller, auto owner, @@ -4074,4 +4140,148 @@ TransformStreamDefaultController::tryGetWritableController() { return kj::none; } +template +kj::StringPtr WritableImpl::jsgGetMemoryName() const { + return "WritableImpl"_kjc; +} + +template +size_t WritableImpl::jsgGetMemorySelfSize() const { + return sizeof(WritableImpl); +} + +template +void WritableImpl::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("signal", signal); + + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) {} + KJ_CASE_ONEOF(error, StreamStates::Errored) { + tracker.trackField("error", error); + } + KJ_CASE_ONEOF(erroring, StreamStates::Erroring) { + tracker.trackField("erroring", erroring.reason); + } + KJ_CASE_ONEOF(writable, Writable) {} + } + + tracker.trackField("abortAlgorithm", algorithms.abort); + tracker.trackField("closeAlgorithm", algorithms.close); + tracker.trackField("writeAlgorithm", algorithms.write); + tracker.trackField("sizeAlgorithm", algorithms.size); + + for (auto& request : writeRequests) { + tracker.trackField("pendingWrite", request); + } + + tracker.trackField("inFlightWrite", inFlightWrite); + tracker.trackField("inFlightClose", inFlightClose); + tracker.trackField("closeRequest", closeRequest); + tracker.trackField("maybePendingAbort", maybePendingAbort); +} + +kj::StringPtr WritableStreamJsController::jsgGetMemoryName() const { + return "WritableStreamJsController"_kjc; +} + +size_t WritableStreamJsController::jsgGetMemorySelfSize() const { + return sizeof(WritableStreamJsController); +} + +void WritableStreamJsController::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) {} + KJ_CASE_ONEOF(error, StreamStates::Errored) { + tracker.trackField("error", error); + } + KJ_CASE_ONEOF(controller, Controller) { + tracker.trackField("controller", controller); + } + } + tracker.trackField("lock", lock); + tracker.trackField("maybeAbortPromise", maybeAbortPromise); +} + +void WritableStreamDefaultController::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("impl", impl); +} + +kj::StringPtr ReadableStreamJsController::jsgGetMemoryName() const { + return "ReadableStreamJsController"_kjc; +} + +size_t ReadableStreamJsController::jsgGetMemorySelfSize() const { + return sizeof(ReadableStreamJsController); +} + +void ReadableStreamJsController::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) {} + KJ_CASE_ONEOF(error, StreamStates::Errored) { + tracker.trackField("error", error); + } + KJ_CASE_ONEOF(readable, kj::Own) { + tracker.trackField("readable", readable); + } + KJ_CASE_ONEOF(readable, kj::Own) { + tracker.trackField("readable", readable); + } + } + + tracker.trackField("lock", lock); + + KJ_IF_SOME(pendingState, maybePendingState) { + KJ_SWITCH_ONEOF(pendingState) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) {} + KJ_CASE_ONEOF(error, StreamStates::Errored) { + tracker.trackField("pendingError", error); + } + } + } +} + +template +kj::StringPtr ReadableImpl::jsgGetMemoryName() const { + return "ReadableImpl"_kjc; +} + +template +size_t ReadableImpl::jsgGetMemorySelfSize() const { + return sizeof(ReadableImpl); +} + +template +void ReadableImpl::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) {} + KJ_CASE_ONEOF(error, StreamStates::Errored) { + tracker.trackField("error", error); + } + KJ_CASE_ONEOF(queue, Queue) { + tracker.trackField("queue", queue); + } + } + + tracker.trackField("startAlgorithm", algorithms.start); + tracker.trackField("pullAlgorithm", algorithms.pull); + tracker.trackField("cancelAlgorithm", algorithms.cancel); + tracker.trackField("sizeAlgorithm", algorithms.size); + tracker.trackField("pendingCancel", maybePendingCancel); +} + +void ReadableStreamBYOBRequest::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_IF_SOME(impl, maybeImpl) { + tracker.trackField("readRequest", impl.readRequest); + tracker.trackField("controller", impl.controller); + tracker.trackField("view", impl.view); + } +} + +void TransformStreamDefaultController::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("startPromise", startPromise); + tracker.trackField("maybeBackpressureChange", maybeBackpressureChange); + tracker.trackField("transformAlgorithm", algorithms.transform); + tracker.trackField("flushAlgorithm", algorithms.flush); +} + } // namespace workerd::api diff --git a/src/workerd/api/streams/standard.h b/src/workerd/api/streams/standard.h index b3d2ec0c6be..9cc67719131 100644 --- a/src/workerd/api/streams/standard.h +++ b/src/workerd/api/streams/standard.h @@ -190,6 +190,10 @@ class ReadableImpl { void visitForGc(jsg::GcVisitor& visitor); + kj::StringPtr jsgGetMemoryName() const; + size_t jsgGetMemorySelfSize() const; + void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const; + private: struct Algorithms { kj::Maybe> start; @@ -233,6 +237,10 @@ class ReadableImpl { struct PendingCancel { kj::Maybe::Resolver> fulfiller; jsg::Promise promise; + JSG_MEMORY_INFO(PendingCancel) { + tracker.trackField("fulfiller", fulfiller); + tracker.trackField("promise", promise); + } }; kj::Maybe maybePendingCancel; @@ -256,6 +264,11 @@ class WritableImpl { void visitForGc(jsg::GcVisitor& visitor) { visitor.visit(resolver, value); } + + JSG_MEMORY_INFO(WriteRequest) { + tracker.trackField("resolver", resolver); + tracker.trackField("value", value); + } }; WritableImpl(kj::Own> owner); @@ -321,6 +334,10 @@ class WritableImpl { void visitForGc(jsg::GcVisitor& visitor); + kj::StringPtr jsgGetMemoryName() const; + size_t jsgGetMemorySelfSize() const; + void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const; + private: struct Algorithms { @@ -418,6 +435,10 @@ class ReadableStreamDefaultController: public jsg::Object { kj::Own> getWeakRef(); + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("impl", impl); + } + private: kj::Maybe ioContext; ReadableImpl impl; @@ -473,6 +494,8 @@ class ReadableStreamBYOBRequest: public jsg::Object { bool isPartiallyFulfilled(); + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const; + private: struct Impl { kj::Own readRequest; @@ -534,6 +557,11 @@ class ReadableByteStreamController: public jsg::Object { JSG_METHOD(error); } + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("impl", impl); + tracker.trackField("maybeByobRequest", maybeByobRequest); + } + private: kj::Maybe ioContext; ReadableImpl impl; @@ -583,6 +611,8 @@ class WritableStreamDefaultController: public jsg::Object { JSG_METHOD(error); } + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const; + private: kj::Maybe ioContext; WritableImpl impl; @@ -649,6 +679,8 @@ class TransformStreamDefaultController: public jsg::Object { jsg::Promise pull(jsg::Lock& js); jsg::Promise cancel(jsg::Lock& js, v8::Local reason); + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const; + private: struct Algorithms { kj::Maybe> transform; diff --git a/src/workerd/api/streams/transform.h b/src/workerd/api/streams/transform.h index 334d27fa5f5..7549271bd45 100644 --- a/src/workerd/api/streams/transform.h +++ b/src/workerd/api/streams/transform.h @@ -63,6 +63,11 @@ class TransformStream: public jsg::Object { } } + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("readable", readable); + tracker.trackField("writable", writable); + } + private: jsg::Ref readable; jsg::Ref writable; diff --git a/src/workerd/api/streams/writable.c++ b/src/workerd/api/streams/writable.c++ index 1eeab83013d..3376b02aa92 100644 --- a/src/workerd/api/streams/writable.c++ +++ b/src/workerd/api/streams/writable.c++ @@ -280,4 +280,16 @@ jsg::Ref WritableStream::constructor( return kj::mv(stream); } +void WritableStreamDefaultWriter::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + KJ_IF_SOME(ref, state.tryGet()) { + tracker.trackField("attached", ref); + } + tracker.trackField("closedPromise", closedPromise); + tracker.trackField("readyPromise", readyPromise); +} + +void WritableStream::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("controller", controller); +} + } // namespace workerd::api diff --git a/src/workerd/api/streams/writable.h b/src/workerd/api/streams/writable.h index a5063eef17d..99b8a0c3919 100644 --- a/src/workerd/api/streams/writable.h +++ b/src/workerd/api/streams/writable.h @@ -75,6 +75,8 @@ class WritableStreamDefaultWriter: public jsg::Object, void replaceReadyPromise(jsg::Promise readyPromise) override; + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const; + private: struct Initial {}; // While a Writer is attached to a WritableStream, it holds a strong reference to the @@ -156,6 +158,8 @@ class WritableStream: public jsg::Object { }); } + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const; + private: kj::Maybe ioContext; kj::Own controller;