Skip to content

Commit

Permalink
Merge pull request #144 from bersler/performance_improvements
Browse files Browse the repository at this point in the history
enhancement: performance optimizations
  • Loading branch information
bersler committed May 28, 2024
2 parents 74c70eb + b5a14c9 commit 348a86e
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 32 deletions.
87 changes: 57 additions & 30 deletions src/parser/Parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ namespace OpenLogReplicator {
}

if (ctx->dumpRawData > 0) {
std::string header = "## H: [" + std::to_string(lwnMember->block * reader->getBlockSize() + lwnMember->offset) + "] " +
std::string header = "## H: [" + std::to_string(static_cast<uint64_t>(lwnMember->block) * reader->getBlockSize() + lwnMember->offset) + "] " +
std::to_string(headerLength);
ctx->dumpStream << header;
if (header.length() < 36)
Expand Down Expand Up @@ -242,7 +242,7 @@ namespace OpenLogReplicator {
redoLogRecord[vectorCur].subScn = lwnMember->subScn;
redoLogRecord[vectorCur].usn = usn;
redoLogRecord[vectorCur].data = data + offset;
redoLogRecord[vectorCur].dataOffset = lwnMember->block * reader->getBlockSize() + lwnMember->offset + offset;
redoLogRecord[vectorCur].dataOffset = static_cast<uint64_t>(lwnMember->block) * reader->getBlockSize() + lwnMember->offset + offset;
redoLogRecord[vectorCur].fieldLengthsDelta = fieldOffset;
if (redoLogRecord[vectorCur].fieldLengthsDelta + 1 >= recordLength) {
dumpRedoVector(data, recordLength);
Expand Down Expand Up @@ -741,7 +741,7 @@ namespace OpenLogReplicator {
false, true, false);
transaction->begin = true;
transaction->firstSequence = sequence;
transaction->firstOffset = lwnCheckpointBlock * reader->getBlockSize();
transaction->firstOffset = static_cast<uint64_t>(lwnCheckpointBlock) * reader->getBlockSize();
transaction->log(ctx, "B ", redoLogRecord1);
lastTransaction = transaction;
}
Expand Down Expand Up @@ -1209,7 +1209,7 @@ namespace OpenLogReplicator {
}

uint64_t Parser::parse() {
uint64_t lwnConfirmedBlock = 2;
typeBlk lwnConfirmedBlock = 2;
uint64_t lwnRecords = 0;

if (firstScn == Ctx::ZERO_SCN && nextScn == Ctx::ZERO_SCN && reader->getFirstScn() != 0) {
Expand Down Expand Up @@ -1245,8 +1245,8 @@ namespace OpenLogReplicator {
std::to_string(lwnConfirmedBlock) + ")");
metadata->offset = 0;
}
reader->setBufferStartEnd(lwnConfirmedBlock * reader->getBlockSize(),
lwnConfirmedBlock * reader->getBlockSize());
reader->setBufferStartEnd(static_cast<uint64_t>(lwnConfirmedBlock) * reader->getBlockSize(),
static_cast<uint64_t>(lwnConfirmedBlock) * reader->getBlockSize());

ctx->info(0, "processing redo log: " + toString() + " offset: " + std::to_string(reader->getBufferStart()));
if (ctx->flagsSet(Ctx::REDO_FLAGS_ADAPTIVE_SCHEMA) && !metadata->schema->loaded && ctx->versionStr.length() > 0) {
Expand All @@ -1269,14 +1269,14 @@ namespace OpenLogReplicator {
time_ut cStart = ctx->clock->getTimeUt();
reader->setStatusRead();
LwnMember* lwnMember;
uint64_t currentBlock = lwnConfirmedBlock;
uint64_t blockOffset;
uint64_t startBlock = lwnConfirmedBlock;
uint64_t confirmedBufferStart = reader->getBufferStart();
uint64_t recordLength4;
uint64_t recordPos = 0;
uint64_t recordLeftToCopy = 0;
uint64_t lwnEndBlock = lwnConfirmedBlock;
typeBlk startBlock = lwnConfirmedBlock;
typeBlk currentBlock = lwnConfirmedBlock;
typeBlk lwnEndBlock = lwnConfirmedBlock;
uint16_t lwnNumMax = 0;
uint16_t lwnNumCnt = 0;
lwnCheckpointBlock = lwnConfirmedBlock;
Expand All @@ -1285,8 +1285,8 @@ namespace OpenLogReplicator {
while (!ctx->softShutdown) {
// There is some work to do
while (confirmedBufferStart < reader->getBufferEnd()) {
uint64_t redoBufferPos = (currentBlock * reader->getBlockSize()) % Ctx::MEMORY_CHUNK_SIZE;
uint64_t redoBufferNum = ((currentBlock * reader->getBlockSize()) / Ctx::MEMORY_CHUNK_SIZE) % ctx->readBufferMax;
uint64_t redoBufferPos = (static_cast<uint64_t>(currentBlock) * reader->getBlockSize()) % Ctx::MEMORY_CHUNK_SIZE;
uint64_t redoBufferNum = ((static_cast<uint64_t>(currentBlock) * reader->getBlockSize()) / Ctx::MEMORY_CHUNK_SIZE) % ctx->readBufferMax;
uint8_t* redoBlock = reader->redoBufferList[redoBufferNum] + redoBufferPos;

blockOffset = 16;
Expand Down Expand Up @@ -1321,7 +1321,7 @@ namespace OpenLogReplicator {
++lwnNumCnt;

if (ctx->trace & Ctx::TRACE_LWN) {
uint64_t lwnStartBlock = currentBlock;
typeBlk lwnStartBlock = currentBlock;
ctx->logTrace(Ctx::TRACE_LWN, "at: " + std::to_string(lwnStartBlock) + " length: " + std::to_string(lwnLength) +
" chk: " + std::to_string(lwnNum) + " max: " + std::to_string(lwnNumMax));
}
Expand Down Expand Up @@ -1365,15 +1365,13 @@ namespace OpenLogReplicator {
ctx->logTrace(Ctx::TRACE_LWN, "length: " + std::to_string(recordLength4) + " scn: " +
std::to_string(lwnMember->scn) + " subscn: " + std::to_string(lwnMember->subScn));

uint64_t lwnPos = lwnRecords++;
uint64_t lwnPos = ++lwnRecords;
if (lwnPos >= MAX_RECORDS_IN_LWN)
throw RedoLogException(50054, "all " + std::to_string(lwnPos) + " records in lwn were used");

while (lwnPos > 0 &&
(lwnMembers[lwnPos - 1]->scn > lwnMember->scn ||
(lwnMembers[lwnPos - 1]->scn == lwnMember->scn && lwnMembers[lwnPos - 1]->subScn > lwnMember->subScn))) {
lwnMembers[lwnPos] = lwnMembers[lwnPos - 1];
--lwnPos;
while (lwnPos > 1 && *lwnMember < *lwnMembers[lwnPos / 2]) {
lwnMembers[lwnPos] = lwnMembers[lwnPos / 2];
lwnPos = lwnPos / 2;
}
lwnMembers[lwnPos] = lwnMember;
}
Expand Down Expand Up @@ -1412,9 +1410,10 @@ namespace OpenLogReplicator {

if (ctx->trace & Ctx::TRACE_LWN)
ctx->logTrace(Ctx::TRACE_LWN, "* analyze: " + std::to_string(lwnScn));
for (uint64_t i = 0; i < lwnRecords; ++i) {

while (lwnRecords > 0) {
try {
analyzeLwn(lwnMembers[i]);
analyzeLwn(lwnMembers[1]);
} catch (DataException& ex) {
if (ctx->flagsSet(Ctx::REDO_FLAGS_IGNORE_DATA_ERRORS)) {
ctx->error(ex.code, ex.msg);
Expand All @@ -1428,13 +1427,40 @@ namespace OpenLogReplicator {
} else
throw RedoLogException(ex.code, "runtime error, aborting further redo log processing: " + ex.msg);
}

if (lwnRecords == 1) {
lwnRecords = 0;
break;
}

uint64_t lwnPos = 1;
while (true) {
if (lwnPos * 2 < lwnRecords && *lwnMembers[lwnPos * 2] < *lwnMembers[lwnRecords]) {
if (lwnPos * 2 + 1 < lwnRecords && *lwnMembers[lwnPos * 2 + 1] < *lwnMembers[lwnPos * 2]) {
lwnMembers[lwnPos] = lwnMembers[lwnPos * 2 + 1];
lwnPos *= 2;
++lwnPos;
} else {
lwnMembers[lwnPos] = lwnMembers[lwnPos * 2];
lwnPos *= 2;
}
} else if (lwnPos * 2 + 1 < lwnRecords && *lwnMembers[lwnPos * 2 + 1] < *lwnMembers[lwnRecords]) {
lwnMembers[lwnPos] = lwnMembers[lwnPos * 2 + 1];
lwnPos *= 2;
++lwnPos;
} else
break;
}

lwnMembers[lwnPos] = lwnMembers[lwnRecords];
--lwnRecords;
}

if (lwnScn > metadata->firstDataScn) {
if (ctx->trace & Ctx::TRACE_CHECKPOINT)
ctx->logTrace(Ctx::TRACE_CHECKPOINT, "on: " + std::to_string(lwnScn));
builder->processCheckpoint(lwnScn, sequence, lwnTimestamp.toEpoch(ctx->hostTimezone),
currentBlock * reader->getBlockSize(), switchRedo);
static_cast<uint64_t>(currentBlock) * reader->getBlockSize(), switchRedo);

typeSeq minSequence = Ctx::ZERO_SEQ;
uint64_t minOffset = -1;
Expand All @@ -1443,8 +1469,8 @@ namespace OpenLogReplicator {
if (ctx->trace & Ctx::TRACE_LWN)
ctx->logTrace(Ctx::TRACE_LWN, "* checkpoint: " + std::to_string(lwnScn));
metadata->checkpoint(lwnScn, lwnTimestamp, sequence,
currentBlock * reader->getBlockSize(),
(currentBlock - lwnConfirmedBlock) * reader->getBlockSize(), minSequence,
static_cast<uint64_t>(currentBlock) * reader->getBlockSize(),
static_cast<uint64_t>(currentBlock - lwnConfirmedBlock) * reader->getBlockSize(), minSequence,
minOffset, minXid);

if (ctx->stopCheckpoints > 0 && metadata->isNewData(lwnScn, builder->lwnIdx)) {
Expand All @@ -1463,7 +1489,6 @@ namespace OpenLogReplicator {

lwnNumCnt = 0;
freeLwn();
lwnRecords = 0;

if (ctx->metrics)
ctx->metrics->emitBytesParsed((currentBlock - lwnConfirmedBlock) * reader->getBlockSize());
Expand All @@ -1485,7 +1510,7 @@ namespace OpenLogReplicator {
if (ctx->trace & Ctx::TRACE_CHECKPOINT)
ctx->logTrace(Ctx::TRACE_CHECKPOINT, "on: " + std::to_string(lwnScn) + " with switch");
builder->processCheckpoint(lwnScn, sequence, lwnTimestamp.toEpoch(ctx->hostTimezone),
currentBlock * reader->getBlockSize(), switchRedo);
static_cast<uint64_t>(currentBlock) * reader->getBlockSize(), switchRedo);
if (ctx->metrics)
ctx->metrics->emitCheckpointsOut(1);
} else {
Expand All @@ -1498,7 +1523,7 @@ namespace OpenLogReplicator {
if (ctx->trace & Ctx::TRACE_CHECKPOINT)
ctx->logTrace(Ctx::TRACE_CHECKPOINT, "on: " + std::to_string(lwnScn) + " at exit");
builder->processCheckpoint(lwnScn, sequence, lwnTimestamp.toEpoch(ctx->hostTimezone),
currentBlock * reader->getBlockSize(), false);
static_cast<uint64_t>(currentBlock) * reader->getBlockSize(), false);
if (ctx->metrics)
ctx->metrics->emitCheckpointsOut(1);

Expand All @@ -1508,7 +1533,7 @@ namespace OpenLogReplicator {
if (reader->getRet() == Reader::REDO_FINISHED && nextScn == Ctx::ZERO_SCN && reader->getNextScn() != Ctx::ZERO_SCN)
nextScn = reader->getNextScn();
if (reader->getRet() == Reader::REDO_STOPPED || reader->getRet() == Reader::REDO_OVERWRITTEN)
metadata->offset = lwnConfirmedBlock * reader->getBlockSize();
metadata->offset = static_cast<uint64_t>(lwnConfirmedBlock) * reader->getBlockSize();
break;
}
}
Expand All @@ -1531,7 +1556,7 @@ namespace OpenLogReplicator {
time_ut cEnd = ctx->clock->getTimeUt();
double suppLogPercent = 0.0;
if (currentBlock != startBlock)
suppLogPercent = 100.0 * ctx->suppLogSize / ((currentBlock - startBlock) * reader->getBlockSize());
suppLogPercent = 100.0 * ctx->suppLogSize / (static_cast<double>(currentBlock - startBlock) * reader->getBlockSize());

if (group == 0) {
double mySpeed = 0;
Expand All @@ -1545,14 +1570,16 @@ namespace OpenLogReplicator {

ctx->logTrace(Ctx::TRACE_PERFORMANCE, std::to_string(myTime) + " ms, " +
"Speed: " + std::to_string(mySpeed) + " MB/s, " +
"Redo log size: " + std::to_string((currentBlock - startBlock) * reader->getBlockSize() / 1024 / 1024) +
"Redo log size: " + std::to_string(static_cast<uint64_t>(currentBlock - startBlock) *
reader->getBlockSize() / 1024 / 1024) +
" MB, Read size: " + std::to_string(reader->getSumRead() / 1024 / 1024) + " MB, " +
"Read speed: " + std::to_string(myReadSpeed) + " MB/s, " +
"Max LWN size: " + std::to_string(lwnAllocatedMax) + ", " +
"Supplemental redo log size: " + std::to_string(ctx->suppLogSize) + " bytes " +
"(" + std::to_string(suppLogPercent) + " %)");
} else {
ctx->logTrace(Ctx::TRACE_PERFORMANCE, "Redo log size: " + std::to_string((currentBlock - startBlock) * reader->getBlockSize() / 1024 / 1024) + " MB, " +
ctx->logTrace(Ctx::TRACE_PERFORMANCE, "Redo log size: " + std::to_string(static_cast<uint64_t>(currentBlock - startBlock) *
reader->getBlockSize() / 1024 / 1024) + " MB, " +
"Max LWN size: " + std::to_string(lwnAllocatedMax) + ", " +
"Supplemental redo log size: " + std::to_string(ctx->suppLogSize) + " bytes " +
"(" + std::to_string(suppLogPercent) + " %)");
Expand Down
20 changes: 18 additions & 2 deletions src/parser/Parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ namespace OpenLogReplicator {
typeScn scn;
typeSubScn subScn;
typeBlk block;

bool operator<(const LwnMember& other) const {
if (scn < other.scn)
return true;
if (other.scn < scn)
return false;
if (subScn < other.subScn)
return true;
if (other.subScn < subScn)
return false;
if (block < other.block)
return true;
if (block > other.block)
return false;
return (offset < other.offset);
}
};

class Parser final {
Expand All @@ -55,12 +71,12 @@ namespace OpenLogReplicator {
Transaction* lastTransaction;

uint8_t* lwnChunks[MAX_LWN_CHUNKS];
LwnMember* lwnMembers[MAX_RECORDS_IN_LWN];
LwnMember* lwnMembers[MAX_RECORDS_IN_LWN + 1];
uint64_t lwnAllocated;
uint64_t lwnAllocatedMax;
typeTime lwnTimestamp;
typeScn lwnScn;
uint64_t lwnCheckpointBlock;
typeBlk lwnCheckpointBlock;

void freeLwn();
void analyzeLwn(LwnMember* lwnMember);
Expand Down

0 comments on commit 348a86e

Please sign in to comment.