From 77f488c1067451221b3d0f1308030454e543e597 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sat, 12 Oct 2024 21:28:11 +0200 Subject: [PATCH 1/2] queryable builder moved to builders --- .../src/api/{builders.rs => builders/mod.rs} | 1 + zenoh/src/api/builders/queryable.rs | 271 ++++++++++++++++++ zenoh/src/api/queryable.rs | 239 --------------- zenoh/src/api/session.rs | 11 +- zenoh/src/lib.rs | 9 +- 5 files changed, 284 insertions(+), 247 deletions(-) rename zenoh/src/api/{builders.rs => builders/mod.rs} (95%) create mode 100644 zenoh/src/api/builders/queryable.rs diff --git a/zenoh/src/api/builders.rs b/zenoh/src/api/builders/mod.rs similarity index 95% rename from zenoh/src/api/builders.rs rename to zenoh/src/api/builders/mod.rs index 5327dabe90..04fc7f1a57 100644 --- a/zenoh/src/api/builders.rs +++ b/zenoh/src/api/builders/mod.rs @@ -13,4 +13,5 @@ // pub(crate) mod publisher; +pub(crate) mod queryable; pub(crate) mod sample; diff --git a/zenoh/src/api/builders/queryable.rs b/zenoh/src/api/builders/queryable.rs new file mode 100644 index 0000000000..24da21f025 --- /dev/null +++ b/zenoh/src/api/builders/queryable.rs @@ -0,0 +1,271 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::{ + future::{IntoFuture, Ready}, + sync::Arc, +}; + +use zenoh_core::{Resolvable, Wait}; +use zenoh_result::ZResult; +#[zenoh_macros::unstable] +use { + crate::api::queryable::Query, crate::api::queryable::Queryable, + crate::api::queryable::QueryableInner, +}; + +use crate::{ + api::{ + handlers::{locked, DefaultHandler, IntoHandler}, + key_expr::KeyExpr, + sample::Locality, + }, + handlers::Callback, + Session, +}; + +/// A builder for initializing a [`Queryable`]. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); +/// let queryable = session.declare_queryable("key/expression").await.unwrap(); +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct QueryableBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { + pub(crate) session: &'a Session, + pub(crate) key_expr: ZResult>, + pub(crate) complete: bool, + pub(crate) origin: Locality, + pub(crate) handler: Handler, +} + +impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { + /// Receive the queries for this queryable with a callback. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let queryable = session + /// .declare_queryable("key/expression") + /// .callback(|query| {println!(">> Handling query '{}'", query.selector());}) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback(self, callback: F) -> QueryableBuilder<'a, 'b, Callback> + where + F: Fn(Query) + Send + Sync + 'static, + { + self.with(Callback::new(Arc::new(callback))) + } + + /// Receive the queries for this Queryable with a mutable callback. + /// + /// Using this guarantees that your callback will never be called concurrently. + /// If your callback is also accepted by the [`callback`](QueryableBuilder::callback) method, we suggest you use it instead of `callback_mut`. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let mut n = 0; + /// let queryable = session + /// .declare_queryable("key/expression") + /// .callback_mut(move |query| {n += 1;}) + /// .await + /// .unwrap(); + /// # } + /// ``` + #[inline] + pub fn callback_mut(self, callback: F) -> QueryableBuilder<'a, 'b, Callback> + where + F: FnMut(Query) + Send + Sync + 'static, + { + self.callback(locked(callback)) + } + + /// Receive the queries for this Queryable with a [`Handler`](crate::handlers::IntoHandler). + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let queryable = session + /// .declare_queryable("key/expression") + /// .with(flume::bounded(32)) + /// .await + /// .unwrap(); + /// while let Ok(query) = queryable.recv_async().await { + /// println!(">> Handling query '{}'", query.selector()); + /// } + /// # } + /// ``` + #[inline] + pub fn with(self, handler: Handler) -> QueryableBuilder<'a, 'b, Handler> + where + Handler: IntoHandler, + { + let QueryableBuilder { + session, + key_expr, + complete, + origin, + handler: _, + } = self; + QueryableBuilder { + session, + key_expr, + complete, + origin, + handler, + } + } +} + +impl<'a, 'b> QueryableBuilder<'a, 'b, Callback> { + /// Register the queryable callback to be run in background until the session is closed. + /// + /// Background builder doesn't return a `Queryable` object anymore. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// // no need to assign and keep a variable with a background queryable + /// session + /// .declare_queryable("key/expression") + /// .callback(|query| {println!(">> Handling query '{}'", query.selector());}) + /// .background() + /// .await + /// .unwrap(); + /// # } + /// ``` + pub fn background(self) -> QueryableBuilder<'a, 'b, Callback, true> { + QueryableBuilder { + session: self.session, + key_expr: self.key_expr, + complete: self.complete, + origin: self.origin, + handler: self.handler, + } + } +} + +impl QueryableBuilder<'_, '_, Handler, BACKGROUND> { + /// Change queryable completeness. + #[inline] + pub fn complete(mut self, complete: bool) -> Self { + self.complete = complete; + self + } + + /// + /// + /// Restrict the matching queries that will be receive by this [`Queryable`] + /// to the ones that have the given [`Locality`](Locality). + #[inline] + #[zenoh_macros::unstable] + pub fn allowed_origin(mut self, origin: Locality) -> Self { + self.origin = origin; + self + } +} + +impl Resolvable for QueryableBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type To = ZResult>; +} + +impl Wait for QueryableBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + fn wait(self) -> ::To { + let session = self.session; + let (callback, receiver) = self.handler.into_handler(); + session + .0 + .declare_queryable_inner( + &self.key_expr?.to_wire(&session.0), + self.complete, + self.origin, + callback, + ) + .map(|qable_state| Queryable { + inner: QueryableInner { + session: self.session.downgrade(), + id: qable_state.id, + undeclare_on_drop: true, + }, + handler: receiver, + }) + } +} + +impl IntoFuture for QueryableBuilder<'_, '_, Handler> +where + Handler: IntoHandler + Send, + Handler::Handler: Send, +{ + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +impl Resolvable for QueryableBuilder<'_, '_, Callback, true> { + type To = ZResult<()>; +} + +impl Wait for QueryableBuilder<'_, '_, Callback, true> { + fn wait(self) -> ::To { + self.session.0.declare_queryable_inner( + &self.key_expr?.to_wire(&self.session.0), + self.complete, + self.origin, + self.handler, + )?; + Ok(()) + } +} + +impl IntoFuture for QueryableBuilder<'_, '_, Callback, true> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 3aafb067f2..de7c0dc6c3 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -44,7 +44,6 @@ use crate::{ }, bytes::{OptionZBytes, ZBytes}, encoding::Encoding, - handlers::{locked, DefaultHandler, IntoHandler}, key_expr::KeyExpr, publisher::Priority, sample::{Locality, QoSBuilder, Sample, SampleKind}, @@ -55,7 +54,6 @@ use crate::{ }, handlers::Callback, net::primitives::Primitives, - Session, }; pub(crate) struct QueryInner { @@ -583,170 +581,6 @@ impl IntoFuture for QueryableUndeclaration { std::future::ready(self.wait()) } } - -/// A builder for initializing a [`Queryable`]. -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// -/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); -/// let queryable = session.declare_queryable("key/expression").await.unwrap(); -/// # } -/// ``` -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -#[derive(Debug)] -pub struct QueryableBuilder<'a, 'b, Handler, const BACKGROUND: bool = false> { - pub(crate) session: &'a Session, - pub(crate) key_expr: ZResult>, - pub(crate) complete: bool, - pub(crate) origin: Locality, - pub(crate) handler: Handler, -} - -impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { - /// Receive the queries for this queryable with a callback. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let queryable = session - /// .declare_queryable("key/expression") - /// .callback(|query| {println!(">> Handling query '{}'", query.selector());}) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - pub fn callback(self, callback: F) -> QueryableBuilder<'a, 'b, Callback> - where - F: Fn(Query) + Send + Sync + 'static, - { - self.with(Callback::new(Arc::new(callback))) - } - - /// Receive the queries for this Queryable with a mutable callback. - /// - /// Using this guarantees that your callback will never be called concurrently. - /// If your callback is also accepted by the [`callback`](QueryableBuilder::callback) method, we suggest you use it instead of `callback_mut`. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let mut n = 0; - /// let queryable = session - /// .declare_queryable("key/expression") - /// .callback_mut(move |query| {n += 1;}) - /// .await - /// .unwrap(); - /// # } - /// ``` - #[inline] - pub fn callback_mut(self, callback: F) -> QueryableBuilder<'a, 'b, Callback> - where - F: FnMut(Query) + Send + Sync + 'static, - { - self.callback(locked(callback)) - } - - /// Receive the queries for this Queryable with a [`Handler`](crate::handlers::IntoHandler). - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// let queryable = session - /// .declare_queryable("key/expression") - /// .with(flume::bounded(32)) - /// .await - /// .unwrap(); - /// while let Ok(query) = queryable.recv_async().await { - /// println!(">> Handling query '{}'", query.selector()); - /// } - /// # } - /// ``` - #[inline] - pub fn with(self, handler: Handler) -> QueryableBuilder<'a, 'b, Handler> - where - Handler: IntoHandler, - { - let QueryableBuilder { - session, - key_expr, - complete, - origin, - handler: _, - } = self; - QueryableBuilder { - session, - key_expr, - complete, - origin, - handler, - } - } -} - -impl<'a, 'b> QueryableBuilder<'a, 'b, Callback> { - /// Register the queryable callback to be run in background until the session is closed. - /// - /// Background builder doesn't return a `Queryable` object anymore. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// - /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - /// // no need to assign and keep a variable with a background queryable - /// session - /// .declare_queryable("key/expression") - /// .callback(|query| {println!(">> Handling query '{}'", query.selector());}) - /// .background() - /// .await - /// .unwrap(); - /// # } - /// ``` - pub fn background(self) -> QueryableBuilder<'a, 'b, Callback, true> { - QueryableBuilder { - session: self.session, - key_expr: self.key_expr, - complete: self.complete, - origin: self.origin, - handler: self.handler, - } - } -} - -impl QueryableBuilder<'_, '_, Handler, BACKGROUND> { - /// Change queryable completeness. - #[inline] - pub fn complete(mut self, complete: bool) -> Self { - self.complete = complete; - self - } - - /// - /// - /// Restrict the matching queries that will be receive by this [`Queryable`] - /// to the ones that have the given [`Locality`](Locality). - #[inline] - #[zenoh_macros::unstable] - pub fn allowed_origin(mut self, origin: Locality) -> Self { - self.origin = origin; - self - } -} - /// A queryable that provides data through a [`Handler`](crate::handlers::IntoHandler). /// /// Queryables can be created from a zenoh [`Session`](crate::Session) @@ -911,76 +745,3 @@ impl DerefMut for Queryable { self.handler_mut() } } - -impl Resolvable for QueryableBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type To = ZResult>; -} - -impl Wait for QueryableBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - fn wait(self) -> ::To { - let session = self.session; - let (callback, receiver) = self.handler.into_handler(); - session - .0 - .declare_queryable_inner( - &self.key_expr?.to_wire(&session.0), - self.complete, - self.origin, - callback, - ) - .map(|qable_state| Queryable { - inner: QueryableInner { - session: self.session.downgrade(), - id: qable_state.id, - undeclare_on_drop: true, - }, - handler: receiver, - }) - } -} - -impl IntoFuture for QueryableBuilder<'_, '_, Handler> -where - Handler: IntoHandler + Send, - Handler::Handler: Send, -{ - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -impl Resolvable for QueryableBuilder<'_, '_, Callback, true> { - type To = ZResult<()>; -} - -impl Wait for QueryableBuilder<'_, '_, Callback, true> { - fn wait(self) -> ::To { - self.session.0.declare_queryable_inner( - &self.key_expr?.to_wire(&self.session.0), - self.complete, - self.origin, - self.handler, - )?; - Ok(()) - } -} - -impl IntoFuture for QueryableBuilder<'_, '_, Callback, true> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 42a6fcf8b0..47102c3960 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -70,9 +70,12 @@ use zenoh_task::TaskController; use super::{ admin, - builders::publisher::{ - PublicationBuilderDelete, PublicationBuilderPut, PublisherBuilder, SessionDeleteBuilder, - SessionPutBuilder, + builders::{ + publisher::{ + PublicationBuilderDelete, PublicationBuilderPut, PublisherBuilder, + SessionDeleteBuilder, SessionPutBuilder, + }, + queryable::QueryableBuilder, }, bytes::ZBytes, encoding::Encoding, @@ -83,7 +86,7 @@ use super::{ query::{ ConsolidationMode, QueryConsolidation, QueryState, QueryTarget, Reply, SessionGetBuilder, }, - queryable::{Query, QueryInner, QueryableBuilder, QueryableState}, + queryable::{Query, QueryInner, QueryableState}, sample::{DataInfo, DataInfoIntoSample, Locality, QoS, Sample, SampleKind}, selector::Selector, subscriber::{SubscriberBuilder, SubscriberKind, SubscriberState}, diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 737b5587b4..010575134e 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -243,16 +243,17 @@ pub mod query { #[zenoh_macros::internal] pub use crate::api::queryable::ReplySample; - #[zenoh_macros::unstable] - pub use crate::api::{query::ReplyKeyExpr, selector::ZenohParameters}; pub use crate::api::{ + builders::queryable::QueryableBuilder, query::{ConsolidationMode, QueryConsolidation, QueryTarget, Reply, ReplyError}, queryable::{ - Query, Queryable, QueryableBuilder, QueryableUndeclaration, ReplyBuilder, - ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder, + Query, Queryable, QueryableUndeclaration, ReplyBuilder, ReplyBuilderDelete, + ReplyBuilderPut, ReplyErrBuilder, }, selector::Selector, }; + #[zenoh_macros::unstable] + pub use crate::api::{query::ReplyKeyExpr, selector::ZenohParameters}; } /// Callback handler trait From 7ff8df09aa898180118cc757a2796bdbc4cd48ee Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sat, 12 Oct 2024 22:23:39 +0200 Subject: [PATCH 2/2] reply builders moved to builders --- zenoh/src/api/builders/mod.rs | 1 + zenoh/src/api/builders/reply.rs | 286 ++++++++++++++++++++++++++++++++ zenoh/src/api/queryable.rs | 238 +------------------------- zenoh/src/lib.rs | 6 +- 4 files changed, 298 insertions(+), 233 deletions(-) create mode 100644 zenoh/src/api/builders/reply.rs diff --git a/zenoh/src/api/builders/mod.rs b/zenoh/src/api/builders/mod.rs index 04fc7f1a57..4a9ae98f9d 100644 --- a/zenoh/src/api/builders/mod.rs +++ b/zenoh/src/api/builders/mod.rs @@ -14,4 +14,5 @@ pub(crate) mod publisher; pub(crate) mod queryable; +pub(crate) mod reply; pub(crate) mod sample; diff --git a/zenoh/src/api/builders/reply.rs b/zenoh/src/api/builders/reply.rs new file mode 100644 index 0000000000..d97119da99 --- /dev/null +++ b/zenoh/src/api/builders/reply.rs @@ -0,0 +1,286 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::future::{IntoFuture, Ready}; + +use uhlc::Timestamp; +use zenoh_core::{Resolvable, Wait}; +use zenoh_protocol::{ + core::{CongestionControl, WireExpr}, + network::{response, Mapping, Response}, + zenoh::{self, ResponseBody}, +}; +use zenoh_result::ZResult; + +#[zenoh_macros::unstable] +use crate::api::sample::SourceInfo; +use crate::api::{ + builders::sample::{ + EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, + TimestampBuilderTrait, + }, + bytes::{OptionZBytes, ZBytes}, + encoding::Encoding, + key_expr::KeyExpr, + publisher::Priority, + queryable::Query, + sample::QoSBuilder, + value::Value, +}; + +#[derive(Debug)] +pub struct ReplyBuilderPut { + payload: ZBytes, + encoding: Encoding, +} +#[derive(Debug)] +pub struct ReplyBuilderDelete; + +/// A builder returned by [`Query::reply()`](Query::reply) and [`Query::reply_del()`](Query::reply_del) +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct ReplyBuilder<'a, 'b, T> { + query: &'a Query, + key_expr: ZResult>, + kind: T, + timestamp: Option, + qos: QoSBuilder, + #[cfg(feature = "unstable")] + source_info: SourceInfo, + attachment: Option, +} + +impl<'a, 'b> ReplyBuilder<'a, 'b, ReplyBuilderPut> { + pub(crate) fn new( + query: &'a Query, + key_expr: TryIntoKeyExpr, + payload: IntoZBytes, + ) -> Self + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + IntoZBytes: Into, + { + Self { + query, + key_expr: key_expr.try_into().map_err(Into::into), + qos: response::ext::QoSType::RESPONSE.into(), + kind: ReplyBuilderPut { + payload: payload.into(), + encoding: Encoding::default(), + }, + timestamp: None, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + attachment: None, + } + } +} + +impl<'a, 'b> ReplyBuilder<'a, 'b, ReplyBuilderDelete> { + pub(crate) fn new(query: &'a Query, key_expr: TryIntoKeyExpr) -> Self + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + Self { + query, + key_expr: key_expr.try_into().map_err(Into::into), + qos: response::ext::QoSType::RESPONSE.into(), + kind: ReplyBuilderDelete, + timestamp: None, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + attachment: None, + } + } +} + +#[zenoh_macros::internal_trait] +impl TimestampBuilderTrait for ReplyBuilder<'_, '_, T> { + fn timestamp>>(self, timestamp: U) -> Self { + Self { + timestamp: timestamp.into(), + ..self + } + } +} + +#[zenoh_macros::internal_trait] +impl SampleBuilderTrait for ReplyBuilder<'_, '_, T> { + fn attachment>(self, attachment: U) -> Self { + let attachment: OptionZBytes = attachment.into(); + Self { + attachment: attachment.into(), + ..self + } + } + + #[cfg(feature = "unstable")] + fn source_info(self, source_info: SourceInfo) -> Self { + Self { + source_info, + ..self + } + } +} + +#[zenoh_macros::internal_trait] +impl QoSBuilderTrait for ReplyBuilder<'_, '_, T> { + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + let qos = self.qos.congestion_control(congestion_control); + Self { qos, ..self } + } + + fn priority(self, priority: Priority) -> Self { + let qos = self.qos.priority(priority); + Self { qos, ..self } + } + + fn express(self, is_express: bool) -> Self { + let qos = self.qos.express(is_express); + Self { qos, ..self } + } +} + +#[zenoh_macros::internal_trait] +impl EncodingBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> { + fn encoding>(self, encoding: T) -> Self { + Self { + kind: ReplyBuilderPut { + encoding: encoding.into(), + ..self.kind + }, + ..self + } + } +} + +impl Resolvable for ReplyBuilder<'_, '_, T> { + type To = ZResult<()>; +} + +impl Wait for ReplyBuilder<'_, '_, ReplyBuilderPut> { + fn wait(self) -> ::To { + let key_expr = self.key_expr?.into_owned(); + let sample = SampleBuilder::put(key_expr, self.kind.payload) + .encoding(self.kind.encoding) + .timestamp(self.timestamp) + .qos(self.qos.into()); + #[cfg(feature = "unstable")] + let sample = sample.source_info(self.source_info); + let sample = sample.attachment(self.attachment); + self.query._reply_sample(sample.into()) + } +} + +impl Wait for ReplyBuilder<'_, '_, ReplyBuilderDelete> { + fn wait(self) -> ::To { + let key_expr = self.key_expr?.into_owned(); + let sample = SampleBuilder::delete(key_expr) + .timestamp(self.timestamp) + .qos(self.qos.into()); + #[cfg(feature = "unstable")] + let sample = sample.source_info(self.source_info); + let sample = sample.attachment(self.attachment); + self.query._reply_sample(sample.into()) + } +} + +impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderPut> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderDelete> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + +/// A builder returned by [`Query::reply_err()`](Query::reply_err). +#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] +#[derive(Debug)] +pub struct ReplyErrBuilder<'a> { + query: &'a Query, + value: Value, +} + +impl<'a> ReplyErrBuilder<'a> { + pub(crate) fn new(query: &'a Query, payload: IntoZBytes) -> ReplyErrBuilder<'_> + where + IntoZBytes: Into, + { + Self { + query, + value: Value::new(payload, Encoding::default()), + } + } +} + +#[zenoh_macros::internal_trait] +impl EncodingBuilderTrait for ReplyErrBuilder<'_> { + fn encoding>(self, encoding: T) -> Self { + let mut value = self.value.clone(); + value.encoding = encoding.into(); + Self { value, ..self } + } +} + +impl<'a> Resolvable for ReplyErrBuilder<'a> { + type To = ZResult<()>; +} + +impl Wait for ReplyErrBuilder<'_> { + fn wait(self) -> ::To { + self.query.inner.primitives.send_response(Response { + rid: self.query.inner.qid, + wire_expr: WireExpr { + scope: 0, + suffix: std::borrow::Cow::Owned(self.query.key_expr().as_str().to_owned()), + mapping: Mapping::Sender, + }, + payload: ResponseBody::Err(zenoh::Err { + encoding: self.value.encoding.into(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: self.value.payload.into(), + }), + ext_qos: response::ext::QoSType::RESPONSE, + ext_tstamp: None, + ext_respid: Some(response::ext::ResponderIdType { + zid: self.query.inner.zid, + eid: self.query.eid, + }), + }); + Ok(()) + } +} + +impl<'a> IntoFuture for ReplyErrBuilder<'a> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index de7c0dc6c3..613d1fa1d8 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -19,18 +19,16 @@ use std::{ }; use tracing::error; -use uhlc::Timestamp; use zenoh_core::{Resolvable, Resolve, Wait}; use zenoh_protocol::{ - core::{CongestionControl, EntityId, Parameters, WireExpr, ZenohIdProto}, + core::{EntityId, Parameters, WireExpr, ZenohIdProto}, network::{response, Mapping, RequestId, Response, ResponseFinal}, zenoh::{self, reply::ReplyBody, Del, Put, ResponseBody}, }; use zenoh_result::ZResult; #[zenoh_macros::unstable] use { - crate::api::{query::ReplyKeyExpr, sample::SourceInfo}, - zenoh_config::wrappers::EntityGlobalId, + crate::api::query::ReplyKeyExpr, zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto, }; @@ -38,15 +36,11 @@ use { use crate::api::selector::ZenohParameters; use crate::{ api::{ - builders::sample::{ - EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, - TimestampBuilderTrait, - }, - bytes::{OptionZBytes, ZBytes}, + builders::reply::{ReplyBuilder, ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder}, + bytes::ZBytes, encoding::Encoding, key_expr::KeyExpr, - publisher::Priority, - sample::{Locality, QoSBuilder, Sample, SampleKind}, + sample::{Locality, Sample, SampleKind}, selector::Selector, session::{UndeclarableSealed, WeakSession}, value::Value, @@ -161,19 +155,7 @@ impl Query { >>::Error: Into, IntoZBytes: Into, { - ReplyBuilder { - query: self, - key_expr: key_expr.try_into().map_err(Into::into), - qos: response::ext::QoSType::RESPONSE.into(), - kind: ReplyBuilderPut { - payload: payload.into(), - encoding: Encoding::default(), - }, - timestamp: None, - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - attachment: None, - } + ReplyBuilder::<'_, 'b, ReplyBuilderPut>::new(self, key_expr, payload) } /// Sends a [`crate::query::ReplyError`] as a reply to this Query. @@ -182,10 +164,7 @@ impl Query { where IntoZBytes: Into, { - ReplyErrBuilder { - query: self, - value: Value::new(payload, Encoding::default()), - } + ReplyErrBuilder::new(self, payload) } /// Sends a [`crate::sample::Sample`] of kind [`crate::sample::SampleKind::Delete`] as a reply to this Query. @@ -202,16 +181,7 @@ impl Query { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - ReplyBuilder { - query: self, - key_expr: key_expr.try_into().map_err(Into::into), - qos: response::ext::QoSType::RESPONSE.into(), - kind: ReplyBuilderDelete, - timestamp: None, - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - attachment: None, - } + ReplyBuilder::<'_, 'b, ReplyBuilderDelete>::new(self, key_expr) } /// Queries may or may not accept replies on key expressions that do not intersect with their own key expression. @@ -280,121 +250,8 @@ impl IntoFuture for ReplySample<'_> { } } -#[derive(Debug)] -pub struct ReplyBuilderPut { - payload: ZBytes, - encoding: Encoding, -} -#[derive(Debug)] -pub struct ReplyBuilderDelete; - -/// A builder returned by [`Query::reply()`](Query::reply) and [`Query::reply_del()`](Query::reply_del) -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -#[derive(Debug)] -pub struct ReplyBuilder<'a, 'b, T> { - query: &'a Query, - key_expr: ZResult>, - kind: T, - timestamp: Option, - qos: QoSBuilder, - #[cfg(feature = "unstable")] - source_info: SourceInfo, - attachment: Option, -} - -#[zenoh_macros::internal_trait] -impl TimestampBuilderTrait for ReplyBuilder<'_, '_, T> { - fn timestamp>>(self, timestamp: U) -> Self { - Self { - timestamp: timestamp.into(), - ..self - } - } -} - -#[zenoh_macros::internal_trait] -impl SampleBuilderTrait for ReplyBuilder<'_, '_, T> { - fn attachment>(self, attachment: U) -> Self { - let attachment: OptionZBytes = attachment.into(); - Self { - attachment: attachment.into(), - ..self - } - } - - #[cfg(feature = "unstable")] - fn source_info(self, source_info: SourceInfo) -> Self { - Self { - source_info, - ..self - } - } -} - -#[zenoh_macros::internal_trait] -impl QoSBuilderTrait for ReplyBuilder<'_, '_, T> { - fn congestion_control(self, congestion_control: CongestionControl) -> Self { - let qos = self.qos.congestion_control(congestion_control); - Self { qos, ..self } - } - - fn priority(self, priority: Priority) -> Self { - let qos = self.qos.priority(priority); - Self { qos, ..self } - } - - fn express(self, is_express: bool) -> Self { - let qos = self.qos.express(is_express); - Self { qos, ..self } - } -} - -#[zenoh_macros::internal_trait] -impl EncodingBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> { - fn encoding>(self, encoding: T) -> Self { - Self { - kind: ReplyBuilderPut { - encoding: encoding.into(), - ..self.kind - }, - ..self - } - } -} - -impl Resolvable for ReplyBuilder<'_, '_, T> { - type To = ZResult<()>; -} - -impl Wait for ReplyBuilder<'_, '_, ReplyBuilderPut> { - fn wait(self) -> ::To { - let key_expr = self.key_expr?.into_owned(); - let sample = SampleBuilder::put(key_expr, self.kind.payload) - .encoding(self.kind.encoding) - .timestamp(self.timestamp) - .qos(self.qos.into()); - #[cfg(feature = "unstable")] - let sample = sample.source_info(self.source_info); - let sample = sample.attachment(self.attachment); - self.query._reply_sample(sample.into()) - } -} - -impl Wait for ReplyBuilder<'_, '_, ReplyBuilderDelete> { - fn wait(self) -> ::To { - let key_expr = self.key_expr?.into_owned(); - let sample = SampleBuilder::delete(key_expr) - .timestamp(self.timestamp) - .qos(self.qos.into()); - #[cfg(feature = "unstable")] - let sample = sample.source_info(self.source_info); - let sample = sample.attachment(self.attachment); - self.query._reply_sample(sample.into()) - } -} - impl Query { - fn _reply_sample(&self, sample: Sample) -> ZResult<()> { + pub(crate) fn _reply_sample(&self, sample: Sample) -> ZResult<()> { let c = zcondfeat!( "unstable", !self._accepts_any_replies().unwrap_or(false), @@ -446,83 +303,6 @@ impl Query { Ok(()) } } - -impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderPut> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -impl IntoFuture for ReplyBuilder<'_, '_, ReplyBuilderDelete> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - -/// A builder returned by [`Query::reply_err()`](Query::reply_err). -#[must_use = "Resolvables do nothing unless you resolve them using `.await` or `zenoh::Wait::wait`"] -#[derive(Debug)] -pub struct ReplyErrBuilder<'a> { - query: &'a Query, - value: Value, -} - -#[zenoh_macros::internal_trait] -impl EncodingBuilderTrait for ReplyErrBuilder<'_> { - fn encoding>(self, encoding: T) -> Self { - let mut value = self.value.clone(); - value.encoding = encoding.into(); - Self { value, ..self } - } -} - -impl<'a> Resolvable for ReplyErrBuilder<'a> { - type To = ZResult<()>; -} - -impl Wait for ReplyErrBuilder<'_> { - fn wait(self) -> ::To { - self.query.inner.primitives.send_response(Response { - rid: self.query.inner.qid, - wire_expr: WireExpr { - scope: 0, - suffix: std::borrow::Cow::Owned(self.query.key_expr().as_str().to_owned()), - mapping: Mapping::Sender, - }, - payload: ResponseBody::Err(zenoh::Err { - encoding: self.value.encoding.into(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: self.value.payload.into(), - }), - ext_qos: response::ext::QoSType::RESPONSE, - ext_tstamp: None, - ext_respid: Some(response::ext::ResponderIdType { - zid: self.query.inner.zid, - eid: self.query.eid, - }), - }); - Ok(()) - } -} - -impl<'a> IntoFuture for ReplyErrBuilder<'a> { - type Output = ::To; - type IntoFuture = Ready<::To>; - - fn into_future(self) -> Self::IntoFuture { - std::future::ready(self.wait()) - } -} - pub(crate) struct QueryableState { pub(crate) id: Id, pub(crate) key_expr: WireExpr<'static>, diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 010575134e..c2cbbcec1a 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -245,11 +245,9 @@ pub mod query { pub use crate::api::queryable::ReplySample; pub use crate::api::{ builders::queryable::QueryableBuilder, + builders::reply::{ReplyBuilder, ReplyBuilderDelete, ReplyBuilderPut, ReplyErrBuilder}, query::{ConsolidationMode, QueryConsolidation, QueryTarget, Reply, ReplyError}, - queryable::{ - Query, Queryable, QueryableUndeclaration, ReplyBuilder, ReplyBuilderDelete, - ReplyBuilderPut, ReplyErrBuilder, - }, + queryable::{Query, Queryable, QueryableUndeclaration}, selector::Selector, }; #[zenoh_macros::unstable]