Skip to content

Commit

Permalink
Convert the datastore write "pauser" to be a "transaction"
Browse files Browse the repository at this point in the history
This is to mitigate the possibilty of race conditions when writing multiple values and trying to read one of them. For example:
thread 1: begin transaction
thread 1: write is_account=true
thread 2: read is_account and account_username; get true and ""
thread 1: write account_username="blah"
thread 1: commit (or roll back)

The new implementation will require thread 2 to wait until thread 1 has finished its transaction.
  • Loading branch information
adam-p committed Aug 24, 2021
1 parent df72dc8 commit 62f9942
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 95 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ add_library( # Sets the name of the library.
${SOURCES} )

SET(GCC_COVERAGE_COMPILE_FLAGS "-Wall -fprofile-arcs -ftest-coverage -g -O0")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS}")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${GCC_COVERAGE_COMPILE_FLAGS} -pthread")

SET(GCC_COVERAGE_LINK_FLAGS "-lgcov --coverage")
#SET(GCC_COVERAGE_LINK_FLAGS "-lclang_rt.profile_osx -L/Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/lib/clang/10.0.0/lib/darwin")
Expand Down
49 changes: 33 additions & 16 deletions datastore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <iostream>
#include <fstream>
#include <cstdio>
#include <mutex>
#include "datastore.hpp"
#include "utils.hpp"
#include "vendor/nlohmann/json.hpp"
Expand All @@ -33,10 +34,11 @@ using namespace error;

static string FilePath(const string& file_root, const string& suffix);
static Result<json> FileLoad(const string& file_path);
static Error FileStore(bool paused, const string& file_path, const json& json);
static Error FileStore(int transaction_depth, const string& file_path, const json& json);

Datastore::Datastore()
: initialized_(false), json_(json::object()), paused_(false) {
: initialized_(false), explicit_lock_(mutex_, std::defer_lock),
transaction_depth_(0), json_(json::object()) {
}

Error Datastore::Init(const string& file_root, const string& suffix) {
Expand All @@ -55,8 +57,8 @@ Error Datastore::Init(const string& file_root, const string& suffix) {

Error Datastore::Reset(const string& file_path, json new_value) {
SYNCHRONIZE(mutex_);
paused_ = false;
if (auto err = FileStore(paused_, file_path, new_value)) {
transaction_depth_ = 0;
if (auto err = FileStore(transaction_depth_, file_path, new_value)) {
return PassError(err);
}
json_ = new_value;
Expand All @@ -73,25 +75,40 @@ Error Datastore::Reset(json new_value) {
return PassError(Reset(file_path_, new_value));
}

bool Datastore::PauseWrites() {
void Datastore::BeginTransaction() {
// We only acquire a non-local lock if we're starting an outermost transaction.
SYNCHRONIZE(mutex_);
auto was_paused = paused_;
paused_ = true;
return !was_paused;
// We got a local lock, so we know there's no transaction in progress in any other thread.
if (transaction_depth_ == 0) {
explicit_lock_.lock();
}
transaction_depth_++;
}

Error Datastore::UnpauseWrites(bool commit) {
Error Datastore::EndTransaction(bool commit) {
SYNCHRONIZE(mutex_);
MUST_BE_INITIALIZED;
if (!paused_) {
if (transaction_depth_ <= 0) {
assert(false);
return nullerr;
}
paused_ = false;

transaction_depth_--;

if (transaction_depth_ > 0) {
// This was an inner transaction and there's nothing more to do.
return nullerr;
}

// We need to release the explicit lock on exit from ths function, no matter what.
// We will "adopt" the lock into this lock_guard to ensure the unlock happens when it goes out of scope.
std::lock_guard<std::unique_lock<std::recursive_mutex>> lock_releaser(explicit_lock_, std::adopt_lock);

if (commit) {
return PassError(FileStore(paused_, file_path_, json_));
return PassError(FileStore(transaction_depth_, file_path_, json_));
}

// Revert to what's on disk
// We're rolling back -- revert to what's on disk
auto res = FileLoad(file_path_);
if (!res) {
return PassError(res.error());
Expand All @@ -110,7 +127,7 @@ Error Datastore::Set(const json::json_pointer& p, json v) {
SYNCHRONIZE(mutex_);
MUST_BE_INITIALIZED;
json_[p] = v;
return PassError(FileStore(paused_, file_path_, json_));
return PassError(FileStore(transaction_depth_, file_path_, json_));
}

static string FilePath(const string& file_root, const string& suffix) {
Expand Down Expand Up @@ -186,8 +203,8 @@ static Result<json> FileLoad(const string& file_path) {
return json;
}

static Error FileStore(bool paused, const string& file_path, const json& json) {
if (paused) {
static Error FileStore(int transaction_depth, const string& file_path, const json& json) {
if (transaction_depth > 0) {
return nullerr;
}

Expand Down
33 changes: 20 additions & 13 deletions datastore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Datastore {
using json = nlohmann::json;

public:
enum DatastoreGetError {
enum class DatastoreGetError {
kNotFound = 1,
kTypeMismatch,
kDatastoreUninitialized
Expand All @@ -62,12 +62,16 @@ class Datastore {
/// Init() must have already been called, successfully.
error::Error Reset(json new_value);

/// Stops writing of updates to disk until UnpauseWrites is called.
/// Returns false if writing was already paused (so this call did nothing).
bool PauseWrites();
/// Unpauses writing. If commit is true, it writes the changes immediately; if false
/// it discards the changes.
error::Error UnpauseWrites(bool commit);
/// Locks the read/write mutex and stops writing of updates to disk until
/// EndTransaction is called. Transactions are re-enterable, but not nested.
/// NOTE: Failing to call EndTransaction will result in undefined behaviour.
void BeginTransaction();
/// Ends an ongoing transaction writing. If commit is true, it writes the changes
/// immediately; if false it discards the changes.
/// Committing or rolling back inner transactions does nothing. Any errors during
/// inner transactions that require the outermost transaction to be rolled back must
/// be handled by the caller.
error::Error EndTransaction(bool commit);

/// Returns the value, or an error indicating the failure reason.
template<typename T>
Expand All @@ -79,23 +83,23 @@ class Datastore {
SYNCHRONIZE_BLOCK(mutex_) {
// Not using MUST_BE_INITIALIZED so we don't need it in the header.
if (!initialized_) {
return nonstd::make_unexpected(kDatastoreUninitialized);
return nonstd::make_unexpected(DatastoreGetError::kDatastoreUninitialized);
}

if (p.empty() || !json_.contains(p)) {
return nonstd::make_unexpected(kNotFound);
return nonstd::make_unexpected(DatastoreGetError::kNotFound);
}

val = json_.at(p).get<T>();
}
return val;
}
catch (json::type_error&) {
return nonstd::make_unexpected(kTypeMismatch);
return nonstd::make_unexpected(DatastoreGetError::kTypeMismatch);
}
catch (json::out_of_range&) {
// This should be avoided by the explicit check above. But we'll be safe.
return nonstd::make_unexpected(kNotFound);
return nonstd::make_unexpected(DatastoreGetError::kNotFound);
}
}

Expand All @@ -112,11 +116,14 @@ class Datastore {
error::Error Reset(const std::string& file_path, json new_value);

private:
mutable std::recursive_mutex mutex_;
bool initialized_;

mutable std::recursive_mutex mutex_;
std::unique_lock<std::recursive_mutex> explicit_lock_;
int transaction_depth_;

std::string file_path_;
json json_;
bool paused_;
};

} // namespace psicash
Expand Down
Loading

0 comments on commit 62f9942

Please sign in to comment.