From caffbde4acd9aa5f19cc149d3908c32ca4d3b77a Mon Sep 17 00:00:00 2001 From: Chitral Verma Date: Fri, 30 Dec 2022 17:15:54 +0530 Subject: [PATCH 01/88] Support for LargeUtf8 and LargeBinary --- rust/src/delta_arrow.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index 995baa97ff..6d4affd7ca 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ -211,6 +211,7 @@ impl TryFrom<&ArrowDataType> for schema::SchemaDataType { fn try_from(arrow_datatype: &ArrowDataType) -> Result { match arrow_datatype { ArrowDataType::Utf8 => Ok(schema::SchemaDataType::primitive("string".to_string())), + ArrowDataType::LargeUtf8 => Ok(schema::SchemaDataType::primitive("string".to_string())), ArrowDataType::Int64 => Ok(schema::SchemaDataType::primitive("long".to_string())), // undocumented type ArrowDataType::Int32 => Ok(schema::SchemaDataType::primitive("integer".to_string())), ArrowDataType::Int16 => Ok(schema::SchemaDataType::primitive("short".to_string())), @@ -219,6 +220,7 @@ impl TryFrom<&ArrowDataType> for schema::SchemaDataType { ArrowDataType::Float64 => Ok(schema::SchemaDataType::primitive("double".to_string())), ArrowDataType::Boolean => Ok(schema::SchemaDataType::primitive("boolean".to_string())), ArrowDataType::Binary => Ok(schema::SchemaDataType::primitive("binary".to_string())), + ArrowDataType::LargeBinary => Ok(schema::SchemaDataType::primitive("binary".to_string())), ArrowDataType::Decimal128(p, s) => Ok(schema::SchemaDataType::primitive(format!( "decimal({},{})", p, s From 6109d4b6259f286f4810b9046b32a9455a15faf6 Mon Sep 17 00:00:00 2001 From: Chitral Verma Date: Fri, 30 Dec 2022 19:13:11 +0530 Subject: [PATCH 02/88] fix formatting Signed-off-by: Chitral Verma --- rust/src/delta_arrow.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index 6d4affd7ca..ae29f215f1 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ -220,7 +220,9 @@ impl TryFrom<&ArrowDataType> for schema::SchemaDataType { ArrowDataType::Float64 => Ok(schema::SchemaDataType::primitive("double".to_string())), ArrowDataType::Boolean => Ok(schema::SchemaDataType::primitive("boolean".to_string())), ArrowDataType::Binary => Ok(schema::SchemaDataType::primitive("binary".to_string())), - ArrowDataType::LargeBinary => Ok(schema::SchemaDataType::primitive("binary".to_string())), + ArrowDataType::LargeBinary => { + Ok(schema::SchemaDataType::primitive("binary".to_string())) + } ArrowDataType::Decimal128(p, s) => Ok(schema::SchemaDataType::primitive(format!( "decimal({},{})", p, s From 94423ba64a81c753fd39c4a7b12805568d1a6f6a Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Fri, 30 Dec 2022 20:15:09 +0100 Subject: [PATCH 03/88] feat: enable passing storage options to Delta table builder via DataFusion's CREATE EXTERNAL TABLE (#1043) # Description We've recently added Delta table support to [Seafowl](https://github.com/splitgraph/seafowl) using delta-rs, which utilizes the new `OPTIONS` clause in sqlparser/DataFusion. It allows propagating a set of key/values down to the `DeltaTableBuilder`, which in turn can use those to instantiate a corresponding object store client. This means someone can now define a delta table without relying on env vars as: ```sql CREATE EXTERNAL TABLE my_delta STORED AS DELTATABLE OPTIONS ('AWS_ACCESS_KEY_ID' 'secret', 'AWS_SECRET_ACCESS_KEY' 'also_secret', 'AWS_REGION' 'eu-west-3') LOCATION 's3://my-bucket/my-delta-table/' ``` I've also changed the existing datafusion integration tests to use this approach to exercise it. I'm not sure whether it makes sense to merge this PR upstream, but opening this PR just in case it does. # Related Issue(s) Didn't find any related issues. # Documentation --- rust/src/delta.rs | 13 +++++++++++ rust/src/delta_datafusion.rs | 8 +++++-- rust/src/test_utils.rs | 2 +- rust/tests/common/datafusion.rs | 16 +++++++++++++ rust/tests/common/mod.rs | 5 +++- rust/tests/datafusion_test.rs | 22 +++++++---------- rust/tests/integration_datafusion.rs | 35 ++++++++++++++++++---------- 7 files changed, 71 insertions(+), 30 deletions(-) create mode 100644 rust/tests/common/datafusion.rs diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 82838e9dec..5a12496b66 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -1482,6 +1482,19 @@ pub async fn open_table(table_uri: impl AsRef) -> Result, + storage_options: HashMap, +) -> Result { + let table = DeltaTableBuilder::from_uri(table_uri) + .with_storage_options(storage_options) + .load() + .await?; + Ok(table) +} + /// Creates a DeltaTable from the given path and loads it with the metadata from the given version. /// Infers the storage backend to use from the scheme in the given table path. pub async fn open_table_with_version( diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs index e756a9b2fc..ab25bd66b9 100644 --- a/rust/src/delta_datafusion.rs +++ b/rust/src/delta_datafusion.rs @@ -57,7 +57,7 @@ use object_store::{path::Path, ObjectMeta}; use url::Url; use crate::Invariant; -use crate::{action, open_table}; +use crate::{action, open_table, open_table_with_storage_options}; use crate::{schema, DeltaTableBuilder}; use crate::{DeltaTable, DeltaTableError}; @@ -866,7 +866,11 @@ impl TableProviderFactory for DeltaTableFactory { _ctx: &SessionState, cmd: &CreateExternalTable, ) -> datafusion::error::Result> { - let provider = open_table(cmd.to_owned().location).await.unwrap(); + let provider = if cmd.options.is_empty() { + open_table(cmd.to_owned().location).await? + } else { + open_table_with_storage_options(cmd.to_owned().location, cmd.to_owned().options).await? + }; Ok(Arc::new(provider)) } } diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index c2624baf0c..639ec7dc22 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -12,7 +12,7 @@ pub type TestResult = Result<(), Box>; /// The IntegrationContext provides temporary resources to test against cloud storage services. pub struct IntegrationContext { - integration: StorageIntegration, + pub integration: StorageIntegration, bucket: String, store: Arc, tmp_dir: TempDir, diff --git a/rust/tests/common/datafusion.rs b/rust/tests/common/datafusion.rs new file mode 100644 index 0000000000..d741938962 --- /dev/null +++ b/rust/tests/common/datafusion.rs @@ -0,0 +1,16 @@ +use datafusion::datasource::datasource::TableProviderFactory; +use datafusion::execution::context::SessionContext; +use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::prelude::SessionConfig; +use deltalake::delta_datafusion::DeltaTableFactory; +use std::collections::HashMap; +use std::sync::Arc; + +pub fn context_with_delta_table_factory() -> SessionContext { + let mut table_factories: HashMap> = HashMap::new(); + table_factories.insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {})); + let cfg = RuntimeConfig::new().with_table_factories(table_factories); + let env = RuntimeEnv::new(cfg).unwrap(); + let ses = SessionConfig::new(); + SessionContext::with_config_rt(ses, Arc::new(env)) +} diff --git a/rust/tests/common/mod.rs b/rust/tests/common/mod.rs index 735a01904f..8710c97858 100644 --- a/rust/tests/common/mod.rs +++ b/rust/tests/common/mod.rs @@ -1,4 +1,5 @@ -#![deny(warnings)] +#![allow(dead_code)] +#![allow(unused_variables)] use bytes::Bytes; use deltalake::action::{self, Add, Remove}; @@ -15,6 +16,8 @@ use tempdir::TempDir; #[cfg(feature = "azure")] pub mod adls; pub mod clock; +#[cfg(feature = "datafusion-ext")] +pub mod datafusion; #[cfg(any(feature = "s3", feature = "s3-rustls"))] pub mod s3; pub mod schemas; diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs index 27e3642054..293acbc28b 100644 --- a/rust/tests/datafusion_test.rs +++ b/rust/tests/datafusion_test.rs @@ -1,29 +1,28 @@ #![cfg(feature = "datafusion-ext")] -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; use arrow::array::*; use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; use arrow::record_batch::RecordBatch; +use common::datafusion::context_with_delta_table_factory; use datafusion::assert_batches_sorted_eq; -use datafusion::datasource::datasource::TableProviderFactory; use datafusion::datasource::TableProvider; use datafusion::execution::context::{SessionContext, TaskContext}; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion::physical_plan::{common, file_format::ParquetExec, metrics::Label}; +use datafusion::physical_plan::{common::collect, file_format::ParquetExec, metrics::Label}; use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor}; -use datafusion::prelude::SessionConfig; use datafusion_common::scalar::ScalarValue; use datafusion_common::{Column, DataFusionError, Result}; use datafusion_expr::Expr; use deltalake::action::SaveMode; -use deltalake::delta_datafusion::DeltaTableFactory; use deltalake::{operations::DeltaOps, DeltaTable, Schema}; +mod common; + fn get_scanned_files(node: &dyn ExecutionPlan) -> HashSet