Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DEBUG ERRORS #1703

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

#include <absl/container/fixed_array.h>
#include <absl/types/span.h>
#include <mimalloc.h>
#include <sys/socket.h>

Expand Down Expand Up @@ -169,6 +170,10 @@ class Connection : public util::Connection {

ConnectionContext* cntx();

facade::CmdArgList GetParsedArguments() {
return absl::MakeSpan(tmp_cmd_vec_);
}

protected:
void OnShutdown() override;
void OnPreMigrateThread() override;
Expand Down
2 changes: 1 addition & 1 deletion src/facade/facade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
if (stream) {
switch (protocol_) {
case Protocol::REDIS:
rbuilder_.reset(new RedisReplyBuilder(stream));
rbuilder_.reset(new RedisReplyBuilder(stream, this));
break;
case Protocol::MEMCACHE:
rbuilder_.reset(new MCReplyBuilder(stream));
Expand Down
97 changes: 89 additions & 8 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
#include "facade/reply_builder.h"

#include <absl/container/fixed_array.h>
#include <absl/strings/match.h>
#include <absl/strings/numbers.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/str_join.h>
#include <double-conversion/double-to-string.h>

#include "base/logging.h"
#include "facade/error.h"
#include "src/facade/conn_context.h"
#include "src/facade/dragonfly_connection.h"
#include "src/server/conn_context.h"

using namespace std;
using absl::StrAppend;
Expand Down Expand Up @@ -196,15 +201,18 @@ char* RedisReplyBuilder::FormatDouble(double val, char* dest, unsigned dest_len)
return sb.Finalize();
}

RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) {
RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink, facade::ConnectionContext* cntx,
unsigned capacity)
: SinkReplyBuilder(sink),
buffer_(new base::RingBuffer<string>(capacity)), buffer_start_{0}, cntx_{cntx} {
}

void RedisReplyBuilder::SetResp3(bool is_resp3) {
is_resp3_ = is_resp3;
}

void RedisReplyBuilder::SendError(string_view str, string_view err_type) {
VLOG(1) << "Error: " << str;
VLOG(1) << "Error: " << str << " of type: " << (err_type.empty() ? "(no type)"sv : err_type);

if (err_type.empty()) {
err_type = str;
Expand All @@ -214,12 +222,51 @@ void RedisReplyBuilder::SendError(string_view str, string_view err_type) {

err_count_[err_type]++;

if (str[0] == '-') {
iovec v[] = {IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
} else {
iovec v[] = {IoVec(kErrPref), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
vector<iovec> v(3u);

if (str[0] != '-')
v.push_back(IoVec(kErrPref));

v.insert(v.end(), {IoVec(str), IoVec(kCRLF)});
v.shrink_to_fit();
Send(v.data(), v.size());

if (buffer_->capacity() > 0u && cntx_ != nullptr) {
string s;

if (absl::StartsWith(str, "unknown command"))
s = str;
else {
auto* dfly_cntx = static_cast<dfly::ConnectionContext*>(cntx_);
string prefix;

if (dfly_cntx && dfly_cntx->cid) {
prefix.reserve(128u);

auto args = dfly_cntx->owner()->GetParsedArguments();

// limit the length of the prefix to be 128 chars
for (auto i = 0u; i < args.length(); ++i) {
auto next = ArgS(args, i);

if ((prefix.length() + next.length()) <= 128u) {
prefix.append(next);
prefix.append(" ", 1u);
} else {
prefix.append("...");
break;
}
}

s = absl::StrCat(prefix, ": ", str);
} else {
prefix = "<no cid>";
}

s = absl::StrCat(prefix, ": ", str);
}

buffer_start_ += buffer_->EmplaceOrOverride(move(s));
}
}

Expand Down Expand Up @@ -503,6 +550,40 @@ void RedisReplyBuilder::SendStringArrInternal(WrappedStrSpan arr, CollectionType
Send(vec.data(), vec_indx + 1);
}

vector<string> RedisReplyBuilder::GetSavedErrors(void) {
/**
* Items are inserted to the RingBuffer by its tail, yet GetItem()
* returns items starting from the head. This implies that printing
* GetItem(0u) would print errors from oldest to newest.
* It is not possible to lazily reverse this result, i.e.,
* by using GetItem(size() - 1) as the head, since
* GetItem(size()) != GetItem(size() - 1)[1], which in words means
* that the next item in GetItem(size() - 1) is not what we want.
*
* Since this is not a hot path, an explicit copy and reversal will suffice.
*/

const auto sz = buffer_->size();
vector<string> reversed(sz);

/**
* What is going on here:
* In `buffer_start_` we store the index at which the buffer
* logically starts, w.r.t time. This means that GetItem(buffer_start_)
* is the oldest item in the buffer.
*
* With this in mind, we want to read the buffer, starting
* from buffer_start_, and ending at its size, and reverse it
*
* Logically this means calling GetItem(buffer_start_), GetItem(buffer_start_ + 1)
* till GetItem(buffer_start_ + sz - 1), and to save time we reverse the array immediately
*/
for (auto i = 0u; i < sz; ++i)
reversed.push_back(*buffer_->GetItem(buffer_start_ + sz - 1 - i));

return reversed;
}

void ReqSerializer::SendCommand(std::string_view str) {
VLOG(2) << "SendCommand: " << str;

Expand Down
25 changes: 24 additions & 1 deletion src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,22 @@

#include <absl/container/flat_hash_map.h>

#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

#include "base/logging.h"
#include "base/ring_buffer.h"
#include "facade/facade_types.h"
#include "facade/op_status.h"
#include "io/io.h"

namespace facade {

class ConnectionContext;

// Reply mode allows filtering replies.
enum class ReplyMode {
NONE, // No replies are recorded
Expand Down Expand Up @@ -169,10 +176,13 @@ class MCReplyBuilder : public SinkReplyBuilder {
class RedisReplyBuilder : public SinkReplyBuilder {
public:
enum CollectionType { ARRAY, SET, MAP, PUSH };
static constexpr unsigned buffer_capacity = 4u;

using StrSpan = std::variant<absl::Span<const std::string>, absl::Span<const std::string_view>>;

RedisReplyBuilder(::io::Sink* stream);
//! capacity must be a power of 2, see RingBuffer
RedisReplyBuilder(::io::Sink* stream, facade::ConnectionContext* cntx,
unsigned capacity = buffer_capacity);

void SetResp3(bool is_resp3);

Expand Down Expand Up @@ -210,6 +220,16 @@ class RedisReplyBuilder : public SinkReplyBuilder {
// into the string that would be sent
static std::string_view StatusToMsg(OpStatus status);

std::vector<std::string> GetSavedErrors(void);
void FlushErrors(void) {
const unsigned int capacity = buffer_->capacity();
buffer_.reset(new base::RingBuffer<std::string>(capacity));
}

void ResizeErrorsBuffer(unsigned int new_capacity) {
buffer_.reset(new base::RingBuffer<std::string>(new_capacity));
}

protected:
struct WrappedStrSpan : public StrSpan {
size_t Size() const;
Expand All @@ -222,6 +242,9 @@ class RedisReplyBuilder : public SinkReplyBuilder {
const char* NullString();

bool is_resp3_ = false;
std::unique_ptr<base::RingBuffer<std::string>> buffer_; // DEBUG ERRORS logs error here
unsigned buffer_start_;
facade::ConnectionContext* cntx_;
};

class ReqSerializer {
Expand Down
2 changes: 1 addition & 1 deletion src/facade/reply_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class RedisReplyBuilderTest : public testing::Test {

void SetUp() {
sink_.Clear();
builder_.reset(new RedisReplyBuilder(&sink_));
builder_.reset(new RedisReplyBuilder(&sink_, nullptr, 0u));
}

protected:
Expand Down
2 changes: 1 addition & 1 deletion src/facade/reply_capture.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class CapturingReplyBuilder : public RedisReplyBuilder {

public:
CapturingReplyBuilder(ReplyMode mode = ReplyMode::FULL)
: RedisReplyBuilder{nullptr}, reply_mode_{mode}, stack_{}, current_{} {
: RedisReplyBuilder{nullptr, nullptr, 0u}, reply_mode_{mode}, stack_{}, current_{} {
}

using Payload = std::variant<std::monostate, Null, Error, OpStatus, long, double, SimpleString,
Expand Down
35 changes: 35 additions & 0 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <absl/cleanup/cleanup.h>
#include <absl/random/random.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>

#include <filesystem>
Expand Down Expand Up @@ -210,6 +211,30 @@ void DoBuildObjHist(EngineShard* shard, ObjHistMap* obj_hist_map) {
}
}

void ErrorStatsInternal(ConnectionContext* cntx, CmdArgList args) {
auto* rb =
cntx->operator->(); // returns RedisReplyBuilder, safety checks performed inside operator->()

if (args.size() > 1) { // received arguments
auto arg = ArgS(args, 1);
if (absl::EqualsIgnoreCase(arg, "FLUSH")) {
rb->FlushErrors();
} else if (absl::EqualsIgnoreCase(arg, "RESIZE")) {
unsigned int out{0};
if (!absl::SimpleAtoi(ArgS(args, 2), &out)) {
return rb->SendError(kInvalidIntErr);
}
rb->ResizeErrorsBuffer(out);
} else {
rb->SendError(kSyntaxErrType);
}
} else {
return rb->SendStringArr(rb->GetSavedErrors());
}

rb->SendOk();
}

} // namespace

DebugCmd::DebugCmd(ServerFamily* owner, ConnectionContext* cntx) : sf_(*owner), cntx_(cntx) {
Expand Down Expand Up @@ -244,6 +269,11 @@ void DebugCmd::Run(CmdArgList args) {
" If SLOTS is specified then create keys only in given slots range."
"OBJHIST",
" Prints histogram of object sizes.",
"ERRORS [FLUSH] [RESIZE <new_size>]",
" Returns the last K errors recorded in Dragonfly. By default, k = 32.",
" It is possible to clear the buffer by using [FLUSH].",
" Resize the buffer using [RESIZE <size>]. This will clear the buffer. The new size "
"must be a power of 2.",
"HELP",
" Prints this help.",
};
Expand Down Expand Up @@ -283,6 +313,11 @@ void DebugCmd::Run(CmdArgList args) {
if (subcmd == "OBJHIST") {
return ObjHist();
}

if (subcmd == "ERRORS") {
return ErrorStatsInternal(cntx_, args);
}

string reply = UnknownSubCmd(subcmd, "DEBUG");
return (*cntx_)->SendError(reply, kSyntaxErrType);
}
Expand Down
3 changes: 2 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ void DispatchMonitor(ConnectionContext* cntx, const CommandId* cid, CmdArgList t

class InterpreterReplier : public RedisReplyBuilder {
public:
InterpreterReplier(ObjectExplorer* explr) : RedisReplyBuilder(nullptr), explr_(explr) {
InterpreterReplier(ObjectExplorer* explr)
: RedisReplyBuilder(nullptr, nullptr, 0u), explr_(explr) {
}

void SendError(std::string_view str, std::string_view type = std::string_view{}) final;
Expand Down
Loading