Skip to content

Commit b163402

Browse files
authored
Merge pull request #25 from ezmsg-org/dev
Dev
2 parents b39b85c + fbd2450 commit b163402

File tree

8 files changed

+198
-128
lines changed

8 files changed

+198
-128
lines changed

.github/workflows/python-tests.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ jobs:
4848
rm liblsl-1.16.0-OSX_arm64.tar.bz2
4949
5050
- name: Install uv
51-
uses: astral-sh/setup-uv@v2
51+
uses: astral-sh/setup-uv@v3
5252
with:
5353
enable-cache: true
5454
cache-dependency-glob: "uv.lock"
@@ -57,7 +57,7 @@ jobs:
5757
run: uv python install ${{ matrix.python-version }}
5858

5959
- name: Install the project
60-
run: uv sync --all-extras --dev
60+
run: uv sync --all-extras
6161

6262
- name: Ruff check
6363
uses: astral-sh/ruff-action@v1

pyproject.toml

+5-9
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@ dynamic = ["version"]
1515

1616
[project.optional-dependencies]
1717
test = [
18-
"ezmsg-sigproc>=1.5.0",
1918
"flake8>=7.1.1",
2019
"pytest-cov>=5.0.0",
2120
"pytest>=8.3.3",
21+
"ezmsg-sigproc>=1.5.0",
22+
]
23+
dev = [
24+
"ruff>=0.6.6",
25+
"typer>=0.13.0",
2226
]
2327

2428
[build-system]
@@ -33,11 +37,3 @@ version-file = "src/ezmsg/lsl/__version__.py"
3337

3438
[tool.hatch.build.targets.wheel]
3539
packages = ["src/ezmsg"]
36-
37-
[tool.uv]
38-
dev-dependencies = [
39-
"ruff>=0.6.6",
40-
"typer>=0.13.0",
41-
]
42-
43-
[tool.uv.sources]

src/ezmsg/lsl/inlet.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def __init__(self, *args, **kwargs) -> None:
103103
super().__init__(*args, **kwargs)
104104
self._msg_template: typing.Optional[AxisArray] = None
105105
self._fetch_buffer: typing.Optional[npt.NDArray] = None
106-
self._clock_sync = ClockSync()
106+
self._clock_sync = ClockSync(run_thread=False)
107107

108108
def _reset_resolver(self) -> None:
109109
self.STATE.resolver = pylsl.ContinuousResolver(pred=None, forget_after=30.0)
@@ -218,6 +218,13 @@ def shutdown(self) -> None:
218218
del self.STATE.resolver
219219
self.STATE.resolver = None
220220

221+
@ez.task
222+
async def update_clock(self) -> None:
223+
while True:
224+
if self.STATE.inlet is not None:
225+
self._clock_sync.run_once()
226+
await asyncio.sleep(0.1)
227+
221228
@ez.subscriber(INPUT_SETTINGS)
222229
async def on_settings(self, msg: LSLInletSettings) -> None:
223230
# The message may be full LSLInletSettings, a dict of settings, just the info, or dict of just info.

src/ezmsg/lsl/outlet.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import typing
23

34
import ezmsg.core as ez
@@ -52,12 +53,19 @@ class LSLOutletUnit(ez.Unit):
5253

5354
async def initialize(self) -> None:
5455
self._stream_created = False
55-
self._clock_sync = ClockSync()
56+
self._clock_sync = ClockSync(run_thread=False)
5657

5758
def shutdown(self) -> None:
5859
del self.STATE.outlet
5960
self.STATE.outlet = None
6061

62+
@ez.task
63+
async def update_clock(self) -> None:
64+
while True:
65+
if self.STATE.outlet is not None:
66+
self._clock_sync.run_once()
67+
await asyncio.sleep(0.1)
68+
6169
@ez.subscriber(INPUT_SIGNAL, zero_copy=True)
6270
async def lsl_outlet(self, msg: AxisArray) -> None:
6371
if self.STATE.outlet is None:

src/ezmsg/lsl/util.py

+24-10
Original file line numberDiff line numberDiff line change
@@ -33,27 +33,41 @@ def __new__(cls, *args, **kwargs):
3333
cls._instance = super().__new__(cls)
3434
return cls._instance
3535

36-
def __init__(self, alpha: float = 0.1, min_interval: float = 0.1):
36+
def __init__(self, alpha: float = 0.1, min_interval: float = 0.1, run_thread: bool = True):
3737
if not hasattr(self, "_initialized"):
3838
self._alpha = alpha
3939
self._interval = min_interval
40-
40+
self._initialized = True
41+
self._last_time = time.time() - 1e9
42+
self._running = False
43+
self._thread: typing.Optional[threading.Thread] = None
4144
# Do first burst so we have a real offset even before the thread starts.
4245
xs, ys = collect_timestamp_pairs(100)
4346
self._offset: float = np.mean(ys - xs)
4447

45-
self._thread = threading.Thread(target=self._run)
46-
self._thread.daemon = True
47-
self._initialized = True
48-
self._running = True
49-
self._thread.start()
48+
if run_thread:
49+
self.start()
50+
51+
def run_once(self, n: int = 4, force: bool = False):
52+
if force or (time.time() - self._last_time) > self._interval:
53+
xs, ys = collect_timestamp_pairs(n)
54+
offset = np.mean(ys - xs)
55+
self._offset = (1 - self._alpha) * self._offset + self._alpha * offset
56+
self._last_time = time.time()
5057

