
implement master server client, remove unnecessary dummy variable #2429

Merged: 9 commits, Jun 14, 2017
52 changes: 2 additions & 50 deletions go/cmd/master/master.go
@@ -1,80 +1,32 @@
package main

import (
	"fmt"
	"net"
	"net/http"
	"net/rpc"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/namsral/flag"

	"github.com/PaddlePaddle/Paddle/go/master"
	"github.com/PaddlePaddle/recordio"
)

func main() {
	port := flag.Int("port", 8080, "port of the master server.")
	dataset := flag.String("training_dataset", "", "dataset: comma separated paths to RecordIO files, supports glob patterns.")

Contributor (@dzhwinter):

Will we follow wangyi's suggestion here, the strategy used to avoid an occupied port? The server listens on port number 0, the operating system (k8s) assigns an idle port, and the program can then query for that port.
I'm not familiar with k8s: does each pod have its own port namespace? I have googled but am still not sure about that. If so, please ignore this comment.

Contributor Author (@helinwang, Jun 15, 2017):

@dzhwinter That strategy of using 0 for the port is mostly used for unit tests, when we don't have control over the testing environment. E.g., https://github.com/PaddlePaddle/Paddle/blob/develop/go/pserver/client_test.go#L20

For non-container based production use, the admin will know which port is free. And if we are using containers (k8s is based on container technology) this is not a problem, because typically there is only one program running inside the container, so all ports not reserved by the OS will be free.

Contributor (@dzhwinter):

Got it. Thanks!
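For reference, a minimal standalone sketch of the listen-on-port-0 technique discussed above (illustrative only, not part of this PR's diff; the PR's own client_test.go below applies the same idea):

	package main

	import (
		"fmt"
		"net"
	)

	func main() {
		// Port 0 asks the operating system to pick any idle port.
		l, err := net.Listen("tcp", ":0")
		if err != nil {
			panic(err)
		}
		defer l.Close()

		// Query the port the OS actually assigned.
		port := l.Addr().(*net.TCPAddr).Port
		fmt.Println("listening on port", port)
	}
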

	faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).")
	taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timeout duration.")
	taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timeout count for each task before it is declared a failed task.")
	chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.")
	flag.Parse()

	if *dataset == "" {
		panic("no dataset specified.")
	}

	if *faultTolerance {
		panic("fault tolerance not implemented.")
	}

	var chunks []master.Chunk
	var paths []string
	ss := strings.Split(*dataset, ",")
	fmt.Println(ss)
	for _, s := range ss {
		match, err := filepath.Glob(s)
		if err != nil {
			panic(err)
		}
		paths = append(paths, match...)
	}

	if len(paths) == 0 {
		panic("no valid dataset specified.")
	}

	idx := 0
	for _, path := range paths {
		f, err := os.Open(path)
		if err != nil {
			panic(err)
		}

		index, err := recordio.LoadIndex(f)
		if err != nil {
			panic(err)
		}
		f.Close()

		count := index.NumChunks()
		for i := 0; i < count; i++ {
			chunk := master.Chunk{
				Idx:   idx,
				Path:  path,
				Index: *index.ChunkIndex(i),
			}
			chunks = append(chunks, chunk)
			idx++
		}
	}

-	s := master.NewService(chunks, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
+	s := master.NewService(*chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
	err := rpc.Register(s)
	if err != nil {
		panic(err)
21 changes: 21 additions & 0 deletions go/pserver/internal/connection/conn.go → go/connection/conn.go
@@ -2,6 +2,7 @@ package connection

import (
"errors"
"log"
"net/rpc"
"sync"
)
@@ -21,6 +22,18 @@ func New() *Conn {
	return c
}

// Close closes the connection.
func (c *Conn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.client == nil {
		return nil
	}

	return c.client.Close()
}

// Connect connects the connection to an address.
func (c *Conn) Connect(addr string) error {
	c.mu.Lock()
	defer c.mu.Unlock()

Contributor:

One thing I don't understand: why does the connect process take the lock twice instead of using a single lock?

Contributor Author (@helinwang, Jun 12, 2017):

The rpc.DialHTTP call that follows is a heavy operation, and we don't want it to block other calls to c.mu.Lock(). For example, suppose the master addr starts as "a" and rpc.DialHTTP is still in progress when the master addr changes to "b": we don't want Connect("b") to be blocked by Connect("a"). Waiting and retrying on Dial failure will be added here later, which makes the operation even heavier.

Contributor:

Understood. Thanks!
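A minimal hypothetical sketch of the two-lock pattern helinwang describes (connSketch is an illustrative, simplified stand-in; the PR's real Connect additionally manages waiters and address changes):

	package connection

	import (
		"net/rpc"
		"sync"
	)

	// connSketch holds the same two fields the pattern relies on.
	type connSketch struct {
		mu     sync.Mutex
		client *rpc.Client
	}

	// connect dials outside the mutex so the slow rpc.DialHTTP does
	// not block other goroutines that need the lock.
	func (c *connSketch) connect(addr string) error {
		c.mu.Lock()
		if c.client != nil {
			c.mu.Unlock()
			return nil // already connected
		}
		c.mu.Unlock() // release before the heavy dial

		client, err := rpc.DialHTTP("tcp", addr) // network round trip, possibly slow
		if err != nil {
			return err
		}

		// Re-acquire the lock only to install the result.
		c.mu.Lock()
		defer c.mu.Unlock()
		if c.client != nil {
			// Lost the race to a concurrent connect; discard our dial.
			return client.Close()
		}
		c.client = client
		return nil
	}
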

	c.mu.Lock()
@@ -50,12 +63,20 @@ func (c *Conn) Connect(addr string) error {
			c.waitConn = nil
		}
	} else {
		err := client.Close()
		if err != nil {
			log.Println(err)
		}

		return errors.New("client already set from a concurrent goroutine")
	}

	return nil
}

// TODO(helin): refactor Call to be able to perform a given retry
// policy.

// Call makes an RPC call.
//
// Call will be blocked until the connection to remote RPC service
82 changes: 82 additions & 0 deletions go/master/client.go
@@ -0,0 +1,82 @@
package master

import (
	"log"
	"time"

	"github.com/PaddlePaddle/Paddle/go/connection"
)

// Addresser provides the address of the master server.
type Addresser interface {
	Address() string
}

// Client is the client of the master server.
type Client struct {
	conn *connection.Conn
}

// NewClient creates a new Client.
func NewClient(addr Addresser) *Client {
	c := &Client{}
	c.conn = connection.New()
	go c.monitorMaster(addr)
	return c
}

func (c *Client) monitorMaster(addr Addresser) {
Contributor:

I have a couple of questions here:

  1. This function seems to connect to the master at intervals. Would it be better to reconnect only when an RPC call hits a network error?
  2. There is only one address, so I don't understand what curMaster and lastMaster mean.

Contributor Author (@helinwang, Jun 12, 2017):

Thanks for the comment! Very good suggestions!

  1. monitorMaster may not need to do that:

    • RPC calls will retry (TODO item).
    • In conn.go, if the TCP connection breaks, it should reconnect (TODO item).
    • If the master dies and is restarted on another IP, the Addresser can obtain the latest IP through this parameter, and the client connects to it.
  2. curMaster and lastMaster are used to detect whether the address returned by Addresser has changed. If it has changed, the master has moved, and the client will connect to the latest address.

Contributor (@gongweibao, Jun 13, 2017):

"If the master dies and is restarted on another IP, the Addresser can obtain the latest IP through this parameter, and the client connects to it."
Thanks helin, now I understand. I suggest adding that sentence to this comment:

	// Addresser provides the address of the master server.
	type Addresser interface {
		Address() string
	}

Contributor Author (@helinwang, Jun 13, 2017):

Thanks! Done.

		// get the latest address of the master server,
		// connect to the new address once address changed.
		curMaster := addr.Address()
		if curMaster != lastMaster {
			..
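To illustrate point 1 above (the Addresser supplying a fresh address after a master restart), here is a hypothetical sketch; dynamicAddresser and its updater are illustrative assumptions, not part of this PR:

	package master

	import "sync"

	// dynamicAddresser is an Addresser whose address can be updated by
	// another goroutine (e.g. an etcd watcher). monitorMaster sees the
	// change through Address() on its next tick and reconnects.
	type dynamicAddresser struct {
		mu   sync.Mutex
		addr string
	}

	// Address implements the Addresser interface.
	func (a *dynamicAddresser) Address() string {
		a.mu.Lock()
		defer a.mu.Unlock()
		return a.addr
	}

	// Set records the master's new address after a restart or migration.
	func (a *dynamicAddresser) Set(addr string) {
		a.mu.Lock()
		defer a.mu.Unlock()
		a.addr = addr
	}
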

	lastMaster := ""
	monitor := func() {
		// get the latest address of the master server,
		// connect to the new address once address changed.
		curMaster := addr.Address()
		if curMaster != lastMaster {
			if curMaster == "" {
				err := c.conn.Close()
				if err != nil {
					log.Println(err)
				}
			} else {
				err := c.conn.Connect(curMaster)
				if err != nil {
					log.Println(err)

					// connect to addr failed, set
					// to last known addr in order
					// to retry next time.
					curMaster = lastMaster
				}
			}
		}

		lastMaster = curMaster
	}

	monitor()
	ticker := time.NewTicker(10 * time.Second)
	for range ticker.C {
		monitor()
	}
}

// SetDataset sets the dataset for the master server to dispatch.
//
// SetDataset can be called multiple times from different nodes. But
// only the first call will be honored.
func (c *Client) SetDataset(globPaths []string) error {
	return c.conn.Call("Service.SetDataset", globPaths, nil)
}

// GetTask gets a new task from the master server.
func (c *Client) GetTask() (Task, error) {
	var t Task
	err := c.conn.Call("Service.GetTask", 0, &t)
	return t, err
}

// TaskFinished tells the master server a task is finished.
func (c *Client) TaskFinished(taskID int) error {
	return c.conn.Call("Service.TaskFinished", taskID, nil)
}
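
Putting the pieces together, here is a hypothetical usage sketch of the client API added in this PR (staticAddresser and the training loop are illustrative assumptions, not code from the diff):

	package main

	import (
		"log"

		"github.com/PaddlePaddle/Paddle/go/master"
	)

	// staticAddresser satisfies master.Addresser with a fixed address
	// (hypothetical; the PR's test uses an equivalent addresser type).
	type staticAddresser string

	func (a staticAddresser) Address() string { return string(a) }

	func main() {
		c := master.NewClient(staticAddresser("localhost:8080"))

		// Across all trainers, only the first SetDataset call is honored.
		if err := c.SetDataset([]string{"/data/train-*.recordio"}); err != nil {
			log.Fatal(err)
		}

		for {
			t, err := c.GetTask()
			if err != nil {
				// No pending task (or a transient RPC error): stop for now.
				break
			}

			// ... process the chunks referenced by task t ...

			if err := c.TaskFinished(t.ID); err != nil {
				log.Println(err)
			}
		}
	}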
120 changes: 120 additions & 0 deletions go/master/client_test.go
@@ -0,0 +1,120 @@
package master_test

import (
	"fmt"
	"net"
	"net/http"
	"net/rpc"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/PaddlePaddle/Paddle/go/master"
	"github.com/PaddlePaddle/recordio"
)

const (
	totalTask    = 20
	chunkPerTask = 10
)

var port int

func init() {
	log.SetLevel(log.ErrorLevel)

	l, err := net.Listen("tcp", ":0")
	if err != nil {
		panic(err)
	}

	ss := strings.Split(l.Addr().String(), ":")
	p, err := strconv.Atoi(ss[len(ss)-1])
	if err != nil {
		panic(err)
	}
	port = p

	go func(l net.Listener) {
		s := master.NewService(chunkPerTask, time.Second, 1)
		server := rpc.NewServer()
		err := server.Register(s)
		if err != nil {
			panic(err)
		}

		mux := http.NewServeMux()
		mux.Handle(rpc.DefaultRPCPath, server)
		err = http.Serve(l, mux)
		if err != nil {
			panic(err)
		}
	}(l)
}

type addresser string

func (a addresser) Address() string {
	return string(a)
}

func TestClientFull(t *testing.T) {
	const p = "/tmp/master_client_test_0"
	f, err := os.Create(p)
	if err != nil {
		panic(err)
	}

	for i := 0; i < totalTask*chunkPerTask; i++ {
		w := recordio.NewWriter(f, -1, -1)
		w.Write(nil)
		// call Close to force RecordIO writing a chunk.
		w.Close()
	}
	f.Close()

	c := master.NewClient(addresser(fmt.Sprintf(":%d", port)))
	c.SetDataset([]string{p})

	checkOnePass := func(i int) {
		var tasks []master.Task
		for i := 0; i < totalTask; i++ {
			task, err := c.GetTask()
			if err != nil {
				t.Fatal(i, err)
			}
			tasks = append(tasks, task)
		}

		_, err = c.GetTask()
		if err == nil {
			t.Fatal(i, "should get error.")
		}

		err = c.TaskFinished(tasks[0].ID)
		if err != nil {
			t.Fatal(err)
		}
		tasks = tasks[1:]
		task, err := c.GetTask()
		if err != nil {
			t.Fatal(err)
		}
		tasks = append(tasks, task)

		for _, task := range tasks {
			err = c.TaskFinished(task.ID)
			if err != nil {
				t.Fatal(i, err)
			}
		}
	}

	for i := 0; i < 10; i++ {
		checkOnePass(i)
	}
}