Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADD MQTT retain feature #4412

Merged
merged 7 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
- Add: servicePath field to builtin attributes (#2877)
- Add: notification.mqtt.retain and notification.mqttCustom.retain flag for MQTT retain in notifications (#4388)
- Fix: logDeprecate not working correctly (`geo:json` wrongly considered as deprecated)
- Fix: improve error traces (#4387)
- Add: CLI parameter -dbUri / env var ORION_MONGO_URI (#3794)
- Fix: improve logs in MongoDB query logic
- Fix: improve logs in MongoDB query logic
5 changes: 3 additions & 2 deletions doc/manuals/admin/database_model.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,9 @@ Fields:
is updated each time a notification is sent, to avoid violating throttling.
- **throttling**: minimum interval between notifications. 0 or -1 means no throttling.
- **reference**: the URL for notifications, either HTTP or MQTT
- **mqttTopic**: MQTT topic (only in MQTT notifications)
- **mqttQoS**: MQTT QoS value (only in MQTT notifications)
- **topic**: MQTT topic (only in MQTT notifications)
- **qos**: MQTT QoS value (only in MQTT notifications)
Comment on lines +286 to +287
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking into account this at src/lib/mongoBackend/dbConstants.h

#define CSUB_MQTTTOPIC               "topic"
#define CSUB_MQTTQOS                 "qos"

It seems mqttQoS and mqttTopic was a typo.

NTC (informative)

- **retain**: MQTT retain value (only in MQTT notifications)
- **entities**: an array of entities (mandatory). The JSON for each
entity contains **id**, **type**, **isPattern** and **isTypePattern**. Note that,
due to legacy reasons, **isPattern** may be `"true"` or `"false"` (text) while
Expand Down
2 changes: 2 additions & 0 deletions doc/manuals/orion-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3713,6 +3713,7 @@ A `mqtt` object contains the following subfields:
| `url` | | string | Represent the MQTT broker endpoint to use. URL must start with `mqtt://` and never contains a path (it only includes host and port) |
| `topic` | | string | Represent the MQTT topic to use |
| `qos` | ✓ | number | MQTT QoS value to use in the notifications associated to the subscription (0, 1 or 2). If omitted then QoS 0 is used. |
| `retain` | ✓ | boolean | MQTT retain value to use in the notifications associated to the subscription (`true` or `false`). If omitted then retain `false` is used. |
| `user` | ✓ | string | User name used to authenticate the connection with the broker. |
| `passwd` | ✓ | string | Passphrase for the broker authentication. It is always obfuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`). |

Expand Down Expand Up @@ -3746,6 +3747,7 @@ A `mqttCustom` object contains the following subfields.
| `url` | | string | Represent the MQTT broker endpoint to use. URL must start with `mqtt://` and never contains a path (it only includes host and port) |
| `topic` | | string | Represent the MQTT topic to use. Macro replacement is also performed for this field (i.e: a topic based on an attribute ) |
| `qos` | ✓ | number | MQTT QoS value to use in the notifications associated to the subscription (0, 1 or 2). If omitted then QoS 0 is used. |
| `retain` | ✓ | boolean | MQTT retain value to use in the notifications associated to the subscription (`true` or `false`). If omitted then retain `false` is used. |
| `user` | ✓ | string | User name used to authenticate the connection with the broker. |
| `passwd` | ✓ | string | Passphrase for the broker authentication. It is always obfuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`). |
| `payload` | ✓ | string | Text-based payload to be used in notifications. In case of empty string or omitted, the default payload (see [Notification Messages](#notification-messages) sections) is used. If `null`, notification will not include any payload. |
Expand Down
4 changes: 3 additions & 1 deletion doc/manuals/user/mqtt_notifications.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ The following elements can be used within `mqtt`:
* `topic` to specify the MQTT topic to use
* `qos`: to specify the MQTT QoS value to use in the notifications associated to the subscription
(0, 1 or 2). This is an optional field, if omitted then QoS 0 is used.
* `retain`: to specify the MQTT retain value to use in the notifications associated to the subscription
(`true` or `false`). This is an optional field, if omitted then QoS `false` is used.
* `user` and `passwd`: optional fields, to be used in the case MQTT broker needs user/password based
authentication. If used, both fields have to be used together. Note that for security reasons,
the password is always offuscated when retrieving subscription information (e.g. `GET /v2/subscriptions`).
Expand All @@ -52,7 +54,7 @@ in MQTT subscriptions work the same as in HTTP subscriptions, taking into accoun
* `mqttCustom` is used instead of `httpCustom`
* The same fields used in `mqtt` can be used in `mqttCustom`.
* `headers`, `qs` and `method`cannot be used, as they doesn’t have equivalence in MQTT
* Macro replacement is performed in `topic` and `payload` fields. `url`, `qos`, `user` and `passwd` are fixed values
* Macro replacement is performed in `topic` and `payload` fields. `url`, `qos`, `retain`, `user` and `passwd` are fixed values

## Connection management

Expand Down
3 changes: 3 additions & 0 deletions src/lib/apiTypesV2/MqttInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ std::string MqttInfo::toJson()
jh.addString("url", this->url);
jh.addString("topic", this->topic);
jh.addNumber("qos", (long long) this->qos);
jh.addBool("retain", this->retain);

if (providedAuth)
{
Expand Down Expand Up @@ -117,6 +118,7 @@ void MqttInfo::fill(const orion::BSONObj& bo)
this->url = bo.hasField(CSUB_REFERENCE)? getStringFieldF(bo, CSUB_REFERENCE) : "";
this->topic = bo.hasField(CSUB_MQTTTOPIC)? getStringFieldF(bo, CSUB_MQTTTOPIC) : "";
this->qos = bo.hasField(CSUB_MQTTQOS)? getIntFieldF(bo, CSUB_MQTTQOS) : 0;
this->retain = bo.hasField(CSUB_MQTTRETAIN)? getBoolFieldF(bo, CSUB_MQTTRETAIN) : false;
this->custom = bo.hasField(CSUB_CUSTOM)? getBoolFieldF(bo, CSUB_CUSTOM) : false;

// both user and passwd have to be used at the same time
Expand Down Expand Up @@ -228,6 +230,7 @@ void MqttInfo::fill(const MqttInfo& _mqttInfo)
this->url = _mqttInfo.url;
this->topic = _mqttInfo.topic;
this->qos = _mqttInfo.qos;
this->retain = _mqttInfo.retain;
this->custom = _mqttInfo.custom;
this->payload = _mqttInfo.payload;
this->payloadType = _mqttInfo.payloadType;
Expand Down
1 change: 1 addition & 0 deletions src/lib/apiTypesV2/MqttInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct MqttInfo
std::string url;
std::string topic;
unsigned int qos; // 0, 1 or 2
bool retain;

bool custom;
std::string payload; // either payload, json or ngsi is used (depending on payloadType)
Expand Down
39 changes: 39 additions & 0 deletions src/lib/jsonParseV2/parseSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,31 @@ static std::string parseMqttQoS(ConnectionInfo* ciP, SubscriptionUpdate* subsP,



/* ****************************************************************************
*
* parseMqttRetain -
*/
static std::string parseMqttRetain(ConnectionInfo* ciP, SubscriptionUpdate* subsP, const Value& mqtt)
{
Opt<bool> retainOpt = getBoolOpt(mqtt, "retain");
if (!retainOpt.ok())
{
return badInput(ciP, retainOpt.error);
}
if (retainOpt.given)
{
subsP->notification.mqttInfo.retain = retainOpt.value;
}
else
{
subsP->notification.mqttInfo.retain = 0;
}

return "";
}



/* ****************************************************************************
*
* parseMqttTopic -
Expand Down Expand Up @@ -1035,6 +1060,13 @@ static std::string parseNotification(ConnectionInfo* ciP, SubscriptionUpdate* su
return r;
}

// retain
r = parseMqttRetain(ciP, subsP, mqtt);
if (!r.empty())
{
return r;
}

// topic
r = parseMqttTopic(ciP, subsP, mqtt);
if (!r.empty())
Expand Down Expand Up @@ -1076,6 +1108,13 @@ static std::string parseNotification(ConnectionInfo* ciP, SubscriptionUpdate* su
return r;
}

// retain
r = parseMqttRetain(ciP, subsP, mqttCustom);
if (!r.empty())
{
return r;
}

// topic (same as in not custom mqtt)
r = parseMqttTopic(ciP, subsP, mqttCustom);
if (!r.empty())
Expand Down
10 changes: 6 additions & 4 deletions src/lib/mongoBackend/MongoCommonSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,14 @@ void setNotificationInfo(const Subscription& sub, orion::BSONObjBuilder* b)
b->append(CSUB_REFERENCE, sub.notification.mqttInfo.url);
b->append(CSUB_MQTTTOPIC, sub.notification.mqttInfo.topic);
b->append(CSUB_MQTTQOS, (int) sub.notification.mqttInfo.qos);
b->append(CSUB_MQTTRETAIN, sub.notification.mqttInfo.retain);
b->append(CSUB_CUSTOM, sub.notification.mqttInfo.custom);

LM_T(LmtMongo, ("Subscription reference: %s", sub.notification.mqttInfo.url.c_str()));
LM_T(LmtMongo, ("Subscription mqttTopic: %s", sub.notification.mqttInfo.topic.c_str()));
LM_T(LmtMongo, ("Subscription mqttQos: %d", sub.notification.mqttInfo.qos));
LM_T(LmtMongo, ("Subscription custom: %s", sub.notification.mqttInfo.custom? "true" : "false"));
LM_T(LmtMongo, ("Subscription reference: %s", sub.notification.mqttInfo.url.c_str()));
LM_T(LmtMongo, ("Subscription mqttTopic: %s", sub.notification.mqttInfo.topic.c_str()));
LM_T(LmtMongo, ("Subscription mqttQos: %d", sub.notification.mqttInfo.qos));
LM_T(LmtMongo, ("Subscription mqttRetain: %s", sub.notification.mqttInfo.retain? "true": "false"));
LM_T(LmtMongo, ("Subscription custom: %s", sub.notification.mqttInfo.custom? "true" : "false"));

if (sub.notification.mqttInfo.providedAuth)
{
Expand Down
1 change: 1 addition & 0 deletions src/lib/mongoBackend/dbConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@

#define CSUB_MQTTTOPIC "topic"
#define CSUB_MQTTQOS "qos"
#define CSUB_MQTTRETAIN "retain"

#define CSUB_USER "user"
#define CSUB_PASSWD "passwd"
Expand Down
1 change: 1 addition & 0 deletions src/lib/mongoBackend/mongoUpdateSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ void setNotificationInfo(const Subscription& sub, orion::BSONObjBuilder* setB, o
{
unsetB->append(CSUB_MQTTTOPIC, 1);
unsetB->append(CSUB_MQTTQOS, 1);
unsetB->append(CSUB_MQTTRETAIN, 1);
unsetB->append(CSUB_USER, 1);
unsetB->append(CSUB_PASSWD, 1);

Expand Down
4 changes: 2 additions & 2 deletions src/lib/mqtt/MqttConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ MqttConnection* MqttConnectionManager::getConnection(const std::string& host, in
*
* MqttConnectionManager::sendMqttNotification -
*/
bool MqttConnectionManager::sendMqttNotification(const std::string& host, int port, const std::string& user, const std::string& passwd, const std::string& content, const std::string& topic, unsigned int qos)
bool MqttConnectionManager::sendMqttNotification(const std::string& host, int port, const std::string& user, const std::string& passwd, const std::string& content, const std::string& topic, unsigned int qos, bool retain)
{
std::string endpoint = getEndpoint(host, port);

Expand Down Expand Up @@ -394,7 +394,7 @@ bool MqttConnectionManager::sendMqttNotification(const std::string& host, int po
int id;

bool retval;
int resultCode = mosquitto_publish(mosq, &id, topic.c_str(), (int) strlen(msg), msg, qos, false);
int resultCode = mosquitto_publish(mosq, &id, topic.c_str(), (int) strlen(msg), msg, qos, retain);
if (resultCode != MOSQ_ERR_SUCCESS)
{
retval = false;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/mqtt/MqttConnectionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class MqttConnectionManager

const char* semGet(void);

bool sendMqttNotification(const std::string& host, int port, const std::string& user, const std::string& passwd, const std::string& content, const std::string& topic, unsigned int qos);
bool sendMqttNotification(const std::string& host, int port, const std::string& user, const std::string& passwd, const std::string& content, const std::string& topic, unsigned int qos, bool retain);
void cleanup(double maxAge);

private:
Expand Down
2 changes: 2 additions & 0 deletions src/lib/ngsiNotify/Notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ static SenderThreadParams* buildSenderParamsCustom
paramsP->registration = false;
paramsP->subscriptionId = subscriptionId.get();
paramsP->qos = notification.mqttInfo.qos; // unspecified in case of HTTP notifications
paramsP->retain = notification.mqttInfo.retain; // unspecified in case of HTTP notifications
paramsP->timeout = notification.httpInfo.timeout; // unspecified in case of MQTT notifications
paramsP->user = notification.mqttInfo.user; // unspecified in case of HTTP notifications
paramsP->passwd = notification.mqttInfo.passwd; // unspecified in case of HTTP notifications
Expand Down Expand Up @@ -714,6 +715,7 @@ SenderThreadParams* Notifier::buildSenderParams
paramsP->subscriptionId = ncr.subscriptionId.get();
paramsP->registration = false;
paramsP->qos = notification.mqttInfo.qos; // unspecified in case of HTTP notifications
paramsP->retain = notification.mqttInfo.retain; // unspecified in case of HTTP notifications
paramsP->timeout = notification.httpInfo.timeout; // unspecified in case of MQTT notifications
paramsP->user = notification.mqttInfo.user; // unspecified in case of HTTP notifications
paramsP->passwd = notification.mqttInfo.passwd; // unspecified in case of HTTP notifications
Expand Down
2 changes: 1 addition & 1 deletion src/lib/ngsiNotify/doNotify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ static void doNotifyMqtt(SenderThreadParams* params)

// Note that we use in subNotificationErrorStatus() statusCode -1 and failureReson "" to avoid using
// lastFailureReason and lastSuccessCode in MQTT notifications (they don't have sense in this case)
if (mqttMgr.sendMqttNotification(params->ip, params->port, params->user, params->passwd, params->content, params->resource, params->qos))
if (mqttMgr.sendMqttNotification(params->ip, params->port, params->user, params->passwd, params->content, params->resource, params->qos, params->retain))
{
// MQTT transaction is logged only in the case it was actually published. Upon successful publishing
// mqttOnPublishCallback is called (by the moment we are not doing nothing there, just printing in
Expand Down
1 change: 1 addition & 0 deletions src/lib/ngsiNotify/senderThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ typedef struct SenderThreadParams
std::string xauthToken;
std::string resource; // path for HTTP notifications, topic for MQTT notifications
unsigned int qos; // used only in MQTT notifications
unsigned int retain; // used only in MQTT notifications
std::string user; // for user/pass auth connections (only MQTT at the present moment)
std::string passwd; // for user/pass auth connections (only MQTT at the present moment)
std::string content_type;
Expand Down
Loading
Loading