diff --git a/CHANGELOG.md b/CHANGELOG.md index f08c883..2c959bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - turn into esm with exports for node - build with node > 18, should still work with earlier versions but preceed with caution and make tests - remove eslint formatting rules in favor of prettier, touched basically all files but now it is "pretty" +- prototype `ProcessOutputDataObject` and make properties id and type readonly # 19.0.1 diff --git a/lib/index.cjs b/lib/index.cjs index 610d320..fe01823 100644 --- a/lib/index.cjs +++ b/lib/index.cjs @@ -46,7 +46,7 @@ function Scripts(disableDummy) { this.disableDummy = disableDummy; } -Scripts.prototype.register = function register({id, type, behaviour, logger, environment}) { +Scripts.prototype.register = function register({ id, type, behaviour, logger, environment }) { let scriptBody, language; switch (type) { @@ -79,20 +79,20 @@ Scripts.prototype.register = function register({id, type, behaviour, logger, env return script; }; -Scripts.prototype.getScript = function getScript(language, {id}) { +Scripts.prototype.getScript = function getScript(language, { id }) { return this.scripts[id]; }; function JavaScript(language, filename, scriptBody, environment) { this.id = filename; - this.script = new vm.Script(scriptBody, {filename}); + this.script = new vm.Script(scriptBody, { filename }); this.language = language; this.environment = environment; } JavaScript.prototype.execute = function execute(executionContext, callback) { const timers = this.environment.timers.register(executionContext); - return this.script.runInNewContext({...executionContext, ...timers, next: callback}); + return this.script.runInNewContext({ ...executionContext, ...timers, next: callback }); }; function DummyScript(language, filename, logger) { @@ -103,7 +103,7 @@ function DummyScript(language, filename, logger) { } DummyScript.prototype.execute = function execute(executionContext, callback) { - const {id, executionId} = executionContext.content; + const { id, executionId } = executionContext.content; this.logger.debug(`<${executionId} (${id})> passthrough dummy script ${this.language || 'esperanto'}`); callback(); }; @@ -119,34 +119,50 @@ function getOptionsAndCallback(optionsOrCallback, callback) { return [options, callback]; } -function ProcessOutputDataObject(dataObjectDef, {environment}) { - const {id, type, name, behaviour, parent} = dataObjectDef; - - const source = { - id, - name, - type, - behaviour, - parent, - read(broker, exchange, routingKeyPrefix, messageProperties) { - const value = environment.variables.data && environment.variables.data[id]; - return broker.publish(exchange, `${routingKeyPrefix}response`, {id, name, type, value}, messageProperties); - }, - write(broker, exchange, routingKeyPrefix, value, messageProperties) { - environment.variables.data = environment.variables.data || {}; - environment.variables.data[id] = value; +const kDataObjectDef = Symbol.for('data object definition'); + +function ProcessOutputDataObject(dataObjectDef, { environment }) { + this[kDataObjectDef] = dataObjectDef; + this.environment = environment; + this.behaviour = dataObjectDef.behaviour; + this.name = dataObjectDef.name; + this.parent = dataObjectDef.parent; +} - environment.output.data = environment.output.data || {}; - environment.output.data[id] = value; - return broker.publish(exchange, `${routingKeyPrefix}response`, {id, name, type, value}, messageProperties); +Object.defineProperties(ProcessOutputDataObject.prototype, { + id: { + get() { + return this[kDataObjectDef].id; }, - }; + }, + type: { + get() { + return this[kDataObjectDef].type; + }, + }, +}); - return source; -} +ProcessOutputDataObject.prototype.read = function readDataObject(broker, exchange, routingKeyPrefix, messageProperties) { + const environment = this.environment; + const { id, name, type } = this; + const value = environment.variables.data && environment.variables.data[this.id]; + return broker.publish(exchange, `${routingKeyPrefix}response`, { id, name, type, value }, messageProperties); +}; + +ProcessOutputDataObject.prototype.write = function writeDataObject(broker, exchange, routingKeyPrefix, value, messageProperties) { + const environment = this.environment; + const { id, name, type } = this; + + environment.variables.data = environment.variables.data || {}; + environment.variables.data[id] = value; + + environment.output.data = environment.output.data || {}; + environment.output.data[id] = value; + return broker.publish(exchange, `${routingKeyPrefix}response`, { id, name, type, value }, messageProperties); +}; const nodeRequire = module$1.createRequire(url.fileURLToPath((typeof document === 'undefined' ? require('u' + 'rl').pathToFileURL(__filename).href : (_documentCurrentScript && _documentCurrentScript.src || new URL('index.cjs', document.baseURI).href)))); -const {version: engineVersion} = nodeRequire('../package.json'); +const { version: engineVersion } = nodeRequire('../package.json'); const kEngine = Symbol.for('engine'); const kEnvironment = Symbol.for('environment'); @@ -165,29 +181,32 @@ function Engine(options = {}) { events.EventEmitter.call(this); - const opts = this.options = { + const opts = (this.options = { Logger: Logger, scripts: new Scripts(options.disableDummyScript), ...options, - }; + }); this.logger = opts.Logger('engine'); - this[kTypeResolver] = serializer.TypeResolver({ - ...Elements__namespace, - ...opts.elements, - }, opts.typeResolver || defaultTypeResolver); + this[kTypeResolver] = serializer.TypeResolver( + { + ...Elements__namespace, + ...opts.elements, + }, + opts.typeResolver || defaultTypeResolver + ); this[kEnvironment] = new Elements__namespace.Environment(opts); - const broker = this.broker = new smqp.Broker(this); - broker.assertExchange('event', 'topic', {autoDelete: false}); + const broker = (this.broker = new smqp.Broker(this)); + broker.assertExchange('event', 'topic', { autoDelete: false }); this[kExecution] = null; this[kLoadedDefinitions] = null; this[kSources] = []; - const pendingSources = this[kPendingSources] = []; + const pendingSources = (this[kPendingSources] = []); if (opts.source) pendingSources.push(this._serializeSource(opts.source)); if (opts.moddleContext) pendingSources.push(this._serializeModdleContext(opts.moddleContext)); if (opts.sourceContext) pendingSources.push(opts.sourceContext); @@ -252,7 +271,7 @@ Engine.prototype.execute = async function execute(...args) { throw err; } - const execution = this[kExecution] = new Execution(this, definitions, this.options); + const execution = (this[kExecution] = new Execution(this, definitions, this.options)); return execution._execute(executeOptions, callback); }; @@ -279,7 +298,7 @@ Engine.prototype.recover = function recover(savedState, recoverOptions) { const preSources = pendingSources.splice(0); const typeResolver = this[kTypeResolver]; - const loadedDefinitions = this[kLoadedDefinitions] = savedState.definitions.map((dState) => { + const loadedDefinitions = (this[kLoadedDefinitions] = savedState.definitions.map((dState) => { let source; if (dState.source) source = serializer.deserialize(JSON.parse(dState.source), typeResolver); else source = preSources.find((s) => s.id === dState.id); @@ -292,7 +311,7 @@ Engine.prototype.recover = function recover(savedState, recoverOptions) { definition.recover(dState); return definition; - }); + })); this[kExecution] = new Execution(this, loadedDefinitions, {}, true); @@ -316,16 +335,16 @@ Engine.prototype.resume = async function resume(...args) { return execution._resume(resumeOptions, callback); }; -Engine.prototype.addSource = function addSource({sourceContext: addContext} = {}) { +Engine.prototype.addSource = function addSource({ sourceContext: addContext } = {}) { if (!addContext) return; const loadedDefinitions = this[kLoadedDefinitions]; if (loadedDefinitions) loadedDefinitions.splice(0); this[kPendingSources].push(addContext); }; -Engine.prototype.getDefinitions = async function getDefinitions(executeOptions) { +Engine.prototype.getDefinitions = function getDefinitions(executeOptions) { const loadedDefinitions = this[kLoadedDefinitions]; - if (loadedDefinitions?.length) return loadedDefinitions; + if (loadedDefinitions?.length) return Promise.resolve(loadedDefinitions); return this._loadDefinitions(executeOptions); }; @@ -360,27 +379,30 @@ Engine.prototype.waitFor = function waitFor(eventName) { Engine.prototype._loadDefinitions = async function loadDefinitions(executeOptions) { const runSources = await Promise.all(this[kPendingSources]); - const loadedDefinitions = this[kLoadedDefinitions] = runSources.map((source) => this._loadDefinition(source, executeOptions)); + const loadedDefinitions = (this[kLoadedDefinitions] = runSources.map((source) => this._loadDefinition(source, executeOptions))); return loadedDefinitions; }; Engine.prototype._loadDefinition = function loadDefinition(serializedContext, executeOptions = {}) { - const {settings, variables} = executeOptions; + const { settings, variables } = executeOptions; const environment = this.environment; - const context = new Elements__namespace.Context(serializedContext, environment.clone({ - listener: environment.options.listener, - ...executeOptions, - settings: { - ...environment.settings, - ...settings, - }, - variables: { - ...environment.variables, - ...variables, - }, - source: serializedContext, - })); + const context = new Elements__namespace.Context( + serializedContext, + environment.clone({ + listener: environment.options.listener, + ...executeOptions, + settings: { + ...environment.settings, + ...settings, + }, + variables: { + ...environment.variables, + ...variables, + }, + source: serializedContext, + }) + ); return new Elements__namespace.Definition(context); }; @@ -410,7 +432,7 @@ function Execution(engine, definitions, options, isRecovered = false) { this[kEnvironment] = engine.environment; this[kEngine] = engine; this[kExecuting] = []; - const onBrokerReturn = this[kOnBrokerReturn] = this._onBrokerReturn.bind(this); + const onBrokerReturn = (this[kOnBrokerReturn] = this._onBrokerReturn.bind(this)); engine.broker.on('return', onBrokerReturn); } @@ -435,29 +457,10 @@ Object.defineProperties(Execution.prototype, { return this[kEnvironment]; }, }, -}); - -Object.defineProperty(Execution.prototype, 'activityStatus', { - get() { - let status = 'idle'; - const running = this[kExecuting]; - if (!running.length) return status; - - for (const def of running) { - const bpStatus = def.activityStatus; - switch (def.activityStatus) { - case 'executing': - return bpStatus; - case 'timer': - status = bpStatus; - break; - case 'wait': - if (status === 'idle') status = bpStatus; - break; - } - } - - return status; + activityStatus: { + get() { + return this._getActivityStatus(); + }, }, }); @@ -505,20 +508,35 @@ Execution.prototype._addConsumerCallbacks = function addConsumerCallbacks(callba clearConsumers(); - broker.subscribeOnce('event', 'engine.stop', () => { - clearConsumers(); - return callback(null, this); - }, {consumerTag: 'ctag-cb-stop'}); - - broker.subscribeOnce('event', 'engine.end', () => { - clearConsumers(); - return callback(null, this); - }, {consumerTag: 'ctag-cb-end'}); - - broker.subscribeOnce('event', 'engine.error', (_, message) => { - clearConsumers(); - return callback(message.content); - }, {consumerTag: 'ctag-cb-error'}); + broker.subscribeOnce( + 'event', + 'engine.stop', + () => { + clearConsumers(); + return callback(null, this); + }, + { consumerTag: 'ctag-cb-stop' } + ); + + broker.subscribeOnce( + 'event', + 'engine.end', + () => { + clearConsumers(); + return callback(null, this); + }, + { consumerTag: 'ctag-cb-end' } + ); + + broker.subscribeOnce( + 'event', + 'engine.error', + (_, message) => { + clearConsumers(); + return callback(message.content); + }, + { consumerTag: 'ctag-cb-error' } + ); return callback; @@ -554,7 +572,7 @@ Execution.prototype._setup = function setup(setupOptions = {}) { for (const definition of this.definitions) { if (listener) definition.environment.options.listener = listener; - const {queueName} = definition.broker.subscribeTmp('event', 'definition.#', onChildMessage, {noAck: true, consumerTag: '_engine_definition'}); + const { queueName } = definition.broker.subscribeTmp('event', 'definition.#', onChildMessage, { noAck: true, consumerTag: '_engine_definition' }); definition.broker.bindQueue(queueName, 'event', 'process.#'); definition.broker.bindQueue(queueName, 'event', 'activity.#'); definition.broker.bindQueue(queueName, 'event', 'flow.#'); @@ -562,7 +580,7 @@ Execution.prototype._setup = function setup(setupOptions = {}) { }; Execution.prototype._onChildMessage = function onChildMessage(routingKey, message, owner) { - const {environment: ownerEnvironment} = owner; + const { environment: ownerEnvironment } = owner; const listener = ownerEnvironment.options?.listener; this[kState] = 'running'; @@ -611,7 +629,7 @@ Execution.prototype._onChildMessage = function onChildMessage(routingKey, messag switch (key) { case 'data': { environment.output.data = environment.output.data || {}; - environment.output.data = {...environment.output.data, ...message.content.output.data}; + environment.output.data = { ...environment.output.data, ...message.content.output.data }; break; } default: { @@ -626,7 +644,7 @@ Execution.prototype._onChildMessage = function onChildMessage(routingKey, messag if (listener) listener.emit(routingKey, elementApi, this); const broker = this.broker; - broker.publish('event', routingKey, {...message.content}, {...message.properties, mandatory: false}); + broker.publish('event', routingKey, { ...message.content }, { ...message.properties, mandatory: false }); if (!newState) return; @@ -635,13 +653,13 @@ Execution.prototype._onChildMessage = function onChildMessage(routingKey, messag switch (newState) { case 'stopped': this._debug('stopped'); - return this._complete('stop', {}, {type: 'stop'}); + return this._complete('stop', {}, { type: 'stop' }); case 'idle': this._debug('completed'); - return this._complete('end', {}, {type: 'end'}); + return this._complete('end', {}, { type: 'end' }); case 'error': this._debug('error'); - return this._complete('error', message.content.error, {type: 'error', mandatory: true}); + return this._complete('error', message.content.error, { type: 'error', mandatory: true }); } }; @@ -694,7 +712,7 @@ Execution.prototype.getPostponed = function getPostponed() { }, []); }; -Execution.prototype.signal = function signal(payload, {ignoreSameDefinition} = {}) { +Execution.prototype.signal = function signal(payload, { ignoreSameDefinition } = {}) { for (const definition of this[kExecuting]) { if (ignoreSameDefinition && payload?.parent?.id === definition.id) continue; definition.signal(payload); @@ -721,6 +739,29 @@ Execution.prototype._debug = function debug(msg) { this[kEngine].logger.debug(`<${this.name}> ${msg}`); }; +Execution.prototype._getActivityStatus = function getActivityStatus() { + let status = 'idle'; + const running = this[kExecuting]; + if (!running.length) return status; + else if (running.length === 1) return running[0].activityStatus; + + for (const def of running) { + const bpStatus = def.activityStatus; + switch (def.activityStatus) { + case 'executing': + return bpStatus; + case 'timer': + status = bpStatus; + break; + case 'wait': + if (status === 'idle') status = bpStatus; + break; + } + } + + return status; +}; + exports.Engine = Engine; exports.Execution = Execution; exports.JavaScripts = Scripts; diff --git a/src/extensions/ProcessOutputDataObject.js b/src/extensions/ProcessOutputDataObject.js index fb87d92..0dcd151 100644 --- a/src/extensions/ProcessOutputDataObject.js +++ b/src/extensions/ProcessOutputDataObject.js @@ -1,25 +1,41 @@ +const kDataObjectDef = Symbol.for('data object definition'); + export default function ProcessOutputDataObject(dataObjectDef, { environment }) { - const { id, type, name, behaviour, parent } = dataObjectDef; + this[kDataObjectDef] = dataObjectDef; + this.environment = environment; + this.behaviour = dataObjectDef.behaviour; + this.name = dataObjectDef.name; + this.parent = dataObjectDef.parent; +} - const source = { - id, - name, - type, - behaviour, - parent, - read(broker, exchange, routingKeyPrefix, messageProperties) { - const value = environment.variables.data && environment.variables.data[id]; - return broker.publish(exchange, `${routingKeyPrefix}response`, { id, name, type, value }, messageProperties); +Object.defineProperties(ProcessOutputDataObject.prototype, { + id: { + get() { + return this[kDataObjectDef].id; }, - write(broker, exchange, routingKeyPrefix, value, messageProperties) { - environment.variables.data = environment.variables.data || {}; - environment.variables.data[id] = value; - - environment.output.data = environment.output.data || {}; - environment.output.data[id] = value; - return broker.publish(exchange, `${routingKeyPrefix}response`, { id, name, type, value }, messageProperties); + }, + type: { + get() { + return this[kDataObjectDef].type; }, - }; + }, +}); - return source; -} +ProcessOutputDataObject.prototype.read = function readDataObject(broker, exchange, routingKeyPrefix, messageProperties) { + const environment = this.environment; + const { id, name, type } = this; + const value = environment.variables.data && environment.variables.data[this.id]; + return broker.publish(exchange, `${routingKeyPrefix}response`, { id, name, type, value }, messageProperties); +}; + +ProcessOutputDataObject.prototype.write = function writeDataObject(broker, exchange, routingKeyPrefix, value, messageProperties) { + const environment = this.environment; + const { id, name, type } = this; + + environment.variables.data = environment.variables.data || {}; + environment.variables.data[id] = value; + + environment.output.data = environment.output.data || {}; + environment.output.data[id] = value; + return broker.publish(exchange, `${routingKeyPrefix}response`, { id, name, type, value }, messageProperties); +};