Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow consumers to implement POSIX AIO sources. #4054

Merged
merged 18 commits into from
Sep 15, 2021
Merged
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -130,6 +130,9 @@ tempfile = "3.1.0"
async-stream = "0.3"
socket2 = "0.4"

[target.'cfg(target_os = "freebsd")'.dev-dependencies]
mio-aio = { git = "https://github.com/asomers/mio-aio", rev = "2f56696", features = ["tokio"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5", features = ["futures", "checkpoint"] }

18 changes: 18 additions & 0 deletions tokio/src/io/driver/interest.rs
Original file line number Diff line number Diff line change
@@ -14,6 +14,24 @@ use std::ops;
pub struct Interest(mio::Interest);

impl Interest {
cfg_aio! {
/// Interest for POSIX AIO
#[cfg(target_os = "freebsd")]
pub const AIO: Interest = Interest(mio::Interest::AIO);

/// Interest for POSIX AIO
#[cfg(not(target_os = "freebsd"))]
pub const AIO: Interest = Interest(mio::Interest::READABLE);

/// Interest for POSIX AIO lio_listio events
#[cfg(target_os = "freebsd")]
pub const LIO: Interest = Interest(mio::Interest::LIO);

/// Interest for POSIX AIO lio_listio events
#[cfg(not(target_os = "freebsd"))]
pub const LIO: Interest = Interest(mio::Interest::READABLE);
}

/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
1 change: 1 addition & 0 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -51,6 +51,7 @@ pub(crate) struct Handle {
inner: Weak<Inner>,
}

#[derive(Debug)]
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
11 changes: 11 additions & 0 deletions tokio/src/io/driver/ready.rs
Original file line number Diff line number Diff line change
@@ -38,6 +38,17 @@ impl Ready {
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;

#[cfg(all(target_os = "freebsd", feature = "net"))]
{
if event.is_aio() {
ready |= Ready::READABLE;
}

if event.is_lio() {
ready |= Ready::READABLE;
}
}

if event.is_readable() {
ready |= Ready::READABLE;
}
4 changes: 4 additions & 0 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -217,6 +217,10 @@ cfg_io_driver_impl! {
pub(crate) use poll_evented::PollEvented;
}

cfg_aio! {
pub mod poll_aio;
}

cfg_net_unix! {
mod async_fd;

195 changes: 195 additions & 0 deletions tokio/src/io/poll_aio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
//! Use POSIX AIO futures with Tokio
use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
use mio::event::Source;
use mio::Registry;
use mio::Token;
use std::fmt;
use std::io;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::task::{Context, Poll};

/// Like [`mio::event::Source`], but for POSIX AIO only.
///
/// Tokio's consumer must pass an implementor of this trait to create a
/// [`PollAio`] object.
pub trait AioSource {
/// Register this AIO event source with Tokio's reactor
fn register(&mut self, kq: RawFd, token: usize);

/// Deregister this AIO event source with Tokio's reactor
fn deregister(&mut self);
}

/// Wrap the user's AioSource in order to implement mio::event::Source, which
/// is what the rest of the crate wants.
struct MioSource<T>(T);

impl<T: AioSource> Source for MioSource<T> {
fn register(
&mut self,
registry: &Registry,
token: Token,
interests: mio::Interest,
) -> io::Result<()> {
assert!(interests.is_aio() || interests.is_lio());
self.0.register(registry.as_raw_fd(), usize::from(token));
Ok(())
}

fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
self.0.deregister();
Ok(())
}

fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: mio::Interest,
) -> io::Result<()> {
assert!(interests.is_aio() || interests.is_lio());
self.0.register(registry.as_raw_fd(), usize::from(token));
Ok(())
}
}

/// Associates a POSIX AIO control block with the reactor that drives it.
///
/// `PollAio`'s wrapped type must implement [`AioSource`] to be driven
/// by the reactor.
///
/// The wrapped source may be accessed through the `PollAio` via the `Deref` and
/// `DerefMut` traits.
///
/// ## Clearing readiness
///
/// If [`PollAio::poll`] returns ready, but the consumer determines that the
/// Source is not completely ready and must return to the Pending state,
/// [`PollAio::clear_ready`] may be used. This can be useful with
/// [`lio_listio`], which may generate a kevent when only a portion of the
/// operations have completed.
///
/// ## Platforms
///
/// Only FreeBSD implements POSIX AIO with kqueue notification, so
/// `PollAio` is only available for that operating system.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
// Note: Unlike every other kqueue event source, POSIX AIO registers events not
// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
// aio_write, etc. It needs the kqueue's file descriptor to do that. So
// AsyncFd can't be used for POSIX AIO.
//
// Note that PollAio doesn't implement Drop. There's no need. Unlike other
// kqueue sources, simply dropping the object effectively deregisters it.
pub struct PollAio<E: AioSource> {
io: MioSource<E>,
registration: Registration,
}

// ===== impl PollAio =====

impl<E: AioSource> PollAio<E> {
/// Indicates to Tokio that the source is no longer ready. The internal
/// readiness flag will be cleared, and tokio will wait for the next
/// edge-triggered readiness notification from the OS.
///
/// It is critical that this method not be called unless your code
/// _actually observes_ that the source is _not_ ready. The OS must
/// deliver a subsequent notification, or this source will block
/// forever. It is equally critical that you `do` call this method if you
/// resubmit the same structure to the kernel and poll it again.
///
/// This method is not very useful with AIO readiness, since each `aiocb`
/// structure is typically only used once. It's main use with
/// [`lio_listio`], which will sometimes send notification when only a
/// portion of its elements are complete. In that case, the caller must
/// call `clear_ready` before resubmitting it.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
pub fn clear_ready(&self, ev: PollAioEvent) {
self.registration.clear_readiness(ev.0)
}

/// Destroy the [`PollAio`] and return its inner Source
pub fn into_inner(self) -> E {
self.io.0
}

/// Creates a new `PollAio` suitable for use with POSIX AIO functions.
///
/// It will be associated with the default reactor. The runtime is usually
/// set implicitly when this function is called from a future driven by a
/// tokio runtime, otherwise runtime can be set explicitly with
/// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_for_aio(io: E) -> io::Result<Self> {
Self::new_with_interest(io, Interest::AIO)
}

/// Creates a new `PollAio` suitable for use with [`lio_listio`].
///
/// It will be associated with the default reactor. The runtime is usually
/// set implicitly when this function is called from a future driven by a
/// tokio runtime, otherwise runtime can be set explicitly with
/// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
pub fn new_for_lio(io: E) -> io::Result<Self> {
Self::new_with_interest(io, Interest::LIO)
}

fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
let mut io = MioSource(io);
let handle = Handle::current();
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self { io, registration })
}

/// Polls for readiness. Either AIO or LIO counts.
///
/// This method returns:
/// * `Poll::Pending` if the underlying operation is not complete, whether
/// or not it completed successfully. This will be true if the OS is
/// still processing it, or if it has not yet been submitted to the OS.
/// * `Poll::Ready(Ok(_))` if the underlying operation is complete.
/// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does
/// _not_ indicate that the underlying operation encountered an error.
///
/// When the method returns Poll::Pending, the Waker in the provided Context
/// is scheduled to receive a wakeup when the underlying operation
/// completes. Note that on multiple calls to poll, only the Waker from the
/// Context passed to the most recent call is scheduled to receive a wakeup.
pub fn poll<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<PollAioEvent>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We avoid inherent fns named just poll as they would conflict w/ Future. Also, why is this an inherent fn? If it were implemented as a future, then it could be used with .await.

Perhaps, the inherent fn could be named poll_ready (or something) and an equivalent async fn is added async fn ready(&self) -> io::Result<AioEvent>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under the principle that "futures should do nothing until polled", I elected to submit the operation to the kernel (aio_write, etc) as part of poll. But that means that it needs to be in the external crate instead of in Tokio. Hence the inherent function. I'll rename it to poll_ready.

let ev = ready!(self.registration.poll_read_ready(cx))?;
Poll::Ready(Ok(PollAioEvent(ev)))
}
}

