Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed part of #1427 - by disabling the keep-alive #1446

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ Fixed issues:
* #280 - Implemented Periodic Notifications (subscriptions with a 'timeInterval')
* #280 - Fixed a crash in the subscription cache for subscriptions with empty URL paths in the notification::endpoint::uri field
* #280 - New CLI (hidden) for extra field in notifications (trigger: "VERB URL PATH"): -triggerOperation
* #1427 - Disabling the keep-alive as it seems to be missing in libpaho (this is just a part of the issue)
2 changes: 1 addition & 1 deletion src/lib/orionld/mqtt/mqttConnect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ bool mqttConnect(MqttConnection* mqP, bool mqtts, const char* username, const ch
snprintf(address, sizeof(address), "%s:%d", host, port);
MQTTClient_create(&mqP->client, address, "Orion-LD", MQTTCLIENT_PERSISTENCE_NONE, NULL);

connectOptions.keepAliveInterval = 20;
connectOptions.keepAliveInterval = 0;
connectOptions.cleansession = 1;
connectOptions.username = username;
connectOptions.password = password;
Expand Down
5 changes: 5 additions & 0 deletions src/lib/orionld/mqtt/mqttConnectionLookup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ MqttConnection* mqttConnectionLookup(const char* host, unsigned short port, cons
if ((mqP->password != NULL) && (strcmp(password, mqP->password) != 0)) continue;
if ((mqP->version != NULL) && (strcmp(version, mqP->version) != 0)) continue;

if (MQTTClient_isConnected(mqP->client) != true)
LM_T(LmtMqtt, ("Found the MQTT connection, just, it's not connected!"));
else
LM_T(LmtMqtt, ("Found the MQTT connection and it's connected"));

return mqP;
}

Expand Down
2 changes: 2 additions & 0 deletions src/lib/orionld/mqtt/mqttDisconnect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ void mqttDisconnect(const char* host, unsigned short port, const char* username,
if (mcP->connections > 0)
return;

LM_T(LmtMqtt, ("Disconnection for %s/%s", mcP->host, mcP->username));

if (mcP->host != NULL) free(mcP->host);
if (mcP->username != NULL) free(mcP->username);
if (mcP->password != NULL) free(mcP->password);
Expand Down
15 changes: 13 additions & 2 deletions src/lib/orionld/mqtt/mqttNotification.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ int mqttNotification

if (mqttP == NULL)
{
LM_T(LmtMqtt, ("No MQTT connection found, getting one"));
mqttP = mqttConnectionAdd(false, username, password, host, port, mqttVersion);
if (mqttP == NULL)
{
Expand All @@ -165,14 +166,24 @@ int mqttNotification
mqttMsg.qos = QoS;
mqttMsg.retained = 0;

MQTTClient_publishMessage(mqttP->client, topic, &mqttMsg, &mqttToken);
LM_T(LmtMqtt, ("Sending a notification over MQTT (topic: '%s')", topic));
int mr = MQTTClient_publishMessage(mqttP->client, topic, &mqttMsg, &mqttToken);
if (mr != MQTTCLIENT_SUCCESS)
{
LM_E(("MQTT Broker error %d", mr));
// Reconnect and try again
return -1;
}
LM_T(LmtMqtt, ("MQTTClient_publishMessage says OK (returned MQTTCLIENT_SUCCESS)"));

LM_T(LmtMqtt, ("Waiting for completion (timeout: %d)", mqttTimeout));
int rc = MQTTClient_waitForCompletion(mqttP->client, mqttToken, mqttTimeout);
if (rc != 0)
if (rc != MQTTCLIENT_SUCCESS)
{
LM_E(("Internal Error (MQTT waitForCompletion error %d)", rc));
return -1;
}
LM_T(LmtMqtt, ("MQTTClient_waitForCompletion says OK (returned MQTTCLIENT_SUCCESS)"));

return 0;
}
11 changes: 10 additions & 1 deletion src/lib/orionld/mqtt/mqttNotify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,16 @@ int mqttNotify(CachedSubscription* cSubP, struct iovec* ioVec, int ioVecSize, do
mqttMsg.qos = mqttP->qos;
mqttMsg.retained = 0;

MQTTClient_publishMessage(mqttConnectionP->client, mqttP->topic, &mqttMsg, &mqttToken);
LM_T(LmtMqtt, ("Sending a notification over MQTT (topic: '%s')", mqttP->topic));
int mr = MQTTClient_publishMessage(mqttConnectionP->client, mqttP->topic, &mqttMsg, &mqttToken);
if (mr != MQTTCLIENT_SUCCESS)
{
LM_E(("MQTT Broker error %d", mr));
// Reconnect and try again
orionldError(OrionldInternalError, "MQTT Broker Problem", "MQTTClient_publishMessage failed", 500);
notificationFailure(cSubP, "MQTT Broker error", notificationTime);
return -1;
}

extern int mqttTimeout; // From mqttNotification.cpp - should be a CLI
int rc = MQTTClient_waitForCompletion(mqttConnectionP->client, mqttToken, mqttTimeout);
Expand Down
230 changes: 230 additions & 0 deletions test/functionalTest/cases/0000_ngsild/ngsild_mqtt_broker_restart.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
# Copyright 2023 FIWARE Foundation e.V.
#
# This file is part of Orion-LD Context Broker.
#
# Orion-LD Context Broker is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# Orion-LD Context Broker is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/.
#
# For those usages not covered by this license please contact with
# orionld at fiware dot org

# VALGRIND_READY - to mark the test ready for valgrindTestSuite.sh

--NAME--
Notifications via MQTT and restart of mosquitto

--SHELL-INIT--
dbInit CB
orionldStart CB
mqttTestClientStart --mqttBrokerPort $MQTT_BROKER_PORT --mqttBrokerIp $MQTT_BROKER_HOST --pretty-print --mqttTopic entities

--SHELL--

#
# 01. Create subscription with MQTT broker as endpoint
# 02. Create an entity matching the subscription
# 03. Dump and Reset the MQTT test client and see one notification
# 04. Sleep 30 secs to provoke disconnection broker-mosquitto
# 05. Create another entity matching the subscription
# 06. Dump the MQTT test client and see one notification
#

echo "01. Create subscription with MQTT broker as endpoint"
echo "===================================================="
payload='{
"id": "urn:ngsi-ld:Subscription:mqttNotification",
"type": "Subscription",
"entities": [
{
"type": "AirQualityObserved"
},
{
"type": "AirQualityObserved2"
}
],
"notification": {
"format": "normalized",
"endpoint": {
"uri": "mqtt://'$MQTT_BROKER_HOST':'$MQTT_BROKER_PORT'/entities",
"accept": "application/json",
"receiverInfo": [
{
"key": "H1",
"value": "123"
},
{
"key": "H2",
"value": "456"
}
]
}
},
"@context": ["https://fiware.github.io/data-models/context.jsonld", "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"]
}'
orionCurl --url /ngsi-ld/v1/subscriptions --payload "$payload" -H "Content-Type: application/ld+json"
echo
echo


echo "02. Create an entity matching the subscription"
echo "=============================================="
payload='{
"id": "urn:ngsi-ld:Test:Mqtt1",
"type": "AirQualityObserved",
"temperature": {
"type": "Property",
"value": 21
},
"@context": ["https://fiware.github.io/data-models/context.jsonld", "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"]
}'
orionCurl --url /ngsi-ld/v1/entities --payload "$payload" -H "Content-Type: application/ld+json"
echo
echo


echo "03. Dump and Reset the MQTT test client and see one notification"
echo "================================================================"
mqttTestClientDump entities
mqttTestClientReset entities
echo
echo


echo "04. Sleep 30 secs to provoke disconnection broker-mosquitto"
echo "==========================================================="
sleep 50
echo
echo


echo "05. Create another entity matching the subscription"
echo "==================================================="
payload='{
"id": "urn:ngsi-ld:Test:Mqtt2",
"type": "AirQualityObserved2",
"temperature": {
"type": "Property",
"value": 21
}
}'
orionCurl --url /ngsi-ld/v1/entities --payload "$payload" -H 'X-Auth-Token: x-auth-token-is-not-verified'
echo
echo


echo "06. Dump the MQTT test client and see one notification"
echo "======================================================"
mqttTestClientDump entities
echo
echo


--REGEXPECT--
01. Create subscription with MQTT broker as endpoint
====================================================
HTTP/1.1 201 Created
Content-Length: 0
Date: REGEX(.*)
Location: /ngsi-ld/v1/subscriptions/urn:ngsi-ld:Subscription:mqttNotification



02. Create an entity matching the subscription
==============================================
HTTP/1.1 201 Created
Content-Length: 0
Date: REGEX(.*)
Location: /ngsi-ld/v1/entities/urn:ngsi-ld:Test:Mqtt1



03. Dump and Reset the MQTT test client and see one notification
================================================================
Notifications: 1
{
"body": {
"data": [
{
"id": "urn:ngsi-ld:Test:Mqtt1",
"temperature": {
"type": "Property",
"value": 21
},
"type": "AirQualityObserved"
}
],
"id": "urn:ngsi-ld:Notification:REGEX(.*)",
"notifiedAt": "REGEX(.*)",
"subscriptionId": "urn:ngsi-ld:Subscription:mqttNotification",
"type": "Notification"
},
"metadata": {
"Content-Type": "application/json",
"H1": "123",
"H2": "456",
"Link": "REGEX(.*)"
}
}
=======================================



04. Sleep 30 secs to provoke disconnection broker-mosquitto
===========================================================


05. Create another entity matching the subscription
===================================================
HTTP/1.1 201 Created
Content-Length: 0
Date: REGEX(.*)
Location: /ngsi-ld/v1/entities/urn:ngsi-ld:Test:Mqtt2



06. Dump the MQTT test client and see one notification
======================================================
Notifications: 1
{
"body": {
"data": [
{
"id": "urn:ngsi-ld:Test:Mqtt2",
"temperature": {
"type": "Property",
"value": 21
},
"type": "AirQualityObserved2"
}
],
"id": "urn:ngsi-ld:Notification:REGEX(.*)",
"notifiedAt": "REGEX(.*)",
"subscriptionId": "urn:ngsi-ld:Subscription:mqttNotification",
"type": "Notification"
},
"metadata": {
"Content-Type": "application/json",
"H1": "123",
"H2": "456",
"Link": "REGEX(.*)",
"X-Auth-Token": "x-auth-token-is-not-verified"
}
}
=======================================



--TEARDOWN--
brokerStop CB
dbDrop CB
mqttTestClientStop entities
Loading