diff --git a/zbus/src/azync/connection.rs b/zbus/src/azync/connection.rs index 565d91b0..c9e4fa75 100644 --- a/zbus/src/azync/connection.rs +++ b/zbus/src/azync/connection.rs @@ -119,7 +119,7 @@ struct ConnectionInner { raw_in_conn: Arc>>>, // FIXME: We really should be using async_lock::Mutex here but `Sink::start_send is not very // async friendly. :( - sink: Arc>, + raw_out_conn: Arc>, // Serial number for next outgoing message serial: AtomicU32, @@ -751,10 +751,7 @@ impl Connection { let out_socket = auth.conn.socket().get_ref().try_clone()?; let out_conn = RawConnection::wrap(Async::new(out_socket)?); let cap_unix_fd = auth.cap_unix_fd; - let sink = Arc::new(sync::Mutex::new(MessageSink { - raw_conn: out_conn, - cap_unix_fd, - })); + let raw_out_conn = Arc::new(sync::Mutex::new(out_conn)); let (mut msg_sender, msg_receiver) = broadcast(DEFAULT_MAX_QUEUED); msg_sender.set_overflow(true); @@ -772,7 +769,7 @@ impl Connection { msg_receiver, inner: Arc::new(ConnectionInner { raw_in_conn, - sink, + raw_out_conn, server_guid: auth.server_guid, cap_unix_fd, bus_conn: bus_connection, @@ -828,39 +825,7 @@ impl Connection { } } -#[derive(Debug)] -struct MessageSink { - raw_conn: DynSocketConnection, - cap_unix_fd: bool, -} - -assert_impl_all!(MessageSink: Send, Sync, Unpin); - -impl MessageSink { - fn flush(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - match self.raw_conn.try_flush() { - Ok(()) => return Poll::Ready(Ok(())), - Err(e) => { - if e.kind() == ErrorKind::WouldBlock { - let poll = self.raw_conn.socket().poll_writable(cx); - - match poll { - Poll::Pending => return Poll::Pending, - // Guess socket became ready already so let's try it again. - Poll::Ready(Ok(_)) => continue, - Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), - } - } else { - return Poll::Ready(Err(Error::Io(e))); - } - } - } - } - } -} - -impl Sink for MessageSink { +impl Sink for Connection { type Error = Error; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { @@ -869,48 +834,36 @@ impl Sink for MessageSink { } fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> { - if !msg.fds().is_empty() && !self.cap_unix_fd { + if !msg.fds().is_empty() && !self.inner.cap_unix_fd { return Err(Error::Unsupported); } - self.get_mut().raw_conn.enqueue_message(msg); + self.inner + .raw_out_conn + .lock() + .expect("poisened lock") + .enqueue_message(msg); Ok(()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().flush(cx) + self.inner + .raw_out_conn + .lock() + .expect("poisened lock") + .flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let sink = self.get_mut(); - match sink.flush(cx) { + let mut raw_out_conn = self.inner.raw_out_conn.lock().expect("poisened lock"); + match raw_out_conn.flush(cx) { Poll::Ready(Ok(_)) => (), Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), Poll::Pending => return Poll::Pending, } - Poll::Ready(sink.raw_conn.close()) - } -} - -impl Sink for Connection { - type Error = Error; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_ready(cx) - } - - fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> { - Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).start_send(msg) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_close(cx) + Poll::Ready(raw_out_conn.close()) } } diff --git a/zbus/src/raw/connection.rs b/zbus/src/raw/connection.rs index 37fd673d..7cd57b83 100644 --- a/zbus/src/raw/connection.rs +++ b/zbus/src/raw/connection.rs @@ -1,4 +1,10 @@ -use std::{collections::VecDeque, io}; +use std::{ + collections::VecDeque, + io::{self, ErrorKind}, + task::{Context, Poll}, +}; + +use async_io::Async; use crate::{message::Message, message_header::MIN_MESSAGE_SIZE, raw::Socket, OwnedFd}; @@ -168,6 +174,31 @@ impl Connection { } } +impl Connection>> { + /// Same as `try_flush` above, except it wraps the method for use in [`std::future::Future`] impls. + pub(crate) fn flush(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + match self.try_flush() { + Ok(()) => return Poll::Ready(Ok(())), + Err(e) => { + if e.kind() == ErrorKind::WouldBlock { + let poll = self.socket().poll_writable(cx); + + match poll { + Poll::Pending => return Poll::Pending, + // Guess socket became ready already so let's try it again. + Poll::Ready(Ok(_)) => continue, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), + } + } else { + return Poll::Ready(Err(crate::Error::Io(e))); + } + } + } + } + } +} + #[cfg(test)] mod tests { use super::Connection;