From 31ff8c641e72cde646a55ae18a41f41ca6c62975 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Mon, 7 Oct 2024 08:26:41 -0700 Subject: [PATCH] More cleanups and refinements on revised trace api --- src/workerd/api/trace.c++ | 4 +- src/workerd/io/trace-common-test.c++ | 39 ++++++------ src/workerd/io/trace-common.c++ | 81 ++++++++++++++++-------- src/workerd/io/trace-common.h | 82 ++++++++++--------------- src/workerd/io/trace-legacy.c++ | 30 +++++++-- src/workerd/io/trace-legacy.h | 48 +++++++++++++-- src/workerd/io/trace-streaming-test.c++ | 39 ++++-------- src/workerd/io/trace-streaming.c++ | 45 ++++++-------- src/workerd/io/trace-streaming.h | 18 +++--- src/workerd/io/trace.c++ | 13 ++-- src/workerd/io/trace.h | 5 +- src/workerd/io/worker-interface.capnp | 40 +++++++----- src/workerd/server/server.c++ | 2 +- 13 files changed, 254 insertions(+), 192 deletions(-) diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index ae8cf3cf592..e8f76f58425 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -206,8 +206,8 @@ TraceItem::TraceItem(jsg::Lock& js, const Trace& trace) trace.onsetInfo.dispatchNamespace.map([](auto& ns) { return kj::str(ns); })), scriptTags(getTraceScriptTags(trace)), outcome(getTraceOutcome(trace)), - cpuTime(trace.outcomeInfo.cpuTime / kj::MILLISECONDS), - wallTime(trace.outcomeInfo.wallTime / kj::MILLISECONDS), + cpuTime(trace.cpuTime / kj::MILLISECONDS), + wallTime(trace.wallTime / kj::MILLISECONDS), truncated(trace.truncated) {} kj::Maybe TraceItem::getEvent(jsg::Lock& js) { diff --git a/src/workerd/io/trace-common-test.c++ b/src/workerd/io/trace-common-test.c++ index dfd0a7ecb83..66be52e8ac5 100644 --- a/src/workerd/io/trace-common-test.c++ +++ b/src/workerd/io/trace-common-test.c++ @@ -292,10 +292,8 @@ KJ_TEST("TraceEventInfo works") { } KJ_TEST("Outcome works") { - trace::Outcome outcome(EventOutcome::OK, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS); + trace::Outcome outcome(EventOutcome::OK); KJ_EXPECT(outcome.outcome == EventOutcome::OK); - KJ_EXPECT(outcome.cpuTime == 1 * kj::MILLISECONDS); - KJ_EXPECT(outcome.wallTime == 2 * kj::MILLISECONDS); capnp::MallocMessageBuilder message; auto builder = message.initRoot(); @@ -304,13 +302,9 @@ KJ_TEST("Outcome works") { auto reader = builder.asReader(); trace::Outcome outcome2(reader); KJ_EXPECT(outcome2.outcome == EventOutcome::OK); - KJ_EXPECT(outcome2.cpuTime == 1 * kj::MILLISECONDS); - KJ_EXPECT(outcome2.wallTime == 2 * kj::MILLISECONDS); auto outcome3 = outcome.clone(); KJ_EXPECT(outcome3.outcome == EventOutcome::OK); - KJ_EXPECT(outcome3.cpuTime == 1 * kj::MILLISECONDS); - KJ_EXPECT(outcome3.wallTime == 2 * kj::MILLISECONDS); } KJ_TEST("DiagnosticChannelEvent works") { @@ -359,12 +353,13 @@ KJ_TEST("Log works") { KJ_EXPECT(log3.message == "foo"_kj); } -KJ_TEST("LogV2 workers") { +KJ_TEST("LogV2 works") { kj::Date date = 0 * kj::MILLISECONDS + kj::UNIX_EPOCH; trace::LogV2 log(date, LogLevel::INFO, kj::heapArray(1)); KJ_EXPECT(log.timestamp == date); KJ_EXPECT(log.logLevel == LogLevel::INFO); - KJ_EXPECT(log.data.size() == 1); + KJ_EXPECT(KJ_ASSERT_NONNULL(log.message.tryGet>()).size() == 1); + KJ_EXPECT(!log.truncated); capnp::MallocMessageBuilder message; auto builder = message.initRoot(); @@ -374,12 +369,14 @@ KJ_TEST("LogV2 workers") { trace::LogV2 log2(reader); KJ_EXPECT(log2.timestamp == date); KJ_EXPECT(log2.logLevel == LogLevel::INFO); - KJ_EXPECT(log2.data.size() == 1); + KJ_EXPECT(KJ_ASSERT_NONNULL(log2.message.tryGet>()).size() == 1); + KJ_EXPECT(!log2.truncated); auto log3 = log.clone(); KJ_EXPECT(log3.timestamp == date); KJ_EXPECT(log3.logLevel == LogLevel::INFO); - KJ_EXPECT(log3.data.size() == 1); + KJ_EXPECT(KJ_ASSERT_NONNULL(log3.message.tryGet>()).size() == 1); + KJ_EXPECT(!log3.truncated); } KJ_TEST("Exception works") { @@ -430,9 +427,9 @@ KJ_TEST("Subrequest works") { } KJ_TEST("SubrequestOutcome works") { - trace::SubrequestOutcome outcome(1U, kj::none, trace::SpanEvent::Outcome::OK); + trace::SubrequestOutcome outcome(1U, kj::none, trace::Span::Outcome::OK); KJ_EXPECT(outcome.id == 1U); - KJ_EXPECT(outcome.outcome == trace::SpanEvent::Outcome::OK); + KJ_EXPECT(outcome.outcome == trace::Span::Outcome::OK); capnp::MallocMessageBuilder message; auto builder = message.initRoot(); @@ -441,18 +438,18 @@ KJ_TEST("SubrequestOutcome works") { auto reader = builder.asReader(); trace::SubrequestOutcome outcome2(reader); KJ_EXPECT(outcome2.id == 1U); - KJ_EXPECT(outcome2.outcome == trace::SpanEvent::Outcome::OK); + KJ_EXPECT(outcome2.outcome == trace::Span::Outcome::OK); auto outcome3 = outcome.clone(); KJ_EXPECT(outcome3.id == 1U); - KJ_EXPECT(outcome3.outcome == trace::SpanEvent::Outcome::OK); + KJ_EXPECT(outcome3.outcome == trace::Span::Outcome::OK); } -KJ_TEST("SpanEvent works") { - trace::SpanEvent event(1U, 0U, trace::SpanEvent::Outcome::OK, false, kj::none, nullptr); +KJ_TEST("Span works") { + trace::Span event(1U, 0U, trace::Span::Outcome::OK, false, kj::none, nullptr); KJ_EXPECT(event.id == 1U); KJ_EXPECT(event.parent == 0U); - KJ_EXPECT(event.outcome == trace::SpanEvent::Outcome::OK); + KJ_EXPECT(event.outcome == trace::Span::Outcome::OK); KJ_EXPECT(event.transactional == false); KJ_EXPECT(event.info == kj::none); KJ_EXPECT(event.tags.size() == 0); @@ -462,10 +459,10 @@ KJ_TEST("SpanEvent works") { event.copyTo(builder); auto reader = builder.asReader(); - trace::SpanEvent event2(reader); + trace::Span event2(reader); KJ_EXPECT(event2.id == 1U); KJ_EXPECT(event2.parent == 0U); - KJ_EXPECT(event2.outcome == trace::SpanEvent::Outcome::OK); + KJ_EXPECT(event2.outcome == trace::Span::Outcome::OK); KJ_EXPECT(event2.transactional == false); KJ_EXPECT(event2.info == kj::none); KJ_EXPECT(event2.tags.size() == 0); @@ -473,7 +470,7 @@ KJ_TEST("SpanEvent works") { auto event3 = event.clone(); KJ_EXPECT(event3.id == 1U); KJ_EXPECT(event3.parent == 0U); - KJ_EXPECT(event3.outcome == trace::SpanEvent::Outcome::OK); + KJ_EXPECT(event3.outcome == trace::Span::Outcome::OK); KJ_EXPECT(event3.transactional == false); KJ_EXPECT(event3.info == kj::none); KJ_EXPECT(event3.tags.size() == 0); diff --git a/src/workerd/io/trace-common.c++ b/src/workerd/io/trace-common.c++ index 8467dbbf08b..96ddc5dd128 100644 --- a/src/workerd/io/trace-common.c++ +++ b/src/workerd/io/trace-common.c++ @@ -273,24 +273,16 @@ Onset Onset::clone() const { // ====================================================================================== // Outcome -Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) - : outcome(outcome), - cpuTime(cpuTime), - wallTime(wallTime) {} +Outcome::Outcome(EventOutcome outcome): outcome(outcome) {} -Outcome::Outcome(rpc::Trace::Outcome::Reader reader) - : outcome(reader.getOutcome()), - cpuTime(reader.getCpuTime() * kj::MILLISECONDS), - wallTime(reader.getWallTime() * kj::MILLISECONDS) {} +Outcome::Outcome(rpc::Trace::Outcome::Reader reader): outcome(reader.getOutcome()) {} void Outcome::copyTo(rpc::Trace::Outcome::Builder builder) const { builder.setOutcome(outcome); - builder.setCpuTime(cpuTime / kj::MILLISECONDS); - builder.setWallTime(wallTime / kj::MILLISECONDS); } Outcome Outcome::clone() const { - return Outcome{outcome, cpuTime, wallTime}; + return Outcome{outcome}; } // ====================================================================================== @@ -623,6 +615,22 @@ DiagnosticChannelEvent DiagnosticChannelEvent::clone() const { // ====================================================================================== // Log +namespace { +kj::OneOf, kj::String> getMessageForLog( + const rpc::Trace::LogV2::Reader& reader) { + auto message = reader.getMessage(); + switch (message.which()) { + case rpc::Trace::LogV2::Message::Which::TEXT: { + return kj::str(message.getText()); + } + case rpc::Trace::LogV2::Message::Which::DATA: { + return kj::heapArray(message.getData()); + } + } + KJ_UNREACHABLE; +} +} // namespace + Log::Log(kj::Date timestamp, LogLevel logLevel, kj::String message) : timestamp(timestamp), logLevel(logLevel), @@ -643,21 +651,35 @@ Log Log::clone() const { return Log(timestamp, logLevel, kj::str(message)); } -LogV2::LogV2(kj::Date timestamp, LogLevel logLevel, kj::Array data, Tags tags) +LogV2::LogV2(kj::Date timestamp, + LogLevel logLevel, + kj::OneOf, kj::String> message, + Tags tags, + bool truncated) : timestamp(timestamp), logLevel(logLevel), - data(kj::mv(data)), - tags(kj::mv(tags)) {} + message(kj::mv(message)), + tags(kj::mv(tags)), + truncated(truncated) {} LogV2::LogV2(rpc::Trace::LogV2::Reader reader) : timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), logLevel(reader.getLogLevel()), - data(kj::heapArray(reader.getData())) {} + message(getMessageForLog(reader)), + truncated(reader.getTruncated()) {} void LogV2::copyTo(rpc::Trace::LogV2::Builder builder) const { builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); builder.setLogLevel(logLevel); - builder.setData(data); + KJ_SWITCH_ONEOF(message) { + KJ_CASE_ONEOF(str, kj::String) { + builder.initMessage().setText(str); + } + KJ_CASE_ONEOF(data, kj::Array) { + builder.initMessage().setData(data); + } + } + builder.setTruncated(truncated); auto outTags = builder.initTags(tags.size()); for (size_t n = 0; n < tags.size(); n++) { tags[n].copyTo(outTags[n]); @@ -669,7 +691,18 @@ LogV2 LogV2::clone() const { for (auto& tag: tags) { newTags.add(tag.clone()); } - return LogV2(timestamp, logLevel, kj::heapArray(data), newTags.releaseAsArray()); + auto newMessage = ([&]() -> kj::OneOf, kj::String> { + KJ_SWITCH_ONEOF(message) { + KJ_CASE_ONEOF(str, kj::String) { + return kj::str(str); + } + KJ_CASE_ONEOF(data, kj::Array) { + return kj::heapArray(data); + } + } + KJ_UNREACHABLE; + })(); + return LogV2(timestamp, logLevel, kj::mv(newMessage), newTags.releaseAsArray(), truncated); } // ====================================================================================== @@ -941,10 +974,10 @@ SubrequestOutcome SubrequestOutcome::clone() const { } // ====================================================================================== -// SpanEvent +// Span namespace { -kj::Maybe maybeGetInfo(const rpc::Trace::Span::Reader& reader) { +kj::Maybe maybeGetInfo(const rpc::Trace::Span::Reader& reader) { // if (!reader.hasInfo()) return kj::none; auto info = reader.getInfo(); switch (info.which()) { @@ -970,7 +1003,7 @@ kj::Maybe maybeGetInfo(const rpc::Trace::Span::Reader& reader) } } // namespace -SpanEvent::SpanEvent(uint32_t id, +Span::Span(uint32_t id, uint32_t parent, rpc::Trace::Span::SpanOutcome outcome, bool transactional, @@ -983,7 +1016,7 @@ SpanEvent::SpanEvent(uint32_t id, info(kj::mv(maybeInfo)), tags(kj::mv(tags)) {} -SpanEvent::SpanEvent(rpc::Trace::Span::Reader reader) +Span::Span(rpc::Trace::Span::Reader reader) : id(reader.getId()), parent(reader.getParent()), outcome(reader.getOutcome()), @@ -991,7 +1024,7 @@ SpanEvent::SpanEvent(rpc::Trace::Span::Reader reader) info(maybeGetInfo(reader)), tags(maybeGetTags(reader)) {} -void SpanEvent::copyTo(rpc::Trace::Span::Builder builder) const { +void Span::copyTo(rpc::Trace::Span::Builder builder) const { builder.setId(id); builder.setParent(parent); builder.setOutcome(outcome); @@ -1019,7 +1052,7 @@ void SpanEvent::copyTo(rpc::Trace::Span::Builder builder) const { } } -SpanEvent SpanEvent::clone() const { +Span Span::clone() const { auto newInfo = ([&]() -> kj::Maybe { KJ_IF_SOME(i, info) { KJ_SWITCH_ONEOF(i) { @@ -1040,7 +1073,7 @@ SpanEvent SpanEvent::clone() const { } return kj::none; })(); - return SpanEvent( + return Span( id, parent, outcome, transactional, kj::mv(newInfo), KJ_MAP(tag, tags) { return tag.clone(); }); } diff --git a/src/workerd/io/trace-common.h b/src/workerd/io/trace-common.h index 244483d1d2c..076f4d19727 100644 --- a/src/workerd/io/trace-common.h +++ b/src/workerd/io/trace-common.h @@ -50,7 +50,7 @@ concept IsEnum = std::is_enum::value; // a double, a string, or an arbitrary byte array. When a byte array is used, the // specific format is specific to the tag key. No general assumptions can be made // about the value without knowledge of the specific key. -struct Tag { +struct Tag final { using TagValue = kj::OneOf>; using TagKey = kj::OneOf; TagKey key; @@ -91,6 +91,8 @@ struct Onset final { Onset& operator=(Onset&&) = default; KJ_DISALLOW_COPY(Onset); + // Note that all of these fields could be represented by tags but are kept + // as separate fields for legacy reasons. kj::Maybe accountId = kj::none; kj::Maybe stableId = kj::none; kj::Maybe scriptName = kj::none; @@ -262,7 +264,7 @@ struct HibernatableWebSocketEventInfo final { HibernatableWebSocketEventInfo clone() const; }; -struct CustomEventInfo { +struct CustomEventInfo final { explicit CustomEventInfo() {}; CustomEventInfo(rpc::Trace::CustomEventInfo::Reader reader) {}; CustomEventInfo(CustomEventInfo&&) = default; @@ -316,15 +318,13 @@ using EventInfo = kj::OneOf data, Tags tags = nullptr); + explicit LogV2(kj::Date timestamp, + LogLevel logLevel, + kj::OneOf, kj::String> message, + Tags tags = nullptr, + bool truncated = false); LogV2(rpc::Trace::LogV2::Reader reader); LogV2(LogV2&&) = default; LogV2& operator=(LogV2&&) = default; @@ -375,8 +378,9 @@ struct LogV2 final { kj::Date timestamp; LogLevel logLevel; - kj::Array data; + kj::OneOf, kj::String> message; Tags tags = nullptr; + bool truncated = false; void copyTo(rpc::Trace::LogV2::Builder builder) const; LogV2 clone() const; @@ -449,19 +453,19 @@ struct SubrequestOutcome final { SubrequestOutcome clone() const; }; -struct SpanEvent final { +struct Span final { using Outcome = rpc::Trace::Span::SpanOutcome; using Info = kj::OneOf; - explicit SpanEvent(uint32_t id, + explicit Span(uint32_t id, uint32_t parentId, rpc::Trace::Span::SpanOutcome outcome, bool transactional = false, kj::Maybe maybeInfo = kj::none, Tags tags = nullptr); - SpanEvent(rpc::Trace::Span::Reader reader); - SpanEvent(SpanEvent&&) = default; - SpanEvent& operator=(SpanEvent&&) = default; - KJ_DISALLOW_COPY(SpanEvent); + Span(rpc::Trace::Span::Reader reader); + Span(Span&&) = default; + Span& operator=(Span&&) = default; + KJ_DISALLOW_COPY(Span); uint32_t id; uint32_t parent; @@ -471,7 +475,7 @@ struct SpanEvent final { Tags tags; void copyTo(rpc::Trace::Span::Builder builder) const; - SpanEvent clone() const; + Span clone() const; }; struct Mark final { @@ -492,6 +496,11 @@ struct Metric final { using Value = kj::OneOf; using Type = rpc::Trace::Metric::Type; + enum class Common { + CPU_TIME, + WALL_TIME, + }; + explicit Metric(Type type, Key key, Value value); template @@ -516,6 +525,14 @@ struct Metric final { void copyTo(rpc::Trace::Metric::Builder builder) const; Metric clone() const; + + static inline Metric forWallTime(kj::Duration duration) { + return Metric(Type::COUNTER, Common::WALL_TIME, duration / kj::MILLISECONDS); + } + + static Metric forCpuTime(kj::Duration duration) { + return Metric(Type::COUNTER, Common::CPU_TIME, duration / kj::MILLISECONDS); + } }; using Metrics = kj::Array; @@ -542,41 +559,6 @@ using EventDetail = kj::OneOf; -// ====================================================================================== -// Represents a trace span. `Span` objects are delivered to `SpanObserver`s for recording. To -// create a `Span`, use a `SpanBuilder`. (Used in the legacy trace api) -struct Span { - using TagValue = kj::OneOf; - // TODO(someday): Support binary bytes, too. - using TagMap = kj::HashMap; - using Tag = TagMap::Entry; - - struct Log { - kj::Date timestamp; - Tag tag; - }; - - kj::ConstString operationName; - kj::Date startTime; - kj::Date endTime; - TagMap tags; - kj::Vector logs; - - // We set an arbitrary (-ish) cap on log messages for safety. If we drop logs because of this, - // we report how many in a final "dropped_logs" log. - // - // At the risk of being too clever, I chose a limit that is one below a power of two so that - // we'll typically have space for one last element available for the "dropped_logs" log without - // needing to grow the vector. - static constexpr auto MAX_LOGS = 1023; - uint droppedLogs = 0; - - explicit Span(kj::ConstString operationName, kj::Date startTime) - : operationName(kj::mv(operationName)), - startTime(startTime), - endTime(startTime) {} -}; - // ====================================================================================== // The base class for both the original legacy Trace (defined in trace-legacy.h) // and the new StreamingTrace (defined in trace-streaming.h) diff --git a/src/workerd/io/trace-legacy.c++ b/src/workerd/io/trace-legacy.c++ index 4523fd55e45..0d217c3c0d0 100644 --- a/src/workerd/io/trace-legacy.c++ +++ b/src/workerd/io/trace-legacy.c++ @@ -49,8 +49,8 @@ void Trace::copyTo(rpc::Trace::Builder builder) { builder.setTruncated(truncated); builder.setOutcome(outcomeInfo.outcome); - builder.setCpuTime(outcomeInfo.cpuTime / kj::MILLISECONDS); - builder.setWallTime(outcomeInfo.wallTime / kj::MILLISECONDS); + builder.setCpuTime(cpuTime / kj::MILLISECONDS); + builder.setWallTime(wallTime / kj::MILLISECONDS); KJ_IF_SOME(name, onsetInfo.scriptName) { builder.setScriptName(name); } @@ -144,8 +144,8 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev truncated = reader.getTruncated(); outcomeInfo.outcome = reader.getOutcome(); - outcomeInfo.cpuTime = reader.getCpuTime() * kj::MILLISECONDS; - outcomeInfo.wallTime = reader.getWallTime() * kj::MILLISECONDS; + cpuTime = reader.getCpuTime() * kj::MILLISECONDS; + wallTime = reader.getWallTime() * kj::MILLISECONDS; // mergeFrom() is called both when deserializing traces from a sandboxed // worker and when deserializing traces sent to a sandboxed trace worker. In @@ -318,7 +318,7 @@ void Trace::addDiagnosticChannelEvent(trace::DiagnosticChannelEvent&& event) { diagnosticChannelEvents.add(kj::mv(event)); } -void Trace::addSpan(const trace::Span&& span, kj::String spanContext) { +void Trace::addSpan(const Span&& span, kj::String spanContext) { // This is where we'll actually encode the span for now. // Drop any spans beyond MAX_LIME_SPANS. if (numSpans >= MAX_LIME_SPANS) { @@ -339,7 +339,7 @@ void Trace::addSpan(const trace::Span&& span, kj::String spanContext) { // TODO(cleanup): Create a function in kj::OneOf to automatically convert to a given type (i.e // String) to avoid having to handle each type explicitly here. - for (const trace::Span::TagMap::Entry& tag: span.tags) { + for (const Span::TagMap::Entry& tag: span.tags) { auto value = [&]() { KJ_SWITCH_ONEOF(tag.value) { KJ_CASE_ONEOF(str, kj::String) { @@ -368,4 +368,22 @@ void Trace::setFetchResponseInfo(trace::FetchResponseInfo&& info) { fetchResponseInfo = kj::mv(info); } +void Trace::addMetrics(trace::Metrics&& metrics) { + for (auto& metric: metrics) { + if (metric.keyMatches(trace::Metric::Common::CPU_TIME)) { + // The CPU_TIME metric will always be a int64_t converted from a kj::Duration + // If it's not, we'll ignore it. + KJ_IF_SOME(i, metric.value.tryGet()) { + cpuTime = i * kj::MILLISECONDS; + } + } else if (metric.keyMatches(trace::Metric::Common::WALL_TIME)) { + // The WALL_TIME metric will always be a int64_t converted from a kj::Duration + // If it's not, we'll ignore it. + KJ_IF_SOME(i, metric.value.tryGet()) { + wallTime = i * kj::MILLISECONDS; + } + } + } +} + } // namespace workerd diff --git a/src/workerd/io/trace-legacy.h b/src/workerd/io/trace-legacy.h index a76db347100..51348799d7c 100644 --- a/src/workerd/io/trace-legacy.h +++ b/src/workerd/io/trace-legacy.h @@ -12,6 +12,46 @@ namespace workerd { +// ====================================================================================== +// Represents a trace span. `Span` objects are delivered to `SpanObserver`s for recording. To +// create a `Span`, use a `SpanBuilder`. (Used in the legacy trace api) +// Note that this is not the same thing as a trace::Span. This class is part of the legacy +// API for representing spans using log messages instead. +struct Span final { + using TagValue = kj::OneOf; + // TODO(someday): Support binary bytes, too. + using TagMap = kj::HashMap; + using Tag = TagMap::Entry; + + struct Log { + kj::Date timestamp; + Tag tag; + }; + + kj::ConstString operationName; + kj::Date startTime; + kj::Date endTime; + TagMap tags; + kj::Vector logs; + + // We set an arbitrary (-ish) cap on log messages for safety. If we drop logs because of this, + // we report how many in a final "dropped_logs" log. + // + // At the risk of being too clever, I chose a limit that is one below a power of two so that + // we'll typically have space for one last element available for the "dropped_logs" log without + // needing to grow the vector. + static constexpr auto MAX_LOGS = 1023; + kj::uint droppedLogs = 0; + + explicit Span(kj::ConstString operationName, kj::Date startTime) + : operationName(kj::mv(operationName)), + startTime(startTime), + endTime(startTime) {} + Span(Span&&) = default; + Span& operator=(Span&&) = default; + KJ_DISALLOW_COPY(Span); +}; + // This is the original implementation of how trace worker data was collected. All of the // data is stored in an in-memory structure and delivered as a single unit to the trace // worker only when the request is fully completed. The data is held in memory and capped @@ -51,6 +91,8 @@ class Trace final: public kj::Refcounted, public trace::TraceBase { trace::Onset onsetInfo; trace::Outcome outcomeInfo{}; + kj::Duration cpuTime; + kj::Duration wallTime; kj::Vector logs; // TODO(o11y): Convert this to actually store spans. @@ -68,7 +110,7 @@ class Trace final: public kj::Refcounted, public trace::TraceBase { void setEventInfo(kj::Date timestamp, trace::EventInfo&& info); void setOutcome(trace::Outcome&& outcome); void setFetchResponseInfo(trace::FetchResponseInfo&& info); - void addSpan(const trace::Span&& span, kj::String spanContext); + void addSpan(const Span&& span, kj::String spanContext); void addLog(trace::Log&& log, bool isSpan = false); void addException(trace::Exception&& exception) override; @@ -78,9 +120,7 @@ class Trace final: public kj::Refcounted, public trace::TraceBase { // These are currently ignored for legacy traces. } - void addMetrics(trace::Metrics&& metrics) override { - // These are currently ignored for legacy traces. - } + void addMetrics(trace::Metrics&& metrics) override; void addSubrequest(trace::Subrequest&& subrequest) override { // These are currently ignored for legacy traces. diff --git a/src/workerd/io/trace-streaming-test.c++ b/src/workerd/io/trace-streaming-test.c++ index 4147ef37606..5af2e43fe61 100644 --- a/src/workerd/io/trace-streaming-test.c++ +++ b/src/workerd/io/trace-streaming-test.c++ @@ -12,20 +12,14 @@ struct MockTimeProvider final: public StreamingTrace::TimeProvider { kj::Date getNow() const override { return 0 * kj::MILLISECONDS + kj::UNIX_EPOCH; } - kj::Duration getCpuTime() const override { - return 0 * kj::MILLISECONDS; - } - kj::Duration getWallTime() const override { - return 0 * kj::MILLISECONDS; - } }; MockTimeProvider mockTimeProvider; -KJ_TEST("We can create a simple, empty streaming trace session with implicit canceled outcome") { +KJ_TEST("We can create a simple, empty streaming trace session with implicit unknown outcome") { trace::Onset onset; // In this test we are creating a simple trace with no events or spans. // The delegate should be called exactly twice, once with the onset and - // once with an implicit cancele outcome (since we're not explicitly calling + // once with an implicit unknown outcome (since we're not explicitly calling // setOutcome ourselves) int callCount = 0; kj::String id = nullptr; @@ -49,7 +43,7 @@ KJ_TEST("We can create a simple, empty streaming trace session with implicit can KJ_EXPECT(event.sequence == 1, "the sequence should have been incremented"); auto& outcome = KJ_ASSERT_NONNULL( event.event.tryGet(), "the event should be an outcome event"); - KJ_EXPECT(outcome.outcome == EventOutcome::CANCELED, "the outcome should be canceled"); + KJ_EXPECT(outcome.outcome == EventOutcome::UNKNOWN, "the outcome should be unknown"); break; } } @@ -62,8 +56,7 @@ KJ_TEST("We can create a simple, empty streaming trace session with expicit canc trace::Onset onset; // In this test we are creating a simple trace with no events or spans. // The delegate should be called exactly twice, once with the onset and - // once with an implicit cancele outcome (since we're not explicitly calling - // setOutcome ourselves) + // once with an explicit canceled outcome. int callCount = 0; kj::String id = nullptr; auto streamingTrace = workerd::StreamingTrace::create( @@ -89,18 +82,15 @@ KJ_TEST("We can create a simple, empty streaming trace session with expicit canc } } }, mockTimeProvider); - streamingTrace->setOutcome( - trace::Outcome(EventOutcome::CANCELED, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS)); + streamingTrace->setOutcome(trace::Outcome(EventOutcome::CANCELED)); KJ_EXPECT(callCount == 2); } KJ_TEST( - "We can create a simple, streaming trace session with a single implicitly canceled stage span") { + "We can create a simple, streaming trace session with a single implicitly unknown stage span") { trace::Onset onset; // In this test we are creating a simple trace with no events or spans. - // The delegate should be called exactly twice, once with the onset and - // once with an implicit cancele outcome (since we're not explicitly calling - // setOutcome ourselves) + // The delegate should be called exactly four times. int callCount = 0; kj::String id = nullptr; { @@ -135,8 +125,8 @@ KJ_TEST( KJ_EXPECT(event.span.transactional == false, "the root span should not be transactional"); KJ_EXPECT(event.id == id); auto& span = KJ_ASSERT_NONNULL( - event.event.tryGet(), "the event should be a span event"); - KJ_EXPECT(span.outcome == rpc::Trace::Span::SpanOutcome::CANCELED); + event.event.tryGet(), "the event should be a span event"); + KJ_EXPECT(span.outcome == rpc::Trace::Span::SpanOutcome::UNKNOWN); break; } case 3: { @@ -147,7 +137,7 @@ KJ_TEST( KJ_EXPECT(event.sequence == 3, "the sequence should have been incremented"); auto& outcome = KJ_ASSERT_NONNULL( event.event.tryGet(), "the event should be an outcome event"); - KJ_EXPECT(outcome.outcome == EventOutcome::CANCELED, "the outcome should be canceled"); + KJ_EXPECT(outcome.outcome == EventOutcome::UNKNOWN, "the outcome should be canceled"); break; } } @@ -164,9 +154,7 @@ KJ_TEST( KJ_TEST("We can create a simple, streaming trace session with a single explicitly canceled trace") { trace::Onset onset; // In this test we are creating a simple trace with no events or spans. - // The delegate should be called exactly twice, once with the onset and - // once with an implicit cancele outcome (since we're not explicitly calling - // setOutcome ourselves) + // The delegate should be called exactly five times. int callCount = 0; kj::String id = nullptr; { @@ -214,7 +202,7 @@ KJ_TEST("We can create a simple, streaming trace session with a single explicitl KJ_EXPECT(event.sequence, 3); KJ_EXPECT(event.id == id); auto& span = KJ_ASSERT_NONNULL( - event.event.tryGet(), "the event should be a span event"); + event.event.tryGet(), "the event should be a span event"); KJ_EXPECT(span.outcome == rpc::Trace::Span::SpanOutcome::CANCELED); break; } @@ -237,8 +225,7 @@ KJ_TEST("We can create a simple, streaming trace session with a single explicitl kj::HttpMethod::GET, kj::str("http://example.com"), kj::String(), {})); stage->addMark(trace::Mark(kj::str("bar"))); // Intentionally not calling setOutcome on the stage span or the trace itself. - streamingTrace->setOutcome( - trace::Outcome(EventOutcome::CANCELED, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS)); + streamingTrace->setOutcome(trace::Outcome(EventOutcome::CANCELED)); // Once the outcome is set, no more events should be emitted but calling the methods on // the span shouldn't crash or error. diff --git a/src/workerd/io/trace-streaming.c++ b/src/workerd/io/trace-streaming.c++ index bcba44f789f..30f62edd41b 100644 --- a/src/workerd/io/trace-streaming.c++ +++ b/src/workerd/io/trace-streaming.c++ @@ -10,6 +10,10 @@ namespace workerd { // ====================================================================================== // TailIDs namespace { +// The UuidId implementation is really intended only for testing and local development. +// In production, it likely makes more sense to use a RayID or something that can be +// better correlated to other diagnostic and tracing mechanisms, and that can be better +// guaranteed to be sufficiently unique across the entire production environment. class UuidId final: public StreamingTrace::IdFactory::Id { public: UuidId(): uuid(randomUUID(kj::none)) {} @@ -48,6 +52,8 @@ kj::Own StreamingTrace::IdFactory::newUuidIdFactory() kj::Own StreamingTrace::IdFactory::newIdFromString( kj::StringPtr str) { + // This is cheating a bit. We're not actually creating a UUID here but the UuidId class + // is really just a wrapper around a string so we can safely use it here. return kj::heap(kj::str(str)); } @@ -84,20 +90,6 @@ constexpr rpc::Trace::Span::SpanOutcome eventOutcomeToSpanOutcome(const EventOut } } // namespace -// When the StreamingTrace is created it will need to be passed an RPC Client to forward -// traces on to. The initial onset event will be forwarded as part of the initial request. -// The StreamingTrace will then be responsible for forwarding all subsequent trace events -// to the RPC Client. To prevent too many small requests, the StreamingTrace will need to -// buffer events until a certain threshold is reached, at which point it will forward the -// buffered events to the RPC Client. -// -// The threshold be the combined size of the events in the queue coupled with a timeout. -// Whichever condition is met first will trigger the forwarding of the events. -// -// The setEventInfo(...) *MUST* be called first. -// The setOutoutcomeInfo(...) *MUST* be called last. Once called, no other events can be -// emitted by the StreamingTrace. - struct StreamingTrace::Impl { kj::Own id; trace::Onset onsetInfo; @@ -136,9 +128,10 @@ StreamingTrace::StreamingTrace(kj::Own id, } StreamingTrace::~StreamingTrace() noexcept(false) { - KJ_IF_SOME(i, impl) { - setOutcome(trace::Outcome( - EventOutcome::CANCELED, i->timeProvider.getCpuTime(), i->timeProvider.getWallTime())); + if (impl != kj::none) { + // If the streaming tracing is dropped without having an outcome explicitly + // specified, the outcome is explicitly set to unknown. + setOutcome(trace::Outcome(EventOutcome::UNKNOWN)); } // Stage spans should be closed by calling setOutcome above KJ_ASSERT(spans.empty(), "all stage spans must be closed before the trace is destroyed"); @@ -242,8 +235,8 @@ struct StreamingTrace::Span::Impl { traceImpl->timeProvider.getNow(), trace.getNextSequence(), kj::mv(payload)); } - StreamEvent makeSpanEvent(rpc::Trace::Span::SpanOutcome outcome, - kj::Maybe maybeInfo, + StreamEvent makeSpan(rpc::Trace::Span::SpanOutcome outcome, + kj::Maybe maybeInfo, trace::Tags tags) { auto& tailId = KJ_ASSERT_NONNULL(trace.getId(), "the streaming trace is closed"); auto& traceImpl = KJ_ASSERT_NONNULL(trace.impl); @@ -254,7 +247,7 @@ struct StreamingTrace::Span::Impl { .transactional = (options & Options::TRANSACTIONAL) == Options::TRANSACTIONAL, }, traceImpl->timeProvider.getNow(), trace.getNextSequence(), - trace::SpanEvent(id, parentSpan, outcome, + trace::Span(id, parentSpan, outcome, (options & Options::TRANSACTIONAL) == Options::TRANSACTIONAL, kj::mv(maybeInfo), kj::mv(tags))); } @@ -270,20 +263,20 @@ StreamingTrace::Span::Span(kj::List& parentSpans, } void StreamingTrace::Span::setOutcome(rpc::Trace::Span::SpanOutcome outcome, - kj::Maybe maybeInfo, + kj::Maybe maybeInfo, trace::Tags tags) { KJ_IF_SOME(i, impl) { // Then close out the stream by destroying the impl for (auto& span: spans) { span.setOutcome(outcome); } KJ_ASSERT(spans.empty(), "all child spans must be closed before the parent span is closed"); - i->trace.addStreamEvent(i->makeSpanEvent(outcome, kj::mv(maybeInfo), kj::mv(tags))); + i->trace.addStreamEvent(i->makeSpan(outcome, kj::mv(maybeInfo), kj::mv(tags))); impl = kj::none; } } StreamingTrace::Span::~Span() noexcept(false) { - setOutcome(rpc::Trace::Span::SpanOutcome::CANCELED); + setOutcome(rpc::Trace::Span::SpanOutcome::UNKNOWN); KJ_ASSERT(spans.empty(), "all schild spans must be closed before the trace is destroyed"); } @@ -377,7 +370,7 @@ StreamEvent::Event getEvent(const rpc::Trace::StreamEvent::Reader& reader) { return trace::Dropped(event.getDropped()); } case rpc::Trace::StreamEvent::Event::Which::SPAN: { - return trace::SpanEvent(event.getSpan()); + return trace::Span(event.getSpan()); } case rpc::Trace::StreamEvent::Event::Which::INFO: { auto info = event.getInfo(); @@ -494,7 +487,7 @@ void StreamEvent::copyTo(rpc::Trace::StreamEvent::Builder builder) const { KJ_CASE_ONEOF(dropped, trace::Dropped) { dropped.copyTo(builder.getEvent().getDropped()); } - KJ_CASE_ONEOF(span, trace::SpanEvent) { + KJ_CASE_ONEOF(span, trace::Span) { span.copyTo(builder.getEvent().getSpan()); } KJ_CASE_ONEOF(info, Info) { @@ -582,7 +575,7 @@ StreamEvent StreamEvent::clone() const { KJ_CASE_ONEOF(dropped, trace::Dropped) { return dropped.clone(); } - KJ_CASE_ONEOF(span, trace::SpanEvent) { + KJ_CASE_ONEOF(span, trace::Span) { return span.clone(); } KJ_CASE_ONEOF(info, Info) { diff --git a/src/workerd/io/trace-streaming.h b/src/workerd/io/trace-streaming.h index 78dc085c481..ffa7062aeec 100644 --- a/src/workerd/io/trace-streaming.h +++ b/src/workerd/io/trace-streaming.h @@ -13,13 +13,15 @@ namespace workerd { // // The streaming trace itself is considered the root span, whose span ID is always 0. The // root span will always start with an Onset event that communicates basic metadata about -// the worker being traced; and always ends with an Outcome event that communicates the -// final disposition of the traced worker. +// the worker being traced (for instance, script ID, script version, etc); and always ends +// with an Outcome event that communicates the final disposition of the traced worker. // // The root span may have zero or more "Stage Spans". These represent pipeline stages. // There is no requirement that a streaming trace will cover multiple stages in a worker // pipeline but the design allows for it. Generally speaking there should be at least -// one stage span per streaming trace. +// one stage span per streaming trace. This model allows for a single trace to cover +// multiple stages of a pipeline but in actual practice we're most likely to see only +// a single request per trace, even with durable objects. // // A stage span will never be transactional and will always have the root span as its // parent. It should always start with an "Info" event that describes the trigger event @@ -56,7 +58,8 @@ namespace workerd { // ====================================================================================== // StreamEvent -// All events on the streaming trace are StreamEvents. +// All events on the streaming trace are StreamEvents. A StreamEvent is essentialy +// just an envelope for the actual event data. struct StreamEvent final { // The ID of the streaming trace session. This is used to correlate all events // occurring within the same trace session. @@ -79,8 +82,7 @@ struct StreamEvent final { using Info = trace::EventInfo; using Detail = trace::EventDetail; - using Event = - kj::OneOf; + using Event = kj::OneOf; Event event; explicit StreamEvent( @@ -134,8 +136,6 @@ class StreamingTrace final { // to abstract exactly how the trace gets current time. struct TimeProvider { virtual kj::Date getNow() const = 0; - virtual kj::Duration getCpuTime() const = 0; - virtual kj::Duration getWallTime() const = 0; }; static kj::Own create(IdFactory& idFactory, @@ -185,7 +185,7 @@ class StreamingTrace final { virtual ~Span() noexcept(false); void setOutcome(rpc::Trace::Span::SpanOutcome outcome, - kj::Maybe maybeInfo = kj::none, + kj::Maybe maybeInfo = kj::none, trace::Tags tags = nullptr); void addLog(trace::LogV2&& log); diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index ccbec206c06..99470a382b1 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -96,9 +96,10 @@ kj::Own PipelineTracer::makeWorkerTracer(PipelineLogLevel pipeline kj::Maybe dispatchNamespace, kj::Array scriptTags, kj::Maybe entrypoint) { - auto trace = kj::refcounted(trace::Onset(kj::none, // TODO(now): Pass the account id - kj::mv(stableId), kj::mv(scriptName), kj::mv(scriptVersion), kj::mv(dispatchNamespace), - kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint))); + // TODO(streaming-trace): Pass the account id + auto trace = kj::refcounted( + trace::Onset(kj::none, kj::mv(stableId), kj::mv(scriptName), kj::mv(scriptVersion), + kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint))); traces.add(kj::addRef(*trace)); return kj::refcounted(kj::addRef(*this), kj::mv(trace), pipelineLogLevel); } @@ -125,7 +126,7 @@ void WorkerTracer::log(kj::Date timestamp, LogLevel logLevel, kj::String message trace->addLog(trace::Log(timestamp, logLevel, kj::mv(message)), isSpan); } -void WorkerTracer::addSpan(const trace::Span& span, kj::String spanContext) { +void WorkerTracer::addSpan(const Span& span, kj::String spanContext) { // TODO(someday): For now, we're using logLevel == none as a hint to avoid doing anything // expensive while tracing. We may eventually want separate configuration for exceptions vs. // logs. @@ -169,6 +170,10 @@ void WorkerTracer::setEventInfo(kj::Date timestamp, trace::EventInfo&& info) { trace->setEventInfo(timestamp, kj::mv(info)); } +void WorkerTracer::addMetrics(trace::Metrics&& metrics) { + trace->addMetrics(kj::mv(metrics)); +} + void WorkerTracer::setOutcomeInfo(trace::Outcome&& info) { trace->setOutcome(kj::mv(info)); } diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index 18871cc5a6a..dcd502120e2 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -30,8 +30,6 @@ namespace workerd { using kj::byte; using kj::uint; -using Span = trace::Span; - // ======================================================================================= class WorkerTracer; @@ -97,6 +95,9 @@ class WorkerTracer final: public kj::Refcounted { // Sets info about the event that triggered the trace. Must not be called more than once. void setEventInfo(kj::Date timestamp, trace::EventInfo&&); + // Add metrics to the trace. Can be called more than once. + void addMetrics(trace::Metrics&& metrics); + // Sets info about the result of this trace. Can be called more than once, overriding the // previous detail. void setOutcomeInfo(trace::Outcome&& info); diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index a0c21edbbce..7b5a390d838 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -32,6 +32,27 @@ struct Trace @0x8e8d911203762d34 { message @2 :Text; } + # Streaming tail workers support an expanded version of Log that supports arbitrary + # v8 serialized data or a text message. We define this as a separate new + # struct in order to avoid any possible non-backwards compatible disruption to anything + # using the existing Log struct in the original trace worker impl. The two structs are + # virtually identical with the exception that the message field can be v8 serialized data. + struct LogV2 { + timestampNs @0 :Int64; + logLevel @1 :Log.Level; + message :union { + # When data is used, the LogV2 message is expected to be a v8 serialized value. + data @2 :Data; + text @3 :Text; + } + tags @4 :List(Tag); + # A Log entry might be truncated if it exceeds the maximum size limit configured + # for the process. Truncation should occur before the data is serialized so it + # should always be possible to deserialize the data field successfully, regardless + # of the specific format of the data. + truncated @5 :Bool; + } + exceptions @1 :List(Exception); struct Exception { timestampNs @0 :Int64; @@ -184,11 +205,9 @@ struct Trace @0x8e8d911203762d34 { # the entire trace steam. struct Outcome { outcome @0 :EventOutcome; - cpuTime @1 :UInt64; - wallTime @2 :UInt64; # Any additional arbitrary metadata that should be associated with the outcome. - tags @3 :List(Tag); + tags @1 :List(Tag); } # An outcome information object that describes additional detail about the outcome @@ -201,23 +220,10 @@ struct Trace @0x8e8d911203762d34 { tags @0 :List(Tag); } - # Streaming tail workers support an expanded version of Log that supports arbitrary - # v8 serialized data rather than a text message. We define this as a separate new - # struct in order to avoid any possible non-backwards compatible disruption to anything - # using the existing Log struct in the original trace worker impl. The two structs are - # virtually identical with the exception that the message field here will always be - # v8 serialized data. - struct LogV2 { - timestampNs @0 :Int64; - logLevel @1 :Log.Level; - data @2 :Data; - tags @3 :List(Tag); - } - # A detail event indicating a subrequest that was made during a request. This # can be a fetch subrequest, an RPC subrequest, a call out to a KV namespace, # etc. - # TODO(now): This needs to be flushed out more. + # TODO(streaming-trace): This needs to be flushed out more. struct Subrequest { # A monotonic sequence number that is unique within the tail. The id is # used primarily to correlate with the SubrequestOutcome. diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index e8ea95d6f42..61c2b88f3ff 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1451,7 +1451,7 @@ public: if (fetchStatus != 0) { t->setFetchResponseInfo(trace::FetchResponseInfo(fetchStatus)); } - t->setOutcomeInfo(trace::Outcome(outcome, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS)); + t->setOutcomeInfo(trace::Outcome(outcome)); } }