Skip to content
This repository was archived by the owner on May 11, 2023. It is now read-only.

Commit

Permalink
zb: Drop internal MessageSink entirely
Browse files Browse the repository at this point in the history
  • Loading branch information
zeenix committed Jul 4, 2021
1 parent 26c433a commit 3a0d099
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 66 deletions.
83 changes: 18 additions & 65 deletions zbus/src/azync/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ struct ConnectionInner<S> {
raw_in_conn: Arc<Mutex<RawConnection<Async<S>>>>,
// FIXME: We really should be using async_lock::Mutex here but `Sink::start_send is not very
// async friendly. :(
sink: Arc<sync::Mutex<MessageSink>>,
raw_out_conn: Arc<sync::Mutex<DynSocketConnection>>,
// Serial number for next outgoing message
serial: AtomicU32,

Expand Down Expand Up @@ -751,10 +751,7 @@ impl Connection {
let out_socket = auth.conn.socket().get_ref().try_clone()?;
let out_conn = RawConnection::wrap(Async::new(out_socket)?);
let cap_unix_fd = auth.cap_unix_fd;
let sink = Arc::new(sync::Mutex::new(MessageSink {
raw_conn: out_conn,
cap_unix_fd,
}));
let raw_out_conn = Arc::new(sync::Mutex::new(out_conn));

let (mut msg_sender, msg_receiver) = broadcast(DEFAULT_MAX_QUEUED);
msg_sender.set_overflow(true);
Expand All @@ -772,7 +769,7 @@ impl Connection {
msg_receiver,
inner: Arc::new(ConnectionInner {
raw_in_conn,
sink,
raw_out_conn,
server_guid: auth.server_guid,
cap_unix_fd,
bus_conn: bus_connection,
Expand Down Expand Up @@ -828,39 +825,7 @@ impl Connection {
}
}

#[derive(Debug)]
struct MessageSink {
raw_conn: DynSocketConnection,
cap_unix_fd: bool,
}

assert_impl_all!(MessageSink: Send, Sync, Unpin);

impl MessageSink {
fn flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
loop {
match self.raw_conn.try_flush() {
Ok(()) => return Poll::Ready(Ok(())),
Err(e) => {
if e.kind() == ErrorKind::WouldBlock {
let poll = self.raw_conn.socket().poll_writable(cx);

match poll {
Poll::Pending => return Poll::Pending,
// Guess socket became ready already so let's try it again.
Poll::Ready(Ok(_)) => continue,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
}
} else {
return Poll::Ready(Err(Error::Io(e)));
}
}
}
}
}
}

impl Sink<Message> for MessageSink {
impl Sink<Message> for Connection {
type Error = Error;

fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
Expand All @@ -869,48 +834,36 @@ impl Sink<Message> for MessageSink {
}

fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> {
if !msg.fds().is_empty() && !self.cap_unix_fd {
if !msg.fds().is_empty() && !self.inner.cap_unix_fd {
return Err(Error::Unsupported);
}

self.get_mut().raw_conn.enqueue_message(msg);
self.inner
.raw_out_conn
.lock()
.expect("poisened lock")
.enqueue_message(msg);

Ok(())
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.get_mut().flush(cx)
self.inner
.raw_out_conn
.lock()
.expect("poisened lock")
.flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let sink = self.get_mut();
match sink.flush(cx) {
let mut raw_out_conn = self.inner.raw_out_conn.lock().expect("poisened lock");
match raw_out_conn.flush(cx) {
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
}

Poll::Ready(sink.raw_conn.close())
}
}

impl Sink<Message> for Connection {
type Error = Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, msg: Message) -> Result<()> {
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).start_send(msg)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut *self.inner.sink.lock().expect("poisoned lock")).poll_close(cx)
Poll::Ready(raw_out_conn.close())
}
}

Expand Down
33 changes: 32 additions & 1 deletion zbus/src/raw/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{collections::VecDeque, io};
use std::{
collections::VecDeque,
io::{self, ErrorKind},
task::{Context, Poll},
};

use async_io::Async;

use crate::{message::Message, message_header::MIN_MESSAGE_SIZE, raw::Socket, OwnedFd};

Expand Down Expand Up @@ -168,6 +174,31 @@ impl<S: Socket> Connection<S> {
}
}

impl Connection<Async<Box<dyn Socket>>> {
/// Same as `try_flush` above, except it wraps the method for use in [`std::future::Future`] impls.
pub(crate) fn flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
loop {
match self.try_flush() {
Ok(()) => return Poll::Ready(Ok(())),
Err(e) => {
if e.kind() == ErrorKind::WouldBlock {
let poll = self.socket().poll_writable(cx);

match poll {
Poll::Pending => return Poll::Pending,
// Guess socket became ready already so let's try it again.
Poll::Ready(Ok(_)) => continue,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
}
} else {
return Poll::Ready(Err(crate::Error::Io(e)));
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use super::Connection;
Expand Down

0 comments on commit 3a0d099

Please sign in to comment.