-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathnet_mgr.go
132 lines (117 loc) · 3.66 KB
/
net_mgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package gnet
import (
"context"
"net"
"sync"
)
// _netMgr is the package-level NetMgr singleton handed out by GetNetMgr.
var _netMgr = new(NetMgr)
// NetMgr is the network manager class; it provides the package's
// outward-facing interface for creating listeners and connectors.
type NetMgr struct {
// listenerMap holds the listeners created through this manager, keyed by
// their listener id.
listenerMap map[uint32]Listener
// listenerMapLock guards listenerMap.
listenerMapLock sync.RWMutex
// connectorMap holds the outgoing connections created through this
// manager, keyed by their connection id.
connectorMap map[uint32]Connection
// connectorMapLock guards connectorMap.
connectorMapLock sync.RWMutex
// initOnce makes GetNetMgr run init only once.
initOnce sync.Once
// wg is handed to every listener and connector the manager creates and is
// what Shutdown waits on.
wg sync.WaitGroup
}
// GetNetMgr returns the package-wide NetMgr singleton.
//
// Initialization is lazy: the first call runs init exactly once, and every
// later call returns the same, already initialized instance.
func GetNetMgr() *NetMgr {
	mgr := _netMgr
	mgr.initOnce.Do(mgr.init)
	return mgr
}
// init allocates empty listener and connector maps and resets the wait group
// to its zero value.
func (m *NetMgr) init() {
	m.wg = sync.WaitGroup{}
	m.connectorMap = map[uint32]Connection{}
	m.listenerMap = map[uint32]Listener{}
}
// NewListener creates a TCP listener from listenerConfig, starts it on
// address and registers it with the manager under its listener id.
//
// When listenerConfig.AcceptConnectionCreator is nil it is set to a default
// that wraps each accepted net.Conn with NewTcpConnectionAccept; note that
// this writes to the caller's config.
//
// It returns nil when listenerConfig is nil or when the listener fails to
// start.
func (m *NetMgr) NewListener(ctx context.Context, address string, listenerConfig *ListenerConfig) Listener {
	// A nil config used to panic on the field access below; report it the
	// same way as any other failure instead.
	if listenerConfig == nil {
		logger.Error("NewListener Failed: listenerConfig is nil")
		return nil
	}
	if listenerConfig.AcceptConnectionCreator == nil {
		listenerConfig.AcceptConnectionCreator = func(conn net.Conn, config *ConnectionConfig) Connection {
			return NewTcpConnectionAccept(conn, config)
		}
	}
	newListener := NewTcpListener(listenerConfig)
	newListener.netMgrWg = &m.wg
	// Install the close hook before Start. Start receives ctx and the
	// manager's WaitGroup, so presumably it launches goroutines that can close
	// the listener on their own; assigning onClose afterwards would then be an
	// unsynchronized write, and a close that slipped in before the assignment
	// would never remove the map entry. Deleting an id that is not in the map
	// is a no-op, so a hook that fires before registration does no harm.
	newListener.onClose = func(listener Listener) {
		m.listenerMapLock.Lock()
		delete(m.listenerMap, listener.GetListenerId())
		m.listenerMapLock.Unlock()
	}
	if !newListener.Start(ctx, address) {
		logger.Error("NewListener Start Failed")
		return nil
	}
	// NOTE(review): a close racing between Start returning and the insert
	// below can still leave a stale entry; registering before Start would
	// close that gap if the listener id is already valid at that point —
	// confirm against NewTcpListener.
	m.listenerMapLock.Lock()
	m.listenerMap[newListener.GetListenerId()] = newListener
	m.listenerMapLock.Unlock()
	return newListener
}
// NewWsListener creates a listener with the package-level NewWsListener
// constructor, starts it on address and registers it with the manager under
// its listener id.
//
// It returns nil when the listener fails to start.
func (m *NetMgr) NewWsListener(ctx context.Context, address string, listenerConfig *ListenerConfig) Listener {
	newListener := NewWsListener(listenerConfig)
	newListener.netMgrWg = &m.wg
	// Install the close hook before Start. Start receives ctx and the
	// manager's WaitGroup, so presumably it launches goroutines that can close
	// the listener on their own; assigning onClose afterwards would then be an
	// unsynchronized write, and a close that slipped in before the assignment
	// would never remove the map entry. Deleting an id that is not in the map
	// is a no-op, so a hook that fires before registration does no harm.
	newListener.onClose = func(listener Listener) {
		m.listenerMapLock.Lock()
		delete(m.listenerMap, listener.GetListenerId())
		m.listenerMapLock.Unlock()
	}
	if !newListener.Start(ctx, address) {
		logger.Error("NewWsListener Start Failed")
		return nil
	}
	// NOTE(review): a close racing between Start returning and the insert
	// below can still leave a stale entry; registering before Start would
	// close that gap if the listener id is already valid at that point —
	// confirm against the NewWsListener constructor.
	m.listenerMapLock.Lock()
	m.listenerMap[newListener.GetListenerId()] = newListener
	m.listenerMapLock.Unlock()
	return newListener
}
// NewConnector creates an outgoing connection whose underlying object is
// built by NewTcpConnector. Everything else (tagging, connecting to address,
// registration and start-up) is delegated to NewConnectorCustom, whose
// result is returned unchanged.
func (m *NetMgr) NewConnector(ctx context.Context, address string, connectionConfig *ConnectionConfig,
	tag interface{}) Connection {
	tcpCreator := func(cfg *ConnectionConfig) Connection {
		return NewTcpConnector(cfg)
	}
	return m.NewConnectorCustom(ctx, address, connectionConfig, tag, tcpCreator)
}
// NewWsConnector creates an outgoing connection whose underlying object is
// built by NewWsConnection. Everything else (tagging, connecting to address,
// registration and start-up) is delegated to NewConnectorCustom, whose
// result is returned unchanged.
func (m *NetMgr) NewWsConnector(ctx context.Context, address string, connectionConfig *ConnectionConfig,
	tag interface{}) Connection {
	wsCreator := func(cfg *ConnectionConfig) Connection {
		return NewWsConnection(cfg)
	}
	return m.NewConnectorCustom(ctx, address, connectionConfig, tag, wsCreator)
}
// NewConnectorCustom creates an outgoing connection with the supplied
// connectionCreator, attaches tag to it and connects it to address.
//
// If Connect reports failure the connection is closed and nil is returned.
// Otherwise the connection is stored in the manager's connector map under its
// connection id and then started with ctx, the manager's wait group and a
// callback that deletes the map entry again (presumably invoked when the
// connection closes — confirm against the Connection implementation).
func (m *NetMgr) NewConnectorCustom(ctx context.Context, address string, connectionConfig *ConnectionConfig,
	tag interface{}, connectionCreator ConnectionCreator) Connection {
	conn := connectionCreator(connectionConfig)
	conn.SetTag(tag)
	if ok := conn.Connect(address); !ok {
		conn.Close()
		return nil
	}

	// unregister drops a connection from the connector map.
	unregister := func(closed Connection) {
		m.connectorMapLock.Lock()
		delete(m.connectorMap, closed.GetConnectionId())
		m.connectorMapLock.Unlock()
	}

	// The connection is put into the map before Start is called.
	m.connectorMapLock.Lock()
	m.connectorMap[conn.GetConnectionId()] = conn
	m.connectorMapLock.Unlock()

	conn.Start(ctx, &m.wg, unregister)
	return conn
}
// Shutdown logs the shutdown request and optionally waits for the network
// goroutines to finish.
//
// waitForAllNetGoroutine: when true, block until the manager's wait group is
// drained, i.e. until all network goroutines have ended; when false, return
// right after logging. Shutdown itself does not close any listener or
// connection.
func (m *NetMgr) Shutdown(waitForAllNetGoroutine bool) {
	logger.Debug("Shutdown %v", waitForAllNetGoroutine)
	if !waitForAllNetGoroutine {
		return
	}
	// Wait for all network goroutines to end.
	m.wg.Wait()
	logger.Debug("all net goroutine closed")
}