Skip to content

Commit

Permalink
More cleanups and refinements on revised trace api
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Oct 7, 2024
1 parent e2f05a2 commit 31ff8c6
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 192 deletions.
4 changes: 2 additions & 2 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ TraceItem::TraceItem(jsg::Lock& js, const Trace& trace)
trace.onsetInfo.dispatchNamespace.map([](auto& ns) { return kj::str(ns); })),
scriptTags(getTraceScriptTags(trace)),
outcome(getTraceOutcome(trace)),
cpuTime(trace.outcomeInfo.cpuTime / kj::MILLISECONDS),
wallTime(trace.outcomeInfo.wallTime / kj::MILLISECONDS),
cpuTime(trace.cpuTime / kj::MILLISECONDS),
wallTime(trace.wallTime / kj::MILLISECONDS),
truncated(trace.truncated) {}

kj::Maybe<TraceItem::EventInfo> TraceItem::getEvent(jsg::Lock& js) {
Expand Down
39 changes: 18 additions & 21 deletions src/workerd/io/trace-common-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,8 @@ KJ_TEST("TraceEventInfo works") {
}

KJ_TEST("Outcome works") {
trace::Outcome outcome(EventOutcome::OK, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS);
trace::Outcome outcome(EventOutcome::OK);
KJ_EXPECT(outcome.outcome == EventOutcome::OK);
KJ_EXPECT(outcome.cpuTime == 1 * kj::MILLISECONDS);
KJ_EXPECT(outcome.wallTime == 2 * kj::MILLISECONDS);

capnp::MallocMessageBuilder message;
auto builder = message.initRoot<rpc::Trace::Outcome>();
Expand All @@ -304,13 +302,9 @@ KJ_TEST("Outcome works") {
auto reader = builder.asReader();
trace::Outcome outcome2(reader);
KJ_EXPECT(outcome2.outcome == EventOutcome::OK);
KJ_EXPECT(outcome2.cpuTime == 1 * kj::MILLISECONDS);
KJ_EXPECT(outcome2.wallTime == 2 * kj::MILLISECONDS);

auto outcome3 = outcome.clone();
KJ_EXPECT(outcome3.outcome == EventOutcome::OK);
KJ_EXPECT(outcome3.cpuTime == 1 * kj::MILLISECONDS);
KJ_EXPECT(outcome3.wallTime == 2 * kj::MILLISECONDS);
}

KJ_TEST("DiagnosticChannelEvent works") {
Expand Down Expand Up @@ -359,12 +353,13 @@ KJ_TEST("Log works") {
KJ_EXPECT(log3.message == "foo"_kj);
}

KJ_TEST("LogV2 workers") {
KJ_TEST("LogV2 works") {
kj::Date date = 0 * kj::MILLISECONDS + kj::UNIX_EPOCH;
trace::LogV2 log(date, LogLevel::INFO, kj::heapArray<kj::byte>(1));
KJ_EXPECT(log.timestamp == date);
KJ_EXPECT(log.logLevel == LogLevel::INFO);
KJ_EXPECT(log.data.size() == 1);
KJ_EXPECT(KJ_ASSERT_NONNULL(log.message.tryGet<kj::Array<kj::byte>>()).size() == 1);
KJ_EXPECT(!log.truncated);

capnp::MallocMessageBuilder message;
auto builder = message.initRoot<rpc::Trace::LogV2>();
Expand All @@ -374,12 +369,14 @@ KJ_TEST("LogV2 workers") {
trace::LogV2 log2(reader);
KJ_EXPECT(log2.timestamp == date);
KJ_EXPECT(log2.logLevel == LogLevel::INFO);
KJ_EXPECT(log2.data.size() == 1);
KJ_EXPECT(KJ_ASSERT_NONNULL(log2.message.tryGet<kj::Array<kj::byte>>()).size() == 1);
KJ_EXPECT(!log2.truncated);

auto log3 = log.clone();
KJ_EXPECT(log3.timestamp == date);
KJ_EXPECT(log3.logLevel == LogLevel::INFO);
KJ_EXPECT(log3.data.size() == 1);
KJ_EXPECT(KJ_ASSERT_NONNULL(log3.message.tryGet<kj::Array<kj::byte>>()).size() == 1);
KJ_EXPECT(!log3.truncated);
}

KJ_TEST("Exception works") {
Expand Down Expand Up @@ -430,9 +427,9 @@ KJ_TEST("Subrequest works") {
}

KJ_TEST("SubrequestOutcome works") {
trace::SubrequestOutcome outcome(1U, kj::none, trace::SpanEvent::Outcome::OK);
trace::SubrequestOutcome outcome(1U, kj::none, trace::Span::Outcome::OK);
KJ_EXPECT(outcome.id == 1U);
KJ_EXPECT(outcome.outcome == trace::SpanEvent::Outcome::OK);
KJ_EXPECT(outcome.outcome == trace::Span::Outcome::OK);

capnp::MallocMessageBuilder message;
auto builder = message.initRoot<rpc::Trace::SubrequestOutcome>();
Expand All @@ -441,18 +438,18 @@ KJ_TEST("SubrequestOutcome works") {
auto reader = builder.asReader();
trace::SubrequestOutcome outcome2(reader);
KJ_EXPECT(outcome2.id == 1U);
KJ_EXPECT(outcome2.outcome == trace::SpanEvent::Outcome::OK);
KJ_EXPECT(outcome2.outcome == trace::Span::Outcome::OK);

auto outcome3 = outcome.clone();
KJ_EXPECT(outcome3.id == 1U);
KJ_EXPECT(outcome3.outcome == trace::SpanEvent::Outcome::OK);
KJ_EXPECT(outcome3.outcome == trace::Span::Outcome::OK);
}

KJ_TEST("SpanEvent works") {
trace::SpanEvent event(1U, 0U, trace::SpanEvent::Outcome::OK, false, kj::none, nullptr);
KJ_TEST("Span works") {
trace::Span event(1U, 0U, trace::Span::Outcome::OK, false, kj::none, nullptr);
KJ_EXPECT(event.id == 1U);
KJ_EXPECT(event.parent == 0U);
KJ_EXPECT(event.outcome == trace::SpanEvent::Outcome::OK);
KJ_EXPECT(event.outcome == trace::Span::Outcome::OK);
KJ_EXPECT(event.transactional == false);
KJ_EXPECT(event.info == kj::none);
KJ_EXPECT(event.tags.size() == 0);
Expand All @@ -462,18 +459,18 @@ KJ_TEST("SpanEvent works") {
event.copyTo(builder);

auto reader = builder.asReader();
trace::SpanEvent event2(reader);
trace::Span event2(reader);
KJ_EXPECT(event2.id == 1U);
KJ_EXPECT(event2.parent == 0U);
KJ_EXPECT(event2.outcome == trace::SpanEvent::Outcome::OK);
KJ_EXPECT(event2.outcome == trace::Span::Outcome::OK);
KJ_EXPECT(event2.transactional == false);
KJ_EXPECT(event2.info == kj::none);
KJ_EXPECT(event2.tags.size() == 0);

auto event3 = event.clone();
KJ_EXPECT(event3.id == 1U);
KJ_EXPECT(event3.parent == 0U);
KJ_EXPECT(event3.outcome == trace::SpanEvent::Outcome::OK);
KJ_EXPECT(event3.outcome == trace::Span::Outcome::OK);
KJ_EXPECT(event3.transactional == false);
KJ_EXPECT(event3.info == kj::none);
KJ_EXPECT(event3.tags.size() == 0);
Expand Down
81 changes: 57 additions & 24 deletions src/workerd/io/trace-common.c++
Original file line number Diff line number Diff line change
Expand Up @@ -273,24 +273,16 @@ Onset Onset::clone() const {
// ======================================================================================
// Outcome

Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime)
: outcome(outcome),
cpuTime(cpuTime),
wallTime(wallTime) {}
Outcome::Outcome(EventOutcome outcome): outcome(outcome) {}

Outcome::Outcome(rpc::Trace::Outcome::Reader reader)
: outcome(reader.getOutcome()),
cpuTime(reader.getCpuTime() * kj::MILLISECONDS),
wallTime(reader.getWallTime() * kj::MILLISECONDS) {}
Outcome::Outcome(rpc::Trace::Outcome::Reader reader): outcome(reader.getOutcome()) {}

void Outcome::copyTo(rpc::Trace::Outcome::Builder builder) const {
builder.setOutcome(outcome);
builder.setCpuTime(cpuTime / kj::MILLISECONDS);
builder.setWallTime(wallTime / kj::MILLISECONDS);
}

Outcome Outcome::clone() const {
return Outcome{outcome, cpuTime, wallTime};
return Outcome{outcome};
}

// ======================================================================================
Expand Down Expand Up @@ -623,6 +615,22 @@ DiagnosticChannelEvent DiagnosticChannelEvent::clone() const {
// ======================================================================================
// Log

namespace {
kj::OneOf<kj::Array<kj::byte>, kj::String> getMessageForLog(
const rpc::Trace::LogV2::Reader& reader) {
auto message = reader.getMessage();
switch (message.which()) {
case rpc::Trace::LogV2::Message::Which::TEXT: {
return kj::str(message.getText());
}
case rpc::Trace::LogV2::Message::Which::DATA: {
return kj::heapArray<kj::byte>(message.getData());
}
}
KJ_UNREACHABLE;
}
} // namespace

Log::Log(kj::Date timestamp, LogLevel logLevel, kj::String message)
: timestamp(timestamp),
logLevel(logLevel),
Expand All @@ -643,21 +651,35 @@ Log Log::clone() const {
return Log(timestamp, logLevel, kj::str(message));
}

LogV2::LogV2(kj::Date timestamp, LogLevel logLevel, kj::Array<kj::byte> data, Tags tags)
LogV2::LogV2(kj::Date timestamp,
LogLevel logLevel,
kj::OneOf<kj::Array<kj::byte>, kj::String> message,
Tags tags,
bool truncated)
: timestamp(timestamp),
logLevel(logLevel),
data(kj::mv(data)),
tags(kj::mv(tags)) {}
message(kj::mv(message)),
tags(kj::mv(tags)),
truncated(truncated) {}

LogV2::LogV2(rpc::Trace::LogV2::Reader reader)
: timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS),
logLevel(reader.getLogLevel()),
data(kj::heapArray<kj::byte>(reader.getData())) {}
message(getMessageForLog(reader)),
truncated(reader.getTruncated()) {}

void LogV2::copyTo(rpc::Trace::LogV2::Builder builder) const {
builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS);
builder.setLogLevel(logLevel);
builder.setData(data);
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(str, kj::String) {
builder.initMessage().setText(str);
}
KJ_CASE_ONEOF(data, kj::Array<kj::byte>) {
builder.initMessage().setData(data);
}
}
builder.setTruncated(truncated);
auto outTags = builder.initTags(tags.size());
for (size_t n = 0; n < tags.size(); n++) {
tags[n].copyTo(outTags[n]);
Expand All @@ -669,7 +691,18 @@ LogV2 LogV2::clone() const {
for (auto& tag: tags) {
newTags.add(tag.clone());
}
return LogV2(timestamp, logLevel, kj::heapArray<kj::byte>(data), newTags.releaseAsArray());
auto newMessage = ([&]() -> kj::OneOf<kj::Array<kj::byte>, kj::String> {
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(str, kj::String) {
return kj::str(str);
}
KJ_CASE_ONEOF(data, kj::Array<kj::byte>) {
return kj::heapArray<kj::byte>(data);
}
}
KJ_UNREACHABLE;
})();
return LogV2(timestamp, logLevel, kj::mv(newMessage), newTags.releaseAsArray(), truncated);
}

// ======================================================================================
Expand Down Expand Up @@ -941,10 +974,10 @@ SubrequestOutcome SubrequestOutcome::clone() const {
}

// ======================================================================================
// SpanEvent
// Span

namespace {
kj::Maybe<SpanEvent::Info> maybeGetInfo(const rpc::Trace::Span::Reader& reader) {
kj::Maybe<Span::Info> maybeGetInfo(const rpc::Trace::Span::Reader& reader) {
// if (!reader.hasInfo()) return kj::none;
auto info = reader.getInfo();
switch (info.which()) {
Expand All @@ -970,7 +1003,7 @@ kj::Maybe<SpanEvent::Info> maybeGetInfo(const rpc::Trace::Span::Reader& reader)
}
} // namespace

SpanEvent::SpanEvent(uint32_t id,
Span::Span(uint32_t id,
uint32_t parent,
rpc::Trace::Span::SpanOutcome outcome,
bool transactional,
Expand All @@ -983,15 +1016,15 @@ SpanEvent::SpanEvent(uint32_t id,
info(kj::mv(maybeInfo)),
tags(kj::mv(tags)) {}

SpanEvent::SpanEvent(rpc::Trace::Span::Reader reader)
Span::Span(rpc::Trace::Span::Reader reader)
: id(reader.getId()),
parent(reader.getParent()),
outcome(reader.getOutcome()),
transactional(reader.getTransactional()),
info(maybeGetInfo(reader)),
tags(maybeGetTags(reader)) {}

void SpanEvent::copyTo(rpc::Trace::Span::Builder builder) const {
void Span::copyTo(rpc::Trace::Span::Builder builder) const {
builder.setId(id);
builder.setParent(parent);
builder.setOutcome(outcome);
Expand Down Expand Up @@ -1019,7 +1052,7 @@ void SpanEvent::copyTo(rpc::Trace::Span::Builder builder) const {
}
}

SpanEvent SpanEvent::clone() const {
Span Span::clone() const {
auto newInfo = ([&]() -> kj::Maybe<Info> {
KJ_IF_SOME(i, info) {
KJ_SWITCH_ONEOF(i) {
Expand All @@ -1040,7 +1073,7 @@ SpanEvent SpanEvent::clone() const {
}
return kj::none;
})();
return SpanEvent(
return Span(
id, parent, outcome, transactional, kj::mv(newInfo),
KJ_MAP(tag, tags) { return tag.clone(); });
}
Expand Down
Loading

0 comments on commit 31ff8c6

Please sign in to comment.