forked from ivcosla/dmsg-1
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.go
123 lines (105 loc) · 2.04 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package dmsg
import (
"net"
"sync"
"github.com/SkycoinProject/dmsg/cipher"
)
// Listener listens for remote-initiated transports.
//
// Transports are queued on 'accept' by IntroduceTransport and handed out by
// AcceptTransport/Accept. Closing the listener closes 'done' and then 'accept'.
type Listener struct {
	pk cipher.PubKey // public key reported by Addr
	port uint16 // port reported by Addr
	mx sync.Mutex // protects 'accept': held while sending on it and while draining/closing it
	accept chan *Transport // buffered queue (capacity AcceptBufferSize) of introduced transports
	done chan struct{} // closed exactly once by close() to signal that the listener is closed
	once sync.Once // ensures the close sequence runs only once
}
// newListener builds an open Listener bound to the given public key and port.
// The accept queue is buffered with capacity AcceptBufferSize.
func newListener(pk cipher.PubKey, port uint16) *Listener {
	lis := new(Listener)
	lis.pk = pk
	lis.port = port
	lis.accept = make(chan *Transport, AcceptBufferSize)
	lis.done = make(chan struct{})
	return lis
}
// Accept accepts a connection.
//
// It blocks until a transport has been queued or the listener is closed, and
// returns the transport as a net.Conn. On failure the returned net.Conn is a
// literal nil: forwarding the nil *Transport directly would wrap it in a
// non-nil interface value, so callers comparing the connection against nil
// would be misled.
func (l *Listener) Accept() (net.Conn, error) {
	tp, err := l.AcceptTransport()
	if err != nil {
		return nil, err
	}
	return tp, nil
}
// Close closes the listener.
// Only the call that actually performs the close returns nil; every other
// call reports ErrClientClosed.
func (l *Listener) Close() error {
	if !l.close() {
		return ErrClientClosed
	}
	return nil
}
// close runs the close sequence at most once and reports whether this call
// was the one that ran it.
//
// While holding mx it closes 'done', discards every transport still queued on
// 'accept', and finally closes 'accept'.
func (l *Listener) close() (closed bool) {
	l.once.Do(func() {
		l.mx.Lock()
		defer l.mx.Unlock()

		closed = true
		close(l.done)

		// Non-blocking drain: keep receiving until the queue has nothing ready.
		for drained := false; !drained; {
			select {
			case <-l.accept:
			default:
				drained = true
			}
		}
		close(l.accept)
	})
	return closed
}
// isClosed reports, without blocking, whether 'done' has been closed.
func (l *Listener) isClosed() bool {
	closed := false
	select {
	case <-l.done:
		closed = true
	default:
	}
	return closed
}
// Addr returns the listener's address, made up of its public key and port.
func (l *Listener) Addr() net.Addr {
	addr := Addr{PK: l.pk, Port: l.port}
	return addr
}
// AcceptTransport accepts a transport connection.
// It blocks until a transport is available on the accept queue or the
// listener is closed; in the latter case it returns ErrClientClosed.
func (l *Listener) AcceptTransport() (*Transport, error) {
	select {
	case tp, open := <-l.accept:
		if open {
			return tp, nil
		}
		// The accept queue was closed: fall through to the closed result.
	case <-l.done:
	}
	return nil, ErrClientClosed
}
// Type returns the transport type, as given by the package-level Type value.
func (l *Listener) Type() string {
	tpType := Type
	return tpType
}
// IntroduceTransport handles a transport after receiving a REQUEST frame.
//
// The listener's mutex is held for the whole call. If the listener is closed,
// ErrClientClosed is returned. Otherwise the transport is queued for Accept
// without blocking; when it cannot be queued, the transport is closed and
// ErrClientAcceptMaxed is returned. After a successful enqueue the accept
// response is written (a write error is returned as-is) and the transport is
// served in its own goroutine.
func (l *Listener) IntroduceTransport(tp *Transport) error {
	l.mx.Lock()
	defer l.mx.Unlock()

	// Refuse early when closed, so a send on the closed accept queue is never attempted.
	select {
	case <-l.done:
		return ErrClientClosed
	default:
	}

	select {
	case <-l.done:
		return ErrClientClosed
	case l.accept <- tp:
		// Queued; finish the handshake below.
	default:
		if closeErr := tp.Close(); closeErr != nil {
			log.WithError(closeErr).Warn("Failed to close transport")
		}
		return ErrClientAcceptMaxed
	}

	if err := tp.WriteAccept(); err != nil {
		return err
	}
	go tp.Serve()
	return nil
}