From f6a10e1b056068f5304273cee72c165245d17e33 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 21 Jan 2022 16:31:36 +0800 Subject: [PATCH 01/23] run method for dist model --- .../distributed/fleet_executor/dist_model.cc | 160 +++++++++++++++++- .../distributed/fleet_executor/dist_model.h | 14 +- 2 files changed, 167 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index 6454a34950513..0610daef14d19 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -18,6 +18,7 @@ #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h" #include "paddle/fluid/distributed/fleet_executor/task_node.h" #include "paddle/fluid/framework/block_desc.h" +#include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/naive_executor.h" #include "paddle/fluid/framework/op_proto_maker.h" #include "paddle/fluid/framework/program_desc.h" @@ -37,10 +38,67 @@ bool IsPersistable(const framework::VarDesc *var) { } return false; } + +bool LoadDataFromDistModelTensor(const DistModelTensor &input_data, + framework::LoDTensor *input_tensor, + const platform::Place &place) { + VLOG(3) << "Loading data from DistModelTensor for " << input_data.name; + framework::DDim dims = framework::make_ddim(input_data.shape); + void *input_tensor_ptr; + if (input_data.dtype == DistModelDataType::INT64) { + input_tensor_ptr = input_tensor->mutable_data(dims, place); + } else if (input_data.dtype == DistModelDataType::FLOAT32) { + input_tensor_ptr = input_tensor->mutable_data(dims, place); + } else if (input_data.dtype == DistModelDataType::INT32) { + input_tensor_ptr = input_tensor->mutable_data(dims, place); + } else { + // Q(fleet exe dev): for input/output, should we support fp16 + LOG(ERROR) << "unsupported feed type " << input_data.dtype; + return false; + } + + PADDLE_ENFORCE_NOT_NULL( + 
input_tensor_ptr, + paddle::platform::errors::Fatal( + "LoDTensor creation failed. DistModel loaded data failed.")); + PADDLE_ENFORCE_NOT_NULL(input_data.data.data(), + paddle::platform::errors::InvalidArgument( + "DistModelTensor contains no data.")); + + if (platform::is_cpu_place(place)) { + std::memcpy(static_cast(input_tensor_ptr), input_data.data.data(), + input_data.data.length()); + } else if (platform::is_gpu_place(place)) { + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto *dev_ctx = + dynamic_cast(pool.Get(place)); + auto gpu_place = place; + memory::Copy(gpu_place, static_cast(input_tensor_ptr), + platform::CPUPlace(), input_data.data.data(), + input_data.data.length(), dev_ctx->stream()); + } else { + PADDLE_THROW(paddle::platform::errors::InvalidArgument( + "DistModel only supports CPU and GPU.")); + } + + framework::LoD dst_lod; + for (auto &src_lod : input_data.lod) { + dst_lod.emplace_back(src_lod); + } + input_tensor->set_lod(dst_lod); + return true; +} + +template +int VectorReducer(const std::vector &vec) { + return std::accumulate(vec.begin(), vec.end(), 1, + [](T a, T b) { return a * b; }); +} + } // namespace bool DistModel::Init() { - /* TODO(fleet exe dev): implement this funct */ + carrier_id_ = "inference"; bool init_method = (!config_.model_dir.empty() || config_.program_desc); PADDLE_ENFORCE_EQ(init_method, true, platform::errors::InvalidArgument( @@ -326,7 +384,7 @@ bool DistModel::PrepareFleetExe() { id_to_rank.insert({i, i}); } fleet_exe.reset(new FleetExecutor(executor_desc_)); - fleet_exe->Init("inference", *(program_.get()), scope_.get(), place_, 1, + fleet_exe->Init(carrier_id_, *(program_.get()), scope_.get(), place_, 1, {task_node_.get()}, id_to_rank); return true; } @@ -349,15 +407,107 @@ bool DistModel::PrepareFeedAndFetch() { fetches_.resize(idx + 1); } fetches_[idx] = op; - id_to_fetches_[idx] = op->Input("X")[0]; + idx_to_fetches_[idx] = op->Input("X")[0]; } } return true; } -void 
DistModel::Run(const std::vector &input_data, +bool DistModel::FeedData(const std::vector &input_data, + framework::Scope *scope) { + VLOG(3) << "DistModel is feeding data."; + if (input_data.size() != feeds_.size()) { + LOG(ERROR) << "Should provide " << feeds_.size() << " feeds, but got " + << input_data.size() << "data."; + return false; + } + feed_tensors_.resize(feeds_.size()); + for (size_t i = 0; i < input_data.size(); ++i) { + // feed each data separately + framework::LoDTensor *input_tensor = &(feed_tensors_[i]); + if (!LoadDataFromDistModelTensor(input_data[i], input_tensor, place_)) { + LOG(ERROR) << "Fail to load data from tensor " << input_data[i].name; + return false; + } + std::string target_name = input_data[i].name; + if (feed_names_.find(target_name) == feed_names_.end()) { + LOG(ERROR) << "Wrong input name: " << target_name + << " DistModel load data failed."; + return false; + } + int feed_idx = feed_names_[target_name]; + framework::SetFeedVariable(scope, *input_tensor, "feed", feed_idx); + } + return true; +} + +bool DistModel::FetchResults(std::vector *output_data, + framework::Scope *scope) { + VLOG(3) << "DistModel is fetch results."; + output_data->resize(fetches_.size()); + for (size_t i = 0; i < fetches_.size(); ++i) { + int idx = BOOST_GET_CONST(int, fetches_[i]->GetAttr("col")); + VLOG(3) << "Fetching data for " << idx_to_fetches_[idx]; + PADDLE_ENFORCE_EQ( + static_cast(idx), i, + platform::errors::InvalidArgument( + "Fetch op's col attr(%d) should be equal to the index(%d)", idx, + i)); + framework::FetchType &fetch_var = + framework::GetFetchVariable(*scope, "fetch", idx); + auto &fetch = BOOST_GET(framework::LoDTensor, fetch_var); + auto type = fetch.type(); + auto output = &(output_data->at(i)); + output->name = idx_to_fetches_[idx]; + if (type == framework::proto::VarType::FP32) { + FetchResult(fetch, output); + output->dtype = DistModelDataType::FLOAT32; + } else if (type == framework::proto::VarType::INT64) { + 
FetchResult(fetch, output); + output->dtype = DistModelDataType::INT64; + } else if (type == framework::proto::VarType::INT32) { + FetchResult(fetch, output); + output->dtype = DistModelDataType::INT32; + } else { + LOG(ERROR) << "DistModel meets unknow fetch data type. DistModel only " + "supports float32, int64 and int32 fetch type for now."; + } + } + return true; +} + +template +bool DistModel::FetchResult(const framework::LoDTensor &fetch, + DistModelTensor *output_data) { + auto shape = framework::vectorize(fetch.dims()); + output_data->shape.assign(shape.begin(), shape.end()); + const T *data = fetch.data(); + int num_elems = VectorReducer(shape); + output_data->data.Resize(num_elems * sizeof(T)); + // The output of fetch op is always on the cpu, no need switch on place + memcpy(output_data->data.data(), data, num_elems * sizeof(T)); + output_data->lod.clear(); + for (auto &level : fetch.lod()) { + output_data->lod.emplace_back(level.begin(), level.end()); + } +} + +bool DistModel::Run(const std::vector &input_data, std::vector *output_data) { - /* TODO(fleet exe dev): implement this funct */ + if (!FeedData(input_data, scope_.get())) { + LOG(ERROE) << "DistModel failed at feeding data."; + return false; + } + VLOG(3) << "Finish loading data."; + + fleet_exe->Run(carrier_id_); + + if (!FetchResults(output_data, scope_.get())) { + LOG(ERROR) << "DistModel failed at fetching result."; + return false; + } + VLOG(3) << "Finish fetching data."; + return true; } } // namespace distributed diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.h b/paddle/fluid/distributed/fleet_executor/dist_model.h index 96e9c018074b5..e6ee1ea92e388 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.h +++ b/paddle/fluid/distributed/fleet_executor/dist_model.h @@ -19,6 +19,7 @@ #include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h" #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h" +#include 
"paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/platform/macros.h" #include "paddle/fluid/platform/place.h" @@ -57,7 +58,7 @@ class DistModel { public: explicit DistModel(const DistModelConfig& config) : config_(config) {} bool Init(); - void Run(const std::vector& input_data, + bool Run(const std::vector& input_data, std::vector* output_data); ~DistModel() = default; @@ -75,12 +76,21 @@ class DistModel { void InsertCommOp(std::string tmp_var_name, int nranks, int rank, const std::vector& peer_endpoints, framework::BlockDesc* block, int ring_id); + bool FeedData(const std::vector& input_data, + framework::Scope* scope); + bool FetchResults(std::vector* output_data, + framework::Scope* scope); + template + bool FetchResult(const framework::LoDTensor& fetch, + DistModelTensor* output_data); + std::string carrier_id_; + std::vector feed_tensors_; std::vector feeds_; std::map feed_names_; std::map idx_to_feeds_; std::vector fetches_; - std::map id_to_fetches_; + std::map idx_to_fetches_; DistModelConfig config_; FleetExecutorDesc executor_desc_; std::shared_ptr fleet_exe; From 1779f2363bafc0d5021853121df5b151d2e13531 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 21 Jan 2022 16:45:58 +0800 Subject: [PATCH 02/23] bug fix --- .../distributed/fleet_executor/dist_model.cc | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index 0610daef14d19..598554a7763f5 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -431,8 +431,9 @@ bool DistModel::FeedData(const std::vector &input_data, } std::string target_name = input_data[i].name; if (feed_names_.find(target_name) == feed_names_.end()) { - LOG(ERROR) << "Wrong input name: " << target_name - << " DistModel load data failed."; + LOG(ERROR) << "The 
input name: " << target_name + << " cannot be found in the program." + << " DistModel loads data failed."; return false; } int feed_idx = feed_names_[target_name]; @@ -459,19 +460,24 @@ bool DistModel::FetchResults(std::vector *output_data, auto type = fetch.type(); auto output = &(output_data->at(i)); output->name = idx_to_fetches_[idx]; + bool rst = false; if (type == framework::proto::VarType::FP32) { - FetchResult(fetch, output); + rst = FetchResult(fetch, output); output->dtype = DistModelDataType::FLOAT32; } else if (type == framework::proto::VarType::INT64) { - FetchResult(fetch, output); + rst = FetchResult(fetch, output); output->dtype = DistModelDataType::INT64; } else if (type == framework::proto::VarType::INT32) { - FetchResult(fetch, output); + rst = FetchResult(fetch, output); output->dtype = DistModelDataType::INT32; } else { - LOG(ERROR) << "DistModel meets unknow fetch data type. DistModel only " + LOG(ERROR) << "DistModel meets unknown fetch data type. DistModel only " "supports float32, int64 and int32 fetch type for now."; } + if (!rst) { + LOG(ERROR) << "DistModel fails to fetch result " << idx_to_fetches_[idx]; + return false; + } } return true; } @@ -490,6 +496,7 @@ bool DistModel::FetchResult(const framework::LoDTensor &fetch, for (auto &level : fetch.lod()) { output_data->lod.emplace_back(level.begin(), level.end()); } + return true; } bool DistModel::Run(const std::vector &input_data, From e9be30c072b9981969ae6c2d2e3a3ed6b1353ac3 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 21 Jan 2022 17:09:05 +0800 Subject: [PATCH 03/23] bug fix --- paddle/fluid/distributed/fleet_executor/dist_model.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index 598554a7763f5..a45786f7a95cf 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -502,7 
+502,7 @@ bool DistModel::FetchResult(const framework::LoDTensor &fetch, bool DistModel::Run(const std::vector &input_data, std::vector *output_data) { if (!FeedData(input_data, scope_.get())) { - LOG(ERROE) << "DistModel failed at feeding data."; + LOG(ERROR) << "DistModel failed at feeding data."; return false; } VLOG(3) << "Finish loading data."; From a510c7bb75f86157a22777d7e524f14d70ef21ff Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 21 Jan 2022 17:50:11 +0800 Subject: [PATCH 04/23] fix the init value of memory hold --- .../distributed/fleet_executor/dist_model_tensor_wrapper.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h b/paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h index 4a04633388af2..6bdd858d6cf9e 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h +++ b/paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h @@ -62,7 +62,7 @@ class DistModelDataBuf { void Free(); void* data_{nullptr}; size_t length_{0}; - bool memory_owned_{false}; + bool memory_owned_{true}; }; struct DistModelTensor { From d17c1957f0ed9c111271272e6f797be79d489925 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 21 Jan 2022 17:54:23 +0800 Subject: [PATCH 05/23] formate refine --- paddle/fluid/distributed/fleet_executor/dist_model.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index a45786f7a95cf..7b063d48f5d57 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -418,7 +418,7 @@ bool DistModel::FeedData(const std::vector &input_data, VLOG(3) << "DistModel is feeding data."; if (input_data.size() != feeds_.size()) { LOG(ERROR) << "Should provide " << feeds_.size() << " feeds, but got " - << input_data.size() << "data."; + << 
input_data.size() << " data."; return false; } feed_tensors_.resize(feeds_.size()); From 0ae9a26e45915e723435fbafebdaa09ae4001360 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Fri, 21 Jan 2022 18:01:05 +0800 Subject: [PATCH 06/23] bug fix for pp comm init --- paddle/fluid/distributed/fleet_executor/dist_model.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index 7b063d48f5d57..3a1662158717d 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -185,7 +185,7 @@ bool DistModel::CommInit() { InsertCommOp("mp_comm_id", mp_group_nranks, mp_group_rank, peer_endpoints, comm_init_block, config_.mp_ring_id); } - if (config_.pp_degree) { + if (config_.pp_degree > 1) { // NOTE: the last pp stage doesn't need init pp comm VLOG(3) << "Init comm group for pp."; if (config_.local_rank - config_.mp_degree >= 0) { From 2b36ea7e201773e3091a0446f802d0e837a3891c Mon Sep 17 00:00:00 2001 From: liuyuang Date: Mon, 24 Jan 2022 11:17:36 +0800 Subject: [PATCH 07/23] bug fix --- .../distributed/fleet_executor/dist_model.cc | 17 +++++++++-------- paddle/fluid/pybind/bind_fleet_executor.cc | 7 ++++++- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index 3a1662158717d..dd270cd4f4126 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -88,13 +88,6 @@ bool LoadDataFromDistModelTensor(const DistModelTensor &input_data, input_tensor->set_lod(dst_lod); return true; } - -template -int VectorReducer(const std::vector &vec) { - return std::accumulate(vec.begin(), vec.end(), 1, - [](T a, T b) { return a * b; }); -} - } // namespace bool DistModel::Init() { @@ -410,6 +403,14 @@ bool 
DistModel::PrepareFeedAndFetch() { idx_to_fetches_[idx] = op->Input("X")[0]; } } + if (feeds_.size() == 0) { + LOG(ERROR) << "No feed ops in the inf program, please check the program."; + return false; + } + if (fetches_.size() == 0) { + LOG(ERROR) << "No fetch ops in the inf program, please check the program."; + return false; + } return true; } @@ -488,7 +489,7 @@ bool DistModel::FetchResult(const framework::LoDTensor &fetch, auto shape = framework::vectorize(fetch.dims()); output_data->shape.assign(shape.begin(), shape.end()); const T *data = fetch.data(); - int num_elems = VectorReducer(shape); + int64_t num_elems = fetch.numel(); output_data->data.Resize(num_elems * sizeof(T)); // The output of fetch op is always on the cpu, no need switch on place memcpy(output_data->data.data(), data, num_elems * sizeof(T)); diff --git a/paddle/fluid/pybind/bind_fleet_executor.cc b/paddle/fluid/pybind/bind_fleet_executor.cc index 450939dd0ff8b..72ee451fe7c31 100644 --- a/paddle/fluid/pybind/bind_fleet_executor.cc +++ b/paddle/fluid/pybind/bind_fleet_executor.cc @@ -162,7 +162,12 @@ void BindFleetExecutor(py::module* m) { py::class_(*m, "DistModel") .def(py::init()) .def("init", &DistModel::Init) - .def("run", &DistModel::Run, py::call_guard()); + .def("run", + [](DistModel& self, const std::vector& inputs) { + std::vector outputs; + self.Run(inputs, &outputs); + return outputs; + }); py::class_(*m, "DistModelDataBuf") .def(py::init()) From 6513ca6413c59af77640a1e8873a1e1c7a7285f4 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Mon, 24 Jan 2022 14:31:42 +0800 Subject: [PATCH 08/23] add feed var dtype check --- .../distributed/fleet_executor/dist_model.cc | 40 ++++++++++++++++++- .../distributed/fleet_executor/dist_model.h | 1 + 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index dd270cd4f4126..b00f826463a36 100644 --- 
a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -88,6 +88,22 @@ bool LoadDataFromDistModelTensor(const DistModelTensor &input_data, input_tensor->set_lod(dst_lod); return true; } + +std::string DistModelDTypeToString(DistModelDataType dtype) { + switch (dtype) { + case DistModelDataType::FLOAT32: + return "float32"; + case DistModelDataType::FLOAT16: + return "float16"; + case DistModelDataType::INT64: + return "int64"; + case DistModelDataType::INT32: + return "int32"; + case DistModelDataType::INT8: + return "int8"; + } +} + } // namespace bool DistModel::Init() { @@ -391,8 +407,21 @@ bool DistModel::PrepareFeedAndFetch() { feeds_.resize(idx + 1); } feeds_[idx] = op; - feed_names_[op->Output("Out")[0]] = idx; - idx_to_feeds_[idx] = op->Output("Out")[0]; + std::string var_name = op->Output("Out")[0]; + feed_names_[var_name] = idx; + idx_to_feeds_[idx] = var_name; + framework::VarDesc *real_var = program_->Block(0).FindVar(var_name); + switch (real_var->GetType()) { + case proto::VarType::FP32: + feeds_to_dtype_.insert({var_name, DistModelDataType::FLOAT32}); + break; + case proto::VarType::INT32: + feeds_to_dtype_.insert({var_name, DistModelDataType::INT32}); + break; + case proto::VarType::INT64: + feeds_to_dtype_.insert({var_name, DistModelDataType::INT64}); + break; + } } else if (op->Type() == "fetch") { VLOG(3) << "fetch op with fetch var: " << op->Input("X")[0]; int idx = BOOST_GET_CONST(int, op->GetAttr("col")); @@ -437,6 +466,13 @@ bool DistModel::FeedData(const std::vector &input_data, << " DistModel loads data failed."; return false; } + if (input_data[i].dtype != feeds_to_dtype_[target_name]) { + LOG(ERROR) << "Feed var: " << target_name << "'s expected dtype is: " + << DistModelDTypeToString(feeds_to_dtype_[target_name]) + << ". 
But received dtype is: " + << DistModelDTypeToString(input_data[i].dtype) << "."; + return false; + } int feed_idx = feed_names_[target_name]; framework::SetFeedVariable(scope, *input_tensor, "feed", feed_idx); } diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.h b/paddle/fluid/distributed/fleet_executor/dist_model.h index e6ee1ea92e388..e6ad94e266a96 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.h +++ b/paddle/fluid/distributed/fleet_executor/dist_model.h @@ -89,6 +89,7 @@ class DistModel { std::vector feeds_; std::map feed_names_; std::map idx_to_feeds_; + std::map feeds_to_dtype_; std::vector fetches_; std::map idx_to_fetches_; DistModelConfig config_; From 9a14a81fdbb4a6085298c9f6fca33d490c4849fe Mon Sep 17 00:00:00 2001 From: liuyuang Date: Mon, 24 Jan 2022 14:39:52 +0800 Subject: [PATCH 09/23] bug fix --- .../distributed/fleet_executor/dist_model.cc | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index b00f826463a36..c1a743d52c416 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -102,6 +102,7 @@ std::string DistModelDTypeToString(DistModelDataType dtype) { case DistModelDataType::INT8: return "int8"; } + return "NOT SUPPORT DTYPE"; } } // namespace @@ -411,16 +412,16 @@ bool DistModel::PrepareFeedAndFetch() { feed_names_[var_name] = idx; idx_to_feeds_[idx] = var_name; framework::VarDesc *real_var = program_->Block(0).FindVar(var_name); - switch (real_var->GetType()) { - case proto::VarType::FP32: - feeds_to_dtype_.insert({var_name, DistModelDataType::FLOAT32}); - break; - case proto::VarType::INT32: - feeds_to_dtype_.insert({var_name, DistModelDataType::INT32}); - break; - case proto::VarType::INT64: - feeds_to_dtype_.insert({var_name, DistModelDataType::INT64}); - break; + if 
(real_var->GetDataType() == proto::VarType::FP32) { + feeds_to_dtype_.insert({var_name, DistModelDataType::FLOAT32}); + } else if (real_var->GetDataType() == proto::VarType::INT32) { + feeds_to_dtype_.insert({var_name, DistModelDataType::INT32}); + } else if (real_var->GetDataType() == proto::VarType::INT64) { + feeds_to_dtype_.insert({var_name, DistModelDataType::INT64}); + } else { + LOG(ERROR) << "Don't support feed var dtype for: " + << real_var->GetDataType(); + return false; } } else if (op->Type() == "fetch") { VLOG(3) << "fetch op with fetch var: " << op->Input("X")[0]; From 6f65dbd1596a00e9d07ea03f7785faa020f00a27 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Mon, 24 Jan 2022 14:50:45 +0800 Subject: [PATCH 10/23] bug fix and update log --- .../fluid/distributed/fleet_executor/dist_model.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index c1a743d52c416..9239c47bfee89 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -412,11 +412,11 @@ bool DistModel::PrepareFeedAndFetch() { feed_names_[var_name] = idx; idx_to_feeds_[idx] = var_name; framework::VarDesc *real_var = program_->Block(0).FindVar(var_name); - if (real_var->GetDataType() == proto::VarType::FP32) { + if (real_var->GetDataType() == framework::proto::VarType::FP32) { feeds_to_dtype_.insert({var_name, DistModelDataType::FLOAT32}); - } else if (real_var->GetDataType() == proto::VarType::INT32) { + } else if (real_var->GetDataType() == framework::proto::VarType::INT32) { feeds_to_dtype_.insert({var_name, DistModelDataType::INT32}); - } else if (real_var->GetDataType() == proto::VarType::INT64) { + } else if (real_var->GetDataType() == framework::proto::VarType::INT64) { feeds_to_dtype_.insert({var_name, DistModelDataType::INT64}); } else { LOG(ERROR) << "Don't support feed var 
dtype for: " @@ -462,13 +462,13 @@ bool DistModel::FeedData(const std::vector &input_data, } std::string target_name = input_data[i].name; if (feed_names_.find(target_name) == feed_names_.end()) { - LOG(ERROR) << "The input name: " << target_name - << " cannot be found in the program." + LOG(ERROR) << "The input name [" << target_name + << "] cannot be found in the program." << " DistModel loads data failed."; return false; } if (input_data[i].dtype != feeds_to_dtype_[target_name]) { - LOG(ERROR) << "Feed var: " << target_name << "'s expected dtype is: " + LOG(ERROR) << "Feed var [" << target_name << "] expected dtype is: " << DistModelDTypeToString(feeds_to_dtype_[target_name]) << ". But received dtype is: " << DistModelDTypeToString(input_data[i].dtype) << "."; @@ -486,7 +486,7 @@ bool DistModel::FetchResults(std::vector *output_data, output_data->resize(fetches_.size()); for (size_t i = 0; i < fetches_.size(); ++i) { int idx = BOOST_GET_CONST(int, fetches_[i]->GetAttr("col")); - VLOG(3) << "Fetching data for " << idx_to_fetches_[idx]; + VLOG(3) << "Fetching data for [" << idx_to_fetches_[idx] << "]"; PADDLE_ENFORCE_EQ( static_cast(idx), i, platform::errors::InvalidArgument( From 884a8db9d44c7f45c49329373506d613fc2c7a69 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Mon, 24 Jan 2022 15:29:06 +0800 Subject: [PATCH 11/23] add more error log --- paddle/fluid/distributed/fleet_executor/dist_model.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index 9239c47bfee89..cc4cfad43c701 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -412,6 +412,12 @@ bool DistModel::PrepareFeedAndFetch() { feed_names_[var_name] = idx; idx_to_feeds_[idx] = var_name; framework::VarDesc *real_var = program_->Block(0).FindVar(var_name); + if (!real_var) { + LOG(ERROR) + << "The output of feed ops 
[" << var_name + << "] cannot be found in the program. Check the inference program."; + return false; + } if (real_var->GetDataType() == framework::proto::VarType::FP32) { feeds_to_dtype_.insert({var_name, DistModelDataType::FLOAT32}); } else if (real_var->GetDataType() == framework::proto::VarType::INT32) { From 925d350f85b9195a5a663c36cd712cd65b8d2b19 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Mon, 24 Jan 2022 15:51:51 +0800 Subject: [PATCH 12/23] add pp stage checker and feed fetch vars checkerw --- .../distributed/fleet_executor/dist_model.cc | 52 +++++++++++++++---- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index cc4cfad43c701..cfc20bc26ac89 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -105,6 +105,14 @@ std::string DistModelDTypeToString(DistModelDataType dtype) { return "NOT SUPPORT DTYPE"; } +bool IsPPFirstStage(const DistModelConfig &config) { + return config.local_rank - config.mp_degree < 0; +} + +bool IsPPLastStage(const DistModelConfig &config) { + return config.local_rank + config.mp_degree >= config.nranks; +} + } // namespace bool DistModel::Init() { @@ -196,9 +204,8 @@ bool DistModel::CommInit() { comm_init_block, config_.mp_ring_id); } if (config_.pp_degree > 1) { - // NOTE: the last pp stage doesn't need init pp comm VLOG(3) << "Init comm group for pp."; - if (config_.local_rank - config_.mp_degree >= 0) { + if (!IsPPFirstStage(config_)) { PADDLE_ENFORCE_EQ(config_.pp_upstream_ring_id >= 0, true, platform::errors::InvalidArgument( "pp upstream ring id must be provided for " @@ -211,7 +218,7 @@ bool DistModel::CommInit() { comm_init_block, config_.pp_upstream_ring_id); } - if (config_.local_rank + config_.mp_degree < config_.nranks) { + if (!IsPPLastStage(config_)) { PADDLE_ENFORCE_EQ(config_.pp_downstream_ring_id >= 0, true, 
platform::errors::InvalidArgument( "pp downstream ring id must be provided for " @@ -439,13 +446,38 @@ bool DistModel::PrepareFeedAndFetch() { idx_to_fetches_[idx] = op->Input("X")[0]; } } - if (feeds_.size() == 0) { - LOG(ERROR) << "No feed ops in the inf program, please check the program."; - return false; - } - if (fetches_.size() == 0) { - LOG(ERROR) << "No fetch ops in the inf program, please check the program."; - return false; + + if (config_.pp_degree == 1) { + if (feeds_.size() == 0) { + LOG(ERROR) << "No feed ops in the inf program, please check the program."; + return false; + } + if (fetches_.size() == 0) { + LOG(ERROR) << "No fetch op in the inf program, please check the program."; + return false; + } + } else { + if (IsPPFirstStage(config_)) { + if (feeds_.size() == 0) { + LOG(ERROR) << "Feed ops are needed for the first pp stage."; + return false; + } else { + LOG(WARNING) << "No feed ops in non-first pp stage."; + } + } else if (IsPPLastStage(config_)) { + if (fetches_.size() == 0) { + LOG(ERROR) << "Fetch op is needed for the last pp stage."; + return false; + } else { + LOG(WARNING) << "No fetch op in non-last pp stage."; + } + } + if (!IsPPFirstStage(config_) && feeds_.size() > 0) { + LOG(WARNING) << "Feed op is found in the non-first stage of pp."; + } + if (!IsPPLastStage(config_) && fetches_.size() > 0) { + LOG(WARNING) << "Fetch op is found in the non-last stage of pp."; + } } return true; } From 9cce72cd023edd7a92e00bea643281c32627ffb5 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Mon, 24 Jan 2022 15:56:00 +0800 Subject: [PATCH 13/23] prune logic branch --- paddle/fluid/distributed/fleet_executor/dist_model.cc | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index cfc20bc26ac89..65a0fc28dd1b7 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ 
b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -464,18 +464,17 @@ bool DistModel::PrepareFeedAndFetch() { } else { LOG(WARNING) << "No feed ops in non-first pp stage."; } - } else if (IsPPLastStage(config_)) { + } else if (feeds_.size() > 0) { + LOG(WARNING) << "Feed op is found in the non-first stage of pp."; + } + if (IsPPLastStage(config_)) { if (fetches_.size() == 0) { LOG(ERROR) << "Fetch op is needed for the last pp stage."; return false; } else { LOG(WARNING) << "No fetch op in non-last pp stage."; } - } - if (!IsPPFirstStage(config_) && feeds_.size() > 0) { - LOG(WARNING) << "Feed op is found in the non-first stage of pp."; - } - if (!IsPPLastStage(config_) && fetches_.size() > 0) { + } else if (fetches_.size() > 0) { LOG(WARNING) << "Fetch op is found in the non-last stage of pp."; } } From 139a28f71ea0ae39f4e38b7fbb54de4fcc996392 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 25 Jan 2022 08:31:15 +0800 Subject: [PATCH 14/23] add timer --- .../distributed/fleet_executor/dist_model.cc | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index 65a0fc28dd1b7..ef2757019271b 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -13,6 +13,7 @@ // limitations under the License. 
#include +#include // NOLINT #include "paddle/fluid/distributed/fleet_executor/dist_model.h" #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h" @@ -66,9 +67,11 @@ bool LoadDataFromDistModelTensor(const DistModelTensor &input_data, "DistModelTensor contains no data.")); if (platform::is_cpu_place(place)) { + VLOG(3) << "Loading data for CPU."; std::memcpy(static_cast(input_tensor_ptr), input_data.data.data(), input_data.data.length()); } else if (platform::is_gpu_place(place)) { + VLOG(3) << "Loading data for GPU."; platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto *dev_ctx = dynamic_cast(pool.Get(place)); @@ -113,6 +116,24 @@ bool IsPPLastStage(const DistModelConfig &config) { return config.local_rank + config.mp_degree >= config.nranks; } +class DistModelTimer { + public: + void tic() { tic_time = std::chrono::high_resolution_clock::now(); } + double toc() { + std::chrono::high_resolution_clock::time_point toc_time = + std::chrono::high_resolution_clock::now(); + std::chrono::duration time_elapse = + std::chrono::duration_cast>(toc_time - + tic_time); + double time_elapse_in_ms = + static_cast(time_elapse.count()) * 1000.0; + return time_elapse_in_ms; + } + + private: + std::chrono::high_resolution_clock::time_point tic_time; +}; + } // namespace bool DistModel::Init() { @@ -576,19 +597,32 @@ bool DistModel::FetchResult(const framework::LoDTensor &fetch, bool DistModel::Run(const std::vector &input_data, std::vector *output_data) { + // TODO(fleet exe dev): support pipeline inf mode + VLOG(3) << "DistModel run for once."; + + DistModelTimer timer; + timer.tic(); + if (!FeedData(input_data, scope_.get())) { LOG(ERROR) << "DistModel failed at feeding data."; return false; } - VLOG(3) << "Finish loading data."; + double feed_elapse = timer.toc(); + VLOG(3) << "Finish loading data, cost " << feed_elapse << "ms."; fleet_exe->Run(carrier_id_); + double fleet_exe_elapse = timer.toc(); + VLOG(3) << "Finish FleetExe running, 
cost " << fleet_exe_elapse - feed_elapse + << "ms."; if (!FetchResults(output_data, scope_.get())) { LOG(ERROR) << "DistModel failed at fetching result."; return false; } - VLOG(3) << "Finish fetching data."; + double fetch_elapse = timer.toc(); + VLOG(3) << "Finish fetching data, cost " << fetch_elapse - fleet_exe_elapse + << "ms."; + VLOG(3) << "DistModel finish inf, cost " << fetch_elapse << "ms"; return true; } From e5d607ec19488574bb96d324c6ed31a375beadfb Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 25 Jan 2022 09:03:19 +0800 Subject: [PATCH 15/23] add ut for the whole work flow --- .../fluid/tests/unittests/CMakeLists.txt | 1 + .../tests/unittests/test_dist_model_run.py | 81 +++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 python/paddle/fluid/tests/unittests/test_dist_model_run.py diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 2ac5e9404c1ba..d990ba6853495 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -157,6 +157,7 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_auto_parallel_mapper) LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_task_node) LIST(REMOVE_ITEM TEST_OPS test_dist_model_tensor) + LIST(REMOVE_ITEM TEST_OPS test_dist_model_run) endif() # Temporally disable test_deprecated_decorator diff --git a/python/paddle/fluid/tests/unittests/test_dist_model_run.py b/python/paddle/fluid/tests/unittests/test_dist_model_run.py new file mode 100644 index 0000000000000..64b76760d3dc8 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_dist_model_run.py @@ -0,0 +1,81 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import paddle +import numpy as np +import os +from paddle.fluid import core + +paddle.enable_static() + + +class TestDistModelRun(unittest.TestCase): + def test_dist_model_run(self): + path_prefix = './dist_model_run_test/inf' + + # saving the inference model and params + x = paddle.static.data(name='x', shape=[28, 28], dtype='float32') + y = paddle.static.data(name='y', shape=[28, 1], dtype='int64') + predict = paddle.static.nn.fc(x, 10, activation='softmax') + loss = paddle.nn.functional.cross_entropy(predict, y) + avg_loss = paddle.tensor.stat.mean(loss) + exe = paddle.static.Executor(paddle.CUDAPlace(0)) + exe.run(paddle.static.default_startup_program()) + x_data = np.random.randn(28, 28).astype('float32') + y_data = np.random.randint(0, 9, size=[28, 1]).astype('int64') + exe.run(paddle.static.default_main_program(), + feed={'img': x_data, + 'label': y_data}, + fetch_list=[avg_loss]) + paddle.static.save_inference_model(path_prefix, [x, y], [avg_loss], exe) + + # data for the test + x_tensor = np.random.randn(28, 28).astype('float32') + y_tensor = np.random.randint(0, 9, size=[28, 1]).astype('int64') + + # init the dist model and run + config = core.DistModelConfig() + config.model_dir = path_prefix + config.place = 'GPU' + dist = core.DistModel(config) + dist.init() + dist_x = core.DistModelTensor(x_tensor, 'x') + dist_y = core.DistModelTensor(y_tensor, 'y') + input_data = [dist_x, dist_y] + output_rst = dist.run(input_data) + dist_model_rst = output_rst[0] + print("dist model rst:", dist_model_rst) + + # use framework's api to 
inference + [inference_program, feed_target_names, fetch_targets] = ( + paddle.static.load_inference_model(path_prefix, exe)) + image_tensor = np.random.randn(28, 28).astype('float32') + label_tensor = np.random.randint(0, 9, size=[28, 1]).astype('int64') + results = exe.run(inference_program, + feed={'x': image_tensor, + 'y': label_tensor}, + fetch_list=fetch_targets) + load_inference_model_rst = results[0] + print("laod inference model api rst:", load_inference_model_rst) + + # compare two results + self.assertTrue(np.allclose(dist_model_rst, load_inference_model_rst)) + + # delete the saved model and params + os.rmdir("./dist_model_run_test/") + + +if __name__ == '__main__': + unittest.main() From c4e30da157835f9fb827f6fe3d3e2cf38afd52c1 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 25 Jan 2022 09:05:58 +0800 Subject: [PATCH 16/23] update ut --- .../tests/unittests/test_dist_model_run.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_dist_model_run.py b/python/paddle/fluid/tests/unittests/test_dist_model_run.py index 64b76760d3dc8..348fcd88559a6 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_model_run.py +++ b/python/paddle/fluid/tests/unittests/test_dist_model_run.py @@ -23,9 +23,12 @@ class TestDistModelRun(unittest.TestCase): def test_dist_model_run(self): - path_prefix = './dist_model_run_test/inf' + # step 0: declare folder to save the model and params + folder = './dist_model_run_test/' + file = 'inf' + path_prefix = folder + file - # saving the inference model and params + # step 1: saving the inference model and params x = paddle.static.data(name='x', shape=[28, 28], dtype='float32') y = paddle.static.data(name='y', shape=[28, 1], dtype='int64') predict = paddle.static.nn.fc(x, 10, activation='softmax') @@ -41,11 +44,11 @@ def test_dist_model_run(self): fetch_list=[avg_loss]) paddle.static.save_inference_model(path_prefix, [x, y], [avg_loss], exe) - # data 
for the test + # step 2: prepare data for the test x_tensor = np.random.randn(28, 28).astype('float32') y_tensor = np.random.randint(0, 9, size=[28, 1]).astype('int64') - # init the dist model and run + # step 3: init the dist model to inference with fake data config = core.DistModelConfig() config.model_dir = path_prefix config.place = 'GPU' @@ -58,7 +61,7 @@ def test_dist_model_run(self): dist_model_rst = output_rst[0] print("dist model rst:", dist_model_rst) - # use framework's api to inference + # step 4: use framework's api to inference with fake data [inference_program, feed_target_names, fetch_targets] = ( paddle.static.load_inference_model(path_prefix, exe)) image_tensor = np.random.randn(28, 28).astype('float32') @@ -68,13 +71,13 @@ def test_dist_model_run(self): 'y': label_tensor}, fetch_list=fetch_targets) load_inference_model_rst = results[0] - print("laod inference model api rst:", load_inference_model_rst) + print("load inference model api rst:", load_inference_model_rst) - # compare two results + # step 5: compare two results self.assertTrue(np.allclose(dist_model_rst, load_inference_model_rst)) - # delete the saved model and params - os.rmdir("./dist_model_run_test/") + # step 6: clean up the env, delete the saved model and params + os.rmdir(folder) if __name__ == '__main__': From 312f679499084477f0d55077a36dc32e6d26ac7b Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 25 Jan 2022 09:06:46 +0800 Subject: [PATCH 17/23] update license --- python/paddle/fluid/tests/unittests/test_dist_model_run.py | 2 +- python/paddle/fluid/tests/unittests/test_dist_model_tensor.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_dist_model_run.py b/python/paddle/fluid/tests/unittests/test_dist_model_run.py index 348fcd88559a6..943d037573b8a 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_model_run.py +++ b/python/paddle/fluid/tests/unittests/test_dist_model_run.py @@ -1,4 +1,4 @@ -# Copyright 
(c) 2019 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/python/paddle/fluid/tests/unittests/test_dist_model_tensor.py b/python/paddle/fluid/tests/unittests/test_dist_model_tensor.py index da25550c4f47e..a74b4f0d224ef 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_model_tensor.py +++ b/python/paddle/fluid/tests/unittests/test_dist_model_tensor.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From b9fa0bf53b21545df04cbf6e1f5bde1c9aded7d6 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 25 Jan 2022 09:19:26 +0800 Subject: [PATCH 18/23] bug fix for ut --- .../fluid/tests/unittests/test_dist_model_run.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/test_dist_model_run.py b/python/paddle/fluid/tests/unittests/test_dist_model_run.py index 943d037573b8a..e31aea6610585 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_model_run.py +++ b/python/paddle/fluid/tests/unittests/test_dist_model_run.py @@ -39,8 +39,8 @@ def test_dist_model_run(self): x_data = np.random.randn(28, 28).astype('float32') y_data = np.random.randint(0, 9, size=[28, 1]).astype('int64') exe.run(paddle.static.default_main_program(), - feed={'img': x_data, - 'label': y_data}, + feed={'x': x_data, + 'y': y_data}, fetch_list=[avg_loss]) paddle.static.save_inference_model(path_prefix, [x, y], [avg_loss], exe) @@ -58,17 +58,15 @@ def test_dist_model_run(self): dist_y = core.DistModelTensor(y_tensor, 'y') input_data = [dist_x, dist_y] output_rst = dist.run(input_data) - 
dist_model_rst = output_rst[0] + dist_model_rst = output_rst[0].as_ndarray().ravel().tolist() print("dist model rst:", dist_model_rst) # step 4: use framework's api to inference with fake data [inference_program, feed_target_names, fetch_targets] = ( paddle.static.load_inference_model(path_prefix, exe)) - image_tensor = np.random.randn(28, 28).astype('float32') - label_tensor = np.random.randint(0, 9, size=[28, 1]).astype('int64') results = exe.run(inference_program, - feed={'x': image_tensor, - 'y': label_tensor}, + feed={'x': x_tensor, + 'y': y_tensor}, fetch_list=fetch_targets) load_inference_model_rst = results[0] print("load inference model api rst:", load_inference_model_rst) @@ -76,7 +74,9 @@ def test_dist_model_run(self): # step 5: compare two results self.assertTrue(np.allclose(dist_model_rst, load_inference_model_rst)) - # step 6: clean up the env, delete the saved model and params + # step 6: clean up the env, delete the saved model and params\ + os.remove(path_prefix + '.pdiparams') + os.remove(path_prefix + '.pdmodel') os.rmdir(folder) From 11deb1711ac30bcd69d33f1f98c9a34c1882d023 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 25 Jan 2022 09:24:20 +0800 Subject: [PATCH 19/23] update ut --- python/paddle/fluid/tests/unittests/test_dist_model_run.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/paddle/fluid/tests/unittests/test_dist_model_run.py b/python/paddle/fluid/tests/unittests/test_dist_model_run.py index e31aea6610585..ab09725478dc0 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_model_run.py +++ b/python/paddle/fluid/tests/unittests/test_dist_model_run.py @@ -43,6 +43,7 @@ def test_dist_model_run(self): 'y': y_data}, fetch_list=[avg_loss]) paddle.static.save_inference_model(path_prefix, [x, y], [avg_loss], exe) + print('save model to', path_prefix) # step 2: prepare data for the test x_tensor = np.random.randn(28, 28).astype('float32') @@ -74,10 +75,11 @@ def test_dist_model_run(self): # step 5: 
compare two results self.assertTrue(np.allclose(dist_model_rst, load_inference_model_rst)) - # step 6: clean up the env, delete the saved model and params\ + # step 6: clean up the env, delete the saved model and params os.remove(path_prefix + '.pdiparams') os.remove(path_prefix + '.pdmodel') os.rmdir(folder) + print('cleaned up the env') if __name__ == '__main__': From eb8f265fa9a81d7ae941e67c053e84807b2d4d5d Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 25 Jan 2022 09:37:49 +0800 Subject: [PATCH 20/23] rename file --- python/paddle/fluid/tests/unittests/CMakeLists.txt | 4 ++-- ...est_dist_model_run.py => test_fleet_exe_dist_model_run.py} | 0 ...st_model_tensor.py => test_fleet_exe_dist_model_tensor.py} | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename python/paddle/fluid/tests/unittests/{test_dist_model_run.py => test_fleet_exe_dist_model_run.py} (100%) rename python/paddle/fluid/tests/unittests/{test_dist_model_tensor.py => test_fleet_exe_dist_model_tensor.py} (100%) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index d990ba6853495..2e35277d70cd6 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -156,8 +156,8 @@ if(((NOT WITH_ROCM) AND (NOT WITH_GPU)) OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_origin_scheduler) LIST(REMOVE_ITEM TEST_OPS test_auto_parallel_mapper) LIST(REMOVE_ITEM TEST_OPS test_fleet_executor_task_node) - LIST(REMOVE_ITEM TEST_OPS test_dist_model_tensor) - LIST(REMOVE_ITEM TEST_OPS test_dist_model_run) + LIST(REMOVE_ITEM TEST_OPS test_fleet_exe_dist_model_run) + LIST(REMOVE_ITEM TEST_OPS test_fleet_exe_dist_model_tensor) endif() # Temporally disable test_deprecated_decorator diff --git a/python/paddle/fluid/tests/unittests/test_dist_model_run.py b/python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_run.py similarity index 100% rename from 
python/paddle/fluid/tests/unittests/test_dist_model_run.py rename to python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_run.py diff --git a/python/paddle/fluid/tests/unittests/test_dist_model_tensor.py b/python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_tensor.py similarity index 100% rename from python/paddle/fluid/tests/unittests/test_dist_model_tensor.py rename to python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_tensor.py From dcdd46f7037d21d126cadcabd51802d1e503745d Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 25 Jan 2022 09:50:00 +0800 Subject: [PATCH 21/23] minor fix --- .../fluid/tests/unittests/test_fleet_exe_dist_model_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_run.py b/python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_run.py index ab09725478dc0..5fd5646b1e1fe 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_run.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_run.py @@ -45,7 +45,7 @@ def test_dist_model_run(self): paddle.static.save_inference_model(path_prefix, [x, y], [avg_loss], exe) print('save model to', path_prefix) - # step 2: prepare data for the test + # step 2: prepare fake data for the test x_tensor = np.random.randn(28, 28).astype('float32') y_tensor = np.random.randint(0, 9, size=[28, 1]).astype('int64') From de4c1cfa12fcf272a6e7307889bd996292b931a7 Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 25 Jan 2022 09:54:31 +0800 Subject: [PATCH 22/23] for cla --- .../fluid/tests/unittests/test_fleet_exe_dist_model_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_run.py b/python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_run.py index 5fd5646b1e1fe..544fe4dd43e6b 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_run.py +++ 
b/python/paddle/fluid/tests/unittests/test_fleet_exe_dist_model_run.py @@ -45,7 +45,7 @@ def test_dist_model_run(self): paddle.static.save_inference_model(path_prefix, [x, y], [avg_loss], exe) print('save model to', path_prefix) - # step 2: prepare fake data for the test + # step 2: prepare fake data for the inference x_tensor = np.random.randn(28, 28).astype('float32') y_tensor = np.random.randint(0, 9, size=[28, 1]).astype('int64') From 27652137c854104327ba42885694836f0eaa9aba Mon Sep 17 00:00:00 2001 From: liuyuang Date: Tue, 25 Jan 2022 10:12:19 +0800 Subject: [PATCH 23/23] add compile flag --- paddle/fluid/distributed/fleet_executor/dist_model.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/paddle/fluid/distributed/fleet_executor/dist_model.cc b/paddle/fluid/distributed/fleet_executor/dist_model.cc index ef2757019271b..4b8483302378f 100644 --- a/paddle/fluid/distributed/fleet_executor/dist_model.cc +++ b/paddle/fluid/distributed/fleet_executor/dist_model.cc @@ -72,6 +72,7 @@ bool LoadDataFromDistModelTensor(const DistModelTensor &input_data, input_data.data.length()); } else if (platform::is_gpu_place(place)) { VLOG(3) << "Loading data for GPU."; +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto *dev_ctx = dynamic_cast(pool.Get(place)); @@ -79,6 +80,10 @@ bool LoadDataFromDistModelTensor(const DistModelTensor &input_data, memory::Copy(gpu_place, static_cast(input_tensor_ptr), platform::CPUPlace(), input_data.data.data(), input_data.data.length(), dev_ctx->stream()); +#else + PADDLE_THROW(paddle::platform::errors::Fatal( + "Paddle wasn't compiled with CUDA, but place is GPU.")); +#endif } else { PADDLE_THROW(paddle::platform::errors::InvalidArgument( "DistModel only supports CPU and GPU."));