From f2e5ceae8fbea0a6c9445a366faaca0b98a8ef86 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Mon, 18 Dec 2023 07:29:30 +0100 Subject: [PATCH] Fix CVE-2023-50257 Signed-off-by: Miguel Company Co-authored-by: Mario Dominguez commit f2bcd264798ebd0d45241e57f7d522c68920b26f Author: Miguel Company Date: Thu Dec 14 10:52:32 2023 +0100 Refs #19770. Improve BuiltinAuthenticationPlugin_second_participant_creation_loop. Signed-off-by: Miguel Company commit 07380f840422f8ea2a6ae62ec901fe94687f3b3c Author: Miguel Company Date: Wed Dec 13 08:21:34 2023 +0100 Refs #19770. Fix PDPTests. Signed-off-by: Miguel Company commit 4c2c21e5f2be63c1f4e0766cc8bc5a59a857b106 Author: Miguel Company Date: Wed Dec 13 07:26:51 2023 +0100 Refs #19770. Adding override keyword. Signed-off-by: Miguel Company commit d185a85b937a1fd64f86a40b9db2d0b994e39a7f Author: Miguel Company Date: Tue Dec 12 16:05:00 2023 +0100 Refs #19770. Uncrustify. Signed-off-by: Miguel Company commit 54a01ab0eff0e326c868d1a32bd44203bf184b4d Author: Miguel Company Date: Tue Dec 12 10:29:42 2023 +0100 Refs #19770. PDPSecurityInitiatorListener relies on PDPListener::onNewCacheChangeAdded. Signed-off-by: Miguel Company commit 202bc52a4584025c897e06a40f50d5d7d6be8ce6 Author: Miguel Company Date: Tue Dec 12 09:49:20 2023 +0100 Refs #19770. Only process removals when reader is matched. Signed-off-by: Miguel Company commit a4a30779c0ae3b410b8cce4aa56f47490995d20d Author: Miguel Company Date: Tue Dec 12 09:46:03 2023 +0100 Refs #19770. Added PDPSecurityInitiatorListener::process_alive_data. Signed-off-by: Miguel Company commit 3c7547e0b125ab7156aaf3f72b1a431a9d77672f Author: Miguel Company Date: Tue Dec 12 09:32:55 2023 +0100 Refs #19770. Refactor with PDPListener::process_alive_data. Signed-off-by: Miguel Company commit 4571178679af32902a5ba6719807b75eabafc296 Author: Miguel Company Date: Wed Nov 15 12:28:14 2023 +0100 Refs #19770. StatelessReader does not call update_last_notified for not matched writers. Signed-off-by: Miguel Company commit 8294ca3b0bae320f066779dfa1d9a8690b16ebfc Author: Miguel Company Date: Wed Nov 29 13:30:14 2023 +0100 Refs #19770. Avoid dynamic_cast when participant is not secure. Signed-off-by: Miguel Company commit 83820e84c6cc3dcaf53e6ffe784ed03b2fac9bfa Author: Miguel Company Date: Wed Nov 29 13:24:59 2023 +0100 Refs #19770. Avoid warning when built without security. Signed-off-by: Miguel Company commit 8ce1216f6af4758f4cb8dc1ea63c770a5ff86e1a Author: Miguel Company Date: Wed Nov 29 12:55:28 2023 +0100 Refs #19770. Fixed typo in comments. Signed-off-by: Miguel Company commit 7bb64eb4a0b899b8055beb8a8b0b279db80e20d6 Author: Miguel Company Date: Wed Nov 29 12:52:07 2023 +0100 Refs #19770. Use constexpr for topic names. Signed-off-by: Miguel Company commit eef95273d5464210c50a01cb22cc4a3989233f2f Author: Miguel Company Date: Wed Nov 29 12:50:56 2023 +0100 Refs #19770. Send participant dispose messages through both writers. Signed-off-by: Miguel Company commit c3c831f5f047c4a02a3ddd9944ce3015644f3e27 Author: Miguel Company Date: Tue Nov 28 10:07:17 2023 +0100 Refs #19770. Move to `AUTHENTICATION_FAILED` when authentication fails. Signed-off-by: Miguel Company commit 11f98a4d0132d630b76c68ed4157a8624dcffaa5 Author: Miguel Company Date: Tue Nov 28 10:05:47 2023 +0100 Refs #19770. Simulate DATA(p) reception on non secure `notifyAboveRemoteEndpoints`. Signed-off-by: Miguel Company commit 5097288e5f4816e653a7c5a4c4839d2a077f3d19 Author: Miguel Company Date: Wed Nov 22 09:36:14 2023 +0100 Refs #19770. Common factor method `notify_and_maybe_ignore_new_participant`. Signed-off-by: Miguel Company commit 9364310c5c3cf47b586470b90f9511b0be11fbc4 Author: Miguel Company Date: Tue Nov 21 07:56:17 2023 +0100 Refs #19770. Improve AllowUnauthenticatedParticipants blackbox tests. Signed-off-by: Miguel Company commit bb8af40be3c8c138e48acfced5835f2066d864dd Author: Miguel Company Date: Mon Nov 20 09:53:10 2023 +0100 Refs #19770. Change builtin endpoints matching workflow. Signed-off-by: Miguel Company commit 4fc685f75949637ee07ede7a72dd43ed4ab351f3 Author: Miguel Company Date: Fri Nov 17 12:34:40 2023 +0100 Refs #19770. Chages in announceParticipantState. Signed-off-by: Miguel Company commit fb26bbccf6400123d2cb0d89d34f8c907e301d1a Author: Miguel Company Date: Fri Nov 17 12:23:48 2023 +0100 Refs #19770. Chages in removeRemoteEndpoints. Signed-off-by: Miguel Company commit 96442d58fa0b16984289bbe16a132769e9cbcd81 Author: Miguel Company Date: Fri Nov 17 12:15:35 2023 +0100 Refs #19770. Chages in notifyAboveRemoteEndpoints. Signed-off-by: Miguel Company commit cea35be6c84e633a500bd4565119d1a7ec6066da Author: Miguel Company Date: Fri Nov 17 11:52:15 2023 +0100 Refs #19770. Additional work on endpoints creation. Signed-off-by: Miguel Company commit 02f95cf90b8acdf84c0c6dd357b8a7a26816db93 Author: Miguel Company Date: Fri Nov 17 10:46:48 2023 +0100 Refs #19770. Methods in PDP to setup builtin endpoints security attributes. Signed-off-by: Miguel Company commit 1f5c43734dfc6d420103e16132034e7e85ba0c3a Author: Miguel Company Date: Fri Nov 17 10:38:57 2023 +0100 Refs #19770. Methods in PDP to create builtin endpoints attributes. Signed-off-by: Miguel Company commit 49624e88746389b44384f007dc17610ba929731a Author: Miguel Company Date: Thu Nov 16 16:56:25 2023 +0100 Refs #19770. Endpoints creation refactor. Signed-off-by: Miguel Company commit 4ab8d1e1e24819da5d27009ec3e80051329dfbb0 Author: Miguel Company Date: Thu Nov 16 16:28:15 2023 +0100 Refs #19770. Fix unprotected access to local participant data. Signed-off-by: Miguel Company commit 57b99075eae290412b4d08798e1df27fc9972e1c Author: Miguel Company Date: Thu Nov 16 15:38:24 2023 +0100 Refs #19770. Added SimplePDPEndpointsSecure. Signed-off-by: Miguel Company commit e67c703db6956e15e187868f90f1f99b25e37627 Author: Miguel Company Date: Thu Nov 16 15:28:38 2023 +0100 Refs #19770. Listeners moved into BuiltinReader. Signed-off-by: Miguel Company commit b2fe97e1e0001b24b18d4b145143139a084dc37d Author: Mario Dominguez Date: Wed Nov 8 15:42:34 2023 +0100 Refs #19770. PDP listener moved into PDPEndpoints. Signed-off-by: Miguel Company commit 1af9da00af27591be14a1abe1d9928ccff83048d Author: Miguel Company Date: Thu Nov 16 13:33:37 2023 +0100 Refs #19770. Test improvements. Signed-off-by: Miguel Company commit 03eeda5f6b305768cc876056ff6584c153c947d1 Author: Miguel Company Date: Thu Nov 16 13:21:45 2023 +0100 Refs #19770. Regression test for vulnerability. Signed-off-by: Miguel Company commit 6cd538753fb8be550f9b3a58c508021d2bdf6ccd Author: Miguel Company Date: Thu Nov 16 12:30:36 2023 +0100 Refs #19770. Refactor on BlackboxTestsSecurity. Signed-off-by: Miguel Company Signed-off-by: Miguel Company --- .../rtps/builtin/discovery/participant/PDP.h | 18 +- .../discovery/participant/PDPListener.h | 38 +- .../builtin/discovery/participant/PDPSimple.h | 22 + src/cpp/rtps/builtin/BuiltinReader.hpp | 5 + .../builtin/discovery/endpoint/EDPSimple.cpp | 102 +-- .../DS/DiscoveryServerPDPEndpoints.hpp | 5 + .../DS/DiscoveryServerPDPEndpointsSecure.hpp | 2 - .../DS/PDPSecurityInitiatorListener.cpp | 122 +--- .../DS/PDPSecurityInitiatorListener.hpp | 36 +- .../builtin/discovery/participant/PDP.cpp | 170 ++++- .../discovery/participant/PDPClient.cpp | 44 +- .../discovery/participant/PDPEndpoints.hpp | 2 + .../discovery/participant/PDPListener.cpp | 196 +++--- .../discovery/participant/PDPServer.cpp | 20 +- .../discovery/participant/PDPSimple.cpp | 603 +++++++++++++----- .../participant/simple/SimplePDPEndpoints.hpp | 5 + .../simple/SimplePDPEndpointsSecure.hpp | 109 ++++ src/cpp/rtps/reader/StatelessReader.cpp | 22 +- src/cpp/rtps/security/SecurityManager.cpp | 16 +- .../blackbox/common/BlackboxTestsSecurity.cpp | 275 ++++++-- .../rtps/participant/RTPSParticipantImpl.h | 2 + test/unittest/rtps/discovery/PDPTests.cpp | 9 + 22 files changed, 1206 insertions(+), 617 deletions(-) create mode 100644 src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpointsSecure.hpp diff --git a/include/fastdds/rtps/builtin/discovery/participant/PDP.h b/include/fastdds/rtps/builtin/discovery/participant/PDP.h index ec5eec62ada..d30d5cbc0d6 100644 --- a/include/fastdds/rtps/builtin/discovery/participant/PDP.h +++ b/include/fastdds/rtps/builtin/discovery/participant/PDP.h @@ -330,9 +330,9 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable * Get a pointer to the local RTPSParticipant ParticipantProxyData object. * @return Pointer to the local RTPSParticipant ParticipantProxyData object. */ - ParticipantProxyData* getLocalParticipantProxyData() + ParticipantProxyData* getLocalParticipantProxyData() const { - return participant_proxies_.front(); + return participant_proxies_.empty() ? nullptr : participant_proxies_.front(); } /** @@ -431,7 +431,15 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable return temp_writer_proxies_; } + ReaderAttributes create_builtin_reader_attributes() const; + + WriterAttributes create_builtin_writer_attributes() const; + #if HAVE_SECURITY + void add_builtin_security_attributes( + ReaderAttributes& ratt, + WriterAttributes& watt) const; + virtual bool pairing_remote_writer_with_local_reader_after_security( const GUID_t& local_reader, const WriterProxyData& remote_writer_data); @@ -501,8 +509,6 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable ResourceLimitedVector writer_proxies_pool_; //!Variable to indicate if any parameter has changed. std::atomic_bool m_hasChangedLocalPDP; - //!Listener for the SPDP messages. - ReaderListener* mp_listener; //! ProxyPool for temporary reader proxies ProxyPool temp_reader_proxies_; //! ProxyPool for temporary writer proxies @@ -574,6 +580,10 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable */ virtual void update_builtin_locators() = 0; + void notify_and_maybe_ignore_new_participant( + ParticipantProxyData* pdata, + bool& should_be_ignored); + #ifdef FASTDDS_STATISTICS std::atomic proxy_observer_; diff --git a/include/fastdds/rtps/builtin/discovery/participant/PDPListener.h b/include/fastdds/rtps/builtin/discovery/participant/PDPListener.h index 31b42c71856..6b4d6195f79 100644 --- a/include/fastdds/rtps/builtin/discovery/participant/PDPListener.h +++ b/include/fastdds/rtps/builtin/discovery/participant/PDPListener.h @@ -37,34 +37,56 @@ class PDP; * This class is implemented in order to use the same structure than with any other RTPSReader. * @ingroup DISCOVERY_MODULE */ -class PDPListener: public ReaderListener +class PDPListener : public ReaderListener { public: + /** * @param parent Pointer to object creating this object */ - PDPListener(PDP* parent); + PDPListener( + PDP* parent); virtual ~PDPListener() override = default; /** - * New added cache - * @param reader - * @param change - */ + * New added cache + * @param reader + * @param change + */ void onNewCacheChangeAdded( RTPSReader* reader, const CacheChange_t* const change) override; protected: + /** + * Process an incoming DATA(p). + * This method is called from PDPListener::onNewCacheChangeAdded() when an alive DATA(p) is received. + * Both the reader lock and the PDP lock are held when this method is called. + * + * @param old_data Pointer to the ParticipantProxyData currently stored in the database. May be nullptr, for a + * recently discovered participant. + * @param new_data ParticipantProxyData from the DATA(p) message. + * @param writer_guid GUID of the writer that sent the DATA(p) message. + * @param reader RTPSReader that received the DATA(p) message. + * @param lock Lock on the PDP database. Passed so it can be released before invoking callbacks. + */ + virtual void process_alive_data( + ParticipantProxyData* old_data, + ParticipantProxyData& new_data, + GUID_t& writer_guid, + RTPSReader* reader, + std::unique_lock& lock); + /** * Get the key of a CacheChange_t * @param change Pointer to the CacheChange_t * @return True on success */ - bool get_key(CacheChange_t* change); + bool get_key( + CacheChange_t* change); //!Pointer to the associated mp_SPDP; PDP* parent_pdp_; @@ -82,5 +104,5 @@ class PDPListener: public ReaderListener } /* namespace fastrtps */ } /* namespace eprosima */ -#endif +#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #endif /* _FASTDDS_RTPS_PDPLISTENER_H_ */ diff --git a/include/fastdds/rtps/builtin/discovery/participant/PDPSimple.h b/include/fastdds/rtps/builtin/discovery/participant/PDPSimple.h index e576a3ba969..dadfcc8c7b0 100644 --- a/include/fastdds/rtps/builtin/discovery/participant/PDPSimple.h +++ b/include/fastdds/rtps/builtin/discovery/participant/PDPSimple.h @@ -139,6 +139,28 @@ class PDPSimple : public PDP */ bool createPDPEndpoints() override; + bool create_dcps_participant_endpoints(); + + void match_pdp_remote_endpoints( + const ParticipantProxyData& pdata, + bool notify_secure_endpoints); + + void assign_low_level_remote_endpoints( + const ParticipantProxyData& pdata, + bool notify_secure_endpoints); + +#if HAVE_SECURITY + bool create_dcps_participant_secure_endpoints(); + + bool pairing_remote_writer_with_local_reader_after_security( + const GUID_t& local_reader, + const WriterProxyData& remote_writer_data) override; + + bool pairing_remote_reader_with_local_writer_after_security( + const GUID_t& local_reader, + const ReaderProxyData& remote_reader_data) override; +#endif // HAVE_SECURITY + }; } /* namespace rtps */ diff --git a/src/cpp/rtps/builtin/BuiltinReader.hpp b/src/cpp/rtps/builtin/BuiltinReader.hpp index 38bf295946b..f93ececcf4a 100644 --- a/src/cpp/rtps/builtin/BuiltinReader.hpp +++ b/src/cpp/rtps/builtin/BuiltinReader.hpp @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -52,6 +53,8 @@ struct BuiltinReader payload_pool_->release_history(cfg, true); } } + + listener_.reset(); } void remove_from_history( @@ -75,6 +78,8 @@ struct BuiltinReader std::unique_ptr history_; //! Builtin RTPS reader TReader* reader_ = nullptr; + //! Listener for the builtin RTPS reader + std::unique_ptr listener_; }; } // namespace rtps diff --git a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp index 12ad97915cb..0466f149535 100644 --- a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp @@ -382,90 +382,21 @@ void EDPSimple::set_builtin_writer_history_attributes( void EDPSimple::set_builtin_reader_attributes( ReaderAttributes& attributes) { - const RTPSParticipantAttributes& pattr = mp_PDP->getRTPSParticipant()->getRTPSParticipantAttributes(); - - // Matched writers will depend on total number of participants - attributes.matched_writers_allocation = pattr.allocation.participants; - - // As participants allocation policy includes the local participant, one has to be substracted - if (attributes.matched_writers_allocation.initial > 1) - { - attributes.matched_writers_allocation.initial--; - } - if ((attributes.matched_writers_allocation.maximum > 1) && - (attributes.matched_writers_allocation.maximum < std::numeric_limits::max())) - { - attributes.matched_writers_allocation.maximum--; - } - - // Locators are copied from the local participant metatraffic locators - attributes.endpoint.unicastLocatorList.clear(); - for (const Locator_t& loc : mp_PDP->getLocalParticipantProxyData()->metatraffic_locators.unicast) - { - attributes.endpoint.unicastLocatorList.push_back(loc); - } - attributes.endpoint.multicastLocatorList.clear(); - for (const Locator_t& loc : mp_PDP->getLocalParticipantProxyData()->metatraffic_locators.multicast) - { - attributes.endpoint.multicastLocatorList.push_back(loc); - } - attributes.endpoint.external_unicast_locators = mp_PDP->builtin_attributes().metatraffic_external_unicast_locators; - attributes.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; + attributes = mp_PDP->create_builtin_reader_attributes(); // Timings are configured using EDP default values attributes.times.heartbeatResponseDelay = edp_heartbeat_response_delay; - - // EDP endpoints are always reliable, transsient local, keyed topics - attributes.endpoint.reliabilityKind = RELIABLE; - attributes.endpoint.durabilityKind = TRANSIENT_LOCAL; - attributes.endpoint.topicKind = WITH_KEY; - - // Built-in EDP readers never expect inline qos - attributes.expectsInlineQos = false; } void EDPSimple::set_builtin_writer_attributes( WriterAttributes& attributes) { - const RTPSParticipantAttributes& pattr = mp_PDP->getRTPSParticipant()->getRTPSParticipantAttributes(); - - // Matched readers will depend on total number of participants - attributes.matched_readers_allocation = pattr.allocation.participants; - - // As participants allocation policy includes the local participant, one has to be substracted - if (attributes.matched_readers_allocation.initial > 1) - { - attributes.matched_readers_allocation.initial--; - } - if ((attributes.matched_readers_allocation.maximum > 1) && - (attributes.matched_readers_allocation.maximum < std::numeric_limits::max())) - { - attributes.matched_readers_allocation.maximum--; - } - - // Locators are copied from the local participant metatraffic locators - attributes.endpoint.unicastLocatorList.clear(); - for (const Locator_t& loc : mp_PDP->getLocalParticipantProxyData()->metatraffic_locators.unicast) - { - attributes.endpoint.unicastLocatorList.push_back(loc); - } - attributes.endpoint.multicastLocatorList.clear(); - for (const Locator_t& loc : mp_PDP->getLocalParticipantProxyData()->metatraffic_locators.multicast) - { - attributes.endpoint.multicastLocatorList.push_back(loc); - } - attributes.endpoint.external_unicast_locators = mp_PDP->builtin_attributes().metatraffic_external_unicast_locators; - attributes.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; + attributes = mp_PDP->create_builtin_writer_attributes(); // Timings are configured using EDP default values attributes.times.heartbeatPeriod = edp_heartbeat_period; attributes.times.nackResponseDelay = edp_nack_response_delay; attributes.times.nackSupressionDuration = edp_nack_supression_duration; - - // EDP endpoints are always reliable, transsient local, keyed topics - attributes.endpoint.reliabilityKind = RELIABLE; - attributes.endpoint.durabilityKind = TRANSIENT_LOCAL; - attributes.endpoint.topicKind = WITH_KEY; } bool EDPSimple::createSEDPEndpoints() @@ -537,34 +468,7 @@ bool EDPSimple::create_sedp_secure_endpoints() set_builtin_writer_history_attributes(writer_history_att); set_builtin_reader_attributes(ratt); set_builtin_writer_attributes(watt); - - const security::ParticipantSecurityAttributes& part_attr = mp_RTPSParticipant->security_attributes(); - - ratt.endpoint.security_attributes().is_submessage_protected = part_attr.is_discovery_protected; - ratt.endpoint.security_attributes().plugin_endpoint_attributes = PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID; - - watt.endpoint.security_attributes().is_submessage_protected = part_attr.is_discovery_protected; - watt.endpoint.security_attributes().plugin_endpoint_attributes = PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID; - - if (part_attr.is_discovery_protected) - { - security::PluginParticipantSecurityAttributes plugin_part_attr(part_attr.plugin_participant_attributes); - - if (plugin_part_attr.is_discovery_encrypted) - { - ratt.endpoint.security_attributes().plugin_endpoint_attributes |= - PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; - watt.endpoint.security_attributes().plugin_endpoint_attributes |= - PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; - } - if (plugin_part_attr.is_discovery_origin_authenticated) - { - ratt.endpoint.security_attributes().plugin_endpoint_attributes |= - PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ORIGIN_AUTHENTICATED; - watt.endpoint.security_attributes().plugin_endpoint_attributes |= - PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ORIGIN_AUTHENTICATED; - } - } + mp_PDP->add_builtin_security_attributes(ratt, watt); if (m_discovery.discovery_config.m_simpleEDP.enable_builtin_secure_publications_writer_and_subscriptions_reader) { diff --git a/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpoints.hpp b/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpoints.hpp index 2cf2e5dd63a..84c9bf7c9ac 100644 --- a/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpoints.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpoints.hpp @@ -46,6 +46,11 @@ struct DiscoveryServerPDPEndpoints : public PDPEndpoints return DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER | DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; } + const std::unique_ptr& main_listener() const override + { + return reader.listener_; + } + bool enable_pdp_readers( fastrtps::rtps::RTPSParticipantImpl* participant) override { diff --git a/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpointsSecure.hpp b/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpointsSecure.hpp index e3bd5fc7744..150167ac954 100644 --- a/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpointsSecure.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/DS/DiscoveryServerPDPEndpointsSecure.hpp @@ -89,8 +89,6 @@ struct DiscoveryServerPDPEndpointsSecure : public DiscoveryServerPDPEndpoints //! Builtin Simple PDP reader BuiltinReader stateless_reader; - //! Listener for stateless_reader - std::unique_ptr stateless_listener; }; } // namespace rtps diff --git a/src/cpp/rtps/builtin/discovery/participant/DS/PDPSecurityInitiatorListener.cpp b/src/cpp/rtps/builtin/discovery/participant/DS/PDPSecurityInitiatorListener.cpp index 657431b76df..37120f93451 100644 --- a/src/cpp/rtps/builtin/discovery/participant/DS/PDPSecurityInitiatorListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/DS/PDPSecurityInitiatorListener.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -47,121 +48,44 @@ namespace rtps { PDPSecurityInitiatorListener::PDPSecurityInitiatorListener( PDP* parent, SecurityInitiatedCallback response_cb) - : parent_pdp_(parent) - , temp_participant_data_(parent->getRTPSParticipant()->getRTPSParticipantAttributes().allocation) + : PDPListener(parent) , response_cb_(response_cb) { } -void PDPSecurityInitiatorListener::onNewCacheChangeAdded( +void PDPSecurityInitiatorListener::process_alive_data( + ParticipantProxyData* old_data, + ParticipantProxyData& new_data, + GUID_t& writer_guid, RTPSReader* reader, - const CacheChange_t* const change_in) + std::unique_lock& lock) { - CacheChange_t* change = const_cast(change_in); - GUID_t writer_guid = change->writerGUID; - EPROSIMA_LOG_INFO(RTPS_PDP, "SPDP Message received from: " << change_in->writerGUID); - - // Make sure we have an instance handle (i.e GUID) - if (change->instanceHandle == c_InstanceHandle_Unknown) + if (reader->matched_writer_is_matched(writer_guid)) { - if (!this->get_key(change)) - { - EPROSIMA_LOG_WARNING(RTPS_PDP, "Problem getting the key of the change, removing"); - parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change); - return; - } + // Act as the standard PDPListener when the writer is matched. + // This will be the case for unauthenticated participants when + // allowed_unathenticated_participants is true + PDPListener::process_alive_data(old_data, new_data, writer_guid, reader, lock); + return; } - // Take GUID from instance handle - GUID_t guid; - iHandle2GUID(guid, change->instanceHandle); - - if (change->kind == ALIVE) + if (old_data == nullptr) { - // Ignore announcement from own RTPSParticipant - if (guid == parent_pdp_->getRTPSParticipant()->getGuid()) - { - EPROSIMA_LOG_INFO(RTPS_PDP, "Message from own RTPSParticipant, removing"); - parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change); - return; - } - - // Release reader lock to avoid ABBA lock. PDP mutex should always be first. - // Keep change information on local variables to check consistency later - SequenceNumber_t seq_num = change->sequenceNumber; reader->getMutex().unlock(); - std::unique_lock lock(*parent_pdp_->getMutex()); - reader->getMutex().lock(); - - // If change is not consistent, it will be processed on the thread that has overriten it - if ((ALIVE != change->kind) || (seq_num != change->sequenceNumber) || (writer_guid != change->writerGUID)) - { - return; - } - - // Access to temp_participant_data_ is protected by reader lock + lock.unlock(); - // Load information on temp_participant_data_ - CDRMessage_t msg(change->serializedPayload); - temp_participant_data_.clear(); - if (temp_participant_data_.readFromCDRMessage(&msg, true, parent_pdp_->getRTPSParticipant()->network_factory(), - parent_pdp_->getRTPSParticipant()->has_shm_transport(), true)) + //! notify security manager in order to start handshake + bool ret = parent_pdp_->getRTPSParticipant()->security_manager().discovered_participant(new_data); + //! Reply to the remote participant + if (ret) { - // After correctly reading it - change->instanceHandle = temp_participant_data_.m_key; - guid = temp_participant_data_.m_guid; - - // Filter locators - const auto& pattr = parent_pdp_->getRTPSParticipant()->getAttributes(); - fastdds::rtps::ExternalLocatorsProcessor::filter_remote_locators(temp_participant_data_, - pattr.builtin.metatraffic_external_unicast_locators, pattr.default_external_unicast_locators, - pattr.ignore_non_matching_locators); - - // Check if participant already exists (updated info) - ParticipantProxyData* pdata = nullptr; - for (ParticipantProxyData* it : parent_pdp_->participant_proxies_) - { - if (guid == it->m_guid) - { - pdata = it; - break; - } - } - - if (pdata == nullptr) - { - // Create a new one when not found - - reader->getMutex().unlock(); - lock.unlock(); - - //! notify security manager in order to start handshake - bool ret = parent_pdp_->getRTPSParticipant()->security_manager().discovered_participant( - temp_participant_data_); - - //! Reply to the remote participant - if (ret) - { - response_cb_(temp_participant_data_); - } - - // Take again the reader lock - reader->getMutex().lock(); - - } //! Do nothing if already discovered - + response_cb_(temp_participant_data_); } - } //! Do nothing on participant removal - - //Remove change form history. - parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change); -} + // Take again the reader lock + reader->getMutex().lock(); + } -bool PDPSecurityInitiatorListener::get_key( - CacheChange_t* change) -{ - return ParameterList::readInstanceHandleFromCDRMsg(change, fastdds::dds::PID_PARTICIPANT_GUID); } } /* namespace rtps */ diff --git a/src/cpp/rtps/builtin/discovery/participant/DS/PDPSecurityInitiatorListener.hpp b/src/cpp/rtps/builtin/discovery/participant/DS/PDPSecurityInitiatorListener.hpp index 13d2d599308..c15a283e9b8 100644 --- a/src/cpp/rtps/builtin/discovery/participant/DS/PDPSecurityInitiatorListener.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/DS/PDPSecurityInitiatorListener.hpp @@ -21,7 +21,7 @@ #define _DS_PDP_SECURITY_INITIATOR_LISTENER_H_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -#include +#include #include #include @@ -36,7 +36,7 @@ class PDP; * Class PDPSecurityInitiatorListener, implementation for the secure discovery server handshake initiator. * @ingroup DISCOVERY_MODULE */ -class PDPSecurityInitiatorListener : public ReaderListener +class PDPSecurityInitiatorListener : public PDPListener { using SecurityInitiatedCallback = std::function; @@ -52,34 +52,14 @@ class PDPSecurityInitiatorListener : public ReaderListener virtual ~PDPSecurityInitiatorListener() override = default; - /** - * New added cache - * @param reader - * @param change - */ - void onNewCacheChangeAdded( - RTPSReader* reader, - const CacheChange_t* const change) override; - protected: - /** - * Get the key of a CacheChange_t - * @param change Pointer to the CacheChange_t - * @return True on success - */ - bool get_key( - CacheChange_t* change); - - //!Pointer to the associated mp_SPDP; - PDP* parent_pdp_; - - /** - * @brief Temporary data to avoid reallocations. - * - * @remarks This should be always accessed with the pdp_reader lock taken - */ - ParticipantProxyData temp_participant_data_; + void process_alive_data( + ParticipantProxyData* old_data, + ParticipantProxyData& new_data, + GUID_t& writer_guid, + RTPSReader* reader, + std::unique_lock& lock) override; //! What action to perform upon participant discovery SecurityInitiatedCallback response_cb_; diff --git a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp index 5e467ac075f..1c2259f8922 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDP.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDP.cpp @@ -68,9 +68,7 @@ namespace eprosima { namespace fastrtps { namespace rtps { - // Default configuration values for PDP reliable entities. - const Duration_t pdp_heartbeat_period{ 0, 350 * 1000000 }; // 350 milliseconds const Duration_t pdp_nack_response_delay{ 0, 100 * 1000000 }; // 100 milliseconds const Duration_t pdp_nack_supression_duration{ 0, 11 * 1000000 }; // 11 milliseconds @@ -93,7 +91,6 @@ PDP::PDP ( , writer_proxies_number_(allocation.total_writers().initial) , writer_proxies_pool_(allocation.total_writers()) , m_hasChangedLocalPDP(true) - , mp_listener(nullptr) , temp_reader_proxies_({ allocation.locators.max_unicast_locators, allocation.locators.max_multicast_locators, @@ -141,9 +138,6 @@ PDP::~PDP() builtin_endpoints_->delete_pdp_endpoints(mp_RTPSParticipant); builtin_endpoints_.reset(); - delete mp_listener; - mp_listener = nullptr; - for (ParticipantProxyData* it : participant_proxies_) { delete it; @@ -524,7 +518,9 @@ void PDP::announceParticipantState( else { this->mp_mutex->lock(); - ParticipantProxyData proxy_data_copy(*getLocalParticipantProxyData()); + ParticipantProxyData* local_participant_data = getLocalParticipantProxyData(); + InstanceHandle_t key = local_participant_data->m_key; + ParticipantProxyData proxy_data_copy(*local_participant_data); this->mp_mutex->unlock(); if (history.getHistorySize() > 0) @@ -536,7 +532,7 @@ void PDP::announceParticipantState( { return cdr_size; }, - NOT_ALIVE_DISPOSED_UNREGISTERED, getLocalParticipantProxyData()->m_key); + NOT_ALIVE_DISPOSED_UNREGISTERED, key); if (change != nullptr) { @@ -581,6 +577,39 @@ void PDP::resetParticipantAnnouncement() } } +void PDP::notify_and_maybe_ignore_new_participant( + ParticipantProxyData* pdata, + bool& should_be_ignored) +{ + should_be_ignored = false; + + EPROSIMA_LOG_INFO(RTPS_PDP_DISCOVERY, "New participant " + << pdata->m_guid << " at " + << "MTTLoc: " << pdata->metatraffic_locators + << " DefLoc:" << pdata->default_locators); + + RTPSParticipantListener* listener = getRTPSParticipant()->getListener(); + if (listener != nullptr) + { + { + std::lock_guard cb_lock(callback_mtx_); + ParticipantDiscoveryInfo info(*pdata); + info.status = ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT; + + + listener->onParticipantDiscovery( + getRTPSParticipant()->getUserRTPSParticipant(), + std::move(info), + should_be_ignored); + } + + if (should_be_ignored) + { + getRTPSParticipant()->ignore_participant(pdata->m_guid.guidPrefix); + } + } +} + bool PDP::has_reader_proxy_data( const GUID_t& reader) { @@ -1461,6 +1490,131 @@ void PDP::set_external_participant_properties_( } } +static void set_builtin_matched_allocation( + ResourceLimitedContainerConfig& allocation, + const RTPSParticipantAttributes& pattr) +{ + // Matched endpoints will depend on total number of participants + allocation = pattr.allocation.participants; + + // As participants allocation policy includes the local participant, one has to be substracted + if (allocation.initial > 1) + { + allocation.initial--; + } + if ((allocation.maximum > 1) && + (allocation.maximum < std::numeric_limits::max())) + { + allocation.maximum--; + } +} + +static void set_builtin_endpoint_locators( + EndpointAttributes& endpoint, + const PDP* pdp, + const BuiltinProtocols* builtin) +{ + const RTPSParticipantAttributes& pattr = pdp->getRTPSParticipant()->getRTPSParticipantAttributes(); + + auto part_data = pdp->getLocalParticipantProxyData(); + if (nullptr == part_data) + { + // Local participant data has not yet been created. + // This means we are creating the PDP endpoints, so we copy the locators from mp_builtin + endpoint.multicastLocatorList = builtin->m_metatrafficMulticastLocatorList; + endpoint.unicastLocatorList = builtin->m_metatrafficUnicastLocatorList; + } + else + { + // Locators are copied from the local participant metatraffic locators + endpoint.unicastLocatorList.clear(); + for (const Locator_t& loc : part_data->metatraffic_locators.unicast) + { + endpoint.unicastLocatorList.push_back(loc); + } + endpoint.multicastLocatorList.clear(); + for (const Locator_t& loc : part_data->metatraffic_locators.multicast) + { + endpoint.multicastLocatorList.push_back(loc); + } + } + + // External locators are always taken from the same place + endpoint.external_unicast_locators = pdp->builtin_attributes().metatraffic_external_unicast_locators; + endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; +} + +ReaderAttributes PDP::create_builtin_reader_attributes() const +{ + ReaderAttributes attributes; + + const RTPSParticipantAttributes& pattr = getRTPSParticipant()->getRTPSParticipantAttributes(); + set_builtin_matched_allocation(attributes.matched_writers_allocation, pattr); + set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin); + + // Builtin endpoints are always reliable, transient local, keyed topics + attributes.endpoint.reliabilityKind = RELIABLE; + attributes.endpoint.durabilityKind = TRANSIENT_LOCAL; + attributes.endpoint.topicKind = WITH_KEY; + + // Built-in readers never expect inline qos + attributes.expectsInlineQos = false; + + return attributes; +} + +WriterAttributes PDP::create_builtin_writer_attributes() const +{ + WriterAttributes attributes; + + const RTPSParticipantAttributes& pattr = getRTPSParticipant()->getRTPSParticipantAttributes(); + set_builtin_matched_allocation(attributes.matched_readers_allocation, pattr); + set_builtin_endpoint_locators(attributes.endpoint, this, mp_builtin); + + // Builtin endpoints are always reliable, transient local, keyed topics + attributes.endpoint.reliabilityKind = RELIABLE; + attributes.endpoint.durabilityKind = TRANSIENT_LOCAL; + attributes.endpoint.topicKind = WITH_KEY; + + return attributes; +} + +#if HAVE_SECURITY +void PDP::add_builtin_security_attributes( + ReaderAttributes& ratt, + WriterAttributes& watt) const +{ + const security::ParticipantSecurityAttributes& part_attr = mp_RTPSParticipant->security_attributes(); + + ratt.endpoint.security_attributes().is_submessage_protected = part_attr.is_discovery_protected; + ratt.endpoint.security_attributes().plugin_endpoint_attributes = PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID; + + watt.endpoint.security_attributes().is_submessage_protected = part_attr.is_discovery_protected; + watt.endpoint.security_attributes().plugin_endpoint_attributes = PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID; + + if (part_attr.is_discovery_protected) + { + security::PluginParticipantSecurityAttributes plugin_part_attr(part_attr.plugin_participant_attributes); + + if (plugin_part_attr.is_discovery_encrypted) + { + ratt.endpoint.security_attributes().plugin_endpoint_attributes |= + PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; + watt.endpoint.security_attributes().plugin_endpoint_attributes |= + PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; + } + if (plugin_part_attr.is_discovery_origin_authenticated) + { + ratt.endpoint.security_attributes().plugin_endpoint_attributes |= + PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ORIGIN_AUTHENTICATED; + watt.endpoint.security_attributes().plugin_endpoint_attributes |= + PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ORIGIN_AUTHENTICATED; + } + } +} + +#endif // HAVE_SECURITY + } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index f337372cbf3..82e758ec431 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -278,26 +278,20 @@ bool PDPClient::create_ds_pdp_best_effort_reader( ratt.endpoint.durabilityKind = VOLATILE; ratt.endpoint.reliabilityKind = BEST_EFFORT; - endpoints.stateless_listener.reset(new PDPSecurityInitiatorListener(this)); + endpoints.stateless_reader.listener_.reset(new PDPSecurityInitiatorListener(this)); // Create PDP Reader RTPSReader* reader = nullptr; if (mp_RTPSParticipant->createReader(&reader, ratt, endpoints.stateless_reader.history_.get(), - endpoints.stateless_listener.get(), c_EntityId_SPDPReader, true, false)) + endpoints.stateless_reader.listener_.get(), c_EntityId_SPDPReader, true, false)) { endpoints.stateless_reader.reader_ = dynamic_cast(reader); - - // Enable unknown clients to reach this reader - reader->enableMessagesFromUnkownWriters(true); - mp_RTPSParticipant->set_endpoint_rtps_protection_supports(reader, false); } // Could not create PDP Reader, so return false else { EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "PDPServer security initiation Reader creation failed"); - - endpoints.stateless_listener.reset(); endpoints.stateless_reader.release(); return false; } @@ -360,7 +354,7 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( } #endif // HAVE_SECURITY - mp_listener = new PDPListener(this); + endpoints.reader.listener_.reset(new PDPListener(this)); RTPSReader* reader = nullptr; #if HAVE_SECURITY @@ -369,7 +363,8 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( #else EntityId_t reader_entity = c_EntityId_SPDPReader; #endif // if HAVE_SECURITY - if (mp_RTPSParticipant->createReader(&reader, ratt, endpoints.reader.history_.get(), mp_listener, + if (mp_RTPSParticipant->createReader(&reader, ratt, endpoints.reader.history_.get(), + endpoints.reader.listener_.get(), reader_entity, true, false)) { endpoints.reader.reader_ = dynamic_cast(reader); @@ -381,8 +376,6 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( else { EPROSIMA_LOG_ERROR(RTPS_PDP, "PDPClient Reader creation failed"); - delete mp_listener; - mp_listener = nullptr; endpoints.reader.release(); return false; } @@ -483,25 +476,30 @@ bool PDPClient::create_ds_pdp_reliable_endpoints( void PDPClient::assignRemoteEndpoints( ParticipantProxyData* pdata) { + bool ignored = false; + notify_and_maybe_ignore_new_participant(pdata, ignored); + if (!ignored) { - eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); - - // Verify if this participant is a server - for (auto& svr : mp_builtin->m_DiscoveryServers) { - if (data_matches_with_prefix(svr.guidPrefix, *pdata)) + eprosima::shared_lock disc_lock(mp_builtin->getDiscoveryMutex()); + + // Verify if this participant is a server + for (auto& svr : mp_builtin->m_DiscoveryServers) { - std::unique_lock lock(*getMutex()); - svr.proxy = pdata; + if (data_matches_with_prefix(svr.guidPrefix, *pdata)) + { + std::unique_lock lock(*getMutex()); + svr.proxy = pdata; + } } } - } #if HAVE_SECURITY - if (mp_RTPSParticipant->security_manager().discovered_participant(*pdata)) + if (mp_RTPSParticipant->security_manager().discovered_participant(*pdata)) #endif // HAVE_SECURITY - { - perform_builtin_endpoints_matching(*pdata); + { + perform_builtin_endpoints_matching(*pdata); + } } } diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPEndpoints.hpp b/src/cpp/rtps/builtin/discovery/participant/PDPEndpoints.hpp index a50d2bad6d1..ad775f1e720 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPEndpoints.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPEndpoints.hpp @@ -45,6 +45,8 @@ class PDPEndpoints */ virtual fastrtps::rtps::BuiltinEndpointSet_t builtin_endpoints() const = 0; + virtual const std::unique_ptr& main_listener() const = 0; + virtual bool enable_pdp_readers( fastrtps::rtps::RTPSParticipantImpl* participant) = 0; virtual void disable_pdp_readers( diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp index 580754090dd..3575837f6fe 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp @@ -61,7 +61,7 @@ void PDPListener::onNewCacheChangeAdded( { CacheChange_t* change = const_cast(change_in); GUID_t writer_guid = change->writerGUID; - EPROSIMA_LOG_INFO(RTPS_PDP, "SPDP Message received from: " << change_in->writerGUID); + EPROSIMA_LOG_INFO(RTPS_PDP, "SPDP Message received from: " << writer_guid); // Make sure we have an instance handle (i.e GUID) if (change->instanceHandle == c_InstanceHandle_Unknown) @@ -135,114 +135,10 @@ void PDPListener::onNewCacheChangeAdded( } } - auto status = (pdata == nullptr) ? ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT : - ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT; - - if (pdata == nullptr) - { - // Create a new one when not found - pdata = parent_pdp_->createParticipantProxyData(temp_participant_data_, writer_guid); - - reader->getMutex().unlock(); - lock.unlock(); - - if (pdata != nullptr) - { - EPROSIMA_LOG_INFO(RTPS_PDP_DISCOVERY, "New participant " - << pdata->m_guid << " at " - << "MTTLoc: " << pdata->metatraffic_locators - << " DefLoc:" << pdata->default_locators); - - RTPSParticipantListener* listener = parent_pdp_->getRTPSParticipant()->getListener(); - if (listener != nullptr) - { - bool should_be_ignored = false; - { - std::lock_guard cb_lock(parent_pdp_->callback_mtx_); - ParticipantDiscoveryInfo info(*pdata); - info.status = status; - - - listener->onParticipantDiscovery( - parent_pdp_->getRTPSParticipant()->getUserRTPSParticipant(), - std::move(info), - should_be_ignored); - } - if (should_be_ignored) - { - parent_pdp_->getRTPSParticipant()->ignore_participant(guid.guidPrefix); - } - - } - - // Assigning remote endpoints implies sending a DATA(p) to all matched and fixed readers, since - // StatelessWriter::matched_reader_add marks the entire history as unsent if the added reader's - // durability is bigger or equal to TRANSIENT_LOCAL_DURABILITY_QOS (TRANSIENT_LOCAL or TRANSIENT), - // which is the case of ENTITYID_BUILTIN_SDP_PARTICIPANT_READER (TRANSIENT_LOCAL). If a remote - // participant is discovered before creating the first DATA(p) change (which happens at the end of - // BuiltinProtocols::initBuiltinProtocols), then StatelessWriter::matched_reader_add ends up marking - // no changes as unsent (since the history is empty), which is OK because this can only happen if a - // participant is discovered in the middle of BuiltinProtocols::initBuiltinProtocols, which will - // create the first DATA(p) upon finishing, thus triggering the sent to all fixed and matched - // readers anyways. - parent_pdp_->assignRemoteEndpoints(pdata); - } - } - else - { - pdata->updateData(temp_participant_data_); - pdata->isAlive = true; - reader->getMutex().unlock(); - - EPROSIMA_LOG_INFO(RTPS_PDP_DISCOVERY, "Update participant " - << pdata->m_guid << " at " - << "MTTLoc: " << pdata->metatraffic_locators - << " DefLoc:" << pdata->default_locators); - - if (parent_pdp_->updateInfoMatchesEDP()) - { - parent_pdp_->mp_EDP->assignRemoteEndpoints(*pdata, true); - } - - lock.unlock(); - - RTPSParticipantListener* listener = parent_pdp_->getRTPSParticipant()->getListener(); - if (listener != nullptr) - { - bool should_be_ignored = false; - - { - std::lock_guard cb_lock(parent_pdp_->callback_mtx_); - ParticipantDiscoveryInfo info(*pdata); - info.status = status; - - listener->onParticipantDiscovery( - parent_pdp_->getRTPSParticipant()->getUserRTPSParticipant(), - std::move(info), - should_be_ignored); - } - if (should_be_ignored) - { - parent_pdp_->getRTPSParticipant()->ignore_participant(temp_participant_data_.m_guid.guidPrefix); - } - } - } - -#ifdef FASTDDS_STATISTICS - //! Addition or update of a participant proxy should trigger - //! a connections update on the local participant connection list - if (nullptr != parent_pdp_->getRTPSParticipant()->get_connections_observer()) - { - parent_pdp_->getRTPSParticipant()->get_connections_observer()->on_local_entity_connections_change( - parent_pdp_->getRTPSParticipant()->getGuid()); - } -#endif //FASTDDS_STATISTICS - - // Take again the reader lock - reader->getMutex().lock(); + process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock); } } - else + else if (reader->matched_writer_is_matched(writer_guid)) { reader->getMutex().unlock(); if (parent_pdp_->remove_remote_participant(guid, ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)) @@ -267,6 +163,92 @@ void PDPListener::onNewCacheChangeAdded( parent_pdp_->builtin_endpoints_->remove_from_pdp_reader_history(change); } +void PDPListener::process_alive_data( + ParticipantProxyData* old_data, + ParticipantProxyData& new_data, + GUID_t& writer_guid, + RTPSReader* reader, + std::unique_lock& lock) +{ + GUID_t participant_guid = new_data.m_guid; + + if (old_data == nullptr) + { + // Create a new one when not found + old_data = parent_pdp_->createParticipantProxyData(new_data, writer_guid); + + reader->getMutex().unlock(); + lock.unlock(); + + if (old_data != nullptr) + { + // Assigning remote endpoints implies sending a DATA(p) to all matched and fixed readers, since + // StatelessWriter::matched_reader_add marks the entire history as unsent if the added reader's + // durability is bigger or equal to TRANSIENT_LOCAL_DURABILITY_QOS (TRANSIENT_LOCAL or TRANSIENT), + // which is the case of ENTITYID_BUILTIN_SDP_PARTICIPANT_READER (TRANSIENT_LOCAL). If a remote + // participant is discovered before creating the first DATA(p) change (which happens at the end of + // BuiltinProtocols::initBuiltinProtocols), then StatelessWriter::matched_reader_add ends up marking + // no changes as unsent (since the history is empty), which is OK because this can only happen if a + // participant is discovered in the middle of BuiltinProtocols::initBuiltinProtocols, which will + // create the first DATA(p) upon finishing, thus triggering the sent to all fixed and matched + // readers anyways. + parent_pdp_->assignRemoteEndpoints(old_data); + } + } + else + { + old_data->updateData(new_data); + old_data->isAlive = true; + reader->getMutex().unlock(); + + EPROSIMA_LOG_INFO(RTPS_PDP_DISCOVERY, "Update participant " + << old_data->m_guid << " at " + << "MTTLoc: " << old_data->metatraffic_locators + << " DefLoc:" << old_data->default_locators); + + if (parent_pdp_->updateInfoMatchesEDP()) + { + parent_pdp_->mp_EDP->assignRemoteEndpoints(*old_data, true); + } + + lock.unlock(); + + RTPSParticipantListener* listener = parent_pdp_->getRTPSParticipant()->getListener(); + if (listener != nullptr) + { + bool should_be_ignored = false; + + { + std::lock_guard cb_lock(parent_pdp_->callback_mtx_); + ParticipantDiscoveryInfo info(*old_data); + info.status = ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT; + + listener->onParticipantDiscovery( + parent_pdp_->getRTPSParticipant()->getUserRTPSParticipant(), + std::move(info), + should_be_ignored); + } + if (should_be_ignored) + { + parent_pdp_->getRTPSParticipant()->ignore_participant(participant_guid.guidPrefix); + } + } + } + +#ifdef FASTDDS_STATISTICS + //! Addition or update of a participant proxy should trigger + //! a connections update on the local participant connection list + if (nullptr != parent_pdp_->getRTPSParticipant()->get_connections_observer()) + { + parent_pdp_->getRTPSParticipant()->get_connections_observer()->on_local_entity_connections_change( + parent_pdp_->getRTPSParticipant()->getGuid()); + } +#endif //FASTDDS_STATISTICS + + // Take again the reader lock + reader->getMutex().lock(); +} + bool PDPListener::get_key( CacheChange_t* change) { diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp index 6f04b540b44..aa23b230f99 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServer.cpp @@ -293,7 +293,7 @@ bool PDPServer::create_ds_pdp_best_effort_reader( ratt.endpoint.durabilityKind = VOLATILE; ratt.endpoint.reliabilityKind = BEST_EFFORT; - endpoints.stateless_listener.reset(new PDPSecurityInitiatorListener(this, + endpoints.stateless_reader.listener_.reset(new PDPSecurityInitiatorListener(this, [this](const ParticipantProxyData& participant_data) { auto endpoints = static_cast(builtin_endpoints_.get()); @@ -320,21 +320,15 @@ bool PDPServer::create_ds_pdp_best_effort_reader( // Create PDP Reader RTPSReader* reader = nullptr; if (mp_RTPSParticipant->createReader(&reader, ratt, endpoints.stateless_reader.history_.get(), - endpoints.stateless_listener.get(), c_EntityId_SPDPReader, true, false)) + endpoints.stateless_reader.listener_.get(), c_EntityId_SPDPReader, true, false)) { endpoints.stateless_reader.reader_ = dynamic_cast(reader); - - // Enable unknown clients to reach this reader - reader->enableMessagesFromUnkownWriters(true); - mp_RTPSParticipant->set_endpoint_rtps_protection_supports(reader, false); } // Could not create PDP Reader, so return false else { EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "PDPServer security initiation Reader creation failed"); - - endpoints.stateless_listener.reset(); endpoints.stateless_reader.release(); return false; } @@ -403,7 +397,7 @@ bool PDPServer::create_ds_pdp_reliable_endpoints( #endif // HAVE_SQLITE3 // PDP Listener - mp_listener = new PDPServerListener(this); + endpoints.reader.listener_.reset(new PDPServerListener(this)); // Create PDP Reader RTPSReader* reader = nullptr; @@ -412,8 +406,8 @@ bool PDPServer::create_ds_pdp_reliable_endpoints( #else EntityId_t reader_entity = c_EntityId_SPDPReader; #endif // if HAVE_SECURITY - if (mp_RTPSParticipant->createReader(&reader, ratt, endpoints.reader.history_.get(), mp_listener, - reader_entity, true, false)) + if (mp_RTPSParticipant->createReader(&reader, ratt, endpoints.reader.history_.get(), + endpoints.reader.listener_.get(), reader_entity, true, false)) { endpoints.reader.reader_ = dynamic_cast(reader); @@ -428,8 +422,6 @@ bool PDPServer::create_ds_pdp_reliable_endpoints( else { EPROSIMA_LOG_ERROR(RTPS_PDP_SERVER, "PDPServer Reader creation failed"); - delete mp_listener; - mp_listener = nullptr; endpoints.reader.release(); return false; } @@ -1781,7 +1773,7 @@ bool PDPServer::process_backup_discovery_database_restore( { change_aux->writerGUID = change_aux->write_params.sample_identity().writer_guid(); change_aux->sequenceNumber = change_aux->write_params.sample_identity().sequence_number(); - mp_listener->onNewCacheChangeAdded(endpoints->reader.reader_, change_aux); + builtin_endpoints_->main_listener()->onNewCacheChangeAdded(endpoints->reader.reader_, change_aux); } } diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp index d492bc48a6e..99cec6078ab 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp @@ -41,16 +41,47 @@ #include #include +#include #include +#include #include #include -using namespace eprosima::fastrtps; - namespace eprosima { namespace fastrtps { namespace rtps { +static HistoryAttributes pdp_reader_history_attributes( + const BuiltinAttributes& builtin_att, + const RTPSParticipantAllocationAttributes& allocation) +{ + HistoryAttributes hatt; + hatt.payloadMaxSize = builtin_att.readerPayloadSize; + hatt.memoryPolicy = builtin_att.readerHistoryMemoryPolicy; + hatt.initialReservedCaches = 25; + if (allocation.participants.initial > 0) + { + hatt.initialReservedCaches = (int32_t)allocation.participants.initial; + } + if (allocation.participants.maximum < std::numeric_limits::max()) + { + hatt.maximumReservedCaches = (int32_t)allocation.participants.maximum; + } + + return hatt; +} + +static HistoryAttributes pdp_writer_history_attributes( + const BuiltinAttributes& builtin_att) +{ + HistoryAttributes hatt; + hatt.payloadMaxSize = builtin_att.writerPayloadSize; + hatt.memoryPolicy = builtin_att.writerHistoryMemoryPolicy; + hatt.initialReservedCaches = 1; + hatt.maximumReservedCaches = 1; + + return hatt; +} PDPSimple::PDPSimple ( BuiltinProtocols* built, @@ -233,15 +264,28 @@ void PDPSimple::announceParticipantState( { if (enabled_) { - auto endpoints = static_cast(builtin_endpoints_.get()); - StatelessWriter& writer = *(endpoints->writer.writer_); - WriterHistory& history = *(endpoints->writer.history_); + new_change |= m_hasChangedLocalPDP.exchange(false); +#if HAVE_SECURITY + if (mp_RTPSParticipant->is_secure()) + { + auto secure = dynamic_cast(builtin_endpoints_.get()); + assert(nullptr != secure); + + RTPSWriter& writer = *(secure->secure_writer.writer_); + WriterHistory& history = *(secure->secure_writer.history_); + PDP::announceParticipantState(writer, history, new_change, dispose, wp); + } +#endif // HAVE_SECURITY + + auto endpoints = dynamic_cast(builtin_endpoints_.get()); + RTPSWriter& writer = *(endpoints->writer.writer_); + WriterHistory& history = *(endpoints->writer.history_); PDP::announceParticipantState(writer, history, new_change, dispose, wp); if (!(dispose || new_change)) { - writer.unsent_changes_reset(); + endpoints->writer.writer_->unsent_changes_reset(); } } } @@ -250,218 +294,261 @@ bool PDPSimple::createPDPEndpoints() { EPROSIMA_LOG_INFO(RTPS_PDP, "Beginning"); + fastdds::rtps::SimplePDPEndpoints* endpoints = nullptr; +#if HAVE_SECURITY + fastdds::rtps::SimplePDPEndpointsSecure* secure_endpoints = nullptr; + bool is_secure = mp_RTPSParticipant->is_secure(); + if (is_secure) + { + secure_endpoints = new fastdds::rtps::SimplePDPEndpointsSecure(); + secure_endpoints->secure_reader.listener_.reset(new PDPListener(this)); + + endpoints = secure_endpoints; + endpoints->reader.listener_.reset(new PDPSecurityInitiatorListener(this)); + } + else +#endif // HAVE_SECURITY + { + endpoints = new fastdds::rtps::SimplePDPEndpoints(); + endpoints->reader.listener_.reset(new PDPListener(this)); + } + builtin_endpoints_.reset(endpoints); + + bool ret_val = create_dcps_participant_endpoints(); +#if HAVE_SECURITY + if (ret_val && is_secure) + { + create_dcps_participant_secure_endpoints(); + } +#endif // HAVE_SECURITY + EPROSIMA_LOG_INFO(RTPS_PDP, "SPDP Endpoints creation finished"); + return ret_val; +} + +bool PDPSimple::create_dcps_participant_endpoints() +{ const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes(); const RTPSParticipantAllocationAttributes& allocation = pattr.allocation; const BuiltinAttributes& builtin_att = mp_builtin->m_att; + auto endpoints = dynamic_cast(builtin_endpoints_.get()); + assert(nullptr != endpoints); - auto endpoints = new fastdds::rtps::SimplePDPEndpoints(); - builtin_endpoints_.reset(endpoints); + constexpr const char* topic_name = "DCPSParticipant"; + const EntityId_t reader_entity_id = c_EntityId_SPDPReader; + const EntityId_t writer_entity_id = c_EntityId_SPDPWriter; - //SPDP BUILTIN RTPSParticipant READER + // BUILTIN DCPSParticipant READER + auto& reader = endpoints->reader; HistoryAttributes hatt; - hatt.payloadMaxSize = builtin_att.readerPayloadSize; - hatt.memoryPolicy = builtin_att.readerHistoryMemoryPolicy; - hatt.initialReservedCaches = 25; - if (allocation.participants.initial > 0) - { - hatt.initialReservedCaches = (int32_t)allocation.participants.initial; - } - if (allocation.participants.maximum < std::numeric_limits::max()) - { - hatt.maximumReservedCaches = (int32_t)allocation.participants.maximum; - } + hatt = pdp_reader_history_attributes(builtin_att, allocation); PoolConfig reader_pool_cfg = PoolConfig::from_history_attributes(hatt); - endpoints->reader.payload_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipant", reader_pool_cfg); - endpoints->reader.payload_pool_->reserve_history(reader_pool_cfg, true); - - endpoints->reader.history_.reset(new ReaderHistory(hatt)); - - ReaderAttributes ratt; - ratt.endpoint.multicastLocatorList = mp_builtin->m_metatrafficMulticastLocatorList; - ratt.endpoint.unicastLocatorList = mp_builtin->m_metatrafficUnicastLocatorList; - ratt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators; - ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; - ratt.endpoint.topicKind = WITH_KEY; - ratt.endpoint.durabilityKind = TRANSIENT_LOCAL; + reader.payload_pool_ = TopicPayloadPoolRegistry::get(topic_name, reader_pool_cfg); + reader.payload_pool_->reserve_history(reader_pool_cfg, true); + reader.history_.reset(new ReaderHistory(hatt)); + + ReaderAttributes ratt = create_builtin_reader_attributes(); ratt.endpoint.reliabilityKind = BEST_EFFORT; - ratt.matched_writers_allocation = allocation.participants; - mp_listener = new PDPListener(this); - RTPSReader* reader = nullptr; - if (mp_RTPSParticipant->createReader(&reader, ratt, - endpoints->reader.payload_pool_, endpoints->reader.history_.get(), - mp_listener, c_EntityId_SPDPReader, true, false)) - { - endpoints->reader.reader_ = dynamic_cast(reader); + + RTPSReader* rtps_reader = nullptr; + if (mp_RTPSParticipant->createReader(&rtps_reader, ratt, reader.payload_pool_, reader.history_.get(), + reader.listener_.get(), reader_entity_id, true, false)) + { + reader.reader_ = dynamic_cast(rtps_reader); + assert(nullptr != reader.reader_); + #if HAVE_SECURITY - mp_RTPSParticipant->set_endpoint_rtps_protection_supports(reader, false); + mp_RTPSParticipant->set_endpoint_rtps_protection_supports(rtps_reader, false); #endif // if HAVE_SECURITY } else { - EPROSIMA_LOG_ERROR(RTPS_PDP, "SimplePDP Reader creation failed"); - delete mp_listener; - mp_listener = nullptr; - endpoints->reader.release(); + EPROSIMA_LOG_ERROR(RTPS_PDP, "'" << topic_name << "' builtin reader creation failed"); + reader.release(); return false; } - //SPDP BUILTIN RTPSParticipant WRITER - hatt.payloadMaxSize = mp_builtin->m_att.writerPayloadSize; - hatt.initialReservedCaches = 1; - hatt.maximumReservedCaches = 1; - hatt.memoryPolicy = mp_builtin->m_att.writerHistoryMemoryPolicy; + // BUILTIN DCPSParticipant WRITER + auto& writer = endpoints->writer; + hatt = pdp_writer_history_attributes(builtin_att); PoolConfig writer_pool_cfg = PoolConfig::from_history_attributes(hatt); - endpoints->writer.payload_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipant", writer_pool_cfg); - endpoints->writer.payload_pool_->reserve_history(writer_pool_cfg, false); - - endpoints->writer.history_.reset(new WriterHistory(hatt)); - WriterAttributes watt; - watt.endpoint.external_unicast_locators = mp_builtin->m_att.metatraffic_external_unicast_locators; - watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; - watt.endpoint.endpointKind = WRITER; - watt.endpoint.durabilityKind = TRANSIENT_LOCAL; + writer.payload_pool_ = TopicPayloadPoolRegistry::get(topic_name, writer_pool_cfg); + writer.payload_pool_->reserve_history(writer_pool_cfg, false); + writer.history_.reset(new WriterHistory(hatt)); + + WriterAttributes watt = create_builtin_writer_attributes(); watt.endpoint.reliabilityKind = BEST_EFFORT; - watt.endpoint.topicKind = WITH_KEY; watt.endpoint.remoteLocatorList = m_discovery.initialPeersList; - watt.matched_readers_allocation = allocation.participants; if (pattr.throughputController.bytesPerPeriod != UINT32_MAX && pattr.throughputController.periodMillisecs != 0) { watt.mode = ASYNCHRONOUS_WRITER; } - RTPSWriter* wout = nullptr; - if (mp_RTPSParticipant->createWriter(&wout, watt, endpoints->writer.payload_pool_, endpoints->writer.history_.get(), - nullptr, - c_EntityId_SPDPWriter, true)) + RTPSWriter* rtps_writer = nullptr; + if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.payload_pool_, writer.history_.get(), + nullptr, writer_entity_id, true)) { - endpoints->writer.writer_ = dynamic_cast(wout); + writer.writer_ = dynamic_cast(rtps_writer); + assert(nullptr != writer.writer_); + #if HAVE_SECURITY - mp_RTPSParticipant->set_endpoint_rtps_protection_supports(wout, false); + mp_RTPSParticipant->set_endpoint_rtps_protection_supports(rtps_writer, false); #endif // if HAVE_SECURITY - if (endpoints->writer.writer_ != nullptr) + + const NetworkFactory& network = mp_RTPSParticipant->network_factory(); + LocatorList_t fixed_locators; + for (const Locator_t& loc : mp_builtin->m_initialPeersList) { - const NetworkFactory& network = mp_RTPSParticipant->network_factory(); - LocatorList_t fixed_locators; - for (const Locator_t& loc : mp_builtin->m_initialPeersList) + if (network.is_locator_remote_or_allowed(loc)) { - if (network.is_locator_remote_or_allowed(loc)) - { - // Add initial peers locator without transformation as we don't know whether the - // remote transport will allow localhost - fixed_locators.push_back(loc); - - /** - * TCP special case: - * - * In TCP, it is not possible to open a socket with 'any' (0.0.0.0) address as it's done - * in UDP, so when the TCP transports receive a locator with 'any', they open an input - * channel for the specified port in each of the machine interfaces (with the exception - * of localhost). In fact, a participant with a TCP transport will only listen on localhost - * if localhost is the address of any of the initial peers. - * - * However, when the TCP enabled participant does not have a whitelist (or localhost is in - * it), it allows for transformation of its locators to localhost for performance optimizations. - * In this case, the remote TCP participant it will send data using a socket in localhost, - * and for that the participant with the initial peers list needs to be listening there - * to receive it. - * - * That means: - * 1. Checking that the initial peer is not already localhost - * 2. Checking that the initial peer locator is of TCP kind - * 3. Checking that the network configuration allows for localhost locators - */ - Locator_t local_locator; - network.transform_remote_locator(loc, local_locator, - DISC_NETWORK_CONFIGURATION_LISTENING_LOCALHOST_ALL); - if (loc != local_locator - && (loc.kind == LOCATOR_KIND_TCPv4 || loc.kind == LOCATOR_KIND_TCPv6) - && network.is_locator_allowed(local_locator)) - { - fixed_locators.push_back(local_locator); - } - } - else + // Add initial peers locator without transformation as we don't know whether the + // remote transport will allow localhost + fixed_locators.push_back(loc); + + /** + * TCP special case: + * + * In TCP, it is not possible to open a socket with 'any' (0.0.0.0) address as it's done + * in UDP, so when the TCP transports receive a locator with 'any', they open an input + * channel for the specified port in each of the machine interfaces (with the exception + * of localhost). In fact, a participant with a TCP transport will only listen on localhost + * if localhost is the address of any of the initial peers. + * + * However, when the TCP enabled participant does not have a whitelist (or localhost is in + * it), it allows for transformation of its locators to localhost for performance optimizations. + * In this case, the remote TCP participant it will send data using a socket in localhost, + * and for that the participant with the initial peers list needs to be listening there + * to receive it. + * + * That means: + * 1. Checking that the initial peer is not already localhost + * 2. Checking that the initial peer locator is of TCP kind + * 3. Checking that the network configuration allows for localhost locators + */ + Locator_t local_locator; + network.transform_remote_locator(loc, local_locator, + DISC_NETWORK_CONFIGURATION_LISTENING_LOCALHOST_ALL); + if (loc != local_locator + && (loc.kind == LOCATOR_KIND_TCPv4 || loc.kind == LOCATOR_KIND_TCPv6) + && network.is_locator_allowed(local_locator)) { - EPROSIMA_LOG_WARNING(RTPS_PDP, "Ignoring initial peers locator " << loc << " : not allowed."); + fixed_locators.push_back(local_locator); } } - endpoints->writer.writer_->set_fixed_locators(fixed_locators); + else + { + EPROSIMA_LOG_WARNING(RTPS_PDP, "Ignoring initial peers locator " << loc << " : not allowed."); + } } + writer.writer_->set_fixed_locators(fixed_locators); } else { - EPROSIMA_LOG_ERROR(RTPS_PDP, "SimplePDP Writer creation failed"); - endpoints->writer.release(); + EPROSIMA_LOG_ERROR(RTPS_PDP, "'" << topic_name << "' builtin writer creation failed"); + writer.release(); return false; } - EPROSIMA_LOG_INFO(RTPS_PDP, "SPDP Endpoints creation finished"); return true; } -void PDPSimple::assignRemoteEndpoints( - ParticipantProxyData* pdata) +#if HAVE_SECURITY +bool PDPSimple::create_dcps_participant_secure_endpoints() { - EPROSIMA_LOG_INFO(RTPS_PDP, "For RTPSParticipant: " << pdata->m_guid.guidPrefix); + const RTPSParticipantAttributes& pattr = mp_RTPSParticipant->getRTPSParticipantAttributes(); + const RTPSParticipantAllocationAttributes& allocation = pattr.allocation; + const BuiltinAttributes& builtin_att = mp_builtin->m_att; + auto endpoints = dynamic_cast(builtin_endpoints_.get()); + assert(nullptr != endpoints); - auto endpoints = static_cast(builtin_endpoints_.get()); + constexpr const char* topic_name = "DCPSParticipantsSecure"; + const EntityId_t reader_entity_id = c_EntityId_spdp_reliable_participant_secure_reader; + const EntityId_t writer_entity_id = c_EntityId_spdp_reliable_participant_secure_writer; - const NetworkFactory& network = mp_RTPSParticipant->network_factory(); - uint32_t endp = pdata->m_availableBuiltinEndpoints; - uint32_t auxendp = endp; - bool use_multicast_locators = !mp_RTPSParticipant->getAttributes().builtin.avoid_builtin_multicast || - pdata->metatraffic_locators.unicast.empty(); - auxendp &= DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER; - if (auxendp != 0) - { - auto temp_writer_data = get_temporary_writer_proxies_pool().get(); + // BUILTIN DCPSParticipantsSecure READER + auto& reader = endpoints->secure_reader; + HistoryAttributes hatt; + hatt = pdp_reader_history_attributes(builtin_att, allocation); - temp_writer_data->clear(); - temp_writer_data->guid().guidPrefix = pdata->m_guid.guidPrefix; - temp_writer_data->guid().entityId = c_EntityId_SPDPWriter; - temp_writer_data->persistence_guid(pdata->get_persistence_guid()); - temp_writer_data->set_persistence_entity_id(c_EntityId_SPDPWriter); - temp_writer_data->set_remote_locators(pdata->metatraffic_locators, network, use_multicast_locators); - temp_writer_data->m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; - temp_writer_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; - endpoints->reader.reader_->matched_writer_add(*temp_writer_data); + PoolConfig reader_pool_cfg = PoolConfig::from_history_attributes(hatt); + reader.payload_pool_ = TopicPayloadPoolRegistry::get(topic_name, reader_pool_cfg); + reader.payload_pool_->reserve_history(reader_pool_cfg, true); + reader.history_.reset(new ReaderHistory(hatt)); + + ReaderAttributes ratt = create_builtin_reader_attributes(); + WriterAttributes watt = create_builtin_writer_attributes(); + add_builtin_security_attributes(ratt, watt); + + RTPSReader* rtps_reader = nullptr; + if (mp_RTPSParticipant->createReader(&rtps_reader, ratt, reader.payload_pool_, reader.history_.get(), + reader.listener_.get(), reader_entity_id, true, false)) + { + reader.reader_ = dynamic_cast(rtps_reader); + assert(nullptr != reader.reader_); } - auxendp = endp; - auxendp &= DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; - if (auxendp != 0) + else { - auto temp_reader_data = get_temporary_reader_proxies_pool().get(); + EPROSIMA_LOG_ERROR(RTPS_PDP, "'" << topic_name << "' builtin reader creation failed"); + reader.release(); + return false; + } - temp_reader_data->clear(); - temp_reader_data->m_expectsInlineQos = false; - temp_reader_data->guid().guidPrefix = pdata->m_guid.guidPrefix; - temp_reader_data->guid().entityId = c_EntityId_SPDPReader; - temp_reader_data->set_remote_locators(pdata->metatraffic_locators, network, use_multicast_locators); - temp_reader_data->m_qos.m_reliability.kind = BEST_EFFORT_RELIABILITY_QOS; - temp_reader_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; - endpoints->writer.writer_->matched_reader_add(*temp_reader_data); + // SPDP BUILTIN RTPSParticipant WRITER + auto& writer = endpoints->secure_writer; + hatt = pdp_writer_history_attributes(builtin_att); - StatelessWriter* pW = endpoints->writer.writer_; + PoolConfig writer_pool_cfg = PoolConfig::from_history_attributes(hatt); + writer.payload_pool_ = TopicPayloadPoolRegistry::get(topic_name, writer_pool_cfg); + writer.payload_pool_->reserve_history(writer_pool_cfg, false); + writer.history_.reset(new WriterHistory(hatt)); - if (pW != nullptr) + RTPSWriter* rtps_writer = nullptr; + if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.payload_pool_, writer.history_.get(), + nullptr, writer_entity_id, true)) + { + writer.writer_ = dynamic_cast(rtps_writer); + assert(nullptr != writer.writer_); + } + else + { + EPROSIMA_LOG_ERROR(RTPS_PDP, "'" << topic_name << "' builtin writer creation failed"); + writer.release(); + return false; + } + return true; +} + +#endif // HAVE_SECURITY + +void PDPSimple::assignRemoteEndpoints( + ParticipantProxyData* pdata) +{ + bool ignored = false; + notify_and_maybe_ignore_new_participant(pdata, ignored); + if (!ignored) + { +#if HAVE_SECURITY + auto endpoints = dynamic_cast(builtin_endpoints_.get()); + if (nullptr != endpoints) { - pW->unsent_changes_reset(); + // This participant is secure. + // PDP should have been matched inside notifyAboveRemoteEndpoints after completing the authentication process. + // We now match the other builtin endpoints. + GUID_t remote_guid = pdata->m_guid; + remote_guid.entityId = c_EntityId_spdp_reliable_participant_secure_writer; + bool notify_secure = endpoints->secure_reader.reader_->matched_writer_is_matched(remote_guid); + assign_low_level_remote_endpoints(*pdata, notify_secure); } else +#endif // if HAVE_SECURITY { - EPROSIMA_LOG_ERROR(RTPS_PDP, "Using PDPSimple protocol with a reliable writer"); + // This participant is not secure. + // Match PDP and other builtin endpoints. + match_pdp_remote_endpoints(*pdata, false); + assign_low_level_remote_endpoints(*pdata, false); } } - -#if HAVE_SECURITY - // Validate remote participant - mp_RTPSParticipant->security_manager().discovered_participant(*pdata); -#else - //Inform EDP of new RTPSParticipant data: - notifyAboveRemoteEndpoints(*pdata, true); -#endif // if HAVE_SECURITY } void PDPSimple::removeRemoteEndpoints( @@ -469,38 +556,175 @@ void PDPSimple::removeRemoteEndpoints( { EPROSIMA_LOG_INFO(RTPS_PDP, "For RTPSParticipant: " << pdata->m_guid); - auto endpoints = static_cast(builtin_endpoints_.get()); + GUID_t guid = pdata->m_guid; - uint32_t endp = pdata->m_availableBuiltinEndpoints; - uint32_t auxendp = endp; - auxendp &= DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER; - if (auxendp != 0) { - GUID_t writer_guid(pdata->m_guid.guidPrefix, c_EntityId_SPDPWriter); - endpoints->reader.reader_->matched_writer_remove(writer_guid); + auto endpoints = dynamic_cast(builtin_endpoints_.get()); + assert(nullptr != endpoints); + + guid.entityId = c_EntityId_SPDPWriter; + endpoints->reader.reader_->matched_writer_remove(guid); + + guid.entityId = c_EntityId_SPDPReader; + endpoints->writer.writer_->matched_reader_remove(guid); } - auxendp = endp; - auxendp &= DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; - if (auxendp != 0) + +#if HAVE_SECURITY + auto endpoints = dynamic_cast(builtin_endpoints_.get()); + if (nullptr != endpoints) { - GUID_t reader_guid(pdata->m_guid.guidPrefix, c_EntityId_SPDPReader); - endpoints->writer.writer_->matched_reader_remove(reader_guid); + guid.entityId = c_EntityId_spdp_reliable_participant_secure_writer; + endpoints->secure_reader.reader_->matched_writer_remove(guid); + + guid.entityId = c_EntityId_spdp_reliable_participant_secure_reader; + endpoints->secure_writer.writer_->matched_reader_remove(guid); } +#endif // HAVE_SECURITY } void PDPSimple::notifyAboveRemoteEndpoints( const ParticipantProxyData& pdata, bool notify_secure_endpoints) { - //Inform EDP of new RTPSParticipant data: + if (notify_secure_endpoints) + { + match_pdp_remote_endpoints(pdata, true); + } + else + { + // Add remote participant data + GUID_t writer_guid{ pdata.m_guid.guidPrefix, c_EntityId_SPDPWriter }; + ParticipantProxyData* part_data = createParticipantProxyData(pdata, writer_guid); + if (part_data != nullptr) + { + bool ignored = false; + notify_and_maybe_ignore_new_participant(part_data, ignored); + if (!ignored) + { + match_pdp_remote_endpoints(*part_data, false); + assign_low_level_remote_endpoints(*part_data, false); + } + } + } + +} + +void PDPSimple::match_pdp_remote_endpoints( + const ParticipantProxyData& pdata, + bool notify_secure_endpoints) +{ +#if !HAVE_SECURITY + static_cast(notify_secure_endpoints); +#endif // !HAVE_SECURITY + + auto endpoints = static_cast(builtin_endpoints_.get()); + + const NetworkFactory& network = mp_RTPSParticipant->network_factory(); + bool use_multicast_locators = !mp_RTPSParticipant->getAttributes().builtin.avoid_builtin_multicast || + pdata.metatraffic_locators.unicast.empty(); + const uint32_t endp = pdata.m_availableBuiltinEndpoints; + + // Default to values for non-secure endpoints + auto reliability_kind = BEST_EFFORT_RELIABILITY_QOS; + uint32_t pdp_reader_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; + uint32_t pdp_writer_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER; + EntityId_t reader_entity_id = c_EntityId_SPDPReader; + EntityId_t writer_entity_id = c_EntityId_SPDPWriter; + RTPSReader* reader = endpoints->reader.reader_; + RTPSWriter* writer = endpoints->writer.writer_; + +#if HAVE_SECURITY + // If the other participant has been authenticated, use values for secure endpoints + if (notify_secure_endpoints) + { + auto secure_endpoints = static_cast(builtin_endpoints_.get()); + reliability_kind = RELIABLE_RELIABILITY_QOS; + pdp_reader_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR; + pdp_writer_mask = DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_ANNOUNCER; + reader_entity_id = c_EntityId_spdp_reliable_participant_secure_reader; + writer_entity_id = c_EntityId_spdp_reliable_participant_secure_writer; + reader = secure_endpoints->secure_reader.reader_; + writer = secure_endpoints->secure_writer.writer_; + } +#endif // HAVE_SECURITY + + if (0 != (endp & pdp_writer_mask)) + { + auto temp_writer_data = get_temporary_writer_proxies_pool().get(); + + temp_writer_data->clear(); + temp_writer_data->guid().guidPrefix = pdata.m_guid.guidPrefix; + temp_writer_data->guid().entityId = writer_entity_id; + temp_writer_data->persistence_guid(pdata.get_persistence_guid()); + temp_writer_data->set_persistence_entity_id(writer_entity_id); + temp_writer_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators); + temp_writer_data->m_qos.m_reliability.kind = reliability_kind; + temp_writer_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; +#if HAVE_SECURITY + if (notify_secure_endpoints) + { + if (!mp_RTPSParticipant->security_manager().discovered_builtin_writer( + reader->getGuid(), pdata.m_guid, *temp_writer_data, + reader->getAttributes().security_attributes())) + { + EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for writer " << + temp_writer_data->guid()); + } + } + else +#endif // HAVE_SECURITY + { + reader->matched_writer_add(*temp_writer_data); + } + } + + if (0 != (endp & pdp_reader_mask)) + { + auto temp_reader_data = get_temporary_reader_proxies_pool().get(); + + temp_reader_data->clear(); + temp_reader_data->m_expectsInlineQos = false; + temp_reader_data->guid().guidPrefix = pdata.m_guid.guidPrefix; + temp_reader_data->guid().entityId = reader_entity_id; + temp_reader_data->set_remote_locators(pdata.metatraffic_locators, network, use_multicast_locators); + temp_reader_data->m_qos.m_reliability.kind = reliability_kind; + temp_reader_data->m_qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS; +#if HAVE_SECURITY + if (notify_secure_endpoints) + { + if (!mp_RTPSParticipant->security_manager().discovered_builtin_reader( + writer->getGuid(), pdata.m_guid, *temp_reader_data, + writer->getAttributes().security_attributes())) + { + EPROSIMA_LOG_ERROR(RTPS_EDP, "Security manager returns an error for reader " << + temp_reader_data->guid()); + } + } + else +#endif // HAVE_SECURITY + { + writer->matched_reader_add(*temp_reader_data); + } + + if (BEST_EFFORT_RELIABILITY_QOS == reliability_kind) + { + endpoints->writer.writer_->unsent_changes_reset(); + } + } +} + +void PDPSimple::assign_low_level_remote_endpoints( + const ParticipantProxyData& pdata, + bool notify_secure_endpoints) +{ if (mp_EDP != nullptr) { - mp_EDP->assignRemoteEndpoints(pdata, (notify_secure_endpoints ? true : false)); + mp_EDP->assignRemoteEndpoints(pdata, notify_secure_endpoints); } if (mp_builtin->mp_WLP != nullptr) { - mp_builtin->mp_WLP->assignRemoteEndpoints(pdata, (notify_secure_endpoints ? true : false)); + mp_builtin->mp_WLP->assignRemoteEndpoints(pdata, notify_secure_endpoints); } if (mp_builtin->tlm_ != nullptr) @@ -509,6 +733,37 @@ void PDPSimple::notifyAboveRemoteEndpoints( } } +#if HAVE_SECURITY +bool PDPSimple::pairing_remote_writer_with_local_reader_after_security( + const GUID_t& local_reader, + const WriterProxyData& remote_writer_data) +{ + auto endpoints = dynamic_cast(builtin_endpoints_.get()); + if ((nullptr != endpoints) && (local_reader == endpoints->secure_reader.reader_->getGuid())) + { + endpoints->secure_reader.reader_->matched_writer_add(remote_writer_data); + return true; + } + + return PDP::pairing_remote_writer_with_local_reader_after_security(local_reader, remote_writer_data); +} + +bool PDPSimple::pairing_remote_reader_with_local_writer_after_security( + const GUID_t& local_writer, + const ReaderProxyData& remote_reader_data) +{ + auto endpoints = dynamic_cast(builtin_endpoints_.get()); + if ((nullptr != endpoints) && (local_writer == endpoints->secure_writer.writer_->getGuid())) + { + endpoints->secure_writer.writer_->matched_reader_add(remote_reader_data); + return true; + } + + return PDP::pairing_remote_reader_with_local_writer_after_security(local_writer, remote_reader_data); +} + +#endif // HAVE_SECURITY + bool PDPSimple::newRemoteEndpointStaticallyDiscovered( const GUID_t& pguid, int16_t userDefinedId, diff --git a/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp b/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp index ba09cfe566a..4e1caeea91a 100644 --- a/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp +++ b/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpoints.hpp @@ -46,6 +46,11 @@ struct SimplePDPEndpoints : public PDPEndpoints return DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER | DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; } + const std::unique_ptr& main_listener() const override + { + return reader.listener_; + } + bool enable_pdp_readers( fastrtps::rtps::RTPSParticipantImpl* participant) override { diff --git a/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpointsSecure.hpp b/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpointsSecure.hpp new file mode 100644 index 00000000000..7f807760d71 --- /dev/null +++ b/src/cpp/rtps/builtin/discovery/participant/simple/SimplePDPEndpointsSecure.hpp @@ -0,0 +1,109 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unful required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SimplePDPEndpointsSecure.hpp + */ + +#ifndef FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__SIMPLEPDPENDPOINTSSECURE_HPP_ +#define FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__SIMPLEPDPENDPOINTSSECURE_HPP_ + +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +/** + * Container for the builtin endpoints of SPDPSimple + */ +struct SimplePDPEndpointsSecure : public SimplePDPEndpoints +{ + ~SimplePDPEndpointsSecure() override = default; + + fastrtps::rtps::BuiltinEndpointSet_t builtin_endpoints() const override + { + return SimplePDPEndpoints::builtin_endpoints() | + DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_ANNOUNCER | DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR; + } + + const std::unique_ptr& main_listener() const override + { + return secure_reader.listener_; + } + + bool enable_pdp_readers( + fastrtps::rtps::RTPSParticipantImpl* participant) override + { + return SimplePDPEndpoints::enable_pdp_readers(participant) && + participant->enableReader(secure_reader.reader_); + } + + void disable_pdp_readers( + fastrtps::rtps::RTPSParticipantImpl* participant) override + { + participant->disableReader(secure_reader.reader_); + SimplePDPEndpoints::disable_pdp_readers(participant); + } + + void delete_pdp_endpoints( + fastrtps::rtps::RTPSParticipantImpl* participant) override + { + participant->deleteUserEndpoint(secure_reader.reader_->getGuid()); + participant->deleteUserEndpoint(secure_writer.writer_->getGuid()); + SimplePDPEndpoints::delete_pdp_endpoints(participant); + } + + void remove_from_pdp_reader_history( + const fastrtps::rtps::InstanceHandle_t& remote_participant) override + { + secure_reader.remove_from_history(remote_participant); + SimplePDPEndpoints::remove_from_pdp_reader_history(remote_participant); + } + + void remove_from_pdp_reader_history( + fastrtps::rtps::CacheChange_t* change) override + { + assert(nullptr != change); + if (change->writerGUID.entityId == fastrtps::rtps::c_EntityId_SPDPWriter) + { + secure_reader.history_->remove_change(change); + } + else + { + SimplePDPEndpoints::remove_from_pdp_reader_history(change); + } + } + + //! Builtin Simple PDP secure reader + BuiltinReader secure_reader; + + //! Builtin Simple PDP secure writer + BuiltinWriter secure_writer; +}; + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + +#endif // FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__SIMPLEPDPENDPOINTSSECURE_HPP_ diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index fde6d26edc7..d1d7742e870 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -306,14 +306,24 @@ bool StatelessReader::change_received( // TODO Revisar si no hay que incluirlo. if (!thereIsUpperRecordOf(change->writerGUID, change->sequenceNumber)) { - // Update Ownership strength. - if (EXCLUSIVE_OWNERSHIP_QOS == m_att.ownershipKind) + bool update_notified = true; + + decltype(matched_writers_)::iterator writer = matched_writers_.end(); + if ((EXCLUSIVE_OWNERSHIP_QOS == m_att.ownershipKind) || + (m_trustedWriterEntityId == change->writerGUID.entityId)) { - auto writer = std::find_if(matched_writers_.begin(), matched_writers_.end(), + writer = std::find_if(matched_writers_.begin(), matched_writers_.end(), [change](const RemoteWriterInfo_t& item) { return item.guid == change->writerGUID; }); + bool is_matched = matched_writers_.end() != writer; + update_notified = is_matched; + } + + // Update Ownership strength. + if (EXCLUSIVE_OWNERSHIP_QOS == m_att.ownershipKind) + { assert(matched_writers_.end() != writer); change->reader_info.writer_ownership_strength = writer->ownership_strength; } @@ -329,7 +339,11 @@ bool StatelessReader::change_received( auto seq = change->sequenceNumber; Time_t::now(change->reader_info.receptionTimestamp); - SequenceNumber_t previous_seq = update_last_notified(change->writerGUID, change->sequenceNumber); + SequenceNumber_t previous_seq{ 0, 0 }; + if (update_notified) + { + previous_seq = update_last_notified(change->writerGUID, change->sequenceNumber); + } ++total_unread_; on_data_notify(guid, change->sourceTimestamp); diff --git a/src/cpp/rtps/security/SecurityManager.cpp b/src/cpp/rtps/security/SecurityManager.cpp index 61dae9fbb47..ec4469cf0a7 100644 --- a/src/cpp/rtps/security/SecurityManager.cpp +++ b/src/cpp/rtps/security/SecurityManager.cpp @@ -680,6 +680,11 @@ bool SecurityManager::discovered_participant( { return false; } + + if (remote_participant_info->auth_status_ == AUTHENTICATION_FAILED) + { + remote_participant_info->auth_status_ = AUTHENTICATION_REQUEST_NOT_SEND; + } } bool returnedValue = true; @@ -846,6 +851,7 @@ bool SecurityManager::on_process_handshake( if (ret == VALIDATION_FAILED) { + remote_participant_info->auth_status_ = AUTHENTICATION_FAILED; on_validation_failed(participant_data, exception); return false; } @@ -4180,9 +4186,13 @@ void SecurityManager::resend_handshake_message_token( { if (remote_participant_info->handshake_requests_sent_ >= DiscoveredParticipantInfo::MAX_HANDSHAKE_REQUESTS) { - SecurityException exception; - remote_participant_info->event_->cancel_timer(); - on_validation_failed(dp_it->second->participant_data(), exception); + if (remote_participant_info->auth_status_ != AUTHENTICATION_FAILED) + { + SecurityException exception; + remote_participant_info->event_->cancel_timer(); + remote_participant_info->auth_status_ = AUTHENTICATION_FAILED; + on_validation_failed(dp_it->second->participant_data(), exception); + } } else { diff --git a/test/blackbox/common/BlackboxTestsSecurity.cpp b/test/blackbox/common/BlackboxTestsSecurity.cpp index b38151bf415..b004a815015 100644 --- a/test/blackbox/common/BlackboxTestsSecurity.cpp +++ b/test/blackbox/common/BlackboxTestsSecurity.cpp @@ -16,21 +16,24 @@ #if HAVE_SECURITY +#include +#include +#include +#include + +#include + #include "PubSubReader.hpp" #include "PubSubWriter.hpp" #include "PubSubWriterReader.hpp" #include "PubSubParticipant.hpp" -#include -#include -#include -#include - -#include +#include +#include #include -#include +#include -#include +#include using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -88,6 +91,31 @@ class Security : public testing::TestWithParam }; +struct UDPMessageSender +{ + asio::io_service service; + asio::ip::udp::socket socket; + + UDPMessageSender() + : service() + , socket(service) + { + socket.open(asio::ip::udp::v4()); + } + + void send( + const CDRMessage_t& msg, + const Locator_t& destination) + { + std::string addr = IPLocator::toIPv4string(destination); + unsigned short port = static_cast(destination.port); + auto remote = asio::ip::udp::endpoint(asio::ip::address::from_string(addr), port); + asio::error_code ec; + + socket.send_to(asio::buffer(msg.buffer, msg.length), remote, 0, ec); + } + +}; class SecurityPkcs : public ::testing::Test { @@ -441,7 +469,7 @@ TEST_P(Security, BuiltinAuthenticationPlugin_PKIDH_lossy_conditions) } // Regresion test for Refs #13295, github #2362 -TEST_P(Security, BuiltinAuthenticationPlugin_second_participant_creation_loop) +TEST(Security, BuiltinAuthenticationPlugin_second_participant_creation_loop) { constexpr size_t n_loops = 101; @@ -477,6 +505,63 @@ TEST_P(Security, BuiltinAuthenticationPlugin_second_participant_creation_loop) Log::ClearConsumers(); Log::RegisterConsumer(std::unique_ptr(new TestConsumer(n_logs))); + // Class to allow waiting for the authentication message to be sent + class AuthMessageSendStatus + { + bool message_sent_ = false; + std::mutex mutex_; + std::condition_variable cv_; + + public: + + void reset() + { + std::lock_guard < std::mutex> guard(mutex_); + message_sent_ = false; + } + + void notify() + { + std::lock_guard guard(mutex_); + message_sent_ = true; + cv_.notify_one(); + } + + void wait() + { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this]() -> bool + { + return message_sent_; + }); + } + + }; + + // Prepare transport to check that the authentication message is sent + auto transport = std::make_shared(); + AuthMessageSendStatus auth_message_send_status; + transport->drop_data_messages_filter_ = [&auth_message_send_status](eprosima::fastrtps::rtps::CDRMessage_t& msg) + -> bool + { + auto old_pos = msg.pos; + + // Jump to writer entity id + msg.pos += 2 + 2 + 4; + + // Read writer entity id + eprosima::fastrtps::rtps::GUID_t writer_guid; + eprosima::fastrtps::rtps::CDRMessage::readEntityId(&msg, &writer_guid.entityId); + msg.pos = old_pos; + + if (writer_guid.entityId == eprosima::fastrtps::rtps::participant_stateless_message_writer_entity_id) + { + auth_message_send_status.notify(); + } + + return false; + }; + // Prepare participant properties PropertyPolicy property_policy; property_policy.properties().emplace_back(Property("dds.sec.auth.plugin", "builtin.PKI-DH")); @@ -489,6 +574,7 @@ TEST_P(Security, BuiltinAuthenticationPlugin_second_participant_creation_loop) // Create the participant being checked PubSubReader main_participant("HelloWorldTopic"); + main_participant.disable_builtin_transport().add_user_transport_to_pparams(transport); main_participant.property_policy(property_policy).init(); EXPECT_TRUE(main_participant.isInitialized()); @@ -501,13 +587,14 @@ TEST_P(Security, BuiltinAuthenticationPlugin_second_participant_creation_loop) // Wait for undiscovery so we can wait for discovery below EXPECT_TRUE(main_participant.wait_participant_undiscovery()); + auth_message_send_status.reset(); // Create another participant with authentication enabled PubSubParticipant other_participant(0, 0, 0, 0); EXPECT_TRUE(other_participant.property_policy(property_policy).init_participant()); - // Wait for the new participant to be discovered by the main one - EXPECT_TRUE(main_participant.wait_participant_discovery()); + // Wait for the main participant to send an authentication message to the other participant + auth_message_send_status.wait(); // The created participant gets out of scope here, and is destroyed } @@ -3668,11 +3755,14 @@ TEST(Security, AllowUnauthenticatedParticipants_TwoSecureParticipantsWithDiffere ASSERT_TRUE(writer.isInitialized()); - //! Wait enough time for the PKI requests to time out and give validation_failed (~15secs) - writer.wait_discovery(std::chrono::seconds(20)); + //! Wait for the authorization to fail (~15secs) + writer.waitUnauthorized(); + + //! Wait for the discovery + writer.wait_discovery(); //! check that the writer matches the reader because of having allow_unauthenticated_participants enabled - ASSERT_TRUE(writer.get_matched()); + ASSERT_TRUE(writer.is_matched()); //! Data is correctly sent and received auto data = default_helloworld_data_generator(); @@ -3748,13 +3838,16 @@ TEST(Security, AllowUnauthenticatedParticipants_TwoParticipantsDifferentCertific ASSERT_TRUE(writer.isInitialized()); - //! Wait enough time for the PKI requests to time out and give validation_failed (~15secs) - writer.wait_discovery(std::chrono::seconds(20)); + //! Wait for the authorization to fail (~15secs) + writer.waitUnauthorized(); + + //! Wait some time afterwards (this will time out) + writer.wait_discovery(std::chrono::seconds(1)); //! check that the writer does not match the reader because of //! having read and write protection enabled //! despite allow_unauthenticated_participants is enabled - ASSERT_FALSE(writer.get_matched()); + ASSERT_FALSE(writer.is_matched()); } // *INDENT-OFF* @@ -4497,32 +4590,7 @@ TEST(Security, MaliciousHeartbeatIgnore) return avoid_sec_submessages.load() && (0x30 == (msg.buffer[msg.pos] & 0xF0)); }; - struct FakeMsg - { - asio::io_service service; - asio::ip::udp::socket socket; - - FakeMsg() - : service() - , socket(service) - { - socket.open(asio::ip::udp::v4()); - } - - void send( - const CDRMessage_t& msg, - const Locator_t& destination) - { - std::string addr = IPLocator::toIPv4string(destination); - unsigned short port = static_cast(destination.port); - auto remote = asio::ip::udp::endpoint(asio::ip::address::from_string(addr), port); - asio::error_code ec; - - socket.send_to(asio::buffer(msg.buffer, msg.length), remote, 0, ec); - } - - }; - FakeMsg fake_msg; + UDPMessageSender fake_msg_sender; writer.disable_builtin_transport().add_user_transport_to_pparams(transport); reader.disable_builtin_transport().add_user_transport_to_pparams(transport); @@ -4574,7 +4642,7 @@ TEST(Security, MaliciousHeartbeatIgnore) msg.init(reinterpret_cast(&hb), msg_len); msg.length = msg_len; msg.pos = msg_len; - fake_msg.send(msg, reader_locator); + fake_msg_sender.send(msg, reader_locator); } // Enable secure submessages @@ -4583,6 +4651,125 @@ TEST(Security, MaliciousHeartbeatIgnore) reader.block_for_all(); } +TEST_P(Security, MaliciousParticipantRemovalIgnore) +{ + PubSubWriter writer("HelloWorldTopic_MaliciousParticipantRemovalIgnore"); + PubSubReader reader("HelloWorldTopic_MaliciousParticipantRemovalIgnore"); + + struct MaliciousParticipantRemoval + { + std::array rtps_id{ {'R', 'T', 'P', 'S'} }; + std::array protocol_version{ {2, 3} }; + std::array vendor_id{ {0x01, 0x0F} }; + GuidPrefix_t sender_prefix{}; + + struct DataSubMsg + { + struct Header + { + uint8_t submessage_id = 0x15; +#if FASTDDS_IS_BIG_ENDIAN_TARGET + uint8_t flags = 0x02; +#else + uint8_t flags = 0x03; +#endif // FASTDDS_IS_BIG_ENDIAN_TARGET + uint16_t submessage_length = 2 + 2 + 4 + 4 + 8; + uint16_t extra_flags = 0; + uint16_t octets_to_inline_qos = 4 + 4 + 8; + EntityId_t reader_id{}; + EntityId_t writer_id{}; + SequenceNumber_t sn{}; + }; + + struct InlineQos + { + struct KeyHash + { + uint16_t pid = 0x0070; // PID_KEY_HASH + uint16_t plen = 16; + GUID_t guid{}; + }; + + struct StatusInfo + { + uint16_t pid = 0x0071; // PID_STATUS_INFO + uint16_t plen = 4; + uint8_t flags[4] = { 0x00, 0x00, 0x00, 0x03 }; + }; + + struct Sentinel + { + uint16_t pid = 0x0001; // PID_SENTINEL + uint16_t plen = 0; + }; + + KeyHash hash; + StatusInfo status; + Sentinel sentinel; + }; + + Header header; + InlineQos inline_qos; + } + data; + }; + + // Set common QoS + reader.history_depth(10).reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS); + writer.history_depth(10).reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS); + + // Configure security + const std::string governance_file("governance_helloworld_all_enable.smime"); + const std::string permissions_file("permissions_helloworld.smime"); + CommonPermissionsConfigure(reader, writer, governance_file, permissions_file); + + // Initialize and wait for authorization and discovery + reader.init(); + ASSERT_TRUE(reader.isInitialized()); + writer.init(); + ASSERT_TRUE(writer.isInitialized()); + reader.waitAuthorized(); + writer.waitAuthorized(); + reader.wait_discovery(); + writer.wait_discovery(); + + // Send fake DATA(p[UD]) + UDPMessageSender fake_msg_sender; + { + auto writer_guid = writer.datawriter_guid(); + auto participant_guid = writer.participant_guid(); + auto domain_id = static_cast(GET_PID() % 230); + + MaliciousParticipantRemoval packet{}; + packet.sender_prefix = writer_guid.guidPrefix; + packet.data.header.submessage_length += sizeof(packet.data.inline_qos); + packet.data.header.writer_id = c_EntityId_SPDPWriter; + packet.data.header.reader_id = c_EntityId_SPDPReader; + packet.data.header.sn.low = 100; + packet.data.inline_qos.hash.guid = participant_guid; + + Locator_t mcast_locator; + ASSERT_TRUE(IPLocator::setIPv4(mcast_locator, "239.255.0.1")); + mcast_locator.port = 7400 + 250 * domain_id; + + CDRMessage_t msg(0); + uint32_t msg_len = static_cast(sizeof(packet)); + msg.init(reinterpret_cast(&packet), msg_len); + msg.length = msg_len; + msg.pos = msg_len; + fake_msg_sender.send(msg, mcast_locator); + } + + EXPECT_FALSE(reader.wait_participant_undiscovery(std::chrono::seconds(1))); + + auto data = default_helloworld_data_generator(); + reader.startReception(data); + writer.send(data); + ASSERT_TRUE(data.empty()); + reader.block_for_all(); +} + + void blackbox_security_init() { certs_path = std::getenv("CERTS_PATH"); diff --git a/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h b/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h index 45ad9aa55a6..003306a10bb 100644 --- a/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h +++ b/test/mock/rtps/RTPSParticipantImpl/rtps/participant/RTPSParticipantImpl.h @@ -329,6 +329,8 @@ class RTPSParticipantImpl MOCK_METHOD(bool, should_match_local_endpoints, ()); + MOCK_METHOD(bool, ignore_participant, (const GuidPrefix_t&)); + private: MockParticipantListener listener_; diff --git a/test/unittest/rtps/discovery/PDPTests.cpp b/test/unittest/rtps/discovery/PDPTests.cpp index a4c19bfac38..67321b3cd97 100644 --- a/test/unittest/rtps/discovery/PDPTests.cpp +++ b/test/unittest/rtps/discovery/PDPTests.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,7 @@ #include #include #include +#include #include #include @@ -71,6 +73,11 @@ class TesterPDPEndpoints : public fastdds::rtps::PDPEndpoints return DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER | DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; } + const std::unique_ptr& main_listener() const override + { + return no_listener_; + } + bool enable_pdp_readers( fastrtps::rtps::RTPSParticipantImpl*) override { @@ -101,6 +108,8 @@ class TesterPDPEndpoints : public fastdds::rtps::PDPEndpoints } + std::unique_ptr no_listener_; + }; class PDPTester : public PDP