From f7d288ba69f15dc896c839dad551c72f408c2058 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 28 Jul 2022 09:49:15 +0200 Subject: [PATCH 01/18] init `dora-node-api-python` --- Cargo.lock | 9 ++++++++- Cargo.toml | 1 + apis/python/node/Cargo.toml | 10 ++++++++++ apis/python/node/src/lib.rs | 15 +++++++++++++++ 4 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 apis/python/node/Cargo.toml create mode 100644 apis/python/node/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 52ad8549e..88c7d6a89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -696,6 +696,14 @@ dependencies = [ "zenoh-config", ] +[[package]] +name = "dora-node-api-python" +version = "0.1.0" +dependencies = [ + "dora-node-api", + "pyo3", +] + [[package]] name = "dora-operator-api" version = "0.1.0" @@ -748,7 +756,6 @@ version = "0.1.0" dependencies = [ "opentelemetry", "opentelemetry-jaeger", - "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a8e12adcc..f7c977b5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "apis/rust/operator", "apis/rust/operator/macros", "apis/python/binding", + "apis/python/node", "binaries/coordinator", "binaries/runtime", "libraries/extensions/message", diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml new file mode 100644 index 000000000..0ebf4111b --- /dev/null +++ b/apis/python/node/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "dora-node-api-python" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { path = "../../rust/node" } +pyo3 = "0.16" diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs new file mode 100644 index 000000000..3f487b5e2 --- /dev/null +++ b/apis/python/node/src/lib.rs @@ -0,0 +1,15 @@ +use dora_node_api::DoraNode; +use pyo3::prelude::*; + +#[pyclass] +#[repr(transparent)] +pub struct PyDoraNode { + pub node: DoraNode, +} + +/// This module is implemented in Rust. +#[pymodule] +fn wonnx(_py: Python, m: &PyModule) -> PyResult<()> { + m.add_class::().unwrap(); + Ok(()) +} From edbd10e410d781124666fd1071b4e0d651af31c7 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 28 Jul 2022 09:49:59 +0200 Subject: [PATCH 02/18] add `Send` trait to the communicationLayer The `Send` Trait is required by pyo3 for the python binding --- apis/rust/node/src/communication.rs | 2 +- apis/rust/node/src/lib.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs index 3dcd9efa2..9d1bbf194 100644 --- a/apis/rust/node/src/communication.rs +++ b/apis/rust/node/src/communication.rs @@ -12,7 +12,7 @@ use crate::{config::CommunicationConfig, BoxError}; pub async fn init( communication_config: &CommunicationConfig, -) -> eyre::Result> { +) -> eyre::Result> { match communication_config { CommunicationConfig::Zenoh { config: zenoh_config, diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 340d4945a..d11871003 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -14,7 +14,7 @@ pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped"; pub struct DoraNode { id: NodeId, node_config: NodeRunConfig, - communication: Box, + communication: Box, } impl DoraNode { @@ -138,6 +138,7 @@ impl Drop for DoraNode { } } +#[derive(Debug)] pub struct Input { pub id: DataId, pub data: Vec, From 515ae91151a0e4202461d9e0c7bd00c56cb96202 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 29 Jul 2022 11:41:09 +0200 Subject: [PATCH 03/18] making `CommunicationLayer` Send and Sync --- apis/rust/node/src/communication.rs | 10 +++++----- apis/rust/node/src/lib.rs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs index 9d1bbf194..62af0ce7a 100644 --- a/apis/rust/node/src/communication.rs +++ b/apis/rust/node/src/communication.rs @@ -12,7 +12,7 @@ use crate::{config::CommunicationConfig, BoxError}; pub async fn init( communication_config: &CommunicationConfig, -) -> eyre::Result> { +) -> eyre::Result> { match communication_config { CommunicationConfig::Zenoh { config: zenoh_config, @@ -32,11 +32,11 @@ pub async fn init( } #[async_trait] -pub trait CommunicationLayer { +pub trait CommunicationLayer: Send + Sync { async fn subscribe<'a>( &'a self, topic: &str, - ) -> Result> + 'a>>, BoxError>; + ) -> Result> + Send + 'a>>, BoxError>; async fn publish(&self, topic: &str, data: &[u8]) -> Result<(), BoxError>; @@ -61,12 +61,12 @@ impl CommunicationLayer for ZenohCommunicationLayer { async fn subscribe<'a>( &'a self, topic: &str, - ) -> Result> + 'a>>, BoxError> { + ) -> Result> + Send + 'a>>, BoxError> { zenoh::Session::subscribe(&self.zenoh, self.prefixed(topic)) .reliable() .await .map(|s| { - let trait_object: Pin> + 'a>> = + let trait_object: Pin> + Send + 'a>> = Box::pin(s.map(|s| s.value.payload.contiguous().into_owned())); trait_object }) diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index d11871003..21510d119 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -14,7 +14,7 @@ pub const STOP_TOPIC: &str = "__dora_rs_internal__operator_stopped"; pub struct DoraNode { id: NodeId, node_config: NodeRunConfig, - communication: Box, + communication: Box, } impl DoraNode { From 128bf6ca43e9e03eed9696a265bc360c4de3d010 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 29 Jul 2022 11:41:54 +0200 Subject: [PATCH 04/18] Adding `next`, `send_output` for Python node API Adding `next` and `send_output` requires an async threadpool as the communication Layer defined by the Middleware Layer returns an async Future stream. I solve this issue by adding a tokio runtime on a separater thread that is connected with two channels. One for sending data and one for receiving data. Those channel are then exposed synchronously to Python. This should not be cause for concern as channel are really fast. Looking at Zenoh Python client, they are heavily using `pyo3-asyncio` implementation of futures to pass Rust futures into Python. This can be a solution as well, but, from previous experiment, I'm concerned about performance on such solution. I have experienced that putting futures from Rust into the `asyncio` queue to be slow. I'm concerned also by mixing `async` and `sync` code in Python, as it might be blocking. This might requires 2 threadpool in Python. This might seem as heavy overhead for some operations. --- apis/python/node/Cargo.toml | 10 +++++ apis/python/node/README.md | 12 +++++ apis/python/node/src/lib.rs | 88 ++++++++++++++++++++++++++++++++++--- 3 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 apis/python/node/README.md diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 0ebf4111b..64512ab8a 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -8,3 +8,13 @@ edition = "2021" [dependencies] dora-node-api = { path = "../../rust/node" } pyo3 = "0.16" +eyre = "0.6" +pollster = "0.2" +futures = "0.3.21" +tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] } +serde_yaml = "0.8.23" + + +[lib] +name = "dora" +crate-type = ["cdylib"] diff --git a/apis/python/node/README.md b/apis/python/node/README.md new file mode 100644 index 000000000..408cd83c7 --- /dev/null +++ b/apis/python/node/README.md @@ -0,0 +1,12 @@ +This crate corresponds to the Node API for Dora. + +## Building + +To build the Python module for development: + +````bash +python3 -m venv .env +source .env/bin/activate +pip install maturin +maturin develop +```` diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 3f487b5e2..7035d46be 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,15 +1,93 @@ -use dora_node_api::DoraNode; +use dora_node_api::config::DataId; +use dora_node_api::{DoraNode, Input}; +use eyre::Context; +use futures::StreamExt; use pyo3::prelude::*; - +use pyo3::types::PyBytes; +use std::sync::Arc; +use std::thread; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; #[pyclass] -#[repr(transparent)] +// #[repr(transparent)] pub struct PyDoraNode { - pub node: DoraNode, + // pub node: DoraNode, + pub rx_input: Receiver, + pub tx_output: Sender<(String, Vec)>, +} + +pub struct PyInput(Input); + +impl IntoPy for PyInput { + fn into_py(self, py: Python) -> PyObject { + (self.0.id.to_string(), PyBytes::new(py, &self.0.data)).into_py(py) + } +} + +#[pymethods] +impl PyDoraNode { + #[staticmethod] + pub fn init_from_env() -> Self { + let (tx_input, rx_input) = mpsc::channel(10); + let (tx_output, mut rx_output) = mpsc::channel::<(String, Vec)>(10); + + // Dispatching a tokio threadpool enables us to conveniently use Dora Future stream + // through tokio channel. + // It would have been difficult to expose the FutureStream of Dora directly. + thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + rt.block_on(async move { + let node = Arc::new(DoraNode::init_from_env().await.unwrap()); + let _node = node.clone(); + let receive_handle = tokio::spawn(async move { + let mut inputs = _node.inputs().await.unwrap(); + loop { + if let Some(input) = inputs.next().await { + tx_input.send(input).await.unwrap() + }; + } + }); + let send_handle = tokio::spawn(async move { + loop { + if let Some((output_str, data)) = rx_output.recv().await { + let output_id = DataId::from(output_str); + node.send_output(&output_id, data.as_slice()).await.unwrap() + }; + } + }); + let (_, _) = tokio::join!(receive_handle, send_handle); + }); + }); + + PyDoraNode { + rx_input, + tx_output, + } + } + + pub fn next(&mut self) -> PyResult> { + self.__next__() + } + + pub fn __next__(&mut self) -> PyResult> { + if let Some(input) = self.rx_input.blocking_recv() { + Ok(Some(PyInput(input))) + } else { + Ok(None) + } + } + + pub fn send_output(&self, output_str: String, data: Vec) -> () { + self.tx_output + .blocking_send((output_str, data)) + .wrap_err("Could not send output") + .unwrap() + } } /// This module is implemented in Rust. #[pymodule] -fn wonnx(_py: Python, m: &PyModule) -> PyResult<()> { +fn dora(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::().unwrap(); Ok(()) } From 1903c636cbe844f45e48cbae33a4c4febb037658 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 29 Jul 2022 13:52:13 +0200 Subject: [PATCH 05/18] Refactoring `python-node-api` --- apis/python/node/Cargo.toml | 2 +- apis/python/node/src/lib.rs | 27 +++++++++------------------ 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 64512ab8a..93eba50b8 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -2,6 +2,7 @@ name = "dora-node-api-python" version = "0.1.0" edition = "2021" +license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -9,7 +10,6 @@ edition = "2021" dora-node-api = { path = "../../rust/node" } pyo3 = "0.16" eyre = "0.6" -pollster = "0.2" futures = "0.3.21" tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] } serde_yaml = "0.8.23" diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 7035d46be..39283a5c8 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -8,8 +8,8 @@ use std::sync::Arc; use std::thread; use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; + #[pyclass] -// #[repr(transparent)] pub struct PyDoraNode { // pub node: DoraNode, pub rx_input: Receiver, @@ -28,8 +28,8 @@ impl IntoPy for PyInput { impl PyDoraNode { #[staticmethod] pub fn init_from_env() -> Self { - let (tx_input, rx_input) = mpsc::channel(10); - let (tx_output, mut rx_output) = mpsc::channel::<(String, Vec)>(10); + let (tx_input, rx_input) = mpsc::channel(1); + let (tx_output, mut rx_output) = mpsc::channel::<(String, Vec)>(1); // Dispatching a tokio threadpool enables us to conveniently use Dora Future stream // through tokio channel. @@ -41,18 +41,14 @@ impl PyDoraNode { let _node = node.clone(); let receive_handle = tokio::spawn(async move { let mut inputs = _node.inputs().await.unwrap(); - loop { - if let Some(input) = inputs.next().await { - tx_input.send(input).await.unwrap() - }; + while let Some(input) = inputs.next().await { + tx_input.send(input).await.unwrap() } }); let send_handle = tokio::spawn(async move { - loop { - if let Some((output_str, data)) = rx_output.recv().await { - let output_id = DataId::from(output_str); - node.send_output(&output_id, data.as_slice()).await.unwrap() - }; + while let Some((output_str, data)) = rx_output.recv().await { + let output_id = DataId::from(output_str); + node.send_output(&output_id, data.as_slice()).await.unwrap() } }); let (_, _) = tokio::join!(receive_handle, send_handle); @@ -70,11 +66,7 @@ impl PyDoraNode { } pub fn __next__(&mut self) -> PyResult> { - if let Some(input) = self.rx_input.blocking_recv() { - Ok(Some(PyInput(input))) - } else { - Ok(None) - } + Ok(self.rx_input.blocking_recv().map(PyInput)) } pub fn send_output(&self, output_str: String, data: Vec) -> () { @@ -85,7 +77,6 @@ impl PyDoraNode { } } -/// This module is implemented in Rust. #[pymodule] fn dora(_py: Python, m: &PyModule) -> PyResult<()> { m.add_class::().unwrap(); From c6b671b4ee1ad645655b9133ed82b4b043b2756e Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 29 Jul 2022 13:35:36 +0200 Subject: [PATCH 06/18] moving `node` examples into a specific folder --- binaries/coordinator/Cargo.toml | 12 ++++++++++++ .../coordinator/examples/nodes/python/printer.py | 9 +++++++++ binaries/coordinator/examples/nodes/python/timer.py | 10 ++++++++++ .../examples/{ => nodes/rust}/random_number.rs | 0 .../examples/{ => nodes/rust}/rate_limit.rs | 0 .../rust/sink_logger.rs} | 0 .../rust/source_timer.rs} | 0 7 files changed, 31 insertions(+) create mode 100644 binaries/coordinator/examples/nodes/python/printer.py create mode 100644 binaries/coordinator/examples/nodes/python/timer.py rename binaries/coordinator/examples/{ => nodes/rust}/random_number.rs (100%) rename binaries/coordinator/examples/{ => nodes/rust}/rate_limit.rs (100%) rename binaries/coordinator/examples/{example_sink_logger.rs => nodes/rust/sink_logger.rs} (100%) rename binaries/coordinator/examples/{example_source_timer.rs => nodes/rust/source_timer.rs} (100%) diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 1dcca35e2..03c359192 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -22,3 +22,15 @@ time = "0.3.9" futures-concurrency = "2.0.3" rand = "0.8.5" dora-core = { version = "0.1.0", path = "../../libraries/core" } + +[[example]] +name = "source_timer" +path = "examples/nodes/rust/source_timer.rs" + +[[example]] +name = "sink_logger" +path = "examples/nodes/rust/sink_logger.rs" + +[[example]] +name = "random_number" +path = "examples/nodes/rust/random_number.rs" diff --git a/binaries/coordinator/examples/nodes/python/printer.py b/binaries/coordinator/examples/nodes/python/printer.py new file mode 100644 index 000000000..1a7828475 --- /dev/null +++ b/binaries/coordinator/examples/nodes/python/printer.py @@ -0,0 +1,9 @@ +from dora import PyDoraNode + +node = PyDoraNode.init_from_env() + +for i in range(100): + value = node.next() + print(value) if value is not [] else None + +print("printer finished") diff --git a/binaries/coordinator/examples/nodes/python/timer.py b/binaries/coordinator/examples/nodes/python/timer.py new file mode 100644 index 000000000..4ac1596cf --- /dev/null +++ b/binaries/coordinator/examples/nodes/python/timer.py @@ -0,0 +1,10 @@ +from dora import PyDoraNode + +node = PyDoraNode.init_from_env() +import time + +for i in range(100): + node.send_output("time", b"awef") + time.sleep(0.1) + +print("printer finished") diff --git a/binaries/coordinator/examples/random_number.rs b/binaries/coordinator/examples/nodes/rust/random_number.rs similarity index 100% rename from binaries/coordinator/examples/random_number.rs rename to binaries/coordinator/examples/nodes/rust/random_number.rs diff --git a/binaries/coordinator/examples/rate_limit.rs b/binaries/coordinator/examples/nodes/rust/rate_limit.rs similarity index 100% rename from binaries/coordinator/examples/rate_limit.rs rename to binaries/coordinator/examples/nodes/rust/rate_limit.rs diff --git a/binaries/coordinator/examples/example_sink_logger.rs b/binaries/coordinator/examples/nodes/rust/sink_logger.rs similarity index 100% rename from binaries/coordinator/examples/example_sink_logger.rs rename to binaries/coordinator/examples/nodes/rust/sink_logger.rs diff --git a/binaries/coordinator/examples/example_source_timer.rs b/binaries/coordinator/examples/nodes/rust/source_timer.rs similarity index 100% rename from binaries/coordinator/examples/example_source_timer.rs rename to binaries/coordinator/examples/nodes/rust/source_timer.rs From 9000e681117bc73db95c3ee3d7d81f7da7922aa6 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 29 Jul 2022 14:42:41 +0200 Subject: [PATCH 07/18] Adding `__iter__` function --- Cargo.lock | 5 +++++ apis/python/node/src/lib.rs | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 88c7d6a89..9ba95fa66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -701,7 +701,11 @@ name = "dora-node-api-python" version = "0.1.0" dependencies = [ "dora-node-api", + "eyre", + "futures", "pyo3", + "serde_yaml", + "tokio", ] [[package]] @@ -756,6 +760,7 @@ version = "0.1.0" dependencies = [ "opentelemetry", "opentelemetry-jaeger", + "tokio", ] [[package]] diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 39283a5c8..208e66266 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -69,6 +69,10 @@ impl PyDoraNode { Ok(self.rx_input.blocking_recv().map(PyInput)) } + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + pub fn send_output(&self, output_str: String, data: Vec) -> () { self.tx_output .blocking_send((output_str, data)) From d69776a85f156a34f183e3afab769f6a5822598c Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 29 Jul 2022 16:04:49 +0200 Subject: [PATCH 08/18] Adding `eyre` Error handling derived into `PyErr` --- apis/python/node/Cargo.toml | 2 +- apis/python/node/src/lib.rs | 56 +++++++++++++++++++++++++------------ 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 93eba50b8..cd355f3b9 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" [dependencies] dora-node-api = { path = "../../rust/node" } -pyo3 = "0.16" +pyo3 = { version = "0.16", features = ["eyre"] } eyre = "0.6" futures = "0.3.21" tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] } diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 208e66266..125317e8e 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,6 +1,6 @@ -use dora_node_api::config::DataId; +use dora_node_api::config::{DataId, NodeId}; use dora_node_api::{DoraNode, Input}; -use eyre::Context; +use eyre::{Context, Result}; use futures::StreamExt; use pyo3::prelude::*; use pyo3::types::PyBytes; @@ -11,7 +11,7 @@ use tokio::sync::mpsc::{Receiver, Sender}; #[pyclass] pub struct PyDoraNode { - // pub node: DoraNode, + id: NodeId, pub rx_input: Receiver, pub tx_output: Sender<(String, Vec)>, } @@ -26,39 +26,55 @@ impl IntoPy for PyInput { #[pymethods] impl PyDoraNode { - #[staticmethod] - pub fn init_from_env() -> Self { + #[new] + pub fn new() -> Result { + let id = { + let raw = + std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?; + serde_yaml::from_str(&raw).context("failed to deserialize operator config")? + }; + let (tx_input, rx_input) = mpsc::channel(1); let (tx_output, mut rx_output) = mpsc::channel::<(String, Vec)>(1); // Dispatching a tokio threadpool enables us to conveniently use Dora Future stream // through tokio channel. // It would have been difficult to expose the FutureStream of Dora directly. - thread::spawn(move || { - let rt = tokio::runtime::Builder::new_multi_thread().build().unwrap(); + thread::spawn(move || -> Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread().build()?; rt.block_on(async move { - let node = Arc::new(DoraNode::init_from_env().await.unwrap()); + let node = Arc::new(DoraNode::init_from_env().await?); let _node = node.clone(); let receive_handle = tokio::spawn(async move { let mut inputs = _node.inputs().await.unwrap(); while let Some(input) = inputs.next().await { - tx_input.send(input).await.unwrap() + tx_input.send(input).await? } + Result::<_, eyre::Error>::Ok(()) }); let send_handle = tokio::spawn(async move { while let Some((output_str, data)) = rx_output.recv().await { let output_id = DataId::from(output_str); - node.send_output(&output_id, data.as_slice()).await.unwrap() + node.send_output(&output_id, data.as_slice()).await? } + Result::<_, eyre::Error>::Ok(()) }); - let (_, _) = tokio::join!(receive_handle, send_handle); - }); + let (receiver, sender) = tokio::join!(receive_handle, send_handle); + receiver + .wrap_err("Handle to the receiver failed")? + .wrap_err("Receiving messages from receiver channel failed")?; + sender + .wrap_err("Handle to the sender failed")? + .wrap_err("Sending messages using sender channel failed")?; + Ok(()) + }) }); - PyDoraNode { + Ok(PyDoraNode { + id, rx_input, tx_output, - } + }) } pub fn next(&mut self) -> PyResult> { @@ -73,11 +89,15 @@ impl PyDoraNode { slf } - pub fn send_output(&self, output_str: String, data: Vec) -> () { - self.tx_output + pub fn send_output(&self, output_str: String, data: Vec) -> Result<()> { + Ok(self + .tx_output .blocking_send((output_str, data)) - .wrap_err("Could not send output") - .unwrap() + .wrap_err("Could not send output")?) + } + + pub fn id(&self) -> String { + self.id.to_string() } } From 79da342b62cc1557422969e045a99a8d3a89c965 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 29 Jul 2022 16:05:10 +0200 Subject: [PATCH 09/18] Adding test graph for python node --- .../examples/graphs/python_test.yml | 29 +++++++++++++++++++ .../examples/nodes/python/printer.py | 7 ++--- .../examples/nodes/python/static_string.py | 11 +++++++ .../examples/nodes/python/timer.py | 10 ------- 4 files changed, 43 insertions(+), 14 deletions(-) create mode 100644 binaries/coordinator/examples/graphs/python_test.yml create mode 100644 binaries/coordinator/examples/nodes/python/static_string.py delete mode 100644 binaries/coordinator/examples/nodes/python/timer.py diff --git a/binaries/coordinator/examples/graphs/python_test.yml b/binaries/coordinator/examples/graphs/python_test.yml new file mode 100644 index 000000000..6d80c651a --- /dev/null +++ b/binaries/coordinator/examples/graphs/python_test.yml @@ -0,0 +1,29 @@ +communication: + zenoh: + prefix: /foo + +nodes: + - id: static-string + custom: + run: python examples/nodes/python/static_string.py + outputs: + - string + + - id: python-printer + custom: + run: python examples/nodes/python/printer.py + inputs: + time: static-string/string + time2: rust-timer/time + + - id: rust-timer + custom: + run: cargo run --example source_timer + outputs: + - time + + - id: rust-logger + custom: + run: cargo run --example sink_logger + inputs: + time: static-string/string \ No newline at end of file diff --git a/binaries/coordinator/examples/nodes/python/printer.py b/binaries/coordinator/examples/nodes/python/printer.py index 1a7828475..b2fa18c7c 100644 --- a/binaries/coordinator/examples/nodes/python/printer.py +++ b/binaries/coordinator/examples/nodes/python/printer.py @@ -1,9 +1,8 @@ from dora import PyDoraNode -node = PyDoraNode.init_from_env() +node = PyDoraNode() -for i in range(100): - value = node.next() - print(value) if value is not [] else None +for id, value in node: + print(f"From Python, id: {id}, value: {value}") if value is not [] else None print("printer finished") diff --git a/binaries/coordinator/examples/nodes/python/static_string.py b/binaries/coordinator/examples/nodes/python/static_string.py new file mode 100644 index 000000000..ee8c7aa7b --- /dev/null +++ b/binaries/coordinator/examples/nodes/python/static_string.py @@ -0,0 +1,11 @@ +import time + +from dora import PyDoraNode + +node = PyDoraNode() + +for i in range(100): + node.send_output("string", b"Hello World") + time.sleep(0.1) + +print("static string finished") diff --git a/binaries/coordinator/examples/nodes/python/timer.py b/binaries/coordinator/examples/nodes/python/timer.py deleted file mode 100644 index 4ac1596cf..000000000 --- a/binaries/coordinator/examples/nodes/python/timer.py +++ /dev/null @@ -1,10 +0,0 @@ -from dora import PyDoraNode - -node = PyDoraNode.init_from_env() -import time - -for i in range(100): - node.send_output("time", b"awef") - time.sleep(0.1) - -print("printer finished") From 6b2b082e71b04f6b6b5f342175422aa40d157677 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 29 Jul 2022 16:11:16 +0200 Subject: [PATCH 10/18] Renaming `PyDoraNode` to `Node` --- apis/python/binding/Cargo.toml | 11 --- apis/python/binding/src/lib.rs | 71 ------------------- apis/python/node/src/lib.rs | 8 +-- .../examples/nodes/python/printer.py | 4 +- .../examples/nodes/python/static_string.py | 4 +- 5 files changed, 8 insertions(+), 90 deletions(-) delete mode 100644 apis/python/binding/Cargo.toml delete mode 100644 apis/python/binding/src/lib.rs diff --git a/apis/python/binding/Cargo.toml b/apis/python/binding/Cargo.toml deleted file mode 100644 index 49dacc3ec..000000000 --- a/apis/python/binding/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "dora-python-binding" -version = "0.1.0" -edition = "2021" -license = "Apache-2.0" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -pyo3 = "0.16.1" -eyre = "0.6.7" diff --git a/apis/python/binding/src/lib.rs b/apis/python/binding/src/lib.rs deleted file mode 100644 index 6500cbe90..000000000 --- a/apis/python/binding/src/lib.rs +++ /dev/null @@ -1,71 +0,0 @@ -use eyre::Context; -use pyo3::{ - prelude::*, - types::{PyBytes, PyDict, PyString}, -}; -use std::{collections::BTreeMap, sync::Arc}; - -#[derive(Clone)] -pub struct PythonBinding { - app: String, - func: String, - python_wrapper: Arc>, -} - -impl PythonBinding { - pub fn try_new(app: &str, function: &str) -> eyre::Result { - pyo3::prepare_freethreaded_python(); - Python::with_gil(|py| { - let file = py - .import(app) - .wrap_err(format!("Importing '{app}' did not succeed."))?; - // convert Function into a PyObject - let python_function = file - .getattr(function) - .wrap_err(format!("'{function}' was not found in '{app}'."))?; - Ok(Self { - app: app.to_string(), - func: function.to_string(), - python_wrapper: Arc::new(python_function.to_object(py)), - }) - }) - } - - pub fn call( - &self, - inputs: &BTreeMap>, - ) -> eyre::Result>> { - Python::with_gil(|py| { - let py_inputs = PyDict::new(py); - - for (k, value) in inputs.iter() { - py_inputs.set_item(k, PyBytes::new(py, value))?; - } - - let results = self - .python_wrapper - .call(py, (py_inputs,), None) - .wrap_err(format!( - "'{}.{}' call did not succeed.", - self.app, self.func - ))?; - - let py_outputs = results.cast_as::(py).unwrap(); - let mut outputs = BTreeMap::new(); - for (k, v) in py_outputs.into_iter() { - let slice = v - .cast_as::() - .or_else(|e| eyre::bail!("{e}"))? - .as_bytes() - .to_vec(); - let key = k - .cast_as::() - .or_else(|e| eyre::bail!("{e}"))? - .to_string(); - outputs.insert(key, slice); - } - - Ok(outputs) - }) - } -} diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 125317e8e..bb8fb7620 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -10,7 +10,7 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; #[pyclass] -pub struct PyDoraNode { +pub struct Node { id: NodeId, pub rx_input: Receiver, pub tx_output: Sender<(String, Vec)>, @@ -25,7 +25,7 @@ impl IntoPy for PyInput { } #[pymethods] -impl PyDoraNode { +impl Node { #[new] pub fn new() -> Result { let id = { @@ -70,7 +70,7 @@ impl PyDoraNode { }) }); - Ok(PyDoraNode { + Ok(Node { id, rx_input, tx_output, @@ -103,6 +103,6 @@ impl PyDoraNode { #[pymodule] fn dora(_py: Python, m: &PyModule) -> PyResult<()> { - m.add_class::().unwrap(); + m.add_class::().unwrap(); Ok(()) } diff --git a/binaries/coordinator/examples/nodes/python/printer.py b/binaries/coordinator/examples/nodes/python/printer.py index b2fa18c7c..45943ed06 100644 --- a/binaries/coordinator/examples/nodes/python/printer.py +++ b/binaries/coordinator/examples/nodes/python/printer.py @@ -1,6 +1,6 @@ -from dora import PyDoraNode +from dora import Node -node = PyDoraNode() +node = Node() for id, value in node: print(f"From Python, id: {id}, value: {value}") if value is not [] else None diff --git a/binaries/coordinator/examples/nodes/python/static_string.py b/binaries/coordinator/examples/nodes/python/static_string.py index ee8c7aa7b..8b158fd04 100644 --- a/binaries/coordinator/examples/nodes/python/static_string.py +++ b/binaries/coordinator/examples/nodes/python/static_string.py @@ -1,8 +1,8 @@ import time -from dora import PyDoraNode +from dora import Node -node = PyDoraNode() +node = Node() for i in range(100): node.send_output("string", b"Hello World") From 7813615b2ca904b86e95d81a8113f439e5657296 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sat, 30 Jul 2022 15:53:20 +0200 Subject: [PATCH 11/18] Remove legacy python binding from `Cargo.toml` --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index f7c977b5b..ff2d58a98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,6 @@ members = [ "apis/rust/node", "apis/rust/operator", "apis/rust/operator/macros", - "apis/python/binding", "apis/python/node", "binaries/coordinator", "binaries/runtime", From 366ec10b1a0f73cd2c8bb757e6ec8ae5ba47e715 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 1 Aug 2022 10:32:07 +0200 Subject: [PATCH 12/18] Adding `extension-module` for pyo3 extension --- Cargo.lock | 8 -------- 1 file changed, 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ba95fa66..818069b9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -725,14 +725,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dora-python-binding" -version = "0.1.0" -dependencies = [ - "eyre", - "pyo3", -] - [[package]] name = "dora-runtime" version = "0.1.0" From 9f53dc6904a35b43d324d0831d817ed302ff4b71 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 1 Aug 2022 10:33:42 +0200 Subject: [PATCH 13/18] Adding feature flag to pyo3 --- apis/python/node/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index cd355f3b9..bd2af15be 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" [dependencies] dora-node-api = { path = "../../rust/node" } -pyo3 = { version = "0.16", features = ["eyre"] } +pyo3 = { version = "0.16", features = ["eyre", "extension-module"] } eyre = "0.6" futures = "0.3.21" tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] } From 44713538e7f210d432782180547f79d8e636ada4 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 1 Aug 2022 19:16:34 +0200 Subject: [PATCH 14/18] adding `--no-default-features` for pyo3 extension --- .github/workflows/ci.yml | 8 +++++--- apis/python/node/Cargo.toml | 7 ++++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5230bfb4e..a43c12c3d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,10 +17,12 @@ jobs: sudo apt-get install -y capnproto libcapnp-dev - name: "Check" run: cargo check --all - - name: "Build" - run: cargo build --all + - name: "Build" + # Has to use `--no-default-features` to avoid crash. + # See: https://pyo3.rs/latest/faq.html#i-cant-run-cargo-test-im-having-linker-issues-like-symbol-not-found-or-undefined-reference-to-_pyexc_systemerror + run: cargo build --all --no-default-features - name: "Test" - run: cargo test --all + run: cargo test --all --no-default-features clippy: name: "Clippy" diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index bd2af15be..124615c6f 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" [dependencies] dora-node-api = { path = "../../rust/node" } -pyo3 = { version = "0.16", features = ["eyre", "extension-module"] } +pyo3 = { version = "0.16", features = ["eyre"] } eyre = "0.6" futures = "0.3.21" tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] } @@ -18,3 +18,8 @@ serde_yaml = "0.8.23" [lib] name = "dora" crate-type = ["cdylib"] + + +[features] +extension-module = ["pyo3/extension-module"] +default = ["extension-module"] From f2b3bd5a215ff28e74e9fc4fe594d82b05e05db9 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 3 Aug 2022 09:40:24 +0200 Subject: [PATCH 15/18] adding `abi3-py38` to pyo3 for best target support --- apis/python/node/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 124615c6f..99d6bb047 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" [dependencies] dora-node-api = { path = "../../rust/node" } -pyo3 = { version = "0.16", features = ["eyre"] } +pyo3 = { version = "0.16", features = ["eyre", "abi3-py38"] } eyre = "0.6" futures = "0.3.21" tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] } From 4ff9a59ad732a339d0431e28c7ccfd0fffbfcc58 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 3 Aug 2022 12:16:32 +0200 Subject: [PATCH 16/18] Use `PyBytes` as it is faster than Vec --- apis/python/node/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index bb8fb7620..9827e610d 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -89,10 +89,10 @@ impl Node { slf } - pub fn send_output(&self, output_str: String, data: Vec) -> Result<()> { + pub fn send_output(&self, output_str: String, data: &PyBytes) -> Result<()> { Ok(self .tx_output - .blocking_send((output_str, data)) + .blocking_send((output_str, data.as_bytes().to_vec())) .wrap_err("Could not send output")?) } From 4158b1b0de26ac0498e12ad28f461d87df42c444 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 5 Aug 2022 14:54:56 +0200 Subject: [PATCH 17/18] use `pyproject.toml` for activating `pyo3/extension-module` Since the merge of https://github.com/PyO3/maturin/pull/605 , we can add features flag when maturin build the python project, removing the need to always enabling `extension-module` flag when building. The `extension-module` flag creates crash when building and testing outside of maturin. Previous fix required to disable default feature flag when building and testing which is not ideal. --- .github/workflows/ci.yml | 6 ++---- apis/python/node/Cargo.toml | 5 ----- apis/python/node/pyproject.toml | 9 +++++++++ 3 files changed, 11 insertions(+), 9 deletions(-) create mode 100644 apis/python/node/pyproject.toml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a43c12c3d..ed04eba50 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,11 +18,9 @@ jobs: - name: "Check" run: cargo check --all - name: "Build" - # Has to use `--no-default-features` to avoid crash. - # See: https://pyo3.rs/latest/faq.html#i-cant-run-cargo-test-im-having-linker-issues-like-symbol-not-found-or-undefined-reference-to-_pyexc_systemerror - run: cargo build --all --no-default-features + run: cargo build --all - name: "Test" - run: cargo test --all --no-default-features + run: cargo test --all clippy: name: "Clippy" diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 99d6bb047..d20be2856 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -18,8 +18,3 @@ serde_yaml = "0.8.23" [lib] name = "dora" crate-type = ["cdylib"] - - -[features] -extension-module = ["pyo3/extension-module"] -default = ["extension-module"] diff --git a/apis/python/node/pyproject.toml b/apis/python/node/pyproject.toml new file mode 100644 index 000000000..e95999725 --- /dev/null +++ b/apis/python/node/pyproject.toml @@ -0,0 +1,9 @@ +[build-system] +requires = ["maturin>=0.12,<0.13"] +build-backend = "maturin" + +[project] +name = "dora" + +[tool.maturin] +features = ["pyo3/extension-module"] From e661324253920216e03cae18476b1cd04ec2b882 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 5 Aug 2022 15:09:03 +0200 Subject: [PATCH 18/18] change test graph input name `time` to` string` --- binaries/coordinator/examples/graphs/python_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binaries/coordinator/examples/graphs/python_test.yml b/binaries/coordinator/examples/graphs/python_test.yml index 6d80c651a..66044ba7a 100644 --- a/binaries/coordinator/examples/graphs/python_test.yml +++ b/binaries/coordinator/examples/graphs/python_test.yml @@ -13,7 +13,7 @@ nodes: custom: run: python examples/nodes/python/printer.py inputs: - time: static-string/string + string: static-string/string time2: rust-timer/time - id: rust-timer