
resolve conflicts
MingMingShangTian committed Oct 14, 2020
2 parents e634fa6 + a820871, commit 2119342
Showing 60 changed files with 2,921 additions and 212 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -57,3 +57,4 @@ repos:
entry: shellcheck
language: system
files: .sh$
exclude: (paddle_build.sh|fast_install.sh|check_file_diff_approvals.sh)
3 changes: 2 additions & 1 deletion paddle/fluid/framework/CMakeLists.txt
@@ -247,7 +247,8 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS
graph build_strategy collective_helper
fast_threaded_ssa_graph_executor variable_helper)

cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS executor)
cc_test(dist_multi_trainer_test SRCS dist_multi_trainer_test.cc DEPS
conditional_block_op executor)
cc_library(prune SRCS prune.cc DEPS framework_proto boost)
cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
cc_test(var_type_inference_test SRCS var_type_inference_test.cc DEPS op_registry
5 changes: 5 additions & 0 deletions paddle/fluid/framework/device_worker.h
@@ -19,6 +19,7 @@ limitations under the License. */
#include <map>
#include <memory>
#include <mutex> // NOLINT
#include <set>
#include <string>
#include <thread> // NOLINT
#include <unordered_map> // NOLINT
@@ -313,6 +314,10 @@ class DownpourWorker : public HogwildWorker {
std::map<uint64_t, std::vector<std::string>> dense_value_names_;
std::map<uint64_t, uint64_t> table_dependency_;
std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
// multitask
std::map<int32_t, uint64_t> cond2table_map_;
std::set<uint64_t> condvalue_set_;
bool flag_partial_push_;

private:
// std::vector<std::string> dump_param_;
51 changes: 44 additions & 7 deletions paddle/fluid/framework/downpour_worker.cc
@@ -12,6 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include <cstdlib>
#include <ctime>
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/platform/cpu_helper.h"

@@ -65,6 +67,13 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
}
}

flag_partial_push_ = false;
for (auto& m : param_.program_config(0).partial_pushdense_condtable_map()) {
cond2table_map_[m.key()] = m.value();
condvalue_set_.insert(m.value());
flag_partial_push_ = true;
}

skip_ops_.resize(param_.skip_ops_size());
for (int i = 0; i < param_.skip_ops_size(); ++i) {
skip_ops_[i] = param_.skip_ops(i);
@@ -876,14 +885,42 @@ void DownpourWorker::TrainFiles() {
#endif

if (need_to_push_dense_) {
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_dense_table_id(i));
fleet_ptr_->PushDenseVarsAsync(
*thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_,
scale_datanorm_, cur_batch);
if (flag_partial_push_) {
Variable* var = (*thread_scope_).FindVar("cond_tag");
LoDTensor* tensor = var->GetMutable<LoDTensor>();
// check type in python code
int64_t* cond_value_batch = tensor->data<int64_t>();

for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_dense_table_id(i));
if (condvalue_set_.find(tid) != condvalue_set_.end()) {
// common dense table must push dense
if (cond2table_map_[cond_value_batch[0]] != tid) {
// can't push dense
continue;
}
}

VLOG(3) << "push multitask dense gradient " << tid;
fleet_ptr_->PushDenseVarsAsync(
*thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_,
scale_datanorm_, cur_batch);
}

} else {
for (int i = 0; i < param_.program_config(0).push_dense_table_id_size();
++i) {
uint64_t tid = static_cast<uint64_t>(
param_.program_config(0).push_dense_table_id(i));

fleet_ptr_->PushDenseVarsAsync(
*thread_scope_, tid, dense_grad_names_[tid], &push_sparse_status_,
scale_datanorm_, cur_batch);
}
}

VLOG(3) << "push dense gradient done.";

// the following code should be more precise and clean
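Read in isolation, the push decision above reduces to a small predicate. A minimal sketch, assuming a single-element cond_tag batch as the code's use of cond_value_batch[0] implies (ShouldPushDense is a hypothetical helper; the map and set mirror the members added to device_worker.h):

    #include <cstdint>
    #include <map>
    #include <set>

    // Decide whether dense table `tid` should be pushed for this batch.
    // Tables absent from the cond map are common tables and always push;
    // a conditional table pushes only when the batch's cond value selects it.
    bool ShouldPushDense(uint64_t tid, int64_t cond_value,
                         const std::map<int32_t, uint64_t>& cond2table_map,
                         const std::set<uint64_t>& condvalue_set) {
      if (condvalue_set.count(tid) == 0) return true;
      auto it = cond2table_map.find(static_cast<int32_t>(cond_value));
      return it != cond2table_map.end() && it->second == tid;
    }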
3 changes: 3 additions & 0 deletions paddle/fluid/framework/hogwild_worker.cc
@@ -15,6 +15,7 @@ limitations under the License. */
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/device_worker_factory.h"
#include "paddle/fluid/operators/controlflow/conditional_block_op_helper.h"
#include "paddle/fluid/operators/distributed/distributed.h"
#include "paddle/fluid/platform/cpu_helper.h"
#include "paddle/fluid/platform/lodtensor_printer.h"
@@ -47,6 +48,8 @@ void HogwildWorker::CreateThreadOperators(const ProgramDesc &program) {
ops_.push_back(local_op_ptr);
continue;
}
operators::PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
program, 0, ops_);
}

void HogwildWorker::CreateThreadScope(const ProgramDesc &program) {
5 changes: 5 additions & 0 deletions paddle/fluid/framework/trainer_desc.proto
@@ -148,12 +148,17 @@ message CopyTableConfig {
repeated TableDependencyMap table_denpendency_map = 12;
}

message CondTableMap {
required int32 key = 1;
required int32 value = 2;
}
message ProgramConfig {
required string program_id = 1;
repeated int32 push_sparse_table_id = 2;
repeated int32 push_dense_table_id = 3;
repeated int32 pull_sparse_table_id = 4;
repeated int32 pull_dense_table_id = 5;
repeated CondTableMap partial_pushdense_condtable_map = 10;
}

message PullDenseWorkerParameter {
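As a usage sketch only: the new repeated field would be filled through the generated protobuf API roughly as below, assuming the proto package generates into paddle::framework and that the caller already holds a ProgramConfig (the table ids are illustrative):

    #include "paddle/fluid/framework/trainer_desc.pb.h"

    // Route cond_tag value 0 to dense table 1 and cond_tag value 1 to
    // dense table 2, enabling partial dense pushes for multitask training.
    void AddPartialPushDenseMap(paddle::framework::ProgramConfig* config) {
      auto* entry = config->add_partial_pushdense_condtable_map();
      entry->set_key(0);
      entry->set_value(1);
      entry = config->add_partial_pushdense_condtable_map();
      entry->set_key(1);
      entry->set_value(2);
    }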
4 changes: 3 additions & 1 deletion paddle/fluid/operators/controlflow/CMakeLists.txt
@@ -1,5 +1,7 @@
include(operators)
register_operators(DEPS naive_executor)
register_operators(EXCLUDES conditional_block_op DEPS naive_executor)

cc_library(conditional_block_op SRCS conditional_block_op.cc DEPS executor)
cc_library(op_variant SRCS op_variant.cc DEPS operator proto_desc)
cc_library(conditional_block_op_helper SRCS conditional_block_op_helper.cc DEPS operator op_variant conditional_block_op)
cc_library(recurrent_op_helper SRCS recurrent_op_helper.cc DEPS operator op_variant recurrent_op)
26 changes: 26 additions & 0 deletions paddle/fluid/operators/controlflow/conditional_block_op_helper.cc
@@ -162,6 +162,32 @@ void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOpImpl(
program, &fwd_ops, &bwd_ops);
}
void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
const framework::ProgramDesc &program, int block_id,
const std::vector<framework::OperatorBase *> &all_ops) {
// If block_id is not 0, return directly.
// This is because all conditional_block_ops and conditional_block_grad_ops
// in the whole program would be processed when block_id is 0 (i.e., when
// Executor::Run() is called or ParallelExecutor is constructed).

// What's more, all conditional_block_ops and conditional_block_grad_ops
// must be processed when block_id is zero. If not, conditional_block_op
// may run first and erase variables used in conditional_block_grad_op,
// while at that moment the conditional_block_grad_ops may not be
// constructed yet.
if (block_id != 0) return;

std::vector<OpVariant> fwd_ops, bwd_ops;
for (auto *op : all_ops) {
if (op->Type() == "conditional_block") {
fwd_ops.emplace_back(op);
} else if (op->Type() == "conditional_block_grad") {
bwd_ops.emplace_back(op);
}
}

PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOpImpl(
program, &fwd_ops, &bwd_ops);
}

void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
const framework::ProgramDesc &program,
paddle/fluid/operators/controlflow/conditional_block_op_helper.h
@@ -33,6 +33,10 @@ void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
const framework::ProgramDesc &program, int block_id,
const std::vector<std::unique_ptr<framework::OperatorBase>> &all_ops);

void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
const framework::ProgramDesc &program, int block_id,
const std::vector<framework::OperatorBase *> &all_ops);

void PrepareSafeEagerDeletionOnConditionalOpAndConditionalGradOp(
const framework::ProgramDesc &program,
const std::vector<framework::OperatorBase *> &ifelse_ops,
6 changes: 4 additions & 2 deletions paddle/fluid/operators/detail/strided_memcpy.h
@@ -41,7 +41,8 @@ struct StridedMemcpyFunctor<T, 0> {
memory::Copy(gpu_place, dst, gpu_place, src, sizeof(T),
cuda_ctx.stream());
#else
PADDLE_THROW("Paddle is not compiled with GPU");
PADDLE_THROW(
platform::errors::Unavailable("Paddle is not compiled with GPU."));
#endif
}
}
@@ -64,7 +65,8 @@ struct StridedMemcpyFunctor<T, 1> {
memory::Copy(gpu_place, dst, gpu_place, src, sizeof(T) * dst_dim[0],
cuda_ctx.stream());
#else
PADDLE_THROW("Paddle is not compiled with GPU");
PADDLE_THROW(
platform::errors::Unavailable("Paddle is not compiled with GPU."));
#endif
}
}
15 changes: 6 additions & 9 deletions paddle/fluid/operators/distributed/grpc/grpc_server.cc
@@ -106,9 +106,8 @@ class RequestSend final : public RequestBase {
::grpc::ServerCompletionQueue* cq,
RequestHandler* request_handler, int req_id)
: RequestBase(service, cq, request_handler, req_id), responder_(&ctx_) {
request_.reset(new GRPCVariableResponse(
request_handler->scope(), request_handler->dev_ctx(),
request_handler->distributed_mode()));
request_.reset(new GRPCVariableResponse(request_handler->scope(),
request_handler->dev_ctx(), true));
int method_id = static_cast<int>(distributed::GrpcMethod::kSendVariable);
service_->RequestAsyncUnary(
method_id, &ctx_, request_.get(), &responder_, cq_, cq_,
@@ -420,9 +419,8 @@ class RequestNotify final : public RequestBase {
::grpc::ServerCompletionQueue* cq,
RequestHandler* request_handler, int req_id)
: RequestBase(service, cq, request_handler, req_id), responder_(&ctx_) {
request_.reset(new GRPCVariableResponse(
request_handler->scope(), request_handler->dev_ctx(),
request_handler->distributed_mode()));
request_.reset(new GRPCVariableResponse(request_handler->scope(),
request_handler->dev_ctx(), true));
int method_id = static_cast<int>(distributed::GrpcMethod::kRequestNotify);
service_->RequestAsyncUnary(
method_id, &ctx_, request_.get(), &responder_, cq_, cq_,
@@ -455,9 +453,8 @@ class RequestSendAndRecv final : public RequestBase {
::grpc::ServerCompletionQueue* cq,
RequestHandler* request_handler, int req_id)
: RequestBase(service, cq, request_handler, req_id), responder_(&ctx_) {
request_.reset(new GRPCVariableResponse(
request_handler->scope(), request_handler->dev_ctx(),
request_handler->distributed_mode()));
request_.reset(new GRPCVariableResponse(request_handler->scope(),
request_handler->dev_ctx(), true));

int method_id =
static_cast<int>(distributed::GrpcMethod::kRequestSendAndRecv);
17 changes: 11 additions & 6 deletions paddle/fluid/operators/distributed/parameter_recv.cc
@@ -52,22 +52,25 @@ void RecvSparseLodTensor(const CommContext &rpc_ctx,
std::unique_ptr<framework::Scope> local_scope = scope.NewTmpScope();
std::vector<const float *> tensors;
std::vector<distributed::VarHandlePtr> rets;
std::vector<std::string> recv_varnames;
for (size_t i = 0; i < rpc_ctx.splited_varnames.size(); i++) {
auto &recv_var_name = rpc_ctx.splited_varnames[i];
auto *local_var = local_scope->Var(recv_var_name);
VLOG(4) << "recv " << recv_var_name << " from " << rpc_ctx.epmap[i];
local_scope->Var(recv_var_name);
// sparse param in recv_scope is LoDTensor
rets.push_back(rpc_client->AsyncGetVarNoBarrier(
rpc_ctx.epmap[i], cpu_ctx, *local_scope.get(), recv_var_name,
recv_var_name));

const auto *value = local_var->Get<framework::LoDTensor>().data<float>();
tensors.push_back(value);
recv_varnames.push_back(recv_var_name);
}

for (size_t i = 0; i < rets.size(); i++) {
PADDLE_ENFORCE_NE(rets[i]->Wait(), 0U, platform::errors::ExecutionTimeout(
"internal error in RPCClient"));
auto &recv_var_name = recv_varnames[i];
auto *local_var = local_scope->FindVar(recv_var_name);
const auto *value = local_var->Get<framework::LoDTensor>().data<float>();
tensors.push_back(value);
}

auto *merged_var = scope.FindVar(rpc_ctx.var_name);
@@ -83,8 +86,10 @@
height += splited_var->Get<framework::LoDTensor>().dims()[0];
}

PADDLE_ENFORCE_EQ(merged_var->Get<framework::LoDTensor>().dims()[0], height,
"recved var must has same dims with local var");
PADDLE_ENFORCE_EQ(
merged_var->Get<framework::LoDTensor>().dims()[0], height,
platform::errors::InvalidArgument(
"Received variable must has same dimension with local variable."));

auto *merged_t = merged_var->GetMutable<framework::LoDTensor>();
auto *merged_d = merged_t->mutable_data<float>(cpu_place);
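The substantive fix in this hunk is one of ordering: the data<float>() pointers used to be taken right after the asynchronous AsyncGetVarNoBarrier calls were issued, i.e., potentially before the tensors had arrived; the rewrite records the variable names during dispatch and dereferences them only after each handle's Wait() succeeds. A minimal, Paddle-independent sketch of the corrected pattern (all names illustrative):

    #include <future>
    #include <string>
    #include <vector>

    // Issue every asynchronous fetch first, keeping only the keys; read
    // each result strictly after its handle reports completion.
    std::vector<float> FetchAll(const std::vector<std::string>& names) {
      std::vector<std::future<float>> handles;
      for (const auto& name : names) {
        (void)name;  // the real code passes the variable name to the RPC
        handles.push_back(std::async(std::launch::async,
                                     [] { return 0.0f; /* stand-in RPC */ }));
      }
      std::vector<float> values;
      for (auto& h : handles) {
        values.push_back(h.get());  // blocks until this fetch has completed
      }
      return values;
    }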
@@ -23,22 +23,27 @@ class LargeScaleFuseAdamOp : public framework::OperatorWithKernel {
using framework::OperatorWithKernel::OperatorWithKernel;

void InferShape(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE(ctx->HasInput("Grad"),
"Input(Grad) of LargeScaleFuseAdamOp should not be null.");
PADDLE_ENFORCE(
ctx->HasInput("Grad"),
platform::errors::InvalidArgument(
"Input(Grad) of LargeScaleFuseAdamOp should not be null."));
PADDLE_ENFORCE(
ctx->HasInput("LearningRate"),
"Input(LearningRate) of LargeScaleFuseAdamOp should not be null.");
platform::errors::InvalidArgument(
"Input(LearningRate) of LargeScaleFuseAdamOp should not be null."));

auto lr_dims = ctx->GetInputDim("LearningRate");

PADDLE_ENFORCE_NE(framework::product(lr_dims), 0,
"Maybe the Input variable LearningRate has not "
"been initialized. You may need to confirm "
"if you put exe.run(startup_program) "
"after optimizer.minimize function.");
platform::errors::InvalidArgument(
"Maybe the Input variable LearningRate has not "
"been initialized. You may need to confirm "
"if you put exe.run(startup_program) "
"after optimizer.minimize function."));

PADDLE_ENFORCE_EQ(framework::product(lr_dims), 1,
"Learning rate should have 1 element");
platform::errors::InvalidArgument(
"Learning rate should have 1 element"));
}

protected:
@@ -23,22 +23,27 @@ class LargeScaleFuseSGDOp : public framework::OperatorWithKernel {
using framework::OperatorWithKernel::OperatorWithKernel;

void InferShape(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE(ctx->HasInput("Grad"),
"Input(Grad) of LargeScaleFuseSGDOp should not be null.");
PADDLE_ENFORCE(
ctx->HasInput("Grad"),
platform::errors::InvalidArgument(
"Input(Grad) of LargeScaleFuseSGDOp should not be null."));
PADDLE_ENFORCE(
ctx->HasInput("LearningRate"),
"Input(LearningRate) of LargeScaleFuseSGDOp should not be null.");
platform::errors::InvalidArgument(
"Input(LearningRate) of LargeScaleFuseSGDOp should not be null."));

auto lr_dims = ctx->GetInputDim("LearningRate");

PADDLE_ENFORCE_NE(framework::product(lr_dims), 0,
"Maybe the Input variable LearningRate has not "
"been initialized. You may need to confirm "
"if you put exe.run(startup_program) "
"after optimizer.minimize function.");
platform::errors::InvalidArgument(
"Maybe the Input variable LearningRate has not "
"been initialized. You may need to confirm "
"if you put exe.run(startup_program) "
"after optimizer.minimize function."));

PADDLE_ENFORCE_EQ(framework::product(lr_dims), 1,
"Learning rate should have 1 element");
platform::errors::InvalidArgument(
"Learning rate should have 1 element"));
}

protected: