diff --git a/include/fastdds/rtps/builtin/discovery/participant/PDP.h b/include/fastdds/rtps/builtin/discovery/participant/PDP.h index e38b532c1ee..f9df13b2264 100644 --- a/include/fastdds/rtps/builtin/discovery/participant/PDP.h +++ b/include/fastdds/rtps/builtin/discovery/participant/PDP.h @@ -22,9 +22,12 @@ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #include +#include #include #include #include +#include +#include #include #include @@ -61,6 +64,7 @@ class RTPSWriter; class RTPSReader; class WriterHistory; class ReaderHistory; +struct RTPSParticipantAllocationAttributes; class RTPSParticipantImpl; class RTPSParticipantListener; class BuiltinProtocols; @@ -70,6 +74,7 @@ class ReaderProxyData; class WriterProxyData; class ParticipantProxyData; class ReaderListener; +class PDPEndpoints; class PDPListener; class PDPServerListener; class ITopicPayloadPool; diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 4a77accc971..2b63b846881 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -144,6 +144,127 @@ static bool get_unique_flows_parameters( return true; } +// -- The following is a backport from 3.x of a PDP method +static void local_participant_attributes_update_nts( + const RTPSParticipantAttributes& new_atts, + PDP* pdp, + RTPSParticipantImpl* participant) +{ + auto participant_data = pdp->getLocalParticipantProxyData(); + participant_data->m_userData.data_vec(new_atts.userData); + + // If we are intraprocess only, we do not need to update locators + bool announce_locators = !participant->is_intraprocess_only(); + if (announce_locators) + { + // Clear all locators + participant_data->metatraffic_locators.unicast.clear(); + participant_data->metatraffic_locators.multicast.clear(); + participant_data->default_locators.unicast.clear(); + participant_data->default_locators.multicast.clear(); + + // Update default locators + for (const Locator_t& loc : new_atts.defaultUnicastLocatorList) + { + participant_data->default_locators.add_unicast_locator(loc); + } + for (const Locator_t& loc : new_atts.defaultMulticastLocatorList) + { + participant_data->default_locators.add_multicast_locator(loc); + } + + // Update metatraffic locators + for (const auto& locator : new_atts.builtin.metatrafficUnicastLocatorList) + { + participant_data->metatraffic_locators.add_unicast_locator(locator); + } + if (!new_atts.builtin.avoid_builtin_multicast || + participant_data->metatraffic_locators.unicast.empty()) + { + for (const auto& locator : new_atts.builtin.metatrafficMulticastLocatorList) + { + participant_data->metatraffic_locators.add_multicast_locator(locator); + } + } + + fastdds::rtps::network::external_locators::add_external_locators(*participant_data, + new_atts.builtin.metatraffic_external_unicast_locators, + new_atts.default_external_unicast_locators); + } +} + +// -- The following is a backport from 3.x of a PDP method +static void update_endpoint_locators_if_default_nts( + const std::vector& writers, + const std::vector& readers, + const RTPSParticipantAttributes& old_atts, + const RTPSParticipantAttributes& new_atts, + PDP* pdp) +{ + // Check if default locators have changed + const auto& old_default_unicast = old_atts.defaultUnicastLocatorList; + const auto& old_default_multicast = old_atts.defaultMulticastLocatorList; + const auto& new_default_unicast = new_atts.defaultUnicastLocatorList; + const auto& new_default_multicast = new_atts.defaultMulticastLocatorList; + + // Early return if there is no change in default unicast locators + if ((old_default_unicast == new_default_unicast) && + (old_default_multicast == new_default_multicast)) + { + return; + } + + // Update proxies of endpoints with default configured locators + EDP* edp = pdp->getEDP(); + for (RTPSWriter* writer : writers) + { + if ((old_default_multicast == writer->getAttributes().multicastLocatorList) && + (old_default_unicast == writer->getAttributes().unicastLocatorList)) + { + writer->getAttributes().multicastLocatorList = new_default_multicast; + writer->getAttributes().unicastLocatorList = new_default_unicast; + + WriterProxyData* wdata = nullptr; + GUID_t participant_guid; + wdata = pdp->addWriterProxyData(writer->getGuid(), participant_guid, + [](WriterProxyData* proxy, bool is_update, + const ParticipantProxyData& participant) + { + static_cast(is_update); + assert(is_update); + proxy->set_locators(participant.default_locators); + return true; + }); + assert(wdata != nullptr); + edp->processLocalWriterProxyData(writer, wdata); + } + } + for (RTPSReader* reader : readers) + { + if ((old_default_multicast == reader->getAttributes().multicastLocatorList) && + (old_default_unicast == reader->getAttributes().unicastLocatorList)) + { + reader->getAttributes().multicastLocatorList = new_default_multicast; + reader->getAttributes().unicastLocatorList = new_default_unicast; + + ReaderProxyData* rdata = nullptr; + GUID_t participant_guid; + rdata = pdp->addReaderProxyData(reader->getGuid(), participant_guid, + [](ReaderProxyData* proxy, bool is_update, + const ParticipantProxyData& participant) + { + static_cast(is_update); + assert(is_update); + + proxy->set_locators(participant.default_locators); + return true; + }); + assert(rdata != nullptr); + edp->processLocalReaderProxyData(reader, rdata); + } + } +} + Locator_t& RTPSParticipantImpl::applyLocatorAdaptRule( Locator_t& loc) { @@ -1377,29 +1498,49 @@ bool RTPSParticipantImpl::registerReader( void RTPSParticipantImpl::update_attributes( const RTPSParticipantAttributes& patt) { + // Avoid ABBA with PDP mutex by using a local copy of the attributes + RTPSParticipantAttributes temp_atts; + { + std::lock_guard guard(*mp_mutex); + temp_atts = m_att; + } + bool local_interfaces_changed = false; +<<<<<<< HEAD +======= + // Update cached network interfaces + if (!SystemInfo::update_interfaces()) + { + EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, + "Failed to update cached network interfaces during " << temp_atts.getName() << + " attributes update"); + } + +>>>>>>> 66ec998a8 (Fix issues in Dynamic Network Interfaces (#5282) (#5304)) // Check if new interfaces have been added if (internal_metatraffic_locators_) { - LocatorList_t metatraffic_unicast_locator_list = m_att.builtin.metatrafficUnicastLocatorList; - get_default_metatraffic_locators(); - if (!(metatraffic_unicast_locator_list == m_att.builtin.metatrafficUnicastLocatorList)) + LocatorList_t metatraffic_unicast_locator_list = temp_atts.builtin.metatrafficUnicastLocatorList; + temp_atts.builtin.metatrafficUnicastLocatorList.clear(); + get_default_metatraffic_locators(temp_atts); + if (!(metatraffic_unicast_locator_list == temp_atts.builtin.metatrafficUnicastLocatorList)) { local_interfaces_changed = true; - EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, m_att.getName() << " updated its metatraffic locators"); + EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, temp_atts.getName() << " updated its metatraffic locators"); } } if (internal_default_locators_) { - LocatorList_t default_unicast_locator_list = m_att.defaultUnicastLocatorList; - get_default_unicast_locators(); - if (!(default_unicast_locator_list == m_att.defaultUnicastLocatorList)) + LocatorList_t default_unicast_locator_list = temp_atts.defaultUnicastLocatorList; + temp_atts.defaultUnicastLocatorList.clear(); + get_default_unicast_locators(temp_atts); + if (!(default_unicast_locator_list == temp_atts.defaultUnicastLocatorList)) { local_interfaces_changed = true; EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, - m_att.getName() << " updated default unicast locator list, current locators: " - << m_att.defaultUnicastLocatorList); + temp_atts.getName() << " updated default unicast locator list, current locators: " + << temp_atts.defaultUnicastLocatorList); } } @@ -1414,9 +1555,9 @@ void RTPSParticipantImpl::update_attributes( // Check if discovery servers need to be updated eprosima::fastdds::rtps::RemoteServerList_t converted_discovery_servers = patt.builtin.discovery_config.m_DiscoveryServers; - if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers) + if (converted_discovery_servers != temp_atts.builtin.discovery_config.m_DiscoveryServers) { - for (auto& transportDescriptor : m_att.userTransports) + for (auto& transportDescriptor : temp_atts.userTransports) { TCPTransportDescriptor* pT = dynamic_cast(transportDescriptor.get()); if (pT) @@ -1441,8 +1582,8 @@ void RTPSParticipantImpl::update_attributes( } // Check if there are changes - if (converted_discovery_servers != m_att.builtin.discovery_config.m_DiscoveryServers - || patt.userData != m_att.userData + if (converted_discovery_servers != temp_atts.builtin.discovery_config.m_DiscoveryServers + || patt.userData != temp_atts.userData || local_interfaces_changed) { update_pdp = true; @@ -1450,7 +1591,7 @@ void RTPSParticipantImpl::update_attributes( LocatorList_t modified_locators; // Update RTPSParticipantAttributes members - m_att.userData = patt.userData; + temp_atts.userData = patt.userData; // If there's no PDP don't process Discovery-related attributes. if (!pdp) @@ -1463,6 +1604,7 @@ void RTPSParticipantImpl::update_attributes( namespace ExternalLocatorsProcessor = fastdds::rtps::ExternalLocatorsProcessor; if (local_interfaces_changed && internal_metatraffic_locators_) { +<<<<<<< HEAD ExternalLocatorsProcessor::set_listening_locators(m_att.builtin.metatraffic_external_unicast_locators, m_att.builtin.metatrafficUnicastLocatorList); } @@ -1470,12 +1612,21 @@ void RTPSParticipantImpl::update_attributes( { ExternalLocatorsProcessor::set_listening_locators(m_att.default_external_unicast_locators, m_att.defaultUnicastLocatorList); +======= + set_listening_locators(temp_atts.builtin.metatraffic_external_unicast_locators, + temp_atts.builtin.metatrafficUnicastLocatorList); + } + if (local_interfaces_changed && internal_default_locators_) + { + set_listening_locators(temp_atts.default_external_unicast_locators, + temp_atts.defaultUnicastLocatorList); +>>>>>>> 66ec998a8 (Fix issues in Dynamic Network Interfaces (#5282) (#5304)) } } // Check that the remote servers list is consistent: all the already known remote servers must be included in // the list and either new remote servers are added or remote server listening locator is modified. - for (auto existing_server : m_att.builtin.discovery_config.m_DiscoveryServers) + for (auto existing_server : temp_atts.builtin.discovery_config.m_DiscoveryServers) { bool contained = false; for (auto incoming_server : converted_discovery_servers) @@ -1514,34 +1665,22 @@ void RTPSParticipantImpl::update_attributes( } } + // Update discovery information { std::lock_guard lock(*pdp->getMutex()); + local_participant_attributes_update_nts(temp_atts, pdp, this); - // Update user data - auto local_participant_proxy_data = pdp->getLocalParticipantProxyData(); - local_participant_proxy_data->m_userData.data_vec(m_att.userData); - - // Update metatraffic locators - for (auto locator : m_att.builtin.metatrafficMulticastLocatorList) - { - local_participant_proxy_data->metatraffic_locators.add_multicast_locator(locator); - } - for (auto locator : m_att.builtin.metatrafficUnicastLocatorList) - { - local_participant_proxy_data->metatraffic_locators.add_unicast_locator(locator); - } - - // Update default locators - for (auto locator : m_att.defaultUnicastLocatorList) + if (local_interfaces_changed && internal_default_locators_) { - local_participant_proxy_data->default_locators.add_unicast_locator(locator); + std::lock_guard _(endpoints_list_mutex); + update_endpoint_locators_if_default_nts(m_userWriterList, m_userReaderList, m_att, temp_atts, pdp); } if (local_interfaces_changed) { - createSenderResources(m_att.builtin.metatrafficMulticastLocatorList); - createSenderResources(m_att.builtin.metatrafficUnicastLocatorList); - createSenderResources(m_att.defaultUnicastLocatorList); + createSenderResources(temp_atts.builtin.metatrafficMulticastLocatorList); + createSenderResources(temp_atts.builtin.metatrafficUnicastLocatorList); + createSenderResources(temp_atts.defaultUnicastLocatorList); } if (!modified_locators.empty()) { @@ -1549,17 +1688,17 @@ void RTPSParticipantImpl::update_attributes( } // Update remote servers list - if (m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::CLIENT || - m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SUPER_CLIENT || - m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER || - m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP) + if (temp_atts.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::CLIENT || + temp_atts.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SUPER_CLIENT || + temp_atts.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER || + temp_atts.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP) { // Add incoming servers if we don't know about them already or the listening locator has been modified for (auto incoming_server : converted_discovery_servers) { eprosima::fastdds::rtps::RemoteServerList_t::iterator server_it; - for (server_it = m_att.builtin.discovery_config.m_DiscoveryServers.begin(); - server_it != m_att.builtin.discovery_config.m_DiscoveryServers.end(); server_it++) + for (server_it = temp_atts.builtin.discovery_config.m_DiscoveryServers.begin(); + server_it != temp_atts.builtin.discovery_config.m_DiscoveryServers.end(); server_it++) { if (server_it->guidPrefix == incoming_server.guidPrefix) { @@ -1576,21 +1715,21 @@ void RTPSParticipantImpl::update_attributes( break; } } - if (server_it == m_att.builtin.discovery_config.m_DiscoveryServers.end()) + if (server_it == temp_atts.builtin.discovery_config.m_DiscoveryServers.end()) { - m_att.builtin.discovery_config.m_DiscoveryServers.push_back(incoming_server); + temp_atts.builtin.discovery_config.m_DiscoveryServers.push_back(incoming_server); } } // Update the servers list in builtin protocols { std::unique_lock disc_lock(mp_builtinProtocols->getDiscoveryMutex()); - mp_builtinProtocols->m_DiscoveryServers = m_att.builtin.discovery_config.m_DiscoveryServers; + mp_builtinProtocols->m_DiscoveryServers = temp_atts.builtin.discovery_config.m_DiscoveryServers; } // Notify PDPServer - if (m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER || - m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP) + if (temp_atts.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER || + temp_atts.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP) { fastdds::rtps::PDPServer* pdp_server = static_cast(pdp); pdp_server->update_remote_servers_list(); @@ -1601,8 +1740,8 @@ void RTPSParticipantImpl::update_attributes( } } // Notify PDPClient - else if (m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::CLIENT || - m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SUPER_CLIENT) + else if (temp_atts.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::CLIENT || + temp_atts.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SUPER_CLIENT) { fastdds::rtps::PDPClient* pdp_client = static_cast(pdp); pdp_client->update_remote_servers_list(); @@ -1614,6 +1753,12 @@ void RTPSParticipantImpl::update_attributes( } } } + + // Update the attributes data member + { + std::lock_guard guard(*mp_mutex); + m_att = temp_atts; + } } if (update_pdp) @@ -2713,22 +2858,34 @@ void RTPSParticipantImpl::environment_file_has_changed() void RTPSParticipantImpl::get_default_metatraffic_locators() { - uint32_t metatraffic_multicast_port = m_att.port.getMulticastPort(domain_id_); + get_default_metatraffic_locators(m_att); +} + +void RTPSParticipantImpl::get_default_metatraffic_locators( + RTPSParticipantAttributes& att) +{ + uint32_t metatraffic_multicast_port = att.port.getMulticastPort(domain_id_); - m_network_Factory.getDefaultMetatrafficMulticastLocators(m_att.builtin.metatrafficMulticastLocatorList, + m_network_Factory.getDefaultMetatrafficMulticastLocators(att.builtin.metatrafficMulticastLocatorList, metatraffic_multicast_port); - m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficMulticastLocatorList); + m_network_Factory.NormalizeLocators(att.builtin.metatrafficMulticastLocatorList); - m_network_Factory.getDefaultMetatrafficUnicastLocators(m_att.builtin.metatrafficUnicastLocatorList, + m_network_Factory.getDefaultMetatrafficUnicastLocators(att.builtin.metatrafficUnicastLocatorList, metatraffic_unicast_port_); - m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficUnicastLocatorList); + m_network_Factory.NormalizeLocators(att.builtin.metatrafficUnicastLocatorList); } void RTPSParticipantImpl::get_default_unicast_locators() { - uint32_t unicast_port = metatraffic_unicast_port_ + m_att.port.offsetd3 - m_att.port.offsetd1; - m_network_Factory.getDefaultUnicastLocators(m_att.defaultUnicastLocatorList, unicast_port); - m_network_Factory.NormalizeLocators(m_att.defaultUnicastLocatorList); + get_default_unicast_locators(m_att); +} + +void RTPSParticipantImpl::get_default_unicast_locators( + RTPSParticipantAttributes& att) +{ + uint32_t unicast_port = metatraffic_unicast_port_ + att.port.offsetd3 - att.port.offsetd1; + m_network_Factory.getDefaultUnicastLocators(att.defaultUnicastLocatorList, unicast_port); + m_network_Factory.NormalizeLocators(att.defaultUnicastLocatorList); } bool RTPSParticipantImpl::is_participant_ignored( diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index 1bb84058698..e8e61b91386 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -762,11 +762,17 @@ class RTPSParticipantImpl */ void get_default_metatraffic_locators(); + void get_default_metatraffic_locators( + RTPSParticipantAttributes& att); + /** * Get default unicast locators when not provided by the user. */ void get_default_unicast_locators(); + void get_default_unicast_locators( + RTPSParticipantAttributes& att); + bool match_local_endpoints_ = true; bool should_match_local_endpoints( diff --git a/test/communication/PublisherMain.cpp b/test/communication/PublisherMain.cpp index f971536d215..8fe7d6fccaa 100644 --- a/test/communication/PublisherMain.cpp +++ b/test/communication/PublisherMain.cpp @@ -1,7 +1,25 @@ #include "Publisher.hpp" #include +<<<<<<< HEAD:test/communication/PublisherMain.cpp #include +======= + +using namespace eprosima::fastdds::dds; + +/* ARGUMENTS + * --exit_on_lost_liveliness + * --fixed_type + * --zero_copy + * --seed + * --wait + * --samples + * --interval + * --magic + * --xmlfile + * --rescan + */ +>>>>>>> 66ec998a8 (Fix issues in Dynamic Network Interfaces (#5282) (#5304)):test/dds/communication/security/PublisherMain.cpp int main( int argc, @@ -13,6 +31,11 @@ int main( uint32_t seed = 7800, wait = 0; char* xml_file = nullptr; uint32_t samples = 4; +<<<<<<< HEAD:test/communication/PublisherMain.cpp +======= + uint32_t interval = 250; + uint32_t rescan_interval_seconds = 0; +>>>>>>> 66ec998a8 (Fix issues in Dynamic Network Interfaces (#5282) (#5304)):test/dds/communication/security/PublisherMain.cpp std::string magic; while (arg_count < argc) @@ -76,6 +99,16 @@ int main( xml_file = argv[arg_count]; } + else if (strcmp(argv[arg_count], "--rescan") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--rescan expects a parameter" << std::endl; + return -1; + } + + rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10); + } else { std::cout << "Wrong argument " << argv[arg_count] << std::endl; @@ -99,7 +132,11 @@ int main( publisher.wait_discovery(wait); } +<<<<<<< HEAD:test/communication/PublisherMain.cpp publisher.run(samples); +======= + publisher.run(samples, rescan_interval_seconds, 0, interval); +>>>>>>> 66ec998a8 (Fix issues in Dynamic Network Interfaces (#5282) (#5304)):test/dds/communication/security/PublisherMain.cpp return 0; } diff --git a/test/communication/SubscriberMain.cpp b/test/communication/SubscriberMain.cpp index 261891663dd..b8d218c5584 100644 --- a/test/communication/SubscriberMain.cpp +++ b/test/communication/SubscriberMain.cpp @@ -1,7 +1,25 @@ #include "Subscriber.hpp" #include +<<<<<<< HEAD:test/communication/SubscriberMain.cpp #include +======= + +using namespace eprosima::fastdds::dds; + +/* ARGUMENTS + * --notexit + * --fixed_type + * --zero_copy + * --seed + * --samples + * --magic + * --xmlfile + * --publishers + * --die_on_data_received + * --rescan + */ +>>>>>>> 66ec998a8 (Fix issues in Dynamic Network Interfaces (#5282) (#5304)):test/dds/communication/security/SubscriberMain.cpp int main( int argc, @@ -14,6 +32,7 @@ int main( uint32_t seed = 7800; uint32_t samples = 4; uint32_t publishers = 1; + uint32_t rescan_interval_seconds = 0; char* xml_file = nullptr; std::string magic; @@ -82,6 +101,16 @@ int main( { die_on_data_received = true; } + else if (strcmp(argv[arg_count], "--rescan") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--rescan expects a parameter" << std::endl; + return -1; + } + + rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10); + } else { std::cout << "Wrong argument " << argv[arg_count] << std::endl; @@ -100,7 +129,7 @@ int main( if (subscriber.init(seed, magic, fixed_type)) { - return subscriber.run(notexit) ? 0 : -1; + return subscriber.run(notexit, rescan_interval_seconds) ? 0 : -1; } return -1; diff --git a/test/dds/communication/CMakeLists.txt b/test/dds/communication/CMakeLists.txt index d099dac93ce..5c02f1db47b 100644 --- a/test/dds/communication/CMakeLists.txt +++ b/test/dds/communication/CMakeLists.txt @@ -206,3 +206,7 @@ if(Python3_Interpreter_FOUND) endif() endforeach() endif() + +if(UNIX AND NOT(APPLE) AND NOT(QNXNTO) AND NOT(ANDROID)) + add_subdirectory(dyn_network) +endif() diff --git a/test/dds/communication/PubSubMain.cpp b/test/dds/communication/PubSubMain.cpp index 46553c9f64b..11ea78de1ee 100644 --- a/test/dds/communication/PubSubMain.cpp +++ b/test/dds/communication/PubSubMain.cpp @@ -52,7 +52,7 @@ void publisher_run( publisher->wait_discovery(wait); } - publisher->run(samples, loops, interval); + publisher->run(samples, 0, loops, interval); } int main( @@ -197,7 +197,7 @@ int main( DomainParticipantFactory::get_instance()->load_XML_profiles_file(xml_file); } - SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy); + SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy, false, false); PublisherModule publisher(exit_on_lost_liveliness, fixed_type, zero_copy); uint32_t result = 1; @@ -208,7 +208,7 @@ int main( if (subscriber.init(seed, magic)) { - result = subscriber.run(notexit, timeout) ? 0 : -1; + result = subscriber.run(notexit, 0, timeout) ? 0 : -1; } publisher_thread.join(); diff --git a/test/dds/communication/PublisherMain.cpp b/test/dds/communication/PublisherMain.cpp index 0d72b20ae5b..3124e1c592d 100644 --- a/test/dds/communication/PublisherMain.cpp +++ b/test/dds/communication/PublisherMain.cpp @@ -29,9 +29,11 @@ using namespace eprosima::fastdds::dds; * --seed * --wait * --samples + * --loops + * --interval * --magic * --xmlfile - * --interval + * --rescan */ int main( @@ -46,7 +48,9 @@ int main( uint32_t wait = 0; char* xml_file = nullptr; uint32_t samples = 4; + uint32_t loops = 0; uint32_t interval = 250; + uint32_t rescan_interval_seconds = 0; std::string magic; while (arg_count < argc) @@ -93,6 +97,16 @@ int main( samples = strtol(argv[arg_count], nullptr, 10); } + else if (strcmp(argv[arg_count], "--loops") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--loops expects a parameter" << std::endl; + return -1; + } + + loops = strtol(argv[arg_count], nullptr, 10); + } else if (strcmp(argv[arg_count], "--interval") == 0) { if (++arg_count >= argc) @@ -123,6 +137,16 @@ int main( xml_file = argv[arg_count]; } + else if (strcmp(argv[arg_count], "--rescan") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--rescan expects a parameter" << std::endl; + return -1; + } + + rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10); + } else { std::cout << "Wrong argument " << argv[arg_count] << std::endl; @@ -146,7 +170,7 @@ int main( publisher.wait_discovery(wait); } - publisher.run(samples, 0, interval); + publisher.run(samples, rescan_interval_seconds, loops, interval); return 0; } diff --git a/test/dds/communication/PublisherModule.cpp b/test/dds/communication/PublisherModule.cpp index 35debd61a9c..e664090fbd1 100644 --- a/test/dds/communication/PublisherModule.cpp +++ b/test/dds/communication/PublisherModule.cpp @@ -131,6 +131,7 @@ void PublisherModule::wait_discovery( void PublisherModule::run( uint32_t samples, + const uint32_t rescan_interval, const uint32_t loops, uint32_t interval) { @@ -138,6 +139,22 @@ void PublisherModule::run( uint16_t index = 1; void* sample = nullptr; + std::thread net_rescan_thread([this, rescan_interval]() + { + if (rescan_interval > 0) + { + auto interval = std::chrono::seconds(rescan_interval); + while (run_) + { + std::this_thread::sleep_for(interval); + if (run_) + { + participant_->set_qos(participant_->get_qos()); + } + } + } + }); + while (run_ && (loops == 0 || loops > current_loop)) { if (zero_copy_) @@ -184,6 +201,9 @@ void PublisherModule::run( std::this_thread::sleep_for(std::chrono::milliseconds(interval)); } + + run_ = false; + net_rescan_thread.join(); } void PublisherModule::on_publication_matched( diff --git a/test/dds/communication/PublisherModule.hpp b/test/dds/communication/PublisherModule.hpp index fb144821a0a..593d224c6d6 100644 --- a/test/dds/communication/PublisherModule.hpp +++ b/test/dds/communication/PublisherModule.hpp @@ -19,6 +19,11 @@ #ifndef TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP #define TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP +#include +#include +#include +#include + #include #include #include @@ -27,9 +32,6 @@ #include "types/FixedSizedPubSubTypes.h" #include "types/HelloWorldPubSubTypes.h" -#include -#include - namespace eprosima { namespace fastdds { namespace dds { @@ -41,8 +43,8 @@ class PublisherModule PublisherModule( bool exit_on_lost_liveliness, - bool fixed_type = false, - bool zero_copy = false) + bool fixed_type, + bool zero_copy) : exit_on_lost_liveliness_(exit_on_lost_liveliness) , fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required , zero_copy_(zero_copy) @@ -80,8 +82,9 @@ class PublisherModule void run( uint32_t samples, - uint32_t loops = 0, - uint32_t interval = 250); + const uint32_t rescan_interval, + uint32_t loops, + uint32_t interval); private: @@ -93,7 +96,7 @@ class PublisherModule bool exit_on_lost_liveliness_ = false; bool fixed_type_ = false; bool zero_copy_ = false; - bool run_ = true; + std::atomic_bool run_{true}; DomainParticipant* participant_ = nullptr; TypeSupport type_; Publisher* publisher_ = nullptr; diff --git a/test/dds/communication/SubscriberMain.cpp b/test/dds/communication/SubscriberMain.cpp index e1e578c64c7..a90ca04f478 100644 --- a/test/dds/communication/SubscriberMain.cpp +++ b/test/dds/communication/SubscriberMain.cpp @@ -26,13 +26,15 @@ using namespace eprosima::fastdds::dds; * --notexit * --fixed_type * --zero_copy + * --succeed_on_timeout * --seed * --samples * --magic + * --timeout * --xmlfile * --publishers - * --succeed_on_timeout - * --timeout + * --die_on_data_received + * --rescan */ int main( @@ -48,6 +50,7 @@ int main( uint32_t samples = 4; uint32_t publishers = 1; uint32_t timeout = 86400000; // 24 h in ms + uint32_t rescan_interval_seconds = 0; char* xml_file = nullptr; std::string magic; @@ -129,6 +132,23 @@ int main( publishers = strtol(argv[arg_count], nullptr, 10); } +<<<<<<< HEAD +======= + else if (strcmp(argv[arg_count], "--die_on_data_received") == 0) + { + die_on_data_received = true; + } + else if (strcmp(argv[arg_count], "--rescan") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--rescan expects a parameter" << std::endl; + return -1; + } + + rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10); + } +>>>>>>> 66ec998a8 (Fix issues in Dynamic Network Interfaces (#5282) (#5304)) else { std::cout << "Wrong argument " << argv[arg_count] << std::endl; @@ -147,7 +167,7 @@ int main( if (subscriber.init(seed, magic)) { - return subscriber.run(notexit, timeout) ? 0 : -1; + return subscriber.run(notexit, rescan_interval_seconds, timeout) ? 0 : -1; } return -1; diff --git a/test/dds/communication/SubscriberModule.cpp b/test/dds/communication/SubscriberModule.cpp index a1d2b7b2a24..d9516389096 100644 --- a/test/dds/communication/SubscriberModule.cpp +++ b/test/dds/communication/SubscriberModule.cpp @@ -130,17 +130,35 @@ bool SubscriberModule::init( bool SubscriberModule::run( bool notexit, + const uint32_t rescan_interval, uint32_t timeout) { - return run_for(notexit, std::chrono::milliseconds(timeout)); + return run_for(notexit, rescan_interval, std::chrono::milliseconds(timeout)); } bool SubscriberModule::run_for( bool notexit, + const uint32_t rescan_interval, const std::chrono::milliseconds& timeout) { bool returned_value = false; + std::thread net_rescan_thread([this, rescan_interval]() + { + if (rescan_interval > 0) + { + auto interval = std::chrono::seconds(rescan_interval); + while (run_) + { + std::this_thread::sleep_for(interval); + if (run_) + { + participant_->set_qos(participant_->get_qos()); + } + } + } + }); + while (notexit && run_) { std::this_thread::sleep_for(std::chrono::milliseconds(250)); @@ -190,6 +208,9 @@ bool SubscriberModule::run_for( returned_value = false; } + run_ = false; + net_rescan_thread.join(); + return returned_value; } diff --git a/test/dds/communication/SubscriberModule.hpp b/test/dds/communication/SubscriberModule.hpp index e8559128440..5a6b0aad5d1 100644 --- a/test/dds/communication/SubscriberModule.hpp +++ b/test/dds/communication/SubscriberModule.hpp @@ -19,6 +19,12 @@ #ifndef TEST_COMMUNICATION_SUBSCRIBER_HPP #define TEST_COMMUNICATION_SUBSCRIBER_HPP +#include +#include +#include +#include +#include + #include #include #include @@ -27,11 +33,6 @@ #include "types/FixedSizedPubSubTypes.h" #include "types/HelloWorldPubSubTypes.h" -#include -#include -#include -#include - namespace eprosima { namespace fastdds { namespace dds { @@ -44,9 +45,16 @@ class SubscriberModule SubscriberModule( const uint32_t publishers, const uint32_t max_number_samples, +<<<<<<< HEAD bool fixed_type = false, bool zero_copy = false, bool succeed_on_timeout = false) +======= + bool fixed_type, + bool zero_copy, + bool succeed_on_timeout, + bool die_on_data_received) +>>>>>>> 66ec998a8 (Fix issues in Dynamic Network Interfaces (#5282) (#5304)) : publishers_(publishers) , max_number_samples_(max_number_samples) , fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required @@ -84,10 +92,12 @@ class SubscriberModule bool run( bool notexit, - uint32_t timeout = 86400000); + const uint32_t rescan_interval, + uint32_t timeout); bool run_for( bool notexit, + const uint32_t rescan_interval, const std::chrono::milliseconds& timeout); private: @@ -101,8 +111,13 @@ class SubscriberModule std::map number_samples_; bool fixed_type_ = false; bool zero_copy_ = false; +<<<<<<< HEAD bool run_ = true; bool succeeed_on_timeout_ = false; +======= + std::atomic_bool run_{true}; + bool succeed_on_timeout_ = false; +>>>>>>> 66ec998a8 (Fix issues in Dynamic Network Interfaces (#5282) (#5304)) DomainParticipant* participant_ = nullptr; TypeSupport type_; Subscriber* subscriber_ = nullptr; diff --git a/test/dds/communication/dyn_network/CMakeLists.txt b/test/dds/communication/dyn_network/CMakeLists.txt new file mode 100644 index 00000000000..0d95ac665e0 --- /dev/null +++ b/test/dds/communication/dyn_network/CMakeLists.txt @@ -0,0 +1,61 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +message(STATUS "Configuring dynamic network interfaces tests") + +# Find docker +find_program(DOCKER_EXECUTABLE docker) +if(NOT DOCKER_EXECUTABLE) + message(FATAL_ERROR "Docker not found") +endif() + +set(SHELL_EXECUTABLE "") +set(TINYXML2_LIB_DIR_COMPOSE_VOLUME "") +set(TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH "") + +# Linux configurations +if(UNIX AND NOT(APPLE) AND NOT(QNXNTO) AND NOT(ANDROID)) + # Find bash + find_program(BASH_EXECUTABLE bash) + if(NOT BASH_EXECUTABLE) + message(FATAL_ERROR "bash not found") + endif() + + set(SHELL_EXECUTABLE ${BASH_EXECUTABLE}) + +# Windows configurations +elseif(WIN32) + # We don't know which docker image to use for Windows yet + message(FATAL_ERROR "Windows not supported yet") + +# Unsupported platform +else() + message(FATAL_ERROR "Unsupported platform") +endif() + +# Configure TinyXML2 library path if installed in user library path +if(NOT (TINYXML2_FROM_SOURCE OR TINYXML2_FROM_THIRDPARTY)) + get_filename_component(TINYXML2_LIB_DIR ${TINYXML2_LIBRARY} DIRECTORY) + set(TINYXML2_LIB_DIR_COMPOSE_VOLUME "- ${TINYXML2_LIB_DIR}:${CMAKE_INSTALL_PREFIX}/${DATA_INSTALL_DIR}/fastdds:ro") + set(TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH ":${CMAKE_INSTALL_PREFIX}/${DATA_INSTALL_DIR}/fastdds") +endif() + +configure_file(Dockerfile + ${CMAKE_CURRENT_BINARY_DIR}/Dockerfile @ONLY) +configure_file(dynamic_interfaces.compose.yml + ${CMAKE_CURRENT_BINARY_DIR}/dynamic_interfaces.compose.yml @ONLY) +configure_file(launch_subscriber.bash + ${CMAKE_CURRENT_BINARY_DIR}/launch_subscriber.bash @ONLY) +add_test(NAME dds.communication.dynamic_interfaces + COMMAND ${DOCKER_EXECUTABLE} compose -f ${CMAKE_CURRENT_BINARY_DIR}/dynamic_interfaces.compose.yml up + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/test/dds/communication/dyn_network/Dockerfile b/test/dds/communication/dyn_network/Dockerfile new file mode 100644 index 00000000000..ebdcae10bba --- /dev/null +++ b/test/dds/communication/dyn_network/Dockerfile @@ -0,0 +1,27 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Tag, branch, or commit in github.com/eProsima/DDS-Suite +ARG ubuntu_version=22.04 +FROM ubuntu:$ubuntu_version AS ubuntu-net-tools + +# Needed for a dependency that forces to set timezone +ENV TZ=Europe/Madrid +RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone + +# Avoids using interactions during building +ENV DEBIAN_FRONTEND=noninteractive + +# Install apt dependencies +RUN apt-get update && apt-get install --yes net-tools && rm -rf /var/lib/apt/lists/* diff --git a/test/dds/communication/dyn_network/dynamic_interfaces.compose.yml b/test/dds/communication/dyn_network/dynamic_interfaces.compose.yml new file mode 100644 index 00000000000..9d45abbbc48 --- /dev/null +++ b/test/dds/communication/dyn_network/dynamic_interfaces.compose.yml @@ -0,0 +1,42 @@ +# Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +version: "3" + +services: + publisher: + image: ubuntu:22.04 + volumes: + - @PROJECT_BINARY_DIR@:@PROJECT_BINARY_DIR@ + - @fastcdr_LIB_DIR@:@fastcdr_LIB_DIR@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@@TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH@ + EXAMPLE_DIR: @PROJECT_BINARY_DIR@/test/dds/communication + command: @SHELL_EXECUTABLE@ -c "$${EXAMPLE_DIR}/DDSCommunicationPublisher --xmlfile $${EXAMPLE_DIR}/simple_reliable_profile.xml --wait 1 --samples 10 --loops 1 --seed 0 --magic T" + + subscriber: + build: . + image: ubuntu-net-tools:22.04 + privileged: true + volumes: + - @PROJECT_BINARY_DIR@:@PROJECT_BINARY_DIR@ + - @fastcdr_LIB_DIR@:@fastcdr_LIB_DIR@ + @TINYXML2_LIB_DIR_COMPOSE_VOLUME@ + environment: + LD_LIBRARY_PATH: @PROJECT_BINARY_DIR@/src/cpp:@fastcdr_LIB_DIR@@TINYXML2_LIB_DIR_COMPOSE_LD_LIBRARY_PATH@ + EXAMPLE_DIR: @PROJECT_BINARY_DIR@/test/dds/communication + working_dir: @PROJECT_BINARY_DIR@/test/dds/communication + command: @SHELL_EXECUTABLE@ "dyn_network/launch_subscriber.bash" + depends_on: + - publisher diff --git a/test/dds/communication/dyn_network/launch_subscriber.bash b/test/dds/communication/dyn_network/launch_subscriber.bash new file mode 100755 index 00000000000..9ee62e1338b --- /dev/null +++ b/test/dds/communication/dyn_network/launch_subscriber.bash @@ -0,0 +1,33 @@ +# Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/bash + +# Note: This script is intended to be used in a privileged container, since it requires to bring down and up the eth0 interface. + +echo "Putting down eth0 interface..." +ifconfig eth0 down + +echo "Launching subscriber..." +${EXAMPLE_DIR}/DDSCommunicationSubscriber --xmlfile ${EXAMPLE_DIR}/simple_reliable_profile.xml --samples 10 --seed 0 --magic T --rescan 2 & +subs_pid=$! +echo "Subscriber launched." + +echo "Waiting 2 seconds and bring up eth0 interface..." +sleep 2s +ifconfig eth0 up +echo "eth0 interface is up." + +echo "Waiting 3s for the subscriber (process id $subs_pid) to finish..." +wait $subs_pid diff --git a/test/dds/communication/security/PublisherModule.cpp b/test/dds/communication/security/PublisherModule.cpp new file mode 100644 index 00000000000..809822164f9 --- /dev/null +++ b/test/dds/communication/security/PublisherModule.cpp @@ -0,0 +1,286 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file PublisherModule.cpp + */ + +#include "PublisherModule.hpp" + +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +using namespace eprosima::fastdds::dds; +using namespace eprosima::fastrtps::rtps; + +PublisherModule::~PublisherModule() +{ + if (nullptr != writer_) + { + publisher_->delete_datawriter(writer_); + } + + if (nullptr != publisher_) + { + participant_->delete_publisher(publisher_); + } + + if (nullptr != topic_) + { + participant_->delete_topic(topic_); + } + + if (nullptr != participant_) + { + DomainParticipantFactory::get_instance()->delete_participant(participant_); + } +} + +bool PublisherModule::init( + uint32_t seed, + const std::string& magic) +{ + std::cout << "Initializing Publisher" << std::endl; + + participant_ = + DomainParticipantFactory::get_instance()->create_participant(seed % 230, PARTICIPANT_QOS_DEFAULT, this); + + if (participant_ == nullptr) + { + std::cout << "Error creating publisher participant" << std::endl; + return false; + } + + // Construct a FixedSizedType if fixed type is required, defult HelloWro + if (fixed_type_) + { + type_.reset(new FixedSizedPubSubType()); + } + else + { + type_.reset(new HelloWorldPubSubType()); + } + + type_.register_type(participant_); + + // Generate topic name + std::ostringstream topic_name; + topic_name << "HelloWorldTopic_" << ((magic.empty()) ? asio::ip::host_name() : magic) << "_" << seed; + + //CREATE THE PUBLISHER + publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT, this); + if (publisher_ == nullptr) + { + std::cout << "Error creating publisher" << std::endl; + return false; + } + + topic_ = participant_->create_topic(topic_name.str(), type_.get_type_name(), TOPIC_QOS_DEFAULT); + if (topic_ == nullptr) + { + std::cout << "Error creating publisher topic" << std::endl; + return false; + } + + DataWriterQos wqos = publisher_->get_default_datawriter_qos(); + wqos.liveliness().lease_duration = 3; + wqos.liveliness().announcement_period = 1; + wqos.liveliness().kind = AUTOMATIC_LIVELINESS_QOS; + + writer_ = publisher_->create_datawriter(topic_, wqos, this); + if (writer_ == nullptr) + { + std::cout << "Error creating publisher datawriter" << std::endl; + return false; + } + std::cout << "Writer created correctly in topic " << topic_->get_name() + << " with type " << type_.get_type_name() << std::endl; + + std::cout << "Publisher initialized correctly" << std::endl; + + return true; +} + +void PublisherModule::wait_discovery( + uint32_t how_many) +{ + std::unique_lock lock(mutex_); + cv_.wait(lock, [&] + { + return matched_ >= how_many; + }); +} + +void PublisherModule::run( + uint32_t samples, + const uint32_t rescan_interval, + const uint32_t loops, + uint32_t interval) +{ + uint32_t current_loop = 0; + uint16_t index = 1; + void* sample = nullptr; + + std::thread net_rescan_thread([this, rescan_interval]() + { + if (rescan_interval > 0) + { + auto interval = std::chrono::seconds(rescan_interval); + while (run_) + { + std::this_thread::sleep_for(interval); + if (run_) + { + participant_->set_qos(participant_->get_qos()); + } + } + } + }); + + while (run_ && (loops == 0 || loops > current_loop)) + { + if (zero_copy_) + { + if (ReturnCode_t::RETCODE_OK == writer_->loan_sample(sample)) + { + FixedSized* data = static_cast(sample); + data->index(index); + } + } + else + { + sample = type_.create_data(); + if (fixed_type_) + { + FixedSized* data = static_cast(sample); + data->index(index); + // FixedSized has no message + } + else + { + HelloWorld* data = static_cast(sample); + data->index(index); + data->message("HelloWorld"); + } + } + std::cout << "Publisher writting index " << index << std::endl; + writer_->write(sample); + + if (index == samples) + { + index = 1; + ++current_loop; + } + else + { + ++index; + } + + if (!zero_copy_) + { + type_.delete_data(sample); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + } + + run_ = false; + net_rescan_thread.join(); +} + +void PublisherModule::on_publication_matched( + DataWriter* /*publisher*/, + const PublicationMatchedStatus& info) +{ + std::unique_lock lock(mutex_); + if (info.current_count_change == 1) + { + std::cout << "Publisher matched with subscriber " << info.last_subscription_handle + << ": " << ++matched_ << std::endl; + } + else if (info.current_count_change == -1) + { + std::cout << "Publisher unmatched with subscriber " << info.last_subscription_handle + << ": " << --matched_ << std::endl; + } + else + { + std::cout << info.current_count_change + << " is not a valid value for PublicationMatchedStatus current count change" << std::endl; + } + cv_.notify_all(); +} + +void PublisherModule::on_participant_discovery( + DomainParticipant* /*participant*/, + ParticipantDiscoveryInfo&& info) +{ + if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT) + { + std::cout << "Publisher participant " << //participant->getGuid() << + " discovered participant " << info.info.m_guid << std::endl; + } + else if (info.status == ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT) + { + std::cout << "Publisher participant " << //participant->getGuid() << + " detected changes on participant " << info.info.m_guid << std::endl; + } + else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT) + { + std::cout << "Publisher participant " << // participant->getGuid() << + " removed participant " << info.info.m_guid << std::endl; + if (exit_on_disposal_received_) + { + run_ = false; + } + } + else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT) + { + std::cout << "Publisher participant " << //participant->getGuid() << + " dropped participant " << info.info.m_guid << std::endl; + if (exit_on_lost_liveliness_) + { + run_ = false; + } + } +} + +#if HAVE_SECURITY +void PublisherModule::onParticipantAuthentication( + DomainParticipant* participant, + ParticipantAuthenticationInfo&& info) +{ + if (ParticipantAuthenticationInfo::AUTHORIZED_PARTICIPANT == info.status) + { + std::cout << "Publisher participant " << participant->guid() << + " authorized participant " << info.guid << std::endl; + } + else + { + std::cout << "Publisher participant " << participant->guid() << + " unauthorized participant " << info.guid << std::endl; + } +} + +#endif // if HAVE_SECURITY diff --git a/test/dds/communication/security/PublisherModule.hpp b/test/dds/communication/security/PublisherModule.hpp new file mode 100644 index 00000000000..3b91aec0087 --- /dev/null +++ b/test/dds/communication/security/PublisherModule.hpp @@ -0,0 +1,113 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file Publisher.hpp + */ + +#ifndef TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP +#define TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP + +#include +#include +#include + +#include +#include +#include +#include + +#include "types/FixedSizedPubSubTypes.h" +#include "types/HelloWorldPubSubTypes.h" + +namespace eprosima { +namespace fastdds { +namespace dds { + +class PublisherModule + : public DomainParticipantListener +{ +public: + + PublisherModule( + bool exit_on_lost_liveliness, + bool exit_on_disposal_received, + bool fixed_type, + bool zero_copy) + : exit_on_lost_liveliness_(exit_on_lost_liveliness) + , exit_on_disposal_received_(exit_on_disposal_received) + , fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required + , zero_copy_(zero_copy) + { + } + + ~PublisherModule(); + + void on_publication_matched( + DataWriter* /*publisher*/, + const PublicationMatchedStatus& info) override; + + /** + * This method is called when a new Participant is discovered, or a previously discovered participant + * changes its QOS or is removed. + * @param p Pointer to the Participant + * @param info DiscoveryInfo. + */ + void on_participant_discovery( + DomainParticipant* /*participant*/, + fastrtps::rtps::ParticipantDiscoveryInfo&& info) override; + +#if HAVE_SECURITY + void onParticipantAuthentication( + DomainParticipant* participant, + fastrtps::rtps::ParticipantAuthenticationInfo&& info) override; +#endif // if HAVE_SECURITY + + bool init( + uint32_t seed, + const std::string& magic); + + void wait_discovery( + uint32_t how_many); + + void run( + uint32_t samples, + const uint32_t rescan_interval, + uint32_t loops, + uint32_t interval); + +private: + + using DomainParticipantListener::on_participant_discovery; + + std::mutex mutex_; + std::condition_variable cv_; + unsigned int matched_ = 0; + bool exit_on_lost_liveliness_ = false; + bool exit_on_disposal_received_ = false; + bool fixed_type_ = false; + bool zero_copy_ = false; + std::atomic_bool run_ {true}; + DomainParticipant* participant_ = nullptr; + TypeSupport type_; + Publisher* publisher_ = nullptr; + DataWriter* writer_ = nullptr; + Topic* topic_ = nullptr; +}; + +} // dds +} // fastdds +} // eprosima + +#endif // TEST_DDS_COMMUNICATION_PUBLISHERMODULE_HPP diff --git a/test/dds/communication/security/SubscriberModule.cpp b/test/dds/communication/security/SubscriberModule.cpp new file mode 100644 index 00000000000..102231a5566 --- /dev/null +++ b/test/dds/communication/security/SubscriberModule.cpp @@ -0,0 +1,364 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SubscriberModule.cpp + * + */ + +#include "SubscriberModule.hpp" + +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +using namespace eprosima::fastdds::dds; +using namespace eprosima::fastrtps::rtps; + +SubscriberModule::~SubscriberModule() +{ + if (nullptr != reader_) + { + subscriber_->delete_datareader(reader_); + } + + if (nullptr != subscriber_) + { + participant_->delete_subscriber(subscriber_); + } + + if (nullptr != topic_) + { + participant_->delete_topic(topic_); + } + + if (nullptr != participant_) + { + DomainParticipantFactory::get_instance()->delete_participant(participant_); + } +} + +bool SubscriberModule::init( + uint32_t seed, + const std::string& magic) +{ + std::cout << "Initializing Subscriber" << std::endl; + + StatusMask mask = StatusMask::subscription_matched() + << StatusMask::data_available() + << StatusMask::liveliness_changed(); + + participant_ = + DomainParticipantFactory::get_instance()->create_participant(seed % 230, PARTICIPANT_QOS_DEFAULT, this, + mask); + + if (participant_ == nullptr) + { + std::cout << "Error creating subscriber participant" << std::endl; + return false; + } + + // Construct a FixedSizedType if fixed type is required, defult HelloWro + if (fixed_type_) + { + type_.reset(new FixedSizedPubSubType()); + } + else + { + type_.reset(new HelloWorldPubSubType()); + } + type_.register_type(participant_); + + // Generate topic name + std::ostringstream topic_name; + topic_name << "HelloWorldTopic_" << ((magic.empty()) ? asio::ip::host_name() : magic) << "_" << seed; + + //CREATE THE SUBSCRIBER + subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr); + if (subscriber_ == nullptr) + { + std::cout << "Error creating subscriber" << std::endl; + return false; + } + + //CREATE THE TOPIC + topic_ = participant_->create_topic(topic_name.str(), type_.get_type_name(), TOPIC_QOS_DEFAULT); + if (topic_ == nullptr) + { + std::cout << "Error creating subscriber topic" << std::endl; + return false; + } + + //CREATE THE DATAREADER + DataReaderQos rqos = subscriber_->get_default_datareader_qos(); + rqos.liveliness().lease_duration = 3; + rqos.liveliness().announcement_period = 1; + rqos.liveliness().kind = AUTOMATIC_LIVELINESS_QOS; + + reader_ = subscriber_->create_datareader(topic_, rqos); + if (reader_ == nullptr) + { + std::cout << "Error creating subscriber datareader" << std::endl; + return false; + } + std::cout << "Reader created correctly in topic " << topic_->get_name() + << " with type " << type_.get_type_name() << std::endl; + + std::cout << "Subscriber initialized correctly" << std::endl; + + return true; +} + +bool SubscriberModule::run( + bool notexit, + const uint32_t rescan_interval) +{ + return run_for(notexit, rescan_interval, std::chrono::hours(24)); +} + +bool SubscriberModule::run_for( + bool notexit, + const uint32_t rescan_interval, + const std::chrono::milliseconds& timeout) +{ + bool returned_value = false; + + std::thread net_rescan_thread([this, rescan_interval]() + { + if (rescan_interval > 0) + { + auto interval = std::chrono::seconds(rescan_interval); + while (run_) + { + std::this_thread::sleep_for(interval); + if (run_) + { + participant_->set_qos(participant_->get_qos()); + } + } + } + }); + + while (notexit && run_) + { + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + } + + if (run_) + { + std::unique_lock lock(mutex_); + returned_value = cv_.wait_for(lock, timeout, [&] + { + if (publishers_ < number_samples_.size()) + { + // Will fail later. + return true; + } + else if (publishers_ > number_samples_.size()) + { + return false; + } + + for (auto& number_samples : number_samples_) + { + if (max_number_samples_ > number_samples.second) + { + return false; + } + } + + return true; + }); + } + else + { + returned_value = true; + } + + + if (publishers_ < number_samples_.size()) + { + std::cout << "ERROR: detected more than " << publishers_ << " publishers" << std::endl; + returned_value = false; + } + + run_ = false; + net_rescan_thread.join(); + + return returned_value; +} + +void SubscriberModule::on_participant_discovery( + DomainParticipant* /*participant*/, + ParticipantDiscoveryInfo&& info) +{ + if (info.status == ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT) + { + std::cout << "Subscriber participant " << //participant->getGuid() << + " discovered participant " << info.info.m_guid << std::endl; + } + else if (info.status == ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT) + { + std::cout << "Subscriber participant " << //participant->getGuid() << + " detected changes on participant " << info.info.m_guid << std::endl; + } + else if (info.status == ParticipantDiscoveryInfo::REMOVED_PARTICIPANT) + { + std::cout << "Subscriber participant " << //participant->getGuid() << + " removed participant " << info.info.m_guid << std::endl; + } + else if (info.status == ParticipantDiscoveryInfo::DROPPED_PARTICIPANT) + { + std::cout << "Subscriber participant " << //participant->getGuid() << + " dropped participant " << info.info.m_guid << std::endl; + } +} + +#if HAVE_SECURITY +void SubscriberModule::onParticipantAuthentication( + DomainParticipant* /*participant*/, + ParticipantAuthenticationInfo&& info) +{ + if (ParticipantAuthenticationInfo::AUTHORIZED_PARTICIPANT == info.status) + { + std::cout << "Subscriber participant " << //participant->getGuid() << + " authorized participant " << info.guid << std::endl; + } + else + { + std::cout << "Subscriber participant " << //participant->getGuid() << + " unauthorized participant " << info.guid << std::endl; + } +} + +#endif // if HAVE_SECURITY + +void SubscriberModule::on_subscription_matched( + DataReader* /*reader*/, + const SubscriptionMatchedStatus& info) +{ + if (info.current_count_change == 1) + { + std::cout << "Subscriber matched with publisher " << info.last_publication_handle << std::endl; + } + else if (info.current_count_change == -1) + { + std::cout << "Subscriber unmatched with publisher " << info.last_publication_handle << std::endl; + } + else + { + std::cout << info.current_count_change + << " is not a valid value for SubscriptionMatchedStatus current count change" << std::endl; + } +} + +void SubscriberModule::on_data_available( + DataReader* reader) +{ + if (die_on_data_received_) + { + std::abort(); + } + std::cout << "Subscriber on_data_available from :" << participant_->guid() << std::endl; + + if (zero_copy_) + { + LoanableSequence l_sample; + LoanableSequence l_info; + + if (ReturnCode_t::RETCODE_OK == reader->take_next_instance(l_sample, l_info)) + { + SampleInfo info = l_info[0]; + + if (info.valid_data && info.instance_state == ALIVE_INSTANCE_STATE) + { + FixedSized& data = l_sample[0]; + + std::cout << "Received sample (" << info.sample_identity.writer_guid() << " - " << + info.sample_identity.sequence_number() << "): index(" << data.index() << ")" << std::endl; + + + if (max_number_samples_ <= ++number_samples_[info.sample_identity.writer_guid()]) + { + cv_.notify_all(); + } + } + } + reader->return_loan(l_sample, l_info); + } + else + { + SampleInfo info; + + if (fixed_type_) + { + FixedSized sample; + if (reader->take_next_sample((void*)&sample, &info) == ReturnCode_t::RETCODE_OK) + { + if (info.instance_state == ALIVE_INSTANCE_STATE) + { + std::unique_lock lock(mutex_); + std::cout << "Received sample (" << info.sample_identity.writer_guid() << " - " << + info.sample_identity.sequence_number() << "): index(" << sample.index() << ")" << std::endl; + if (max_number_samples_ <= ++number_samples_[info.sample_identity.writer_guid()]) + { + cv_.notify_all(); + } + } + } + } + else + { + HelloWorld sample; + if (reader->take_next_sample((void*)&sample, &info) == ReturnCode_t::RETCODE_OK) + { + if (info.instance_state == ALIVE_INSTANCE_STATE) + { + std::unique_lock lock(mutex_); + std::cout << "Received sample (" << info.sample_identity.writer_guid() << " - " << + info.sample_identity.sequence_number() << "): index(" << sample.index() << "), message(" + << sample.message() << ")" << std::endl; + if (max_number_samples_ <= ++number_samples_[info.sample_identity.writer_guid()]) + { + cv_.notify_all(); + } + } + } + } + } +} + +void SubscriberModule::on_liveliness_changed( + DataReader* /*reader*/, + const LivelinessChangedStatus& status) +{ + if (status.alive_count_change == 1) + { + std::cout << "Subscriber recovered liveliness" << std::endl; + } + else if (status.not_alive_count_change == 1) + { + std::cout << "Subscriber lost liveliness" << std::endl; + run_ = false; + } +} diff --git a/test/dds/communication/security/SubscriberModule.hpp b/test/dds/communication/security/SubscriberModule.hpp new file mode 100644 index 00000000000..dd73934f990 --- /dev/null +++ b/test/dds/communication/security/SubscriberModule.hpp @@ -0,0 +1,119 @@ +// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file SubscriberModule.hpp + * + */ +#ifndef TEST_COMMUNICATION_SUBSCRIBER_HPP +#define TEST_COMMUNICATION_SUBSCRIBER_HPP + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "types/FixedSizedPubSubTypes.h" +#include "types/HelloWorldPubSubTypes.h" + +namespace eprosima { +namespace fastdds { +namespace dds { + +class SubscriberModule + : public DomainParticipantListener +{ +public: + + SubscriberModule( + const uint32_t publishers, + const uint32_t max_number_samples, + bool fixed_type, + bool zero_copy, + bool die_on_data_received) + : publishers_(publishers) + , max_number_samples_(max_number_samples) + , fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required + , zero_copy_(zero_copy) + , die_on_data_received_(die_on_data_received) + { + } + + ~SubscriberModule(); + + void on_participant_discovery( + DomainParticipant* /*participant*/, + fastrtps::rtps::ParticipantDiscoveryInfo&& info) override; + +#if HAVE_SECURITY + void onParticipantAuthentication( + DomainParticipant* /*participant*/, + fastrtps::rtps::ParticipantAuthenticationInfo&& info) override; +#endif // if HAVE_SECURITY + + void on_subscription_matched( + DataReader* /*reader*/, + const SubscriptionMatchedStatus& info) override; + + void on_data_available( + DataReader* reader) override; + + void on_liveliness_changed( + DataReader* /*reader*/, + const eprosima::fastdds::dds::LivelinessChangedStatus& status) override; + + bool init( + uint32_t seed, + const std::string& magic); + + bool run( + bool notexit, + const uint32_t rescan_interval); + + bool run_for( + bool notexit, + const uint32_t rescan_interval, + const std::chrono::milliseconds& timeout); + +private: + + using DomainParticipantListener::on_participant_discovery; + + std::mutex mutex_; + std::condition_variable cv_; + const uint32_t publishers_ = 0; + const uint32_t max_number_samples_ = 0; + std::map number_samples_; + bool fixed_type_ = false; + bool zero_copy_ = false; + std::atomic_bool run_{true}; + DomainParticipant* participant_ = nullptr; + TypeSupport type_; + Subscriber* subscriber_ = nullptr; + DataReader* reader_ = nullptr; + Topic* topic_ = nullptr; + bool die_on_data_received_ = false; +}; + +} // dds +} // fastdds +} // eprosima + +#endif // TEST_COMMUNICATION_SUBSCRIBER_HPP