Example for posting a body for AsyncRead? #879

Open
hamaluik opened this issue Apr 11, 2020 · 17 comments · May be fixed by #888
Comments

@hamaluik

reqwest::blocking::Body can be easily built from any Read object. I'm trying to convert some of this code to async, but I can't for the life of me figure out how to do something similar in async land. I assume you have to use reqwest::Body::wrap_stream, but how to create a proper stream from a tokio AsyncRead (for example a tokio::fs::File) is beyond me; I've been banging my head against this all day. It's easy enough to read the entire AsyncRead into memory and provide it directly, but for a file upload that means loading the entire file into memory. Any tips would be greatly appreciated, and since I doubt I'm the only one attempting this, adding an example of posting a file with the new async API would help a lot.

Thanks!

@nickkuk

nickkuk commented Apr 24, 2020

For now, the best way I have found is to use the BytesCodec and FramedRead types from the tokio-util crate. Internally this minimizes memory allocations; see an investigation here.

So, the full code will be:

use reqwest::Body;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};

fn file_to_body(file: File) -> Body {
  let stream = FramedRead::new(file, BytesCodec::new());
  Body::wrap_stream(stream)
}

@nickkuk nickkuk linked a pull request Apr 24, 2020 that will close this issue
@nbari

nbari commented Aug 11, 2020

@nickkuk Thanks for the example, any idea how to integrate indicatif? I want to display the current progress.

Is there a way to "do something" (bar.inc(1);) in every chunk from the stream or maybe from Body?

Thanks in advance :-)

@nickkuk

nickkuk commented Aug 12, 2020

Hi @nbari, if you mean file uploading, I've just tried here to create a Stream that consumes the stream from

FramedRead::new(file, BytesCodec::new())

and calls inc inside. But it does not work smoothly; maybe there is some buffering between reqwest and the system. At the moment I don't know a better way.

If you mean file downloading, just call inc inside this loop, but you need to know the total file size somehow, and not every server gives you a content length.

@nbari

nbari commented Aug 12, 2020

Thanks, I am using this approach, inspect_ok does the trick:

use futures::stream::TryStreamExt;
use tokio_util::codec::{BytesCodec, FramedRead};

let stream = FramedRead::new(file, BytesCodec::new())
    .inspect_ok(|chunk| {
        // do X with chunk...
    });

let body = Body::wrap_stream(stream);

@nickkuk

nickkuk commented Aug 12, 2020

@nbari, I tried both approaches and eventually decided that it is better to call inc for the previous chunk, i.e., when reqwest reads another chunk, we assume that it has already sent all the previous ones. But that is not the case, probably due to some buffering.

Did you get a smoothly increasing progress bar?

@nbari

nbari commented Aug 12, 2020

Hi @nickkuk, I am currently working on it, and indeed I am stuck with this: https://stackoverflow.com/q/63374041/1135424. Any help is much appreciated.

let stream = if let Some(mut tx) = sender {
    FramedRead::new(file, BytesCodec::new()).inspect_ok(move |chunk|
         tx.send(chunk.len())
    )
} else {
    FramedRead::new(file, BytesCodec::new())
};


// reqwest
let body = Body::wrap_stream(stream);
client.put(url).body(body)

I am getting:

`if` and `else` have incompatible types

But for other similar cases (multipart upload), I do get a nice progress bar: https://github.com/s3m/s3m/blob/develop/src/s3m/upload.rs#L55
(currently working/learning so any feedback is more than welcome)
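
The error arises because each branch produces a different concrete type: one is the plain stream, the other is the same stream wrapped in an inspecting adapter. A minimal sketch of the usual fix, with std iterators standing in for streams so that it compiles without extra crates; the local `Either` enum and the `total_len` function are hypothetical helpers for illustration (the futures crate ships a real `Either` that plays the same role for streams):

```rust
// Analogy only: iterators stand in for streams here.
enum Either<L, R> {
    Left(L),
    Right(R),
}

// Forward `next` to whichever variant is active, so both branches of an
// `if`/`else` can share the single type `Either<L, R>`.
impl<L, R> Iterator for Either<L, R>
where
    L: Iterator,
    R: Iterator<Item = L::Item>,
{
    type Item = L::Item;

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Either::Left(left) => left.next(),
            Either::Right(right) => right.next(),
        }
    }
}

/// Sums the chunk lengths, optionally recording each length in `report`.
fn total_len(chunks: Vec<Vec<u8>>, report: Option<&mut Vec<usize>>) -> usize {
    let iter = chunks.into_iter();
    let iter = if let Some(log) = report {
        // Branch type: an `Inspect` adapter around the iterator.
        Either::Left(iter.inspect(move |chunk| log.push(chunk.len())))
    } else {
        // Branch type: the bare iterator.
        Either::Right(iter)
    };
    iter.map(|chunk| chunk.len()).sum()
}
```

Calling `total_len(chunks, Some(&mut log))` fills `log` with one entry per chunk, while `total_len(chunks, None)` skips the reporting but goes through the same code path afterwards.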

@nickkuk

nickkuk commented Aug 12, 2020

I think you can use the async-stream crate, which I've already mentioned here. It allows you to write sync-like code with yield instead of using combinators (such as map or inspect_ok):

  async_stream::stream! {
    if let Some(mut sender) = sender {
      while let Some(bytes) = stream.next().await {
        if let Ok(bytes) = &bytes {
          sender.send(bytes.len()).await;
        }
        yield bytes;
      }
    } else {
      while let Some(bytes) = stream.next().await {
        yield bytes;
      }
    }
  }

@nbari

nbari commented Aug 12, 2020

@nickkuk many thanks for the examples. I was indeed struggling with the if/else until I found I could do:

let stream  =  async_stream::stream! {
...
};

@nbari

nbari commented Aug 12, 2020

Hi @nickkuk, I found this example interesting:

use tokio::sync::mpsc;
use futures::future::Either;

fn upload(file: tokio::fs::File, sender: Option<mpsc::Sender<usize>>) {
    let stream = FramedRead::new(file, BytesCodec::new());

    let stream = if let Some(mut tx) = sender {
        Either::Left(stream
            .inspect_ok(move |chunk| tx.send(chunk.len()))
        )
    } else {
        Either::Right(stream)
    };

    let body = Body::wrap_stream(stream);

}

It is not clear to me why it won't compile unless I add a println!("{}", 0):

Either::Left(stream
    .inspect_ok(move |chunk| {
        tx.send(chunk.len());
        println!("{}",0);
     }))

Also, I think it requires a .await: tx.send(chunk.len()).await; but if I add it I get this error:

`await` is only allowed inside `async` functions and blocks

Just asking to understand and learn more. I became curious because the code looks cleaner to me when using Either, but I would like to know the pros and cons compared with the async-stream crate.

Thanks in advance.

@nickkuk

nickkuk commented Aug 12, 2020

From the documentation of inspect_ok you can see that it expects F: FnMut(&Self::Ok), which means that the closure must return ().
But tx.send(chunk.len()) returns impl Future<Output = Result<(), SendError<T>>>, so it is not the magic of println, but the magic of the semicolon.
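
The semicolon effect can be reproduced without any async machinery. In this small sketch, `inspect_each` and `send` are hypothetical stand-ins for `inspect_ok` and `tx.send`; only the return types matter:

```rust
// Hypothetical stand-in for a combinator like `inspect_ok`: it only accepts
// closures that return `()`.
fn inspect_each<F: FnMut(&usize)>(items: &[usize], mut f: F) {
    for item in items {
        f(item);
    }
}

// Hypothetical stand-in for `tx.send(..)`: it returns a value, not `()`.
fn send(log: &mut Vec<usize>, value: usize) -> Result<(), String> {
    log.push(value);
    Ok(())
}

fn record_all(items: &[usize]) -> Vec<usize> {
    let mut log = Vec::new();

    // Rejected by the compiler: this closure would return `Result<(), String>`.
    // inspect_each(items, |n| send(&mut log, *n));

    // Accepted: the statement ends with `;`, so the closure returns `()`.
    inspect_each(items, |n| {
        let _ = send(&mut log, *n);
    });
    log
}
```

One caveat for the async case: Rust futures do nothing until they are polled, so discarding the future returned by an async send with a semicolon makes the code compile but never actually sends the value.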

About .await for send: you are right, and I use it in my example. It is needed because your Sender is bounded, so send has to wait when the channel has no free capacity. You have the following possibilities:

  1. use UnboundedSender, whose send returns just Result<(), SendError<T>> (i.e., without a Future);
  2. use the async-stream crate, which allows .await and yield inside the stream! macro;
  3. use another Stream combinator, e.g., and_then, which expects F: FnMut(Self::Ok) -> Fut, where Fut is a future.
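
The difference between a bounded and an unbounded sender can be sketched with the standard library's channels as an analogy: std's `sync_channel` blocks the thread where tokio's bounded `send` would await, and std's `channel` never waits for room, like an unbounded sender. The function names below are made up for illustration:

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Returns how many values fit into a bounded channel of `capacity`
/// before it reports that it is full.
fn fill_bounded(capacity: usize) -> usize {
    // `_rx` keeps the receiving side alive without ever reading from it.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut sent = 0;
    loop {
        match tx.try_send(sent) {
            Ok(()) => sent += 1,
            // A plain `send` would block here until the receiver catches up.
            Err(TrySendError::Full(_)) => return sent,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
        }
    }
}

/// Sends `n` values on an unbounded channel; `send` never waits for room.
fn fill_unbounded(n: usize) -> usize {
    let (tx, rx) = channel::<usize>();
    for i in 0..n {
        tx.send(i).expect("receiver is alive");
    }
    // Dropping the sender ends the receiver's iteration.
    drop(tx);
    rx.iter().count()
}
```

For a progress bar, losing the wait is usually harmless: the messages are tiny and the producer is limited by file I/O anyway, which fits the later remark in this thread that an unbounded sender suits the task.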

I suggest using async-stream because

  • it is more idiomatic Rust;
  • it desugars to something like Either;
  • it may become standard in the future: see the activity from the Rust lang team here.

@nbari

nbari commented Aug 12, 2020

Hi @nickkuk, many thanks for the explanation and your time on this. Taking advantage of the thread, I have one last question on this topic, probably too basic, but I hope you don't mind since I am trying to grasp the concepts of async/await.

To increase the progress bar (bar.inc(1);) or read from the Receiver, I found that I have to call tokio::spawn to do the request, which then allows me to read from the channel; otherwise, since there is no reader, I get locked. I thought that by already using .await I could skip the spawn. My bad, probably, since I was thinking that await is like go (a goroutine in Golang). I ended up doing something like this:

let bar = ProgressBar::new(1000);
let (tx, mut rx): (mpsc::Sender<usize>, mpsc::Receiver<usize>) = mpsc::channel(100);

tokio::spawn(async move {
    my_upload(file, Some(tx)).await; // <-- method using `async-stream` that uploads the file and sends each chunk's len()
 });
while let Some(i) = rx.recv().await {
       bar.inc(1);
 }
...

I think this is the way to go, but with so many new options that I have just learned about for doing the same task, I wonder if there is something more idiomatic to achieve the same.

nbari added a commit to s3m/s3m that referenced this issue Aug 12, 2020
@nickkuk

nickkuk commented Aug 12, 2020

First, I want to clarify that in my last reply I recommended async-stream not over UnboundedSender (actually UnboundedSender is more suitable for your task), but over Stream combinators (and even this is not always true; sometimes combinators are shorter).

Actually, you don't need tokio::spawn to upload a single file if you use tokio::try_join!; see an example here. Moreover, you can upload two, three, or more files with this approach (maybe with the same UnboundedReceiver for all files and a cloned UnboundedSender for each). Even an unknown number of files can be uploaded without tokio::spawn by using, e.g., futures::stream::FuturesUnordered.

tokio::spawn

  • should be used to reduce the memory size of a Future: every Rust Future (i.e., every async fn including async fn main, every chain of combinators, tokio::try_join!, tokio::select!, futures::stream::FuturesUnordered) is a state machine, and the bigger it is, the more memory it consumes;
  • must be used to introduce task parallelism: every spawned Rust Future (a so-called task) runs on zero system threads (while waiting among all tasks or on .await) or on one; tokio::spawn creates a new task that can be executed on another thread.
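
The first point can be observed by measuring futures directly. The sketch below uses only the standard library; `Box::pin` is a stand-in, chosen for illustration, for what spawning achieves in this respect (the child future lives in its own allocation instead of inside the parent's state machine), and all function names are made up:

```rust
use std::future::{ready, Future};
use std::mem::size_of_val;
use std::pin::Pin;

// Keeps a 4 KiB buffer alive across an `.await`, so the buffer becomes part
// of this future's state machine.
async fn big_task() -> u8 {
    let buf = [1u8; 4096];
    ready(()).await;
    buf[0]
}

// Awaits `big_task` inline: the child's state is embedded in this future.
async fn inline_parent() -> u8 {
    big_task().await
}

// Boxes the child first: this future only has to store a pointer to it.
async fn boxed_parent() -> u8 {
    let child: Pin<Box<dyn Future<Output = u8>>> = Box::pin(big_task());
    child.await
}

/// Returns the sizes in bytes of the inline and the boxed parent futures.
/// Nothing is polled here; constructing a future is enough to measure it.
fn sizes() -> (usize, usize) {
    (size_of_val(&inline_parent()), size_of_val(&boxed_parent()))
}
```

The exact numbers depend on the compiler version, but the inline parent has to be at least as large as the buffer, while the boxed parent stays small.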

For example, tokio::spawn is used in the hyper server for every new connection and every new request on a connection.

The main drawback of tokio::spawn is that the consumed Future must be 'static. There was an attempt to lift this constraint, but it failed.

@nbari

nbari commented Aug 12, 2020

@nickkuk many thanks for the examples and sharing the knowledge, I highly appreciate it :-)

@niuhuan
Contributor

niuhuan commented Feb 9, 2022

Is there an example of saving a response body to a file asynchronously while displaying the total download progress?

I don't know how to use a buffer to read and write:

let rsp = reqwest::Client::new().get("http://aabbcc.com/1.mp4").send().await;
let file = std::fs::File::create(file).unwrap();
let mut buf = [0; 8192];
let mut reader = StreamReader::new(rsp.bytes_stream());
loop {
  // read into buf, write buf to the file, print the r/w size, until EOF
}
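
The read/write loop asked about here has the same shape in synchronous and asynchronous code. A minimal synchronous sketch using only the standard library follows; `copy_with_progress` is a hypothetical helper, not part of reqwest or tokio. An async version would, as an assumption about how one might port it, swap in tokio's `AsyncReadExt::read` and `AsyncWriteExt::write_all` with `.await`, or pull chunks from the response with reqwest's `Response::chunk`:

```rust
use std::io::{self, Read, Write};

/// Copies `reader` into `writer` through a fixed buffer and calls
/// `on_progress` with the running byte total after every chunk.
fn copy_with_progress<R, W, F>(mut reader: R, mut writer: W, mut on_progress: F) -> io::Result<u64>
where
    R: Read,
    W: Write,
    F: FnMut(u64),
{
    let mut buf = [0u8; 8192];
    let mut total = 0u64;
    loop {
        // `read` returns how many bytes were placed at the front of `buf`.
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break; // end of input
        }
        // Write only the bytes that were actually read.
        writer.write_all(&buf[..n])?;
        total += n as u64;
        on_progress(total);
    }
    writer.flush()?;
    Ok(total)
}
```

To turn the running total into a percentage, the expected size has to come from somewhere, typically the Content-Length header, which not every server sends.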

@frederikhors

Hi people!

I have the code below and I'm stuck because I don't know how to post() the AsyncRead. Can you help me understand?

#[async_trait::async_trait]
impl Trait for UploadClient {
    async fn put_file(
        &self,
        filename: &str,
        mut reader: Pin<&mut (dyn AsyncRead + Send + Sync)>,
    ) -> Result<u64> {
        // let url = ...;

        let client = reqwest::Client::new();

        let resp = client
            .post(url)
            // None of these work!
            // .body(reader)
            // .body(reqwest::Body::wrap_stream(reader))
            // .body(ReaderStream::new(reader))
            .body(StreamReader::new(reader))
            .send()
            .await?
            .error_for_status()?;

        Ok(123)
    }
}

@nickkuk

nickkuk commented Dec 19, 2022

Hey @frederikhors, I think the current standard approach is to use the tokio-util helper crate with the BytesCodec and FramedRead structures. See an example here.

@matthiasg

@frederikhors your example would not set the upload length though, right?

There are a number of implementations of From<File> for Body etc., and even a function for streamwithlength, which is helpful when uploading an async tokio file stream.

I am interested in this because I have a ProgressReader (reporting upload progress) that implements AsyncRead and can wrap a file stream etc., but I need to convert it to a Body to send it.

With some of the other examples above I am worried about backpressure and whether it is handled correctly. Based on the way async reads work, I am pretty sure AsyncRead will always be pull-based and thus handle backpressure automatically.

In general, tracking progress (optionally) for sending a request should be something built in.
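
The pull-based behaviour described above can be illustrated with a synchronous analogue of such a wrapper. This is a sketch under the assumption that the ProgressReader mentioned here simply counts the bytes it hands out; the type and the `pull_once` helper are written for illustration and use std's `Read` rather than tokio's `AsyncRead`:

```rust
use std::io::{self, Read};

/// Wraps any reader and counts the bytes handed to the consumer.
struct ProgressReader<R> {
    inner: R,
    bytes_read: u64,
}

impl<R: Read> ProgressReader<R> {
    fn new(inner: R) -> Self {
        ProgressReader { inner, bytes_read: 0 }
    }
}

impl<R: Read> Read for ProgressReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Data moves only when the consumer asks for it, and never more
        // than fits into the consumer's buffer.
        let n = self.inner.read(buf)?;
        self.bytes_read += n as u64;
        Ok(n)
    }
}

/// Pulls at most `chunk` bytes, the way a slow consumer would.
fn pull_once<R: Read>(reader: &mut ProgressReader<R>, chunk: usize) -> io::Result<usize> {
    let mut buf = vec![0u8; chunk];
    reader.read(&mut buf)
}
```

Because the counter only advances inside `read`, it reflects what the consumer has pulled so far, not what has reached the network, which matches the earlier observation in this thread that progress can run ahead of the actual upload when there is buffering further down.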
