Skip to content

Commit

Permalink
Merge branch 'develop' into issue/1557
Browse files Browse the repository at this point in the history
  • Loading branch information
kzangeli committed Jun 5, 2024
2 parents 72c8e49 + f77788a commit 232c3e3
Show file tree
Hide file tree
Showing 28 changed files with 1,542 additions and 17 deletions.
3 changes: 2 additions & 1 deletion CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
* Support for attributes of type JsonProperty
* Support for the new URL parameter "format" for output formats (normalized, concise, simplified)
* New service: DELETE /ngsi-ld/v1/entities (URL param 'type' only - the rest are missing still)

* Distributed Subscriptions

## Notes
* TRoE is still not prepared for attributes of type Vocab/Json/Language, so, attributes of those types are not stored in the historical database
* Modified the @context hosting feature to be according to API spec
Expand Down
39 changes: 26 additions & 13 deletions src/app/orionld/orionld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
extern "C"
{
#include "kbase/kInit.h" // kInit
#include "kbase/kStringSplit.h" // kStringSplit
#include "kalloc/kaInit.h" // kaInit
#include "kalloc/kaBufferInit.h" // kaBufferInit
#include "kalloc/kaBufferReset.h" // kaBufferReset
Expand Down Expand Up @@ -321,7 +322,7 @@ bool noArrayReduction = false;
#define SOCKET_SERVICE_PORT_DESC "port to receive new socket service connections"
#define DISTRIBUTED_DESC "turn on distributed operation"
#define BROKER_ID_DESC "identity of this broker instance for registrations - for the Via header"
#define WIP_DESC "Enable concepts that are 'Work In Progress' (e.g. -wip entityMaps)"
#define WIP_DESC "Enable concepts that are 'Work In Progress' (e.g. -wip entityMaps,distSubs)"
#define FORWARDING_DESC "turn on distributed operation (deprecated)"
#define ID_INDEX_DESC "automatic mongo index on _id.id"
#define NOSWAP_DESC "no swapping - for testing only!!!"
Expand Down Expand Up @@ -1062,8 +1063,18 @@ int main(int argC, char* argV[])

if (wip[0] != 0)
{
if (strcmp(wip, "entityMaps") == 0)
entityMapsEnabled = true;
char* wipV[3];
int wips = kStringSplit(wip, ',', wipV, 3);

for (int ix = 0; ix < wips; ix++)
{
if (strcmp(wipV[ix], "entityMaps") == 0)
entityMapsEnabled = true;
else if (strcmp(wipV[ix], "distSubs") == 0)
distSubsEnabled = true;
else
LM_X(1, ("Invalid value for -wip comma-separated list (allowed: 'entityMaps', 'distSubs')"));
}
}

#if 0
Expand Down Expand Up @@ -1348,27 +1359,29 @@ int main(int argC, char* argV[])

LM_K(("Initialization is Done"));
LM_K((" Accepting REST requests on port %d (experimental API endpoints are %sabled)", port, (experimental == true)? "en" : "dis"));
LM_K((" TRoE: %s", (troe == true)? "Enabled" : "Disabled"));
LM_K((" Distributed Operation: %s", (distributed == true)? "Enabled" : "Disabled"));
LM_K((" Health Check: %s", (socketService == true)? "Enabled" : "Disabled"));
LM_K((" TRoE: %s", (troe == true)? "Enabled" : "Disabled"));
LM_K((" Distributed Operation: %s", (distributed == true)? "Enabled" : "Disabled"));
LM_K((" Health Check: %s", (socketService == true)? "Enabled" : "Disabled"));
LM_K((" Entity Maps: %s", (entityMapsEnabled == true)? "Enabled" : "Disabled"));
LM_K((" Distributed Subscriptions: %s", (distSubsEnabled == true)? "Enabled" : "Disabled"));

if (troe)
LM_K((" Postgres Server Version: %s", postgresServerVersion));
LM_K((" Postgres Server Version: %s", postgresServerVersion));

LM_K((" Mongo Server Version: %s", mongocServerVersion));
LM_K((" Mongo Server Version: %s", mongocServerVersion));

if (mongocOnly == true)
{
LM_K((" Mongo Driver: mongoc driver- ONLY (MongoDB C++ Legacy Driver is DISABLED)"));
LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S));
LM_K((" Mongo Driver: mongoc driver- ONLY (MongoDB C++ Legacy Driver is DISABLED)"));
LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S));
}
else if (experimental == true)
{
LM_K((" Mongo Driver: mongoc driver for NGSI-LD requests, Legacy Mongo C++ Driver for NGSIv1&2"));
LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S));
LM_K((" Mongo Driver: mongoc driver for NGSI-LD requests, Legacy Mongo C++ Driver for NGSIv1&2"));
LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S));
}
else
LM_K((" Mongo Driver: Legacy C++ Driver (deprecated by mongodb)"));
LM_K((" Mongo Driver: Legacy C++ Driver (deprecated by mongodb)"));

