Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust ws server asset protocol #229

Merged
merged 18 commits into from
Feb 28, 2025

Conversation

eloff
Copy link
Contributor

@eloff eloff commented Feb 20, 2025

This introduces another callback on ServerListener:

fn on_fetch_asset(&self, _uri: String, _responder: AssetResponder) {}

Where the AssetResponder has two methods

pub fn send_error(&self, error: &str)
pub fn send_data(&self, asset: &[u8])

And a public Client property. Alternately this could be private and we could pass the client as the first argument like the other callbacks. I don't feel strongly about which is better.

I added an example showing how to implement an asset server, although it doesn't show how the client would work. Not sure if I should add a client as well?

I also added a way to send status messages to a specific client on Client, because callbacks in general lack a way to give feedback about errors or warnings.

pub fn send_status(&self, status: Status)

Which mirrors publish_status on the server handle.

@eloff eloff requested review from jtbandes and bryfox February 20, 2025 17:17
Copy link

linear bot commented Feb 20, 2025

@jtbandes jtbandes requested a review from achim-k February 20, 2025 17:35
Copy link
Contributor

@bryfox bryfox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the advantage of the AssetResponder over the approach used by services to return Bytes? I like this approach, but it seems like a new pattern for a user to learn.

I'm also still trying to understand how this will work if the user's implementation needs to do some slow work to get the asset. With python specifically, I have a similar question with services (#230) — it seems like we want to have some sort of out-of-band delivery of responses (services or assets) that correlate by call_id/request_id. Maybe this isn't a use case we need to support yet, though.

}

