-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathduplex.py
133 lines (102 loc) · 4.12 KB
/
duplex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import asyncio
import logging
# Module-level logger, registered under the fixed name 'Duplex'.
log = logging.getLogger('Duplex')
class DuplexProtocol(asyncio.Protocol):
    """Protocol that pushes received data into a queue and tracks link state.

    Two futures expose the connection lifecycle:

    * ``connected`` resolves with the transport's ``'peername'`` extra info
      as soon as ``connection_made`` is called.
    * ``disconnected`` resolves with ``None`` on a clean loss, or carries the
      exception passed to ``connection_lost``.
    """

    def __init__(self, inbox: asyncio.Queue):
        # Destination for every received chunk; also receives None on EOF.
        self.inbox = inbox
        # Stays None until connection_made() supplies the transport.
        self.transport = None
        self.connected = asyncio.Future()
        self.disconnected = asyncio.Future()

    async def get_transport(self):
        """Wait for ``connected`` to resolve, then return the transport."""
        await self.connected
        return self.transport

    def connection_made(self, transport):
        """Store the transport and resolve ``connected`` with the peer name."""
        self.transport = transport
        peer = transport.get_extra_info('peername')
        self.connected.set_result(peer)

    def data_received(self, data):
        """Queue the received chunk without waiting."""
        self.inbox.put_nowait(data)

    def eof_received(self):
        """Queue ``None`` as an end-of-stream marker and close the transport."""
        self.inbox.put_nowait(None)
        self.transport.close()

    def connection_lost(self, exception):
        """Settle ``disconnected`` once, with the error if one was given."""
        outcome = self.disconnected
        if outcome.done():
            # Already settled (possibly by another party); nothing to do.
            return

        link = self.transport
        if not link.is_closing():
            link.close()

        if exception is None:
            outcome.set_result(None)
            return
        outcome.set_exception(exception)
class Duplex:
def __init__(self, inbox: asyncio.Queue, outbox: asyncio.Queue, *args):
self.inbox = inbox
self.outbox = outbox
self.transport = None
self.protocol = None
self.send_job = None
if ... not in args:
raise NotImplementedError("Duplex should be instanciated with `new()`.")
@property
def connected(self):
"""Resolves with (host, port) when the connection is established."""
return self.protocol.connected
@property
def disconnected(self):
"""Resolves when the connection is closed or lost. Raises if an error occurred."""
return self.protocol.disconnected
def is_running(self):
"""Check if connected and exchanging data."""
started = self.protocol.connected.done() and self.transport is not None
ended = self.protocol.disconnected.done() or self.transport.is_closing()
return started and not ended
def close(self):
"""Close the connection."""
self.transport.close()
async def closing(self):
"""Wait for the connection to close."""
self.close()
return await self.disconnected
def abort(self):
"""Interrupt the connection without notifying or cleaning up."""
self.transport.abort()
async def aborting(self):
"""Wait for the connection to be interrupted."""
self.abort()
return await self.disconnected
async def _send_job(self):
await self.connected
try:
await self._send_loop()
except Exception as e:
log.error(e)
self.protocol.disconnected.set_exception(e)
async def _send_loop(self):
await self.connected
while self.is_running():
data = await self.outbox.get()
self.transport.write(data)
@classmethod
def new(cls, inbox: asyncio.Queue | None = None, outbox: asyncio.Queue | None = None):
"""
Create a new Duplex instance.
Will use provided queues or create new ones if not provided.
"""
recv_queue = inbox or asyncio.Queue()
send_queue = outbox or asyncio.Queue()
duplex = cls(recv_queue, send_queue, ...)
return duplex, duplex.inbox, duplex.outbox
async def connect(self, host: str = '127.0.0.1', port: int = 8765):
"""Connect to a Duplex server."""
self.protocol = DuplexProtocol(self.inbox)
loop = asyncio.get_running_loop()
self.transport, _ = await loop.create_connection(lambda: self.protocol, host, port)
self.send_job = loop.create_task(self._send_job())
return await self.connected
async def listen(self, host: str = '0.0.0.0', port: int = 8765):
"""Listen for incoming connections."""
self.protocol = DuplexProtocol(self.inbox)
loop = asyncio.get_running_loop()
await loop.create_server(lambda: self.protocol, host, port)
self.transport = await self.protocol.get_transport()
self.send_job = loop.create_task(self._send_job())
return await self.connected