Skip to content

Commit

Permalink
Merge pull request #54 from dora-rs/dora-node-python
Browse files Browse the repository at this point in the history
Python Node API
  • Loading branch information
phil-opp authored Aug 5, 2022
2 parents ae9043c + e661324 commit b6a8d5d
Show file tree
Hide file tree
Showing 19 changed files with 229 additions and 97 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ jobs:
sudo apt-get install -y capnproto libcapnp-dev
- name: "Check"
run: cargo check --all
- name: "Build"
run: cargo build --all
- name: "Build"
run: cargo build --all
- name: "Test"
run: cargo test --all

Expand Down
20 changes: 12 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = [
"apis/rust/node",
"apis/rust/operator",
"apis/rust/operator/macros",
"apis/python/binding",
"apis/python/node",
"binaries/coordinator",
"binaries/runtime",
"libraries/extensions/message",
Expand Down
11 changes: 0 additions & 11 deletions apis/python/binding/Cargo.toml

This file was deleted.

71 changes: 0 additions & 71 deletions apis/python/binding/src/lib.rs

This file was deleted.

20 changes: 20 additions & 0 deletions apis/python/node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "dora-node-api-python"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { path = "../../rust/node" }
pyo3 = { version = "0.16", features = ["eyre", "abi3-py38"] }
eyre = "0.6"
futures = "0.3.21"
tokio = { version = "1.17.0", features = ["rt", "sync", "macros"] }
serde_yaml = "0.8.23"


[lib]
name = "dora"
crate-type = ["cdylib"]
12 changes: 12 additions & 0 deletions apis/python/node/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
This crate corresponds to the Node API for Dora.

## Building

To build the Python module for development:

````bash
python3 -m venv .env
source .env/bin/activate
pip install maturin
maturin develop
````
9 changes: 9 additions & 0 deletions apis/python/node/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[build-system]
requires = ["maturin>=0.12,<0.13"]
build-backend = "maturin"

[project]
name = "dora"

[tool.maturin]
features = ["pyo3/extension-module"]
108 changes: 108 additions & 0 deletions apis/python/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use dora_node_api::config::{DataId, NodeId};
use dora_node_api::{DoraNode, Input};
use eyre::{Context, Result};
use futures::StreamExt;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use std::sync::Arc;
use std::thread;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};

#[pyclass]
pub struct Node {
id: NodeId,
pub rx_input: Receiver<Input>,
pub tx_output: Sender<(String, Vec<u8>)>,
}

pub struct PyInput(Input);

impl IntoPy<PyObject> for PyInput {
fn into_py(self, py: Python) -> PyObject {
(self.0.id.to_string(), PyBytes::new(py, &self.0.data)).into_py(py)
}
}

#[pymethods]
impl Node {
#[new]
pub fn new() -> Result<Self> {
let id = {
let raw =
std::env::var("DORA_NODE_ID").wrap_err("env variable DORA_NODE_ID must be set")?;
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};

let (tx_input, rx_input) = mpsc::channel(1);
let (tx_output, mut rx_output) = mpsc::channel::<(String, Vec<u8>)>(1);

// Dispatching a tokio threadpool enables us to conveniently use Dora Future stream
// through tokio channel.
// It would have been difficult to expose the FutureStream of Dora directly.
thread::spawn(move || -> Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread().build()?;
rt.block_on(async move {
let node = Arc::new(DoraNode::init_from_env().await?);
let _node = node.clone();
let receive_handle = tokio::spawn(async move {
let mut inputs = _node.inputs().await.unwrap();
while let Some(input) = inputs.next().await {
tx_input.send(input).await?
}
Result::<_, eyre::Error>::Ok(())
});
let send_handle = tokio::spawn(async move {
while let Some((output_str, data)) = rx_output.recv().await {
let output_id = DataId::from(output_str);
node.send_output(&output_id, data.as_slice()).await?
}
Result::<_, eyre::Error>::Ok(())
});
let (receiver, sender) = tokio::join!(receive_handle, send_handle);
receiver
.wrap_err("Handle to the receiver failed")?
.wrap_err("Receiving messages from receiver channel failed")?;
sender
.wrap_err("Handle to the sender failed")?
.wrap_err("Sending messages using sender channel failed")?;
Ok(())
})
});

Ok(Node {
id,
rx_input,
tx_output,
})
}

