Skip to content

Commit

Permalink
Merge pull request #794 from telefonicaid/task/usage_transport_endpoi…
Browse files Browse the repository at this point in the history
…nt_from_group2

usage transport endpoint from group
  • Loading branch information
fgalan authored Dec 20, 2023
2 parents 8bdba96 + 108b0d5 commit 436f5a1
Show file tree
Hide file tree
Showing 15 changed files with 540 additions and 139 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
- Fix: protect access to multimeasure array
- Add: check and usage endpoint and transport from Group level when commands
31 changes: 21 additions & 10 deletions docs/usermanual.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ Examples of these `ngsiv2` payloads are the following ones:
...
}
```
```

````
Example of these `ngsild` payloads are the following ones:
Expand Down Expand Up @@ -212,7 +213,7 @@ Example of these `ngsild` payloads are the following ones:
},
...
]
```
````

(2) NGSI-LD single entity format:

Expand Down Expand Up @@ -254,8 +255,16 @@ Example of these `ngsild` payloads are the following ones:
Some additional considerations to take into account:

- In the case of array of entities, they are handled as a multiple measure, i.e. each entity is a measure.
- The `type` of the attribute is the one used in the provision of the attribute, not the one in the measure. The exception is the autoprovisioned devices case, in which case the `type` of the attribute is taken from the measure (given the attribute lacks proviosioned type). In this latter case, if the attribute `type` is not included in the measure the [explicit type omission rules for Context Broker](https://github.com/telefonicaid/fiware-orion/blob/master/doc/manuals/orion-api.md#partial-representations) are also taken into account in this case.
- In the case of NGSI-LD, fields different from `type`, `value` or `object` (e.g. `observedAt` in the examples above) are include as NGSI-v2 metadata in the entity corresponding to the measure at Context Broker. Note IOTA doesn't provide the `type` for that metadata, so the Context Broker applies [a default type based in the metadata `value` JSON type](https://github.com/telefonicaid/fiware-orion/blob/master/doc/manuals/orion-api.md#partial-representations).
- The `type` of the attribute is the one used in the provision of the attribute, not the one in the measure. The
exception is the autoprovisioned devices case, in which case the `type` of the attribute is taken from the measure
(given the attribute lacks proviosioned type). In this latter case, if the attribute `type` is not included in the
measure the
[explicit type omission rules for Context Broker](https://github.com/telefonicaid/fiware-orion/blob/master/doc/manuals/orion-api.md#partial-representations)
are also taken into account in this case.
- In the case of NGSI-LD, fields different from `type`, `value` or `object` (e.g. `observedAt` in the examples above)
are include as NGSI-v2 metadata in the entity corresponding to the measure at Context Broker. Note IOTA doesn't
provide the `type` for that metadata, so the Context Broker applies
[a default type based in the metadata `value` JSON type](https://github.com/telefonicaid/fiware-orion/blob/master/doc/manuals/orion-api.md#partial-representations).

##### SOAP-XML Measure reporting

Expand Down Expand Up @@ -412,8 +421,8 @@ and
[Practice: Scenario 3: commands - error](https://github.com/telefonicaid/iotagent-node-lib/blob/master/doc/northboundinteractions.md#scenario-3-commands-error).

MQTT devices commands are always push. For HTTP Devices commands to be push they **must** be provisioned with the
`endpoint` attribute, that will contain the URL where the IoT Agent will send the received commands. Otherwise the
command will be poll. When using the HTTP transport, the command handling have two flavours:
`endpoint` attribute, from device or group device, that will contain the URL where the IoT Agent will send the received
commands. Otherwise the command will be poll. When using the HTTP transport, the command handling have two flavours:

- **Push commands**: The request payload format will be a plain JSON, as described in the "Payload" section. The
device will reply with a 200OK response containing the result of the command in the JSON result format. Example of
Expand Down Expand Up @@ -587,7 +596,8 @@ commands and a topic to receive configuration information. This mechanism can be
configuration flag, `configRetrieval`.

In case of MQTT to retrieve configuration parameters from the Context Broker, it is required that the device should be
provisioned using "MQTT" as transport key. By default it will be considered "HTTP" as transport.
provisioned using "MQTT" as transport key, at device or group level. By default it will be considered "HTTP" as
transport if none transport is defined at device or group level.

The parameter will be given as follows:

Expand Down Expand Up @@ -985,9 +995,10 @@ simple restart should be enough).

In order to distinguish which device uses which attribute, a new field, `transport`, will be added to the device
provisioning. When a command or a notification arrives to the IoTAgent, this field is read to guess what plugin to
invoke in order to execute the requested task. If the field is not found, the value of the configuration parameter
`defaultTransport` will be used instead. In order to associate a module with a device, the value of the `transport`
attribute of the device provisioning must match the value of the `protocol` field of the binding.
invoke in order to execute the requested task. If the field is not found, the same field is search in configuration
group and then used, but if not the value of the configuration parameter `defaultTransport` will be used instead. In
order to associate a module with a device, the value of the `transport` attribute of the device provisioning must match
the value of the `protocol` field of the binding.

### API

Expand Down
4 changes: 2 additions & 2 deletions lib/bindings/AMQPBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ let amqpChannel;
* @param {Object} device Data object for the device receiving the command.
* @param {String} serializedPayload String payload in JSON format for the command.
*/
function executeCommand(apiKey, device, cmdName, serializedPayload, contentType, callback) {
function executeCommand(apiKey, group, device, cmdName, serializedPayload, contentType, callback) {
config
.getLogger()
.debug(
Expand Down Expand Up @@ -258,7 +258,7 @@ function deviceUpdatingHandler(device, callback) {
* @param {String} deviceId ID of the Device.
* @param {Object} results Context Broker response.
*/
function sendConfigurationToDevice(apiKey, deviceId, results, callback) {
function sendConfigurationToDevice(apiKey, group, deviceId, results, callback) {
callback();
}

Expand Down
65 changes: 45 additions & 20 deletions lib/bindings/HTTPBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ function parseDataMultipleMeasure(req, res, next) {
}
}

function executeCommand(apiKey, device, cmdName, serializedPayload, contentType, callback) {
function executeCommand(apiKey, group, device, cmdName, serializedPayload, contentType, callback) {
const options = {
url: device.endpoint,
url: device.endpoint || (group && group.endpoint ? group.endpoint : undefined),
method: 'POST',
body: serializedPayload,
headers: {
Expand All @@ -213,8 +213,8 @@ function executeCommand(apiKey, device, cmdName, serializedPayload, contentType,
'content-type': contentType
}
};
if (device.endpoint) {
// device.endpoint or another field like device.endpointExp ?
if (options.endpoint) {
// endpoint could be an expression
const parser = iotAgentLib.dataPlugins.expressionTransformation;
let attrList = iotAgentLib.dataPlugins.utils.getIdTypeServSubServiceFromDevice(device);
attrList = device.staticAttributes ? attrList.concat(device.staticAttributes) : attrList.concat([]);
Expand All @@ -223,11 +223,11 @@ function executeCommand(apiKey, device, cmdName, serializedPayload, contentType,
// expression result will be the full command payload
let endpointRes = null;
try {
endpointRes = parser.applyExpression(device.endpoint, ctxt, device);
endpointRes = parser.applyExpression(options.endpoint, ctxt, device);
} catch (e) {
// no error should be reported
}
options.url = endpointRes ? endpointRes : device.endpoint;
options.url = endpointRes ? endpointRes : options.endpoint;
}
if (config.getConfig().http.timeout) {
options.timeout = config.getConfig().http.timeout;
Expand Down Expand Up @@ -404,7 +404,7 @@ function handleIncomingMeasure(req, res, next) {
}
}

iotaUtils.retrieveDevice(req.deviceId, req.apiKey, transport, processDeviceMeasure);
iotaUtils.retrieveDevice(req.deviceId, req.apiKey, processDeviceMeasure);
}

function isCommand(req, res, next) {
Expand All @@ -418,7 +418,7 @@ function isCommand(req, res, next) {
next();
}

function sendConfigurationToDevice(apiKey, deviceId, results, callback) {
function sendConfigurationToDevice(apiKey, group, deviceId, results, callback) {
function handleDeviceResponse(innerCallback) {
return function (error, response, body) {
if (error) {
Expand All @@ -431,9 +431,12 @@ function sendConfigurationToDevice(apiKey, deviceId, results, callback) {
};
}

function sendRequest(device, results, innerCallback) {
function sendRequest(device, group, results, innerCallback) {
const resultRequest = {
url: device.endpoint + constants.HTTP_CONFIGURATION_PATH,
url:
(device.endpoint || (group && group.endpoint ? group.endpoint : undefined)) +
constants.HTTP_CONFIGURATION_PATH,

method: 'POST',
json: iotaUtils.createConfigurationNotification(results),
headers: {
Expand All @@ -445,13 +448,13 @@ function sendConfigurationToDevice(apiKey, deviceId, results, callback) {

request(resultRequest, handleDeviceResponse(innerCallback));
}
iotaUtils.retrieveDevice(deviceId, apiKey, transport, function (error, device) {
iotaUtils.retrieveDevice(deviceId, apiKey, function (error, device) {
if (error) {
callback(error);
} else if (!device.endpoint) {
callback(new errors.EndpointNotFound(device.id));
} else {
sendRequest(device, results, callback);
sendRequest(device, group, results, callback);
}
});
}
Expand All @@ -464,7 +467,7 @@ function handleConfigurationRequest(req, res, next) {
res.status(200).json({});
}
}
iotaUtils.retrieveDevice(req.deviceId, req.apiKey, transport, function (error, device) {
iotaUtils.retrieveDevice(req.deviceId, req.apiKey, function (error, device) {
if (error) {
next(error);
} else {
Expand Down Expand Up @@ -499,17 +502,19 @@ function handleError(error, req, res, next) {
* Just fills in the transport protocol in case there is none and polling if endpoint.
*
* @param {Object} device Device object containing all the information about the device.
* @param {Object} group Group object containing all the information about the device.
*/
function setPollingAndDefaultTransport(device, callback) {
function setPollingAndDefaultTransport(device, group, callback) {
config.getLogger().debug(context, 'httpbinding.setPollingAndDefaultTransport device %j group %j', device, group);
if (!device.transport) {
device.transport = 'HTTP';
device.transport = group && group.transport ? group.transport : 'HTTP';
}

if (device.transport === 'HTTP') {
if (device.endpoint) {
device.polling = false;
} else {
device.polling = true;
device.polling = !(group && group.endpoint);
}
}

Expand All @@ -522,8 +527,18 @@ function setPollingAndDefaultTransport(device, callback) {
* @param {Object} device Device object containing all the information about the provisioned device.
*/
function deviceProvisioningHandler(device, callback) {
config.getLogger().debug(context, 'httpbinding.deviceProvisioningHandler %j', device);
setPollingAndDefaultTransport(device, callback);
config.getLogger().debug(context, 'httpbinding.deviceProvisioningHandler device %j', device);
let group = {};
iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', device.apikey, function (
error,
foundGroup
) {
if (!error) {
group = foundGroup;
}
config.getLogger().debug(context, 'httpbinding.deviceProvisioningHandler group %j', group);
setPollingAndDefaultTransport(device, group, callback);
});
}

/**
Expand All @@ -532,8 +547,18 @@ function deviceProvisioningHandler(device, callback) {
* @param {Object} device Device object containing all the information about the updated device.
*/
function deviceUpdatingHandler(device, callback) {
config.getLogger().debug(context, 'httpbinding.deviceUpdatingHandler %j', device);
setPollingAndDefaultTransport(device, callback);
config.getLogger().debug(context, 'httpbinding.deviceUpdatingHandler device %j', device);
let group = {};
iotAgentLib.getConfigurationSilently(config.getConfig().iota.defaultResource || '', device.apikey, function (
error,
foundGroup
) {
if (!error) {
group = foundGroup;
}
config.getLogger().debug(context, 'httpbinding.deviceUpdatingHandler group %j', group);
setPollingAndDefaultTransport(device, group, callback);
});
}

/**
Expand Down
5 changes: 3 additions & 2 deletions lib/bindings/MQTTBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ function recreateSubscriptions(callback) {
* @param {String} deviceId ID of the Device.
* @param {Object} results Context Broker response.
*/
function sendConfigurationToDevice(apiKey, deviceId, results, callback) {
function sendConfigurationToDevice(apiKey, group, deviceId, results, callback) {
const configurations = iotaUtils.createConfigurationNotification(results);
const options = {};
context = fillService(context, { service: 'n/a', subservice: 'n/a' });
Expand Down Expand Up @@ -369,10 +369,11 @@ function stop(callback) {
* JSON payload (already containing the command information).
*
* @param {String} apiKey APIKey of the device that will be receiving the command.
* @param {Object} group Data object for the group receiving the command.
* @param {Object} device Data object for the device receiving the command.
* @param {String} serializedPayload String payload in JSON format for the command.
*/
function executeCommand(apiKey, device, cmdName, serializedPayload, contentType, callback) {
function executeCommand(apiKey, group, device, cmdName, serializedPayload, contentType, callback) {
const options = {};
// retrieve command mqtt options from device
const commands = Object.assign({}, ...device.commands.map((c) => ({ [c.name]: c })));
Expand Down
35 changes: 24 additions & 11 deletions lib/commandHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,18 @@ function serializedPayloadCommand(payload, command) {
* @param {Object} attribute Attribute in NGSI format.
* @return {Function} Command execution function ready to be called with async.series.
*/
function generateCommandExecution(apiKey, device, attribute) {
function generateCommandExecution(apiKey, device, group, attribute) {
let payload = {};
let command = device && device.commands.find((att) => att.name === attribute.name);
const command = device && device.commands.find((att) => att.name === attribute.name);

if (command && command.expression) {
let parser = iotAgentLib.dataPlugins.expressionTransformation;
const parser = iotAgentLib.dataPlugins.expressionTransformation;
// The context for the JEXL expression should be the ID, TYPE, S, SS
let attrList = iotAgentLib.dataPlugins.utils.getIdTypeServSubServiceFromDevice(device);
attrList = device.staticAttributes
? attrList.concat(device.staticAttributes).concat(attribute)
: attrList.concat(attribute);
let ctxt = parser.extractContext(attrList, device);
const ctxt = parser.extractContext(attrList, device);
// expression result will be the full command payload
let payloadRes = null;
try {
Expand All @@ -109,13 +110,13 @@ function generateCommandExecution(apiKey, device, attribute) {
apiKey,
payload
);

const executions = transportSelector.createExecutionsForBinding(
[apiKey, device, attribute.name, serialized, contentType],
[apiKey, group, device, attribute.name, serialized, contentType],
'executeCommand',
device.transport || config.getConfig().defaultTransport
device.transport ||
(group && group.transport ? group.transport : undefined) ||
config.getConfig().defaultTransport
);

return executions;
}

Expand Down Expand Up @@ -157,9 +158,21 @@ function commandHandler(id, type, service, subservice, attributes, callback) {
if (error) {
callback(error);
} else {
async.series(
attributes.map(generateCommandExecution.bind(null, apiKey, device)).reduce(concat, []),
callback
let group = {};
iotAgentLib.getConfigurationSilently(
config.getConfig().iota.defaultResource || '',
apiKey,
function (error, foundGroup) {
if (!error) {
group = foundGroup;
}
async.series(
attributes
.map(generateCommandExecution.bind(null, apiKey, device, group))
.reduce(concat, []),
callback
);
}
);
}
});
Expand Down
Loading

0 comments on commit 436f5a1

Please sign in to comment.