Skip to content

Commit

Permalink
chore: various fixes and cleanup after merge
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Sep 23, 2023
1 parent 22b500d commit 546f1f4
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 31 deletions.
33 changes: 13 additions & 20 deletions rust/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
//! .await?
//! ````
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use arrow_schema::SchemaRef;
use datafusion::error::Result as DataFusionResult;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
Expand All @@ -55,27 +59,16 @@ use datafusion_physical_expr::{create_physical_expr, expressions, PhysicalExpr};
use futures::future::BoxFuture;
use parquet::file::properties::WriterProperties;
use serde_json::{Map, Value};
use std::{
collections::HashMap,
sync::Arc,
time::{Instant, SystemTime, UNIX_EPOCH},
};

use crate::action::MergePredicate;
use super::datafusion_utils::{into_expr, maybe_into_expr, Expression};
use super::transaction::commit;
use crate::delta_datafusion::{parquet_scan_from_actions, register_store};
use crate::operations::datafusion_utils::MetricObserverExec;
use crate::{
action::{Action, DeltaOperation, Remove},
delta_datafusion::{parquet_scan_from_actions, register_store},
operations::write::write_execution_plan,
storage::{DeltaObjectStore, ObjectStoreRef},
table_state::DeltaTableState,
DeltaResult, DeltaTable, DeltaTableError,
};

use super::{
datafusion_utils::{into_expr, maybe_into_expr, Expression},
transaction::commit,
};
use crate::operations::write::write_execution_plan;
use crate::protocol::{Action, DeltaOperation, MergePredicate, Remove};
use crate::storage::{DeltaObjectStore, ObjectStoreRef};
use crate::table::state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

const OPERATION_COLUMN: &str = "__delta_rs_operation";
const DELETE_COLUMN: &str = "__delta_rs_delete";
Expand Down Expand Up @@ -1105,8 +1098,8 @@ impl std::future::IntoFuture for MergeBuilder {
#[cfg(test)]
mod tests {

use crate::action::*;
use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::get_arrow_schema;
use crate::writer::test_utils::get_delta_schema;
Expand Down
2 changes: 1 addition & 1 deletion rust/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ mod datafusion_utils {
use datafusion_expr::Expr;
use futures::{Stream, StreamExt};

use crate::{table_state::DeltaTableState, DeltaResult};
use crate::{table::state::DeltaTableState, DeltaResult};

/// Used to represent user input of either a Datafusion expression or string expression
pub enum Expression {
Expand Down
8 changes: 4 additions & 4 deletions rust/src/protocol/parquet_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use num_traits::cast::ToPrimitive;
use parquet::record::{Field, ListAccessor, MapAccessor, RowAccessor};
use serde_json::json;

use crate::action::{
use crate::protocol::{
Action, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, DeletionVector, MetaData, Protocol,
ProtocolError, Remove, Stats, Txn,
};
Expand Down Expand Up @@ -217,7 +217,7 @@ impl Add {
"minValues" => if let Ok(row) = record.get_group(i) {
for (name, field) in row.get_column_iter() {
if !matches!(field, Field::Null) {
if let Some(values) = field_to_value_stat(&field, name) {
if let Some(values) = field_to_value_stat(field, name) {
stats.min_values.insert(name.clone(), values);
}
}
Expand All @@ -228,7 +228,7 @@ impl Add {
"maxValues" => if let Ok(row) = record.get_group(i) {
for (name, field) in row.get_column_iter() {
if !matches!(field, Field::Null) {
if let Some(values) = field_to_value_stat(&field, name) {
if let Some(values) = field_to_value_stat(field, name) {
stats.max_values.insert(name.clone(), values);
}
}
Expand All @@ -239,7 +239,7 @@ impl Add {
"nullCount" => if let Ok(row) = record.get_group(i) {
for (name, field) in row.get_column_iter() {
if !matches!(field, Field::Null) {
if let Some(count) = field_to_count_stat(&field, name) {
if let Some(count) = field_to_count_stat(field, name) {
stats.null_count.insert(name.clone(), count);
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/src/storage/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl TryFrom<&Add> for ObjectMeta {
fn try_from(value: &Add) -> DeltaResult<Self> {
let last_modified = Utc.from_utc_datetime(
&NaiveDateTime::from_timestamp_millis(value.modification_time).ok_or(
DeltaTableError::from(crate::action::ProtocolError::InvalidField(format!(
DeltaTableError::from(crate::protocol::ProtocolError::InvalidField(format!(
"invalid modification_time: {:?}",
value.modification_time
))),
Expand Down
2 changes: 1 addition & 1 deletion rust/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::storage::{commit_uri_from_version, ObjectStoreRef};
pub mod builder;
pub mod config;
pub mod state;
#[cfg(all(feature = "arrow"))]
#[cfg(feature = "arrow")]
pub mod state_arrow;

/// Metadata for a checkpoint file
Expand Down
2 changes: 1 addition & 1 deletion rust/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl DeltaTableState {
protocol::Action::commitInfo(v) => {
self.commit_infos.push(v);
}
action::Action::domainMetadata(v) => {
protocol::Action::domainMetadata(v) => {
self.domain_metadatas.push(v);
}
}
Expand Down
2 changes: 0 additions & 2 deletions rust/src/writer/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use std::{collections::HashMap, sync::Arc};

use arrow::array::{Array, UInt32Array};
use arrow::compute::{partition, take};
use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow_array::ArrayRef;
use arrow_row::{RowConverter, SortField};
Expand Down
2 changes: 1 addition & 1 deletion rust/tests/read_delta_log_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async fn test_log_buffering() {
let max_iter = 10;
let buf_size = 10;

let location = deltalake::builder::ensure_table_uri(path).unwrap();
let location = deltalake::table::builder::ensure_table_uri(path).unwrap();

// use storage that sleeps 10ms on every `get`
let store = std::sync::Arc::new(
Expand Down

0 comments on commit 546f1f4

Please sign in to comment.