1
1
import asyncio
2
- from dataclasses import dataclass , field , fields , replace
2
+ from dataclasses import dataclass , field , fields
3
3
import time
4
4
import typing
5
5
6
6
import ezmsg .core as ez
7
+ from ezmsg .util .messages .axisarray import AxisArray
8
+ from ezmsg .util .messages .util import replace
7
9
import numpy as np
8
10
import numpy .typing as npt
9
11
import pylsl
10
12
11
- from .util import AxisArray
13
+ from .util import ClockSync
12
14
13
15
14
16
fmt2npdtype = {
@@ -76,42 +78,6 @@ class LSLInletState(ez.State):
76
78
clock_offset : float = 0.0
77
79
78
80
79
- class ClockSync :
80
- def __init__ (self , alpha : float = 0.1 , min_interval : float = 0.5 ):
81
- self .alpha = alpha
82
- self .min_interval = min_interval
83
-
84
- self .offset = 0.0
85
- self .last_update = 0.0
86
- self .count = 0
87
-
88
- async def update (self , force : bool = False , burst : int = 4 ) -> None :
89
- dur_since_last = time .time () - self .last_update
90
- dur_until_next = self .min_interval - dur_since_last
91
- if force or dur_until_next <= 0 :
92
- offsets = []
93
- for _ in range (burst ):
94
- if self .count % 2 :
95
- y , x = time .time (), pylsl .local_clock ()
96
- else :
97
- x , y = pylsl .local_clock (), time .time ()
98
- offsets .append (y - x )
99
- self .last_update = y
100
- await asyncio .sleep (0.001 )
101
- offset = np .mean (offsets )
102
-
103
- if self .count > 0 :
104
- # Exponential decay smoothing
105
- offset = (1 - self .alpha ) * self .offset + self .alpha * offset
106
- self .offset = offset
107
- self .count += burst
108
- else :
109
- await asyncio .sleep (dur_until_next )
110
-
111
- def convert_timestamp (self , lsl_timestamp : float ) -> float :
112
- return lsl_timestamp + self .offset
113
-
114
-
115
81
class LSLInletUnit (ez .Unit ):
116
82
"""
117
83
Represents a node in a graph that creates an LSL inlet and
@@ -223,22 +189,29 @@ def _reset_inlet(self) -> None:
223
189
ch_labels .append (str (len (ch_labels ) + 1 ))
224
190
# Pre-allocate a message template.
225
191
fs = inlet_info .nominal_srate ()
192
+ time_ax = (
193
+ AxisArray .TimeAxis (fs = fs )
194
+ if fs
195
+ else AxisArray .CoordinateAxis (
196
+ data = np .array ([]), dims = ["time" ], unit = "s"
197
+ )
198
+ )
226
199
self ._msg_template = AxisArray (
227
200
data = np .empty ((0 , n_ch )),
228
201
dims = ["time" , "ch" ],
229
202
axes = {
230
- "time" : AxisArray . Axis . TimeAxis (
231
- fs = fs if fs else 1.0
232
- ), # HACK: Use 1.0 for irregular rate.
233
- "ch" : AxisArray . Axis . SpaceAxis ( labels = ch_labels ),
203
+ "time" : time_ax ,
204
+ "ch" : AxisArray . CoordinateAxis (
205
+ data = np . array ( ch_labels ), dims = [ "ch" ]
206
+ ),
234
207
},
235
208
key = inlet_info .name (),
236
209
)
237
210
238
211
async def initialize (self ) -> None :
239
212
self ._reset_resolver ()
240
213
self ._reset_inlet ()
241
- # TODO: Let the clock_sync task do its job at the beginning.
214
+ await self . clock_sync . update ( force = True , burst = 1000 )
242
215
243
216
def shutdown (self ) -> None :
244
217
if self .STATE .inlet is not None :
@@ -252,8 +225,7 @@ def shutdown(self) -> None:
252
225
@ez .task
253
226
async def clock_sync_task (self ) -> None :
254
227
while True :
255
- force = self .clock_sync .count < 1000
256
- await self .clock_sync .update (force = force , burst = 1000 if force else 4 )
228
+ await self .clock_sync .update (force = False , burst = 4 )
257
229
258
230
@ez .subscriber (INPUT_SETTINGS )
259
231
async def on_settings (self , msg : LSLInletSettings ) -> None :
@@ -292,36 +264,32 @@ async def lsl_pull(self) -> typing.AsyncGenerator:
292
264
if samples is None
293
265
else samples
294
266
)
267
+
295
268
if self .SETTINGS .use_arrival_time :
296
- # time.time() gives us NOW, but we want the timestamp of the 0th sample in the chunk
297
- t0 = time .time () - (timestamps [- 1 ] - timestamps [0 ])
269
+ timestamps = time .time () - (timestamps - timestamps [0 ])
298
270
else :
299
- t0 = self .clock_sync .convert_timestamp (timestamps [0 ])
271
+ timestamps = self .clock_sync .lsl2system (timestamps )
272
+
300
273
if self .SETTINGS .info .nominal_srate <= 0.0 :
301
- # Irregular rate streams need to be streamed sample-by-sample
302
- for ts , samp in zip (timestamps , data ):
303
- out_msg = replace (
304
- self ._msg_template ,
305
- data = samp [None , ...],
306
- axes = {
307
- ** self ._msg_template .axes ,
308
- "time" : replace (
309
- self ._msg_template .axes ["time" ],
310
- offset = t0 + (ts - timestamps [0 ]),
311
- ),
312
- },
313
- )
314
- yield self .OUTPUT_SIGNAL , out_msg
274
+ # Irregular rate stream uses CoordinateAxis for time so each sample has a timestamp.
275
+ out_time_ax = replace (
276
+ self ._msg_template .axes ["time" ],
277
+ data = np .array (timestamps ),
278
+ )
315
279
else :
316
- # Regular-rate streams can go in a chunk
317
- out_msg = replace (
318
- self ._msg_template ,
319
- data = data ,
320
- axes = {
321
- ** self ._msg_template .axes ,
322
- "time" : replace (self ._msg_template .axes ["time" ], offset = t0 ),
323
- },
280
+ # Regular rate uses a LinearAxis for time so we only need the time of the first sample.
281
+ out_time_ax = replace (
282
+ self ._msg_template .axes ["time" ], offset = timestamps [0 ]
324
283
)
325
- yield self .OUTPUT_SIGNAL , out_msg
284
+
285
+ out_msg = replace (
286
+ self ._msg_template ,
287
+ data = data ,
288
+ axes = {
289
+ ** self ._msg_template .axes ,
290
+ "time" : out_time_ax ,
291
+ },
292
+ )
293
+ yield self .OUTPUT_SIGNAL , out_msg
326
294
else :
327
295
await asyncio .sleep (0.001 )
0 commit comments