Skip to content

Commit

Permalink
Add initial user tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
fhanau committed Sep 17, 2024
1 parent 3f108e8 commit 0b073a7
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 41 deletions.
22 changes: 11 additions & 11 deletions src/workerd/api/actor.c++
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ public:
auto& context = IoContext::current();

return context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
[&](SpanBuilder& span, IoChannelFactory& ioChannelFactory) {
if (span.isObserved()) {
span.setTag("actor_id"_kjc, kj::str(actorId));
[&](TraceContext& tracing, IoChannelFactory& ioChannelFactory) {
if (tracing.span.isObserved()) {
tracing.span.setTag("actor_id"_kjc, kj::str(actorId));
}

// Lazily initialize actorChannel
if (actorChannel == kj::none) {
actorChannel = context.getColoLocalActorChannel(channelId, actorId, span);
actorChannel = context.getColoLocalActorChannel(channelId, actorId, tracing.span);
}

return KJ_REQUIRE_NONNULL(actorChannel)
->startRequest({.cfBlobJson = kj::mv(cfStr), .parentSpan = span});
->startRequest({.cfBlobJson = kj::mv(cfStr), .tracing = tracing});
},
{.inHouse = true,
.wrapMetrics = true,
Expand Down Expand Up @@ -68,19 +68,19 @@ public:
auto& context = IoContext::current();

return context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
[&](SpanBuilder& span, IoChannelFactory& ioChannelFactory) {
if (span.isObserved()) {
span.setTag("actor_id"_kjc, id->toString());
[&](TraceContext& tracing, IoChannelFactory& ioChannelFactory) {
if (tracing.span.isObserved()) {
tracing.span.setTag("actor_id"_kjc, id->toString());
}

// Lazily initialize actorChannel
if (actorChannel == kj::none) {
actorChannel = context.getGlobalActorChannel(
channelId, id->getInner(), kj::mv(locationHint), mode, enableReplicaRouting, span);
actorChannel = context.getGlobalActorChannel(channelId, id->getInner(),
kj::mv(locationHint), mode, enableReplicaRouting, tracing.span);
}

return KJ_REQUIRE_NONNULL(actorChannel)
->startRequest({.cfBlobJson = kj::mv(cfStr), .parentSpan = span});
->startRequest({.cfBlobJson = kj::mv(cfStr), .tracing = tracing});
},
{.inHouse = true,
.wrapMetrics = true,
Expand Down
10 changes: 9 additions & 1 deletion src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,15 @@ jsg::V8Ref<v8::Object> getTraceLogMessage(jsg::Lock& js, const Trace::Log& log)
}

// Builds the list of TraceLog objects for a trace. Regular log entries come first, in their
// recorded order, followed by the trace's spans, which are surfaced as additional log entries.
kj::Array<jsg::Ref<TraceLog>> getTraceLogs(jsg::Lock& js, const Trace& trace) {
  // The builder is sized once for both collections, so every add() below has room.
  auto builder = kj::heapArrayBuilder<jsg::Ref<TraceLog>>(trace.logs.size() + trace.spans.size());
  for (auto& log: trace.logs) {
    builder.add(jsg::alloc<TraceLog>(js, trace, log));
  }
  // Add spans represented as logs to the logs object.
  for (auto& span: trace.spans) {
    builder.add(jsg::alloc<TraceLog>(js, trace, span));
  }
  return builder.finish();
}

kj::Array<jsg::Ref<TraceDiagnosticChannelEvent>> getTraceDiagnosticChannelEvents(
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,14 @@ wd_cc_library(
name = "trace",
srcs = ["trace.c++"],
hdrs = ["trace.h"],
implementation_deps = [
"//src/workerd/util:thread-scopes",
],
visibility = ["//visibility:public"],
deps = [
":worker-interface_capnp",
"//src/workerd/jsg:memory-tracker",
"//src/workerd/util",
"//src/workerd/util:own-util",
"@capnp-cpp//src/capnp:capnp-rpc",
"@capnp-cpp//src/capnp:capnpc",
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/io/io-channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class IoChannelFactory {
kj::Maybe<kj::String> cfBlobJson;

// Specifies the parent span for the subrequest for tracing purposes.
SpanParent parentSpan = nullptr;
TraceParentContext tracing = TraceParentContext(nullptr, nullptr);

// Serialized JSON value to pass in ew_compat field of control header to FL. If this subrequest
// does not go directly to FL, this value is ignored. Flags marked with `$neededByFl` in
Expand Down
44 changes: 32 additions & 12 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -782,14 +782,19 @@ kj::Date IoContext::now() {
}

kj::Own<WorkerInterface> IoContext::getSubrequestNoChecks(
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, IoChannelFactory&)> func,
kj::FunctionParam<kj::Own<WorkerInterface>(TraceContext&, IoChannelFactory&)> func,
SubrequestOptions options) {
SpanBuilder span = nullptr;
SpanBuilder limeSpan = nullptr;

KJ_IF_SOME(n, options.operationName) {
span = makeTraceSpan(kj::mv(n));
// TODO(cleanup): Avoid cloning the string here if possible.
span = makeTraceSpan(kj::ConstString(kj::str(n)));
limeSpan = makeLimeTraceSpan(kj::ConstString(kj::mv(n)));
}

auto ret = func(span, getIoChannelFactory());
TraceContext tracing(kj::mv(span), kj::mv(limeSpan));
auto ret = func(tracing, getIoChannelFactory());

if (options.wrapMetrics) {
auto& metrics = getMetrics();
Expand All @@ -798,15 +803,18 @@ kj::Own<WorkerInterface> IoContext::getSubrequestNoChecks(
kj::mv(ret), getHeaderIds().contentEncoding, metrics);
}

if (span.isObserved()) {
ret = ret.attach(kj::mv(span));
if (tracing.span.isObserved()) {
ret = ret.attach(kj::mv(tracing.span));
}
if (tracing.limeSpan.isObserved()) {
ret = ret.attach(kj::mv(tracing.limeSpan));
}

return kj::mv(ret);
}

kj::Own<WorkerInterface> IoContext::getSubrequest(
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, IoChannelFactory&)> func,
kj::FunctionParam<kj::Own<WorkerInterface>(TraceContext&, IoChannelFactory&)> func,
SubrequestOptions options) {
limitEnforcer->newSubrequest(options.inHouse);
return getSubrequestNoChecks(kj::mv(func), kj::mv(options));
Expand All @@ -815,8 +823,9 @@ kj::Own<WorkerInterface> IoContext::getSubrequest(
kj::Own<WorkerInterface> IoContext::getSubrequestChannel(
uint channel, bool isInHouse, kj::Maybe<kj::String> cfBlobJson, kj::ConstString operationName) {
return getSubrequest(
[&](SpanBuilder& span, IoChannelFactory& channelFactory) {
return getSubrequestChannelImpl(channel, isInHouse, kj::mv(cfBlobJson), span, channelFactory);
[&](TraceContext& tracing, IoChannelFactory& channelFactory) {
return getSubrequestChannelImpl(
channel, isInHouse, kj::mv(cfBlobJson), tracing, channelFactory);
},
SubrequestOptions{
.inHouse = isInHouse,
Expand All @@ -830,8 +839,9 @@ kj::Own<WorkerInterface> IoContext::getSubrequestChannelNoChecks(uint channel,
kj::Maybe<kj::String> cfBlobJson,
kj::Maybe<kj::ConstString> operationName) {
return getSubrequestNoChecks(
[&](SpanBuilder& span, IoChannelFactory& channelFactory) {
return getSubrequestChannelImpl(channel, isInHouse, kj::mv(cfBlobJson), span, channelFactory);
[&](TraceContext& tracing, IoChannelFactory& channelFactory) {
return getSubrequestChannelImpl(
channel, isInHouse, kj::mv(cfBlobJson), tracing, channelFactory);
},
SubrequestOptions{
.inHouse = isInHouse,
Expand All @@ -843,11 +853,11 @@ kj::Own<WorkerInterface> IoContext::getSubrequestChannelNoChecks(uint channel,
kj::Own<WorkerInterface> IoContext::getSubrequestChannelImpl(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
SpanBuilder& span,
TraceContext& tracing,
IoChannelFactory& channelFactory) {
IoChannelFactory::SubrequestMetadata metadata{
.cfBlobJson = kj::mv(cfBlobJson),
.parentSpan = span,
.tracing = tracing,
.featureFlagsForFl = worker->getIsolate().getFeatureFlagsForFl(),
};

Expand Down Expand Up @@ -912,10 +922,20 @@ SpanParent IoContext::getCurrentTraceSpan() {
return getMetrics().getSpan();
}

// Returns the "lime" span parent reported by this context's request metrics.
SpanParent IoContext::getCurrentLimeTraceSpan() {
  // TODO(o11y): Add support for retrieving span from storage scope lock for more accurate span
  // context, as with Jaeger spans.
  auto& metrics = getMetrics();
  return metrics.getLimeSpan();
}

// Creates a child of the current trace span, named `operationName`.
SpanBuilder IoContext::makeTraceSpan(kj::ConstString operationName) {
  auto parent = getCurrentTraceSpan();
  return parent.newChild(kj::mv(operationName));
}

// Creates a child of the current "lime" trace span, named `operationName`.
SpanBuilder IoContext::makeLimeTraceSpan(kj::ConstString operationName) {
  auto parent = getCurrentLimeTraceSpan();
  return parent.newChild(kj::mv(operationName));
}

void IoContext::taskFailed(kj::Exception&& exception) {
if (waitUntilStatusValue == EventOutcome::OK) {
KJ_IF_SOME(status, limitEnforcer->getLimitsExceeded()) {
Expand Down
12 changes: 9 additions & 3 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -660,13 +660,17 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
};

kj::Own<WorkerInterface> getSubrequestNoChecks(
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, IoChannelFactory&)> func,
kj::FunctionParam<kj::Own<WorkerInterface>(TraceContext&, IoChannelFactory&)> func,
SubrequestOptions options);

// If creating a new subrequest is permitted, calls the given factory function synchronously to
// create one.
// If operationName is specified within options and tracing is enabled, this will add a child span
// to the current trace span for both tracing formats.
// TODO(o11y): In the future we may need to change the interface to support having different span
// names and enforce that only documented spans can be emitted.
kj::Own<WorkerInterface> getSubrequest(
kj::FunctionParam<kj::Own<WorkerInterface>(SpanBuilder&, IoChannelFactory&)> func,
kj::FunctionParam<kj::Own<WorkerInterface>(TraceContext&, IoChannelFactory&)> func,
SubrequestOptions options);

// Get WorkerInterface objects to use for subrequests.
Expand Down Expand Up @@ -747,11 +751,13 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
// Returns the current span being recorded. If called while the JS lock is held, uses the trace
// information from the current async context, if available.
SpanParent getCurrentTraceSpan();
SpanParent getCurrentLimeTraceSpan();

// Returns a builder for recording tracing spans (or a no-op builder if tracing is inactive).
// If called while the JS lock is held, uses the trace information from the current async
// context, if available.
SpanBuilder makeTraceSpan(kj::ConstString operationName);
SpanBuilder makeLimeTraceSpan(kj::ConstString operationName);

// Implement per-IoContext rate limiting for Cache.put(). Pass the body of a Cache API PUT
// request and get a possibly wrapped stream back.
Expand Down Expand Up @@ -860,7 +866,7 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
kj::Own<WorkerInterface> getSubrequestChannelImpl(uint channel,
bool isInHouse,
kj::Maybe<kj::String> cfBlobJson,
SpanBuilder& span,
TraceContext& tracing,
IoChannelFactory& channelFactory);

friend class IoContext_IncomingRequest;
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ class RequestObserver: public kj::Refcounted {
// Default implementation: yields a null SpanParent. Virtual so that subclasses can override it.
virtual SpanParent getSpan() {
return SpanParent(nullptr);
}
// Default implementation: yields a null SpanParent. Virtual so that subclasses can override it.
virtual SpanParent getLimeSpan() {
return SpanParent(nullptr);
}

virtual void addedContextTask() {}
virtual void finishedContextTask() {}
Expand Down
68 changes: 64 additions & 4 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
// https://opensource.org/licenses/Apache-2.0

#include <workerd/io/trace.h>
#include <workerd/util/thread-scopes.h>

#include <capnp/message.h>
#include <capnp/schema.h>
#include <kj/debug.h>
#include <kj/time.h>

#include <cstdlib>

Expand All @@ -16,6 +18,10 @@ namespace workerd {
// want this number to be big enough to be useful for tracing, but small enough to make it hard to
// DoS the C++ heap -- keeping in mind we can record a trace per handler run during a request.
static constexpr size_t MAX_TRACE_BYTES = 128 * 1024;
// Limit spans to at most 512. It could be difficult to fit e.g. 1024 spans within MAX_TRACE_BYTES
// unless most of the included spans do not include tags. If use cases arise where this amount is
// insufficient, merge smaller spans together or drop smaller spans.
static constexpr size_t MAX_LIME_SPANS = 512;

namespace {

Expand Down Expand Up @@ -272,10 +278,14 @@ Trace::~Trace() noexcept(false) {}

void Trace::copyTo(rpc::Trace::Builder builder) {
{
auto list = builder.initLogs(logs.size());
auto list = builder.initLogs(logs.size() + spans.size());
for (auto i: kj::indices(logs)) {
logs[i].copyTo(list[i]);
}
// Add spans represented as logs to the logs object.
for (auto i: kj::indices(spans)) {
spans[i].copyTo(list[i + logs.size()]);
}
}

{
Expand Down Expand Up @@ -574,13 +584,15 @@ WorkerTracer::WorkerTracer(
kj::Own<PipelineTracer> parentPipeline, kj::Own<Trace> trace, PipelineLogLevel pipelineLogLevel)
: pipelineLogLevel(pipelineLogLevel),
trace(kj::mv(trace)),
parentPipeline(kj::mv(parentPipeline)) {}
parentPipeline(kj::mv(parentPipeline)),
self(kj::refcounted<WeakRef<WorkerTracer>>(kj::Badge<WorkerTracer>{}, *this)) {}
// Constructs a tracer from a log level alone: no parent pipeline is supplied, and the Trace it
// records into is created here with every optional constructor argument left unset.
WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel)
    : pipelineLogLevel(pipelineLogLevel),
      trace(kj::refcounted<Trace>(
          kj::none, kj::none, kj::none, kj::none, kj::none, nullptr, kj::none)),
      // Refcounted WeakRef wrapping *this.
      self(kj::refcounted<WeakRef<WorkerTracer>>(kj::Badge<WorkerTracer>{}, *this)) {}

void WorkerTracer::log(kj::Date timestamp, LogLevel logLevel, kj::String message) {
void WorkerTracer::log(kj::Date timestamp, LogLevel logLevel, kj::String message, bool isSpan) {
if (trace->exceededLogLimit) {
return;
}
Expand All @@ -598,9 +610,57 @@ void WorkerTracer::log(kj::Date timestamp, LogLevel logLevel, kj::String message
return;
}
trace->bytesUsed = newSize;
if (isSpan) {
trace->spans.add(timestamp, logLevel, kj::mv(message));
trace->numSpans++;
return;
}
trace->logs.add(timestamp, logLevel, kj::mv(message));
}

void WorkerTracer::addSpan(const Span& span, kj::String spanContext) {
// This is where we'll actually encode the span for now.
// Drop any spans beyond MAX_LIME_SPANS.
if (trace->numSpans >= MAX_LIME_SPANS) {
return;
}
if (isPredictableModeForTest()) {
// Do not emit span duration information in predictable mode.
log(span.endTime, LogLevel::LOG, kj::str("[\"span: ", span.operationName, "\"]"), true);
} else {
// Time since Unix epoch in seconds, with millisecond precision
double epochSecondsStart = (span.startTime - kj::UNIX_EPOCH) / kj::MILLISECONDS / 1000.0;
double epochSecondsEnd = (span.endTime - kj::UNIX_EPOCH) / kj::MILLISECONDS / 1000.0;
auto message = kj::str("[\"span: ", span.operationName, " ", kj::mv(spanContext), " ",
epochSecondsStart, " ", epochSecondsEnd, "\"]");
log(span.endTime, LogLevel::LOG, kj::mv(message), true);
}

// TODO(cleanup): Create a function in kj::OneOf to automatically convert to a given type (i.e
// String) to avoid having to handle each type explicitly here.
for (const Span::TagMap::Entry& tag: span.tags) {
auto value = [&]() {
KJ_SWITCH_ONEOF(tag.value) {
KJ_CASE_ONEOF(str, kj::String) {
return kj::str(str);
}
KJ_CASE_ONEOF(val, int64_t) {
return kj::str(val);
}
KJ_CASE_ONEOF(val, double) {
return kj::str(val);
}
KJ_CASE_ONEOF(val, bool) {
return kj::str(val);
}
}
KJ_UNREACHABLE;
}();
kj::String message = kj::str("[\"tag: "_kj, tag.key, " => "_kj, value, "\"]");
log(span.endTime, LogLevel::LOG, kj::mv(message), true);
}
}

void WorkerTracer::addException(
kj::Date timestamp, kj::String name, kj::String message, kj::Maybe<kj::String> stack) {
if (trace->exceededExceptionLimit) {
Expand Down
Loading

0 comments on commit 0b073a7

Please sign in to comment.