Skip to content

Commit

Permalink
Merge branch 'develop' into release/1.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
kzangeli committed Jul 29, 2022
2 parents 2e02000 + 2bfd358 commit bf7e59a
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 89 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Fixed Issues:
* Issue #280 New URI param "?local=true", to GET only local entities in GET /entities
* Issue #280 Support for Prometheus metrics (4 counters for now)
* Issue #280 Adding tenant to response wherever Location is present
* Issue #1178 MQTT notifications also when "-notificationMode != threadpool"

New Features:
* Concise input/output format - more compact but still lossless - see NGSI-LD API spec v1.6.1 for details
Expand Down
62 changes: 19 additions & 43 deletions src/app/orionld/orionld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,16 +604,12 @@ static void contextBrokerInit(std::string dbPrefix, bool multitenant)
int rc = pQNotifier->start();

if (rc != 0)
{
LM_X(1,("Runtime Error starting notification queue workers (%d)", rc));
}
LM_X(1, ("Runtime Error starting notification queue workers (%d)", rc));

pNotifier = pQNotifier;
}
else
{
pNotifier = new Notifier();
}

/* Set notifier object (singleton) */
setNotifier(pNotifier);
Expand Down Expand Up @@ -703,51 +699,31 @@ static SemOpType policyGet(std::string mutexPolicy)
*
* notificationModeParse -
*/
static void notificationModeParse(char *notifModeArg, int *pQueueSize, int *pNumThreads)
static void notificationModeParse(char* notificationMode, int* pQueueSize, int* pNumThreads)
{
char* mode;
char* first_colon;
int flds_num;
errno = 0;
// notifModeArg is a char[64], pretty sure not a huge input to break sscanf
// cppcheck-suppress invalidscanf
flds_num = sscanf(notifModeArg, "%m[^:]:%d:%d", &mode, pQueueSize, pNumThreads);
if (errno != 0)
if (strncmp(notificationMode, "threadpool", 10) == 0)
{
LM_X(1, ("Fatal Error parsing notification mode: sscanf (%s)", strerror(errno)));
}
if (flds_num == 3 && strcmp(mode, "threadpool") == 0)
{
if (*pQueueSize <= 0)
if (notificationMode[10] == 0)
{
LM_X(1, ("Fatal Error parsing notification mode: invalid queue size (%d)", *pQueueSize));
*pQueueSize = DEFAULT_NOTIF_QS;
*pNumThreads = DEFAULT_NOTIF_TN;
return;
}
if (*pNumThreads <= 0)
else if (notificationMode[10] == ':')
{
LM_X(1, ("Fatal Error parsing notification mode: invalid number of threads (%d)",*pNumThreads));
}
}
else if (flds_num == 1 && strcmp(mode, "threadpool") == 0)
{
*pQueueSize = DEFAULT_NOTIF_QS;
*pNumThreads = DEFAULT_NOTIF_TN;
}
else if (!(
flds_num == 1 &&
(strcmp(mode, "transient") == 0 || strcmp(mode, "persistent") == 0)
))
{
LM_X(1, ("Fatal Error parsing notification mode: invalid mode (%s)", notifModeArg));
}
notificationMode[10] = 0;
char* colon2 = strchr(&notificationMode[11], ':');

// get rid of params, if any, in notifModeArg
first_colon = strchr(notifModeArg, ':');
if (first_colon != NULL)
{
*first_colon = '\0';
if (colon2 == NULL)
LM_X(1, ("Invalid notificationMode (first colon found, second is missing)"));
*pQueueSize = atoi(&notificationMode[11]);
*pNumThreads = atoi(&colon2[1]);
}
else
LM_X(1, ("Invalid notificationMode '%s'", notificationMode));
}

free(mode);
else if ((strcmp(notificationMode, "transient") != 0) && (strcmp(notificationMode, "persistent") != 0))
LM_X(1, ("Invalid notificationMode '%s'", notificationMode));
}


Expand Down
6 changes: 0 additions & 6 deletions src/lib/ngsiNotify/Notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,6 @@ static std::vector<SenderThreadParams*>* buildSenderParamsCustom
return paramsV; // empty vector
}

LM(("1178: url: '%s'", url.c_str()));

//
// 3. Payload
//
Expand Down Expand Up @@ -547,14 +545,12 @@ std::vector<SenderThreadParams*>* Notifier::buildSenderParams
std::string uriPath;
std::string protocol;

LM(("1178: httpInfo.url: '%s'", httpInfo.url.c_str()));
if (strncmp(httpInfo.url.c_str(), "mqtt", 4) == 0)
{
host = subP->httpInfo.mqtt.host;
port = subP->httpInfo.mqtt.port;
uriPath = subP->httpInfo.mqtt.topic;
protocol = (char*) ((subP->httpInfo.mqtt.mqtts == false)? "mqtt" : "mqtts");
LM(("1178: protocol: '%s'", protocol.c_str()));
}
else if (!parseUrl(httpInfo.url, host, port, uriPath, protocol))
{
Expand All @@ -571,7 +567,6 @@ std::vector<SenderThreadParams*>* Notifier::buildSenderParams

SenderThreadParams* params = new SenderThreadParams();

LM(("1178: protocol: '%s'", protocol.c_str()));
params->ip = host;
params->port = port;
params->protocol = protocol;
Expand Down Expand Up @@ -662,7 +657,6 @@ std::vector<SenderThreadParams*>* Notifier::buildSenderParams
}
#endif

