Skip to content

Commit

Permalink
Merge pull request #1678 from FIWARE/dds/first-xtypes-notification-in…
Browse files Browse the repository at this point in the history
…-orionld

first xtypes-notification in orionld
  • Loading branch information
kzangeli authored Sep 30, 2024
2 parents 10fbf28 + ef9102b commit 0d7016d
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 52 deletions.
4 changes: 3 additions & 1 deletion src/lib/orionld/dds/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
CMAKE_MINIMUM_REQUIRED(VERSION 3.5)

SET (SOURCES
kjTreeLog.cpp
ddsInit.cpp
ddsConfigLoad.cpp
kjTreeLog.cpp
ddsCategoryToKlogSeverity.cpp
ddsNotification.cpp
ddsEntityCreateFromAttribute.cpp
)

# Include directories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,16 @@ int ddsEntityCreateFromAttribute(KjNode* attrNodeP, const char* entityId, const

orionldState.payloadIdNode = kjString(orionldState.kjsonP, "id", entityId);
orionldState.payloadTypeNode = kjString(orionldState.kjsonP, "type", entityType);

KT_T(StDds, "Entity doesn't exist - calling orionldPostEntities");
if (orionldState.requestTree->type != KjObject)
{
KT_T(StDds, "But first, need to transform the incoming request tree into a JSON object");
KjNode* attributeP = orionldState.requestTree;
attributeP->name = (char*) attrName;
orionldState.requestTree = kjObject(orionldState.kjsonP, NULL);
KT_T(StDds, "But first, need to transform the incoming request tree into a JSON object");

kjChildAdd(orionldState.requestTree, attributeP);
kjTreeLog2(orionldState.requestTree, "Input KjNode tree to orionldPostEntities", StDds);
}
KjNode* attributeP = orionldState.requestTree;
attributeP->name = (char*) attrName;
orionldState.requestTree = kjObject(orionldState.kjsonP, NULL);

kjChildAdd(orionldState.requestTree, attributeP);

kjTreeLog2(orionldState.requestTree, "Input KjNode tree to orionldPostEntities", StDds);
return orionldPostEntities();
}
13 changes: 2 additions & 11 deletions src/lib/orionld/dds/ddsInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extern "C"
#include "orionld/dds/ddsCategoryToKlogSeverity.h" // ddsCategoryToKlogSeverity
#include "orionld/dds/ddsConfigLoad.h" // ddsConfigLoad
#include "orionld/dds/kjTreeLog.h" // kjTreeLog2
#include "orionld/dds/ddsNotification.h" // ddsNotification
#include "orionld/dds/ddsInit.h" // Own interface


Expand All @@ -52,17 +53,6 @@ DdsOperationMode ddsOpMode;



// -----------------------------------------------------------------------------
//
// ddsNotification -
//
void ddsNotification(const char* typeName, const char* topicName, const char* json, double publishTime)
{
KT_T(StDds, "Got a notification on %s:%s (json: %s)", typeName, topicName, json);
}



// -----------------------------------------------------------------------------
//
// ddsTypeNotification -
Expand All @@ -73,6 +63,7 @@ void ddsTypeNotification(const char* typeName, const char* topicName, const char
}



// -----------------------------------------------------------------------------
//
// ddsLog -
Expand Down
62 changes: 60 additions & 2 deletions src/lib/orionld/dds/ddsNotification.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ extern "C"
{
#include "ktrace/kTrace.h" // trace messages - ktrace library
#include "kjson/KjNode.h" // KjNode
#include "kjson/kjParse.h" // kjParse
#include "kjson/kjLookup.h" // kjLookup
#include "kjson/kjBuilder.h" // kjChildRemove, ...
#include "kjson/kjBuilder.h" // kjObject, kjChildAdd
}

#include "orionld/common/orionldState.h" // orionldState, kjTreeLog
#include "orionld/common/traceLevels.h" // KT_T trace levels
#include "orionld/common/tenantList.h" // tenant0
#include "orionld/serviceRoutines/orionldPutAttribute.h" // orionldPutAttribute
#include "orionld/dds/kjTreeLog.h" // kjTreeLog2
#include "orionld/dds/ddsConfigTopicToAttribute.h" // ddsConfigTopicToAttribute
#include "orionld/dds/ddsNotification.h" // Own interface