impl<E: AioSource> Deref for PollAio<E> {
type Target = E;

fn deref(&self) -> &E {
&self.io.0
}
}

impl<E: AioSource> DerefMut for PollAio<E> {
fn deref_mut(&mut self) -> &mut E {
&mut self.io.0
}
}

impl<E: AioSource + fmt::Debug> fmt::Debug for PollAio<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollAio").field("io", &self.io.0).finish()
}
}

/// Opaque data returned by [`PollAio::poll`].
///
/// It can be fed back to [`PollAio::clear_ready`].
#[derive(Debug)]
pub struct PollAioEvent(ReadyEvent);
5 changes: 3 additions & 2 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
@@ -307,8 +307,9 @@
//! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler.
//! - `io-util`: Enables the IO based `Ext` traits.
//! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types.
//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`,
//! as well as (on Unix-like systems) `AsyncFd`
//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and
//! `UdpSocket`, as well as (on Unix-like systems) `AsyncFd` and (on
//! FreeBSD) `PollAio`.
//! - `time`: Enables `tokio::time` types and allows the schedulers to enable
//! the built in timer.
//! - `process`: Enables `tokio::process` types.
12 changes: 12 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
@@ -45,6 +45,18 @@ macro_rules! cfg_atomic_waker_impl {
}
}

