From 0bdb5e758e3ee002f184b60d795598efcfd5c3c8 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 11 Nov 2021 14:53:16 +0800 Subject: [PATCH 1/9] add ip parser --- .../fleet_executor/fleet_executor.cc | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc index 842695fdea60b2..a5400b786d3693 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc @@ -31,6 +31,29 @@ FleetExecutor::~FleetExecutor() { void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) { // Compile and Initialize + std::stringstream ss; + ss << "The DNS table of the message bus is: \n"; + int64_t cur_rank = exe_desc_.cur_rank(); + std::unordered_map interceptor_id_to_rank; + std::unordered_map rank_to_addr; + std::string addr; + for (const auto& rank_info : exe_desc_.cluster_info()) { + int64_t rank = rank_info.rank(); + std::string ip_port = rank_info.ip_port(); + ss << rank << ":\t" << ip_port << "\n"; + interceptor_id_to_rank.insert(std::make_pair(rank, rank)); + rank_to_addr.insert(std::make_pair(rank, ip_port)); + if (rank == cur_rank) { + addr = ip_port; + } + } + PADDLE_ENFORCE_NE( + addr, "", platform::errors::NotFound( + "Current rank's ip_port cannot be found in the config.")); + VLOG(3) << "Current rank is " << cur_rank << " and the ip_port is " << addr + << "."; + VLOG(3) << "The number of ranks are " << interceptor_id_to_rank.size() << "."; + VLOG(5) << ss.str(); } void FleetExecutor::Run() { From 7de2cea8deb0de65253e7b3b8b86778c4d0b15d8 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 11 Nov 2021 15:16:20 +0800 Subject: [PATCH 2/9] adapt singlton --- paddle/fluid/distributed/fleet_executor/CMakeLists.txt | 1 + paddle/fluid/distributed/fleet_executor/fleet_executor.cc | 3 +++ 2 files changed, 4 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt index 9d6b30b06334a9..4114cb08119dba 100644 --- a/paddle/fluid/distributed/fleet_executor/CMakeLists.txt +++ b/paddle/fluid/distributed/fleet_executor/CMakeLists.txt @@ -19,6 +19,7 @@ if(WITH_DISTRIBUTE) set_source_files_properties(interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(message_bus.h PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(message_bus.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + set_source_files_properties(fleet_executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(carrier.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(interceptor_message_service.h PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) set_source_files_properties(interceptor_message_service.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc index a5400b786d3693..bc069de2f5047e 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h" +#include "paddle/fluid/distributed/fleet_executor/message_bus.h" #include "paddle/fluid/distributed/fleet_executor/runtime_graph.h" #include "paddle/fluid/framework/program_desc.h" @@ -54,6 +55,8 @@ void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) { << "."; VLOG(3) << "The number of ranks are " << interceptor_id_to_rank.size() << "."; VLOG(5) << ss.str(); + MessageBus& message_bus_instance = MessageBus::Instance(); + message_bus_instance.Init(interceptor_id_to_rank, rank_to_addr, addr); } void FleetExecutor::Run() { From 02ef4fee1e3b3ff7e1db5ae973e277d70f6e6e9f Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 11 Nov 2021 15:17:16 +0800 Subject: [PATCH 3/9] update vlog --- paddle/fluid/distributed/fleet_executor/fleet_executor.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc index bc069de2f5047e..2723822df6c867 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc @@ -49,8 +49,10 @@ void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) { } } PADDLE_ENFORCE_NE( - addr, "", platform::errors::NotFound( - "Current rank's ip_port cannot be found in the config.")); + addr, "", + platform::errors::NotFound( + "Current rank is %s, which ip_port cannot be found in the config.", + cur_rank)); VLOG(3) << "Current rank is " << cur_rank << " and the ip_port is " << addr << "."; VLOG(3) << "The number of ranks are " << interceptor_id_to_rank.size() << "."; From 2cbf78bb5fefe03b2ed7ac2b41e099f3567ba299 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 11 Nov 2021 15:23:52 +0800 Subject: [PATCH 4/9] modify structure --- paddle/fluid/distributed/fleet_executor/fleet_executor.cc | 4 ++++ paddle/fluid/distributed/fleet_executor/fleet_executor.h | 1 + 2 files changed, 5 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc index 2723822df6c867..e65dc1b02de9a5 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc @@ -32,6 +32,10 @@ FleetExecutor::~FleetExecutor() { void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) { // Compile and Initialize + InitMessageBus(); +} + +void FleetExecutor::InitMessageBus() { std::stringstream ss; ss << "The DNS table of the message bus is: \n"; int64_t cur_rank = exe_desc_.cur_rank(); diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.h b/paddle/fluid/distributed/fleet_executor/fleet_executor.h index e12629844933a7..242e1a74fc489d 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor.h +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.h @@ -42,6 +42,7 @@ class FleetExecutor final { DISABLE_COPY_AND_ASSIGN(FleetExecutor); FleetExecutorDesc exe_desc_; std::unique_ptr runtime_graph_; + void InitMessageBus(); static std::shared_ptr global_carrier_; }; From b99bcf818e1b1ee3fd6b3fe6cb8267c5a7018e2c Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 11 Nov 2021 15:51:15 +0800 Subject: [PATCH 5/9] add IsInit interface for message bus --- paddle/fluid/distributed/fleet_executor/fleet_executor.cc | 8 +++++--- paddle/fluid/distributed/fleet_executor/message_bus.cc | 2 ++ paddle/fluid/distributed/fleet_executor/message_bus.h | 2 ++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc index e65dc1b02de9a5..4b23435c24f8a6 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc @@ -37,7 +37,7 @@ void FleetExecutor::Init(const paddle::framework::ProgramDesc& program_desc) { void FleetExecutor::InitMessageBus() { std::stringstream ss; - ss << "The DNS table of the message bus is: \n"; + ss << "\nThe DNS table of the message bus is: \n"; int64_t cur_rank = exe_desc_.cur_rank(); std::unordered_map interceptor_id_to_rank; std::unordered_map rank_to_addr; @@ -45,7 +45,7 @@ void FleetExecutor::InitMessageBus() { for (const auto& rank_info : exe_desc_.cluster_info()) { int64_t rank = rank_info.rank(); std::string ip_port = rank_info.ip_port(); - ss << rank << ":\t" << ip_port << "\n"; + ss << rank << "\t->\t" << ip_port << "\n"; interceptor_id_to_rank.insert(std::make_pair(rank, rank)); rank_to_addr.insert(std::make_pair(rank, ip_port)); if (rank == cur_rank) { @@ -62,7 +62,9 @@ void FleetExecutor::InitMessageBus() { VLOG(3) << "The number of ranks are " << interceptor_id_to_rank.size() << "."; VLOG(5) << ss.str(); MessageBus& message_bus_instance = MessageBus::Instance(); - message_bus_instance.Init(interceptor_id_to_rank, rank_to_addr, addr); + if (!message_bus_instance.IsInit()) { + message_bus_instance.Init(interceptor_id_to_rank, rank_to_addr, addr); + } } void FleetExecutor::Run() { diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index 08cd100f108fd8..a09d98960e1bbe 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -42,6 +42,8 @@ void MessageBus::Init( }); } +bool MessageBus::IsInit() { return is_init_; } + void MessageBus::Release() { #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ !defined(PADDLE_WITH_ASCEND_CL) diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.h b/paddle/fluid/distributed/fleet_executor/message_bus.h index 08e8d2e24abd83..e3cdf6be082541 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.h +++ b/paddle/fluid/distributed/fleet_executor/message_bus.h @@ -48,6 +48,8 @@ class MessageBus final { const std::unordered_map& rank_to_addr, const std::string& addr); + bool IsInit(); + void Release(); // called by Interceptor, send InterceptorMessage to dst From e50b9bf1a4b40fc338eb474d8d3ee5c88f417127 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 11 Nov 2021 15:58:21 +0800 Subject: [PATCH 6/9] some update --- paddle/fluid/distributed/fleet_executor/message_bus.cc | 3 ++- paddle/fluid/distributed/fleet_executor/message_bus.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index a09d98960e1bbe..044f549f7f6ea2 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -42,9 +42,10 @@ void MessageBus::Init( }); } -bool MessageBus::IsInit() { return is_init_; } +bool MessageBus::IsInit() const { return is_init_; } void MessageBus::Release() { + VLOG(3) << "Message bus releasing resource."; #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ !defined(PADDLE_WITH_ASCEND_CL) server_.Stop(1000); diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.h b/paddle/fluid/distributed/fleet_executor/message_bus.h index e3cdf6be082541..e45f2e3c712595 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.h +++ b/paddle/fluid/distributed/fleet_executor/message_bus.h @@ -48,7 +48,7 @@ class MessageBus final { const std::unordered_map& rank_to_addr, const std::string& addr); - bool IsInit(); + bool IsInit() const; void Release(); From ffb102853678687020ce6dd4c8b098a68bfbfd53 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 11 Nov 2021 16:00:43 +0800 Subject: [PATCH 7/9] correct typo --- paddle/fluid/distributed/fleet_executor/message_bus.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/fleet_executor/message_bus.cc b/paddle/fluid/distributed/fleet_executor/message_bus.cc index 044f549f7f6ea2..75e7b2fcb3dc5b 100644 --- a/paddle/fluid/distributed/fleet_executor/message_bus.cc +++ b/paddle/fluid/distributed/fleet_executor/message_bus.cc @@ -45,7 +45,7 @@ void MessageBus::Init( bool MessageBus::IsInit() const { return is_init_; } void MessageBus::Release() { - VLOG(3) << "Message bus releasing resource."; + VLOG(3) << "Message bus releases resource."; #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE) && \ !defined(PADDLE_WITH_ASCEND_CL) server_.Stop(1000); From 4da3d101e846faaf7dc901c4d3730c4fbab1b8d8 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 11 Nov 2021 16:04:17 +0800 Subject: [PATCH 8/9] add todo --- paddle/fluid/distributed/fleet_executor/fleet_executor.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc index 4b23435c24f8a6..b365c9f3efaa4d 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc @@ -46,6 +46,7 @@ void FleetExecutor::InitMessageBus() { int64_t rank = rank_info.rank(); std::string ip_port = rank_info.ip_port(); ss << rank << "\t->\t" << ip_port << "\n"; + // TODO(Yuang): replace the first rank with real interceptor id interceptor_id_to_rank.insert(std::make_pair(rank, rank)); rank_to_addr.insert(std::make_pair(rank, ip_port)); if (rank == cur_rank) { From b7715801f9fd3d740fefdc7c6ca77e59c72db174 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Thu, 11 Nov 2021 16:22:07 +0800 Subject: [PATCH 9/9] fix typo --- paddle/fluid/distributed/fleet_executor/fleet_executor.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc index b365c9f3efaa4d..eed6d6ef7e47e1 100644 --- a/paddle/fluid/distributed/fleet_executor/fleet_executor.cc +++ b/paddle/fluid/distributed/fleet_executor/fleet_executor.cc @@ -46,7 +46,7 @@ void FleetExecutor::InitMessageBus() { int64_t rank = rank_info.rank(); std::string ip_port = rank_info.ip_port(); ss << rank << "\t->\t" << ip_port << "\n"; - // TODO(Yuang): replace the first rank with real interceptor id + // TODO(Yuang): replace the first 'rank' with real interceptor id interceptor_id_to_rank.insert(std::make_pair(rank, rank)); rank_to_addr.insert(std::make_pair(rank, ip_port)); if (rank == cur_rank) {