diff --git a/components/core/src/compressor_frontend/Buffer.hpp b/components/core/src/compressor_frontend/Buffer.hpp
new file mode 100644
index 000000000..8813ae831
--- /dev/null
+++ b/components/core/src/compressor_frontend/Buffer.hpp
@@ -0,0 +1,100 @@
+#ifndef COMPRESSOR_FRONTEND_BUFFER_HPP
+#define COMPRESSOR_FRONTEND_BUFFER_HPP
+
+// C++ libraries
+#include <cstdint>
+#include <cstdlib>
+#include <vector>
+
+// Project Headers
+#include "Constants.hpp"
+
+namespace compressor_frontend {
+    /**
+     * A base class for keeping track of static and dynamic buffers needed for a growing buffer.
+     * The base class does not grow the buffer, the child class is responsible for doing this.
+     *
+     * The buffer starts out using the storage embedded in the object (m_static_storage). Child
+     * classes that need more room are expected to malloc() a larger array, add it to
+     * m_dynamic_storages and point m_active_storage at it; this class only frees those arrays.
+     * @tparam type The element type stored in the buffer
+     */
+    template <typename type>
+    class Buffer {
+    public:
+        Buffer ()
+                : m_curr_pos(0),
+                  m_curr_storage_size(cStaticByteBuffSize),
+                  m_active_storage(m_static_storage) {}
+
+        // Copying would be really slow, and a copied or moved-to object would free the same
+        // dynamic storages (or keep pointing into the other object's static storage), so
+        // neither copying nor moving is supported
+        Buffer (const Buffer&) = delete;
+        Buffer& operator= (const Buffer&) = delete;
+        Buffer (Buffer&&) = delete;
+        Buffer& operator= (Buffer&&) = delete;
+
+        // Virtual since child classes override reset() and may be destroyed through a Buffer*
+        virtual ~Buffer () {
+            free_dynamic_storages();
+        }
+
+        /**
+         * @return Pointer to the first element of the storage currently in use
+         */
+        type* get_active_buffer () {
+            return m_active_storage;
+        }
+
+        /**
+         * @return The recorded size of the storage currently in use
+         */
+        [[nodiscard]] uint32_t get_curr_storage_size () const {
+            return m_curr_storage_size;
+        }
+
+        void set_curr_pos (uint32_t curr_pos) {
+            m_curr_pos = curr_pos;
+        }
+
+        [[nodiscard]] uint32_t get_curr_pos () const {
+            return m_curr_pos;
+        }
+
+        /**
+         * Reset a buffer to parse a new log message: rewinds the position, frees every dynamic
+         * storage and switches back to the static storage
+         */
+        virtual void reset () {
+            m_curr_pos = 0;
+            free_dynamic_storages();
+            m_active_storage = m_static_storage;
+            m_curr_storage_size = cStaticByteBuffSize;
+        }
+
+    protected:
+        uint32_t m_curr_pos;
+        uint32_t m_curr_storage_size;
+        // Dynamic storage performs better as c-style arrays than as vectors
+        type* m_active_storage;
+        // Every entry is released with free(), so child classes must only store pointers
+        // obtained from the malloc family here
+        std::vector<type*> m_dynamic_storages;
+        // Holds cStaticByteBuffSize elements (not bytes) of type
+        type m_static_storage[cStaticByteBuffSize];
+
+    private:
+        /**
+         * Frees every dynamic storage and empties the list so nothing is freed twice
+         */
+        void free_dynamic_storages () {
+            for (type* dynamic_storage : m_dynamic_storages) {
+                std::free(dynamic_storage);
+            }
+            m_dynamic_storages.clear();
+        }
+    };
+}
+
+#endif // COMPRESSOR_FRONTEND_BUFFER_HPP
\ No newline at end of file
diff
--git a/components/core/src/compressor_frontend/InputBuffer.cpp b/components/core/src/compressor_frontend/InputBuffer.cpp
new file mode 100644
index 000000000..8a542b550
--- /dev/null
+++ b/components/core/src/compressor_frontend/InputBuffer.cpp
@@ -0,0 +1,120 @@
+// C++ libraries
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <stdexcept>
+#include <string>
+
+// spdlog
+#include <spdlog/spdlog.h>
+
+// Project Headers
+#include "InputBuffer.hpp"
+
+using std::string;
+using std::to_string;
+
+namespace compressor_frontend {
+    void InputBuffer::reset () {
+        m_at_end_of_file = false;
+        m_finished_reading_file = false;
+        m_consumed_pos = 0;
+        m_bytes_read = 0;
+        m_last_read_first_half = false;
+        Buffer::reset();
+    }
+
+    bool InputBuffer::read_is_safe () {
+        if (m_finished_reading_file) {
+            return false;
+        }
+        // If the next message starts at 0, the previous character is at m_curr_storage_size - 1.
+        // m_consumed_pos is unsigned, so a caller passing -1 arrives here as the maximum value.
+        if (static_cast<uint32_t>(-1) == m_consumed_pos) {
+            m_consumed_pos = m_curr_storage_size - 1;
+        }
+        // Check that the last log message ends in the half of the buffer that was last read.
+        // This means the other half of the buffer has already been fully used.
+        const uint32_t half_storage_size = m_curr_storage_size / 2;
+        if (m_last_read_first_half) {
+            return 0 < m_consumed_pos && m_consumed_pos < half_storage_size;
+        }
+        return m_consumed_pos > half_storage_size;
+    }
+
+    bool InputBuffer::increase_size_and_read (ReaderInterface& reader, size_t& old_storage_size) {
+        old_storage_size = m_curr_storage_size;
+        const uint32_t half_old_storage_size = m_curr_storage_size / 2;
+        const uint32_t new_storage_size = 2 * m_curr_storage_size;
+        // Handle super long line for completeness, but efficiency doesn't matter
+        if (m_active_storage == m_static_storage) {
+            SPDLOG_WARN("Long line detected changing to dynamic input buffer and"
+                        " increasing size to {}.", new_storage_size);
+        } else {
+            SPDLOG_WARN("Long line detected increasing dynamic input buffer size to {}.",
+                        new_storage_size);
+        }
+        auto* new_storage = static_cast<char*>(malloc(new_storage_size * sizeof(char)));
+        if (nullptr == new_storage) {
+            SPDLOG_ERROR("Failed to allocate input buffer of size {}.", new_storage_size);
+            throw std::runtime_error("Failed to allocate input buffer");
+        }
+        // Hand ownership to the base class, which frees it in reset() or on destruction
+        m_dynamic_storages.emplace_back(new_storage);
+        bool flipped_static_buffer = false;
+        if (false == m_last_read_first_half) {
+            // Buffer in correct order
+            memcpy(new_storage, m_active_storage, m_curr_storage_size * sizeof(char));
+        } else {
+            // Buffer out of order, so it needs to be flipped when copying
+            memcpy(new_storage, m_active_storage + half_old_storage_size,
+                   half_old_storage_size * sizeof(char));
+            memcpy(new_storage + half_old_storage_size, m_active_storage,
+                   half_old_storage_size * sizeof(char));
+            flipped_static_buffer = true;
+        }
+        m_active_storage = new_storage;
+        m_curr_storage_size = new_storage_size;
+        m_bytes_read = m_curr_storage_size / 2;
+        m_curr_pos = m_curr_storage_size / 2;
+        // The old data now fills the first half in order, so the next read must target the
+        // second half regardless of which half was read last before growing
+        m_last_read_first_half = true;
+        read(reader);
+        return flipped_static_buffer;
+    }
+
+    unsigned char InputBuffer::get_next_character () {
+        if (m_finished_reading_file && m_curr_pos == m_bytes_read) {
+            m_at_end_of_file = true;
+            return utf8::cCharEOF;
+        }
+        unsigned char character = m_active_storage[m_curr_pos];
+        m_curr_pos++;
+        // Wrap around to the start of the buffer
+        if (m_curr_pos == m_curr_storage_size) {
+            m_curr_pos = 0;
+        }
+        return character;
+    }
+
+    void InputBuffer::read (ReaderInterface& reader) {
+        // Initialized so the bookkeeping below never uses an indeterminate count if the reader
+        // returns without setting it
+        size_t bytes_read = 0;
+        const uint32_t half_storage_size = m_curr_storage_size / 2;
+        // read into the correct half of the buffer
+        const uint32_t read_offset = m_last_read_first_half ? half_storage_size : 0;
+        reader.read(m_active_storage + read_offset, half_storage_size, bytes_read);
+        m_last_read_first_half = !m_last_read_first_half;
+        // A short read is treated as having reached the end of the file
+        if (bytes_read < half_storage_size) {
+            m_finished_reading_file = true;
+        }
+        m_bytes_read += bytes_read;
+        // Keep m_bytes_read within the buffer
+        if (m_bytes_read > m_curr_storage_size) {
+            m_bytes_read -= m_curr_storage_size;
+        }
+    }
+}
diff --git a/components/core/src/compressor_frontend/InputBuffer.hpp b/components/core/src/compressor_frontend/InputBuffer.hpp
new file mode 100644
index 000000000..c8e6a9e19
--- /dev/null
+++ b/components/core/src/compressor_frontend/InputBuffer.hpp
@@ -0,0 +1,92 @@
+#ifndef COMPRESSOR_FRONTEND_INPUT_BUFFER_HPP
+#define COMPRESSOR_FRONTEND_INPUT_BUFFER_HPP
+
+// Project Headers
+#include "../ReaderInterface.hpp"
+#include "Buffer.hpp"
+
+namespace compressor_frontend {
+    /**
+     * A character buffer that is filled from a reader one half at a time, alternating between
+     * the first and second half of the active storage.
+     */
+    class InputBuffer : public Buffer<char> {
+    public:
+        /**
+         * Resets input buffer
+         */
+        void reset () override;
+
+        /**
+         * Checks if reading into the input buffer won't overwrite data not yet used
+         * (e.g., data being overwritten is already compressed in the case of compression)
+         * @return bool
+         */
+        bool read_is_safe ();
+
+        /**
+         * Reads into the half of the buffer currently available
+         * @param reader
+         */
+        void read (ReaderInterface& reader);
+
+        /**
+         * Reads if no unused data will be overwritten
+         * @param reader
+         */
+        void try_read (ReaderInterface& reader) {
+            if (read_is_safe()) {
+                read(reader);
+            }
+        }
+
+        /**
+         * Doubles the buffer size by switching to a new dynamic buffer, then reads into it
+         * @param reader
+         * @param old_storage_size Returns the storage size before it was doubled
+         * @return Whether the two halves of the old buffer were swapped while copying
+         * @throw std::runtime_error if the larger buffer cannot be allocated
+         */
+        bool increase_size_and_read (ReaderInterface& reader, size_t& old_storage_size);
+
+        /**
+         * Check if at end of file, and return next char (or EOF)
+         * @return unsigned char
+         */
+        unsigned char get_next_character ();
+
+        /**
+         * @return Whether the current position has reached the end of the half read last
+         */
+        [[nodiscard]] bool all_data_read () const {
+            if (m_last_read_first_half) {
+                return (m_curr_pos == m_curr_storage_size / 2);
+            } else {
+                return (m_curr_pos == 0);
+            }
+        }
+
+        void set_consumed_pos (uint32_t consumed_pos) {
+            m_consumed_pos = consumed_pos;
+        }
+
+        void set_at_end_of_file (bool at_end_of_file) {
+            m_at_end_of_file = at_end_of_file;
+        }
+
+        [[nodiscard]] bool at_end_of_file () const {
+            return m_at_end_of_file;
+        }
+
+    private:
+        // Initialized to the same values reset() assigns so that a freshly constructed buffer
+        // never exposes indeterminate state
+        uint32_t m_bytes_read = 0;
+        uint32_t m_consumed_pos = 0;
+        bool m_last_read_first_half = false;
+        bool m_finished_reading_file = false;
+        bool m_at_end_of_file = false;
+    };
+}
+
+#endif // COMPRESSOR_FRONTEND_INPUT_BUFFER_HPP
\ No newline at end of file
diff --git a/components/core/src/compressor_frontend/OutputBuffer.cpp b/components/core/src/compressor_frontend/OutputBuffer.cpp
new file mode 100644
index 000000000..7f7e40b26
--- /dev/null
+++ b/components/core/src/compressor_frontend/OutputBuffer.cpp
@@ -0,0 +1,52 @@
+#include "OutputBuffer.hpp"
+
+// C++ standard libraries
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <stdexcept>
+#include <string>
+
+// spdlog
+#include <spdlog/spdlog.h>
+
+using std::string;
+
+namespace compressor_frontend {
+    void OutputBuffer::increment_pos () {
+        m_curr_pos++;
+        if (m_curr_pos != m_curr_storage_size) {
+            return;
+        }
+        // The active storage is full, so move its contents into one twice as large
+        const uint32_t new_storage_size = 2 * m_curr_storage_size;
+        if (m_active_storage == m_static_storage) {
+            SPDLOG_WARN(
+                    "Very long log detected: changing to a dynamic output buffer and "
+                    "increasing size to {}. Expect increased latency.",
+                    new_storage_size);
+        } else {
+            SPDLOG_WARN("Very long log detected: increasing dynamic output buffer size to {}.",
+                        new_storage_size);
+        }
+        auto* new_storage = static_cast<Token*>(malloc(new_storage_size * sizeof(Token)));
+        if (nullptr == new_storage) {
+            SPDLOG_ERROR("Failed to allocate output buffer of size {}.", new_storage_size);
+            /// TODO: update exception when they're properly
+            /// (e.g., "failed_to_compress_log_continue_to_next")
+            throw std::runtime_error("Failed to allocate output buffer");
+        }
+        // Hand ownership to the base class, which frees it in reset() or on destruction
+        m_dynamic_storages.emplace_back(new_storage);
+        // NOTE(review): memcpy presumes Token is trivially copyable -- confirm against Token.hpp
+        memcpy(new_storage, m_active_storage, m_curr_storage_size * sizeof(Token));
+        m_active_storage = new_storage;
+        m_curr_storage_size = new_storage_size;
+    }
+
+    void OutputBuffer::reset () {
+        m_has_timestamp = false;
+        m_has_delimiters = false;
+        Buffer::reset();
+    }
+}
diff --git a/components/core/src/compressor_frontend/OutputBuffer.hpp b/components/core/src/compressor_frontend/OutputBuffer.hpp
new file mode 100644
index 000000000..81d870d35
--- /dev/null
+++ b/components/core/src/compressor_frontend/OutputBuffer.hpp
@@ -0,0 +1,66 @@
+#ifndef COMPRESSOR_FRONTEND_OUTPUT_BUFFER_HPP
+#define COMPRESSOR_FRONTEND_OUTPUT_BUFFER_HPP
+
+// Project Headers
+#include "Buffer.hpp"
+#include "Token.hpp"
+
+/**
+ * A buffer containing the tokenized output of the parser.
+ * The active buffer contains all the tokens from the current log message.
+ * The first token contains the timestamp (if there is no timestamp the first token is invalid).
+ * For performance (runtime latency) it defaults to a static buffer and when more tokens are needed
+ * to be stored than the current capacity it switches to a dynamic buffer.
+ * Each time the capacity is exceeded a new dynamic buffer is added to the list of dynamic buffers.
+ */
+namespace compressor_frontend {
+    class OutputBuffer : public Buffer<Token> {
+    public:
+        /**
+         * Increment buffer pos, swaps to a dynamic buffer (or doubles its size) if needed
+         */
+        void increment_pos ();
+
+        /**
+         * Resets output buffer
+         */
+        void reset () override;
+
+        void set_has_timestamp (bool has_timestamp) {
+            m_has_timestamp = has_timestamp;
+        }
+
+        [[nodiscard]] bool has_timestamp () const {
+            return m_has_timestamp;
+        }
+
+        void set_has_delimiters (bool has_delimiters) {
+            m_has_delimiters = has_delimiters;
+        }
+
+        [[nodiscard]] bool has_delimiters () const {
+            return m_has_delimiters;
+        }
+
+        // Copies value into slot pos of the active storage; pos is not bounds-checked.
+        // Taken by const reference so const and temporary tokens are accepted too.
+        void set_token (uint32_t pos, const Token& value) {
+            m_active_storage[pos] = value;
+        }
+
+        // Copies value into the slot at the current position
+        void set_curr_token (const Token& value) {
+            m_active_storage[m_curr_pos] = value;
+        }
+
+        [[nodiscard]] const Token& get_curr_token () const {
+            return m_active_storage[m_curr_pos];
+        }
+
+    private:
+        bool m_has_timestamp = false;
+        bool m_has_delimiters = false;
+    };
+}
+
+#endif // COMPRESSOR_FRONTEND_OUTPUT_BUFFER_HPP
\ No newline at end of file