5158
def _run(self):
5259
while self._running:
5360
time.sleep(self._interval)
54-
xs, ys = collect_timestamp_pairs(4)
55-
offset = np.mean(ys - xs)
56-
self._offset = (1 - self._alpha) * self._offset + self._alpha * offset
61+
self.run_once(4, True)
62+
63+
def start(self):
64+
self._running = True
65+
self._thread = threading.Thread(target=self._run)
66+
self._thread.daemon = True
67+
self._thread.start()
68+
69+
def stop(self):
70+
self._running = False
5771

5872
@property
5973
def offset(self) -> float:

tests/test_inlet.py

+46-3
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@
1212
import pylsl
1313
import pytest
1414
import ezmsg.core as ez
15+
from ezmsg.sigproc.synth import Clock, ClockSettings
16+
from ezmsg.util.debuglog import DebugLog, DebugLogSettings
1517
from ezmsg.util.messages.axisarray import AxisArray
1618
from ezmsg.util.messagelogger import MessageLogger
1719
from ezmsg.util.messagecodec import message_log
18-
from ezmsg.util.terminate import TerminateOnTotal
20+
from ezmsg.util.terminate import TerminateOnTotal, TerminateOnTimeout, TerminateOnTimeoutSettings, \
21+
TerminateOnTotalSettings
1922

2023
from ezmsg.lsl.units import LSLInfo, LSLInletSettings, LSLInletUnit
2124

@@ -38,7 +41,10 @@ class DummyOutlet(ez.Unit):
3841
@ez.task
3942
async def run_dummy(self) -> None:
4043
info = pylsl.StreamInfo(
41-
name="dummy", type="dummy", channel_count=self.SETTINGS.n_chans, nominal_srate=self.SETTINGS.rate
44+
name="dummy",
45+
type="dummy",
46+
channel_count=self.SETTINGS.n_chans,
47+
nominal_srate=self.SETTINGS.rate,
4248
)
4349
outlet = pylsl.StreamOutlet(info)
4450
eff_rate = self.SETTINGS.rate or 100.0
@@ -54,8 +60,45 @@ async def run_dummy(self) -> None:
5460
n_pushed += n_interval
5561

5662

63+
def test_inlet_collection():
64+
"""The primary purpose of this test is to verify that LSLInletUnit can be included in a collection."""
65+
66+
class LSLTestSystemSettings(ez.Settings):
67+
stream_name: str = "dummy"
68+
stream_type: str = "dummy"
69+
70+
class LSLTestSystem(ez.Collection):
71+
SETTINGS = LSLTestSystemSettings
72+
73+
DUMMY = DummyOutlet()
74+
INLET = LSLInletUnit()
75+
LOGGER = DebugLog()
76+
TERM = TerminateOnTotal()
77+
78+
def configure(self) -> None:
79+
self.INLET.apply_settings(
80+
LSLInletSettings(
81+
LSLInfo(
82+
name=self.SETTINGS.stream_name, type=self.SETTINGS.stream_type
83+
)
84+
)
85+
)
86+
self.LOGGER.apply_settings(DebugLogSettings(name="test_inlet_collection"))
87+
self.TERM.apply_settings(TerminateOnTotalSettings(total=10))
88+
89+
def network(self) -> ez.NetworkDefinition:
90+
return (
91+
(self.INLET.OUTPUT_SIGNAL, self.LOGGER.INPUT),
92+
(self.LOGGER.OUTPUT, self.TERM.INPUT_MESSAGE),
93+
)
94+
95+
# This next line raises an error if the ClockSync object runs its own thread.
96+
system = LSLTestSystem()
97+
ez.run(SYSTEM=system)
98+
99+
57100
@pytest.mark.parametrize("rate", [100.0, 0.0])
58-
def test_inlet_system(rate: float):
101+
def test_inlet_comps_conns(rate: float):
59102
n_messages = 20
60103
file_path = Path(tempfile.gettempdir())
61104
file_path = file_path / Path("test_inlet_system.txt")

tests/test_util.py

+12-4
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,24 @@
11
import time
22

33
import numpy as np
4+
import pytest
45

56
from ezmsg.lsl.util import ClockSync, collect_timestamp_pairs
67

78

8-
def test_clock_sync():
9+
@pytest.mark.parametrize("own_thread", [True, False])
10+
def test_clock_sync(own_thread: bool):
911
tol = 10e-3 # 1 msec
1012

11-
clock_sync = ClockSync()
12-
# Let it run a bit to get a stable estimate.
13-
time.sleep(1.0)
13+
clock_sync = ClockSync(run_thread=own_thread)
14+
if own_thread:
15+
# Let it run a bit to get a stable estimate.
16+
time.sleep(1.0)
17+
clock_sync.stop()
18+
else:
19+
for ix in range(10):
20+
clock_sync.run_once()
21+
time.sleep(0.1)
1422

1523
offsets = []
1624
for _ in range(10):

0 commit comments

Comments
 (0)