From a868c2d41958caea07aadc4152a8d8a22fbaa7d8 Mon Sep 17 00:00:00 2001 From: Ken Zangelin Date: Thu, 28 Jul 2022 11:54:18 +0200 Subject: [PATCH 1/3] Traces for issue #1178 --- src/lib/ngsiNotify/Notifier.cpp | 9 ++++++++- src/lib/ngsiNotify/QueueWorkers.cpp | 3 +++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/lib/ngsiNotify/Notifier.cpp b/src/lib/ngsiNotify/Notifier.cpp index ec9727ae1f..5d4a72c81d 100644 --- a/src/lib/ngsiNotify/Notifier.cpp +++ b/src/lib/ngsiNotify/Notifier.cpp @@ -98,7 +98,7 @@ void Notifier::sendNotifyContextRequest { pthread_t tid; - + LM(("1178: httpInfo.url: '%s'", httpInfo.url.c_str())); std::vector* paramsV = Notifier::buildSenderParams(ncrP, httpInfo, tenant, @@ -255,6 +255,7 @@ static std::vector* buildSenderParamsCustom return paramsV; // empty vector } + LM(("1178: url: '%s'", url.c_str())); // // 3. Payload @@ -435,6 +436,8 @@ std::vector* Notifier::buildSenderParams char* toFree = NULL; #endif + LM(("1178: httpInfo.url: '%s'", httpInfo.url.c_str())); + if ((verb == NOVERB) || (verb == UNKNOWNVERB) || disableCusNotif) { // Default verb/method (or the one in case of disabled custom notifications) is POST @@ -544,12 +547,14 @@ std::vector* Notifier::buildSenderParams std::string uriPath; std::string protocol; + LM(("1178: httpInfo.url: '%s'", httpInfo.url.c_str())); if (strncmp(httpInfo.url.c_str(), "mqtt", 4) == 0) { host = subP->httpInfo.mqtt.host; port = subP->httpInfo.mqtt.port; uriPath = subP->httpInfo.mqtt.topic; protocol = (char*) ((subP->httpInfo.mqtt.mqtts == false)? "mqtt" : "mqtts"); + LM(("1178: protocol: '%s'", protocol.c_str())); } else if (!parseUrl(httpInfo.url, host, port, uriPath, protocol)) { @@ -566,6 +571,7 @@ std::vector* Notifier::buildSenderParams SenderThreadParams* params = new SenderThreadParams(); + LM(("1178: protocol: '%s'", protocol.c_str())); params->ip = host; params->port = port; params->protocol = protocol; @@ -656,6 +662,7 @@ std::vector* Notifier::buildSenderParams } #endif + LM(("1178: protocol: '%s'", params->protocol.c_str())); paramsV->push_back(params); return paramsV; } diff --git a/src/lib/ngsiNotify/QueueWorkers.cpp b/src/lib/ngsiNotify/QueueWorkers.cpp index fdce2740fb..8fd3659131 100644 --- a/src/lib/ngsiNotify/QueueWorkers.cpp +++ b/src/lib/ngsiNotify/QueueWorkers.cpp @@ -126,6 +126,7 @@ static void* workerFunc(void* pSyncQ) // strcpy(transactionId, params->transactionId); + LM(("1178: protocol: '%s'", params->protocol.c_str())); LM_T(LmtNotifier, ("worker sending '%s' message to: host='%s', port=%d, verb=%s, tenant='%s', service-path: '%s', xauthToken: '%s', path='%s', content-type: %s", params->protocol.c_str(), params->ip.c_str(), @@ -148,6 +149,7 @@ static void* workerFunc(void* pSyncQ) { char* topic = (char*) params->resource.c_str(); + LM(("1178: Sending MQTT Notification")); r = mqttNotification(params->ip.c_str(), params->port, topic, @@ -168,6 +170,7 @@ static void* workerFunc(void* pSyncQ) if (ngsildSubscription == false) subscriptionId = NULL; + LM(("1178: Sending HTTP Notification")); r = httpRequestSendWithCurl(curl, params->ip, params->port, From 3c089f17bfe18c514ab3ff0819225727a745a829 Mon Sep 17 00:00:00 2001 From: Ken Zangelin Date: Thu, 28 Jul 2022 11:54:44 +0200 Subject: [PATCH 2/3] Error for mqtts on sub creation, as it isn't supported --- src/lib/orionld/payloadCheck/pcheckEndpoint.cpp | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/lib/orionld/payloadCheck/pcheckEndpoint.cpp b/src/lib/orionld/payloadCheck/pcheckEndpoint.cpp index 872283a41d..7fb7494db0 100644 --- a/src/lib/orionld/payloadCheck/pcheckEndpoint.cpp +++ b/src/lib/orionld/payloadCheck/pcheckEndpoint.cpp @@ -57,8 +57,19 @@ bool pcheckEndpoint(KjNode* endpointP, bool patch, KjNode** uriPP, KjNode** noti STRING_CHECK(uriP, "endpoint::uri"); URI_CHECK(uriP->value.s, "endpoint::uri", true); - if (strncmp(uriP->value.s, "mqtt", 4) == 0) + LM(("1178: uri: '%s'", uriP->value.s)); + if (strncmp(uriP->value.s, "mqtt://", 7) == 0) + { *mqttChangeP = true; + LM(("1178: it's an MQTT subscription")); + } + else if (strncmp(uriP->value.s, "mqtts://", 8) == 0) + { + orionldError(OrionldOperationNotSupported, "Not Implemented", "MQTT notifications witn mqtts is not implemented", 501); + return false; + } + else + LM(("1178: it's NOT an MQTT subscription")); } else if (strcmp(epItemP->name, "accept") == 0) { From 40aeded992abdf691491161ff907ffea8984bf54 Mon Sep 17 00:00:00 2001 From: Ken Zangelin Date: Thu, 28 Jul 2022 11:55:35 +0200 Subject: [PATCH 3/3] Connection with MQTT broker at a slightly later stage in sub-creation --- .../legacyDriver/legacyPostSubscriptions.cpp | 48 +++++++++++-------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/src/lib/orionld/legacyDriver/legacyPostSubscriptions.cpp b/src/lib/orionld/legacyDriver/legacyPostSubscriptions.cpp index b2f20debd0..846c9fd2dc 100644 --- a/src/lib/orionld/legacyDriver/legacyPostSubscriptions.cpp +++ b/src/lib/orionld/legacyDriver/legacyPostSubscriptions.cpp @@ -75,8 +75,16 @@ bool legacyPostSubscriptions(void) sub.throttling = -1; // 0? sub.timeInterval = -1; // 0? - char* subIdP = NULL; - KjNode* endpointP = NULL; + char* subIdP = NULL; + KjNode* endpointP = NULL; + bool mqtt = false; + bool mqtts = false; + char* mqttUser = NULL; + char* mqttPassword = NULL; + char* mqttHost = NULL; + unsigned short mqttPort = 0; + char* mqttVersion = NULL; + char* mqttTopic = NULL; // kjTreeToSubscription does the pCheckSubscription stuff ... for now ... if (kjTreeToSubscription(&sub, &subIdP, &endpointP) == false) @@ -90,17 +98,13 @@ bool legacyPostSubscriptions(void) { KjNode* uriP = kjLookup(endpointP, "uri"); - if (strncmp(uriP->value.s, "mqtt://", 7) == 0) + LM(("1178: uri: '%s'", uriP->value.s)); + if ((strncmp(uriP->value.s, "mqtt://", 7) == 0) || (strncmp(uriP->value.s, "mqtts://", 8) == 0)) { - bool mqtts = false; - char* mqttUser = NULL; - char* mqttPassword = NULL; - char* mqttHost = NULL; - unsigned short mqttPort = 0; - char* mqttTopic = NULL; char* detail = NULL; char* uri = kaStrdup(&orionldState.kalloc, uriP->value.s); // Can't destroy uriP->value.s ... mqttParse is destructive! + LM(("1178: parsing MQTT url: '%s'", uriP->value.s)); if (mqttParse(uri, &mqtts, &mqttUser, &mqttPassword, &mqttHost, &mqttPort, &mqttTopic, &detail) == false) { orionldError(OrionldBadRequestData, "Invalid MQTT endpoint", detail, 400); @@ -118,7 +122,6 @@ bool legacyPostSubscriptions(void) // // Get MQTT-Version from notification:endpoint:notifierInfo Array, "key == MQTT-Version" // - char* mqttVersion = NULL; int mqttQoS; KjNode* notifierInfoP = kjLookup(endpointP, "notifierInfo"); @@ -145,15 +148,7 @@ bool legacyPostSubscriptions(void) } } - - // - // Establish connection with MQTT broker - // - if (mqttConnectionEstablish(mqtts, mqttUser, mqttPassword, mqttHost, mqttPort, mqttVersion) == false) - { - orionldError(OrionldInternalError, "Unable to connect to MQTT server", "xxx", 500); - return false; - } + mqtt = true; } } @@ -175,6 +170,19 @@ bool legacyPostSubscriptions(void) } } + if (mqtt) + { + // + // Establish connection with MQTT broker + // + LM(("1178: Establishing connection with the MQTT broker")); + if (mqttConnectionEstablish(mqtts, mqttUser, mqttPassword, mqttHost, mqttPort, mqttVersion) == false) + { + orionldError(OrionldInternalError, "Unable to connect to MQTT server", "xxx", 500); + return false; + } + } + // // Create the subscription // @@ -189,7 +197,9 @@ bool legacyPostSubscriptions(void) orionldState.correlator, sub.ldContext, sub.lang); + // // FIXME: Check oError for failure (oError is output from mongoCreateSubscription!) + // Disconnect from MQTT broker if needed orionldState.httpStatusCode = SccCreated; httpHeaderLocationAdd("/ngsi-ld/v1/subscriptions/", subId.c_str(), orionldState.tenantP->tenant);