macro_rules! cfg_aio {
($($item:item)*) => {
$(
#[cfg(all(any(docsrs, target_os = "freebsd"), feature = "net"))]
#[cfg_attr(docsrs,
doc(cfg(all(target_os = "freebsd", feature = "net")))
)]
$item
)*
}
}

macro_rules! cfg_fs {
($($item:item)*) => {
$(
375 changes: 375 additions & 0 deletions tokio/tests/io_poll_aio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,375 @@
#![warn(rust_2018_idioms)]
#![cfg(all(target_os = "freebsd", feature = "net"))]

use mio_aio::{AioCb, AioFsyncMode, LioCb};
use std::{
future::Future,
mem,
os::unix::io::{AsRawFd, RawFd},
pin::Pin,
task::{Context, Poll},
};
use tempfile::tempfile;
use tokio::io::poll_aio::{AioSource, PollAio};
use tokio_test::assert_pending;

mod aio {
use super::*;

/// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource
struct WrappedAioCb<'a>(AioCb<'a>);
impl<'a> AioSource for WrappedAioCb<'a> {
fn register(&mut self, kq: RawFd, token: usize) {
self.0.register_raw(kq, token)
}
fn deregister(&mut self) {
self.0.deregister_raw()
}
}

/// A very crude implementation of an AIO-based future
struct FsyncFut(PollAio<WrappedAioCb<'static>>);

impl Future for FsyncFut {
type Output = std::io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll_result = self.0.poll(cx);
match poll_result {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(_ev)) => {
// At this point, we could clear readiness. But there's no
// point, since we're about to drop the PollAio.
let result = (*self.0).0.aio_return();
match result {
Ok(_) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(e.into())),
}
}
}
}
}

/// Low-level AIO Source
///
/// An example bypassing mio_aio and Nix to demonstrate how the kevent
/// registration actually works, under the hood.
struct LlSource(Pin<Box<libc::aiocb>>);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The libc::aiocb type is Unpin so pinning it doesn't do anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is actually a kernel requirement, not a Futures requirement, that the libc::aiocb have a stable address, because the kernel uses that address to identify it. Nix gets around the Unpin problem by adding a std::marker::PhantomPinned field. I could do that here too, though it isn't strictly necessary to test Tokio's functionality.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Putting it in a box is enough here, but I'm also ok with keeping the Pin as it still does serve to document your intent. You may find this interesting.


impl AioSource for LlSource {
fn register(&mut self, kq: RawFd, token: usize) {
let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
sev.sigev_notify = libc::SIGEV_KEVENT;
sev.sigev_signo = kq;
sev.sigev_value = libc::sigval {
sival_ptr: token as *mut libc::c_void,
};
self.0.aio_sigevent = sev;
}

fn deregister(&mut self) {
unsafe {
self.0.aio_sigevent = mem::zeroed();
}
}
}

struct LlFut(PollAio<LlSource>);

impl Future for LlFut {
type Output = std::io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll_result = self.0.poll(cx);
match poll_result {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(_ev)) => {
let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
assert_eq!(0, r);
Poll::Ready(Ok(()))
}
}
}
}

