Skip to content

Commit

Permalink
Rewrite invoke without ports
Browse files Browse the repository at this point in the history
Closes: #1740
  • Loading branch information
tshemsedinov committed Feb 5, 2024
1 parent 1fd89f6 commit ec28c22
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 32 deletions.
34 changes: 22 additions & 12 deletions impress.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,30 @@ const startWorker = async (app, kind, port, id = ++impress.lastWorkerId) => {
},

invoke: async (msg) => {
const { status, port, exclusive } = msg;
if (status === 'done') return void app.pool.release(worker);
const promisedThread = exclusive ? app.pool.capture() : app.pool.next();
const next = await promisedThread.catch(() => {
const error = new Error('No thread available');
port.postMessage({ name: 'error', error });
return null;
});
if (!next) return;
next.postMessage(msg, [port]);
const { from, to, exclusive } = msg;
if (from) {
const promised = exclusive ? app.pool.capture() : app.pool.next();
const next = await promised.catch(() => {
const error = { message: 'No thread available' };
const back = app.threads.get(from);
const data = { id, status: 'error', error };
back.postMessage({ name: 'invoke', to: from, data });
return null;
});
if (!next) return;
next.postMessage(msg);
} else {
const back = app.threads.get(to);
back.postMessage(msg);
}
},

release: () => {
app.pool.release(worker);
},

terminate: (msg) => {
process.emit('TERMINATE', msg.code);
terminate: ({ code }) => {
process.emit('TERMINATE', code);
},
};

Expand Down
53 changes: 33 additions & 20 deletions lib/worker.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';

const { node, metarhia, notLoaded, wt } = require('./deps.js');
const { MessageChannel, parentPort, threadId, workerData } = wt;
const { parentPort, threadId, workerData } = wt;

const application = require('./application.js');

Expand All @@ -22,17 +22,21 @@ process.on('warning', logError('warning'));
process.on('uncaughtException', logError('uncaughtException'));
process.on('unhandledRejection', logError('unhandledRejection'));

let callId = 0;
const calls = new Map();

const invoke = async ({ method, args, exclusive = false }) => {
const { port1: port, port2 } = new MessageChannel();
const data = { method, args };
const msg = { name: 'invoke', exclusive, data, port };
const id = ++callId;
const data = { type: 'call', id, method, args };
const msg = { name: 'invoke', from: threadId, exclusive, data };
return new Promise((resolve, reject) => {
port2.on('message', ({ error, data }) => {
port2.close();
const handler = ({ error, result }) => {
calls.delete(id);
if (error) reject(error);
else resolve(data);
});
parentPort.postMessage(msg, [port]);
else resolve(result);
};
calls.set(id, handler);
parentPort.postMessage(msg);
});
};

Expand All @@ -48,16 +52,22 @@ const handlers = {
process.exit(0);
},

invoke: async ({ exclusive, data, port }) => {
const { method, args } = data;
invoke: async ({ from, to, exclusive, data }) => {
const { id, status, error, method, args } = data;
if (to) {
const handler = calls.get(id);
const err = status === 'error' ? new Error(error.message) : null;
return void handler({ error: err });
}
const { sandbox, config } = application;
const msg = { name: 'invoke', to: from };
const { timeout } = config.server.workers;
const handler = metarhia.metautil.namespaceByPath(sandbox, method);
if (!handler) {
const error = new Error('Handler not found');
return void port.postMessage({ name: 'error', error });
const error = { message: 'Handler not found' };
const data = { id, status: 'error', error };
return void parentPort.postMessage({ ...msg, data });
}
const msg = { name: 'invoke', status: 'done' };
const { timeout } = config.server.workers;
try {
let result;
if (timeout) {
Expand All @@ -70,12 +80,15 @@ const handlers = {
} else {
result = await handler(args);
}
port.postMessage({ ...msg, data: result });
} catch (error) {
port.postMessage({ name: 'error', error });
application.console.error(error.stack);
const data = { id, status: 'done', result };
parentPort.postMessage({ ...msg, data });
} catch (err) {
const error = { message: err.message };
const data = { id, status: 'error', error };
parentPort.postMessage({ ...msg, data });
application.console.error(err.stack);
} finally {
if (exclusive) parentPort.postMessage(msg);
if (exclusive) parentPort.postMessage({ name: 'release' });
}
},
};
Expand Down

0 comments on commit ec28c22

Please sign in to comment.