diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index 59e92b4e9c..606559e7a5 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -21,6 +21,7 @@ Fixed Issues: * Issue #280 New URI param "?local=true", to GET only local entities in GET /entities * Issue #280 Support for Prometheus metrics (4 counters for now) * Issue #280 Adding tenant to response wherever Location is present +* Issue #1178 MQTT notifications also when "-notificationMode != threadpool" New Features: * Concise input/output format - more compact but still lossless - see NGSI-LD API spec v1.6.1 for details diff --git a/src/app/orionld/orionld.cpp b/src/app/orionld/orionld.cpp index e24fb40d70..f030eca51c 100644 --- a/src/app/orionld/orionld.cpp +++ b/src/app/orionld/orionld.cpp @@ -604,16 +604,12 @@ static void contextBrokerInit(std::string dbPrefix, bool multitenant) int rc = pQNotifier->start(); if (rc != 0) - { - LM_X(1,("Runtime Error starting notification queue workers (%d)", rc)); - } + LM_X(1, ("Runtime Error starting notification queue workers (%d)", rc)); pNotifier = pQNotifier; } else - { pNotifier = new Notifier(); - } /* Set notifier object (singleton) */ setNotifier(pNotifier); @@ -703,51 +699,31 @@ static SemOpType policyGet(std::string mutexPolicy) * * notificationModeParse - */ -static void notificationModeParse(char *notifModeArg, int *pQueueSize, int *pNumThreads) +static void notificationModeParse(char* notificationMode, int* pQueueSize, int* pNumThreads) { - char* mode; - char* first_colon; - int flds_num; - errno = 0; - // notifModeArg is a char[64], pretty sure not a huge input to break sscanf - // cppcheck-suppress invalidscanf - flds_num = sscanf(notifModeArg, "%m[^:]:%d:%d", &mode, pQueueSize, pNumThreads); - if (errno != 0) + if (strncmp(notificationMode, "threadpool", 10) == 0) { - LM_X(1, ("Fatal Error parsing notification mode: sscanf (%s)", strerror(errno))); - } - if (flds_num == 3 && strcmp(mode, "threadpool") == 0) - { - if (*pQueueSize <= 0) + if (notificationMode[10] == 0) { - LM_X(1, ("Fatal Error parsing notification mode: invalid queue size (%d)", *pQueueSize)); + *pQueueSize = DEFAULT_NOTIF_QS; + *pNumThreads = DEFAULT_NOTIF_TN; + return; } - if (*pNumThreads <= 0) + else if (notificationMode[10] == ':') { - LM_X(1, ("Fatal Error parsing notification mode: invalid number of threads (%d)",*pNumThreads)); - } - } - else if (flds_num == 1 && strcmp(mode, "threadpool") == 0) - { - *pQueueSize = DEFAULT_NOTIF_QS; - *pNumThreads = DEFAULT_NOTIF_TN; - } - else if (!( - flds_num == 1 && - (strcmp(mode, "transient") == 0 || strcmp(mode, "persistent") == 0) - )) - { - LM_X(1, ("Fatal Error parsing notification mode: invalid mode (%s)", notifModeArg)); - } + notificationMode[10] = 0; + char* colon2 = strchr(¬ificationMode[11], ':'); - // get rid of params, if any, in notifModeArg - first_colon = strchr(notifModeArg, ':'); - if (first_colon != NULL) - { - *first_colon = '\0'; + if (colon2 == NULL) + LM_X(1, ("Invalid notificationMode (first colon found, second is missing)")); + *pQueueSize = atoi(¬ificationMode[11]); + *pNumThreads = atoi(&colon2[1]); + } + else + LM_X(1, ("Invalid notificationMode '%s'", notificationMode)); } - - free(mode); + else if ((strcmp(notificationMode, "transient") != 0) && (strcmp(notificationMode, "persistent") != 0)) + LM_X(1, ("Invalid notificationMode '%s'", notificationMode)); } diff --git a/src/lib/ngsiNotify/Notifier.cpp b/src/lib/ngsiNotify/Notifier.cpp index 5d4a72c81d..5f103c2f4b 100644 --- a/src/lib/ngsiNotify/Notifier.cpp +++ b/src/lib/ngsiNotify/Notifier.cpp @@ -255,8 +255,6 @@ static std::vector* buildSenderParamsCustom return paramsV; // empty vector } - LM(("1178: url: '%s'", url.c_str())); - // // 3. Payload // @@ -547,14 +545,12 @@ std::vector* Notifier::buildSenderParams std::string uriPath; std::string protocol; - LM(("1178: httpInfo.url: '%s'", httpInfo.url.c_str())); if (strncmp(httpInfo.url.c_str(), "mqtt", 4) == 0) { host = subP->httpInfo.mqtt.host; port = subP->httpInfo.mqtt.port; uriPath = subP->httpInfo.mqtt.topic; protocol = (char*) ((subP->httpInfo.mqtt.mqtts == false)? "mqtt" : "mqtts"); - LM(("1178: protocol: '%s'", protocol.c_str())); } else if (!parseUrl(httpInfo.url, host, port, uriPath, protocol)) { @@ -571,7 +567,6 @@ std::vector* Notifier::buildSenderParams SenderThreadParams* params = new SenderThreadParams(); - LM(("1178: protocol: '%s'", protocol.c_str())); params->ip = host; params->port = port; params->protocol = protocol; @@ -662,7 +657,6 @@ std::vector* Notifier::buildSenderParams } #endif - LM(("1178: protocol: '%s'", params->protocol.c_str())); paramsV->push_back(params); return paramsV; } diff --git a/src/lib/ngsiNotify/QueueWorkers.cpp b/src/lib/ngsiNotify/QueueWorkers.cpp index 8fd3659131..e5d1791634 100644 --- a/src/lib/ngsiNotify/QueueWorkers.cpp +++ b/src/lib/ngsiNotify/QueueWorkers.cpp @@ -126,18 +126,6 @@ static void* workerFunc(void* pSyncQ) // strcpy(transactionId, params->transactionId); - LM(("1178: protocol: '%s'", params->protocol.c_str())); - LM_T(LmtNotifier, ("worker sending '%s' message to: host='%s', port=%d, verb=%s, tenant='%s', service-path: '%s', xauthToken: '%s', path='%s', content-type: %s", - params->protocol.c_str(), - params->ip.c_str(), - params->port, - params->verb.c_str(), - params->tenant.c_str(), - params->servicePath.c_str(), - params->xauthToken.c_str(), - params->resource.c_str(), - params->content_type.c_str())); - int r = 0; if (simulatedNotification) @@ -149,7 +137,7 @@ static void* workerFunc(void* pSyncQ) { char* topic = (char*) params->resource.c_str(); - LM(("1178: Sending MQTT Notification")); + LM(("Sending MQTT Notification for subscription '%s'", params->subscriptionId.c_str())); r = mqttNotification(params->ip.c_str(), params->port, topic, @@ -170,7 +158,7 @@ static void* workerFunc(void* pSyncQ) if (ngsildSubscription == false) subscriptionId = NULL; - LM(("1178: Sending HTTP Notification")); + LM(("Sending HTTP Notification for subscription '%s'", params->subscriptionId.c_str())); r = httpRequestSendWithCurl(curl, params->ip, params->port, diff --git a/src/lib/ngsiNotify/senderThread.cpp b/src/lib/ngsiNotify/senderThread.cpp index 373ae64260..d32fc27685 100644 --- a/src/lib/ngsiNotify/senderThread.cpp +++ b/src/lib/ngsiNotify/senderThread.cpp @@ -29,6 +29,8 @@ #include "rest/httpRequestSend.h" #include "ngsiNotify/senderThread.h" #include "cache/subCache.h" +#include "orionld/common/orionldState.h" +#include "orionld/mqtt/mqttNotification.h" @@ -40,6 +42,13 @@ void* startSenderThread(void* p) { std::vector* paramsV = (std::vector*) p; + // Initialize kjson and kalloc libs - needed for MQTT + orionldStateInit(NULL); + orionldState.kjson.spacesPerIndent = 0; + orionldState.kjson.stringBeforeColon = (char*) ""; + orionldState.kjson.stringAfterColon = (char*) ""; + orionldState.kjson.nlString = (char*) ""; + for (unsigned ix = 0; ix < paramsV->size(); ix++) { SenderThreadParams* params = (SenderThreadParams*) (*paramsV)[ix]; @@ -58,27 +67,49 @@ void* startSenderThread(void* p) if (!simulatedNotification) { - std::string out; - int r; - - r = httpRequestSend(params->ip, - params->port, - params->protocol, - params->verb, - params->tenant.c_str(), - params->servicePath, - params->xauthToken.c_str(), - params->resource, - params->content_type, - params->content, - params->fiwareCorrelator, - params->renderFormat, - NOTIFICATION_WAIT_MODE, - &out, - params->extraHeaders, - "", - -1, - params->subscriptionId.c_str()); // Subscription ID as URL param + int r; + + if (strncmp(params->protocol.c_str(), "mqtt", 4) != 0) // Not MQTT, must be HTTP + { + std::string out; + + LM(("Sending HTTP Notification for subscription '%s'", params->subscriptionId.c_str())); + r = httpRequestSend(params->ip, + params->port, + params->protocol, + params->verb, + params->tenant.c_str(), + params->servicePath, + params->xauthToken.c_str(), + params->resource, + params->content_type, + params->content, + params->fiwareCorrelator, + params->renderFormat, + NOTIFICATION_WAIT_MODE, + &out, + params->extraHeaders, + "", + -1, + params->subscriptionId.c_str()); // Subscription ID as URL param + } + else + { + char* topic = (char*) params->resource.c_str(); + + LM(("Sending MQTT Notification for subscription '%s'", params->subscriptionId.c_str())); + r = mqttNotification(params->ip.c_str(), + params->port, + topic, + params->content.c_str(), + params->content_type.c_str(), + params->mqttQoS, + params->mqttUserName, + params->mqttPassword, + params->mqttVersion, + params->xauthToken.c_str(), + params->extraHeaders); + } if (params->toFree != NULL) { diff --git a/test/functionalTest/cases/0000_ngsild/ngsild_subscription_with_mqtt_and-senderThread.test b/test/functionalTest/cases/0000_ngsild/ngsild_subscription_with_mqtt_and-senderThread.test new file mode 100644 index 0000000000..9a47c5c9be --- /dev/null +++ b/test/functionalTest/cases/0000_ngsild/ngsild_subscription_with_mqtt_and-senderThread.test @@ -0,0 +1,219 @@ +# Copyright 2022 FIWARE Foundation e.V. +# +# This file is part of Orion-LD Context Broker. +# +# Orion-LD Context Broker is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# Orion-LD Context Broker is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +# General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +# +# For those usages not covered by this license please contact with +# orionld at fiware dot org + +# VALGRIND_READY - to mark the test ready for valgrindTestSuite.sh + +--NAME-- +Notifications via MQTT using the sender thread (notificationMode transient) + +--SHELL-INIT-- +export BROKER=orionld +dbInit CB +brokerStart CB 0 IPv4 -notificationMode transient +mqttTestClientStart --mqttBrokerPort $MQTT_BROKER_PORT --mqttBrokerIp $MQTT_BROKER_HOST --pretty-print --mqttTopic example_mosquitto_topic + +--SHELL-- + +# +# 01. Create an entity age01_Car, to later be patched to match S1 +# 02. Create subscription S1 with MQTT broker as endpoint +# 03. PATCH E1 to match S1 +# 04. Dump the MQTT test client and see one notification +# + +echo "01. Create an entity age01_Car, to later be patched to match S1" +echo "===============================================================" +payload='{ + "id": "urn:ngsi-ld:Device:age01_Car", + "type": "Device", + "Accelerate_info": { + "type": "Property", + "value": { + "@type": "commandResult", + "@value": " " + } + }, + "Accelerate_status": { + "type": "Property", + "value": { + "@type": "commandStatus", + "@value": "UNKNOWN" + } + }, + "Acceleration": { + "type": "Property", + "value": 0 + }, + "EngineStopped": { + "type": "Property", + "value": true + }, + "Engine_Oxigen": { + "type": "Property", + "value": 219 + }, + "Engine_Temperature": { + "type": "Property", + "value": 20 + }, + "Error_info": { + "type": "Property", + "value": { + "@type": "commandResult", + "@value": " " + } + }, + "Error_status": { + "type": "Property", + "value": { + "@type": "commandStatus", + "@value": "UNKNOWN" + } + }, + "Stop_info": { + "type": "Property", + "value": { + "@type": "commandResult", + "@value": " " + } + }, + "Stop_status": { + "type": "Property", + "value": { + "@type": "commandStatus", + "@value": "UNKNOWN" + } + } +}' +orionCurl --url /ngsi-ld/v1/entities --payload "$payload" +echo +echo + + +echo "02. Create subscription S1 with MQTT broker as endpoint" +echo "=======================================================" +payload='{ + "id": "urn:ngsi-ld:subscription:S1", + "type": "Subscription", + "name": "Car-Subscription", + "description": "Car subscription", + "entities": [ + { + "id": "urn:ngsi-ld:Device:age01_Car", + "type": "Device" + } + ], + "watchedAttributes": [ + "Engine_Oxigen", + "Engine_Temperature" + ], + "notification": { + "attributes": [ + "Engine_Oxigen", + "Engine_Temperature" + ], + "format": "keyValues", + "endpoint": { + "uri": "mqtt://'$MQTT_BROKER_HOST':'$MQTT_BROKER_PORT'/example_mosquitto_topic" + } + } +}' +orionCurl --url /ngsi-ld/v1/subscriptions --payload "$payload" +echo +echo + + +echo "03. PATCH E1 to match S1" +echo "========================" +payload='{ + "Engine_Oxigen": { + "type": "Property", + "value": 220 + } +}' +orionCurl --url /ngsi-ld/v1/entities/urn:ngsi-ld:Device:age01_Car/attrs --payload "$payload" -X PATCH +echo +echo + + +echo "04. Dump the MQTT test client and see one notification" +echo "======================================================" +mqttTestClientDump example_mosquitto_topic +echo +echo + + +--REGEXPECT-- +01. Create an entity age01_Car, to later be patched to match S1 +=============================================================== +HTTP/1.1 201 Created +Content-Length: 0 +Date: REGEX(.*) +Location: /ngsi-ld/v1/entities/urn:ngsi-ld:Device:age01_Car + + + +02. Create subscription S1 with MQTT broker as endpoint +======================================================= +HTTP/1.1 201 Created +Content-Length: 0 +Date: REGEX(.*) +Location: /ngsi-ld/v1/subscriptions/urn:ngsi-ld:subscription:S1 + + + +03. PATCH E1 to match S1 +======================== +HTTP/1.1 204 No Content +Date: REGEX(.*) + + + +04. Dump the MQTT test client and see one notification +====================================================== +Notifications: 1 +{ + "body": { + "data": [ + { + "Engine_Oxigen": 220, + "Engine_Temperature": 20, + "id": "urn:ngsi-ld:Device:age01_Car", + "type": "Device" + } + ], + "id": "urn:ngsi-ld:Notification:REGEX(.*)", + "notifiedAt": "202REGEX(.*)", + "subscriptionId": "urn:ngsi-ld:subscription:S1", + "type": "Notification" + }, + "metadata": { + "Content-Type": "application/json", + "Link": "REGEX(.*)" + } +} +======================================= + + + +--TEARDOWN-- +brokerStop CB +dbDrop CB +mqttTestClientStop example_mosquitto_topic diff --git a/test/functionalTest/cases/0322_multitenancy/http_tenant_update_query.test b/test/functionalTest/cases/0322_multitenancy/http_tenant_update_query.test index b0d29997ba..426c5868cc 100644 --- a/test/functionalTest/cases/0322_multitenancy/http_tenant_update_query.test +++ b/test/functionalTest/cases/0322_multitenancy/http_tenant_update_query.test @@ -24,8 +24,8 @@ HTTP-Tenant update and query --SHELL-INIT-- dbInit CB -dbInit ${CB_DB_NAME} t_03 -dbInit ${CB_DB_NAME} t_04 +dbInit CB t_03 +dbInit CB t_04 brokerStart CB 0-255 IPV4 -multiservice --SHELL-- @@ -242,5 +242,5 @@ Fiware-Correlator: REGEX([0-9a-f\-]{36}) brokerStop CB dbDrop CB -dbDrop ${CB_DB_NAME} t_03 -dbDrop ${CB_DB_NAME} t_04 +dbDrop CB t_03 +dbDrop CB t_04 diff --git a/test/functionalTest/harnessFunctions.sh b/test/functionalTest/harnessFunctions.sh index 0dffedbbf8..857198d31a 100644 --- a/test/functionalTest/harnessFunctions.sh +++ b/test/functionalTest/harnessFunctions.sh @@ -574,7 +574,7 @@ function brokerStart() elif [ "$1" == "-notificationMode" ] || [ "$1" == "--notificationMode" ] then notificationModeGiven=TRUE - xParams="$xParams $1 $2" + xParams="$xParams -notificationMode $2" shift else xParams=$xParams' '$1