/// A very simple object that can implement AioSource and can be reused.
///
/// mio_aio normally assumes that each AioCb will be consumed on completion.
/// This somewhat contrived example shows how a PollAio object can be reused
/// anyway.
struct ReusableFsyncSource {
aiocb: Pin<Box<AioCb<'static>>>,
fd: RawFd,
token: usize,
}
impl ReusableFsyncSource {
fn fsync(&mut self) {
self.aiocb.register_raw(self.fd, self.token);
self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap();
}
fn new(aiocb: AioCb<'static>) -> Self {
ReusableFsyncSource {
aiocb: Box::pin(aiocb),
fd: 0,
token: 0,
}
}
fn reset(&mut self, aiocb: AioCb<'static>) {
self.aiocb = Box::pin(aiocb);
}
}
impl AioSource for ReusableFsyncSource {
fn register(&mut self, kq: RawFd, token: usize) {
self.fd = kq;
self.token = token;
}
fn deregister(&mut self) {
self.fd = 0;
}
}

struct ReusableFsyncFut<'a>(&'a mut PollAio<ReusableFsyncSource>);
impl<'a> Future for ReusableFsyncFut<'a> {
type Output = std::io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll_result = self.0.poll(cx);
match poll_result {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(ev)) => {
// Since this future uses a reusable PollAio, we must clear
// its readiness here. That makes the future
// non-idempotent; the caller can't poll it repeatedly after
// it has already returned Ready. But that's ok; most
// futures behave this way.
self.0.clear_ready(ev);
let result = (*self.0).aiocb.aio_return();
match result {
Ok(_) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(e.into())),
}
}
}
}
}

#[tokio::test]
async fn fsync() {
let f = tempfile().unwrap();
let fd = f.as_raw_fd();
let aiocb = AioCb::from_fd(fd, 0);
let source = WrappedAioCb(aiocb);
let mut poll_aio = PollAio::new_for_aio(source).unwrap();
(*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
let fut = FsyncFut(poll_aio);
fut.await.unwrap();
}

#[tokio::test]
async fn ll_fsync() {
let f = tempfile().unwrap();
let fd = f.as_raw_fd();
let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
aiocb.aio_fildes = fd;
let source = LlSource(Box::pin(aiocb));
let mut poll_aio = PollAio::new_for_aio(source).unwrap();
let r = unsafe {
let p = (*poll_aio).0.as_mut().get_unchecked_mut();
libc::aio_fsync(libc::O_SYNC, p)
};
assert_eq!(0, r);
let fut = LlFut(poll_aio);
fut.await.unwrap();
}

/// A suitably crafted future type can reuse a PollAio object
#[tokio::test]
async fn reuse() {
let f = tempfile().unwrap();
let fd = f.as_raw_fd();
let aiocb0 = AioCb::from_fd(fd, 0);
let source = ReusableFsyncSource::new(aiocb0);
let mut poll_aio = PollAio::new_for_aio(source).unwrap();
poll_aio.fsync();
let fut0 = ReusableFsyncFut(&mut poll_aio);
fut0.await.unwrap();

let aiocb1 = AioCb::from_fd(fd, 0);
poll_aio.reset(aiocb1);
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
assert_pending!(poll_aio.poll(&mut ctx));
poll_aio.fsync();
let fut1 = ReusableFsyncFut(&mut poll_aio);
fut1.await.unwrap();
}
}

mod lio {
use super::*;

struct WrappedLioCb<'a>(LioCb<'a>);
impl<'a> AioSource for WrappedLioCb<'a> {
fn register(&mut self, kq: RawFd, token: usize) {
self.0.register_raw(kq, token)
}
fn deregister(&mut self) {
self.0.deregister_raw()
}
}

