Skip to content

Commit e9e2f47

Browse files
author
timothy.tamm
committed
add unsubscribe mechanism
1 parent 4f29455 commit e9e2f47

7 files changed

+83
-9
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
*.pyc
2+
*/__pycache__

comm/asyncClient.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def connect(self):
3333
def connection_made(self, transport):
3434
super().connection_made(transport)
3535
if len(self.subscriptions) > 0:
36-
self.subscribe(self.subscriptions)
36+
self.subscribe(self.subscriptions, Subscribe.SUBSCRIBE)
3737

3838
def msg_received(self, data, msg_type):
3939
msg = message_buffers[msg_type]()
@@ -43,10 +43,11 @@ def msg_received(self, data, msg_type):
4343
def write(self, msg, msg_type):
4444
super().write(msg, msg_type)
4545

46-
def subscribe(self, msg_types):
46+
def subscribe(self, msg_types, direction):
4747
msg = Subscribe()
4848
for msg_type in msg_types:
4949
msg.msg_types.append(msg_type.value)
50+
msg.dir = direction
5051
self.write(msg.SerializeToString(), MsgType.SUBSCRIBE)
5152

5253

comm/subscribe.proto

+6
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,10 @@ package mateROV;
44

55
message Subscribe {
66
repeated int32 msg_types = 1;
7+
required Direction dir = 2;
8+
9+
enum Direction {
10+
SUBSCRIBE = 0;
11+
UNSUBSCRIBE = 1;
12+
}
713
}

comm/subscribe_pb2.py

+34-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mockGuiModule.py

+15
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ class MockGuiModule(ProtoModule):
1414
def __init__(self, loop):
1515
super().__init__(loop, ADDRESS, PORT,[MsgType.MOCK_MSG])
1616
self.value = -1
17+
self.sub_ticks = 0
18+
self.subbed = True
1719

1820
def msg_received(self, msg, msg_type):
1921
# This gets called whenever any message is received
@@ -27,6 +29,19 @@ def tick(self):
2729
# for this mock module we will print out the current value
2830
print('Current value: {}'.format(self.value))
2931

32+
# to demonstrate subscription and unsubscription,
33+
# we will periodically unsubscribe and resubscribe
34+
if self.sub_ticks > 100:
35+
if self.subbed:
36+
print('Unsubscribed!')
37+
self.unsubscribe([MsgType.MOCK_MSG])
38+
else:
39+
print('Subscribed!')
40+
self.subscribe([MsgType.MOCK_MSG])
41+
self.subbed = not self.subbed
42+
self.sub_ticks = 0
43+
self.sub_ticks += 1
44+
3045

3146
def main():
3247
loop = asyncio.get_event_loop()

modules/protoModule.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from comm.asyncClient import AsyncClient
2+
from comm.subscribe_pb2 import Subscribe
23

34
class ProtoModule:
45
def __init__(self, loop, addr, port, subscriptions):
@@ -13,7 +14,10 @@ def msg_received(self, msg, msg_type):
1314
raise NotImplementedError()
1415

1516
def subscribe(self, msg_types):
16-
self.client.subscribe(msg_types)
17+
self.client.subscribe(msg_types, Subscribe.SUBSCRIBE)
18+
19+
def unsubscribe(self, msg_types):
20+
self.client.subscribe(msg_types, Subscribe.UNSUBSCRIBE)
1721

1822
def write(self, msg, msg_type):
1923
self.client.write(msg, msg_type)

rovController.py

+18-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import time, os, sys, logging
44
import asyncio # minimum Python 3.4, changed in 3.5.1
55
from comm.serverProto import ServerProto
6+
from comm.subscribe_pb2 import Subscribe
67
from comm.constants import *
78

89
ADDRESS = os.environ.get("BIND_ADDRESS","localhost")
@@ -17,12 +18,25 @@ def __init__(self, loop):
1718
coro = loop.create_server(lambda: ServerProto(self), ADDRESS, PORT)
1819
self.server = loop.run_until_complete(coro)
1920

21+
def _handle_subscriptions(self, protocol, data):
22+
if data.dir == Subscribe.SUBSCRIBE:
23+
self._add_subscriptions(protocol, data)
24+
else:
25+
self._remove_subscriptions(protocol, data)
26+
27+
def _remove_subscriptions(self, protocol, data):
28+
for msg_type in data.msg_types:
29+
m_type = MsgType(msg_type)
30+
if m_type in self.subs:
31+
self.subs[m_type].remove(protocol)
32+
2033
def _add_subscriptions(self, protocol, data):
2134
for msg_type in data.msg_types:
22-
if msg_type in self.subs:
23-
self.subs[MsgType(msg_type)].append(protocol)
35+
m_type = MsgType(msg_type)
36+
if m_type in self.subs:
37+
self.subs[m_type].append(protocol)
2438
else:
25-
self.subs[MsgType(msg_type)] = [protocol]
39+
self.subs[m_type] = [protocol]
2640

2741
def _forward_msg(self, msg, msg_type):
2842
if msg_type in self.subs:
@@ -34,7 +48,7 @@ def msg_received(self, protocol, msg, msg_type):
3448
if msg_type == MsgType.SUBSCRIBE:
3549
data = message_buffers[msg_type]()
3650
data.ParseFromString(msg)
37-
self._add_subscriptions(protocol, data)
51+
self._handle_subscriptions(protocol, data)
3852
else:
3953
self._forward_msg(msg, msg_type)
4054

0 commit comments

Comments
 (0)