diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5230bfb4e..ed04eba50 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,8 +17,8 @@ jobs: sudo apt-get install -y capnproto libcapnp-dev - name: "Check" run: cargo check --all - - name: "Build" - run: cargo build --all + - name: "Build" + run: cargo build --all - name: "Test" run: cargo test --all diff --git a/Cargo.lock b/Cargo.lock index 52ad8549e..818069b9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -696,6 +696,18 @@ dependencies = [ "zenoh-config", ] +[[package]] +name = "dora-node-api-python" +version = "0.1.0" +dependencies = [ + "dora-node-api", + "eyre", + "futures", + "pyo3", + "serde_yaml", + "tokio", +] + [[package]] name = "dora-operator-api" version = "0.1.0" @@ -713,14 +725,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dora-python-binding" -version = "0.1.0" -dependencies = [ - "eyre", - "pyo3", -] - [[package]] name = "dora-runtime" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a8e12adcc..ff2d58a98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = [ "apis/rust/node", "apis/rust/operator", "apis/rust/operator/macros", - "apis/python/binding", + "apis/python/node", "binaries/coordinator", "binaries/runtime", "libraries/extensions/message", diff --git a/apis/python/binding/Cargo.toml b/apis/python/binding/Cargo.toml deleted file mode 100644 index 49dacc3ec..000000000 --- a/apis/python/binding/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "dora-python-binding" -version = "0.1.0" -edition = "2021" -license = "Apache-2.0" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -pyo3 = "0.16.1" -eyre = "0.6.7" diff --git a/apis/python/binding/src/lib.rs b/apis/python/binding/src/lib.rs deleted file mode 100644 index 6500cbe90..000000000 --- a/apis/python/binding/src/lib.rs +++ /dev/null @@ -1,71 +0,0 @@ -use eyre::Context; -use pyo3::{ - prelude::*, - types::{PyBytes, PyDict, PyString}, -}; -use std::{collections::BTreeMap, sync::Arc}; - -#[derive(Clone)] -pub struct PythonBinding { - app: String, - func: String, - python_wrapper: Arc>, -} - -impl PythonBinding { - pub fn try_new(app: &str, function: &str) -> eyre::Result { - pyo3::prepare_freethreaded_python(); - Python::with_gil(|py| { - let file = py - .import(app) - .wrap_err(format!("Importing '{app}' did not succeed."))?; - // convert Function into a PyObject - let python_function = file - .getattr(function) - .wrap_err(format!("'{function}' was not found in '{app}'."))?; - Ok(Self { - app: app.to_string(), - func: function.to_string(), - python_wrapper: Arc::new(python_function.to_object(py)), - }) - }) - } - - pub fn call( - &self, - inputs: &BTreeMap>, - ) -> eyre::Result>> { - Python::with_gil(|py| { - let py_inputs = PyDict::new(py); - - for (k, value) in inputs.iter() { - py_inputs.set_item(k, PyBytes::new(py, value))?; - } - - let results = self - .python_wrapper - .call(py, (py_inputs,), None) - .wrap_err(format!( - "'{}.{}' call did not succeed.", - self.app, self.func - ))?; - - let py_outputs = results.cast_as::(py).unwrap(); - let mut outputs = BTreeMap::new(); - for (k, v) in py_outputs.into_iter() { - let slice = v - .cast_as::() - .or_else(|e| eyre::bail!("{e}"))? - .as_bytes() - .to_vec(); - let key = k - .cast_as::() - .or_else(|e| eyre::bail!("{e}"))? - .to_string(); - outputs.insert(key, slice); - } - - Ok(outputs) - }) - } -} diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml new file mode 100644 index 000000000..d20be2856 --- /dev/null +++ b/apis/python/node/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "dora-node-api-python" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { path = "../../rust/node" } +pyo3 = { version = "0.16", features = ["eyre", "abi3-py38"] } +eyre = "0.6" +futures = "0.3.21" +tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] } +serde_yaml = "0.8.23" + + +[lib] +name = "dora" +crate-type = ["cdylib"] diff --git a/apis/python/node/README.md b/apis/python/node/README.md new file mode 100644 index 000000000..408cd83c7 --- /dev/null +++ b/apis/python/node/README.md @@ -0,0 +1,12 @@ +This crate corresponds to the Node API for Dora. + +## Building + +To build the Python module for development: + +````bash +python3 -m venv .env +source .env/bin/activate +pip install maturin +maturin develop +```` diff --git a/apis/python/node/pyproject.toml b/apis/python/node/pyproject.toml new file mode 100644 index 000000000..e95999725 --- /dev/null +++ b/apis/python/node/pyproject.toml @@ -0,0 +1,9 @@ +[build-system] +requires = ["maturin>=0.12,<0.13"] +build-backend = "maturin" + +[project] +name = "dora" + +[tool.maturin] +features = ["pyo3/extension-module"] diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs new file mode 100644 index 000000000..9827e610d --- /dev/null +++ b/apis/python/node/src/lib.rs @@ -0,0 +1,108 @@ +use dora_node_api::config::{DataId, NodeId}; +use dora_node_api::{DoraNode, Input}; +use eyre::{Context, Result}; +use futures::StreamExt; +use pyo3::prelude::*; +use pyo3::types::PyBytes; +use std::sync::Arc; +use std::thread; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; + +#[pyclass] +pub struct Node { + id: NodeId, + pub rx_input: Receiver, + pub tx_output: Sender<(String, Vec)>, +} + +pub struct PyInput(Input); + +impl IntoPy for PyInput { + fn into_py(self, py: Python) -> PyObject { + (self.0.id.to_string(), PyBytes::new(py, &self.0.data)).into_py(py) + } +} + +#[pymethods] +impl Node { + #[new] + pub fn new() -> Result { + let id = { + let raw = + std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize operator config")? + }; + + let (tx_input, rx_input) = mpsc::channel(1); + let (tx_output, mut rx_output) = mpsc::channel::<(String, Vec)>(1); + + // Dispatching a tokio threadpool enables us to conveniently use Dora Future stream + // through tokio channel. + // It would have been difficult to expose the FutureStream of Dora directly. + thread::spawn(move || -> Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread().build()?; + rt.block_on(async move { + let node = Arc::new(DoraNode::init_from_env().await?); + let _node = node.clone(); + let receive_handle = tokio::spawn(async move { + let mut inputs = _node.inputs().await.unwrap(); + while let Some(input) = inputs.next().await { + tx_input.send(input).await? + } + Result::<_, eyre::Error>::Ok(()) + }); + let send_handle = tokio::spawn(async move { + while let Some((output_str, data)) = rx_output.recv().await { + let output_id = DataId::from(output_str); + node.send_output(&output_id, data.as_slice()).await? + } + Result::<_, eyre::Error>::Ok(()) + }); + let (receiver, sender) = tokio::join!(receive_handle, send_handle); + receiver + .wrap_err("Handle to the receiver failed")? + .wrap_err("Receiving messages from receiver channel failed")?; + sender + .wrap_err("Handle to the sender failed")? + .wrap_err("Sending messages using sender channel failed")?; + Ok(()) + }) + }); + + Ok(Node { + id, + rx_input, + tx_output, + }) + } + + pub fn next(&mut self) -> PyResult> { + self.__next__() + } + + pub fn __next__(&mut self) -> PyResult> { + Ok(self.rx_input.blocking_recv().map(PyInput)) + } + + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + + pub fn send_output(&self, output_str: String, data: &PyBytes) -> Result<()> { + Ok(self + .tx_output + .blocking_send((output_str, data.as_bytes().to_vec())) + .wrap_err("Could not send output")?) + } + + pub fn id(&self) -> String { + self.id.to_string() + } +} + +#[pymodule] +fn dora(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_class::().unwrap(); + Ok(()) +} diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs index 3dcd9efa2..62af0ce7a 100644 --- a/apis/rust/node/src/communication.rs +++ b/apis/rust/node/src/communication.rs @@ -32,11 +32,11 @@ pub async fn init( } #[async_trait] -pub trait CommunicationLayer { +pub trait CommunicationLayer: Send + Sync { async fn subscribe<'a>( &'a self, topic: &str, - ) -> Result> + 'a>>, BoxError>; + ) -> Result> + Send + 'a>>, BoxError>; async fn publish(&self, topic: &str, data: &[u8]) -> Result<(), BoxError>; @@ -61,12 +61,12 @@ impl CommunicationLayer for ZenohCommunicationLayer { async fn subscribe<'a>( &'a self, topic: &str, - ) -> Result> + 'a>>, BoxError> { + ) -> Result> + Send + 'a>>, BoxError> { zenoh::Session::subscribe(&self.zenoh, self.prefixed(topic)) .reliable() .await .map(|s| { - let trait_object: Pin> + 'a>> = + let trait_object: Pin> + Send + 'a>> = Box::pin(s.map(|s| s.value.payload.contiguous().into_owned())); trait_object }) diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 340d4945a..21510d119 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -138,6 +138,7 @@ impl Drop for DoraNode { } } +#[derive(Debug)] pub struct Input { pub id: DataId, pub data: Vec, diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 1dcca35e2..03c359192 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -22,3 +22,15 @@ time = "0.3.9" futures-concurrency = "2.0.3" rand = "0.8.5" dora-core = { version = "0.1.0", path = "../../libraries/core" } + +[[example]] +name = "source_timer" +path = "examples/nodes/rust/source_timer.rs" + +[[example]] +name = "sink_logger" +path = "examples/nodes/rust/sink_logger.rs" + +[[example]] +name = "random_number" +path = "examples/nodes/rust/random_number.rs" diff --git a/binaries/coordinator/examples/graphs/python_test.yml b/binaries/coordinator/examples/graphs/python_test.yml new file mode 100644 index 000000000..66044ba7a --- /dev/null +++ b/binaries/coordinator/examples/graphs/python_test.yml @@ -0,0 +1,29 @@ +communication: + zenoh: + prefix: /foo + +nodes: + - id: static-string + custom: + run: python examples/nodes/python/static_string.py + outputs: + - string + + - id: python-printer + custom: + run: python examples/nodes/python/printer.py + inputs: + string: static-string/string + time2: rust-timer/time + + - id: rust-timer + custom: + run: cargo run --example source_timer + outputs: + - time + + - id: rust-logger + custom: + run: cargo run --example sink_logger + inputs: + time: static-string/string \ No newline at end of file diff --git a/binaries/coordinator/examples/nodes/python/printer.py b/binaries/coordinator/examples/nodes/python/printer.py new file mode 100644 index 000000000..45943ed06 --- /dev/null +++ b/binaries/coordinator/examples/nodes/python/printer.py @@ -0,0 +1,8 @@ +from dora import Node + +node = Node() + +for id, value in node: + print(f"From Python, id: {id}, value: {value}") if value is not [] else None + +print("printer finished") diff --git a/binaries/coordinator/examples/nodes/python/static_string.py b/binaries/coordinator/examples/nodes/python/static_string.py new file mode 100644 index 000000000..8b158fd04 --- /dev/null +++ b/binaries/coordinator/examples/nodes/python/static_string.py @@ -0,0 +1,11 @@ +import time + +from dora import Node + +node = Node() + +for i in range(100): + node.send_output("string", b"Hello World") + time.sleep(0.1) + +print("static string finished") diff --git a/binaries/coordinator/examples/random_number.rs b/binaries/coordinator/examples/nodes/rust/random_number.rs similarity index 100% rename from binaries/coordinator/examples/random_number.rs rename to binaries/coordinator/examples/nodes/rust/random_number.rs diff --git a/binaries/coordinator/examples/rate_limit.rs b/binaries/coordinator/examples/nodes/rust/rate_limit.rs similarity index 100% rename from binaries/coordinator/examples/rate_limit.rs rename to binaries/coordinator/examples/nodes/rust/rate_limit.rs diff --git a/binaries/coordinator/examples/example_sink_logger.rs b/binaries/coordinator/examples/nodes/rust/sink_logger.rs similarity index 100% rename from binaries/coordinator/examples/example_sink_logger.rs rename to binaries/coordinator/examples/nodes/rust/sink_logger.rs diff --git a/binaries/coordinator/examples/example_source_timer.rs b/binaries/coordinator/examples/nodes/rust/source_timer.rs similarity index 100% rename from binaries/coordinator/examples/example_source_timer.rs rename to binaries/coordinator/examples/nodes/rust/source_timer.rs