Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dropping stream without delay? #7

Closed
SionoiS opened this issue Sep 18, 2021 · 2 comments · Fixed by #8
Closed

Dropping stream without delay? #7

SionoiS opened this issue Sep 18, 2021 · 2 comments · Fixed by #8
Labels
bug Something isn't working
Milestone

Comments

@SionoiS
Copy link

SionoiS commented Sep 18, 2021

Hello!

I'm trying to drop a stream and I get this error.

index-7a824d050b9c0592.js:645 Uncaught (in promise) TypeError: Failed to execute 'releaseLock' on 'ReadableStreamDefaultReader': Cannot release a readable stream reader when it still has outstanding read() calls that have not yet settled
    at index-7a824d050b9c0592.js:645
    at handleError (index-7a824d050b9c0592.js:254)
    at imports.wbg.__wbg_releaseLock_7d4190d57c8409be (index-7a824d050b9c0592.js:644)
    at wasm_streams::readable::sys::ReadableStreamGenericReader::release_lock::hcfbe7420eb03a3d1 (index-7a824d050b9c0592_bg.wasm:0x6c900f)
    at wasm_streams::readable::default_reader::ReadableStreamDefaultReader::release_lock_mut::h93647f378fa93bb1 (index-7a824d050b9c0592_bg.wasm:0x8fab11)
    at <wasm_streams::readable::default_reader::ReadableStreamDefaultReader as core::ops::drop::Drop>::drop::hd17e458a60cd2b2a (index-7a824d050b9c0592_bg.wasm:0x9c2007)
    at core::ptr::drop_in_place<wasm_streams::readable::default_reader::ReadableStreamDefaultReader>::h4e88cc6be1f859ae (index-7a824d050b9c0592_bg.wasm:0x99f4e7)
    at core::ptr::drop_in_place<core::option::Option<wasm_streams::readable::default_reader::ReadableStreamDefaultReader>>::h8dd9e5cd6471130c (index-7a824d050b9c0592_bg.wasm:0x92417f)
    at core::ptr::drop_in_place<wasm_streams::readable::into_stream::IntoStream>::h3d9de5d2efa44b98 (index-7a824d050b9c0592_bg.wasm:0x95eb60)
    at core::ptr::drop_in_place<futures_util::stream::stream::map::Map<wasm_streams::readable::into_stream::IntoStream,reqwest::wasm::response::Response::bytes_stream::{{closure}}>>::h33896a440b976ee3 (index-7a824d050b9c0592_bg.wasm:0x9bbc68)

Here's the code in Reqwest for bytes_stream()

pub fn bytes_stream(self) -> impl futures_core::Stream<Item = crate::Result<Bytes>> {
    let body = ReadableStream::from_raw(self.http.into_body().body().unwrap().unchecked_into());

    body.into_stream().map(|buf_js| {
        let buffer = Uint8Array::new(
            &buf_js
                .map_err(crate::error::wasm)
                .map_err(crate::error::decode)?,
        );
        let mut bytes = vec![0; buffer.length() as usize];
        buffer.copy_to(&mut bytes);
        Ok(bytes.into())
    })
}

My code

async fn pubsub_stream<U>(
        &self,
        topic: U,
        cb: Callback<Result<(String, Vec<u8>)>>,
        abort_regis: AbortRegistration,
    ) -> Result<()>
    where
        U: Into<Cow<'static, str>>,
    {
        let url = self.base_url.join("pubsub/sub")?;

        let response = self
            .client
            .post(url)
            .query(&[("arg", &topic.into())])
            .send()
            .await?;

        let stream = response.bytes_stream();

        let line_stream = stream.err_into().into_async_read().lines();

        let mut drop_stream = Abortable::new(line_stream, abort_regis);

        while let Some(line) = drop_stream.try_next().await? {
           //if I break here the stream is dropped without error
           //Do stuff...
        }

        Ok(())
    }

Do I have to wait for new data to come in to drop the stream or is there away to drop it right when I don't need it anymore?

@MattiasBuelens
Copy link
Owner

It looks like the current implementation of into_async_read() doesn't cancel the stream on Drop. But if we don't cancel, then releaseLock() will throw because there's still a pending read request (as per spec). Hmmm... 🤔

I suppose that, at least for ReadableStream.into_async_read(), dropping the returned AsyncRead should cancel the stream, since into_async_read() consumes the entire ReadableStream. I'm not sure if the same should apply to ReadableStreamBYOBReader.into_async_read() though, because ReadableStreamBYOBReader only borrows the stream and you could acquire a new reader afterwards. (Same logic applies to .into_stream() for a ReadableStreamDefaultReader.)

So yes, this is not a bug in your code, but a missing feature in wasm-streams. I'll see what I can do to fix it. 🙂

@SionoiS
Copy link
Author

SionoiS commented Sep 18, 2021

Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants