Skip to content

Commit

Permalink
Merge pull request #1398 from FIWARE/issue/1394_sysAttrs_in_subs_and_…
Browse files Browse the repository at this point in the history
…notifs

Issue/1394 sys attrs in subs and notifs
  • Loading branch information
kzangeli authored Jun 29, 2023
2 parents a13e7ed + 2bf77bb commit 0def54a
Show file tree
Hide file tree
Showing 37 changed files with 1,271 additions and 170 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ Fixed issues:
#1381 Bug in GeoJSON notifications
#1385 Fixed a bug in PATCH /entities/{EID}/attrs, about patching a relationship object to an array (this is not supported by the NGSI-LD spec, but Orion-LD supports it anyway - for now)
#1387 Part 1 of showChanges + previous values of attributes in notifications - for non-batch operations, non-distributed attributes, and non-delete operations
#1394 System Attributes in Notifications
1 change: 1 addition & 0 deletions src/lib/cache/CachedSubscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ struct CachedSubscription
char* qText; // Note that NGSIv2/mongoBackend q/mq are inside SubscriptionExpression
KjNode* geoCoordinatesP;
bool showChanges;
bool sysAttrs;

bool isActive;
std::string status;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/logMsg/traceLevels.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ typedef enum TraceLevels
LmtCurl = 250, // CURL library
LmtToDo, // To Do list
LmtPatchEntity, // Real merge+patch
LmtPatchEntity2, // Real merge+patch: merging for final API Entity, for notifications
LmtSysAttrs, // System Attributes
LmtLeak // Used when debugging leaks and valgrind errors
} TraceLevels;

Expand Down
13 changes: 10 additions & 3 deletions src/lib/orionld/common/subCacheApiSubscriptionInsert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ extern "C"

#include "cache/CachedSubscription.h" // CachedSubscription
#include "cache/subCache.h" // subCacheItemInsert
#include "common/RenderFormat.h" // stringToRenderFormat
#include "common/RenderFormat.h" // RenderFormat, stringToRenderFormat

#include "orionld/q/QNode.h" // QNode
#include "orionld/context/OrionldContext.h" // OrionldContext
Expand All @@ -62,7 +62,9 @@ CachedSubscription* subCacheApiSubscriptionInsert
KjNode* geoCoordinatesP,
OrionldContext* contextP,
const char* tenant,
KjNode* showChangesP
KjNode* showChangesP,
KjNode* sysAttrsP,
RenderFormat renderFormat
)
{
CachedSubscription* cSubP = new CachedSubscription();
Expand All @@ -74,6 +76,10 @@ CachedSubscription* subCacheApiSubscriptionInsert
cSubP->ldContext = (contextP != NULL)? contextP->url : "";
cSubP->geoCoordinatesP = NULL;
cSubP->showChanges = (showChangesP != NULL)? showChangesP->value.b : false;
cSubP->sysAttrs = (sysAttrsP != NULL)? sysAttrsP->value.b : false;
cSubP->renderFormat = renderFormat;

LM_T(LmtSysAttrs, ("sysAttrs: %s", (cSubP->sysAttrs == true)? "true" : "false"));

KjNode* subscriptionIdP = kjLookup(apiSubscriptionP, "_id"); // "id" was changed to "_id" by orionldPostSubscriptions to accomodate the DB insertion
KjNode* subscriptionNameP = kjLookup(apiSubscriptionP, "subscriptionName"); // "name" is accepted too ...
Expand Down Expand Up @@ -243,7 +249,7 @@ CachedSubscription* subCacheApiSubscriptionInsert
if (notificationP != NULL)
{
KjNode* attributesP = kjLookup(notificationP, "attributes");
KjNode* formatP = kjLookup(notificationP, "format");
KjNode* formatP = kjLookup(notificationP, "format"); // FIXME: pCheckSubscription has already found this field. Can be removed
KjNode* endpointP = kjLookup(notificationP, "endpoint");

if (attributesP != NULL)
Expand All @@ -254,6 +260,7 @@ CachedSubscription* subCacheApiSubscriptionInsert
}
}

// FIXME: pCheckSubscription has already found the "notificatrion::format" field. Can be removed
if (formatP != NULL)
{
cSubP->renderFormat = stringToRenderFormat(formatP->value.s, true);
Expand Down
6 changes: 5 additions & 1 deletion src/lib/orionld/common/subCacheApiSubscriptionInsert.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ extern "C"
#include "kjson/KjNode.h" // KjNode
}

