Skip to content

Commit

Permalink
Use timer directly in memory cache lock record and write tag directly…
Browse files Browse the repository at this point in the history
… to span.
  • Loading branch information
mar-cf committed Sep 26, 2024
1 parent 9cf0483 commit 0aee910
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 56 deletions.
42 changes: 21 additions & 21 deletions src/workerd/api/memory-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ static bool hasExpired(const kj::Maybe<double>& expiration, bool allowOutsideIoC

SharedMemoryCache::SharedMemoryCache(kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler)
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler,
const kj::MonotonicClock& timer)
: data(),
provider(provider),
id(kj::str(id)),
additionalResizeMemoryLimitHandler(additionalResizeMemoryLimitHandler) {}
additionalResizeMemoryLimitHandler(additionalResizeMemoryLimitHandler),
timer(timer) {}

SharedMemoryCache::~SharedMemoryCache() noexcept(false) {
KJ_IF_SOME(p, provider) {
Expand Down Expand Up @@ -218,8 +220,9 @@ void SharedMemoryCache::removeIfExistsWhileLocked(
kj::Own<const SharedMemoryCache> SharedMemoryCache::create(
kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> handler) {
return kj::atomicRefcounted<const SharedMemoryCache>(provider, id, handler);
kj::Maybe<AdditionalResizeMemoryLimitHandler&> handler,
const kj::MonotonicClock& timer) {
return kj::atomicRefcounted<const SharedMemoryCache>(provider, id, handler, timer);
}

SharedMemoryCache::Use::Use(kj::Own<const SharedMemoryCache> cache, const Limits& limits)
Expand All @@ -240,25 +243,21 @@ SharedMemoryCache::Use::~Use() noexcept(false) {

kj::Maybe<kj::Own<CacheValue>> SharedMemoryCache::Use::getWithoutFallback(
const kj::String& key, SpanBuilder& span) const {
kj::Locked<ThreadUnsafeData> data;
{
auto memoryCacheLockTiming =
IoContext::current().getMetrics().tryCreateMemoryCacheLockTiming(span);
auto memoryCacheLockRecord = MemoryCacheLockRecord(kj::mv(memoryCacheLockTiming));
data = cache->data.lockExclusive();
}
kj::Locked<ThreadUnsafeData> data = [&] {
auto memoryCacheLockRecord =
ScopedDurationTagger(span, "memory_cache_lock_wait_time_ns"_kjc, cache->timer);
return cache->data.lockExclusive();
}();
return cache->getWhileLocked(*data, key);
}

kj::OneOf<kj::Own<CacheValue>, kj::Promise<SharedMemoryCache::Use::GetWithFallbackOutcome>>
SharedMemoryCache::Use::getWithFallback(const kj::String& key, SpanBuilder& span) const {
kj::Locked<ThreadUnsafeData> data;
{
auto memoryCacheLockTiming =
IoContext::current().getMetrics().tryCreateMemoryCacheLockTiming(span);
auto memoryCacheLockRecord = MemoryCacheLockRecord(kj::mv(memoryCacheLockTiming));
data = cache->data.lockExclusive();
}
kj::Locked<ThreadUnsafeData> data = [&] {
auto memoryCacheLockRecord =
ScopedDurationTagger(span, "memory_cache_lock_wait_time_ns"_kjc, cache->timer);
return cache->data.lockExclusive();
}();
KJ_IF_SOME(existingValue, cache->getWhileLocked(*data, key)) {
return kj::mv(existingValue);
} else KJ_IF_SOME(existingInProgress, data->inProgress.find(key)) {
Expand Down Expand Up @@ -444,10 +443,11 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> MemoryCache::read(jsg::Lock& js,

// ======================================================================================

MemoryCacheProvider::MemoryCacheProvider(
MemoryCacheProvider::MemoryCacheProvider(const kj::MonotonicClock& timer,
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler)
: additionalResizeMemoryLimitHandler(kj::mv(additionalResizeMemoryLimitHandler)) {}
: additionalResizeMemoryLimitHandler(kj::mv(additionalResizeMemoryLimitHandler)),
timer(timer) {}

MemoryCacheProvider::~MemoryCacheProvider() noexcept(false) {
// TODO(cleanup): Later, assuming progress is made on kj::Ptr<T>, we ought to be able
Expand All @@ -467,7 +467,7 @@ kj::Own<const SharedMemoryCache> MemoryCacheProvider::getInstance(
-> SharedMemoryCache::AdditionalResizeMemoryLimitHandler& {
return const_cast<SharedMemoryCache::AdditionalResizeMemoryLimitHandler&>(handler);
});
return SharedMemoryCache::create(provider, id, handler);
return SharedMemoryCache::create(provider, id, handler, timer);
};

KJ_IF_SOME(cid, cacheId) {
Expand Down
16 changes: 12 additions & 4 deletions src/workerd/api/memory-cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <kj/map.h>
#include <kj/mutex.h>
#include <kj/table.h>
#include <kj/timer.h>

#include <set>

Expand Down Expand Up @@ -151,7 +152,8 @@ class SharedMemoryCache: public kj::AtomicRefcounted {

SharedMemoryCache(kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler);
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler,
const kj::MonotonicClock& timer);

~SharedMemoryCache() noexcept(false);

Expand All @@ -161,7 +163,8 @@ class SharedMemoryCache: public kj::AtomicRefcounted {

static kj::Own<const SharedMemoryCache> create(kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler);
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler,
const kj::MonotonicClock& timer);

public:
// RAII class that attaches itself to a cache, suggests cache limits to the
Expand Down Expand Up @@ -455,6 +458,8 @@ class SharedMemoryCache: public kj::AtomicRefcounted {
// Same as above, the MemoryCacheProvider owns the actual handler here. Since that is guaranteed
// to outlive this SharedMemoryCache instance, so is the handler.
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler;

const kj::MonotonicClock& timer;
};

// JavaScript class that allows accessing an in-memory cache.
Expand Down Expand Up @@ -490,8 +495,9 @@ class MemoryCache: public jsg::Object {
// the in memory cache is being used.
class MemoryCacheProvider {
public:
MemoryCacheProvider(kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler = kj::none);
MemoryCacheProvider(const kj::MonotonicClock& timer,
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler = kj::none);
KJ_DISALLOW_COPY_AND_MOVE(MemoryCacheProvider);
~MemoryCacheProvider() noexcept(false);

Expand All @@ -509,6 +515,8 @@ class MemoryCacheProvider {
// to avoid the use of the bare pointer to SharedMemoryCache* here. When the SharedMemoryCache
// is destroyed, it will remove itself from this cache by calling removeInstance.
kj::MutexGuarded<kj::HashMap<kj::String, const SharedMemoryCache*>> caches;

const kj::MonotonicClock& timer;
};

// clang-format off
Expand Down
29 changes: 0 additions & 29 deletions src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <workerd/jsg/observer.h>
#include <workerd/util/sqlite.h>

#include <kj/common.h>
#include <kj/exception.h>
#include <kj/refcount.h>
#include <kj/string.h>
Expand All @@ -31,29 +30,6 @@ class WebSocketObserver: public kj::Refcounted {
virtual void receivedMessage(size_t bytes) {};
};

class MemoryCacheLockTiming {
public:
virtual void start() {}
virtual void stop() {}
virtual ~MemoryCacheLockTiming() noexcept(false) {}
};

class MemoryCacheLockRecord {
public:
explicit MemoryCacheLockRecord(
kj::Maybe<kj::Own<MemoryCacheLockTiming>> memoryCacheLockTimingParam)
: memoryCacheLockTiming(kj::mv(memoryCacheLockTimingParam)) {
KJ_IF_SOME(timing, memoryCacheLockTiming) timing.get()->start();
}
~MemoryCacheLockRecord() noexcept(false) {
KJ_IF_SOME(timing, memoryCacheLockTiming) timing.get()->stop();
}
KJ_DISALLOW_COPY_AND_MOVE(MemoryCacheLockRecord);

private:
kj::Maybe<kj::Own<MemoryCacheLockTiming>> memoryCacheLockTiming;
};

// Observes a specific request to a specific worker. Also observes outgoing subrequests.
//
// Observing anything is optional. Default implementations of all methods observe nothing.
Expand Down Expand Up @@ -137,11 +113,6 @@ class RequestObserver: public kj::Refcounted {
virtual uint64_t clockRead() {
return 0;
}

virtual kj::Maybe<kj::Own<MemoryCacheLockTiming>> tryCreateMemoryCacheLockTiming(
SpanBuilder& span) {
return kj::none;
};
};

class IsolateObserver: public kj::AtomicRefcounted, public jsg::IsolateObserver {
Expand Down
15 changes: 15 additions & 0 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -795,4 +795,19 @@ void WorkerTracer::setTrace(rpc::Trace::Reader reader) {
trace->mergeFrom(reader, pipelineLogLevel);
}

ScopedDurationTagger::ScopedDurationTagger(
SpanBuilder& span, kj::ConstString key, const kj::MonotonicClock& timer)
: span(span),
key(kj::mv(key)),
timer(timer),
startTime(timer.now()) {}

ScopedDurationTagger::~ScopedDurationTagger() noexcept(false) {
auto duration = timer.now() - startTime;
if (isPredictableModeForTest()) {
duration = 0 * kj::NANOSECONDS;
}
span.setTag(kj::mv(key), duration / kj::NANOSECONDS);
}

} // namespace workerd
17 changes: 17 additions & 0 deletions src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -704,4 +704,21 @@ struct TraceParentContext {
SpanParent limeParentSpan;
};

// RAII object that measures the time duration over its lifetime. It tags this duration onto a
// given request span using a specified tag name. Ideal for automatically tracking and logging
// execution times within a scoped block.
class ScopedDurationTagger {
public:
explicit ScopedDurationTagger(
SpanBuilder& span, kj::ConstString key, const kj::MonotonicClock& timer);
~ScopedDurationTagger() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(ScopedDurationTagger);

private:
SpanBuilder& span;
kj::ConstString key;
const kj::MonotonicClock& timer;
const kj::TimePoint startTime;
};

} // namespace workerd
2 changes: 1 addition & 1 deletion src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ Server::Server(kj::Filesystem& fs,
entropySource(entropySource),
reportConfigError(kj::mv(reportConfigError)),
consoleMode(consoleMode),
memoryCacheProvider(kj::heap<api::MemoryCacheProvider>()),
memoryCacheProvider(kj::heap<api::MemoryCacheProvider>(timer)),
tasks(*this) {}

Server::~Server() noexcept(false) {}
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ TestFixture::TestFixture(SetupParams&& params)
false),
isolateLimitEnforcer(kj::heap<MockIsolateLimitEnforcer>()),
errorReporter(kj::heap<MockErrorReporter>()),
memoryCacheProvider(kj::heap<api::MemoryCacheProvider>()),
memoryCacheProvider(kj::heap<api::MemoryCacheProvider>(*timer)),
api(kj::heap<server::WorkerdApi>(testV8System,
params.featureFlags.orDefault(CompatibilityFlags::Reader()),
*isolateLimitEnforcer,
Expand Down

0 comments on commit 0aee910

Please sign in to comment.