Skip to content

Commit

Permalink
Merge pull request #1198 from FIWARE/issue/1178
Browse files Browse the repository at this point in the history
Issue/1178
  • Loading branch information
kzangeli authored Jul 28, 2022
2 parents 7660740 + f0abcef commit 340d92d
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 21 deletions.
9 changes: 8 additions & 1 deletion src/lib/ngsiNotify/Notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void Notifier::sendNotifyContextRequest
{
pthread_t tid;


LM(("1178: httpInfo.url: '%s'", httpInfo.url.c_str()));
std::vector<SenderThreadParams*>* paramsV = Notifier::buildSenderParams(ncrP,
httpInfo,
tenant,
Expand Down Expand Up @@ -255,6 +255,7 @@ static std::vector<SenderThreadParams*>* buildSenderParamsCustom
return paramsV; // empty vector
}

LM(("1178: url: '%s'", url.c_str()));

//
// 3. Payload
Expand Down Expand Up @@ -435,6 +436,8 @@ std::vector<SenderThreadParams*>* Notifier::buildSenderParams
char* toFree = NULL;
#endif

LM(("1178: httpInfo.url: '%s'", httpInfo.url.c_str()));

if ((verb == NOVERB) || (verb == UNKNOWNVERB) || disableCusNotif)
{
// Default verb/method (or the one in case of disabled custom notifications) is POST
Expand Down Expand Up @@ -544,12 +547,14 @@ std::vector<SenderThreadParams*>* Notifier::buildSenderParams
std::string uriPath;
std::string protocol;

LM(("1178: httpInfo.url: '%s'", httpInfo.url.c_str()));
if (strncmp(httpInfo.url.c_str(), "mqtt", 4) == 0)
{
host = subP->httpInfo.mqtt.host;
port = subP->httpInfo.mqtt.port;
uriPath = subP->httpInfo.mqtt.topic;
protocol = (char*) ((subP->httpInfo.mqtt.mqtts == false)? "mqtt" : "mqtts");
LM(("1178: protocol: '%s'", protocol.c_str()));
}
else if (!parseUrl(httpInfo.url, host, port, uriPath, protocol))
{
Expand All @@ -566,6 +571,7 @@ std::vector<SenderThreadParams*>* Notifier::buildSenderParams

SenderThreadParams* params = new SenderThreadParams();

LM(("1178: protocol: '%s'", protocol.c_str()));
params->ip = host;
params->port = port;
params->protocol = protocol;
Expand Down Expand Up @@ -656,6 +662,7 @@ std::vector<SenderThreadParams*>* Notifier::buildSenderParams
}
#endif

LM(("1178: protocol: '%s'", params->protocol.c_str()));
paramsV->push_back(params);
return paramsV;
}
3 changes: 3 additions & 0 deletions src/lib/ngsiNotify/QueueWorkers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ static void* workerFunc(void* pSyncQ)
//
strcpy(transactionId, params->transactionId);

LM(("1178: protocol: '%s'", params->protocol.c_str()));
LM_T(LmtNotifier, ("worker sending '%s' message to: host='%s', port=%d, verb=%s, tenant='%s', service-path: '%s', xauthToken: '%s', path='%s', content-type: %s",
params->protocol.c_str(),
params->ip.c_str(),
Expand All @@ -148,6 +149,7 @@ static void* workerFunc(void* pSyncQ)
{
char* topic = (char*) params->resource.c_str();

LM(("1178: Sending MQTT Notification"));
r = mqttNotification(params->ip.c_str(),
params->port,
topic,
Expand All @@ -168,6 +170,7 @@ static void* workerFunc(void* pSyncQ)
if (ngsildSubscription == false)
subscriptionId = NULL;

LM(("1178: Sending HTTP Notification"));
r = httpRequestSendWithCurl(curl,
params->ip,
params->port,
Expand Down
48 changes: 29 additions & 19 deletions src/lib/orionld/legacyDriver/legacyPostSubscriptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,16 @@ bool legacyPostSubscriptions(void)
sub.throttling = -1; // 0?
sub.timeInterval = -1; // 0?

char* subIdP = NULL;
KjNode* endpointP = NULL;
char* subIdP = NULL;
KjNode* endpointP = NULL;
bool mqtt = false;
bool mqtts = false;
char* mqttUser = NULL;
char* mqttPassword = NULL;
char* mqttHost = NULL;
unsigned short mqttPort = 0;
char* mqttVersion = NULL;
char* mqttTopic = NULL;

// kjTreeToSubscription does the pCheckSubscription stuff ... for now ...
if (kjTreeToSubscription(&sub, &subIdP, &endpointP) == false)
Expand All @@ -90,17 +98,13 @@ bool legacyPostSubscriptions(void)
{
KjNode* uriP = kjLookup(endpointP, "uri");

if (strncmp(uriP->value.s, "mqtt://", 7) == 0)
LM(("1178: uri: '%s'", uriP->value.s));
if ((strncmp(uriP->value.s, "mqtt://", 7) == 0) || (strncmp(uriP->value.s, "mqtts://", 8) == 0))
{
bool mqtts = false;
char* mqttUser = NULL;
char* mqttPassword = NULL;
char* mqttHost = NULL;
unsigned short mqttPort = 0;
char* mqttTopic = NULL;
char* detail = NULL;
char* uri = kaStrdup(&orionldState.kalloc, uriP->value.s); // Can't destroy uriP->value.s ... mqttParse is destructive!

LM(("1178: parsing MQTT url: '%s'", uriP->value.s));
if (mqttParse(uri, &mqtts, &mqttUser, &mqttPassword, &mqttHost, &mqttPort, &mqttTopic, &detail) == false)
{
orionldError(OrionldBadRequestData, "Invalid MQTT endpoint", detail, 400);
Expand All @@ -118,7 +122,6 @@ bool legacyPostSubscriptions(void)
//
// Get MQTT-Version from notification:endpoint:notifierInfo Array, "key == MQTT-Version"
//
char* mqttVersion = NULL;
int mqttQoS;
KjNode* notifierInfoP = kjLookup(endpointP, "notifierInfo");

Expand All @@ -145,15 +148,7 @@ bool legacyPostSubscriptions(void)
}
}


//
// Establish connection with MQTT broker
//
if (mqttConnectionEstablish(mqtts, mqttUser, mqttPassword, mqttHost, mqttPort, mqttVersion) == false)
{
orionldError(OrionldInternalError, "Unable to connect to MQTT server", "xxx", 500);
return false;
}
mqtt = true;
}
}

Expand All @@ -175,6 +170,19 @@ bool legacyPostSubscriptions(void)
}
}

if (mqtt)
{
//
// Establish connection with MQTT broker
//
LM(("1178: Establishing connection with the MQTT broker"));
if (mqttConnectionEstablish(mqtts, mqttUser, mqttPassword, mqttHost, mqttPort, mqttVersion) == false)
{
orionldError(OrionldInternalError, "Unable to connect to MQTT server", "xxx", 500);
return false;
}
}

//
// Create the subscription
//
Expand All @@ -189,7 +197,9 @@ bool legacyPostSubscriptions(void)
orionldState.correlator,
sub.ldContext,
sub.lang);
//
// FIXME: Check oError for failure (oError is output from mongoCreateSubscription!)
// Disconnect from MQTT broker if needed

orionldState.httpStatusCode = SccCreated;
httpHeaderLocationAdd("/ngsi-ld/v1/subscriptions/", subId.c_str(), orionldState.tenantP->tenant);
Expand Down
13 changes: 12 additions & 1 deletion src/lib/orionld/payloadCheck/pcheckEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,19 @@ bool pcheckEndpoint(KjNode* endpointP, bool patch, KjNode** uriPP, KjNode** noti
STRING_CHECK(uriP, "endpoint::uri");
URI_CHECK(uriP->value.s, "endpoint::uri", true);

if (strncmp(uriP->value.s, "mqtt", 4) == 0)
LM(("1178: uri: '%s'", uriP->value.s));
if (strncmp(uriP->value.s, "mqtt://", 7) == 0)
{
*mqttChangeP = true;
LM(("1178: it's an MQTT subscription"));
}
else if (strncmp(uriP->value.s, "mqtts://", 8) == 0)
{
orionldError(OrionldOperationNotSupported, "Not Implemented", "MQTT notifications witn mqtts is not implemented", 501);
return false;
}
else
LM(("1178: it's NOT an MQTT subscription"));
}
else if (strcmp(epItemP->name, "accept") == 0)
{
Expand Down

0 comments on commit 340d92d

Please sign in to comment.