/// Send a successful response to the client with the asset.
pub fn send_data(&self, asset: &[u8]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is sync, how does it play with longer-running responses? Can I respond with assets downloaded from s3, for example?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can download the asset in a background thread or in a new tokio task, and then call this when it's fully buffered.

@eloff
Copy link
Contributor Author

eloff commented Feb 21, 2025

What's the advantage of the AssetResponder over the approach used by services to return Bytes? I like this approach, but it seems like a new pattern for a user to learn.

You mean take a Result with a single respond method instead of one each for success and failure? Yeah I can align that.

I'm also still trying to understand how this will work if the user's implementation needs to do some slow work to get the asset. With python specifically, I have a similar question with services (#230) — it seems like we want to have some sort of out-of-band delivery of responses (services or assets) that correlate by call_id/request_id. Maybe this isn't a use case we need to support yet, though.

ServerListener has a note:

These methods are invoked from the client's main poll loop and must not block. If blocking or
long-running behavior is required, the implementation should use [tokio::task::spawn] (or
[tokio::task::spawn_blocking]).

The only callbacks that return a value are on_get_parameters and on_set_parameters which are expected to be quick.

Service calls and asset responders use this responder object pattern where you can launch the work in a thread pool, or an async runtime, and then call respond() when the result is ready.

For cheap handlers this works well, as it does for on_get_parameters/on_set_parameters where we need the result right away. For expensive handlers it means the user has to do some kind of async or threads themselves.

Is that better than doing it for them by calling the handlers from the blocking thread pool to begin with?

@bryfox
Copy link
Contributor

bryfox commented Feb 21, 2025

The only callbacks that return a value are on_get_parameters and on_set_parameters which are expected to be quick.

The service handler has a sync_handler_fn, and there's no equivalent for assets. Maybe I'm making too many assumptions about Python consumers of the SDK, but I'd expect most to reach for the sync version (which is what I started with in #230). Maybe we can make the asset responder version feel natural there too.

I added an example showing how to implement an asset server, although it doesn't show how the client would work. Not sure if I should add a client as well?

In my opinion, right now we should focus on examples targeting the Foxglove app as the client, rather than building out custom clients. e.g., how could this be used to load a URDF into the 3d panel for live viz?

@eloff
Copy link
Contributor Author

eloff commented Feb 21, 2025

The only callbacks that return a value are on_get_parameters and on_set_parameters which are expected to be quick.

The service handler has a sync_handler_fn, and there's no equivalent for assets. Maybe I'm making too many assumptions about Python consumers of the SDK, but I'd expect most to reach for the sync version (which is what I started with in #230). Maybe we can make the asset responder version feel natural there too.

That'd be an anti-pattern with the fetch asset API. You're not supposed to block for long, so if you returned it as a Result we'd need to call that via spawn_blocking and document that as an exception from the other handler methods. But maybe that's better than requiring the fetch asset handler to take responsibility for doing its work in the background.

For async you'd need a different interface (ideally an async handler). I can't think of a nice way to structure that code via the ServerListener. It works for services because they don't use that interface. We could do a similar dedicated API for fetch asset with a similar interface.

I can rewrite it like that if it makes sense.

I added an example showing how to implement an asset server, although it doesn't show how the client would work. Not sure if I should add a client as well?

In my opinion, right now we should focus on examples targeting the Foxglove app as the client, rather than building out custom clients. e.g., how could this be used to load a URDF into the 3d panel for live viz?

That makes sense. I don't know how to do that in the app, I'll look into it.

Comment on lines 801 to 802
// Invoke the handler.
service.call(Client(self), request, responder);
service.call(Client::new(self), request, responder);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also rework the service Responder to take a Client instead of Arc<ConnectedClient>, I suppose. That can be a separate change.

@gasmith
Copy link
Contributor

gasmith commented Feb 21, 2025

What's the advantage of the AssetResponder over the approach used by services to return Bytes? I like this approach, but it seems like a new pattern for a user to learn.

You mean take a Result with a single respond method instead of one each for success and failure? Yeah I can align that.

Yeah, might be nice to use the same pattern as the service Responder::respond method, which takes a Result<Bytes, String>.

For async you'd need a different interface (ideally an async handler). I can't think of a nice way to structure that code via the ServerListener. It works for services because they don't use that interface. We could do a similar dedicated API for fetch asset with a similar interface.

Maybe? A big motivation for doing services differently was to accommodate multiple unrelated service implementations, without forcing the user to build a routing layer. We don't have that same motivation here, since there's at most one implementation for fetching assets.

If we were to offer a separate FetchAsset trait, the main benefit would be the sync implementation sugaring we added for service handlers, and the ability for clients to decouple their implementation from other ServiceListener callbacks. Maybe those are nice to have, but they don't feel like big wins...

The bigger issue is about blocking the client's poll loop, which is an awfully subtle footgun. Maybe the on_fetch_asset and service::Handler::call callbacks should be async, and we should unconditionally spawn a new tokio task to invoke them. A nice benefit of doing things that way (with the spawn on our side of the fence), is that we can easily track and kill those tasks when the client disconnects (or we otherwise reset session state). The main downside is the latency we incur for spawning a task when we don't actually need one (e.g., because the implementation is sync, or needs spawn_blocking).

@eloff
Copy link
Contributor Author

eloff commented Feb 23, 2025

What's the advantage of the AssetResponder over the approach used by services to return Bytes? I like this approach, but it seems like a new pattern for a user to learn.

You mean take a Result with a single respond method instead of one each for success and failure? Yeah I can align that.

Yeah, might be nice to use the same pattern as the service Responder::respond method, which takes a Result<Bytes, String>.

For async you'd need a different interface (ideally an async handler). I can't think of a nice way to structure that code via the ServerListener. It works for services because they don't use that interface. We could do a similar dedicated API for fetch asset with a similar interface.

Maybe? A big motivation for doing services differently was to accommodate multiple unrelated service implementations, without forcing the user to build a routing layer. We don't have that same motivation here, since there's at most one implementation for fetching assets.

If we were to offer a separate FetchAsset trait, the main benefit would be the sync implementation sugaring we added for service handlers, and the ability for clients to decouple their implementation from other ServiceListener callbacks. Maybe those are nice to have, but they don't feel like big wins...

The bigger issue is about blocking the client's poll loop, which is an awfully subtle footgun. Maybe the on_fetch_asset and service::Handler::call callbacks should be async, and we should unconditionally spawn a new tokio task to invoke them. A nice benefit of doing things that way (with the spawn on our side of the fence), is that we can easily track and kill those tasks when the client disconnects (or we otherwise reset session state). The main downside is the latency we incur for spawning a task when we don't actually need one (e.g., because the implementation is sync, or needs spawn_blocking).

Yeah these are exactly the kinds of considerations I've been thinking about.

I think ideally we take a sync or async callback somehow and then invoke it ourselves via spawn_blocking or spawn, and document that in the API. So no footgun, and the user can write their code according to whatever works best (e.g. sync for filesystem or Python, or async for a rust network operation), and we can "cancel" it when the client disconnects.

I think that isn't easily done via the ServerListener, we'd need something like:

ServerBuilder::new()
    .fetch_asset_handler(sync_callback)`
   
// Or
    .fetch_asset_async_handler(async_callback)

Which can be implemented something like:

use std::future::Future;
use tokio::task::{spawn, spawn_blocking};

/// Define an enum to distinguish between sync and async callbacks
enum Callback<ResultType> {
    Sync(fn() -> Result<ResultType, String>),
    Async(fn() -> impl Future<Output = Result<ResultType, String>> + Send + 'static),
}

impl<ResultType: Send + 'static> Callback<ResultType> {
    /// Execute the callback correctly based on its type
    async fn execute(self) -> Result<ResultType, String> {
        match self {
            Self::Sync(sync_fn) => {
                // Run sync function in a blocking thread
                spawn_blocking(move || sync_fn()).await.unwrap_or_else(|e| Err(format!("Task join error: {}", e)))
            }
            Self::Async(async_fn) => {
                // Run async function in an async task
                spawn(async_fn()).await.unwrap_or_else(|e| Err(format!("Task join error: {}", e)))
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // Define a synchronous callback
    let sync_callback = Callback::Sync(|| {
        println!("Sync callback running");
        Ok(24)
    });

    // Define an asynchronous callback
    let async_callback = Callback::Async(|| async {
        println!("Async callback running");
        Ok(42)
    });

    // Execute both callbacks
    let res1 = sync_callback.execute().await;
    println!("Sync callback result: {:?}", res1);

    let res2 = async_callback.execute().await;
    println!("Async callback result: {:?}", res2);
}

@gasmith
Copy link
Contributor

gasmith commented Feb 24, 2025

The bigger issue is about blocking the client's poll loop, which is an awfully subtle footgun. Maybe the on_fetch_asset and service::Handler::call callbacks should be async, and we should unconditionally spawn a new tokio task to invoke them.

I think that isn't easily done via the ServerListener, we'd need something like:

ServerBuilder::new()
    .fetch_asset_handler(sync_callback)`
   
// Or
    .fetch_asset_async_handler(async_callback)

As we discussed this morning, I think we should try to keep it simple, and lean towards async, since that's what we expect rust devs to be doing anyway.

enum Callback<ResultType> {
    Sync(fn() -> Result<ResultType, String>),
    Async(fn() -> impl Future<Output = Result<ResultType, String>> + Send + 'static),
}

I don't think it's possible to describe fn pointers with an impl return. But even if it were, I think I'd like to keep the callback interface as a trait, for wider flexibility. Then we can provide wrapper structs for sugaring concrete use cases, like we did for service callbacks.

We could follow the pattern that tower::Service uses, and describe the future as an associated type. That should be less finicky than async-trait (with its pin-box-ness), or native async traits (which are still not dyn-compatible). Something like:

trait Call {
    type Error;
    type Future: Future<Output = Result<(), Self::Error>>;

    fn call(&self) -> Self::Future;
}

// Sugaring for an async function or block.
struct CallFn<F, Fut>(F)
where
    F: Fn() -> Fut,
    Fut: Future<Output = anyhow::Result<()>>;

impl<F, Fut> Call for CallFn<F, Fut>
where
    F: Fn() -> Fut,
    Fut: Future<Output = anyhow::Result<()>>,
{
    type Error = anyhow::Error;
    type Future = Fut;

    fn call(&self) -> Self::Future {
        self.0()
    }
}

// It's dyn-compatible, so long as the function and future are 'static.
impl<F, Fut> CallFn<F, Fut>
where
    F: Fn() -> Fut + 'static,
    Fut: Future<Output = anyhow::Result<()>> + 'static,
{
    fn boxed(self) -> Box<dyn Call<Error = anyhow::Error, Future = Fut>> {
        Box::new(self)
    }
}

// Examples follow...
async fn handler() -> anyhow::Result<()> {
    println!("hello world");
    Ok(())
}

fn main() {
    let call1 = CallFn(|| async {
        println!("hello world");
        Ok(())
    });
    let call2 = CallFn(handler);
}

@eloff eloff requested a review from gasmith February 26, 2025 05:30
Copy link
Contributor

@gasmith gasmith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking great!

Comment on lines 21 to 23
impl<F> AssetHandler for SyncAssetHandlerFn<F>
where
F: Fn(Client, String) -> FetchAssetResult + Send + Sync + 'static,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice to make this generic over the error type, so long as it implements Display. That would also be consistent with the service handler function wrappers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a good idea

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I tried it and I didn't like this, because it forces the user to specify the error type in the success case, because it can't be inferred. I think it's fine to get the user to call to_string() themselves.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like the case where you have an infallible handler implemented as a closure is going to be pretty rare. Most of the time you're either going to have a function (where the error type is explicitly declared) or the implementation will be fallible (in which case the error type is inferred). Plus, you'll often have more than one place in your handler where you need to return an error. I'd much rather do this:

.fetch_asset_handler_async_fn(|_, url| async move {
    reqwest::get(url).await?.bytes().await
})

than this:

.fetch_asset_handler_async_fn(|_, url| async move {
    reqwest::get(url)
        .await
        .map_err(|e| e.to_string())?
        .bytes()
        .await
        .map_err(|e| e.to_string())
})

/// Fetch an asset with the given uri and return it via the responder.
/// Fetch should not block, it should call `runtime.spawn`
/// or `runtime.spawn_blocking` to do the actual work.
fn fetch(self: Arc<Self>, _runtime: &Handle, _uri: String, _responder: AssetResponder);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing Arc<Self> is a little unusual. I wonder whether we might just bury another Arc inside the Fn structs instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think any implementation of the trait would also need to be in an Arc, and this is the only way to enforce that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You only need the Arc if you spawn.

Consider a server that prefetches assets on startup, and serves them up from memory. That could be totally synchronous.

@eloff eloff requested a review from gasmith February 27, 2025 00:14
Copy link
Contributor

@gasmith gasmith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think the callback wrappers should be generic over the error type, and that we don't really need the Arc receiver in the trait. Other than that, LG.

@eloff
Copy link
Contributor Author

eloff commented Feb 27, 2025

I still think the callback wrappers should be generic over the error type, and that we don't really need the Arc receiver in the trait. Other than that, LG.

I got rid of the Arc receiver. The user is almost always going to need to use an Arc, because the trait requires them to spawn unless they've got the data already sitting in memory somehow. They're going to need to do a Foo { inner: Arc<InnerFoo> } instead, which is maybe not as obvious without the Arc in the trait, and a little ugly. We also have a double indirect Box<Arc<T>>, but I doubt that will ever matter. I'm not convinced this was an improvement for the user, but it's not a such a big deal, we can keep it like this.

I'm not convinced about the generic Err. I tried it again to see what the error was, but I don't see a nice way around it:

If I use Err: Display, then if one implements AssetHandler::fetch and calls respond, it has to look like this:

fn fetch(self: Arc<Self>, uri: String, responder: AssetResponder) {
        match self.assets.get(&uri) {
            Some(asset) => responder.respond(Ok::<_, String>(Bytes::copy_from_slice(asset))),
            None => responder.respond(Err(format!("Asset {} not found", uri))),
        }
    }

You have to specify the type of of the unused Error here, which is strange and not intuitive. What do you choose, there is no error! Didn't you run into this as well?

Some(asset) => responder.respond(Ok(Bytes::copy_from_slice(asset))),
   |                                      ------- ^^ cannot infer type of the type parameter `E` declared on the enum `Result`

Without that you'd need to return .map_err(|e|e.t_string()), which isn't as convenient but at least it's clear from the compile error what to do.

We should make these consistent between the service handler and asset handler, so let's talk about it after you read this and settle on a direction.

@eloff eloff requested a review from gasmith February 27, 2025 22:22
@gasmith
Copy link
Contributor

gasmith commented Feb 27, 2025

If I use Err: Display, then if one implements AssetHandler::fetch and calls respond, it has to look like this:

I think AssetResponder::respond should still take a concrete Result<Bytes, String>. I was only proposing the generic error for the function call wrappers. The service handler is written the same way.

gasmith added a commit that referenced this pull request Feb 27, 2025
Borrow some good ideas for wrapping callbacks from @eloff's asset
handler work in #229.

In particular, I love the idea of handling the `tokio::spawn` and
`tokio::task::spawn_blocking` on our side, because it handles the 99%
case beautifully. Folks that want even finer control can always
implement `Handler` themselves.

Changes here:
- Remove `handler_fn` (which took `(Request, Responder)` as arguments).
- Rename `sync_handler_fn` as `handler_fn`.
- Add `blocking_handler_fn` and `async_handler_fn` for blocking and
async function handlers.
- Remove some useless `'static` bounds.
@eloff
Copy link
Contributor Author

eloff commented Feb 27, 2025

If I use Err: Display, then if one implements AssetHandler::fetch and calls respond, it has to look like this:

I think AssetResponder::respond should still take a concrete Result<Bytes, String>. I was only proposing the generic error for the function call wrappers. The service handler is written the same way.

Oh, that makes sense. I couldn't see it.

Copy link
Contributor

@gasmith gasmith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚢

Co-authored-by: Greg Smith <gasmith@foxglove.dev>
@eloff eloff merged commit 4c14cd6 into main Feb 28, 2025
35 of 36 checks passed
@eloff eloff deleted the dan/fg-9676-rust-ws-server-support-asset-protocol branch February 28, 2025 01:52
jtbandes pushed a commit that referenced this pull request Mar 1, 2025
This test was moved into `src/websocket/semaphore.rs` in #229.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants