diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 4a76c15601..24d014b9b7 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -62,7 +62,7 @@ jobs:
         uses: actions-rs/toolchain@v1
         with:
           profile: default
-          toolchain: "1.64.0" # Switch to "stable" when the 1.67.0 release of https://github.com/rust-lang/rust is published, more information: https://github.com/delta-io/delta-rs/pull/923
+          toolchain: "stable"
           override: true
       - uses: Swatinem/rust-cache@v2
       - name: Run tests
@@ -97,6 +97,20 @@ jobs:
           profile: default
           toolchain: stable
           override: true
+
+      - uses: actions/setup-java@v3
+        with:
+          distribution: 'zulu'
+          java-version: '17'
+
+      - uses: beyondstorage/setup-hdfs@master
+        with:
+          hdfs-version: '3.3.2'
+
+      - name: Set Hadoop env
+        run: |
+          echo "CLASSPATH=$CLASSPATH:`hadoop classpath --glob`" >> $GITHUB_ENV
+          echo "LD_LIBRARY_PATH=$JAVA_HOME/lib/server" >> $GITHUB_ENV

       - uses: Swatinem/rust-cache@v2

@@ -105,10 +119,10 @@ jobs:
       - name: Run tests with rustls (default)
         run: |
-          cargo test -p deltalake --features integration_test,azure,s3,gcs,datafusion
+          cargo test -p deltalake --features integration_test,azure,s3,gcs,datafusion,hdfs

       - name: Run tests with native-tls
         run: |
-          cargo test -p deltalake --no-default-features --features integration_test,s3-native-tls,datafusion
+          cargo test -p deltalake --no-default-features --features integration_test,s3-native-tls,datafusion,hdfs

   parquet2_test:
     runs-on: ubuntu-latest
diff --git a/README.adoc b/README.adoc
index 7b4daa8ce4..5b8772d3b0 100644
--- a/README.adoc
+++ b/README.adoc
@@ -32,6 +32,7 @@ link:https://github.com/rajasekarv/vega[vega], etc. It also provides bindings to
 * AWS S3
 * Azure Blob Storage / Azure Datalake Storage Gen2
 * Google Cloud Storage
+* HDFS

 .Support features
 |===
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 01ca4d3b91..a3c0a35f32 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -18,6 +18,7 @@ async-trait = "0.1"
 bytes = "1"
 chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
 cfg-if = "1"
+datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = ["hdfs3", "try_spawn_blocking"], optional = true }
 errno = "0.3"
 futures = "0.3"
 itertools = "0.10"
@@ -119,6 +120,7 @@ s3 = [
   "object_store/aws",
   "object_store/aws_profile",
 ]
+hdfs = ["datafusion-objectstore-hdfs"]
 glue-native-tls = ["s3-native-tls", "rusoto_glue"]
 glue = ["s3", "rusoto_glue/rustls"]
 python = ["arrow/pyarrow"]
diff --git a/rust/README.md b/rust/README.md
index 0ea3e4976f..cc98b4e777 100644
--- a/rust/README.md
+++ b/rust/README.md
@@ -49,6 +49,7 @@ cargo run --example read_delta_table
 - `datafusion` - enable the `datafusion::datasource::TableProvider` trait implementation for Delta Tables, allowing them to be queried using [DataFusion](https://github.com/apache/arrow-datafusion).
 - `datafusion-ext` - DEPRECATED: alias for `datafusion` feature
 - `parquet2` - use parquet2 for checkpoint deserialization. Since `arrow` and `parquet` features are enabled by default for backwards compatibility, this feature needs to be used with `--no-default-features`.
+- `hdfs` - enable the HDFS storage backend to work with Delta Tables in HDFS.

 ## Development
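With the new `hdfs` feature, end-to-end usage looks roughly like the sketch below. It is illustrative only: the namenode address and table path are placeholders, and it assumes a crate built with `--features hdfs` plus a Tokio runtime.

```rust
// Illustrative sketch only: the namenode address and table path are placeholders.
// Requires building deltalake with `--features hdfs`.
use deltalake::DeltaTableError;

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // `hdfs://` URIs are routed to the new HDFS backend added in this change.
    let table = deltalake::open_table("hdfs://localhost:9000/tables/simple").await?;
    println!("loaded table at version {}", table.version());
    Ok(())
}
```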
diff --git a/rust/src/data_catalog/mod.rs b/rust/src/data_catalog/mod.rs
index 7567508b8d..5570c7e408 100644
--- a/rust/src/data_catalog/mod.rs
+++ b/rust/src/data_catalog/mod.rs
@@ -70,6 +70,8 @@ pub fn get_data_catalog(data_catalog: &str) -> Result<Box<dyn DataCatalog>, Data
         "gcp" => unimplemented!("GCP Data Catalog is not implemented"),
         #[cfg(feature = "azure")]
         "azure" => unimplemented!("Azure Data Catalog is not implemented"),
+        #[cfg(feature = "hdfs")]
+        "hdfs" => unimplemented!("HDFS Data Catalog is not implemented"),
         #[cfg(feature = "glue")]
         "glue" => Ok(Box::new(glue::GlueDataCatalog::new()?)),
         _ => Err(DataCatalogError::InvalidDataCatalog {
diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs
index 682b104105..8f32747022 100644
--- a/rust/src/storage/config.rs
+++ b/rust/src/storage/config.rs
@@ -13,6 +13,8 @@ use url::Url;
 #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
 use super::s3::{S3StorageBackend, S3StorageOptions};
+#[cfg(feature = "hdfs")]
+use datafusion_objectstore_hdfs::object_store::hdfs::HadoopFileSystem;
 #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
 use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
 #[cfg(feature = "azure")]
@@ -103,6 +105,7 @@ pub(crate) enum ObjectStoreKind {
     S3,
     Google,
     Azure,
+    Hdfs,
 }

 impl ObjectStoreKind {
@@ -113,6 +116,7 @@ impl ObjectStoreKind {
             "az" | "abfs" | "abfss" | "azure" | "wasb" | "adl" => Ok(ObjectStoreKind::Azure),
             "s3" | "s3a" => Ok(ObjectStoreKind::S3),
             "gs" => Ok(ObjectStoreKind::Google),
+            "hdfs" => Ok(ObjectStoreKind::Hdfs),
             "https" => {
                 let host = url.host_str().unwrap_or_default();
                 if host.contains("amazonaws.com") {
@@ -192,6 +196,21 @@ impl ObjectStoreKind {
                 feature: "gcs",
                 url: storage_url.as_ref().into(),
             }),
+            #[cfg(feature = "hdfs")]
+            ObjectStoreKind::Hdfs => {
+                let store = HadoopFileSystem::new(storage_url.as_ref()).ok_or_else(|| {
+                    DeltaTableError::Generic(format!(
+                        "failed to create HadoopFileSystem for {}",
+                        storage_url.as_ref()
+                    ))
+                })?;
+                Ok(Self::url_prefix_handler(store, storage_url))
+            }
+            #[cfg(not(feature = "hdfs"))]
+            ObjectStoreKind::Hdfs => Err(DeltaTableError::MissingFeature {
+                feature: "hdfs",
+                url: storage_url.as_ref().into(),
+            }),
         }
     }
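The routing above is also reachable through `DeltaTableBuilder`. A hedged sketch follows (placeholder URI, `hdfs` feature assumed enabled; without the feature the `MissingFeature` arm is hit instead):

```rust
// Sketch only: placeholder URI, `hdfs` feature assumed enabled. Without the
// feature, ObjectStoreKind::Hdfs resolves to DeltaTableError::MissingFeature.
use deltalake::{DeltaTableBuilder, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // parse_url maps the "hdfs" scheme to ObjectStoreKind::Hdfs, which wraps
    // datafusion-objectstore-hdfs's HadoopFileSystem in the usual prefix handler.
    let table = DeltaTableBuilder::from_uri("hdfs://localhost:9000/tables/simple")
        .load()
        .await?;
    println!("table has {} files", table.get_files().len());
    Ok(())
}
```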
diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs
index 13a350095c..56f5508078 100644
--- a/rust/src/test_utils.rs
+++ b/rust/src/test_utils.rs
@@ -52,6 +52,7 @@ impl IntegrationContext {
             StorageIntegration::Microsoft => format!("az://{}", &bucket),
             StorageIntegration::Google => format!("gs://{}", &bucket),
             StorageIntegration::Local => format!("file://{}", &bucket),
+            StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &bucket),
         };
         // the "storage_backend" will always point to the root ofg the object store.
         // TODO should we provide the store via object_Store builders?
@@ -85,6 +86,7 @@ impl IntegrationContext {
             StorageIntegration::Microsoft => format!("az://{}", &self.bucket),
             StorageIntegration::Google => format!("gs://{}", &self.bucket),
             StorageIntegration::Local => format!("file://{}", &self.bucket),
+            StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &self.bucket),
         }
     }

@@ -140,6 +142,9 @@ impl Drop for IntegrationContext {
                 gs_cli::delete_bucket(&self.bucket).unwrap();
             }
             StorageIntegration::Local => (),
+            StorageIntegration::Hdfs => {
+                hdfs_cli::delete_dir(&self.bucket).unwrap();
+            }
         };
     }
 }
@@ -150,6 +155,7 @@ pub enum StorageIntegration {
     Microsoft,
     Google,
     Local,
+    Hdfs,
 }

 impl StorageIntegration {
@@ -159,6 +165,7 @@
             Self::Amazon => s3_cli::prepare_env(),
             Self::Google => gs_cli::prepare_env(),
             Self::Local => (),
+            Self::Hdfs => (),
         }
     }

@@ -182,6 +189,10 @@
                 Ok(())
             }
             Self::Local => Ok(()),
+            Self::Hdfs => {
+                hdfs_cli::create_dir(name)?;
+                Ok(())
+            }
         }
     }
 }
@@ -447,3 +458,45 @@ pub mod gs_cli {
         set_env_if_not_set("GOOGLE_ENDPOINT_URL", "http://localhost:4443/storage/v1/b");
     }
 }
+
+/// small wrapper around hdfs cli
+pub mod hdfs_cli {
+    use std::env;
+    use std::path::PathBuf;
+    use std::process::{Command, ExitStatus};
+
+    fn hdfs_cli_path() -> PathBuf {
+        let hadoop_home =
+            env::var("HADOOP_HOME").expect("HADOOP_HOME environment variable not set");
+        PathBuf::from(hadoop_home).join("bin").join("hdfs")
+    }
+
+    pub fn create_dir(dir_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
+        let path = hdfs_cli_path();
+        let mut child = Command::new(&path)
+            .args([
+                "dfs",
+                "-mkdir",
+                "-p",
+                format!("/{}", dir_name.as_ref()).as_str(),
+            ])
+            .spawn()
+            .expect("hdfs command is installed");
+        child.wait()
+    }
+
+    pub fn delete_dir(dir_name: impl AsRef<str>) -> std::io::Result<ExitStatus> {
+        let path = hdfs_cli_path();
+        let mut child = Command::new(&path)
+            .args([
+                "dfs",
+                "-rm",
+                "-r",
+                "-f",
+                format!("/{}", dir_name.as_ref()).as_str(),
+            ])
+            .spawn()
+            .expect("hdfs command is installed");
+        child.wait()
+    }
+}
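Running these helpers locally mirrors the CI job above: a namenode listening on hdfs://localhost:9000 and HADOOP_HOME pointing at a Hadoop install, with CLASSPATH and LD_LIBRARY_PATH set for libhdfs. A rough sketch, assuming the crate is built with the `integration_test` and `hdfs` features (the bucket name is arbitrary):

```rust
// Rough sketch; assumes `--features integration_test,hdfs`, HADOOP_HOME set,
// and a local namenode on hdfs://localhost:9000.
use deltalake::test_utils::hdfs_cli;

fn main() -> std::io::Result<()> {
    // create_dir shells out to `$HADOOP_HOME/bin/hdfs dfs -mkdir -p /<name>`.
    let status = hdfs_cli::create_dir("delta-rs-scratch")?;
    assert!(status.success());
    // ...exercise a table under /delta-rs-scratch, then clean up.
    hdfs_cli::delete_dir("delta-rs-scratch")?;
    Ok(())
}
```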
diff --git a/rust/tests/command_filesystem_check.rs b/rust/tests/command_filesystem_check.rs
index 06584cc077..b39ac39cac 100644
--- a/rust/tests/command_filesystem_check.rs
+++ b/rust/tests/command_filesystem_check.rs
@@ -38,6 +38,13 @@ async fn test_filesystem_check_gcp() -> TestResult {
     Ok(test_filesystem_check(StorageIntegration::Google).await?)
 }

+#[cfg(feature = "hdfs")]
+#[tokio::test]
+#[serial]
+async fn test_filesystem_check_hdfs() -> TestResult {
+    Ok(test_filesystem_check(StorageIntegration::Hdfs).await?)
+}
+
 async fn test_filesystem_check(storage: StorageIntegration) -> TestResult {
     let context = IntegrationContext::new(storage)?;
     context.load_table(TestTables::Simple).await?;
diff --git a/rust/tests/common/hdfs.rs b/rust/tests/common/hdfs.rs
new file mode 100644
index 0000000000..8da5ef83b6
--- /dev/null
+++ b/rust/tests/common/hdfs.rs
@@ -0,0 +1,20 @@
+use super::TestContext;
+use std::collections::HashMap;
+
+pub struct Hdfs {
+    name_node: String,
+}
+
+pub fn setup_hdfs_context() -> TestContext {
+    let mut config = HashMap::new();
+
+    let name_node = "hdfs://localhost:9000".to_owned();
+
+    config.insert("URI".to_owned(), name_node.clone());
+
+    TestContext {
+        storage_context: Some(Box::new(Hdfs { name_node })),
+        config,
+        ..TestContext::default()
+    }
+}
diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs
index 439bf911b6..a2c5dacc05 100644
--- a/rust/tests/common/mod.rs
+++ b/rust/tests/common/mod.rs
@@ -18,6 +18,8 @@ pub mod adls;
 pub mod clock;
 #[cfg(feature = "datafusion")]
 pub mod datafusion;
+#[cfg(feature = "hdfs")]
+pub mod hdfs;
 #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
 pub mod s3;
 pub mod schemas;
@@ -47,6 +49,8 @@ impl TestContext {
             Ok("AZURE_GEN2") => adls::setup_azure_gen2_context().await,
             #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
             Ok("S3_LOCAL_STACK") => s3::setup_s3_context().await,
+            #[cfg(feature = "hdfs")]
+            Ok("HDFS") => hdfs::setup_hdfs_context(),
             _ => panic!("Invalid backend for delta-rs tests"),
         }
     }
diff --git a/rust/tests/integration_checkpoint.rs b/rust/tests/integration_checkpoint.rs
index aeeb4c38d1..f20cdd629c 100644
--- a/rust/tests/integration_checkpoint.rs
+++ b/rust/tests/integration_checkpoint.rs
@@ -40,6 +40,15 @@ async fn cleanup_metadata_gcp_test() -> TestResult {
     Ok(())
 }

+#[cfg(feature = "hdfs")]
+#[tokio::test]
+#[serial]
+async fn cleanup_metadata_hdfs_test() -> TestResult {
+    let context = IntegrationContext::new(StorageIntegration::Hdfs)?;
+    cleanup_metadata_test(&context).await?;
+    Ok(())
+}
+
 // Last-Modified for S3 could not be altered by user, hence using system pauses which makes
 // test to run longer but reliable
 async fn cleanup_metadata_test(context: &IntegrationContext) -> TestResult {
diff --git a/rust/tests/integration_commit.rs b/rust/tests/integration_commit.rs
index 5112f65c77..368e72f1b9 100644
--- a/rust/tests/integration_commit.rs
+++ b/rust/tests/integration_commit.rs
@@ -36,6 +36,13 @@ async fn test_commit_tables_gcp() {
     commit_tables(StorageIntegration::Google).await.unwrap();
 }

+#[cfg(feature = "hdfs")]
+#[tokio::test]
+#[serial]
+async fn test_commit_tables_hdfs() {
+    commit_tables(StorageIntegration::Hdfs).await.unwrap();
+}
+
 #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
 #[tokio::test]
 #[serial]
diff --git a/rust/tests/integration_concurrent_writes.rs b/rust/tests/integration_concurrent_writes.rs
index 32f0493615..ca91654ad8 100644
--- a/rust/tests/integration_concurrent_writes.rs
+++ b/rust/tests/integration_concurrent_writes.rs
@@ -28,6 +28,15 @@ async fn test_concurrent_writes_azure() -> TestResult {
     Ok(())
 }

+// tracked via https://github.com/datafusion-contrib/datafusion-objectstore-hdfs/issues/13
+#[ignore]
+#[cfg(feature = "hdfs")]
+#[tokio::test]
+async fn test_concurrent_writes_hdfs() -> TestResult {
+    test_concurrent_writes(StorageIntegration::Hdfs).await?;
+    Ok(())
+}
+
 async fn test_concurrent_writes(integration: StorageIntegration) -> TestResult {
     let context = IntegrationContext::new(integration)?;
     let (_table, table_uri) = prepare_table(&context).await?;
diff --git a/rust/tests/integration_datafusion.rs b/rust/tests/integration_datafusion.rs
index 7a3fdde464..49665b59ad 100644
--- a/rust/tests/integration_datafusion.rs
+++ b/rust/tests/integration_datafusion.rs
@@ -35,6 +35,13 @@ async fn test_datafusion_gcp() -> TestResult {
     Ok(test_datafusion(StorageIntegration::Google).await?)
 }

+#[cfg(feature = "hdfs")]
+#[tokio::test]
+#[serial]
+async fn test_datafusion_hdfs() -> TestResult {
+    Ok(test_datafusion(StorageIntegration::Hdfs).await?)
+}
+
 async fn test_datafusion(storage: StorageIntegration) -> TestResult {
     let context = IntegrationContext::new(storage)?;
     context.load_table(TestTables::Simple).await?;
diff --git a/rust/tests/integration_object_store.rs b/rust/tests/integration_object_store.rs
index 9c32ec0e59..2dd96a5cda 100644
--- a/rust/tests/integration_object_store.rs
+++ b/rust/tests/integration_object_store.rs
@@ -39,6 +39,14 @@ async fn test_object_store_google() -> TestResult {
     Ok(())
 }

+#[cfg(feature = "hdfs")]
+#[tokio::test]
+#[serial]
+async fn test_object_store_hdfs() -> TestResult {
+    test_object_store(StorageIntegration::Hdfs, false).await?;
+    Ok(())
+}
+
 async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> TestResult {
     let context = IntegrationContext::new(integration)?;
     let delta_store = DeltaTableBuilder::from_uri(&context.root_uri())
diff --git a/rust/tests/integration_read.rs b/rust/tests/integration_read.rs
index 4b7531441d..b2a499ad6f 100644
--- a/rust/tests/integration_read.rs
+++ b/rust/tests/integration_read.rs
@@ -22,6 +22,13 @@ async fn test_read_tables_azure() -> TestResult {
     Ok(read_tables(StorageIntegration::Microsoft).await?)
 }

+#[cfg(feature = "hdfs")]
+#[tokio::test]
+#[serial]
+async fn test_read_tables_hdfs() -> TestResult {
+    Ok(read_tables(StorageIntegration::Hdfs).await?)
+}
+
 #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
 #[tokio::test]
 #[serial]
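The new `test_datafusion_hdfs` case exercises the same DataFusion integration as the other backends. As a hedged sketch of what that looks like from user code (placeholder URI; assumes the `datafusion` and `hdfs` features plus a `datafusion` dependency compatible with the one deltalake links against):

```rust
// Sketch only: placeholder URI; assumes the `datafusion` and `hdfs` features
// and a datafusion dependency compatible with the one deltalake links against.
use std::sync::Arc;

use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let table = deltalake::open_table("hdfs://localhost:9000/tables/simple").await?;
    let ctx = SessionContext::new();
    // With the `datafusion` feature, DeltaTable implements TableProvider.
    ctx.register_table("demo", Arc::new(table))?;
    ctx.sql("SELECT COUNT(*) FROM demo").await?.show().await?;
    Ok(())
}
```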