implement master server client, remove unnecessary dummy variable #2429

Merged
merged 9 commits into from
Jun 14, 2017

Conversation

helinwang
Contributor

No description provided.

@helinwang helinwang requested review from wangkuiyi and gongweibao June 9, 2017 01:44
@jacquesqiao jacquesqiao self-requested a review June 9, 2017 01:47
@@ -13,18 +15,15 @@ const (
targetTaskCount = 300
Contributor

Where is this used? I couldn't find any place that uses it.

Contributor Author

@gongweibao Thanks! Removed.

//
// SetDataset can be call multiple times. But only the first call will
// be honored.
func (s *Service) SetDataset(globPaths []string, dummy *int) error {
Contributor

This function is getting a bit long. Would it be better to split it up?

Contributor Author

Thank you! Done.

// TODO(helin): client need to retry in this
// error case. Gotcha: RPC client can't
// compare returned error with predefined
// erros like io.EOF. Because interface don't
Contributor

erros=>errors

Contributor Author

Thanks! Done.


return c.client.Close()
}

// Connect connects the connection to a address.
func (c *Conn) Connect(addr string) error {
Contributor

One thing I don't understand: why can't the connect process take the lock once, instead of splitting it into two separate lock sections?

c.mu.Lock()
defer c.mu.Unlock()

Contributor Author

@helinwang helinwang Jun 12, 2017

The rpc.DialHTTP call that follows is a heavy operation, and I don't want it to block other c.mu.Lock() callers. For example, say the master addr is initially a and rpc.DialHTTP is in progress. If the master addr then changes to b, we don't want Connect("b") to be blocked by Connect("a").
Later, waiting and retrying on Dial failure will be added here, which makes the operation even heavier.

Contributor

Understood. Thanks!
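
The locking pattern discussed in this thread can be sketched roughly as follows. This is a minimal illustration, not the PR's actual conn.go: the dial field stands in for rpc.DialHTTP, and fakeClient, Addr, and the field names are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeClient stands in for *rpc.Client in this sketch (hypothetical type).
type fakeClient struct{ addr string }

// Conn holds the current client; dial is a stand-in for rpc.DialHTTP.
type Conn struct {
	mu     sync.Mutex
	client *fakeClient
	dial   func(addr string) (*fakeClient, error)
}

// Connect locks twice: once to drop the old client and once to install
// the new one. The slow dial runs between the two critical sections, so
// it never blocks other goroutines waiting on c.mu.
func (c *Conn) Connect(addr string) error {
	c.mu.Lock()
	c.client = nil // real code would also close the old client here
	c.mu.Unlock()

	client, err := c.dial(addr) // heavy operation, done without the lock
	if err != nil {
		return err
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	c.client = client
	return nil
}

// Addr returns the address of the current client, or "" if there is none.
func (c *Conn) Addr() string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.client == nil {
		return ""
	}
	return c.client.addr
}

func main() {
	c := &Conn{dial: func(addr string) (*fakeClient, error) {
		return &fakeClient{addr: addr}, nil
	}}
	if err := c.Connect("a"); err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	fmt.Println("connected to", c.Addr())
}
```

One caveat of this sketch: if an older dial finishes after a newer one, it would overwrite the newer client. Handling that ordering is left out here.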

return c
}

func (c *Client) monitorMaster(addr Addresser) {
Contributor

I have a couple of questions here:

  1. This function appears to connect to the master at regular intervals. Would it be better to reconnect only when an RPC call hits a network error?
  2. There is only one address, so I don't understand what curMaster and lastMaster mean.

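Contributor Author

@helinwang helinwang Jun 12, 2017

Thanks for the comments! Very good suggestions!

  1. This probably does not need to be done by monitorMaster:

    • RPC calls will be retried (TODO item).
    • In conn.go, if the TCP connection drops, it should reconnect (TODO item).
    • If the master dies and is restarted on a different IP, the latest IP can be obtained through the Addresser parameter, and the client connects to the latest IP.
  2. curMaster and lastMaster are used to detect whether the address returned by Addresser has changed. If it has, the master has moved to a new address, and the client connects to the newest address.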

Contributor

@gongweibao gongweibao Jun 13, 2017

"If the master dies and is restarted on a different IP, the latest IP can be obtained through the Addresser parameter, and the client connects to the latest IP."
Thanks, Helin, I understand now.
I suggest adding the sentence above to the comment here:

// Addresser provide the address of the master server.
type Addresser interface {
    Address() string
}

Contributor Author

@helinwang helinwang Jun 13, 2017

Thanks! Done.

// get the latest address of the master server,
// connect to the new address once address changed.
curMaster := addr.Address()
if curMaster != lastMaster {
    ..
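
The address-change check described in this thread could look roughly like the sketch below. Only the Addresser interface and the curMaster/lastMaster comparison come from the PR; pollOnce, monitor, staticAddr, and the connect callback are hypothetical names used for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Addresser provides the address of the master server (as in the PR).
type Addresser interface {
	Address() string
}

// staticAddr is a hypothetical Addresser used only for this sketch.
type staticAddr string

func (a staticAddr) Address() string { return string(a) }

// pollOnce asks addr for the latest master address and calls connect
// only when it differs from lastMaster. It returns the address to
// remember for the next poll.
func pollOnce(addr Addresser, lastMaster string, connect func(string) error) string {
	curMaster := addr.Address()
	if curMaster == lastMaster {
		return lastMaster // nothing changed
	}
	if err := connect(curMaster); err != nil {
		// Keep the old value so the next poll retries the connection.
		fmt.Println("connect failed:", err)
		return lastMaster
	}
	return curMaster
}

// monitor polls n times at the given interval; a real loop would run
// until the client is closed.
func monitor(addr Addresser, interval time.Duration, n int, connect func(string) error) {
	lastMaster := ""
	for i := 0; i < n; i++ {
		lastMaster = pollOnce(addr, lastMaster, connect)
		time.Sleep(interval)
	}
}

func main() {
	connect := func(a string) error {
		fmt.Println("connecting to", a)
		return nil
	}
	monitor(staticAddr("localhost:8080"), 10*time.Millisecond, 3, connect)
}
```

Because the address is re-read on every poll, a master restarted on a different IP is picked up on the next iteration without any RPC having to fail first.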

for _, s := range globPaths {
match, err := filepath.Glob(s)
if err != nil {
panic(err)
Contributor

The master panics when it receives a bad argument?

Contributor Author

Thanks for pointing that out! It should not panic. Done.
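
A minimal sketch of returning the error to the caller instead of panicking, assuming a helper named expandGlobs (a hypothetical name; the PR's actual function layout is not shown in this thread):

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
)

// expandGlobs expands every pattern and returns an error, rather than
// panicking, when a pattern is malformed or nothing matches.
func expandGlobs(globPaths []string) ([]string, error) {
	var paths []string
	for _, s := range globPaths {
		match, err := filepath.Glob(s)
		if err != nil {
			// filepath.Glob reports filepath.ErrBadPattern for malformed patterns.
			return nil, err
		}
		paths = append(paths, match...)
	}
	if len(paths) == 0 {
		return nil, errors.New("no valid dataset specified")
	}
	return paths, nil
}

func main() {
	if _, err := expandGlobs([]string{"["}); err != nil {
		fmt.Println("bad argument reported as error:", err)
	}
}
```

With this shape, a bad argument sent over RPC surfaces as an error on the caller's side while the master keeps running.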

@@ -123,6 +214,8 @@ func (s *Service) GetTask(dummy int, task *Task) error {
return err
}

*task = t.Task

Contributor

@gongweibao gongweibao Jun 13, 2017

Line 247 should log the failed task and related details, which makes it easier to gather statistics and debug when errors occur.
Also, should we set a threshold so that the job fails once the failed tasks reach a certain proportion of all tasks?

Contributor

@Yancey0623 Yancey0623 Jun 13, 2017

How about changing the type of taskQueues.Failed from []Task to []taskEntry, so that we can use numTimeout to record the number of failures?


Thanks! I now print a log line mentioning how many times the task failed, so that we have a record of what happened, but I have not changed the type. Do you think that is sufficient? :)
-- Helin


Cool, I think it's sufficient in this PR.
--Yanxu

Contributor Author

@gongweibao Good idea! I have added the error log.
I don't think failing the job is necessary. As long as the user knows that a task has failed, let the user decide under what circumstances the job counts as failed.

Contributor Author

@helinwang helinwang Jun 13, 2017

@Yancey1989 Thanks! I now print a log line mentioning how many times the task failed, so that we have a record of what happened, but I have not changed the type. Do you think that is sufficient? :)

select {
case <-s.ready:
}

s.mu.Lock()
defer s.mu.Unlock()

t, ok := s.taskQueues.Pending[taskID]
Contributor

If the taskID belongs to a different epoch, there will be a problem here.

Contributor Author

If this task is no longer in the pending state, we don't need to check whether it is still unfinished (if it is no longer pending, it has either finished or failed, and neither case needs handling).

return errors.New("no more available task")
}
s.taskQueues.Todo = s.taskQueues.Done
s.taskQueues.Todo = nil
Contributor

Todo=nil? Done=nil?

Contributor Author

@helinwang helinwang Jun 13, 2017

Good catch! Thanks! Done. Added this condition into test case as well.
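
To make the bug concrete, here is a small sketch of the corrected recycling step, using the append form that appears later in this PR's diff. The taskQueues type is simplified to int task IDs and the recycle method name is hypothetical.

```go
package main

import "fmt"

// taskQueues is a simplified, hypothetical version of the master's
// queues; tasks are plain ints here.
type taskQueues struct {
	Todo []int
	Done []int
}

// recycle moves every finished task back to Todo for the next pass and
// then clears Done. Clearing Todo instead (the bug caught in review)
// would throw the tasks away.
func (q *taskQueues) recycle() {
	q.Todo = append(q.Todo, q.Done...)
	q.Done = nil
}

func main() {
	q := &taskQueues{Done: []int{1, 2, 3}}
	q.recycle()
	fmt.Println("todo:", len(q.Todo), "done:", len(q.Done))
}
```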

s.initDone = true
return nil
}

// GetTask gets a new task from the service.
func (s *Service) GetTask(dummy int, task *Task) error {
Contributor

This function is long and complex. We need to submit test cases for GetTask and TaskFinish in a separate PR.

Contributor Author

@helinwang helinwang Jun 13, 2017

Thanks! It had indeed become rather complex, so I reworked it and extracted a function.

There is already a test case; please see: https://github.com/PaddlePaddle/Paddle/pull/2429/files#diff-dc14b64eab9d49fd494527c071e6121aR82

@@ -123,6 +214,8 @@ func (s *Service) GetTask(dummy int, task *Task) error {
return err
}

Contributor

Should we add an assert at line 211 to ensure that Pending does not already contain task t?
Of course, in theory it should not.

Contributor Author

I don't think that is needed. I believe correctness here is best guaranteed by unit tests.

@@ -162,17 +255,27 @@ func (s *Service) GetTask(dummy int, task *Task) error {

Contributor

@gongweibao gongweibao Jun 13, 2017

Should there also be a TaskFailed interface, so that a trainer that knows it has failed can report the error directly instead of waiting for the timeout?
Also, @typhoonzero: if a Kubernetes container exits, does the master need to know, and can it find out promptly? In the past we often saw programs die right after starting (for example, with a core dump). If that could be detected, the waiting time could be reduced substantially.

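Contributor

@gongweibao

"If a Kubernetes container exits, does the master need to know, and can it find out promptly? In the past we often saw programs die right after starting (for example, with a core dump). If that could be detected, the waiting time could be reduced substantially."

Does "the waiting time could be reduced substantially" refer to the trainer or the master? I take "container exits" to mean the trainer's container exiting. Trainers are started as a Kubernetes Job; if any container exits with a non-zero return value, Kubernetes automatically restarts that container.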

Contributor

The master needs to determine whether a task has failed, for example because of a core dump. Currently it uses a timeout to decide.

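Contributor

With Kubernetes + Docker, an immediate core dump should not happen: running a stable version of the image avoids dependency and library version mismatch problems. A core-dump type of problem would more likely be something like an FPE that shows up after running for a while. Other errors could well cause the trainer to fail right after startup, such as an error in the user's train.py.

But I think this kind of problem should be handled by Kubernetes rather than by the master. For example, Kubernetes could enforce that if 50% of the trainers in a job fail, the whole job fails, at which point the corresponding master and pserver no longer have any reason to keep running.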

Contributor Author

If the trainer core dumps, it cannot call a function to notify the master anyway.
I suggest optimizing this only if the problem actually occurs and is serious. Otherwise it just makes the system more complex and harder to maintain.

Contributor

Yes. We can optimize these issues later based on how things run in practice. I also agree with adding a master RPC interface such as TaskFail, to prevent tasks that are bound to fail from being dispatched again.

@@ -123,6 +214,8 @@ func (s *Service) GetTask(dummy int, task *Task) error {
return err
}

*task = t.Task

Contributor

@gongweibao gongweibao Jun 13, 2017

I think there should be log entries recording task information at get task, task finish, retry task, and task-to-fail, so that when we find a problem while debugging we can trace it through the task's life cycle. Ideally the entries should also be easy to filter by keyword :).

Contributor Author

Good idea! Done.
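
One way to make life-cycle logs easy to filter is to give every entry the same keyword and field layout. The sketch below is only an illustration of that idea: the "task-event" keyword, the field names, and the taskLogLine helper are assumptions, not the format the PR actually uses.

```go
package main

import (
	"fmt"
	"log"
)

// taskLogLine formats one life-cycle event with a fixed "task-event"
// keyword so that all entries for a task can be found with a single
// text filter.
func taskLogLine(event string, taskID, numTimeout int) string {
	return fmt.Sprintf("task-event=%s id=%d num_timeout=%d", event, taskID, numTimeout)
}

func main() {
	// Hypothetical event names covering the stages named in the review.
	for _, ev := range []string{"dispatched", "finished", "retry", "failed"} {
		log.Println(taskLogLine(ev, 7, 0))
	}
}
```

Filtering on "id=7" or "task-event=failed" then gives the history of one task or all failures, respectively.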

s.taskQueues.Todo = append(s.taskQueues.Todo, s.taskQueues.Done...)
s.taskQueues.Done = nil
}

Contributor

This needs to be logged.

Contributor Author

Thanks! Done.

}

// task finished, reset timeout
t.NumTimeout = 0
s.taskQueues.Done = append(s.taskQueues.Done, t)
delete(s.taskQueues.Pending, taskID)

if len(s.taskQueues.Pending) == 0 {
s.taskQueues.Todo = append(s.taskQueues.Todo, s.taskQueues.Done...)
Contributor

This needs to be logged.

Contributor Author

Thanks! Done.

s.mu.Lock()
defer s.mu.Unlock()

t, ok := s.taskQueues.Pending[taskID]
if !ok {
return ErrPendingTaskNotFound
return errors.New("pending task not found")
Contributor

This needs to be logged.

Contributor Author

Thanks! Done.

Contributor

@typhoonzero typhoonzero left a comment

When building with recordio (github.com/PaddlePaddle/recordio) it says

../../../recordio/chunk.go:95: undefined: snappy.NewBufferedWriter

@typhoonzero It works on my machine,

✝➜  master git:(master_client) ✗ pwd            
/root/gopath/src/github.com/PaddlePaddle/Paddle/go/cmd/master
✝➜  master git:(master_client) ✗ go build master.go
✝➜  master git:(master_client) ✗ 

Can you try

go get ./...

-- Helin


It seems that my github.com/golang/snappy was not on the master branch for some reason. Updating that repo fixed the problem. Thanks!

-- Wuyi

for i := range knownServers {
if knownServers[i].Addr != curServers[i].Addr {
for i := range lastServers {
if lastServers[i].Addr != curServers[i].Addr {
Contributor

Would it be better to change it to the pattern below?

if lastServers[i].Addr == curServers[i].Addr{
    continue
}
...

Contributor Author

Thank you! Good idea. Done.

}

if len(paths) == 0 {
return nil, errors.New("no valid datset specified")
Contributor

datset => dataset

Contributor Author

Thanks! Done.

Contributor

@gongweibao gongweibao left a comment

LGTM

Contributor

@typhoonzero typhoonzero left a comment

LGTM++

Contributor

@Yancey0623 Yancey0623 left a comment

LGTM

@helinwang helinwang merged commit 5f5e128 into PaddlePaddle:develop Jun 14, 2017
)

func main() {
port := flag.Int("port", 8080, "port of the master server.")
dataset := flag.String("training_dataset", "", "dataset: comma separated path to RecordIO paths, supports golb patterns.")

Contributor

Will we follow wangyi's suggestion here? It is a strategy for avoiding an already-occupied port: the server listens on port number "0", the operating system (k8s) assigns an idle port, and the program can then query for that port.
I'm not familiar with k8s. Does each pod have its own port namespace? I have googled it but am still not sure. If so, please ignore this comment.

Contributor Author

@helinwang helinwang Jun 15, 2017

@dzhwinter That strategy of using 0 for port is mostly used for unit tests when we don't have control over the testing environment. E.g., https://github.com/PaddlePaddle/Paddle/blob/develop/go/pserver/client_test.go#L20

For non-container based production use, admin will know which port is free. And if we are using container (k8s is based on container technology) this is not a problem, because typically there is only one program running inside the container, so all ports not reserved by OS will be free.

Contributor

Got it. Thanks!
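
For reference, the port-0 strategy mentioned in this thread can be sketched with the standard net package. This is a generic illustration, not code from the PR; listenOnFreePort is a hypothetical helper name, and it binds to the loopback address only for the sake of the example.

```go
package main

import (
	"fmt"
	"net"
)

// listenOnFreePort asks the OS for any idle TCP port by listening on
// port 0, then reports the port that was actually assigned.
func listenOnFreePort() (net.Listener, int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, 0, err
	}
	return l, l.Addr().(*net.TCPAddr).Port, nil
}

func main() {
	l, port, err := listenOnFreePort()
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer l.Close()
	fmt.Println("listening on port", port)
}
```

As noted above, this is mainly useful in unit tests, where the test has no control over which ports are free.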

5 participants