pub fn next(&mut self) -> PyResult<Option<PyInput>> {
self.__next__()
}

pub fn __next__(&mut self) -> PyResult<Option<PyInput>> {
Ok(self.rx_input.blocking_recv().map(PyInput))
}

fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}

pub fn send_output(&self, output_str: String, data: &PyBytes) -> Result<()> {
Ok(self
.tx_output
.blocking_send((output_str, data.as_bytes().to_vec()))
.wrap_err("Could not send output")?)
}

pub fn id(&self) -> String {
self.id.to_string()
}
}

#[pymodule]
fn dora(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<Node>().unwrap();
Ok(())
}
8 changes: 4 additions & 4 deletions apis/rust/node/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ pub async fn init(
}

#[async_trait]
pub trait CommunicationLayer {
pub trait CommunicationLayer: Send + Sync {
async fn subscribe<'a>(
&'a self,
topic: &str,
) -> Result<Pin<Box<dyn futures::Stream<Item = Vec<u8>> + 'a>>, BoxError>;
) -> Result<Pin<Box<dyn futures::Stream<Item = Vec<u8>> + Send + 'a>>, BoxError>;

async fn publish(&self, topic: &str, data: &[u8]) -> Result<(), BoxError>;

Expand All @@ -61,12 +61,12 @@ impl CommunicationLayer for ZenohCommunicationLayer {
async fn subscribe<'a>(
&'a self,
topic: &str,
) -> Result<Pin<Box<dyn futures::Stream<Item = Vec<u8>> + 'a>>, BoxError> {
) -> Result<Pin<Box<dyn futures::Stream<Item = Vec<u8>> + Send + 'a>>, BoxError> {
zenoh::Session::subscribe(&self.zenoh, self.prefixed(topic))
.reliable()
.await
.map(|s| {
let trait_object: Pin<Box<dyn futures::Stream<Item = Vec<u8>> + 'a>> =
let trait_object: Pin<Box<dyn futures::Stream<Item = Vec<u8>> + Send + 'a>> =
Box::pin(s.map(|s| s.value.payload.contiguous().into_owned()));
trait_object
})
Expand Down
1 change: 1 addition & 0 deletions apis/rust/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl Drop for DoraNode {
}
}

#[derive(Debug)]
pub struct Input {
pub id: DataId,
pub data: Vec<u8>,
Expand Down
12 changes: 12 additions & 0 deletions binaries/coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,15 @@ time = "0.3.9"
futures-concurrency = "2.0.3"
rand = "0.8.5"
dora-core = { version = "0.1.0", path = "../../libraries/core" }

[[example]]
name = "source_timer"
path = "examples/nodes/rust/source_timer.rs"

[[example]]
name = "sink_logger"
path = "examples/nodes/rust/sink_logger.rs"

[[example]]
name = "random_number"
path = "examples/nodes/rust/random_number.rs"
29 changes: 29 additions & 0 deletions binaries/coordinator/examples/graphs/python_test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
communication:
zenoh:
prefix: /foo

nodes:
- id: static-string
custom:
run: python examples/nodes/python/static_string.py
outputs:
- string

- id: python-printer
custom:
run: python examples/nodes/python/printer.py
inputs:
string: static-string/string
time2: rust-timer/time

- id: rust-timer
custom:
run: cargo run --example source_timer
outputs:
- time

- id: rust-logger
custom:
run: cargo run --example sink_logger
inputs:
time: static-string/string
8 changes: 8 additions & 0 deletions binaries/coordinator/examples/nodes/python/printer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from dora import Node

node = Node()

for id, value in node:
print(f"From Python, id: {id}, value: {value}") if value is not [] else None

print("printer finished")
11 changes: 11 additions & 0 deletions binaries/coordinator/examples/nodes/python/static_string.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import time

from dora import Node

node = Node()

for i in range(100):
node.send_output("string", b"Hello World")
time.sleep(0.1)

print("static string finished")
File renamed without changes.

0 comments on commit b6a8d5d

Please sign in to comment.