Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support DDL in manual transaction #3128

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions src/include/common/enums/statement_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,9 @@ enum class StatementType : uint8_t {
};

struct StatementTypeUtils {
static bool allowActiveTransaction(StatementType statementType) {
switch (statementType) {
case StatementType::CREATE_TABLE:
case StatementType::DROP_TABLE:
case StatementType::ALTER:
case StatementType::CREATE_MACRO:
case StatementType::COPY_FROM:
return false;
default:
return true;
}
static bool allowActiveTransaction(StatementType /*statementType*/) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a todo for Chang?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to KUZU_API, affecting python, java, and rust APIs, which I can not handle

// TODO(Chang): remove this function
return true;
}
};

Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/persistent/batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class BatchInsert : public Sink {
inline std::shared_ptr<BatchInsertSharedState> getSharedState() const { return sharedState; }

protected:
void checkIfTableIsEmpty();
void checkIfTableIsEmpty(transaction::Transaction* transaction);

protected:
std::unique_ptr<BatchInsertInfo> info;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TableData {
virtual void append(ChunkedNodeGroup* nodeGroup) = 0;

inline void dropColumn(common::column_id_t columnID) {
KU_ASSERT(columnID != common::INVALID_COLUMN_ID && columnID < columns.size());
columns.erase(columns.begin() + columnID);
}
void addColumn(transaction::Transaction* transaction, const std::string& colNamePrefix,
Expand Down
2 changes: 1 addition & 1 deletion src/include/transaction/transaction_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class TransactionContext {
void beginReadTransaction();
void beginWriteTransaction();
void beginAutoTransaction(bool readOnlyStatement);
void validateManualTransaction(bool allowActiveTransaction, bool readOnlyStatement);
void validateManualTransaction(bool readOnlyStatement);

void commit();
void rollback();
Expand Down
3 changes: 1 addition & 2 deletions src/main/client_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,7 @@ std::unique_ptr<PreparedStatement> ClientContext::prepareNoLock(
if (transactionContext->isAutoTransaction()) {
transactionContext->beginAutoTransaction(preparedStatement->readOnly);
} else {
transactionContext->validateManualTransaction(
preparedStatement->allowActiveTransaction(), preparedStatement->readOnly);
transactionContext->validateManualTransaction(preparedStatement->readOnly);
}
if (!this->getTx()->isReadOnly()) {
database->catalog->initCatalogContentForWriteTrxIfNecessary();
Expand Down
3 changes: 1 addition & 2 deletions src/main/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,7 @@ void Database::commit(Transaction* transaction, bool skipCheckpointForTestingRec
transactionManager->allowReceivingNewTransactions();
}

void Database::rollback(
transaction::Transaction* transaction, bool skipCheckpointForTestingRecovery) {
void Database::rollback(Transaction* transaction, bool skipCheckpointForTestingRecovery) {
if (transaction->isReadOnly()) {
transactionManager->rollback(transaction);
return;
Expand Down
1 change: 1 addition & 0 deletions src/processor/operator/ddl/drop_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using namespace kuzu::catalog;
using namespace kuzu::common;
using namespace kuzu::storage;

namespace kuzu {
namespace processor {
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/persistent/batch_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
#include "common/exception/message.h"

using namespace kuzu::common;
using namespace kuzu::transaction;

namespace kuzu {
namespace processor {

void BatchInsert::checkIfTableIsEmpty() {
if (sharedState->table->getNumTuples(&transaction::DUMMY_READ_TRANSACTION) != 0) {
void BatchInsert::checkIfTableIsEmpty(Transaction* transaction) {
if (sharedState->table->getNumTuples(transaction) != 0) {
throw CopyException(ExceptionMessage::notAllowCopyOnNonEmptyTableException());
}
}
Expand Down
12 changes: 11 additions & 1 deletion src/processor/operator/persistent/node_batch_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
#include "common/types/types.h"
#include "function/table/scan_functions.h"
#include "processor/result/factorized_table.h"
#include "storage/storage_manager.h"

using namespace kuzu::catalog;
using namespace kuzu::common;
using namespace kuzu::storage;
using namespace kuzu::transaction;

namespace kuzu {
namespace processor {
Expand Down Expand Up @@ -52,7 +54,7 @@ void NodeBatchInsertSharedState::appendIncompleteNodeGroup(
}

void NodeBatchInsert::initGlobalStateInternal(ExecutionContext* context) {
checkIfTableIsEmpty();
checkIfTableIsEmpty(context->clientContext->getTx());
sharedState->logBatchInsertWALRecord();
auto nodeSharedState =
ku_dynamic_cast<BatchInsertSharedState*, NodeBatchInsertSharedState*>(sharedState.get());
Expand Down Expand Up @@ -190,6 +192,14 @@ void NodeBatchInsert::finalize(ExecutionContext* context) {
sharedState->getNumRows(), info->tableEntry->getName());
FactorizedTableUtils::appendStringToTable(
sharedState->fTable.get(), outputMsg, context->clientContext->getMemoryManager());
auto tableID = sharedState->table->getTableID();
auto catalogEntry = context->clientContext->getCatalog()->getTableCatalogEntry(
context->clientContext->getTx(), tableID);
auto nodeTableEntry = ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(catalogEntry);
auto table = context->clientContext->getStorageManager()->getTable(tableID);
KU_ASSERT(table->getTableType() == TableType::NODE);
ku_dynamic_cast<Table*, NodeTable*>(table)->initializePKIndex(
nodeTableEntry, false /* readOnly */, context->clientContext->getVFSUnsafe());
}
} // namespace processor
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/processor/operator/persistent/rel_batch_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace processor {

void RelBatchInsert::initGlobalStateInternal(ExecutionContext* context) {
if (!context->clientContext->getClientConfig()->enableMultiCopy) {
checkIfTableIsEmpty();
checkIfTableIsEmpty(context->clientContext->getTx());
}
sharedState->logBatchInsertWALRecord();
}
Expand Down
70 changes: 29 additions & 41 deletions src/storage/wal_replayer.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#include "storage/wal_replayer.h"

#include "catalog/catalog_entry/node_table_catalog_entry.h"
#include "common/exception/storage.h"
#include "storage/storage_manager.h"
#include "storage/storage_utils.h"
#include "storage/store/node_table.h"
#include "storage/wal_replayer_utils.h"
#include "transaction/transaction.h"

Expand Down Expand Up @@ -40,11 +38,25 @@ void WALReplayer::replay() {
"Cannot checkpointInMemory WAL because last logged record is not a commit record.");
}
if (!wal->isEmptyWAL()) {
auto walIterator = wal->getIterator();
WALRecord walRecord;
while (walIterator->hasNextRecord()) {
walIterator->getNextRecord(walRecord);
replayWALRecord(walRecord);
// for rollback, we should replay in reverse order.
if ((!isCheckpoint && !isRecovering)) {
auto walIterator = wal->getIterator();
std::vector<WALRecord> walRecords;
while (walIterator->hasNextRecord()) {
WALRecord walRecord;
walIterator->getNextRecord(walRecord);
walRecords.push_back(walRecord);
}
for (auto iter = walRecords.rbegin(); iter != walRecords.rend(); ++iter) {
replayWALRecord(*iter);
}
} else {
auto walIterator = wal->getIterator();
WALRecord walRecord;
while (walIterator->hasNextRecord()) {
walIterator->getNextRecord(walRecord);
replayWALRecord(walRecord);
}
}
}
// We next perform an in-memory checkpointing or rolling back of node/relTables.
Expand Down Expand Up @@ -169,7 +181,9 @@ void WALReplayer::replayCatalogRecord() {
void WALReplayer::replayCreateTableRecord(const WALRecord& walRecord) {
if (!isCheckpoint) {
storageManager->dropTable(walRecord.createTableRecord.tableID);
wal->getUpdatedTables().erase(walRecord.createTableRecord.tableID);
}
// should recovery
}

void WALReplayer::replayRdfGraphRecord(const WALRecord& walRecord) {
Expand All @@ -188,24 +202,10 @@ void WALReplayer::replayRdfGraphRecord(const WALRecord& walRecord) {
replayCreateTableRecord(literalTripleTableWALRecord);
}

void WALReplayer::replayCopyTableRecord(const WALRecord& walRecord) {
auto tableID = walRecord.copyTableRecord.tableID;
void WALReplayer::replayCopyTableRecord(const WALRecord& /*walRecord*/) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should handle rollback, otherwise index lookup following rollbacked copy might return incorrect result?

if (isCheckpoint) {
if (!isRecovering) {
// CHECKPOINT.
// If we are not recovering, i.e., we are checkpointing during normal execution,
// then we need to update the nodeTable because the actual columns and lists
// files have been changed during checkpoint. So the in memory
// fileHandles are obsolete and should be reconstructed (e.g. since the numPages
// have likely changed they need to reconstruct their page locks).
auto catalogEntry = catalog->getTableCatalogEntry(&DUMMY_READ_TRANSACTION, tableID);
if (catalogEntry->getType() == CatalogEntryType::NODE_TABLE_ENTRY) {
auto nodeTableEntry =
ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(catalogEntry);
auto nodeTable =
ku_dynamic_cast<Table*, NodeTable*>(storageManager->getTable(tableID));
nodeTable->initializePKIndex(nodeTableEntry, false /* readOnly */, vfs);
}
} else {
// RECOVERY.
if (wal->isLastLoggedRecordCommit()) {
Expand All @@ -216,31 +216,19 @@ void WALReplayer::replayCopyTableRecord(const WALRecord& walRecord) {
} else {
// ROLLBACK.
// TODO(Guodong): Do nothing for now. Should remove metaDA and reclaim free pages.
// TODO(Jiamin): should rollback hash index
}
}

void WALReplayer::replayDropTableRecord(const WALRecord& walRecord) {
if (isCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should handle rollback.

auto tableID = walRecord.dropTableRecord.tableID;
if (!isRecovering) {
auto tableEntry = catalog->getTableCatalogEntry(&DUMMY_READ_TRANSACTION, tableID);
switch (tableEntry->getTableType()) {
case TableType::NODE: {
storageManager->dropTable(tableID);
// TODO(Guodong): Do nothing for now. Should remove metaDA and reclaim free pages.
WALReplayerUtils::removeHashIndexFile(vfs, tableID, wal->getDirectory());
} break;
case TableType::REL: {
storageManager->dropTable(tableID);
// TODO(Guodong): Do nothing for now. Should remove metaDA and reclaim free pages.
} break;
case TableType::RDF: {
// Do nothing.
} break;
default: {
KU_UNREACHABLE;
}
}
// we will not access rdf/rel_group tabletype
storageManager->dropTable(tableID);
// need to delete table id from wal, otherwise it will trigger in-memory
// checkpoint/rollback
wal->getUpdatedTables().erase(tableID);
} else {
if (!wal->isLastLoggedRecordCommit()) {
// Nothing to undo.
Expand Down Expand Up @@ -292,7 +280,7 @@ void WALReplayer::replayAddPropertyRecord(const WALRecord& walRecord) {
auto tableID = walRecord.addPropertyRecord.tableID;
auto propertyID = walRecord.addPropertyRecord.propertyID;
if (!isCheckpoint) {
auto tableEntry = catalog->getTableCatalogEntry(&DUMMY_READ_TRANSACTION, tableID);
auto tableEntry = catalog->getTableCatalogEntry(&DUMMY_WRITE_TRANSACTION, tableID);
storageManager->getTable(tableID)->dropColumn(tableEntry->getColumnID(propertyID));
}
}
Expand Down
9 changes: 1 addition & 8 deletions src/transaction/transaction_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,11 @@ void TransactionContext::beginAutoTransaction(bool readOnlyStatement) {
readOnlyStatement ? TransactionType::READ_ONLY : TransactionType::WRITE);
}

void TransactionContext::validateManualTransaction(
bool allowActiveTransaction, bool readOnlyStatement) {
void TransactionContext::validateManualTransaction(bool readOnlyStatement) {
KU_ASSERT(hasActiveTransaction());
if (activeTransaction->isReadOnly() && !readOnlyStatement) {
throw ConnectionException("Can't execute a write query inside a read-only transaction.");
}
if (!allowActiveTransaction) {
throw ConnectionException(
"DDL, Copy, createMacro statements can only run in the AUTO_COMMIT mode. Please commit "
"or rollback your previous transaction if there is any and issue the query without "
"beginning a transaction");
}
}

void TransactionContext::commit() {
Expand Down
1 change: 0 additions & 1 deletion test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,4 @@ add_subdirectory(runner)
add_subdirectory(storage)
add_subdirectory(transaction)
add_subdirectory(util_tests)
add_subdirectory(ddl)
add_subdirectory(copy)
19 changes: 0 additions & 19 deletions test/c_api/prepared_statement_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,6 @@ TEST_F(CApiPreparedStatementTest, GetErrorMessage) {
free(message);
}

TEST_F(CApiPreparedStatementTest, AllowActiveTransaction) {
auto connection = getConnection();
auto query = "MATCH (a:person) WHERE a.isStudent = $1 RETURN COUNT(*)";
auto preparedStatement = kuzu_connection_prepare(connection, query);
ASSERT_NE(preparedStatement, nullptr);
ASSERT_NE(preparedStatement->_prepared_statement, nullptr);
ASSERT_TRUE(kuzu_prepared_statement_is_success(preparedStatement));
ASSERT_TRUE(kuzu_prepared_statement_allow_active_transaction(preparedStatement));
kuzu_prepared_statement_destroy(preparedStatement);

query = "create node table npytable (id INT64,i64 INT64[12],PRIMARY KEY(id));";
preparedStatement = kuzu_connection_prepare(connection, query);
ASSERT_NE(preparedStatement, nullptr);
ASSERT_NE(preparedStatement->_prepared_statement, nullptr);
ASSERT_TRUE(kuzu_prepared_statement_is_success(preparedStatement));
ASSERT_FALSE(kuzu_prepared_statement_allow_active_transaction(preparedStatement));
kuzu_prepared_statement_destroy(preparedStatement);
}

TEST_F(CApiPreparedStatementTest, BindBool) {
auto connection = getConnection();
auto query = "MATCH (a:person) WHERE a.isStudent = $1 RETURN COUNT(*)";
Expand Down
1 change: 0 additions & 1 deletion test/ddl/CMakeLists.txt

This file was deleted.

Loading
Loading