/// A very crude lio_listio-based Future
struct LioFut(Option<PollAio<WrappedLioCb<'static>>>);

impl Future for LioFut {
type Output = std::io::Result<Vec<isize>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll_result = self.0.as_mut().unwrap().poll(cx);
match poll_result {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(_ev)) => {
// At this point, we could clear readiness. But there's no
// point, since we're about to drop the PollAio.
let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
});
Poll::Ready(Ok(r))
}
}
}
}

/// Minimal example demonstrating reuse of a PollAio object with lio
/// readiness. mio_aio::LioCb actually does something similar under the
/// hood.
struct ReusableLioSource {
liocb: Option<LioCb<'static>>,
fd: RawFd,
token: usize,
}
impl ReusableLioSource {
fn new(liocb: LioCb<'static>) -> Self {
ReusableLioSource {
liocb: Some(liocb),
fd: 0,
token: 0,
}
}
fn reset(&mut self, liocb: LioCb<'static>) {
self.liocb = Some(liocb);
}
fn submit(&mut self) {
self.liocb
.as_mut()
.unwrap()
.register_raw(self.fd, self.token);
self.liocb.as_mut().unwrap().submit().unwrap();
}
}
impl AioSource for ReusableLioSource {
fn register(&mut self, kq: RawFd, token: usize) {
self.fd = kq;
self.token = token;
}
fn deregister(&mut self) {
self.fd = 0;
}
}
struct ReusableLioFut<'a>(&'a mut PollAio<ReusableLioSource>);
impl<'a> Future for ReusableLioFut<'a> {
type Output = std::io::Result<Vec<isize>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll_result = self.0.poll(cx);
match poll_result {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(ev)) => {
// Since this future uses a reusable PollAio, we must clear
// its readiness here. That makes the future
// non-idempotent; the caller can't poll it repeatedly after
// it has already returned Ready. But that's ok; most
// futures behave this way.
self.0.clear_ready(ev);
let r = (*self.0).liocb.take().unwrap().into_results(|iter| {
iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
});
Poll::Ready(Ok(r))
}
}
}
}

/// An lio_listio operation with one write element
#[tokio::test]
async fn onewrite() {
const WBUF: &[u8] = b"abcdef";
let f = tempfile().unwrap();

let mut builder = mio_aio::LioCbBuilder::with_capacity(1);
builder = builder.emplace_slice(
f.as_raw_fd(),
0,
&WBUF[..],
0,
mio_aio::LioOpcode::LIO_WRITE,
);
let liocb = builder.finish();
let source = WrappedLioCb(liocb);
let mut poll_aio = PollAio::new_for_lio(source).unwrap();

// Send the operation to the kernel
(*poll_aio).0.submit().unwrap();
let fut = LioFut(Some(poll_aio));
let v = fut.await.unwrap();
assert_eq!(v.len(), 1);
assert_eq!(v[0] as usize, WBUF.len());
}

/// A suitably crafted future type can reuse a PollAio object
#[tokio::test]
async fn reuse() {
const WBUF: &[u8] = b"abcdef";
let f = tempfile().unwrap();

let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1);
builder0 = builder0.emplace_slice(
f.as_raw_fd(),
0,
&WBUF[..],
0,
mio_aio::LioOpcode::LIO_WRITE,
);
let liocb0 = builder0.finish();
let source = ReusableLioSource::new(liocb0);
let mut poll_aio = PollAio::new_for_aio(source).unwrap();
poll_aio.submit();
let fut0 = ReusableLioFut(&mut poll_aio);
let v = fut0.await.unwrap();
assert_eq!(v.len(), 1);
assert_eq!(v[0] as usize, WBUF.len());

// Now reuse the same PollAio
let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
builder1 = builder1.emplace_slice(
f.as_raw_fd(),
0,
&WBUF[..],
0,
mio_aio::LioOpcode::LIO_WRITE,
);
let liocb1 = builder1.finish();
poll_aio.reset(liocb1);
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
assert_pending!(poll_aio.poll(&mut ctx));
poll_aio.submit();
let fut1 = ReusableLioFut(&mut poll_aio);
let v = fut1.await.unwrap();
assert_eq!(v.len(), 1);
assert_eq!(v[0] as usize, WBUF.len());
}
}