-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection_manager.go
58 lines (48 loc) · 1.25 KB
/
connection_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package danube
import (
"sync"
)
// BrokerAddress identifies one broker endpoint. It is a comparable value type
// and is used as the key of the connectionManager's connection cache.
type BrokerAddress struct {
// ConnectURL is the address that is actually dialed for this broker.
ConnectURL string
// BrokerURL is the broker's own address as supplied by the caller of
// getConnection.
BrokerURL string
// Proxy is set by getConnection to (brokerURL == connectURL).
// NOTE(review): confirm this polarity matches the intended meaning of
// "proxy" (one might expect it to be true when the URLs differ).
Proxy bool
}
// connectionStatus is the per-broker entry stored in the connection cache.
type connectionStatus struct {
// Connected is the cached connection; nil means no connection has been
// stored for this broker yet.
Connected *rpcConnection
// Disconnected is not read or written anywhere in this file.
// NOTE(review): presumably marks a connection that has been lost; verify
// against the code that sets it.
Disconnected bool
}
// connectionManager caches one rpcConnection per BrokerAddress so that
// repeated requests for the same broker reuse the same connection.
// It contains a sync.Mutex and must therefore be used through a pointer and
// never copied.
type connectionManager struct {
// connections maps a broker address to its cached connection state.
connections map[BrokerAddress]*connectionStatus
// connection_options holds the dial options passed to newConnectionManager.
connection_options []DialOption
// connectionsMutex guards connections; getConnection holds it for the
// duration of each call.
connectionsMutex sync.Mutex
}
// newConnectionManager returns a connectionManager with an empty connection
// cache. The given options slice is stored on the manager as-is (it is not
// copied).
func newConnectionManager(options []DialOption) *connectionManager {
	cm := new(connectionManager)
	cm.connections = map[BrokerAddress]*connectionStatus{}
	cm.connection_options = options
	return cm
}
// getConnection returns the connection cached for the (brokerURL, connectURL)
// pair. When none is cached it dials connectURL, stores the new connection in
// the cache and returns it.
//
// Dial options: options passed to this call are used as given. When the
// caller passes none, the options supplied to newConnectionManager are used
// instead, so manager-wide settings are not silently dropped.
//
// The manager's mutex is held for the whole call, including the dial, so
// concurrent callers are serialized and at most one connection is created per
// address.
//
// If dialing fails, nothing is cached and the error from newRpcConnection is
// returned unchanged.
func (cm *connectionManager) getConnection(brokerURL, connectURL string, options ...DialOption) (*rpcConnection, error) {
	cm.connectionsMutex.Lock()
	defer cm.connectionsMutex.Unlock()

	broker := BrokerAddress{
		ConnectURL: connectURL,
		BrokerURL:  brokerURL,
		// NOTE(review): Proxy is true when both URLs are identical; confirm
		// this is the intended polarity.
		Proxy: brokerURL == connectURL,
	}

	// A missing key yields a nil status, so one nil check covers both
	// "no entry" and "entry without a live connection".
	status := cm.connections[broker]
	if status != nil && status.Connected != nil {
		return status.Connected, nil
	}

	// Fall back to the manager-wide options only when the caller gave none;
	// explicit per-call options keep their original meaning.
	if len(options) == 0 {
		options = cm.connection_options
	}

	rpcConn, err := newRpcConnection(connectURL, options...)
	if err != nil {
		return nil, err
	}

	if status == nil {
		status = &connectionStatus{}
		cm.connections[broker] = status
	}
	status.Connected = rpcConn
	return rpcConn, nil
}