Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow consumers to implement POSIX AIO sources. #4054

Merged
merged 18 commits into from
Sep 15, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rename aio-related types as suggested by carllerche
asomers committed Sep 3, 2021
commit 7e66ce2d7daf60c6634c76135b221927c86fb5c8
46 changes: 23 additions & 23 deletions tokio/src/io/poll_aio.rs → tokio/src/io/bsd/poll_aio.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ use std::task::{Context, Poll};
/// Like [`mio::event::Source`], but for POSIX AIO only.
///
/// Tokio's consumer must pass an implementor of this trait to create a
/// [`PollAio`] object.
/// [`Aio`] object.
pub trait AioSource {
/// Register this AIO event source with Tokio's reactor
fn register(&mut self, kq: RawFd, token: usize);
@@ -58,41 +58,41 @@ impl<T: AioSource> Source for MioSource<T> {

/// Associates a POSIX AIO control block with the reactor that drives it.
///
/// `PollAio`'s wrapped type must implement [`AioSource`] to be driven
/// `Aio`'s wrapped type must implement [`AioSource`] to be driven
/// by the reactor.
///
/// The wrapped source may be accessed through the `PollAio` via the `Deref` and
/// The wrapped source may be accessed through the `Aio` via the `Deref` and
/// `DerefMut` traits.
///
/// ## Clearing readiness
///
/// If [`PollAio::poll`] returns ready, but the consumer determines that the
/// If [`Aio::poll`] returns ready, but the consumer determines that the
/// Source is not completely ready and must return to the Pending state,
/// [`PollAio::clear_ready`] may be used. This can be useful with
/// [`Aio::clear_ready`] may be used. This can be useful with
/// [`lio_listio`], which may generate a kevent when only a portion of the
/// operations have completed.
///
/// ## Platforms
///
/// Only FreeBSD implements POSIX AIO with kqueue notification, so
/// `PollAio` is only available for that operating system.
/// `Aio` is only available for that operating system.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
// Note: Unlike every other kqueue event source, POSIX AIO registers events not
// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
// aio_write, etc. It needs the kqueue's file descriptor to do that. So
// AsyncFd can't be used for POSIX AIO.
//
// Note that PollAio doesn't implement Drop. There's no need. Unlike other
// Note that Aio doesn't implement Drop. There's no need. Unlike other
// kqueue sources, simply dropping the object effectively deregisters it.
pub struct PollAio<E: AioSource> {
pub struct Aio<E: AioSource> {
io: MioSource<E>,
registration: Registration,
}

// ===== impl PollAio =====
// ===== impl Aio =====

impl<E: AioSource> PollAio<E> {
impl<E: AioSource> Aio<E> {
/// Indicates to Tokio that the source is no longer ready. The internal
/// readiness flag will be cleared, and tokio will wait for the next
/// edge-triggered readiness notification from the OS.
@@ -110,16 +110,16 @@ impl<E: AioSource> PollAio<E> {
/// call `clear_ready` before resubmitting it.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
pub fn clear_ready(&self, ev: PollAioEvent) {
pub fn clear_ready(&self, ev: AioEvent) {
self.registration.clear_readiness(ev.0)
}

/// Destroy the [`PollAio`] and return its inner Source
/// Destroy the [`Aio`] and return its inner Source
pub fn into_inner(self) -> E {
self.io.0
}

/// Creates a new `PollAio` suitable for use with POSIX AIO functions.
/// Creates a new `Aio` suitable for use with POSIX AIO functions.
///
/// It will be associated with the default reactor. The runtime is usually
/// set implicitly when this function is called from a future driven by a
@@ -129,7 +129,7 @@ impl<E: AioSource> PollAio<E> {
Self::new_with_interest(io, Interest::AIO)
}

/// Creates a new `PollAio` suitable for use with [`lio_listio`].
/// Creates a new `Aio` suitable for use with [`lio_listio`].
///
/// It will be associated with the default reactor. The runtime is usually
/// set implicitly when this function is called from a future driven by a
@@ -162,34 +162,34 @@ impl<E: AioSource> PollAio<E> {
/// is scheduled to receive a wakeup when the underlying operation
/// completes. Note that on multiple calls to poll, only the Waker from the
/// Context passed to the most recent call is scheduled to receive a wakeup.
pub fn poll<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<PollAioEvent>> {
pub fn poll<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> {
let ev = ready!(self.registration.poll_read_ready(cx))?;
Poll::Ready(Ok(PollAioEvent(ev)))
Poll::Ready(Ok(AioEvent(ev)))
}
}

impl<E: AioSource> Deref for PollAio<E> {
impl<E: AioSource> Deref for Aio<E> {
type Target = E;

fn deref(&self) -> &E {
&self.io.0
}
}

impl<E: AioSource> DerefMut for PollAio<E> {
impl<E: AioSource> DerefMut for Aio<E> {
fn deref_mut(&mut self) -> &mut E {
&mut self.io.0
}
}

impl<E: AioSource + fmt::Debug> fmt::Debug for PollAio<E> {
impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollAio").field("io", &self.io.0).finish()
f.debug_struct("Aio").field("io", &self.io.0).finish()
}
}

/// Opaque data returned by [`PollAio::poll`].
/// Opaque data returned by [`Aio::poll`].
///
/// It can be fed back to [`PollAio::clear_ready`].
/// It can be fed back to [`Aio::clear_ready`].
#[derive(Debug)]
pub struct PollAioEvent(ReadyEvent);
pub struct AioEvent(ReadyEvent);
5 changes: 4 additions & 1 deletion tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -218,7 +218,10 @@ cfg_io_driver_impl! {
}

cfg_aio! {
pub mod poll_aio;
/// BSD-specific I/O types
pub mod bsd {
pub mod poll_aio;
}
}

cfg_net_unix! {
40 changes: 20 additions & 20 deletions tokio/tests/io_poll_aio.rs
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ use std::{
task::{Context, Poll},
};
use tempfile::tempfile;
use tokio::io::poll_aio::{AioSource, PollAio};
use tokio::io::bsd::poll_aio::{AioSource, Aio};
use tokio_test::assert_pending;

mod aio {
@@ -28,7 +28,7 @@ mod aio {
}

/// A very crude implementation of an AIO-based future
struct FsyncFut(PollAio<WrappedAioCb<'static>>);
struct FsyncFut(Aio<WrappedAioCb<'static>>);

impl Future for FsyncFut {
type Output = std::io::Result<()>;
@@ -40,7 +40,7 @@ mod aio {
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(_ev)) => {
// At this point, we could clear readiness. But there's no
// point, since we're about to drop the PollAio.
// point, since we're about to drop the Aio.
let result = (*self.0).0.aio_return();
match result {
Ok(_) => Poll::Ready(Ok(())),
@@ -75,7 +75,7 @@ mod aio {
}
}

struct LlFut(PollAio<LlSource>);
struct LlFut(Aio<LlSource>);

impl Future for LlFut {
type Output = std::io::Result<()>;
@@ -97,7 +97,7 @@ mod aio {
/// A very simple object that can implement AioSource and can be reused.
///
/// mio_aio normally assumes that each AioCb will be consumed on completion.
/// This somewhat contrived example shows how a PollAio object can be reused
/// This somewhat contrived example shows how an Aio object can be reused
/// anyway.
struct ReusableFsyncSource {
aiocb: Pin<Box<AioCb<'static>>>,
@@ -130,7 +130,7 @@ mod aio {
}
}

struct ReusableFsyncFut<'a>(&'a mut PollAio<ReusableFsyncSource>);
struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>);
impl<'a> Future for ReusableFsyncFut<'a> {
type Output = std::io::Result<()>;

@@ -140,7 +140,7 @@ mod aio {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(ev)) => {
// Since this future uses a reusable PollAio, we must clear
// Since this future uses a reusable Aio, we must clear
// its readiness here. That makes the future
// non-idempotent; the caller can't poll it repeatedly after
// it has already returned Ready. But that's ok; most
@@ -162,7 +162,7 @@ mod aio {
let fd = f.as_raw_fd();
let aiocb = AioCb::from_fd(fd, 0);
let source = WrappedAioCb(aiocb);
let mut poll_aio = PollAio::new_for_aio(source).unwrap();
let mut poll_aio = Aio::new_for_aio(source).unwrap();
(*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
let fut = FsyncFut(poll_aio);
fut.await.unwrap();
@@ -175,7 +175,7 @@ mod aio {
let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
aiocb.aio_fildes = fd;
let source = LlSource(Box::pin(aiocb));
let mut poll_aio = PollAio::new_for_aio(source).unwrap();
let mut poll_aio = Aio::new_for_aio(source).unwrap();
let r = unsafe {
let p = (*poll_aio).0.as_mut().get_unchecked_mut();
libc::aio_fsync(libc::O_SYNC, p)
@@ -185,14 +185,14 @@ mod aio {
fut.await.unwrap();
}

/// A suitably crafted future type can reuse a PollAio object
/// A suitably crafted future type can reuse an Aio object
#[tokio::test]
async fn reuse() {
let f = tempfile().unwrap();
let fd = f.as_raw_fd();
let aiocb0 = AioCb::from_fd(fd, 0);
let source = ReusableFsyncSource::new(aiocb0);
let mut poll_aio = PollAio::new_for_aio(source).unwrap();
let mut poll_aio = Aio::new_for_aio(source).unwrap();
poll_aio.fsync();
let fut0 = ReusableFsyncFut(&mut poll_aio);
fut0.await.unwrap();
@@ -221,7 +221,7 @@ mod lio {
}

/// A very crude lio_listio-based Future
struct LioFut(Option<PollAio<WrappedLioCb<'static>>>);
struct LioFut(Option<Aio<WrappedLioCb<'static>>>);

impl Future for LioFut {
type Output = std::io::Result<Vec<isize>>;
@@ -233,7 +233,7 @@ mod lio {
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(_ev)) => {
// At this point, we could clear readiness. But there's no
// point, since we're about to drop the PollAio.
// point, since we're about to drop the Aio.
let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
});
@@ -243,7 +243,7 @@ mod lio {
}
}

/// Minimal example demonstrating reuse of a PollAio object with lio
/// Minimal example demonstrating reuse of an Aio object with lio
/// readiness. mio_aio::LioCb actually does something similar under the
/// hood.
struct ReusableLioSource {
@@ -279,7 +279,7 @@ mod lio {
self.fd = 0;
}
}
struct ReusableLioFut<'a>(&'a mut PollAio<ReusableLioSource>);
struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>);
impl<'a> Future for ReusableLioFut<'a> {
type Output = std::io::Result<Vec<isize>>;

@@ -289,7 +289,7 @@ mod lio {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(ev)) => {
// Since this future uses a reusable PollAio, we must clear
// Since this future uses a reusable Aio, we must clear
// its readiness here. That makes the future
// non-idempotent; the caller can't poll it repeatedly after
// it has already returned Ready. But that's ok; most
@@ -320,7 +320,7 @@ mod lio {
);
let liocb = builder.finish();
let source = WrappedLioCb(liocb);
let mut poll_aio = PollAio::new_for_lio(source).unwrap();
let mut poll_aio = Aio::new_for_lio(source).unwrap();

// Send the operation to the kernel
(*poll_aio).0.submit().unwrap();
@@ -330,7 +330,7 @@ mod lio {
assert_eq!(v[0] as usize, WBUF.len());
}

/// A suitably crafted future type can reuse a PollAio object
/// A suitably crafted future type can reuse an Aio object
#[tokio::test]
async fn reuse() {
const WBUF: &[u8] = b"abcdef";
@@ -346,14 +346,14 @@ mod lio {
);
let liocb0 = builder0.finish();
let source = ReusableLioSource::new(liocb0);
let mut poll_aio = PollAio::new_for_aio(source).unwrap();
let mut poll_aio = Aio::new_for_aio(source).unwrap();
poll_aio.submit();
let fut0 = ReusableLioFut(&mut poll_aio);
let v = fut0.await.unwrap();
assert_eq!(v.len(), 1);
assert_eq!(v[0] as usize, WBUF.len());

// Now reuse the same PollAio
// Now reuse the same Aio
let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
builder1 = builder1.emplace_slice(
f.as_raw_fd(),