LM(("1178: protocol: '%s'", params->protocol.c_str()));
paramsV->push_back(params);
return paramsV;
}
16 changes: 2 additions & 14 deletions src/lib/ngsiNotify/QueueWorkers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,6 @@ static void* workerFunc(void* pSyncQ)
//
strcpy(transactionId, params->transactionId);

LM(("1178: protocol: '%s'", params->protocol.c_str()));
LM_T(LmtNotifier, ("worker sending '%s' message to: host='%s', port=%d, verb=%s, tenant='%s', service-path: '%s', xauthToken: '%s', path='%s', content-type: %s",
params->protocol.c_str(),
params->ip.c_str(),
params->port,
params->verb.c_str(),
params->tenant.c_str(),
params->servicePath.c_str(),
params->xauthToken.c_str(),
params->resource.c_str(),
params->content_type.c_str()));

int r = 0;

if (simulatedNotification)
Expand All @@ -149,7 +137,7 @@ static void* workerFunc(void* pSyncQ)
{
char* topic = (char*) params->resource.c_str();

LM(("1178: Sending MQTT Notification"));
LM(("Sending MQTT Notification for subscription '%s'", params->subscriptionId.c_str()));
r = mqttNotification(params->ip.c_str(),
params->port,
topic,
Expand All @@ -170,7 +158,7 @@ static void* workerFunc(void* pSyncQ)
if (ngsildSubscription == false)
subscriptionId = NULL;

LM(("1178: Sending HTTP Notification"));
LM(("Sending HTTP Notification for subscription '%s'", params->subscriptionId.c_str()));
r = httpRequestSendWithCurl(curl,
params->ip,
params->port,
Expand Down
73 changes: 52 additions & 21 deletions src/lib/ngsiNotify/senderThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "rest/httpRequestSend.h"
#include "ngsiNotify/senderThread.h"
#include "cache/subCache.h"
#include "orionld/common/orionldState.h"
#include "orionld/mqtt/mqttNotification.h"



Expand All @@ -40,6 +42,13 @@ void* startSenderThread(void* p)
{
std::vector<SenderThreadParams*>* paramsV = (std::vector<SenderThreadParams*>*) p;

// Initialize kjson and kalloc libs - needed for MQTT
orionldStateInit(NULL);
orionldState.kjson.spacesPerIndent = 0;
orionldState.kjson.stringBeforeColon = (char*) "";
orionldState.kjson.stringAfterColon = (char*) "";
orionldState.kjson.nlString = (char*) "";

for (unsigned ix = 0; ix < paramsV->size(); ix++)
{
SenderThreadParams* params = (SenderThreadParams*) (*paramsV)[ix];
Expand All @@ -58,27 +67,49 @@ void* startSenderThread(void* p)

if (!simulatedNotification)
{
std::string out;
int r;

r = httpRequestSend(params->ip,
params->port,
params->protocol,
params->verb,
params->tenant.c_str(),
params->servicePath,
params->xauthToken.c_str(),
params->resource,
params->content_type,
params->content,
params->fiwareCorrelator,
params->renderFormat,
NOTIFICATION_WAIT_MODE,
&out,
params->extraHeaders,
"",
-1,
params->subscriptionId.c_str()); // Subscription ID as URL param
int r;

if (strncmp(params->protocol.c_str(), "mqtt", 4) != 0) // Not MQTT, must be HTTP
{
std::string out;

LM(("Sending HTTP Notification for subscription '%s'", params->subscriptionId.c_str()));
r = httpRequestSend(params->ip,
params->port,
params->protocol,
params->verb,
params->tenant.c_str(),
params->servicePath,
params->xauthToken.c_str(),
params->resource,
params->content_type,
params->content,
params->fiwareCorrelator,
params->renderFormat,
NOTIFICATION_WAIT_MODE,
&out,
params->extraHeaders,
"",
-1,
params->subscriptionId.c_str()); // Subscription ID as URL param
}
else
{
char* topic = (char*) params->resource.c_str();

LM(("Sending MQTT Notification for subscription '%s'", params->subscriptionId.c_str()));
r = mqttNotification(params->ip.c_str(),
params->port,
topic,
params->content.c_str(),
params->content_type.c_str(),
params->mqttQoS,
params->mqttUserName,
params->mqttPassword,
params->mqttVersion,
params->xauthToken.c_str(),
params->extraHeaders);
}

if (params->toFree != NULL)
{
Expand Down
Loading

0 comments on commit bf7e59a

Please sign in to comment.