Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distops/dist sub creation #1617

Merged
merged 6 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
* Support for attributes of type JsonProperty
* Support for the new URL parameter "format" for output formats (normalized, concise, simplified)
* New service: DELETE /ngsi-ld/v1/entities (URL param 'type' only - the rest are missing still)

* Distributed Subscriptions

## Notes
* TRoE is still not prepared for attributes of type Vocab/Json/Language, so, attributes of those types are not stored in the historical database
* Modified the @context hosting feature to be according to API spec
Expand Down
39 changes: 26 additions & 13 deletions src/app/orionld/orionld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
extern "C"
{
#include "kbase/kInit.h" // kInit
#include "kbase/kStringSplit.h" // kStringSplit
#include "kalloc/kaInit.h" // kaInit
#include "kalloc/kaBufferInit.h" // kaBufferInit
#include "kalloc/kaBufferReset.h" // kaBufferReset
Expand Down Expand Up @@ -321,7 +322,7 @@ bool noArrayReduction = false;
#define SOCKET_SERVICE_PORT_DESC "port to receive new socket service connections"
#define DISTRIBUTED_DESC "turn on distributed operation"
#define BROKER_ID_DESC "identity of this broker instance for registrations - for the Via header"
#define WIP_DESC "Enable concepts that are 'Work In Progress' (e.g. -wip entityMaps)"
#define WIP_DESC "Enable concepts that are 'Work In Progress' (e.g. -wip entityMaps,distSubs)"
#define FORWARDING_DESC "turn on distributed operation (deprecated)"
#define ID_INDEX_DESC "automatic mongo index on _id.id"
#define NOSWAP_DESC "no swapping - for testing only!!!"
Expand Down Expand Up @@ -1062,8 +1063,18 @@ int main(int argC, char* argV[])

if (wip[0] != 0)
{
if (strcmp(wip, "entityMaps") == 0)
entityMapsEnabled = true;
char* wipV[3];
int wips = kStringSplit(wip, ',', wipV, 3);

for (int ix = 0; ix < wips; ix++)
{
if (strcmp(wipV[ix], "entityMaps") == 0)
entityMapsEnabled = true;
else if (strcmp(wipV[ix], "distSubs") == 0)
distSubsEnabled = true;
else
LM_X(1, ("Invalid value for -wip comma-separated list (allowed: 'entityMaps', 'distSubs')"));
}
}

#if 0
Expand Down Expand Up @@ -1348,27 +1359,29 @@ int main(int argC, char* argV[])

LM_K(("Initialization is Done"));
LM_K((" Accepting REST requests on port %d (experimental API endpoints are %sabled)", port, (experimental == true)? "en" : "dis"));
LM_K((" TRoE: %s", (troe == true)? "Enabled" : "Disabled"));
LM_K((" Distributed Operation: %s", (distributed == true)? "Enabled" : "Disabled"));
LM_K((" Health Check: %s", (socketService == true)? "Enabled" : "Disabled"));
LM_K((" TRoE: %s", (troe == true)? "Enabled" : "Disabled"));
LM_K((" Distributed Operation: %s", (distributed == true)? "Enabled" : "Disabled"));
LM_K((" Health Check: %s", (socketService == true)? "Enabled" : "Disabled"));
LM_K((" Entity Maps: %s", (entityMapsEnabled == true)? "Enabled" : "Disabled"));
LM_K((" Distributed Subscriptions: %s", (distSubsEnabled == true)? "Enabled" : "Disabled"));

if (troe)
LM_K((" Postgres Server Version: %s", postgresServerVersion));
LM_K((" Postgres Server Version: %s", postgresServerVersion));

LM_K((" Mongo Server Version: %s", mongocServerVersion));
LM_K((" Mongo Server Version: %s", mongocServerVersion));

if (mongocOnly == true)
{
LM_K((" Mongo Driver: mongoc driver- ONLY (MongoDB C++ Legacy Driver is DISABLED)"));
LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S));
LM_K((" Mongo Driver: mongoc driver- ONLY (MongoDB C++ Legacy Driver is DISABLED)"));
LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S));
}
else if (experimental == true)
{
LM_K((" Mongo Driver: mongoc driver for NGSI-LD requests, Legacy Mongo C++ Driver for NGSIv1&2"));
LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S));
LM_K((" Mongo Driver: mongoc driver for NGSI-LD requests, Legacy Mongo C++ Driver for NGSIv1&2"));
LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S));
}
else
LM_K((" Mongo Driver: Legacy C++ Driver (deprecated by mongodb)"));
LM_K((" Mongo Driver: Legacy C++ Driver (deprecated by mongodb)"));

// Startup is done - we can free up the allocated kalloc buffers - assuming socketService doesn't use kalloc ...
kaBufferReset(&orionldState.kalloc, KFALSE);
Expand Down
2 changes: 2 additions & 0 deletions src/app/orionld/orionldRestServices.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "orionld/serviceRoutines/orionldPostEntities.h"
#include "orionld/serviceRoutines/orionldPostNotify.h"
#include "orionld/serviceRoutines/orionldPostNotification.h"
#include "orionld/serviceRoutines/orionldPostEntity.h"
#include "orionld/serviceRoutines/orionldPostSubscriptions.h"
#include "orionld/serviceRoutines/orionldPostRegistrations.h"
Expand Down Expand Up @@ -123,6 +124,7 @@ static OrionLdRestServiceSimplified postServiceV[] =
{ "/ngsi-ld/v1/entities/*/attrs", orionldPostEntity },
{ "/ngsi-ld/v1/entities", orionldPostEntities },
{ "/ngsi-ld/ex/v1/notify", orionldPostNotify },
{ "/ngsi-ld/ex/v1/notifications/*", orionldPostNotification },
{ "/ngsi-ld/v1/entityOperations/create", orionldPostBatchCreate },
{ "/ngsi-ld/v1/entityOperations/upsert", orionldPostBatchUpsert },
{ "/ngsi-ld/v1/entityOperations/update", orionldPostBatchUpdate },
Expand Down
3 changes: 3 additions & 0 deletions src/lib/cache/CachedSubscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "orionld/types/OrionldAlteration.h" // OrionldAlterationTypes
#include "orionld/types/OrionldTenant.h" // OrionldTenant
#include "orionld/types/OrionldContext.h" // OrionldContext
#include "orionld/types/SubordinateSubscription.h" // SubordinateSubscription