// Startup is done - we can free up the allocated kalloc buffers - assuming socketService doesn't use kalloc ...
kaBufferReset(&orionldState.kalloc, KFALSE);
Expand Down
2 changes: 2 additions & 0 deletions src/app/orionld/orionldRestServices.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "orionld/serviceRoutines/orionldPostEntities.h"
#include "orionld/serviceRoutines/orionldPostNotify.h"
#include "orionld/serviceRoutines/orionldPostNotification.h"
#include "orionld/serviceRoutines/orionldPostEntity.h"
#include "orionld/serviceRoutines/orionldPostSubscriptions.h"
#include "orionld/serviceRoutines/orionldPostRegistrations.h"
Expand Down Expand Up @@ -123,6 +124,7 @@ static OrionLdRestServiceSimplified postServiceV[] =
{ "/ngsi-ld/v1/entities/*/attrs", orionldPostEntity },
{ "/ngsi-ld/v1/entities", orionldPostEntities },
{ "/ngsi-ld/ex/v1/notify", orionldPostNotify },
{ "/ngsi-ld/ex/v1/notifications/*", orionldPostNotification },
{ "/ngsi-ld/v1/entityOperations/create", orionldPostBatchCreate },
{ "/ngsi-ld/v1/entityOperations/upsert", orionldPostBatchUpsert },
{ "/ngsi-ld/v1/entityOperations/update", orionldPostBatchUpdate },
Expand Down
3 changes: 3 additions & 0 deletions src/lib/cache/CachedSubscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "orionld/types/OrionldAlteration.h" // OrionldAlterationTypes
#include "orionld/types/OrionldTenant.h" // OrionldTenant
#include "orionld/types/OrionldContext.h" // OrionldContext
#include "orionld/types/SubordinateSubscription.h" // SubordinateSubscription



Expand Down Expand Up @@ -132,6 +133,8 @@ struct CachedSubscription
double createdAt;
double modifiedAt;

SubordinateSubscription* subordinateP; // Linked list of subordinate subscriptions

struct CachedSubscription* next;
bool inDB; // Used by mongocSubCachePopulateByTenant to find subs that have been removed
};
Expand Down
1 change: 1 addition & 0 deletions src/lib/logMsg/traceLevels.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ typedef enum TraceLevels
LmtDistOpResponseDetail, // Details on responses to distributed requests
LmtDistOpResponseHeaders, // HTTP headers of responses to distributed requests
LmtRegMatch, // Distributed Operations: registration matching
LmtDistOpSubMatch, // Matching subscriptions with registrations

//
// Distributed Operations - misc
Expand Down
1 change: 0 additions & 1 deletion src/lib/orionld/common/orionldRequestSend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
*
* Author: Ken Zangelin
*/
#include <strings.h> // bcopy
#include <curl/curl.h> // curl

#include "logMsg/logMsg.h" // LM_*
Expand Down
1 change: 1 addition & 0 deletions src/lib/orionld/common/orionldState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ size_t hostHeaderLen;
PernotSubCache pernotSubCache;
EntityMap* entityMaps = NULL; // Used by GET /entities in the distributed case, for pagination
bool entityMapsEnabled = false;
bool distSubsEnabled = false;



Expand Down
3 changes: 2 additions & 1 deletion src/lib/orionld/common/orionldState.h
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,8 @@ extern bool noCache; // From orionld.cpp
extern uint32_t cSubCounters; // Number of subscription counter updates before flush from sub-cache to DB
extern PernotSubCache pernotSubCache;
extern EntityMap* entityMaps; // Used by GET /entities in the distributed case, for pagination
extern bool entityMapsEnabled;
extern bool entityMapsEnabled; // Enable Entity Maps
extern bool distSubsEnabled; // Enable distributed subscriptions
extern bool noArrayReduction; // Used by arrayReduce in pCheckAttribute.cpp

extern char localIpAndPort[135]; // Local address for X-Forwarded-For (from orionld.cpp)
Expand Down
25 changes: 24 additions & 1 deletion src/lib/orionld/dbModel/dbModelToApiSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern "C"

#include "orionld/types/QNode.h" // QNode
#include "orionld/types/OrionldRenderFormat.h" // OrionldRenderFormat
#include "orionld/types/SubordinateSubscription.h" // SubordinateSubscription
#include "orionld/common/orionldState.h" // orionldState
#include "orionld/common/orionldError.h" // orionldError
#include "orionld/common/numberToDate.h" // numberToDate
Expand Down Expand Up @@ -168,7 +169,13 @@ static bool notificationStatus(KjNode* dbLastSuccessP, KjNode* dbLastFailureP)
// }
// },
// "expiresAt": "2028-12-31T10:00:00", => "expiresAt" => "expiration"
// "throttling": 5 => SAME
// "throttling": 5, => SAME
// "subordinate": [
// {
// "registrationId": "urn:R1",
// "subscriptionId": "urn:ngsi-ld:Subscription:S1:1"
// }
// ]
// }
//
KjNode* dbModelToApiSubscription
Expand Down Expand Up @@ -215,6 +222,7 @@ KjNode* dbModelToApiSubscription
KjNode* dbCreatedAtP = NULL;
KjNode* dbModifiedAtP = NULL;
KjNode* timeIntervalNodeP = kjLookup(dbSubP, "timeInterval");
KjNode* subordinateNodeP = kjLookup(dbSubP, "subordinate");

if ((orionldState.uriParamOptions.sysAttrs == true) || (forSubCache == true))
{
Expand Down Expand Up @@ -693,6 +701,21 @@ KjNode* dbModelToApiSubscription
dbLdContextP->name = (char*) "jsonldContext";
}

//
//
if (subordinateNodeP != NULL)
{
kjChildRemove(dbSubP, subordinateNodeP);
kjChildAdd(apiSubP, subordinateNodeP);

for (KjNode* subordinateP = subordinateNodeP->value.firstChildP; subordinateP != NULL; subordinateP = subordinateP->next)
{
KjNode* runNoP = kjLookup(subordinateP, "runNo");

if (runNoP != NULL)
kjChildRemove(subordinateP, runNoP);
}
}

if (qNodePP != NULL) // FIXME: This is more than a bit weird ...
*qNodePP = NULL;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/orionld/http/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.5)
SET (SOURCES
httpHeaderLocationAdd.cpp
httpHeaderLinkAdd.cpp
httpRequest.cpp
httpRequestHeaderAdd.cpp
)

# Include directories
Expand Down
Loading

0 comments on commit 232c3e3

Please sign in to comment.