Skip to content

Commit

Permalink
ObjectStore WASM32 Support
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Mar 3, 2025
1 parent 88eaa33 commit 871e29d
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ jobs:
- name: Build wasm32-unknown-unknown
run: cargo build --target wasm32-unknown-unknown
- name: Build wasm32-wasip1
run: cargo build --target wasm32-wasip1
run: cargo build --all-features --target wasm32-wasip1

windows:
name: cargo test LocalFileSystem (win64)
Expand Down
6 changes: 2 additions & 4 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::aws::{
AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, S3ConditionalPut, S3CopyIfNotExists,
STORE,
};
use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider};
use crate::client::{http_connector, HttpConnector, TokenCredentialProvider};
use crate::config::ConfigValue;
use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider};
use base64::prelude::BASE64_STANDARD;
Expand Down Expand Up @@ -896,9 +896,7 @@ impl AmazonS3Builder {
self.parse_url(&url)?;
}

let http = self
.http_connector
.unwrap_or_else(|| Arc::new(ReqwestConnector::default()));
let http = http_connector(self.http_connector)?;

let bucket = self.bucket_name.ok_or(Error::MissingBucketName)?;
let region = self.region.unwrap_or_else(|| "us-east-1".to_string());
Expand Down
4 changes: 4 additions & 0 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,16 @@ mod client;
mod credential;
mod dynamo;
mod precondition;

#[cfg(not(target_arch = "wasm32"))]
mod resolve;

pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
pub use checksum::Checksum;
pub use dynamo::DynamoCommit;
pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};

#[cfg(not(target_arch = "wasm32"))]
pub use resolve::resolve_bucket_region;

/// This struct is used to maintain the URI path encoding
Expand Down
6 changes: 2 additions & 4 deletions object_store/src/azure/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::azure::credential::{
ImdsManagedIdentityProvider, WorkloadIdentityOAuthProvider,
};
use crate::azure::{AzureCredential, AzureCredentialProvider, MicrosoftAzure, STORE};
use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider};
use crate::client::{http_connector, HttpConnector, TokenCredentialProvider};
use crate::config::ConfigValue;
use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider};
use percent_encoding::percent_decode_str;
Expand Down Expand Up @@ -907,9 +907,7 @@ impl MicrosoftAzureBuilder {
Arc::new(StaticCredentialProvider::new(credential))
};

let http = self
.http_connector
.unwrap_or_else(|| Arc::new(ReqwestConnector::default()));
let http = http_connector(self.http_connector)?;

let (is_emulator, storage_url, auth, account) = if self.use_emulator.get()? {
let account_name = self
Expand Down
1 change: 1 addition & 0 deletions object_store/src/client/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl HttpRequestBody {
Self(Inner::Bytes(Bytes::new()))
}

#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn into_reqwest(self) -> reqwest::Body {
match self.0 {
Inner::Bytes(b) => b.into(),
Expand Down
34 changes: 33 additions & 1 deletion object_store/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,14 @@ impl HttpError {
}

pub(crate) fn reqwest(e: reqwest::Error) -> Self {
#[cfg(not(target_arch = "wasm32"))]
let is_connect = || e.is_connect();
#[cfg(target_arch = "wasm32")]
let is_connect = || false;

let mut kind = if e.is_timeout() {
HttpErrorKind::Timeout
} else if e.is_connect() {
} else if is_connect() {
HttpErrorKind::Connect
} else if e.is_decode() {
HttpErrorKind::Decode
Expand Down Expand Up @@ -200,6 +205,7 @@ impl HttpClient {
}

#[async_trait]
#[cfg(not(target_arch = "wasm32"))]
impl HttpService for reqwest::Client {
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
let (parts, body) = req.into_parts();
Expand Down Expand Up @@ -227,11 +233,37 @@ pub trait HttpConnector: std::fmt::Debug + Send + Sync + 'static {
/// [`HttpConnector`] using [`reqwest::Client`]
#[derive(Debug, Default)]
#[allow(missing_copy_implementations)]
#[cfg(not(target_arch = "wasm32"))]
pub struct ReqwestConnector {}

#[cfg(not(target_arch = "wasm32"))]
impl HttpConnector for ReqwestConnector {
fn connect(&self, options: &ClientOptions) -> crate::Result<HttpClient> {
let client = options.client()?;
Ok(HttpClient::new(client))
}
}

#[cfg(target_arch = "wasm32")]
pub(crate) fn http_connector(
custom: Option<Arc<dyn HttpConnector>>,
) -> crate::Result<Arc<dyn HttpConnector>> {
match custom {
Some(x) => Ok(x),
None => Err(crate::Error::NotSupported {
source: "WASM32 architectures must provide an HTTPConnector"
.to_string()
.into(),
}),
}
}

#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn http_connector(
custom: Option<Arc<dyn HttpConnector>>,
) -> crate::Result<Arc<dyn HttpConnector>> {
match custom {
Some(x) => Ok(x),
None => Ok(Arc::new(ReqwestConnector {})),
}
}
22 changes: 16 additions & 6 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
pub(crate) mod backoff;

#[cfg(not(target_arch = "wasm32"))]
mod dns;

#[cfg(test)]
Expand Down Expand Up @@ -48,22 +49,25 @@ pub use body::{HttpRequest, HttpRequestBody, HttpResponse, HttpResponseBody};
pub(crate) mod builder;

mod connection;
pub use connection::{
HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService, ReqwestConnector,
};
pub(crate) use connection::http_connector;
#[cfg(not(target_arch = "wasm32"))]
pub use connection::ReqwestConnector;
pub use connection::{HttpClient, HttpConnector, HttpError, HttpErrorKind, HttpService};

#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub(crate) mod parts;

use async_trait::async_trait;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Client, ClientBuilder, NoProxy, Proxy};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

#[cfg(not(target_arch = "wasm32"))]
use reqwest::{NoProxy, Proxy};

use crate::config::{fmt_duration, ConfigValue};
use crate::path::Path;
use crate::{GetOptions, Result};
Expand Down Expand Up @@ -195,8 +199,10 @@ impl FromStr for ClientConfigKey {
/// This is used to configure the client to trust a specific certificate. See
/// [Self::from_pem] for an example
#[derive(Debug, Clone)]
#[cfg(not(target_arch = "wasm32"))]
pub struct Certificate(reqwest::tls::Certificate);

#[cfg(not(target_arch = "wasm32"))]
impl Certificate {
/// Create a `Certificate` from a PEM encoded certificate.
///
Expand Down Expand Up @@ -243,6 +249,7 @@ impl Certificate {
#[derive(Debug, Clone)]
pub struct ClientOptions {
user_agent: Option<ConfigValue<HeaderValue>>,
#[cfg(not(target_arch = "wasm32"))]
root_certificates: Vec<Certificate>,
content_type_map: HashMap<String, String>,
default_content_type: Option<String>,
Expand Down Expand Up @@ -276,6 +283,7 @@ impl Default for ClientOptions {
// we opt for a slightly higher default timeout of 30 seconds
Self {
user_agent: None,
#[cfg(not(target_arch = "wasm32"))]
root_certificates: Default::default(),
content_type_map: Default::default(),
default_content_type: None,
Expand Down Expand Up @@ -402,6 +410,7 @@ impl ClientOptions {
///
/// This can be used to connect to a server that has a self-signed
/// certificate for example.
#[cfg(not(target_arch = "wasm32"))]
pub fn with_root_certificate(mut self, certificate: Certificate) -> Self {
self.root_certificates.push(certificate);
self
Expand Down Expand Up @@ -614,8 +623,9 @@ impl ClientOptions {
.with_connect_timeout(Duration::from_secs(1))
}

pub(crate) fn client(&self) -> Result<Client> {
let mut builder = ClientBuilder::new();
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn client(&self) -> Result<reqwest::Client> {
let mut builder = reqwest::ClientBuilder::new();

match &self.user_agent {
Some(user_agent) => builder = builder.user_agent(user_agent.get()?),
Expand Down
6 changes: 2 additions & 4 deletions object_store/src/gcp/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::client::{HttpConnector, ReqwestConnector, TokenCredentialProvider};
use crate::client::{http_connector, HttpConnector, TokenCredentialProvider};
use crate::gcp::client::{GoogleCloudStorageClient, GoogleCloudStorageConfig};
use crate::gcp::credential::{
ApplicationDefaultCredentials, InstanceCredentialProvider, ServiceAccountCredentials,
Expand Down Expand Up @@ -442,9 +442,7 @@ impl GoogleCloudStorageBuilder {

let bucket_name = self.bucket_name.ok_or(Error::MissingBucketName {})?;

let http = self
.http_connector
.unwrap_or_else(|| Arc::new(ReqwestConnector::default()));
let http = http_connector(self.http_connector)?;

// First try to initialize from the service account information.
let service_account_credentials =
Expand Down
7 changes: 2 additions & 5 deletions object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use url::Url;

use crate::client::get::GetClientExt;
use crate::client::header::get_etag;
use crate::client::{HttpConnector, ReqwestConnector};
use crate::client::{http_connector, HttpConnector};
use crate::http::client::Client;
use crate::path::Path;
use crate::{
Expand Down Expand Up @@ -248,10 +248,7 @@ impl HttpBuilder {
let url = self.url.ok_or(Error::MissingUrl)?;
let parsed = Url::parse(&url).map_err(|source| Error::UnableToParseUrl { url, source })?;

let client = match self.http_connector {
None => ReqwestConnector::default().connect(&self.client_options)?,
Some(x) => x.connect(&self.client_options)?,
};
let client = http_connector(self.http_connector)?.connect(&self.client_options)?;

Ok(HttpStore {
client: Arc::new(Client::new(
Expand Down
17 changes: 5 additions & 12 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,6 @@
//! [`webpki-roots`]: https://crates.io/crates/webpki-roots
//!
#[cfg(all(
target_arch = "wasm32",
any(feature = "gcp", feature = "aws", feature = "azure", feature = "http")
))]
compile_error!("Features 'gcp', 'aws', 'azure', 'http' are not supported on wasm.");

#[cfg(feature = "aws")]
pub mod aws;
#[cfg(feature = "azure")]
Expand Down Expand Up @@ -530,10 +524,13 @@ pub mod client;

#[cfg(feature = "cloud")]
pub use client::{
backoff::BackoffConfig, retry::RetryConfig, Certificate, ClientConfigKey, ClientOptions,
CredentialProvider, StaticCredentialProvider,
backoff::BackoffConfig, retry::RetryConfig, ClientConfigKey, ClientOptions, CredentialProvider,
StaticCredentialProvider,
};

#[cfg(all(feature = "cloud", not(target_arch = "wasm32")))]
pub use client::Certificate;

#[cfg(feature = "cloud")]
mod config;

Expand Down Expand Up @@ -1083,8 +1080,6 @@ impl GetResult {
.await
}
GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
#[cfg(target_arch = "wasm32")]
_ => unimplemented!("File IO not implemented on wasm32."),
}
}

Expand All @@ -1110,8 +1105,6 @@ impl GetResult {
local::chunked_stream(file, path, self.range, CHUNK_SIZE)
}
GetResultPayload::Stream(s) => s,
#[cfg(target_arch = "wasm32")]
_ => unimplemented!("File IO not implemented on wasm32."),
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion object_store/src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,13 @@ where
let url = &url[..url::Position::BeforePath];
builder_opts!(crate::http::HttpBuilder, url, _options)
}
#[cfg(not(all(feature = "aws", feature = "azure", feature = "gcp", feature = "http")))]
#[cfg(not(all(
feature = "aws",
feature = "azure",
feature = "gcp",
feature = "http",
not(target_arch = "wasm32")
)))]
s => {
return Err(super::Error::Generic {
store: "parse_url",
Expand Down

0 comments on commit 871e29d

Please sign in to comment.