Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup and format dead code and README #10

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/python-publish-ezmsg-lsl.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,3 @@ jobs:

- name: Publish package distributions to PyPI
run: uv publish
# uses: pypa/gh-action-pypi-publish@release/v1
# with:
# packages_dir: artifact/
22 changes: 14 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@ Interface to [LSL](https://labstreaminglayer.readthedocs.io/) in [ezmsg](https:/

## Installation

TODO: `pip install ezmsg-lsl`

For now `pip install ezmsg-lsl@git+https://github.com/labstreaminglayer/ezmsg-lsl`
`pip install ezmsg-lsl`

## Dependencies
* `ezmsg`
* `pylsl`
* `numpy`

## Usage

See the `examples` folder for more details.

## Setup (Development)
## Developers

1. Install `ezmsg` either using `pip install ezmsg` or set up the repo for development as described in the `ezmsg` readme.
2. `cd` to this directory (`ezmsg-lsl`) and run `pip install -e .`
3. LSL modules are available under `import ezmsg.lsl`
We use [`uv`](https://docs.astral.sh/uv/getting-started/installation/) for development. It is not strictly required, but if you intend to contribute to ezmsg-lsl then using `uv` will lead to the smoothest collaboration.

See the `examples` folder for more details.
1. Install [`uv`](https://docs.astral.sh/uv/getting-started/installation/) if not already installed.
2. Fork ezmsg-lsl and clone your fork to your local computer.
3. Open a terminal and `cd` to the cloned folder.
4. `uv sync` to create a .venv and install dependencies.
5. After editing code and making commits, Run the test suite before making a PR: `uv run pytest tests`
* Currently, there are no substantial tests.
12 changes: 4 additions & 8 deletions examples/lsl_inlet_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,22 @@ def configure(self) -> None:
self.INLET.apply_settings(
LSLInletSettings(
stream_name=self.SETTINGS.stream_name,
stream_type=self.SETTINGS.stream_type
stream_type=self.SETTINGS.stream_type,
)
)
self.LOGGER.apply_settings(
DebugLogSettings(
name=self.SETTINGS.logger_name,
max_length=self.SETTINGS.logger_max_length
max_length=self.SETTINGS.logger_max_length,
)
)

def network(self) -> ez.NetworkDefinition:
return (
(self.INLET.OUTPUT_SIGNAL, self.LOGGER.INPUT),
)
return ((self.INLET.OUTPUT_SIGNAL, self.LOGGER.INPUT),)


if __name__ == "__main__":
# Run the websocket system
system = LSLDemoSystem()
system.apply_settings(
LSLDemoSystemSettings(stream_name="", stream_type="EEG")
)
system.apply_settings(LSLDemoSystemSettings(stream_name="", stream_type="EEG"))
ez.run(SYSTEM=system)
1 change: 1 addition & 0 deletions examples/lsl_outlet_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
In this example, we create a System comprising a simple graph from a
EEGSynth to an LSL Outlet.
"""

from typing import Tuple

import ezmsg.core as ez
Expand Down
69 changes: 43 additions & 26 deletions src/ezmsg/lsl/units.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
class LSLOutletSettings(ez.Settings):
stream_name: typing.Optional[str] = None
stream_type: typing.Optional[str] = None
map_file: typing.Optional[str] = None # Path to file containing a list of channel names and locations.
map_file: typing.Optional[str] = (
None # Path to file containing a list of channel names and locations.
)


class LSLOutletState(ez.State):
Expand Down Expand Up @@ -166,6 +168,7 @@ class LSLInletUnit(ez.Unit):
stream_name: The `name` of the created LSL outlet.
stream_type: The `type` of the created LSL outlet.
"""

SETTINGS = LSLInletSettings
STATE = LSLInletState

Expand All @@ -186,8 +189,10 @@ def __init__(self, *args, **kwargs) -> None:
if k.startswith("stream_"):
replace_keys.add(k)
if len(replace_keys) > 0:
ez.logger.warning(f"LSLInlet kwargs beginning with 'stream_' deprecated. Found {replace_keys}. "
f"See LSLInfo dataclass.")
ez.logger.warning(
f"LSLInlet kwargs beginning with 'stream_' deprecated. Found {replace_keys}. "
f"See LSLInfo dataclass."
)
for k in replace_keys:
kwargs[k[7:]] = kwargs.pop(k)

Expand All @@ -201,19 +206,26 @@ def __init__(self, *args, **kwargs) -> None:
async def initialize(self) -> None:
# If name, type, and host are all provided, then create the StreamInfo directly and
# create the inlet directly from that info.
if all([_ is not None for _ in [
self.SETTINGS.info.name,
self.SETTINGS.info.type,
self.SETTINGS.info.channel_count,
self.SETTINGS.info.channel_format
]]):
if all(
[
_ is not None
for _ in [
self.SETTINGS.info.name,
self.SETTINGS.info.type,
self.SETTINGS.info.channel_count,
self.SETTINGS.info.channel_format,
]
]
):
info = pylsl.StreamInfo(
name=self.SETTINGS.info.name,
type=self.SETTINGS.info.type,
channel_count=self.SETTINGS.info.channel_count,
channel_format=self.SETTINGS.info.channel_format
channel_format=self.SETTINGS.info.channel_format,
)
self.STATE.inlet = pylsl.StreamInlet(
info, max_chunklen=1, processing_flags=self.SETTINGS.processing_flags
)
self.STATE.inlet = pylsl.StreamInlet(info, max_chunklen=1, processing_flags=self.SETTINGS.processing_flags)
else:
# Build the predicate string. This uses XPATH syntax and can filter on anything in the stream info. e.g.,
# `"name='BioSemi'" or "type='EEG' and starts-with(name,'BioSemi') and count(info/desc/channel)=32"`
Expand Down Expand Up @@ -246,9 +258,7 @@ async def lsl_pull(self) -> typing.AsyncGenerator:
results: list[pylsl.StreamInfo] = self.STATE.resolver.results()
if len(results):
self.STATE.inlet = pylsl.StreamInlet(
results[0],
max_chunklen=1,
processing_flags=pylsl.proc_ALL
results[0], max_chunklen=1, processing_flags=pylsl.proc_ALL
)
else:
await asyncio.sleep(0.5)
Expand All @@ -260,7 +270,9 @@ async def lsl_pull(self) -> typing.AsyncGenerator:
n_ch = inlet_info.channel_count()
if fmt in fmt2npdtype:
dtype = fmt2npdtype[fmt]
n_buff = int(self.SETTINGS.local_buffer_dur * inlet_info.nominal_srate()) or 1000
n_buff = (
int(self.SETTINGS.local_buffer_dur * inlet_info.nominal_srate()) or 1000
)
self._fetch_buffer = np.zeros((n_buff, n_ch), dtype=dtype)
ch_labels = []
chans = inlet_info.desc().child("channels")
Expand All @@ -277,10 +289,12 @@ async def lsl_pull(self) -> typing.AsyncGenerator:
data=np.empty((0, n_ch)),
dims=["time", "ch"],
axes={
"time": AxisArray.Axis.TimeAxis(fs=fs if fs else 1.0), # HACK: Use 1.0 for irregular rate.
"ch": AxisArray.Axis.SpaceAxis(labels=ch_labels)
"time": AxisArray.Axis.TimeAxis(
fs=fs if fs else 1.0
), # HACK: Use 1.0 for irregular rate.
"ch": AxisArray.Axis.SpaceAxis(labels=ch_labels),
},
key=inlet_info.name()
key=inlet_info.name(),
)

while self.clock_sync.count < 1000:
Expand All @@ -290,16 +304,19 @@ async def lsl_pull(self) -> typing.AsyncGenerator:
while self.STATE.inlet is not None:
if self._fetch_buffer is not None:
samples, timestamps = self.STATE.inlet.pull_chunk(
max_samples=self._fetch_buffer.shape[0],
dest_obj=self._fetch_buffer
max_samples=self._fetch_buffer.shape[0], dest_obj=self._fetch_buffer
)
else:
samples, timestamps = self.STATE.inlet.pull_chunk()
samples = np.array(samples)

# Attempt to update the clock offset (shared across all instances)
if len(timestamps):
data = self._fetch_buffer[:len(timestamps)].copy() if samples is None else samples
data = (
self._fetch_buffer[: len(timestamps)].copy()
if samples is None
else samples
)
if self.SETTINGS.use_arrival_time:
# time.time() gives us NOW, but we want the timestamp of the 0th sample in the chunk
t0 = time.time() - (timestamps[-1] - timestamps[0])
Expand All @@ -315,9 +332,9 @@ async def lsl_pull(self) -> typing.AsyncGenerator:
**msg_template.axes,
"time": replace(
msg_template.axes["time"],
offset=t0 + (ts - timestamps[0])
)
}
offset=t0 + (ts - timestamps[0]),
),
},
)
yield self.OUTPUT_SIGNAL, out_msg
else:
Expand All @@ -327,8 +344,8 @@ async def lsl_pull(self) -> typing.AsyncGenerator:
data=data,
axes={
**msg_template.axes,
"time": replace(msg_template.axes["time"], offset=t0)
}
"time": replace(msg_template.axes["time"], offset=t0),
},
)
yield self.OUTPUT_SIGNAL, out_msg
else:
Expand Down
4 changes: 3 additions & 1 deletion src/ezmsg/lsl/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ class CustomAxis(AxisArray.Axis):
labels: typing.List[str] = field(default_factory=lambda: [])

@classmethod
def SpaceAxis(cls, labels: typing.List[str]): # , locs: typing.Optional[npt.NDArray] = None):
def SpaceAxis(
cls, labels: typing.List[str]
): # , locs: typing.Optional[npt.NDArray] = None):
return cls(unit="mm", labels=labels)


Expand Down
13 changes: 8 additions & 5 deletions tests/test_inlet.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
These unit tests aren't really testable in a runner without a complicated setup with inlets and outlets.
This code exists mostly to use during development and debugging.
"""
import os
import json
from pathlib import Path
Expand All @@ -12,7 +16,8 @@


def test_inlet_init_defaults():
my_inlet = LSLInletUnit()
settings = LSLInletSettings(name="", type="")
_ = LSLInletUnit(settings)
assert True


Expand Down Expand Up @@ -50,11 +55,9 @@ def test_inlet_init_with_settings():

comps = {
"SRC": LSLInletUnit(info=LSLInfo(name="BrainVision RDA", type="EEG")),
"SINK": AxarrReceiver(num_msgs=10_000, output_fn=file_path)
"SINK": AxarrReceiver(num_msgs=10_000, output_fn=file_path),
}
conns = (
(comps["SRC"].OUTPUT_SIGNAL, comps["SINK"].INPUT_SIGNAL),
)
conns = ((comps["SRC"].OUTPUT_SIGNAL, comps["SINK"].INPUT_SIGNAL),)
ez.run(components=comps, connections=conns)

tvecs = []
Expand Down
Loading