Skip to content

Commit

Permalink
Use WeakRef for tracking locked streams readers and writers
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Feb 1, 2024
1 parent 8ed21a8 commit 5e78eaa
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 170 deletions.
80 changes: 55 additions & 25 deletions src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,22 @@ class ReadableStreamController {
// passing along the closed promise that will be used to communicate state to the
// user code.
//
// The Reader will hold a reference to the controller that will be cleared when the reader
// is released or destroyed. The controller is guaranteed to either outlive or detach the
// reader so the ReadableStreamController& reference should remain valid.
// The Reader holds a strong reference to the controller. The Controller will hold a weak
// reference to the reader. It is ok for the reader itself to be freed/garbage collected
// while still being attached to the controller, but not the other way around.
virtual void attach(
ReadableStreamController& controller,
jsg::Promise<void> closedPromise) = 0;

// When a Reader lock is released, the controller will signal to the reader that it has been
// detached.
virtual void detach() = 0;

virtual kj::Own<WeakRef<Reader>> addWeakRef() = 0;

private:
static kj::Badge<Reader> getBadge() { return kj::Badge<Reader>(); }
friend class ReaderImpl;
};

struct ByobOptions {
Expand Down Expand Up @@ -479,12 +485,12 @@ class ReadableStreamController {

// Locks this controller to the given reader, returning true if the lock was successful, or false
// if the controller was already locked.
virtual bool lockReader(jsg::Lock& js, Reader& reader) = 0;
virtual bool lockReader(jsg::Lock& js, kj::Own<WeakRef<Reader>> reader) = 0;

// Removes the lock and releases the reader from this controller.
// maybeJs will be nullptr when the isolate lock is not available.
// If maybeJs is set, the reader's closed promise will be resolved.
virtual void releaseReader(Reader& reader, kj::Maybe<jsg::Lock&> maybeJs) = 0;
virtual void releaseReader(jsg::Lock& js, Reader& reader) = 0;

virtual kj::Maybe<PipeController&> tryPipeLock(jsg::Ref<WritableStream> destination) = 0;

Expand Down Expand Up @@ -574,8 +580,9 @@ class WritableStreamController {
// passing along the closed and ready promises that will be used to communicate state to the
// user code.
//
// The controller is guaranteed to either outlive the Writer or will detach the Writer so the
// WritableStreamController& reference should always remain valid.
// The Writer holds a strong reference to the controller. The Controller will hold a weak
// reference to the writer. It is ok for the writer itself to be freed/garbage collected
// while still being attached to the controller, but not the other way around.
virtual void attach(
WritableStreamController& controller,
jsg::Promise<void> closedPromise,
Expand All @@ -588,6 +595,12 @@ class WritableStreamController {
// The ready promise can be replaced whenever backpressure is signaled by the underlying
// controller.
virtual void replaceReadyPromise(jsg::Promise<void> readyPromise) = 0;

virtual kj::Own<WeakRef<Writer>> addWeakRef() = 0;

private:
static kj::Badge<Writer> getBadge() { return kj::Badge<Writer>(); }
friend class WritableStreamDefaultWriter;
};

struct PendingAbort {
Expand Down Expand Up @@ -672,12 +685,10 @@ class WritableStreamController {

// Locks this controller to the given writer, returning true if the lock was successful, or false
// if the controller was already locked.
virtual bool lockWriter(jsg::Lock& js, Writer& writer) = 0;
virtual bool lockWriter(jsg::Lock& js, kj::Own<WeakRef<Writer>>) = 0;

// Removes the lock and releases the writer from this controller.
// maybeJs will be nullptr when the isolate lock is not available.
// If maybeJs is set, the writer's closed and ready promises will be resolved.
virtual void releaseWriter(Writer& writer, kj::Maybe<jsg::Lock&> maybeJs) = 0;
virtual void releaseWriter(jsg::Lock& js, Writer& writer) = 0;

virtual kj::Maybe<v8::Local<v8::Value>> isErroring(jsg::Lock& js) = 0;

Expand Down Expand Up @@ -710,28 +721,37 @@ struct Locked {};

// When a reader is locked to a ReadableStream, a ReaderLock instance
// is used internally to represent the locked state in the ReadableStreamController.
// ReaderLocked maintains a weak referene to the actual Reader instance. It's ok
// for the Reader to be garbage collected while the ReadableStream is still alive but
// not vis versa. The Reader holds a strong reference to the ReadableStream only while
// it is attached.
class ReaderLocked {
public:
ReaderLocked(
ReadableStreamController::Reader& reader,
kj::Own<WeakRef<ReadableStreamController::Reader>> reader,
jsg::Promise<void>::Resolver closedFulfiller,
kj::Maybe<IoOwn<kj::Canceler>> canceler = kj::none)
: reader(reader),
: reader(kj::mv(reader)),
closedFulfiller(kj::mv(closedFulfiller)),
canceler(kj::mv(canceler)) {}

ReaderLocked(ReaderLocked&&) = default;
~ReaderLocked() noexcept(false) {
KJ_IF_SOME(r, reader) { r.detach(); }
if (reader.get() != nullptr) {
reader->runIfAlive([](auto& r) {
r.detach();
});
}
}
KJ_DISALLOW_COPY(ReaderLocked);

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(closedFulfiller);
}

ReadableStreamController::Reader& getReader() {
return KJ_ASSERT_NONNULL(reader);
kj::Maybe<ReadableStreamController::Reader&> getReader() {
if (reader.get() == nullptr) return kj::none;
return reader->tryGet();
}

kj::Maybe<jsg::Promise<void>::Resolver>& getClosedFulfiller() {
Expand All @@ -743,34 +763,43 @@ class ReaderLocked {
}

private:
kj::Maybe<ReadableStreamController::Reader&> reader;
kj::Own<WeakRef<ReadableStreamController::Reader>> reader;
kj::Maybe<jsg::Promise<void>::Resolver> closedFulfiller;
kj::Maybe<IoOwn<kj::Canceler>> canceler;
};

// When a writer is locked to a WritableStream, a WriterLock instance
// is used internally to represent the locked state in the WritableStreamController.
// WriterLocked maintains a weak reference to the actual Writer instance. It's ok
// for the Writer to be garbage collected while the WritableStream is still alive but
// not vis versa. The Writer holds a strong reference to the WritableStream only while
// it is attached.
class WriterLocked {
public:
WriterLocked(
WritableStreamController::Writer& writer,
kj::Own<WeakRef<WritableStreamController::Writer>> writer,
jsg::Promise<void>::Resolver closedFulfiller,
kj::Maybe<jsg::Promise<void>::Resolver> readyFulfiller = kj::none)
: writer(writer),
: writer(kj::mv(writer)),
closedFulfiller(kj::mv(closedFulfiller)),
readyFulfiller(kj::mv(readyFulfiller)) {}

WriterLocked(WriterLocked&&) = default;
~WriterLocked() noexcept(false) {
KJ_IF_SOME(w, writer) { w.detach(); }
if (writer.get() != nullptr) {
writer->runIfAlive([&](auto& w) {
w.detach();
});
}
}

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(closedFulfiller, readyFulfiller);
}

WritableStreamController::Writer& getWriter() {
return KJ_ASSERT_NONNULL(writer);
kj::Maybe<WritableStreamController::Writer&> getWriter() {
if (writer.get() == nullptr) return kj::none;
return writer->tryGet();
}

kj::Maybe<jsg::Promise<void>::Resolver>& getClosedFulfiller() {
Expand All @@ -782,14 +811,15 @@ class WriterLocked {
}

void setReadyFulfiller(jsg::PromiseResolverPair<void>& pair) {
KJ_IF_SOME(w, writer) {
if (writer.get() == nullptr) return;
writer->runIfAlive([&](auto& w) {
readyFulfiller = kj::mv(pair.resolver);
w.replaceReadyPromise(kj::mv(pair.promise));
}
});
}

private:
kj::Maybe<WritableStreamController::Writer&> writer;
kj::Own<WeakRef<WritableStreamController::Writer>> writer;
kj::Maybe<jsg::Promise<void>::Resolver> closedFulfiller;
kj::Maybe<jsg::Promise<void>::Resolver> readyFulfiller;
};
Expand Down
89 changes: 35 additions & 54 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -471,12 +471,7 @@ kj::Maybe<kj::Promise<DeferredProxy<void>>> WritableStreamSink::tryPumpFrom(

// =======================================================================================

ReadableStreamInternalController::~ReadableStreamInternalController() noexcept(false) {
KJ_IF_SOME(locked, readState.tryGet<ReaderLocked>()) {
auto lock = kj::mv(locked);
readState.init<Unlocked>();
}
}
ReadableStreamInternalController::~ReadableStreamInternalController() noexcept(false) {}

jsg::Ref<ReadableStream> ReadableStreamInternalController::addRef() {
return KJ_ASSERT_NONNULL(owner).addRef();
Expand Down Expand Up @@ -761,16 +756,17 @@ kj::Maybe<kj::Own<ReadableStreamSource>> ReadableStreamInternalController::remov
KJ_UNREACHABLE;
}

bool ReadableStreamInternalController::lockReader(jsg::Lock& js, Reader& reader) {
bool ReadableStreamInternalController::lockReader(jsg::Lock& js, kj::Own<WeakRef<Reader>> reader) {
if (isLockedToReader()) {
return false;
}

auto prp = js.newPromiseAndResolver<void>();
prp.promise.markAsHandled(js);

auto lock = ReaderLocked(reader, kj::mv(prp.resolver),
auto lock = ReaderLocked(kj::mv(reader), kj::mv(prp.resolver),
IoContext::current().addObject(kj::heap<kj::Canceler>()));
// Take care not to access reader directly after this point. Use the lock.

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
Expand All @@ -785,42 +781,30 @@ bool ReadableStreamInternalController::lockReader(jsg::Lock& js, Reader& reader)
}

readState = kj::mv(lock);
reader.attach(*this, kj::mv(prp.promise));

auto& inner = KJ_ASSERT_NONNULL(readState.get<ReaderLocked>().getReader());
inner.attach(*this, kj::mv(prp.promise));
return true;
}

void ReadableStreamInternalController::releaseReader(
Reader& reader,
kj::Maybe<jsg::Lock&> maybeJs) {
void ReadableStreamInternalController::releaseReader(jsg::Lock& js, Reader& reader) {
KJ_IF_SOME(locked, readState.tryGet<ReaderLocked>()) {
KJ_ASSERT(&locked.getReader() == &reader);
KJ_IF_SOME(js, maybeJs) {
JSG_REQUIRE(KJ_ASSERT_NONNULL(locked.getCanceler())->isEmpty(), TypeError,
"Cannot call releaseLock() on a reader with outstanding read promises.");
maybeRejectPromise<void>(js,
locked.getClosedFulfiller(),
js.v8TypeError("This ReadableStream reader has been released."_kj));
KJ_IF_SOME(r, locked.getReader()) {
KJ_ASSERT(&r == &reader);
}
auto lock = kj::mv(locked);

// When maybeJs is nullptr, that means releaseReader was called when the reader is
// being deconstructed and not as the result of explicitly calling releaseLock. In
// that case, we don't want to change the lock state itself because we do not have
// an isolate lock. Moving the lock above will free the lock state while keeping the
// ReadableStream marked as locked.
if (maybeJs != kj::none) {
readState.template init<Unlocked>();
}
}
}
JSG_REQUIRE(KJ_ASSERT_NONNULL(locked.getCanceler())->isEmpty(), TypeError,
"Cannot call releaseLock() on a reader with outstanding read promises.");
maybeRejectPromise<void>(js,
locked.getClosedFulfiller(),
js.v8TypeError("This ReadableStream reader has been released."_kj));

WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) {
KJ_IF_SOME(locked, writeState.tryGet<WriterLocked>()) {
auto lock = kj::mv(locked);
writeState.init<Unlocked>();
readState.template init<Unlocked>();
}
}

WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) {}

jsg::Ref<WritableStream> WritableStreamInternalController::addRef() {
return KJ_ASSERT_NONNULL(owner).addRef();
}
Expand Down Expand Up @@ -1246,7 +1230,7 @@ kj::Maybe<int> WritableStreamInternalController::getDesiredSize() {
KJ_UNREACHABLE;
}

bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer) {
bool WritableStreamInternalController::lockWriter(jsg::Lock& js, kj::Own<WeakRef<Writer>> writer) {
if (isLockedToWriter()) {
return false;
}
Expand All @@ -1257,7 +1241,8 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
auto readyPrp = js.newPromiseAndResolver<void>();
readyPrp.promise.markAsHandled(js);

auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver));
auto lock = WriterLocked(kj::mv(writer), kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver));
// Careful not to access writer directly after this point. Access is through the lock.

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
Expand All @@ -1274,30 +1259,26 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
}

writeState = kj::mv(lock);
writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));

auto& inner = KJ_ASSERT_NONNULL(writeState.get<WriterLocked>().getWriter());
inner.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));

return true;
}

void WritableStreamInternalController::releaseWriter(
Writer& writer,
kj::Maybe<jsg::Lock&> maybeJs) {
void WritableStreamInternalController::releaseWriter(jsg::Lock& js, Writer& writer) {
KJ_IF_SOME(locked, writeState.tryGet<WriterLocked>()) {
KJ_ASSERT(&locked.getWriter() == &writer);
KJ_IF_SOME(js, maybeJs) {
maybeRejectPromise<void>(js,
locked.getClosedFulfiller(),
js.v8TypeError("This WritableStream writer has been released."_kj));
KJ_IF_SOME(w, locked.getWriter()) {
// Just an extra verification.
KJ_ASSERT(&w == &writer);
}
auto lock = kj::mv(locked);

// When maybeJs is nullptr, that means releaseWriter was called when the writer is
// being deconstructed and not as the result of explicitly calling releaseLock and
// we do not have an isolate lock. In that case, we don't want to change the lock
// state itself. Moving the lock above will free the lock state while keeping the
// WritableStream marked as locked.
if (maybeJs != kj::none) {
writeState.template init<Unlocked>();
}
maybeRejectPromise<void>(js,
locked.getClosedFulfiller(),
js.v8TypeError("This WritableStream writer has been released."_kj));

auto lock = kj::mv(locked);
writeState.template init<Unlocked>();
}
}

Expand Down
9 changes: 4 additions & 5 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ class ReadableStreamInternalController: public ReadableStreamController {

bool isLockedToReader() const override { return !readState.is<Unlocked>(); }

bool lockReader(jsg::Lock& js, Reader& reader) override;
bool lockReader(jsg::Lock& js, kj::Own<WeakRef<Reader>> reader) override;

void releaseReader(Reader& reader, kj::Maybe<jsg::Lock&> maybeJs) override;
// See the comment for releaseReader in common.h for details on the use of maybeJs
void releaseReader(jsg::Lock& js, Reader& reader) override;

kj::Maybe<PipeController&> tryPipeLock(jsg::Ref<WritableStream> destination) override;

Expand Down Expand Up @@ -201,9 +200,9 @@ class WritableStreamInternalController: public WritableStreamController {

bool isLockedToWriter() const override { return !writeState.is<Unlocked>(); }

bool lockWriter(jsg::Lock& js, Writer& writer) override;
bool lockWriter(jsg::Lock& js, kj::Own<WeakRef<Writer>> writer) override;

void releaseWriter(Writer& writer, kj::Maybe<jsg::Lock&> maybeJs) override;
void releaseWriter(jsg::Lock& js, Writer& writer) override;
// See the comment for releaseWriter in common.h for details on the use of maybeJs

kj::Maybe<v8::Local<v8::Value>> isErroring(jsg::Lock& js) override {
Expand Down
Loading

0 comments on commit 5e78eaa

Please sign in to comment.