Skip to content

Commit

Permalink
Sequentialize protocol writes with fastq
Browse files Browse the repository at this point in the history
  • Loading branch information
TimDaub committed Jul 4, 2024
1 parent d169f21 commit e587729
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 14 deletions.
29 changes: 23 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"ethers": "5.7.2",
"express": "4.18.2",
"express-async-errors": "3.1.1",
"fastq": "1.17.1",
"htm": "3.1.1",
"isbot": "3.7.0",
"isomorphic-dompurify": "2.12.0",
Expand Down
55 changes: 47 additions & 8 deletions src/store.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import { env } from "process";
import { resolve } from "path";

import fastq from "fastq";
import normalizeUrl from "normalize-url";
import { utils } from "ethers";
import {
Expand Down Expand Up @@ -121,6 +122,7 @@ export function isEqual(buf1, buf2) {
return false;
}

// NOTE: Only set to export because this function is imported in tests
export async function lookup(trie, hash, key) {
const result = {
node: null,
Expand Down Expand Up @@ -259,13 +261,7 @@ export function upvoteID(identity, link, type) {
return `${utils.getAddress(identity)}|${normalizeUrl(link)}|${type}`;
}

export async function atomicPut(
trie,
message,
identity,
accounts,
delegations,
) {
async function atomicPut(trie, message, identity, accounts, delegations) {
const marker = upvoteID(identity, message.href, message.type);
const { canonical, index } = toDigest(message);
log(
Expand Down Expand Up @@ -330,7 +326,50 @@ export async function atomicPut(
};
}

// NOTE: The ethereumjs trie library doesn't support more concurrency than one
// write at a time, that is because the trie is shuffled and recomputed with a
// new leaf entering.
//
// `store.add` is the fundamental write operation and we've had cases where a
// write can disrupt a concurrent read, and I'm also speculating that we have
// had writes disrupting parallel writes (I think I have observed this
// recently).
//
// Technically, since it's hard to reason about the process of retrieval and
// writing in the ethereumjs trie, it would be best to wrap all `trie`-touching
// functions with this type of queue sequentialization seen below. But, at this
// stage it is also a potential over-optimization, as reads themselves should
// not disrupts reads, and since reads should now mostly occur through the
// sqlite cache.
//
// So, at the moment, I consider it viable enough to just sequentialize the
// writes so that they don't accidentially disrupt themselves and then to see
// where this change takes us. So this is pretty much a work in progress.
const concurrency = 1;
const queue = fastq.promise(_add, 1);
export async function add(
trie,
message,
libp2p,
allowlist,
delegations,
accounts,
synching,
metadb,
) {
return await queue.push({
trie,
message,
libp2p,
allowlist,
delegations,
accounts,
synching,
metadb,
});
}

async function _add({
trie,
message,
libp2p,
Expand All @@ -339,7 +378,7 @@ export async function add(
accounts,
synching = false,
metadb = upvotes,
) {
}) {
const address = verify(message);

let identity;
Expand Down

0 comments on commit e587729

Please sign in to comment.