Expand Down Expand Up @@ -132,6 +133,8 @@ struct CachedSubscription
double createdAt;
double modifiedAt;

SubordinateSubscription* subordinateP; // Linked list of subordinate subscriptions

struct CachedSubscription* next;
bool inDB; // Used by mongocSubCachePopulateByTenant to find subs that have been removed
};
Expand Down
1 change: 1 addition & 0 deletions src/lib/logMsg/traceLevels.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ typedef enum TraceLevels
LmtDistOpResponseDetail, // Details on responses to distributed requests
LmtDistOpResponseHeaders, // HTTP headers of responses to distributed requests
LmtRegMatch, // Distributed Operations: registration matching
LmtDistOpSubMatch, // Matching subscriptions with registrations

//
// Distributed Operations - misc
Expand Down
1 change: 0 additions & 1 deletion src/lib/orionld/common/orionldRequestSend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
*
* Author: Ken Zangelin
*/
#include <strings.h> // bcopy
#include <curl/curl.h> // curl

#include "logMsg/logMsg.h" // LM_*
Expand Down
1 change: 1 addition & 0 deletions src/lib/orionld/common/orionldState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ size_t hostHeaderLen;
PernotSubCache pernotSubCache;
EntityMap* entityMaps = NULL; // Used by GET /entities in the distributed case, for pagination
bool entityMapsEnabled = false;
bool distSubsEnabled = false;



Expand Down
3 changes: 2 additions & 1 deletion src/lib/orionld/common/orionldState.h
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,8 @@ extern bool noCache; // From orionld.cpp
extern uint32_t cSubCounters; // Number of subscription counter updates before flush from sub-cache to DB
extern PernotSubCache pernotSubCache;
extern EntityMap* entityMaps; // Used by GET /entities in the distributed case, for pagination
extern bool entityMapsEnabled;
extern bool entityMapsEnabled; // Enable Entity Maps
extern bool distSubsEnabled; // Enable distributed subscriptions
extern bool noArrayReduction; // Used by arrayReduce in pCheckAttribute.cpp

extern char localIpAndPort[135]; // Local address for X-Forwarded-For (from orionld.cpp)
Expand Down
25 changes: 24 additions & 1 deletion src/lib/orionld/dbModel/dbModelToApiSubscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern "C"

#include "orionld/types/QNode.h" // QNode
#include "orionld/types/OrionldRenderFormat.h" // OrionldRenderFormat
#include "orionld/types/SubordinateSubscription.h" // SubordinateSubscription
#include "orionld/common/orionldState.h" // orionldState
#include "orionld/common/orionldError.h" // orionldError
#include "orionld/common/numberToDate.h" // numberToDate
Expand Down Expand Up @@ -168,7 +169,13 @@ static bool notificationStatus(KjNode* dbLastSuccessP, KjNode* dbLastFailureP)
// }
// },
// "expiresAt": "2028-12-31T10:00:00", => "expiresAt" => "expiration"
// "throttling": 5 => SAME
// "throttling": 5, => SAME
// "subordinate": [
// {
// "registrationId": "urn:R1",
// "subscriptionId": "urn:ngsi-ld:Subscription:S1:1"
// }
// ]
// }
//
KjNode* dbModelToApiSubscription
Expand Down Expand Up @@ -215,6 +222,7 @@ KjNode* dbModelToApiSubscription
KjNode* dbCreatedAtP = NULL;
KjNode* dbModifiedAtP = NULL;
KjNode* timeIntervalNodeP = kjLookup(dbSubP, "timeInterval");
KjNode* subordinateNodeP = kjLookup(dbSubP, "subordinate");

if ((orionldState.uriParamOptions.sysAttrs == true) || (forSubCache == true))
{
Expand Down Expand Up @@ -693,6 +701,21 @@ KjNode* dbModelToApiSubscription
dbLdContextP->name = (char*) "jsonldContext";
}

//
//
if (subordinateNodeP != NULL)
{
kjChildRemove(dbSubP, subordinateNodeP);
kjChildAdd(apiSubP, subordinateNodeP);

for (KjNode* subordinateP = subordinateNodeP->value.firstChildP; subordinateP != NULL; subordinateP = subordinateP->next)
{
KjNode* runNoP = kjLookup(subordinateP, "runNo");

if (runNoP != NULL)
kjChildRemove(subordinateP, runNoP);
}
}

if (qNodePP != NULL) // FIXME: This is more than a bit weird ...
*qNodePP = NULL;
Expand Down
2 changes: 2 additions & 0 deletions src/lib/orionld/http/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.5)
SET (SOURCES
httpHeaderLocationAdd.cpp
httpHeaderLinkAdd.cpp
httpRequest.cpp
httpRequestHeaderAdd.cpp
)

# Include directories
Expand Down
Loading
Loading