Expand All @@ -44,6 +44,63 @@ extern "C"
//
// ddsNotification -
//
void ddsNotification(const char* typeName, const char* topicName, const char* json, double publishTime)
{
KT_T(StDds, "Got a notification on %s:%s (json: %s)", typeName, topicName, json);

orionldStateInit(NULL);

KjNode* kTree = kjParse(orionldState.kjsonP, (char*) json);
if (kTree == NULL)
KT_RVE("Error parsing json payload from DDS: '%s'", json);

KjNode* idNodeP = kjLookup(kTree, "id");
KjNode* typeNodeP = kjLookup(kTree, "type");
KjNode* attrValueNodeP = kjLookup(kTree, topicName);

if (idNodeP == NULL) KT_RVE("No 'id' field in DDS payload ");
if (typeNodeP == NULL) KT_RVE("No 'type' field in DDS payload ");
if (attrValueNodeP == NULL) KT_RVE("No attribute field ('%s') in DDS payload", topicName);

orionldState.payloadIdNode = idNodeP;
orionldState.payloadTypeNode = typeNodeP;
KT_T(StDds, "orionldState.payloadIdNode: %p", orionldState.payloadIdNode);
KT_T(StDds, "orionldState.payloadTypeNode: %p", orionldState.payloadTypeNode);

// char* attributeLongName = orionldAttributeExpand(coreContextP, topicName, true, NULL);

char* pipe = strchr(idNodeP->value.s, '|');
if (pipe != NULL)
*pipe = 0;

char id[256];
snprintf(id, sizeof(id) - 1, "urn:%s", idNodeP->value.s);
KT_T(StDds, "New entity id: %s", id);

KjNode* attrNodeP = kjObject(orionldState.kjsonP, NULL);
kjChildAdd(attrNodeP, attrValueNodeP);
attrValueNodeP->name = (char*) "value";

orionldState.requestTree = attrNodeP;
orionldState.uriParams.format = (char*) "simplified";
orionldState.uriParams.type = typeNodeP->value.s;
orionldState.wildcard[0] = id;
orionldState.wildcard[1] = (char*) topicName;

orionldState.tenantP = &tenant0; // FIXME ... Use tenants?
orionldState.in.pathAttrExpanded = (char*) topicName;
orionldState.ddsSample = true;

//
// If the entity does not exist, it needs to be created
// Except of course, if it is registered and exists elsewhere
//
KT_T(StDds, "Calling orionldPutAttribute");
orionldPutAttribute();
}

