-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
executable file
·70 lines (61 loc) · 2.38 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python
from __future__ import print_function
import rospy
from sensor_msgs.msg import JointState
from formant.sdk.agent.v1 import Client as FormantClient
class FormantExampleNode:
def __init__(self):
"""
Integration with Formant agent
"""
rospy.init_node("formant_example_node")
# Create ROS subscribers
self._subscriptions = [
rospy.Subscriber(
"/joint_states", JointState, self._joint_states_callback, queue_size=10,
),
]
# Ignore throttled or agent unavailable exceptions
self._formant_client = FormantClient(
agent_url="localhost:5501", ignore_throttled=True, ignore_unavailable=True
)
rospy.Timer(rospy.Duration(1), self._capture_state)
rospy.spin()
def _joint_states_callback(self, msg):
"""
Integration with sensor_msgs/JointState.
The turtlebot has two joints 'wheel_right_joint' and 'wheel_left_joint'.
This function parses joint states into Formant Numeric streams
which are sent to the agent.
"""
joint_state_positive = {}
joint_range = range(len(msg.name))
for i in joint_range:
# for each joint post to the numeric stream "wheel_joint_position"
# with a tag for the joint
self._formant_client.post_numeric(
"wheel_joint_position", msg.position[i], tags={"joint": msg.name[i]}
)
# set the state for each joint in the dict
joint_state_positive[msg.name[i]] = msg.position[i] > 0
# send the joint state
self._formant_client.post_bitset(
"wheel_joint_position_state_positive", joint_state_positive
)
def _capture_state(self, event=None):
"""
Use a timed callback method for data not available through topic callbacks,
or data that needs to be polled periodically.
"""
# send the system state on a text stream
self._formant_client.post_text("system_state.mode", "RUNNING")
# send a bitset of the system state
self._formant_client.post_bitset(
"system_state",
{"RUNNING": True, "STOPPED": False, "ERROR": False, "UNKNOWN": False},
)
if __name__ == "__main__":
try:
FormantExampleNode()
except rospy.ROSInterruptException:
pass