#include "common/RenderFormat.h" // RenderFormat
#include "cache/subCache.h" // CachedSubscription

#include "orionld/q/QNode.h" // QNode
#include "orionld/context/OrionldContext.h" // OrionldContext

Expand All @@ -47,7 +49,9 @@ extern CachedSubscription* subCacheApiSubscriptionInsert
KjNode* geoCoordinatesP,
OrionldContext* contextP,
const char* tenant,
KjNode* showChangesP
KjNode* showChangesP,
KjNode* sysAttrsP,
RenderFormat renderFormat
);

#endif // SRC_LIB_ORIONLD_COMMON_SUBCACHEAPISUBSCRIPTIONINSERT_H_
5 changes: 5 additions & 0 deletions src/lib/orionld/dbModel/dbModelFromApiSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,11 @@ bool dbModelFromApiSubscription(KjNode* apiSubscriptionP, bool patch)
kjChildRemove(notificationP, nItemP);
kjChildAdd(apiSubscriptionP, nItemP);
}
else if (strcmp(nItemP->name, "sysAttrs") == 0)
{
kjChildRemove(notificationP, nItemP);
kjChildAdd(apiSubscriptionP, nItemP);
}
else if (strcmp(nItemP->name, "endpoint") == 0)
{
KjNode* uriP = kjLookup(nItemP, "uri");
Expand Down
14 changes: 9 additions & 5 deletions src/lib/orionld/dbModel/dbModelToApiAttribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ void dbModelToApiAttribute(KjNode* dbAttrP, bool sysAttrs, bool eqsForDots)
{
if ((sysAttrs == true) && (ix > 2))
{
char* dateTimeBuf = kaAlloc(&orionldState.kalloc, 32);
numberToDate(dbAttrP->value.f, dateTimeBuf, 32);
dbAttrP->name = (char*) ngsildName[ix];
dbAttrP->value.s = dateTimeBuf;
dbAttrP->type = KjString;
nodeP->name = (char*) ngsildName[ix];

if (dbAttrP->type == KjFloat)
{
char* dateTimeBuf = kaAlloc(&orionldState.kalloc, 32);
numberToDate(dbAttrP->value.f, dateTimeBuf, 32);
nodeP->value.s = dateTimeBuf;
nodeP->type = KjString;
}
}
else
kjChildRemove(dbAttrP, nodeP);
Expand Down
34 changes: 26 additions & 8 deletions src/lib/orionld/dbModel/dbModelToApiSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extern "C"
}

#include "logMsg/logMsg.h" // LM_*
#include "common/RenderFormat.h" // RenderFormat

#include "orionld/common/orionldState.h" // orionldState
#include "orionld/common/orionldError.h" // orionldError
Expand Down Expand Up @@ -172,13 +173,15 @@ static bool notificationStatus(KjNode* dbLastSuccessP, KjNode* dbLastFailureP)
//
KjNode* dbModelToApiSubscription
(
KjNode* dbSubP,
const char* tenant,
bool forSubCache,
QNode** qNodePP,
KjNode** coordinatesPP,
KjNode** contextNodePP,
KjNode** showChangesP
KjNode* dbSubP,
const char* tenant,
bool forSubCache,
QNode** qNodePP,
KjNode** coordinatesPP,
KjNode** contextNodePP,
KjNode** showChangesP,
KjNode** sysAttrsP,
RenderFormat* renderFormatP
)
{
KjNode* dbSubIdP = kjLookup(dbSubP, "_id"); DB_ITEM_NOT_FOUND(dbSubIdP, "id", tenant);
Expand All @@ -204,6 +207,7 @@ KjNode* dbModelToApiSubscription
KjNode* dbLastSuccessP = kjLookup(dbSubP, "lastSuccess");
KjNode* dbLastFailureP = kjLookup(dbSubP, "lastFailure");
KjNode* dbShowChangesP = kjLookup(dbSubP, "showChanges");
KjNode* dbSysAttrsP = kjLookup(dbSubP, "sysAttrs");
KjNode* dbCreatedAtP = NULL;
KjNode* dbModifiedAtP = NULL;

Expand All @@ -229,6 +233,10 @@ KjNode* dbModelToApiSubscription
if ((dbShowChangesP != NULL) && (showChangesP != NULL))
*showChangesP = dbShowChangesP;

// sysAttrs
if ((dbSysAttrsP != NULL) && (sysAttrsP != NULL))
*sysAttrsP = dbShowChangesP;

//
// If dbSubIdP is a JSON Object, it's an NGSIv2 subscription and its "id" looks like this:
// "id": { "$oid": "6290eafec8112b5716a931a7" }
Expand Down Expand Up @@ -447,19 +455,29 @@ KjNode* dbModelToApiSubscription
}

if (dbFormatP != NULL)
{
kjChildAdd(notificationP, dbFormatP);
*renderFormatP = stringToRenderFormat(dbFormatP->value.s);
}

KjNode* endpointP = kjObject(orionldState.kjsonP, "endpoint");

kjChildAdd(notificationP, endpointP);

// notification::showChanges
// Notification::showChanges
if ((dbShowChangesP != NULL) && (dbShowChangesP->value.b == true))
{
KjNode* showChangesP = kjBoolean(orionldState.kjsonP, "showChanges", true);
kjChildAdd(notificationP, showChangesP);
}

// notification::sysAttrs
if ((dbSysAttrsP != NULL) && (dbSysAttrsP->value.b == true))
{
KjNode* sysAttrsP = kjBoolean(orionldState.kjsonP, "sysAttrs", true);
kjChildAdd(notificationP, sysAttrsP);
}

// notification::status
bool nStatus = notificationStatus(dbLastSuccessP, dbLastFailureP);
KjNode* nStatusNodeP = kjString(orionldState.kjsonP, "status", (nStatus == true)? "ok" : "failed");
Expand Down
18 changes: 11 additions & 7 deletions src/lib/orionld/dbModel/dbModelToApiSubscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ extern "C"
#include "kjson/KjNode.h" // KjNode
}

