Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support converting WritableStream to AsyncWrite #9

Closed
leonbotros opened this issue Dec 2, 2021 · 7 comments · Fixed by #10
Closed

Support converting WritableStream to AsyncWrite #9

leonbotros opened this issue Dec 2, 2021 · 7 comments · Fixed by #10
Labels
question Further information is requested
Milestone

Comments

@leonbotros
Copy link

I'm trying to use this library and wasm-bindgen to connect the Streams API with my rust library (specifically, encryption).
The goal is to use these bindings in web apps/or email clients for example. This way we pass the result of fetch() or a mail body straight into decryption.

My library offers a simple asynchronous API that looks like this:

pub async fn seal<R, W>(
<some encryption parameters>,
    r: R,
    w: W,
) -> Result<(), Error>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,

Basically the plaintext is read from an AsyncRead and the ciphertext is written to an AsyncWrite.
Converting the JS ReadableStream to a Rust AsyncRead was easy as this was features in the examples:

let readable = ReadableStream::from_raw(read.dyn_into().unwrap_throw()).into_async_read();

This readable can then be used in the function our library exposes. Great!

However, converting the WritableStream parameter given by the JS side to an AsyncWrite is not so easy.
Converting the JS stream to a Sink via the into_sink() method seems like a logical first step.

let mut sink = WritableStream::from_raw(writable.dyn_into().unwrap_throw()).into_sink();

If I understand correctly we end up with a Sink to which we can send JsValues. Now I see no way to convert this into an AsyncWrite to use in my library. Rust's futures::Sink and other helpers (e.g., tokio-util) only support converting from AsyncRead/AsyncWrite into either Stream or Sink, but I cannot find the inverses of these adapters. Should my library expose an API using Stream/Sink instead of AsyncRead/AsyncWrite? Any other ideas?

@MattiasBuelens MattiasBuelens added the question Further information is requested label Dec 2, 2021
@MattiasBuelens
Copy link
Owner

Now I see no way to convert this into an AsyncWrite to use in my library. Rust's futures::Sink and other helpers (e.g., tokio-util) only support converting from AsyncRead/AsyncWrite into either Stream or Sink, but I cannot find the inverses of these adapters.

Indeed, the futures crate only supports converting from Stream to AsyncRead (via TryStreamExt::into_async_read()) and from AsyncWrite to Sink (via AsyncWriteExt::into_sink()). There are no standard methods to go from AsyncRead to Stream, or from Sink to AsyncWrite.

To convert from Sink to AsyncWrite, you need an adapter that implements AsyncWrite using a Sink. But there's a bit of a mismatch between the two:

  • With AsyncWrite, you only get a borrowed slice &'a [u8] that lasts for the duration of the write (see AsyncWriteExt::write()).
  • A Sink expects an owned item like a Box<[u8]> (see SinkExt::send()).

It's possible to write such an adapter, but you'll need to copy the borrowed slice from AsyncWrite::poll_write into a new boxed slice or Vec<u8> before you can send it to the underlying Sink. Something like this could work:

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::io::AsyncWrite;
use futures::ready;
use futures::sink::Sink;
use pin_project_lite::pin_project;

pin_project! {
    #[derive(Debug)]
    #[must_use = "writers do nothing unless polled"]
    pub struct IntoAsyncWrite<Si>
    {
        #[pin]
        sink: Si
    }
}

impl<Si> IntoAsyncWrite<Si> {
    #[inline]
    pub(super) fn new(sink: Si) -> Self {
        Self { sink }
    }
}

impl<Si> AsyncWrite for IntoAsyncWrite<Si>
where
    Si: Sink<Box<[u8]>, Error = std::io::Error>,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let mut this = self.project();
        ready!(this.sink.as_mut().poll_ready(cx))?;
        this.sink.as_mut().start_send(buf.into())?; // buf.into() copies the slice to a `Box<[u8]>`
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let mut this = self.project();
        this.sink.as_mut().poll_flush(cx).map_ok(|_| ())
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let mut this = self.project();
        this.sink.as_mut().poll_close(cx).map_ok(|_| ())
    }
}

This allows you to turn any Sink<Box<[u8]>, Error = std::io::Error> into an AsyncWrite.

Of course, to get such a Sink, you have to do some additional conversions first:

let sink = writable
    .into_sink()
    .with(|buf: Box<[u8]>| -> futures::future::Ready<Result<JsValue, JsValue>> {
        futures::future::ready(Ok(Uint8Array::from(buf.as_ref()).into()))
    })
    // TODO if _err is a JS `Error` object, use its error message in the std::io::Error
    .sink_map_err(|_err| std::io::Error::new(ErrorKind::Other, "foo"));
let async_write = IntoAsyncWrite::new(sink);

I don't know if this is something wasm-streams should provide, or if this should go into a separate utility crate (or perhaps even added to futures-rs or tokio-util?).

  • On the one hand, it's not specific to Web Streams. You could use this in any Rust project that uses Sink and AsyncWrite. There's also no such thing as "writable byte streams" yet in Web Streams, so it feels a bit weird to already add a dedicated .into_async_write() method.
  • On the other hand, we could remove a copy if we did Uint8Array::from(buf) immediately inside IntoAsyncWrite, rather than copying to a Box<[u8]> and then copying again inside .with().

Not sure yet... 🤔

@MattiasBuelens
Copy link
Owner

I decided that it is worth adding to wasm-streams, see #10. 😉

@MattiasBuelens MattiasBuelens added this to the v0.2.2 milestone Dec 2, 2021
@leonbotros
Copy link
Author

Well, that was fast! I'll test it and get back to you 😃

@MattiasBuelens
Copy link
Owner

Turns out that async_io_stream also supports converting a Sink to an AsyncWrite, and their implementation is very similar to what I came up with in #9 (comment). So that's reassuring. 🙂

I'm still going to ship my own version though, just to avoid that extra copy when going from &[u8] to Uint8Array. 😛

@leonbotros
Copy link
Author

Hi again. Ha, I missed that crate. That is reassuring indeed.

I have tested this in my code and it seems to work fine! :)
I'm still working on a version of my application that uses this branch of wasm_stream to process somewhat larger data streams (e.g., from a file). It might help us give an indication of performance since I can compare it to a pure JS implementation that uses only web streams.

@leonbotros
Copy link
Author

It seems to perform reasonably well too (at least, my bottleneck is Web Crypto not supporting either streaming authenticated encryption or an incremental hash API).

@MattiasBuelens
Copy link
Owner

That's great to hear! I'll get this merged and shipped into a new release then. 😄

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants