Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added buffer files #101

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions components/core/src/compressor_frontend/Buffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#ifndef COMPRESSOR_FRONTEND_BUFFER_HPP
#define COMPRESSOR_FRONTEND_BUFFER_HPP

// C++ libraries
#include <cstdint>
#include <vector>

// Project Headers
#include "Constants.hpp"

/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Class header comment should go above the class definition.
  • Comment lines should be less than 80 characters.
  • This description is vague. You should ideally explain why you would need a static and a dynamic buffer, grouped together in one class.

* A base class for keeping track of static and dynamic buffers needed for a growing buffer.
* The base class does not grow the buffer, the child class is responsible for doing this.
*/

namespace compressor_frontend {
template <typename type>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
template <typename type>
template <typename Item>

class Buffer {
public:
// Prevent copying of buffer as this will be really slow
Buffer (Buffer&&) = delete;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • If you're deleting the move constructor, you should delete the copy constructor as well (see rule of 5).
  • Why're you deleting the move constructor?
  • The move constructor should come after the constructor.


Buffer& operator= (const Buffer&) = delete;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Operators should be after the constructor.


Buffer () {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the initializer list.

m_curr_pos = 0;
m_active_storage = m_static_storage;
m_curr_storage_size = cStaticByteBuffSize;
}

~Buffer () {
for (type* dynamic_storage : m_dynamic_storages) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for (type* dynamic_storage : m_dynamic_storages) {
for (auto* dynamic_storage : m_dynamic_storages) {

free(dynamic_storage);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
free(dynamic_storage);
delete[] dynamic_storage;

}
}

type* get_active_buffer () {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type* get_active_buffer () {
[[nodiscard]] type* get_active_buffer () {

return m_active_storage;
}

[[nodiscard]] uint32_t get_curr_storage_size () const {
return m_curr_storage_size;
}

void set_curr_pos (uint32_t curr_pos) {
m_curr_pos = curr_pos;
}

[[nodiscard]] uint32_t get_curr_pos () const {
return m_curr_pos;
}

/**
* Reset a buffer to parse a new log message
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Should be in imperative form ("Resets")
  • I would leave it at "Resets the buffer"

*/
virtual void reset () {
m_curr_pos = 0;
for (type* dynamic_storage : m_dynamic_storages) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • You probably want to lift this up into a method to avoid repeating it here and in the destructor. Or perhaps call reset in the destructor.
  • It's also a bit messy to separate allocation and deallocation in different classes. Perhaps you want to move the allocation logic into this class?

free(dynamic_storage);
}
m_dynamic_storages.clear();
m_active_storage = m_static_storage;
m_curr_storage_size = cStaticByteBuffSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
m_curr_storage_size = cStaticByteBuffSize;
m_curr_storage_size = m_static_storage.size();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not in this review, but I would suggest changing cStaticByteBuffSize to be something like cStaticSingleBufferSize and make it be half of its current value. Then multiply it by 2 wherever necessary.

  • Dropping "Byte" is because it's not just used for a byte buffer.
  • Dividing it by 2 means we avoid a case where a developer later changes the size to something that's not evenly divisible.

}

protected:
uint32_t m_curr_pos;
uint32_t m_curr_storage_size;
// Dynamic storage performs better as c-style arrays than as vectors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can omit this comment. It should be clear to the reader than a vector of vectors is not as appropriate as a vector of C-arrays.

type* m_active_storage;
std::vector<type*> m_dynamic_storages;
type m_static_storage[cStaticByteBuffSize];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type m_static_storage[cStaticByteBuffSize];
std::array<type, cStaticByteBuffSize> m_static_storage;

};
}

#endif // COMPRESSOR_FRONTEND_BUFFER_HPP
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add empty newline at end of file.

111 changes: 111 additions & 0 deletions components/core/src/compressor_frontend/InputBuffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// C++ libraries
#include <memory.h>
#include <string>

// spdlog
#include <spdlog/spdlog.h>

// Project Headers
#include "InputBuffer.hpp"

using std::string;
using std::to_string;

namespace compressor_frontend {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary empty line.

void InputBuffer::reset () {
m_at_end_of_file = false;
m_finished_reading_file = false;
m_consumed_pos = 0;
m_bytes_read = 0;
m_last_read_first_half = false;
Buffer::reset();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way you use Buffer in InputBuffer and OutputBuffer suggests to me that Buffer would be more appropriate as a class member in the XBuffer rather as a parent class. That would also get rid of this kind of antipattern where the overridden method is required to call the parent method.

}

bool InputBuffer::read_is_safe () {
if (m_finished_reading_file) {
return false;
}
// If the next message starts at 0, the previous character is at m_curr_storage_size - 1
if (m_consumed_pos == -1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Prefer constant == variable
  • Instead of comparing against how about, std::numeric_limits<decltype(m_consumed_pos)>::max()?

m_consumed_pos = m_curr_storage_size - 1;
}
// Check that the last log message ends in the half of the buffer that was last read.
// This means the other half of the buffer has already been fully used.
if ((!m_last_read_first_half && m_consumed_pos > m_curr_storage_size / 2) ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Rather than using m_curr_storage_size / 2 everywhere, it seems to me like it would more concise to have a variable like m_single_buffer_size and then in the few places you need the double buffer size, you can use m_single_buffer_size * 2.
  • Since you return true if this is true and false if it's not, you can just use expression as the return value.

(m_last_read_first_half && m_consumed_pos < m_curr_storage_size / 2 &&
m_consumed_pos > 0)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity, the brace should be on the next line for multiline expressions.

return true;
}
return false;
}

bool InputBuffer::increase_size_and_read (ReaderInterface& reader, size_t& old_storage_size) {
old_storage_size = m_curr_storage_size;
bool flipped_static_buffer = false;
// Handle super long line for completeness, but efficiency doesn't matter
if (m_active_storage == m_static_storage) {
SPDLOG_WARN("Long line detected changing to dynamic input buffer and"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long line detected,

" increasing size to {}.", m_curr_storage_size * 2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should just save the new size in a variable rather than repeating it everywhere.

} else {
SPDLOG_WARN("Long line detected increasing dynamic input buffer size to {}.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long line detected,

m_curr_storage_size * 2);
}
m_dynamic_storages.emplace_back((char*)malloc(2 * m_curr_storage_size * sizeof(char)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use malloc and free in C++. Use new and delete.

if (m_dynamic_storages.back() == nullptr) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Rather than using m_dynamic_storages.back() everywhere, I would just save the allocated buffer in a variable.
  • This check is not necessary if you use new since it will throw if it fails to allocate.

SPDLOG_ERROR("Failed to allocate input buffer of size {}.", m_curr_storage_size);
string err = "Lexer failed to find a match after checking entire buffer";
throw std::runtime_error(err);
}
if (m_last_read_first_half == false) {
// Buffer in correct order
memcpy(m_dynamic_storages.back(), m_active_storage,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use std::copy instead (should give the compiler more chances to optimize).

m_curr_storage_size * sizeof(char));
} else {
// Buffer out of order, so it needs to be flipped when copying
memcpy(m_dynamic_storages.back(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Use std::copy instead (should give the compiler more chances to optimize).
  • In this branch, it looks like you're assuming the previous buffer was static (since you're copying half and half). What if it was dynamic?

m_active_storage + m_curr_storage_size * sizeof(char) / 2,
m_curr_storage_size * sizeof(char) / 2);
memcpy(m_dynamic_storages.back() + m_curr_storage_size * sizeof(char) / 2,
m_active_storage, m_curr_storage_size * sizeof(char) / 2);
flipped_static_buffer = true;
}
m_curr_storage_size *= 2;
m_active_storage = m_dynamic_storages.back();
m_bytes_read = m_curr_storage_size / 2;
m_curr_pos = m_curr_storage_size / 2;
read(reader);
return flipped_static_buffer;
}

unsigned char InputBuffer::get_next_character () {
if (m_finished_reading_file && m_curr_pos == m_bytes_read) {
m_at_end_of_file = true;
return utf8::cCharEOF;
}
unsigned char character = m_active_storage[m_curr_pos];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can combine this line and the next into one.

m_curr_pos++;
if (m_curr_pos == m_curr_storage_size) {
m_curr_pos = 0;
}
return character;
}

void InputBuffer::read (ReaderInterface& reader) {
size_t bytes_read;
// read into the correct half of the buffer
uint32_t read_offset = 0;
if (m_last_read_first_half) {
read_offset = m_curr_storage_size / 2;
}
reader.read(m_active_storage + read_offset, m_curr_storage_size / 2, bytes_read);
m_last_read_first_half = !m_last_read_first_half;
if (bytes_read < m_curr_storage_size / 2) {
m_finished_reading_file = true;
}
m_bytes_read += bytes_read;
if (m_bytes_read > m_curr_storage_size) {
m_bytes_read -= m_curr_storage_size;
}
}
}
82 changes: 82 additions & 0 deletions components/core/src/compressor_frontend/InputBuffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#ifndef COMPRESSOR_FRONTEND_INPUT_BUFFER_HPP
#define COMPRESSOR_FRONTEND_INPUT_BUFFER_HPP

// Project Headers
#include "../ReaderInterface.hpp"
#include "Buffer.hpp"

namespace compressor_frontend {
class InputBuffer : public Buffer<char> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Missing class description.
  • Seeing as the code seems to assume a very specific use, I would change the class name to something like InputCharacterBuffer or LogMessageInputBuffer.

public:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary newline.

/**
* Resets input buffer
* @return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't return anything.

*/
void reset () override;

/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment has too many negatives. How about "Checks if reading into the input buffer is safe in that unconsumed data won't be overwritten"?

* Checks if reading into the input buffer won't overwrite data not yet used
* (e.g., data being overwritten is already compressed in the case of compression)
* @return bool
*/
bool read_is_safe ();

/**
* Reads into the half of the buffer currently available
* @param reader
*/
void read (ReaderInterface& reader);

/**
* Reads if no unused data will be overwritten
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused -> unconsumed?

* @param reader
*/
void try_read (ReaderInterface& reader) {
if (read_is_safe()) {
read(reader);
}
}

/**
* Swaps to a dynamic buffer (or doubles its size) if needed
* @return bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Missing parameter descriptions.
  • Missing description of what you're returning.

*/
bool increase_size_and_read (ReaderInterface& reader, size_t& old_storage_size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Seeing as the size used everywhere else is uint32_t, why use size_t here?
  • Typically, the capacity of a buffer is called its capacity rather than its size. So how about increase_capacity_and_read?


/**
* Check if at end of file, and return next char (or EOF)
* @return unsigned char
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@return EOF if at end of file, or the next char in the file

*/
unsigned char get_next_character ();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why unsigned char when the type of the buffer element is char?


bool all_data_read () {
if (m_last_read_first_half) {
return (m_curr_pos == m_curr_storage_size / 2);
} else {
return (m_curr_pos == 0);
}
}

void set_consumed_pos (uint32_t consumed_pos) {
m_consumed_pos = consumed_pos;
}

void set_at_end_of_file (bool at_end_of_file) {
m_at_end_of_file = at_end_of_file;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would a caller need to set this manually rather than it being set by reading to the end of the file?

}

[[nodiscard]] bool at_end_of_file () const {
return m_at_end_of_file;
}

private:
uint32_t m_bytes_read;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of m_curr_pos, m_consumed_pos, and m_bytes_read is not clear to me. Can you describe their usage?

uint32_t m_consumed_pos;
bool m_last_read_first_half;
bool m_finished_reading_file;
bool m_at_end_of_file;
};
}

#endif // COMPRESSOR_FRONTEND_INPUT_BUFFER_HPP
45 changes: 45 additions & 0 deletions components/core/src/compressor_frontend/OutputBuffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#include "OutputBuffer.hpp"

// C++ standard libraries
#include <string>

// spdlog
#include <spdlog/spdlog.h>

using std::string;

namespace compressor_frontend {
void OutputBuffer::increment_pos () {
m_curr_pos++;
if (m_curr_pos == m_curr_storage_size) {
if (m_active_storage == m_static_storage) {
SPDLOG_WARN(
"Very long log detected: changing to a dynamic output buffer and "
"increasing size to {}. Expect increased latency.",
m_curr_storage_size * 2);
} else {
SPDLOG_WARN("Very long log detected: increasing dynamic output buffer size to {}.",
m_curr_storage_size * 2);
}
m_dynamic_storages.emplace_back(
(Token*)malloc(2 * m_curr_storage_size * sizeof(Token)));
if (m_dynamic_storages.back() == nullptr) {
SPDLOG_ERROR("Failed to allocate output buffer of size {}.", m_curr_storage_size);
/// TODO: update exception when they're properly
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When they're properly what?

/// (e.g., "failed_to_compress_log_continue_to_next")
throw std::runtime_error(
"Lexer failed to find a match after checking entire buffer");
}
memcpy(m_dynamic_storages.back(), m_active_storage,
m_curr_storage_size * sizeof(Token));
m_active_storage = m_dynamic_storages.back();
m_curr_storage_size *= 2;
}
}

void OutputBuffer::reset () {
m_has_timestamp = false;
m_has_delimiters = false;
Buffer::reset();
}
}
66 changes: 66 additions & 0 deletions components/core/src/compressor_frontend/OutputBuffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#ifndef COMPRESSOR_FRONTEND_OUTPUT_BUFFER_HPP
#define COMPRESSOR_FRONTEND_OUTPUT_BUFFER_HPP

// Project Headers
#include "Buffer.hpp"
#include "Token.hpp"

/**
* A buffer containing the tokenized output of the parser.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • This comment should be word-wrapped at 80 characters.
  • These details are somewhat useful but don't really seem to be handled within the class, so I would either move some of the logic into the class or move some of these comments to where they're actually relevant. My preference would be the first one to avoid cases where you might a timestamp but forget to set m_has_timestamp.

* The active buffer contains all the tokens from the current log message.
* The first token contains the timestamp (if there is no timestamp the first token is invalid).
* For performance (runtime latency) it defaults to a static buffer and when more tokens are needed
* to be stored than the current capacity it switches to a dynamic buffer.
* Each time the capacity is exceeded a new dynamic buffer is added to the list of dynamic buffers.
*/
namespace compressor_frontend {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary empty line.

class OutputBuffer : public Buffer<Token> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OutputTokenBuffer?

public:

/**
* Increment buffer pos, swaps to a dynamic buffer (or doubles its size) if needed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Should be in imperative form.
  • Swapping to a dynamic buffer seems like an implementation detail (so it shouldn't be in the API header comment).
  • It's not really clear to a reader what pos is. Perhaps this should be called advance_to_next_token?

*/
void increment_pos ();

/**
* Resets output buffer
* @return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No return

*/
void reset () override;

void set_has_timestamp (bool has_timestamp) {
m_has_timestamp = has_timestamp;
}

[[nodiscard]] bool has_timestamp () const {
return m_has_timestamp;
}

void set_has_delimiters (bool has_delimiters) {
m_has_delimiters = has_delimiters;
}

[[nodiscard]] bool has_delimiters () const {
return m_has_delimiters;
}

void set_token (uint32_t pos, Token& value) {
m_active_storage[pos] = value;
}

void set_curr_token (Token& value) {
m_active_storage[m_curr_pos] = value;
}

[[nodiscard]] const Token& get_curr_token () const {
return m_active_storage[m_curr_pos];
}

private:
bool m_has_timestamp = false;
bool m_has_delimiters = false;
};
}

#endif // COMPRESSOR_FRONTEND_OUTPUT_BUFFER_HPP