#include "common/RenderFormat.h" // RenderFormat

#include "orionld/q/QNode.h" // QNode


Expand All @@ -40,13 +42,15 @@ extern "C"
//
extern KjNode* dbModelToApiSubscription
(
KjNode* dbSubP,
const char* tenant,
bool forSubCache,
QNode** qNodePP,
KjNode** coordinatesPP,
KjNode** contextNodePP,
KjNode** showChangesP
KjNode* dbSubP,
const char* tenant,
bool forSubCache,
QNode** qNodePP,
KjNode** coordinatesPP,
KjNode** contextNodePP,
KjNode** showChangesP,
KjNode** sysAttrsP,
RenderFormat* renderFormatP
);

#endif // SRC_LIB_ORIONLD_DBMODEL_DBMODELTOAPISUBSCRIPTION_H_
8 changes: 8 additions & 0 deletions src/lib/orionld/kjTree/kjTreeFromCachedSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,14 @@ KjNode* kjTreeFromCachedSubscription(CachedSubscription* cSubP, bool sysAttrs, b
kjChildAdd(notificationNodeP, nodeP);
}

// notification::sysAttrs
if (cSubP->sysAttrs == true)
{
nodeP = kjBoolean(orionldState.kjsonP, "sysAttrs", true);
NULL_CHECK(nodeP);
kjChildAdd(notificationNodeP, nodeP);
}

// notification::endpoint
KjNode* endpointNodeP = kjObject(orionldState.kjsonP, "endpoint");
NULL_CHECK(endpointNodeP);
Expand Down
25 changes: 17 additions & 8 deletions src/lib/orionld/mongoc/mongocSubCachePopulateByTenant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ bool mongocSubCachePopulateByTenant(OrionldTenant* tenantP)
//
bson_init(&mongoFilter);

mongoc_client_t* connectionP = mongoc_client_pool_pop(mongocPool);

mongoc_client_t* connectionP = mongoc_client_pool_pop(mongocPool);
mongoc_collection_t* subscriptionsP = mongoc_client_get_collection(connectionP, tenantP->mongoDbName, "csubs");

//
Expand All @@ -99,11 +98,21 @@ bool mongocSubCachePopulateByTenant(OrionldTenant* tenantP)
continue;
}

QNode* qTree = NULL;
KjNode* contextNodeP = NULL;
KjNode* coordinatesP = NULL;
KjNode* showChangesP = NULL;
KjNode* apiSubP = dbModelToApiSubscription(dbSubP, tenantP->tenant, true, &qTree, &coordinatesP, &contextNodeP, &showChangesP);
QNode* qTree = NULL;
KjNode* contextNodeP = NULL;
KjNode* coordinatesP = NULL;
KjNode* showChangesP = NULL;
KjNode* sysAttrsP = NULL;
RenderFormat renderFormat = RF_NORMALIZED;
KjNode* apiSubP = dbModelToApiSubscription(dbSubP,
tenantP->tenant,
true,
&qTree,
&coordinatesP,
&contextNodeP,
&showChangesP,
&sysAttrsP,
&renderFormat);

if (apiSubP == NULL)
continue;
Expand All @@ -112,7 +121,7 @@ bool mongocSubCachePopulateByTenant(OrionldTenant* tenantP)
if (contextNodeP != NULL)
contextP = orionldContextFromUrl(contextNodeP->value.s, NULL);

subCacheApiSubscriptionInsert(apiSubP, qTree, coordinatesP, contextP, tenantP->tenant, showChangesP);
subCacheApiSubscriptionInsert(apiSubP, qTree, coordinatesP, contextP, tenantP->tenant, showChangesP, sysAttrsP, renderFormat);
}

mongoc_client_pool_push(mongocPool, connectionP);
Expand Down
1 change: 1 addition & 0 deletions src/lib/orionld/notifications/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ SET (SOURCES
previousValues.cpp
previousValuePopulate.cpp
previousValueAdd.cpp
sysAttrsStrip.cpp
)

# Include directories
Expand Down
4 changes: 3 additions & 1 deletion src/lib/orionld/notifications/alteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ extern "C"
//
// alteration -
//
void alteration(const char* entityId, const char* entityType, KjNode* apiEntityP, KjNode* incomingP, KjNode* dbEntityBeforeP)
OrionldAlteration* alteration(const char* entityId, const char* entityType, KjNode* apiEntityP, KjNode* incomingP, KjNode* dbEntityBeforeP)
{
OrionldAlteration* alterationP = (OrionldAlteration*) kaAlloc(&orionldState.kalloc, sizeof(OrionldAlteration));

Expand Down Expand Up @@ -71,4 +71,6 @@ void alteration(const char* entityId, const char* entityType, KjNode* apiEntityP
orionldState.alterationsTail = alterationP;

LM_T(LmtAlt, ("Added alteration for entity '%s'", entityId));

return alterationP;
}
2 changes: 1 addition & 1 deletion src/lib/orionld/notifications/alteration.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@
//
// alteration -
//
extern void alteration(const char* entityId, const char* entityType, KjNode* apiEntityP, KjNode* incomingP, KjNode* dbEntityBeforeP);
extern OrionldAlteration* alteration(const char* entityId, const char* entityType, KjNode* apiEntityP, KjNode* incomingP, KjNode* dbEntityBeforeP);

#endif // SRC_LIB_ORIONLD_NOTIFICATIONS_ALTERATION_H_
9 changes: 6 additions & 3 deletions src/lib/orionld/notifications/notificationSend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,11 @@ KjNode* notificationTree(OrionldAlterationMatch* matchList)

for (OrionldAlterationMatch* matchP = matchList; matchP != NULL; matchP = matchP->next)
{
kjTreeLog(matchP->altP->finalApiEntityP, "matchP->altP->finalApiEntityP", LmtSR);
KjNode* apiEntityP = matchP->altP->finalApiEntityP;
KjNode* apiEntityP = (subP->sysAttrs == false)? matchP->altP->finalApiEntityP : matchP->altP->finalApiEntityWithSysAttrsP;

LM_T(LmtSysAttrs, ("sysAttrs:%s, apiEntityP at %p", (subP->sysAttrs == true)? "true" : "false", apiEntityP));
if (apiEntityP == NULL)
apiEntityP = matchP->altP->finalApiEntityP; // Temporary !!!

// If the entity is already in "data", and, it's not a BATCH Operation, skip - already there
if (orionldState.serviceP->isBatchOp == false)
Expand All @@ -578,7 +581,6 @@ KjNode* notificationTree(OrionldAlterationMatch* matchList)
if (matchP->subP->attributes.size() > 0)
apiEntityP = attributeFilter(apiEntityP, matchP);

kjTreeLog(apiEntityP, "apiEntityP before entityFix", LmtSR);
apiEntityP = entityFix(apiEntityP, subP);
kjChildAdd(dataNodeP, apiEntityP);
}
Expand Down Expand Up @@ -634,6 +636,7 @@ int notificationSend(OrionldAlterationMatch* mAltP, double timestamp, CURL** cur
//
KjNode* notificationP = (ngsiv2 == false)? notificationTree(mAltP) : notificationTreeForNgsiV2(mAltP);
char* preferHeader = NULL;

if ((ngsiv2 == false) && (mAltP->subP->httpInfo.mimeType == GEOJSON))
{
char* geometryProperty = (char*) mAltP->subP->expression.geoproperty.c_str();
Expand Down
Loading

0 comments on commit 0def54a

Please sign in to comment.