From e67156cda5e91186293bde3bd3470f5247a27a3c Mon Sep 17 00:00:00 2001 From: Ken Zangelin Date: Fri, 17 May 2024 21:45:35 +0200 Subject: [PATCH] Refactor - moved classes to their own modules and all method implementation to *.cpp --- src/lib/orionld/dds/CMakeLists.txt | 3 + .../orionld/dds/DdsNotificationReceiver.cpp | 139 +++++++++++ src/lib/orionld/dds/DdsNotificationReceiver.h | 79 +++++++ src/lib/orionld/dds/DdsNotificationSender.cpp | 64 +++++ src/lib/orionld/dds/DdsNotificationSender.h | 65 ++++++ src/lib/orionld/dds/NgsildPublisher.h | 61 +---- src/lib/orionld/dds/NgsildSubscriber.cpp | 130 +++++++++++ src/lib/orionld/dds/NgsildSubscriber.h | 221 ++---------------- src/lib/orionld/dds/ddsSubscribe.cpp | 2 +- 9 files changed, 512 insertions(+), 252 deletions(-) create mode 100644 src/lib/orionld/dds/DdsNotificationReceiver.cpp create mode 100644 src/lib/orionld/dds/DdsNotificationReceiver.h create mode 100644 src/lib/orionld/dds/DdsNotificationSender.cpp create mode 100644 src/lib/orionld/dds/DdsNotificationSender.h create mode 100644 src/lib/orionld/dds/NgsildSubscriber.cpp diff --git a/src/lib/orionld/dds/CMakeLists.txt b/src/lib/orionld/dds/CMakeLists.txt index 86f53aa312..a0996752cd 100644 --- a/src/lib/orionld/dds/CMakeLists.txt +++ b/src/lib/orionld/dds/CMakeLists.txt @@ -24,6 +24,9 @@ SET (SOURCES NgsildEntity.cxx NgsildEntityPubSubTypes.cxx NgsildPublisher.cpp + NgsildSubscriber.cpp + DdsNotificationReceiver.cpp + DdsNotificationSender.cpp ddsSubscribe.cpp ddsPublish.cpp kjTreeLog.cpp diff --git a/src/lib/orionld/dds/DdsNotificationReceiver.cpp b/src/lib/orionld/dds/DdsNotificationReceiver.cpp new file mode 100644 index 0000000000..589cff7841 --- /dev/null +++ b/src/lib/orionld/dds/DdsNotificationReceiver.cpp @@ -0,0 +1,139 @@ +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +*/ +#include "fastdds/dds/domain/DomainParticipant.hpp" +#include "fastdds/dds/domain/DomainParticipantFactory.hpp" +#include "fastdds/dds/subscriber/DataReader.hpp" +#include "fastdds/dds/subscriber/DataReaderListener.hpp" +#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp" +#include "fastdds/dds/subscriber/SampleInfo.hpp" +#include "fastdds/dds/subscriber/Subscriber.hpp" +#include "fastdds/dds/topic/TypeSupport.hpp" + +extern "C" +{ +#include "ktrace/kTrace.h" // trace messages - ktrace library +#include "kjson/KjNode.h" // KjNode +#include "kjson/kjBuilder.h" // kjObject, kjString, ... +#include "kjson/kjParse.h" // kjParse +#include "kjson/kjClone.h" // kjClone +} + +#include "orionld/common/orionldState.h" // orionldState +#include "orionld/common/traceLevels.h" // Trace levels +#include "orionld/dds/config.h" // DDS_RELIABLE, ... +#include "orionld/dds/kjTreeLog.h" // kjTreeLog2 +#include "orionld/dds/NgsildEntity.h" // NgsildEntity +#include "orionld/dds/DdsNotificationReceiver.h" // The class + +using namespace eprosima::fastdds::dds; + + + +// ----------------------------------------------------------------------------- +// +// ddsDumpArray - accumulating data from DDS notifications +// +extern KjNode* ddsDumpArray; + + + +void DdsNotificationReceiver::on_subscription_matched(DataReader*, const SubscriptionMatchedStatus& info) +{ + if (info.current_count_change == 1) + KT_T(StDds, "Subscriber matched."); + else if (info.current_count_change == -1) + KT_T(StDds, "Subscriber unmatched."); + else + KT_T(StDds, "'%d' is not a valid value for SubscriptionMatchedStatus current count change", info.current_count_change); +} + +void DdsNotificationReceiver::on_data_available(DataReader* reader) +{ + SampleInfo info; + + KT_T(StDds, "Notification arrived"); + + if (reader->take_next_sample(&ngsildEntity_, &info) == ReturnCode_t::RETCODE_OK) + { + if (info.valid_data) + { + samples_++; + + // + // This is "more or less" how it should work: + // KjNode* entityP = kjEntityFromDds(&ngsildEntity_); + // notificationReceived(entityP); + // The callback 'notificationReceived' is set in some constructor or init() method + // + KT_T(StDds, "Entity Id: %s with type: %s RECEIVED.", ngsildEntity_.id().c_str(), ngsildEntity_.type().c_str()); + + // + // Accumulate notifications + // + KjNode* dump = kjObject(NULL, "item"); // No name as it is part of an array + KjNode* tenantP = (ngsildEntity_.tenant() != "")? kjString(NULL, "tenant", ngsildEntity_.tenant().c_str()) : NULL; + KjNode* idP = (ngsildEntity_.id() != "")? kjString(NULL, "id", ngsildEntity_.id().c_str()) : NULL; + KjNode* typeP = (ngsildEntity_.type() != "")? kjString(NULL, "type", ngsildEntity_.type().c_str()) : NULL; + KjNode* scopeP = (ngsildEntity_.scope() != "")? kjString(NULL, "scope", ngsildEntity_.scope().c_str()) : NULL; + KjNode* createdAtP = (ngsildEntity_.createdAt() != 0)? kjInteger(NULL, "createdAt", ngsildEntity_.createdAt()) : NULL; + KjNode* modifiedAtP = (ngsildEntity_.modifiedAt() != 0)? kjInteger(NULL, "modifiedAt", ngsildEntity_.modifiedAt()) : NULL; + char* attributes = (ngsildEntity_.attributes() != "")? (char*) ngsildEntity_.attributes().c_str() : NULL; + + if (tenantP != NULL) kjChildAdd(dump, tenantP); + if (idP != NULL) kjChildAdd(dump, idP); + if (typeP != NULL) kjChildAdd(dump, typeP); + if (scopeP != NULL) kjChildAdd(dump, scopeP); + if (createdAtP != NULL) kjChildAdd(dump, createdAtP); + if (modifiedAtP != NULL) kjChildAdd(dump, modifiedAtP); + + if (attributes != NULL) + { + KT_T(StDds, "Entity '%s' has attributes: '%s'", ngsildEntity_.id().c_str(), attributes); + + // Initializing orionldState, to call kjParse (not really necessary, it's overkill) + orionldStateInit(NULL); + + // parse the string 'attributes' and add all attributes to 'dump' + KjNode* attrsNode = kjParse(orionldState.kjsonP, attributes); + if (attrsNode != NULL) + attrsNode = kjClone(NULL, attrsNode); + KT_T(StDds, "After kjParse"); + + kjTreeLog2(attrsNode, "attrsNode", StDds); + kjTreeLog2(dump, "dump w/o attrs", StDds); + // Concatenate the attributes to the "dump entity" + dump->lastChild->next = attrsNode->value.firstChildP; + dump->lastChild = attrsNode->lastChild; + kjTreeLog2(dump, "dump with attrs", StDds); + } + else + KT_T(StDds, "Entity Id: %s has no attributes", ngsildEntity_.id().c_str()); + + if (ddsDumpArray == NULL) + ddsDumpArray = kjArray(NULL, "ddsDumpArray"); + + kjChildAdd(ddsDumpArray, dump); + } + } +} diff --git a/src/lib/orionld/dds/DdsNotificationReceiver.h b/src/lib/orionld/dds/DdsNotificationReceiver.h new file mode 100644 index 0000000000..e1a682fa3b --- /dev/null +++ b/src/lib/orionld/dds/DdsNotificationReceiver.h @@ -0,0 +1,79 @@ +#ifndef SRC_LIB_ORIONLD_DDS_DDSNOTIFICATIONRECEIVER_H_ +#define SRC_LIB_ORIONLD_DDS_DDSNOTIFICATIONRECEIVER_H_ + +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +*/ + +// +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +#include "fastdds/dds/domain/DomainParticipant.hpp" +#include "fastdds/dds/domain/DomainParticipantFactory.hpp" +#include "fastdds/dds/subscriber/DataReader.hpp" +#include "fastdds/dds/subscriber/DataReaderListener.hpp" +#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp" +#include "fastdds/dds/subscriber/SampleInfo.hpp" +#include "fastdds/dds/subscriber/Subscriber.hpp" +#include "fastdds/dds/topic/TypeSupport.hpp" + +#include "orionld/dds/config.h" // DDS_RELIABLE, ... +#include "orionld/dds/kjTreeLog.h" // kjTreeLog2 + + + +using namespace eprosima::fastdds::dds; + + + +// ----------------------------------------------------------------------------- +// +// DdsNotificationReceiver - +// +// FIXME: All the implementation to DdsNotificationReceiver.cpp +// +class DdsNotificationReceiver : public DataReaderListener +{ + public: + DdsNotificationReceiver() : samples_(0) { } + ~DdsNotificationReceiver() override { } + + void on_subscription_matched(DataReader*, const SubscriptionMatchedStatus& info) override; + void on_data_available(DataReader* reader) override; + NgsildEntity ngsildEntity_; + std::atomic_int samples_; +}; + +#endif // SRC_LIB_ORIONLD_DDS_DDSNOTIFICATIONRECEIVER_H_ diff --git a/src/lib/orionld/dds/DdsNotificationSender.cpp b/src/lib/orionld/dds/DdsNotificationSender.cpp new file mode 100644 index 0000000000..571c06f3eb --- /dev/null +++ b/src/lib/orionld/dds/DdsNotificationSender.cpp @@ -0,0 +1,64 @@ +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +*/ +#include +#include +#include +#include +#include +#include + +extern "C" +{ +#include "ktrace/kTrace.h" // trace messages - ktrace library +#include "kjson/KjNode.h" // KjNode +} + +#include "orionld/common/traceLevels.h" // Trace Levels +#include "orionld/dds/NgsildPublisher.h" // The class + + + +// ----------------------------------------------------------------------------- +// +// DdsNotificationSender::on_publication_matched - +// +void DdsNotificationSender::on_publication_matched(DataWriter*, const PublicationMatchedStatus& info) +{ + // FIXME: Don't Publish until entering here! (mutex) + KT_V("info.current_count_change: %d", info.current_count_change); + if (info.current_count_change == 1) + { + matched_ = info.total_count; + KT_T(StDds, "Publisher matched."); + ready_ = true; + } + else if (info.current_count_change == -1) + { + matched_ = info.total_count; + KT_T(StDds, "Publisher unmatched."); + ready_ = false; + } + else + KT_T(StDds, "'%d' is not a valid value for PublicationMatchedStatus current count change.", info.total_count); +} diff --git a/src/lib/orionld/dds/DdsNotificationSender.h b/src/lib/orionld/dds/DdsNotificationSender.h new file mode 100644 index 0000000000..6454f57065 --- /dev/null +++ b/src/lib/orionld/dds/DdsNotificationSender.h @@ -0,0 +1,65 @@ +#ifndef SRC_LIB_ORIONLD_DDS_DDSNOTIFICATIONSENDER_H_ +#define SRC_LIB_ORIONLD_DDS_DDSNOTIFICATIONSENDER_H_ + +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +*/ + +// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +using namespace eprosima::fastdds::dds; + + + +// ----------------------------------------------------------------------------- +// +// DdsNotificationSender - +// +class DdsNotificationSender : public DataWriterListener +{ + public: + bool ready_; + + DdsNotificationSender() : ready_(false), matched_(0) {} + ~DdsNotificationSender() override {} + + void on_publication_matched(DataWriter*, const PublicationMatchedStatus& info) override; + std::atomic_int matched_; +}; + +#endif // SRC_LIB_ORIONLD_DDS_DDSNOTIFICATIONSENDER_H_ diff --git a/src/lib/orionld/dds/NgsildPublisher.h b/src/lib/orionld/dds/NgsildPublisher.h index a54a6a3a1c..21e5a9e99f 100644 --- a/src/lib/orionld/dds/NgsildPublisher.h +++ b/src/lib/orionld/dds/NgsildPublisher.h @@ -60,11 +60,12 @@ extern "C" #include "orionld/dds/NgsildEntityPubSubTypes.h" #include "orionld/dds/NgsildEntity.h" +#include "orionld/dds/DdsNotificationSender.h" // DdsNotificationSender using namespace eprosima::fastdds::dds; -// NgsildPublisher::_on_ + // ----------------------------------------------------------------------------- // // NgsildPublisher - @@ -72,53 +73,13 @@ using namespace eprosima::fastdds::dds; class NgsildPublisher // : DataWriterListener { private: - NgsildEntity entity_; - DomainParticipant* participant_; - Publisher* publisher_; - Topic* topic_; - DataWriter* writer_; - TypeSupport type_; - - class PubListener : public DataWriterListener - { - public: - bool ready_; - - PubListener() : ready_(false), matched_(0) - { - } - - ~PubListener() override - { - } - - void on_publication_matched - ( - DataWriter*, - const PublicationMatchedStatus& info - ) - override - { - // FIXME: Don't Publish until entering here! (mutex) - KT_V("info.current_count_change: %d", info.current_count_change); - if (info.current_count_change == 1) - { - matched_ = info.total_count; - KT_T(StDds, "Publisher matched."); - ready_ = true; - } - else if (info.current_count_change == -1) - { - matched_ = info.total_count; - KT_T(StDds, "Publisher unmatched."); - ready_ = false; - } - else - KT_T(StDds, "'%d' is not a valid value for PublicationMatchedStatus current count change.", info.total_count); - } - - std::atomic_int matched_; - } listener_; + NgsildEntity entity_; + DomainParticipant* participant_; + Publisher* publisher_; + Topic* topic_; + DataWriter* writer_; + TypeSupport type_; + DdsNotificationSender listener_; public: explicit NgsildPublisher(const char* topicType) @@ -131,8 +92,8 @@ class NgsildPublisher // : DataWriterListener } virtual ~NgsildPublisher(); - bool init(const char* topicName); - bool publish(KjNode* entityP); + bool init(const char* topicName); + bool publish(KjNode* entityP); }; #endif // SRC_LIB_ORIONLD_DDS_NGSILDPUBLISHER_H_ diff --git a/src/lib/orionld/dds/NgsildSubscriber.cpp b/src/lib/orionld/dds/NgsildSubscriber.cpp new file mode 100644 index 0000000000..7953093f0f --- /dev/null +++ b/src/lib/orionld/dds/NgsildSubscriber.cpp @@ -0,0 +1,130 @@ +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +*/ +#include "fastdds/dds/domain/DomainParticipant.hpp" +#include "fastdds/dds/domain/DomainParticipantFactory.hpp" +#include "fastdds/dds/subscriber/DataReader.hpp" +#include "fastdds/dds/subscriber/DataReaderListener.hpp" +#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp" +#include "fastdds/dds/subscriber/SampleInfo.hpp" +#include "fastdds/dds/subscriber/Subscriber.hpp" +#include "fastdds/dds/topic/TypeSupport.hpp" + +#include "orionld/dds/NgsildSubscriber.h" // The class + +using namespace eprosima::fastdds::dds; + + + +// ----------------------------------------------------------------------------- +// +// NgsildSubscriber::~NgsildSubscriber - +// +NgsildSubscriber::~NgsildSubscriber() +{ + if (reader_ != nullptr) + subscriber_->delete_datareader(reader_); + + if (topic_ != nullptr) + participant_->delete_topic(topic_); + + if (subscriber_ != nullptr) + participant_->delete_subscriber(subscriber_); + + DomainParticipantFactory::get_instance()->delete_participant(participant_); +} + + + +// ----------------------------------------------------------------------------- +// +// NgsildSubscriber::init - +// +bool NgsildSubscriber::init(const char* topicName) +{ + DomainParticipantQos participantQos; + + participantQos.name("Participant_subscriber"); + participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participantQos); + + if (participant_ == nullptr) + return false; + + // Register the Type + type_.register_type(participant_); + + // Create the subscriptions Topic + const char* topicType = type_->getName(); + topic_ = participant_->create_topic(topicName, topicType, TOPIC_QOS_DEFAULT); + + if (topic_ == nullptr) + { + KT_V("Error creating topic (type: '%s') '%s'", topicType, topicName); + return false; + } + + // Create the Subscriber + subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr); + if (subscriber_ == nullptr) + return false; + + // Create the DataReader + KT_V("Creating reader"); +#ifdef DDS_RELIABLE + DataReaderQos rqos = DATAREADER_QOS_DEFAULT; + + rqos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; + rqos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS; +// rqos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS; +// rqos.history().depth = 5; + reader_ = subscriber_->create_datareader(topic_, rqos, &listener_); +#else + reader_ = subscriber_->create_datareader(topic_, DATAREADER_QOS_DEFAULT, &listener_); +#endif + + if (reader_ == nullptr) + { + KT_E("Error creating DataReader"); + return false; + } + + KT_V("Created reader"); + KT_V("Init done"); + + return true; +} + + + +// ----------------------------------------------------------------------------- +// +// NgsildSubscriber::run - +// +void NgsildSubscriber::run(void) +{ + KT_V("Awaiting notifications"); + while (1) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100000)); + } +} diff --git a/src/lib/orionld/dds/NgsildSubscriber.h b/src/lib/orionld/dds/NgsildSubscriber.h index 1439c0c35f..bce16dd88e 100644 --- a/src/lib/orionld/dds/NgsildSubscriber.h +++ b/src/lib/orionld/dds/NgsildSubscriber.h @@ -43,14 +43,14 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include +#include "fastdds/dds/domain/DomainParticipant.hpp" +#include "fastdds/dds/domain/DomainParticipantFactory.hpp" +#include "fastdds/dds/subscriber/DataReader.hpp" +#include "fastdds/dds/subscriber/DataReaderListener.hpp" +#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp" +#include "fastdds/dds/subscriber/SampleInfo.hpp" +#include "fastdds/dds/subscriber/Subscriber.hpp" +#include "fastdds/dds/topic/TypeSupport.hpp" extern "C" { @@ -63,21 +63,13 @@ extern "C" #include "orionld/common/orionldState.h" // orionldState #include "orionld/common/traceLevels.h" // Trace Levels #include "orionld/dds/NgsildEntityPubSubTypes.h" // DDS stuff ... +#include "orionld/dds/DdsNotificationReceiver.h" // DdsNotificationReceiver #include "orionld/dds/config.h" // DDS_RELIABLE, ... -#include "orionld/dds/kjTreeLog.h" // kjTreeLog2 using namespace eprosima::fastdds::dds; -// ----------------------------------------------------------------------------- -// -// ddsDumpArray - accumulating data from DDS notifications -// -extern KjNode* ddsDumpArray; - - - // ----------------------------------------------------------------------------- // // NgsildSubscriber - @@ -87,103 +79,14 @@ extern KjNode* ddsDumpArray; class NgsildSubscriber { private: - DomainParticipant* participant_; - Subscriber* subscriber_; - DataReader* reader_; - Topic* topic_; - TypeSupport type_; - - class SubListener : public DataReaderListener - { - public: - SubListener() : samples_(0) { } - ~SubListener() override { } - - void on_subscription_matched(DataReader*, const SubscriptionMatchedStatus& info) override - { - if (info.current_count_change == 1) - KT_T(StDds, "Subscriber matched."); - else if (info.current_count_change == -1) - KT_T(StDds, "Subscriber unmatched."); - else - KT_T(StDds, "'%d' is not a valid value for SubscriptionMatchedStatus current count change", info.current_count_change); - } - - void on_data_available(DataReader* reader) override - { - SampleInfo info; - - KT_T(StDds, "Notification arrived"); - - if (reader->take_next_sample(&ngsildEntity_, &info) == ReturnCode_t::RETCODE_OK) - { - if (info.valid_data) - { - samples_++; - - // - // This is "more or less" how it should work: - // KjNode* entityP = kjEntityFromDds(&ngsildEntity_); - // notificationReceived(entityP); - // The callback 'notificationReceived' is set in some constructor or init() method - // - KT_T(StDds, "Entity Id: %s with type: %s RECEIVED.", ngsildEntity_.id().c_str(), ngsildEntity_.type().c_str()); - - // - // Accumulate notifications - // - KjNode* dump = kjObject(NULL, "item"); // No name as it is part of an array - KjNode* tenantP = (ngsildEntity_.tenant() != "")? kjString(NULL, "tenant", ngsildEntity_.tenant().c_str()) : NULL; - KjNode* idP = (ngsildEntity_.id() != "")? kjString(NULL, "id", ngsildEntity_.id().c_str()) : NULL; - KjNode* typeP = (ngsildEntity_.type() != "")? kjString(NULL, "type", ngsildEntity_.type().c_str()) : NULL; - KjNode* scopeP = (ngsildEntity_.scope() != "")? kjString(NULL, "scope", ngsildEntity_.scope().c_str()) : NULL; - KjNode* createdAtP = (ngsildEntity_.createdAt() != 0)? kjInteger(NULL, "createdAt", ngsildEntity_.createdAt()) : NULL; - KjNode* modifiedAtP = (ngsildEntity_.modifiedAt() != 0)? kjInteger(NULL, "modifiedAt", ngsildEntity_.modifiedAt()) : NULL; - char* attributes = (ngsildEntity_.attributes() != "")? (char*) ngsildEntity_.attributes().c_str() : NULL; - - if (tenantP != NULL) kjChildAdd(dump, tenantP); - if (idP != NULL) kjChildAdd(dump, idP); - if (typeP != NULL) kjChildAdd(dump, typeP); - if (scopeP != NULL) kjChildAdd(dump, scopeP); - if (createdAtP != NULL) kjChildAdd(dump, createdAtP); - if (modifiedAtP != NULL) kjChildAdd(dump, modifiedAtP); - - if (attributes != NULL) - { - KT_T(StDds, "Entity '%s' has attributes: '%s'", ngsildEntity_.id().c_str(), attributes); - - // Initializing orionldState, to call kjParse (not really necessary, it's overkill) - orionldStateInit(NULL); - - // parse the string 'attributes' and add all attributes to 'dump' - KjNode* attrsNode = kjParse(orionldState.kjsonP, attributes); - if (attrsNode != NULL) - attrsNode = kjClone(NULL, attrsNode); - KT_T(StDds, "After kjParse"); - - kjTreeLog2(attrsNode, "attrsNode", StDds); - kjTreeLog2(dump, "dump w/o attrs", StDds); - // Concatenate the attributes to the "dump entity" - dump->lastChild->next = attrsNode->value.firstChildP; - dump->lastChild = attrsNode->lastChild; - kjTreeLog2(dump, "dump with attrs", StDds); - } - else - KT_T(StDds, "Entity Id: %s has no attributes", ngsildEntity_.id().c_str()); - - if (ddsDumpArray == NULL) - ddsDumpArray = kjArray(NULL, "ddsDumpArray"); - - kjChildAdd(ddsDumpArray, dump); - } - } - } - - NgsildEntity ngsildEntity_; - std::atomic_int samples_; - } listener_; - - public: + DomainParticipant* participant_; + Subscriber* subscriber_; + DataReader* reader_; + Topic* topic_; + TypeSupport type_; + DdsNotificationReceiver listener_; + +public: explicit NgsildSubscriber(const char* topicType) : participant_(nullptr) , subscriber_(nullptr) @@ -193,93 +96,9 @@ class NgsildSubscriber { } - virtual ~NgsildSubscriber() - { - if (reader_ != nullptr) - { - subscriber_->delete_datareader(reader_); - } - if (topic_ != nullptr) - { - participant_->delete_topic(topic_); - } - if (subscriber_ != nullptr) - { - participant_->delete_subscriber(subscriber_); - } - DomainParticipantFactory::get_instance()->delete_participant(participant_); - } - - bool init(const char* topicName) - { - DomainParticipantQos participantQos; - participantQos.name("Participant_subscriber"); - participant_ = DomainParticipantFactory::get_instance()->create_participant(0, participantQos); - - if (participant_ == nullptr) - return false; - - // Register the Type - type_.register_type(participant_); - - // Create the subscriptions Topic - const char* topicType = type_->getName(); - topic_ = participant_->create_topic(topicName, topicType, TOPIC_QOS_DEFAULT); - - if (topic_ == nullptr) - { - KT_V("Error creating topic (type: '%s') '%s'", topicType, topicName); - return false; - } - - // Create the Subscriber - subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr); - if (subscriber_ == nullptr) - return false; - - // Create the DataReader - KT_V("Creating reader"); -#ifdef DDS_RELIABLE - DataReaderQos rqos = DATAREADER_QOS_DEFAULT; - - rqos.reliability().kind = eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS; - rqos.durability().kind = eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS; -// rqos.history().kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS; -// rqos.history().depth = 5; - reader_ = subscriber_->create_datareader(topic_, rqos, &listener_); -#else - reader_ = subscriber_->create_datareader(topic_, DATAREADER_QOS_DEFAULT, &listener_); -#endif - - if (reader_ == nullptr) - { - KT_E("Error creating DataReader"); - return false; - } - - KT_V("Created reader"); - KT_V("Init done"); - - return true; - } - - void run(uint32_t samples) - { - KT_V("Awaiting notifications"); - while ((uint32_t) listener_.samples_ < samples) - { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - } - - void run(void) - { - KT_V("Awaiting notifications"); - while (1) - { - std::this_thread::sleep_for(std::chrono::milliseconds(100000)); - } - } + virtual ~NgsildSubscriber(); + bool init(const char* topicName); + void run(void); }; #endif // SRC_LIB_ORIONLD_DDS_NGSILDSUBSCRIBER_H_ diff --git a/src/lib/orionld/dds/ddsSubscribe.cpp b/src/lib/orionld/dds/ddsSubscribe.cpp index 084d1cc53d..6fee1c8a7d 100644 --- a/src/lib/orionld/dds/ddsSubscribe.cpp +++ b/src/lib/orionld/dds/ddsSubscribe.cpp @@ -51,7 +51,7 @@ typedef struct SubscriberParams // ddsSubscribe - // // EPROS: We would like to have one single subscriber, that subscribes to all DDS notifications -// Obviously, we'd need a way to add topic to that subscriber "on the fly" +// Obviously, we'd need a way to add topics to that subscriber "on the fly" // static void* ddsSubscribe2(void* vP) {