From 2e6664d170757320263415a367d94d59512bd9c4 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 5 Jun 2023 21:10:45 -0700 Subject: [PATCH 1/5] expose API in Python --- python/deltalake/table.py | 102 ++++++++++++++++++++++++++-------- python/src/lib.rs | 30 +++++++++- python/tests/test_optimize.py | 21 ++++++- 3 files changed, 126 insertions(+), 27 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 846f11b58d..5e6d61b900 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -8,6 +8,7 @@ TYPE_CHECKING, Any, Dict, + Iterable, List, NamedTuple, Optional, @@ -437,35 +438,14 @@ def vacuum( max_concurrent_requests, ) + @property def optimize( self, partition_filters: Optional[List[Tuple[str, str, Any]]] = None, target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, - ) -> Dict[str, Any]: - """ - Compacts small files to reduce the total number of files in the table. - - This operation is idempotent; if run twice on the same table (assuming it has - not been updated) it will do nothing the second time. - - If this operation happens concurrently with any operations other than append, - it will fail. - - :param partition_filters: the partition filters that will be used for getting the matched files - :param target_size: desired file size after bin-packing files, in bytes. If not - provided, will attempt to read the table configuration value ``delta.targetFileSize``. - If that value isn't set, will use default value of 256MB. - :param max_concurrent_tasks: the maximum number of concurrent tasks to use for - file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction - faster, but will also use more memory. 
- :return: the metrics from optimize - """ - metrics = self._table.optimize( - partition_filters, target_size, max_concurrent_tasks - ) - self.update_incremental() - return json.loads(metrics) + ) -> "TableOptimizer": + return TableOptimizer(self) def pyarrow_schema(self) -> pyarrow.Schema: """ @@ -638,3 +618,77 @@ def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch: 2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 1 1 0 4 4 """ return self._table.get_add_actions(flatten) + + +class TableOptimizer: + """API for various table optimization commands.""" + + def __init__(self, table: DeltaTable): + self.table = table + + def __call__( + self, + partition_filters: Optional[List[Tuple[str, str, Any]]] = None, + target_size: Optional[int] = None, + max_concurrent_tasks: Optional[int] = None, + ) -> Dict[str, Any]: + """ + .. deprecated:: 0.10.0 + Use :meth:`compact` instead, which has the same signature. + """ + + warnings.warn( + "Call to deprecated method files_by_partitions. Please use file_uris instead.", + category=DeprecationWarning, + stacklevel=2, + ) + + return self.compact(partition_filters, target_size, max_concurrent_tasks) + + def compact( + self, + partition_filters: Optional[List[Tuple[str, str, Any]]] = None, + target_size: Optional[int] = None, + max_concurrent_tasks: Optional[int] = None, + ) -> Dict[str, Any]: + """ + Compacts small files to reduce the total number of files in the table. + + This operation is idempotent; if run twice on the same table (assuming it has + not been updated) it will do nothing the second time. + + If this operation happens concurrently with any operations other than append, + it will fail. + + :param partition_filters: the partition filters that will be used for getting the matched files + :param target_size: desired file size after bin-packing files, in bytes. If not + provided, will attempt to read the table configuration value ``delta.targetFileSize``. 
+ If that value isn't set, will use default value of 256MB. + :param max_concurrent_tasks: the maximum number of concurrent tasks to use for + file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction + faster, but will also use more memory. + :return: the metrics from optimize + """ + metrics = self.table._table.compact_optimize( + partition_filters, target_size, max_concurrent_tasks + ) + self.table.update_incremental() + return json.loads(metrics) + + def z_order( + self, + columns: Iterable[str], + partition_filters: Optional[List[Tuple[str, str, Any]]] = None, + target_size: Optional[int] = None, + max_concurrent_tasks: Optional[int] = None, + ) -> Dict[str, Any]: + """ + Reorders the data using a Z-order curve to improve data skipping. + + This also performs compaction, so the same parameters as compact() apply. + """ + metrics = self.table._table.z_order_optimize( + list(columns), partition_filters, target_size, max_concurrent_tasks + ) + self.table.update_incremental() + return json.loads(metrics) diff --git a/python/src/lib.rs b/python/src/lib.rs index 8e2c4d282f..4b63567f26 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -24,7 +24,7 @@ use deltalake::checkpoints::create_checkpoint; use deltalake::datafusion::prelude::SessionContext; use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; -use deltalake::operations::optimize::OptimizeBuilder; +use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType}; use deltalake::operations::transaction::commit; use deltalake::operations::vacuum::VacuumBuilder; use deltalake::partitions::PartitionFilter; @@ -268,7 +268,7 @@ impl RawDeltaTable { /// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing. 
#[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None))] - pub fn optimize( + pub fn compact_optimize( &mut self, partition_filters: Option>, target_size: Option, @@ -290,6 +290,32 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } + /// Run z-order variation of optimize + #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None))] + pub fn z_order_optimize( + &mut self, + z_order_columns: Vec, + partition_filters: Option>, + target_size: Option, + max_concurrent_tasks: Option, + ) -> PyResult { + let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) + .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)) + .with_type(OptimizeType::ZOrder(z_order_columns)); + if let Some(size) = target_size { + cmd = cmd.with_target_size(size); + } + let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default()) + .map_err(PythonError::from)?; + cmd = cmd.with_filters(&converted_filters); + + let (table, metrics) = rt()? + .block_on(cmd.into_future()) + .map_err(PythonError::from)?; + self._table.state = table.state; + Ok(serde_json::to_string(&metrics).unwrap()) + } + /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table. pub fn history(&mut self, limit: Option) -> PyResult> { let history = rt()? 
diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py index 31339d3110..665aaaec8f 100644 --- a/python/tests/test_optimize.py +++ b/python/tests/test_optimize.py @@ -23,7 +23,26 @@ def test_optimize_run_table( dt = DeltaTable(table_path) old_version = dt.version() - dt.optimize() + with pytest.warns(DeprecationWarning): + dt.optimize() + last_action = dt.history(1)[0] + assert last_action["operation"] == "OPTIMIZE" + assert dt.version() == old_version + 1 + + +def test_z_order_optimize( + tmp_path: pathlib.Path, + sample_data: pa.Table, +): + write_deltalake(tmp_path, sample_data, mode="append") + write_deltalake(tmp_path, sample_data, mode="append") + write_deltalake(tmp_path, sample_data, mode="append") + + dt = DeltaTable(tmp_path) + old_version = dt.version() + + dt.optimize.z_order(["date32", "timestamp"]) + last_action = dt.history(1)[0] assert last_action["operation"] == "OPTIMIZE" assert dt.version() == old_version + 1 From 35fa9fffd46c3640443c5005bb5f7f91109bd935 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 5 Jun 2023 21:23:35 -0700 Subject: [PATCH 2/5] update docs --- python/deltalake/table.py | 13 ++++++++++--- python/docs/source/usage.rst | 23 ++++++++++++++++++++--- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 5e6d61b900..03d01b0a19 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -441,9 +441,6 @@ def vacuum( @property def optimize( self, - partition_filters: Optional[List[Tuple[str, str, Any]]] = None, - target_size: Optional[int] = None, - max_concurrent_tasks: Optional[int] = None, ) -> "TableOptimizer": return TableOptimizer(self) @@ -686,6 +683,16 @@ def z_order( Reorders the data using a Z-order curve to improve data skipping. This also performs compaction, so the same parameters as compact() apply. + + :param columns: the columns to use for Z-ordering. There must be at least one column. 
+ :param partition_filters: the partition filters that will be used for getting the matched files + :param target_size: desired file size after bin-packing files, in bytes. If not + provided, will attempt to read the table configuration value ``delta.targetFileSize``. + If that value isn't set, will use default value of 256MB. + :param max_concurrent_tasks: the maximum number of concurrent tasks to use for + file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction + faster, but will also use more memory. + :return: the metrics from optimize """ metrics = self.table._table.z_order_optimize( list(columns), partition_filters, target_size, max_concurrent_tasks diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index 5390fb1c0f..4c5b5f008d 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -417,19 +417,36 @@ into a large file. Bin-packing reduces the number of API calls required for read Optimizing will increments the table's version and creates remove actions for optimized files. Optimize does not delete files from storage. To delete files that were removed, call :meth:`DeltaTable.vacuum`. -Use :meth:`DeltaTable.optimize` to perform the optimize operation. Note that this method will fail if a -concurrent writer performs an operation that removes any files (such as an overwrite). +:attr:`DeltaTable.optimize` returns a :class:`TableOptimizer` object which provides +methods for optimizing the table. Note that these methods will fail if a concurrent +writer performs an operation that removes any files (such as an overwrite). + +For just file compaction, use the :meth:`TableOptimizer.compact` method: .. 
code-block:: python >>> dt = DeltaTable("../rust/tests/data/simple_table") - >>> dt.optimize() + >>> dt.optimize.compact() {'numFilesAdded': 1, 'numFilesRemoved': 5, 'filesAdded': {'min': 555, 'max': 555, 'avg': 555.0, 'totalFiles': 1, 'totalSize': 555}, 'filesRemoved': {'min': 262, 'max': 429, 'avg': 362.2, 'totalFiles': 5, 'totalSize': 1811}, 'partitionsOptimized': 1, 'numBatches': 1, 'totalConsideredFiles': 5, 'totalFilesSkipped': 0, 'preserveInsertionOrder': True} +For improved data skipping, use the :meth:`TableOptimizer.z_order` method. This +is slower than just file compaction, but can improve performance for queries that +filter on multiple columns at once. + +.. code-block:: python + + >>> dt = DeltaTable("../rust/tests/data/COVID-19_NYT") + >>> dt.optimize.z_order(["date", "county"]) + {'numFilesAdded': 1, 'numFilesRemoved': 8, + 'filesAdded': {'min': 2473439, 'max': 2473439, 'avg': 2473439.0, 'totalFiles': 1, 'totalSize': 2473439}, + 'filesRemoved': {'min': 325440, 'max': 895702, 'avg': 773810.625, 'totalFiles': 8, 'totalSize': 6190485}, + 'partitionsOptimized': 0, 'numBatches': 1, 'totalConsideredFiles': 8, + 'totalFilesSkipped': 0, 'preserveInsertionOrder': True} + Writing Delta Tables -------------------- From 98fa301ed72d83bf21de9c44755324441883ce65 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 5 Jun 2023 21:26:05 -0700 Subject: [PATCH 3/5] fix deprecation warning text --- python/deltalake/table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 03d01b0a19..913de2e2e6 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -635,7 +635,7 @@ def __call__( """ warnings.warn( - "Call to deprecated method files_by_partitions. Please use file_uris instead.", + "Call to deprecated method DeltaTable.optimize. 
Use DeltaTable.optimize.compact() instead.", category=DeprecationWarning, stacklevel=2, ) From 373997c25e8fe2f6ba275c7d1b2b311c012e7b86 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 6 Jun 2023 17:34:06 -0700 Subject: [PATCH 4/5] pr feedback --- python/deltalake/table.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 913de2e2e6..dc7e7a1214 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -625,7 +625,7 @@ def __init__(self, table: DeltaTable): def __call__( self, - partition_filters: Optional[List[Tuple[str, str, Any]]] = None, + partition_filters: Optional[FilterType] = None, target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, ) -> Dict[str, Any]: @@ -644,7 +644,7 @@ def __call__( def compact( self, - partition_filters: Optional[List[Tuple[str, str, Any]]] = None, + partition_filters: Optional[FilterType] = None, target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, ) -> Dict[str, Any]: @@ -675,7 +675,7 @@ def compact( def z_order( self, columns: Iterable[str], - partition_filters: Optional[List[Tuple[str, str, Any]]] = None, + partition_filters: Optional[FilterType] = None, target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, ) -> Dict[str, Any]: From 5bd1252c3450be294a360ad9de21e8ebe910b6b0 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 6 Jun 2023 18:53:16 -0700 Subject: [PATCH 5/5] fix reference --- python/docs/source/usage.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index 4c5b5f008d..fdecfdc1a1 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -417,7 +417,7 @@ into a large file. Bin-packing reduces the number of API calls required for read Optimizing will increments the table's version and creates remove actions for optimized files. 
Optimize does not delete files from storage. To delete files that were removed, call :meth:`DeltaTable.vacuum`. -:attr:`DeltaTable.optimize` returns a :class:`TableOptimizer` object which provides +``DeltaTable.optimize`` returns a :class:`TableOptimizer` object which provides methods for optimizing the table. Note that these methods will fail if a concurrent writer performs an operation that removes any files (such as an overwrite).