#if 0
#include "orionld/dds/ddsConfigTopicToAttribute.h" // ddsConfigTopicToAttribute
void ddsNotification(const char* entityType, const char* entityId, const char* topicName, KjNode* notificationP)
{
KT_V("Got a notification from DDS");
Expand Down Expand Up @@ -112,3 +169,4 @@ void ddsNotification(const char* entityType, const char* entityId, const char* t
//
orionldPutAttribute();
}
#endif
6 changes: 1 addition & 5 deletions src/lib/orionld/dds/ddsNotification.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,13 @@
*
* Author: Ken Zangelin
*/
extern "C"
{
#include "kjson/KjNode.h" // KjNode
}



// -----------------------------------------------------------------------------
//
// ddsNotification -
//
extern void ddsNotification(const char* entityType, const char* entityId, const char* attrName, KjNode* notificationP);
extern void ddsNotification(const char* typeName, const char* topicName, const char* json, double publishTime);

#endif // SRC_LIB_ORIONLD_DDS_DDSNOTIFICATION_H_
4 changes: 3 additions & 1 deletion src/lib/orionld/payloadCheck/pCheckAttribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,9 @@ static bool pCheckAttributeObject
// Check for Deletion -
// FIXME: Change ORIONLD_SERVICE_OPTION_ACCEPT_JSONLD_NULL for ... ORIONLD_SERVICE_OPTION_xxxxx
//
if ((orionldState.serviceP->options & ORIONLD_SERVICE_OPTION_ACCEPT_JSONLD_NULL) != 0)
if (orionldState.serviceP == NULL)
LM_W(("orionldState.serviceP is NULL as 'DDS initiated via PutAttribute'"));
else if ((orionldState.serviceP->options & ORIONLD_SERVICE_OPTION_ACCEPT_JSONLD_NULL) != 0)
{
if (deletionWithoutTypePresent(attrP, attributeType, valueP, objectP, languageMapP, vocabP, jsonP) == true)
return true;
Expand Down
5 changes: 3 additions & 2 deletions src/lib/orionld/serviceRoutines/orionldPutAttribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ extern "C"
#include "orionld/distOp/distOpFailure.h" // distOpFailure
#include "orionld/distOp/distOpSuccess.h" // distOpSuccess
#include "orionld/dds/kjTreeLog.h" // kjTreeLog2
#include "orionld/dds/ddsEntityCreateFromAttribute.h" // ddsEntityCreateFromAttribute
#include "orionld/notifications/alteration.h" // alteration
#include "orionld/notifications/previousValuePopulate.h" // previousValuePopulate
#include "orionld/notifications/sysAttrsStrip.h" // sysAttrsStrip
Expand Down Expand Up @@ -120,8 +121,8 @@ bool orionldPutAttribute(void)
{
if (orionldState.distributed == false)
{
// if (orionldState.ddsSample == true)
// return ddsEntityCreateFromAttribute(orionldState.requestTree, entityId, attrName);
if (orionldState.ddsSample == true)
return ddsEntityCreateFromAttribute(orionldState.requestTree, entityId, attrName);

orionldError(OrionldResourceNotFound, "Entity Not Found", entityId, 404);
return false;
Expand Down
40 changes: 23 additions & 17 deletions test/functionalTest/cases/0000_dds/dds_notifications.test
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ ftClientStart -t 0-5000
#
# 01. Ask FT to publish an entity urn:E1 on topic 'P1'
# 02. Ask FT to publish an entity urn:E2 on topic 'P2'
# 03. Ask FT to publish an entity urn:E3 on topic 'P3'
# 03. Ask FT to publish an entity urn:E1 on topic 'P3'
# 04. Query the broker for all its entities, see urn:E1 and urn:E2
#

Expand All @@ -42,7 +42,7 @@ echo "===================================================="
payload='{
"s": "abc"
}'
orionCurl --url '/dds/pub?ddsTopicType=xyz&entityId=urn:e1&entityType=T&ddsTopicName=P1' --port $FT_PORT --payload "$payload"
orionCurl --url '/dds/pub?ddsTopicType=xyz&entityId=urn:E1&entityType=T&ddsTopicName=P1' --port $FT_PORT --payload "$payload"
echo
echo

Expand All @@ -52,17 +52,17 @@ echo "===================================================="
payload='{
"i": 2
}'
orionCurl --url '/dds/pub?ddsTopicType=xyz&xyz&entityId=urn:e2&ddsTopicName=P2' --port $FT_PORT --payload "$payload"
orionCurl --url '/dds/pub?ddsTopicType=xyz&xyz&entityId=urn:E2&ddsTopicName=P2' --port $FT_PORT --payload "$payload"
echo
echo


echo "03. Ask FT to publish an entity urn:E3 on topic 'P3'"
echo "03. Ask FT to publish an entity urn:E1 on topic 'P3'"
echo "===================================================="
payload='{
"f": 3.14
}'
orionCurl --url '/dds/pub?ddsTopicType=xyzxyz&entityId=urn:e1&ddsTopicName=P3' --port $FT_PORT --payload "$payload"
orionCurl --url '/dds/pub?ddsTopicType=xyzxyz&entityId=urn:E1&ddsTopicName=P3' --port $FT_PORT --payload "$payload"
echo
echo

Expand All @@ -89,7 +89,7 @@ Date: REGEX(.*)



03. Ask FT to publish an entity urn:E3 on topic 'P3'
03. Ask FT to publish an entity urn:E1 on topic 'P3'
====================================================
HTTP/1.1 204 No Content
Date: REGEX(.*)
Expand All @@ -99,26 +99,32 @@ Date: REGEX(.*)
04. Query the broker for all its entities, see urn:E1 and urn:E2
================================================================
HTTP/1.1 200 OK
Content-Length: 125
Content-Length: 203
Content-Type: application/json
Date: REGEX(.*)
Link: <https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-contextREGEX(.*)

[
{
"P1": {
"type": "Property",
"value": 1
},
"id": "urn:e1",
"type": "T"
},
{
"P2": {
"type": "Property",
"value": 2
"value": {
"data": {
"0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0": {
"b": false,
"f": 0,
"i": 2,
"ia": [
0,
0
],
"s": ""
}
},
"type": "NgsildSample"
}
},
"id": "urn:e2",
"id": "urn:01.0f.bc.a1.62.42.9a.b8.01.00.01.00",
"type": "T"
}
]
Expand Down
4 changes: 2 additions & 2 deletions test/functionalTest/ftClient/NgsildPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ bool NgsildPublisher::publish(const char* entityType, const char* entityId, cons
entity_.f(f);
entity_.b(b);

b = writer_->write(&entity_);
eprosima::fastdds::dds::ReturnCode_t rc = writer_->write(&entity_);

if (b == false)
if (rc != eprosima::fastdds::dds::RETCODE_OK)
{
KT_E("Not able to publish");
return false;
Expand Down
3 changes: 3 additions & 0 deletions test/functionalTest/ftClient/ddsPublish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
*
* Author: David Campo, Ken Zangelin
*/
#include <unistd.h> // usleep

extern "C"
{
#include "ktrace/kTrace.h" // trace messages - ktrace library
Expand Down Expand Up @@ -127,6 +129,7 @@ void ddsPublishEntity
KT_V("Publishing the entity '%s' in DDS", entityId);

NgsildPublisher* publisherP = new NgsildPublisher(topicType);
usleep(100000);

KT_V("Initializing publisher for topicType '%s', topicName '%s'", topicType, topicName);
if (publisherP->init(topicName))
Expand Down
12 changes: 10 additions & 2 deletions test/functionalTest/testHarness.sh
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,14 @@ fi



# ------------------------------------------------------------------------------
#
# Copy the DDS Enabler config file to /tmp
#
cp config/DDS_ENABLER_CONFIGURATION.yaml /tmp



# ------------------------------------------------------------------------------
#
# Check unmatching --dir and 'parameter that is a directory' AND
Expand Down Expand Up @@ -1101,7 +1109,7 @@ function partExecute()
#
grep -v "already exists" $dirname/$filename.$what.stderr > $dirname/$filename.$what.stderr2
grep -v "mongoc: falling back to malloc for counters." $dirname/$filename.$what.stderr2 > $dirname/$filename.$what.stderr3
grep -v "mongoc: Falling back to malloc for counters." $dirname/$filename.$what.stderr3 > $dirname/$filename.$what.stderr4
grep -v "Port 7411 Zombie" $dirname/$filename.$what.stderr3 > $dirname/$filename.$what.stderr4
grep -v "screen size is bogus" $dirname/$filename.$what.stderr4 > $dirname/$filename.$what.stderr
rm -f $dirname/$filename.$what.stderr2
rm -f $dirname/$filename.$what.stderr3
Expand Down Expand Up @@ -1288,7 +1296,7 @@ function runTest()
logMsg "SHELL-INIT part for $path DONE. eCode=$eCode"
grep -v "already exists" $dirname/$filename.shellInit.stderr > $dirname/$filename.shellInit.stderr2
grep -v "mongoc: falling back to malloc for counters." $dirname/$filename.shellInit.stderr2 > $dirname/$filename.shellInit.stderr3
grep -v "mongoc: Falling back to malloc for counters." $dirname/$filename.shellInit.stderr3 > $dirname/$filename.shellInit.stderr
grep -v "Port 7411 Zombie" $dirname/$filename.shellInit.stderr3 > $dirname/$filename.shellInit.stderr
rm $dirname/$filename.shellInit.stderr2
rm $dirname/$filename.shellInit.stderr3

Expand Down

0 comments on commit 0d7016d

Please sign in to comment.