fix: include .venv in .gitignore

gautham acharya 2024-03-08 15:32:52 -08:00 committed by R. Tyler Croy
parent 2786e924a4
commit 1e19cf3f20
10 changed files with 91 additions and 399 deletions

.gitignore (vendored): 3 changed lines
View File

@ -11,6 +11,7 @@ tlaplus/*.toolbox/*/[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*/
/.idea
.vscode
.env
.venv
**/.DS_Store
**/.python-version
.coverage
@ -29,4 +30,4 @@ Cargo.lock
justfile
site
__pycache__
__pycache__

View File

@ -74,15 +74,6 @@ pub struct Metrics {
pub preserve_insertion_order: bool,
}
/// Information to be committed by the optimizer.
#[derive(Debug)]
pub struct CommitContext {
actions: Vec<Action>,
app_metadata: Option<HashMap<String, serde_json::Value>>,
snapshot: Option<DeltaTableState>,
operation: OptimizeInput,
}
/// Statistics on files for a particular operation
/// Operation can be remove or add
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
@ -184,8 +175,6 @@ pub struct OptimizeBuilder<'a> {
/// Optimize type
optimize_type: OptimizeType,
min_commit_interval: Option<Duration>,
/// Indicates whether the writes should be committed
commit_writes: bool,
}
impl<'a> OptimizeBuilder<'a> {
@ -203,7 +192,6 @@ impl<'a> OptimizeBuilder<'a> {
max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
optimize_type: OptimizeType::Compact,
min_commit_interval: None,
commit_writes: true, // commit by default
}
}
@ -263,36 +251,10 @@ impl<'a> OptimizeBuilder<'a> {
self.min_commit_interval = Some(min_commit_interval);
self
}
/// Commit Writes
pub fn with_commit_writes(mut self, commit_writes: bool) -> Self {
self.commit_writes = commit_writes;
self
}
}
impl<'a> OptimizeBuilder<'a> {
/// Commit writes after processing
pub async fn commit_writes(self, commit_info: CommitContext) -> DeltaResult<DeltaTable> {
commit(
self.log_store.as_ref(),
&commit_info.actions,
commit_info.operation.clone().into(),
commit_info.snapshot.as_ref(),
commit_info.app_metadata.clone(),
)
.await?;
let mut table = DeltaTable::new_with_state(self.log_store, self.snapshot);
table.update().await?;
Ok(table)
}
}
impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
type Output = DeltaResult<(DeltaTable, Metrics, Option<CommitContext>)>;
type Output = DeltaResult<(DeltaTable, Metrics)>;
type IntoFuture = BoxFuture<'a, Self::Output>;
fn into_future(self) -> Self::IntoFuture {
@ -314,7 +276,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.target_size.to_owned(),
writer_properties,
)?;
let (metrics, commit_info) = plan
let metrics = plan
.execute(
this.log_store.clone(),
&this.snapshot,
@ -322,14 +284,11 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.max_spill_size,
this.min_commit_interval,
this.app_metadata,
this.commit_writes,
)
.await?;
let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot);
table.update().await?;
Ok((table, metrics, commit_info))
Ok((table, metrics))
})
}
}
@ -645,7 +604,6 @@ impl MergePlan {
}
/// Perform the operations outlined in the plan.
#[allow(clippy::too_many_arguments)]
pub async fn execute(
mut self,
log_store: LogStoreRef,
@ -655,9 +613,9 @@ impl MergePlan {
max_spill_size: usize,
min_commit_interval: Option<Duration>,
app_metadata: Option<HashMap<String, serde_json::Value>>,
commit_writes: bool,
) -> Result<(Metrics, Option<CommitContext>), DeltaTableError> {
) -> Result<Metrics, DeltaTableError> {
let operations = std::mem::take(&mut self.operations);
let stream = match operations {
OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
.flat_map(|(_, (partition, bins))| {
@ -740,9 +698,6 @@ impl MergePlan {
// or when we reach the commit interval.
let mut actions = vec![];
// Store total actions if we are not committing
let mut total_actions: Vec<Action> = vec![];
// Each time we commit, we'll reset buffered_metrics to orig_metrics.
let orig_metrics = std::mem::take(&mut self.metrics);
let mut buffered_metrics = orig_metrics.clone();
@ -766,11 +721,7 @@ impl MergePlan {
None => false,
Some(i) => now.duration_since(last_commit) > i,
};
// if commit_writes is false, we are not committing anyway, and will
// instead return a commit_info object, so no need to consider the
// min_commit_interval
if !actions.is_empty() && (!commit_writes || mature || end) {
if !actions.is_empty() && (mature || end) {
let actions = std::mem::take(&mut actions);
last_commit = now;
@ -789,23 +740,17 @@ impl MergePlan {
}
table.update().await?;
if commit_writes {
debug!("committing {} actions", actions.len());
//// TODO: Check for remove actions on optimized partitions. If an
//// optimized partition was updated then abort the commit. Requires (#593).
commit(
table.log_store.as_ref(),
&actions,
self.task_parameters.input_parameters.clone().into(),
Some(table.snapshot()?),
Some(app_metadata.clone()),
)
.await?;
} else {
// Save the actions buffer so the client can commit later
total_actions.extend(actions)
}
debug!("committing {} actions", actions.len());
//// TODO: Check for remove actions on optimized partitions. If an
//// optimized partition was updated then abort the commit. Requires (#593).
commit(
table.log_store.as_ref(),
&actions,
self.task_parameters.input_parameters.clone().into(),
Some(table.snapshot()?),
Some(app_metadata.clone()),
)
.await?;
}
if end {
@ -813,17 +758,6 @@ impl MergePlan {
}
}
let commit_context = if !commit_writes {
Some(CommitContext {
actions: total_actions,
app_metadata,
snapshot: Some(snapshot.clone()), // using original table snapshot as all commits are added at once
operation: self.task_parameters.input_parameters.clone(),
})
} else {
None
};
total_metrics.preserve_insertion_order = true;
if total_metrics.num_files_added == 0 {
total_metrics.files_added.min = 0;
@ -832,7 +766,7 @@ impl MergePlan {
total_metrics.files_removed.min = 0;
}
Ok((total_metrics, commit_context))
Ok(total_metrics)
}
}
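For orientation, the net effect of the changes above is that the builder's future once again resolves to `(DeltaTable, Metrics)` and the commit happens inside `execute`. Below is a minimal usage sketch, not taken from the commit itself: it assumes an already-opened `DeltaTable` and the `deltalake` crate's default features, and mirrors the updated tests in the next file.
```rust
// Minimal sketch of calling the optimizer after this change: the commit is
// applied inside execute, so there is no CommitContext to hold on to.
// Assumes an already-opened `table: DeltaTable`.
use deltalake::{DeltaOps, DeltaTable};

async fn compact(table: DeltaTable) -> Result<DeltaTable, Box<dyn std::error::Error>> {
    let (table, metrics) = DeltaOps(table)
        .optimize()
        .with_target_size(2_000_000)
        .await?;
    println!(
        "version {}: {} files added, {} removed",
        table.version(),
        metrics.num_files_added,
        metrics.num_files_removed
    );
    Ok(table)
}
```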

View File

@ -171,7 +171,7 @@ async fn test_optimize_non_partitioned_table() -> Result<(), Box<dyn Error>> {
assert_eq!(dt.get_files_count(), 5);
let optimize = DeltaOps(dt).optimize().with_target_size(2_000_000);
let (dt, metrics, _) = optimize.await?;
let (dt, metrics) = optimize.await?;
assert_eq!(version + 1, dt.version());
assert_eq!(metrics.num_files_added, 1);
@ -200,153 +200,6 @@ async fn write(
Ok(())
}
#[tokio::test]
async fn test_optimize_non_partitioned_table_deferred_commit() -> Result<(), Box<dyn Error>> {
let context = setup_test(false).await?;
let mut dt = context.table;
let mut writer = RecordBatchWriter::for_table(&dt)?;
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?,
)
.await?;
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(2, 1), (2, 3), (2, 3)], "2022-05-23")?,
)
.await?;
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(3, 1), (3, 3), (3, 3)], "2022-05-22")?,
)
.await?;
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(4, 1), (4, 3), (4, 3)], "2022-05-23")?,
)
.await?;
write(
&mut writer,
&mut dt,
generate_random_batch(records_for_size(4_000_000), "2022-05-22")?,
)
.await?;
let version = dt.version();
assert_eq!(dt.get_files_count(), 5);
let optimize = DeltaOps(dt)
.optimize()
.with_target_size(2_000_000)
.with_commit_writes(false);
let (dt, metrics, commit_context) = optimize.await?;
// Still have same version, and file count,
// no commit yet!
assert_eq!(version, dt.version());
assert_eq!(dt.get_files_count(), 5);
assert_eq!(metrics.num_files_added, 1);
assert_eq!(metrics.num_files_removed, 4);
assert_eq!(metrics.total_considered_files, 5);
assert_eq!(metrics.partitions_optimized, 1);
let commit_info = dt.history(None).await?;
assert_eq!(commit_info.len(), 6);
let dt_commit = DeltaOps(dt)
.optimize()
.commit_writes(commit_context.unwrap())
.await?;
assert_eq!(version + 1, dt_commit.version());
// Should have compacted into two files,
// one for 2022-05-22, one for 2022-05-23
assert_eq!(dt_commit.get_files_count(), 2);
let commit_info = dt_commit.history(None).await?;
assert_eq!(commit_info.len(), 7);
let last_commit = &commit_info[0];
let parameters = last_commit.operation_parameters.clone().unwrap();
assert_eq!(parameters["targetSize"], json!("2000000"));
assert_eq!(parameters["predicate"], "[]");
Ok(())
}
#[tokio::test]
async fn test_optimize_with_partitions_deferred_commit() -> Result<(), Box<dyn Error>> {
let context = setup_test(true).await?;
let mut dt = context.table;
let mut writer = RecordBatchWriter::for_table(&dt)?;
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(1, 2), (1, 3), (1, 4)], "2022-05-22")?,
)
.await?;
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(2, 1), (2, 3), (2, 3)], "2022-05-23")?,
)
.await?;
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(3, 1), (3, 3), (3, 3)], "2022-05-22")?,
)
.await?;
write(
&mut writer,
&mut dt,
tuples_to_batch(vec![(4, 1), (4, 3), (4, 3)], "2022-05-23")?,
)
.await?;
let version = dt.version();
let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];
let optimize = DeltaOps(dt)
.optimize()
.with_filters(&filter)
.with_commit_writes(false);
let (dt, metrics, commit_context) = optimize.await?;
// optimize is not committed, so we have 4 files.
assert_eq!(version, dt.version());
assert_eq!(dt.get_files_count(), 4);
assert_eq!(metrics.num_files_added, 1);
assert_eq!(metrics.num_files_removed, 2);
let commit_info = dt.history(None).await?;
assert_eq!(commit_info.len(), 5);
let dt_commit = DeltaOps(dt)
.optimize()
.commit_writes(commit_context.unwrap())
.await?;
assert_eq!(version + 1, dt_commit.version());
// Expect three files, one for 2022-05-22 (compacted)
// two for 2022-05-23 (uncompacted)
assert_eq!(dt_commit.get_files_count(), 3);
let commit_info = dt_commit.history(None).await?;
assert_eq!(commit_info.len(), 6);
let last_commit = &commit_info[0];
let parameters = last_commit.operation_parameters.clone().unwrap();
assert_eq!(parameters["predicate"], "[\"date = '2022-05-22'\"]");
Ok(())
}
#[tokio::test]
async fn test_optimize_with_partitions() -> Result<(), Box<dyn Error>> {
let context = setup_test(true).await?;
@ -382,7 +235,7 @@ async fn test_optimize_with_partitions() -> Result<(), Box<dyn Error>> {
let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?];
let optimize = DeltaOps(dt).optimize().with_filters(&filter);
let (dt, metrics, _) = optimize.await?;
let (dt, metrics) = optimize.await?;
assert_eq!(version + 1, dt.version());
assert_eq!(metrics.num_files_added, 1);
@ -454,7 +307,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
.await?;
let maybe_metrics = plan
.execute(dt.log_store(), dt.snapshot()?, 1, 20, None, None, true)
.execute(dt.log_store(), dt.snapshot()?, 1, 20, None, None)
.await;
assert!(maybe_metrics.is_err());
@ -504,8 +357,8 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
)
.await?;
let (metrics, _) = plan
.execute(dt.log_store(), dt.snapshot()?, 1, 20, None, None, true)
let metrics = plan
.execute(dt.log_store(), dt.snapshot()?, 1, 20, None, None)
.await?;
assert_eq!(metrics.num_files_added, 1);
assert_eq!(metrics.num_files_removed, 2);
@ -544,7 +397,7 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> {
WriterProperties::builder().build(),
)?;
let (metrics, _) = plan
let metrics = plan
.execute(
dt.log_store(),
dt.snapshot()?,
@ -552,7 +405,6 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> {
20,
Some(Duration::from_secs(0)), // this will cause as many commits as num_files_added
None,
true,
)
.await?;
assert_eq!(metrics.num_files_added, 2);
@ -599,7 +451,7 @@ async fn test_idempotent() -> Result<(), Box<dyn Error>> {
.optimize()
.with_filters(&filter)
.with_target_size(10_000_000);
let (dt, metrics, _) = optimize.await?;
let (dt, metrics) = optimize.await?;
assert_eq!(metrics.num_files_added, 1);
assert_eq!(metrics.num_files_removed, 2);
assert_eq!(dt.version(), version + 1);
@ -608,7 +460,7 @@ async fn test_idempotent() -> Result<(), Box<dyn Error>> {
.optimize()
.with_filters(&filter)
.with_target_size(10_000_000);
let (dt, metrics, _) = optimize.await?;
let (dt, metrics) = optimize.await?;
assert_eq!(metrics.num_files_added, 0);
assert_eq!(metrics.num_files_removed, 0);
@ -633,7 +485,7 @@ async fn test_idempotent_metrics() -> Result<(), Box<dyn Error>> {
let version = dt.version();
let optimize = DeltaOps(dt).optimize().with_target_size(10_000_000);
let (dt, metrics, _) = optimize.await?;
let (dt, metrics) = optimize.await?;
let expected_metric_details = MetricDetails {
min: 0,
@ -708,7 +560,7 @@ async fn test_idempotent_with_multiple_bins() -> Result<(), Box<dyn Error>> {
.optimize()
.with_filters(&filter)
.with_target_size(10_000_000);
let (dt, metrics, _) = optimize.await?;
let (dt, metrics) = optimize.await?;
assert_eq!(metrics.num_files_added, 2);
assert_eq!(metrics.num_files_removed, 4);
assert_eq!(dt.version(), version + 1);
@ -717,7 +569,7 @@ async fn test_idempotent_with_multiple_bins() -> Result<(), Box<dyn Error>> {
.optimize()
.with_filters(&filter)
.with_target_size(10_000_000);
let (dt, metrics, _) = optimize.await?;
let (dt, metrics) = optimize.await?;
assert_eq!(metrics.num_files_added, 0);
assert_eq!(metrics.num_files_removed, 0);
assert_eq!(dt.version(), version + 1);
@ -754,7 +606,7 @@ async fn test_commit_info() -> Result<(), Box<dyn Error>> {
.optimize()
.with_target_size(2_000_000)
.with_filters(&filter);
let (dt, metrics, _) = optimize.await?;
let (dt, metrics) = optimize.await?;
let commit_info = dt.history(None).await?;
let last_commit = &commit_info[0];
@ -857,7 +709,7 @@ async fn test_zorder_unpartitioned() -> Result<(), Box<dyn Error>> {
"x".to_string(),
"y".to_string(),
]));
let (dt, metrics, _) = optimize.await?;
let (dt, metrics) = optimize.await?;
assert_eq!(metrics.num_files_added, 1);
assert_eq!(metrics.num_files_removed, 2);
@ -927,7 +779,7 @@ async fn test_zorder_partitioned() -> Result<(), Box<dyn Error>> {
.optimize()
.with_type(OptimizeType::ZOrder(vec!["x".to_string(), "y".to_string()]))
.with_filters(&filter);
let (dt, metrics, _) = optimize.await?;
let (dt, metrics) = optimize.await?;
assert_eq!(metrics.num_files_added, 1);
assert_eq!(metrics.num_files_removed, 2);
@ -987,7 +839,7 @@ async fn test_zorder_respects_target_size() -> Result<(), Box<dyn Error>> {
)
.with_type(OptimizeType::ZOrder(vec!["x".to_string(), "y".to_string()]))
.with_target_size(10_000_000);
let (_, metrics, _) = optimize.await?;
let (_, metrics) = optimize.await?;
assert_eq!(metrics.num_files_added, 2);
assert_eq!(metrics.num_files_removed, 3);

View File

@ -98,6 +98,12 @@ impl StorageIntegration for GcpIntegration {
}
}
impl GcpIntegration {
fn delete_bucket(&self) -> std::io::Result<ExitStatus> {
gs_cli::delete_bucket(self.bucket_name.clone())
}
}
/// small wrapper around google api
pub mod gs_cli {
use super::set_env_if_not_set;

View File

@ -299,29 +299,7 @@ Delta tables can accumulate small files for a variety of reasons:
* User error: users can accidentally write files that are too small. Users should sometimes repartition in memory before writing to disk to avoid appending files that are too small.
* Frequent appends: systems that append more often tend to create more small files. A pipeline that appends every minute will generally generate ten times as many small files as a system that appends every ten minutes.
* Appending to partitioned data lakes with high cardinality columns can also cause small files. If you append every hour to a table that's partitioned on a column with 1,000 distinct values, then every append could create 1,000 new files. Partitioning by date avoids this problem because the data isn't split up across partitions in this manner.
## Deferring commits while optimizing
`delta-rs` also supports **deferred commits** for compaction: you can run the compaction now and commit the result later.
This is useful when you have separate writer and compaction threads. Compaction can run in the background while the writer continues to append small files. However, commits from the writer and the compactor must occur in sequence; simultaneous commits may result in a race condition.
When compaction is complete, the Python client can wait for the writer to stop (for example, by acquiring a lock) before committing the compaction operation.
An example of two threads writing and optimizing simultaneously:
```python
# Thread 1
with lock:
    dt.write_deltalake(...)

# Thread 2
dt.optimize.compact(commit_writes=False)
with lock:
    dt.optimize.commit()
```
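For comparison, the Rust API behind this workflow (which this commit removes) looked roughly like the sketch below. It is reconstructed from the `OptimizeBuilder` and test diffs above rather than copied from the deleted code, and the `deferred_compact` wrapper is purely illustrative.
```rust
// Hedged reconstruction of the removed deferred-commit flow; names other
// than the builder methods shown in the diff above are illustrative only.
use deltalake::{DeltaOps, DeltaTable};

async fn deferred_compact(table: DeltaTable) -> Result<DeltaTable, Box<dyn std::error::Error>> {
    // Run compaction without committing; the third tuple element carried the
    // pending CommitContext when commits were deferred.
    let (table, _metrics, commit_context) = DeltaOps(table)
        .optimize()
        .with_commit_writes(false)
        .await?;

    // ... coordinate with the writer here, e.g. by taking a lock ...

    // Commit the buffered actions later through a fresh builder.
    let table = DeltaOps(table)
        .optimize()
        .commit_writes(commit_context.expect("compaction produced nothing to commit"))
        .await?;
    Ok(table)
}
```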
* Appending to partitioned data lakes with high cardinality columns can also cause small files. If you append every hour to a table that's partitioned on a column with 1,000 distinct values, then every append could create 1,000 new files. Partitioning by date avoids this problem because the data isn't split up across partitions in this manner.
## Conclusion

View File

@ -28,7 +28,6 @@ env_logger = "0"
lazy_static = "1"
regex = { workspace = true }
thiserror = { workspace = true }
tracing = { version = "0.1", features = ["log"] }
# runtime
futures = { workspace = true }

View File

@ -59,9 +59,6 @@ class RawDeltaTable:
enforce_retention_duration: bool,
custom_metadata: Optional[Dict[str, str]],
) -> List[str]: ...
def commit_optimize(
self
) -> None: ...
def compact_optimize(
self,
partition_filters: Optional[FilterType],
@ -70,7 +67,6 @@ class RawDeltaTable:
min_commit_interval: Optional[int],
writer_properties: Optional[Dict[str, Optional[str]]],
custom_metadata: Optional[Dict[str, str]],
commit_writes: bool
) -> str: ...
def z_order_optimize(
self,
@ -299,11 +295,11 @@ class ArrayType:
"""
element_type: DataType
""" The type of the element, of type:
""" The type of the element, of type:
Union[
[PrimitiveType][deltalake.schema.PrimitiveType],
[ArrayType][deltalake.schema.ArrayType],
[MapType][deltalake.schema.MapType],
[PrimitiveType][deltalake.schema.PrimitiveType],
[ArrayType][deltalake.schema.ArrayType],
[MapType][deltalake.schema.MapType],
[StructType][deltalake.schema.StructType]
]
"""
@ -381,21 +377,21 @@ class MapType:
) -> None: ...
type: Literal["map"]
key_type: DataType
""" The type of the keys, of type:
""" The type of the keys, of type:
Union[
[PrimitiveType][deltalake.schema.PrimitiveType],
[ArrayType][deltalake.schema.ArrayType],
[MapType][deltalake.schema.MapType],
[PrimitiveType][deltalake.schema.PrimitiveType],
[ArrayType][deltalake.schema.ArrayType],
[MapType][deltalake.schema.MapType],
[StructType][deltalake.schema.StructType]
]
"""
value_type: DataType
"""The type of the values, of type:
"""The type of the values, of type:
Union[
[PrimitiveType][deltalake.schema.PrimitiveType],
[ArrayType][deltalake.schema.ArrayType],
[MapType][deltalake.schema.MapType],
[PrimitiveType][deltalake.schema.PrimitiveType],
[ArrayType][deltalake.schema.ArrayType],
[MapType][deltalake.schema.MapType],
[StructType][deltalake.schema.StructType]
]
"""
@ -483,11 +479,11 @@ class Field:
"""
type: DataType
""" The type of the field, of type:
""" The type of the field, of type:
Union[
[PrimitiveType][deltalake.schema.PrimitiveType],
[ArrayType][deltalake.schema.ArrayType],
[MapType][deltalake.schema.MapType],
[PrimitiveType][deltalake.schema.PrimitiveType],
[ArrayType][deltalake.schema.ArrayType],
[MapType][deltalake.schema.MapType],
[StructType][deltalake.schema.StructType]
]
"""

View File

@ -162,7 +162,7 @@ class WriterProperties:
if compression_level is not None and compression is None:
raise ValueError(
"""Providing a compression level without the compression type is not possible,
"""Providing a compression level without the compression type is not possible,
please provide the compression as well."""
)
if isinstance(compression, str):
@ -1791,7 +1791,7 @@ class TableAlterer:
"""
if len(constraints.keys()) > 1:
raise ValueError(
"""add_constraints is limited to a single constraint addition at once for now.
"""add_constraints is limited to a single constraint addition at once for now.
Please execute add_constraints multiple times with each time a different constraint."""
)
@ -1857,11 +1857,6 @@ class TableOptimizer:
return self.compact(partition_filters, target_size, max_concurrent_tasks)
def commit(
self
) -> Dict[str, Any]:
self.table._table.commit_optimize()
def compact(
self,
partition_filters: Optional[FilterType] = None,
@ -1870,7 +1865,6 @@ class TableOptimizer:
min_commit_interval: Optional[Union[int, timedelta]] = None,
writer_properties: Optional[WriterProperties] = None,
custom_metadata: Optional[Dict[str, str]] = None,
commit_writes: bool = True
) -> Dict[str, Any]:
"""
Compacts small files to reduce the total number of files in the table.
@ -1924,7 +1918,6 @@ class TableOptimizer:
min_commit_interval,
writer_properties._to_dict() if writer_properties else None,
custom_metadata,
commit_writes
)
self.table.update_incremental()
return json.loads(metrics)

View File

@ -32,7 +32,7 @@ use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::drop_constraints::DropConstraintBuilder;
use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
use deltalake::operations::merge::MergeBuilder;
use deltalake::operations::optimize::{CommitContext, OptimizeBuilder, OptimizeType};
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::update::UpdateBuilder;
@ -70,8 +70,6 @@ struct RawDeltaTable {
_table: deltalake::DeltaTable,
// storing the config additionally on the table helps us make pickling work.
_config: FsConfig,
// commit
_commit: Option<CommitContext>,
}
#[pyclass]
@ -127,7 +125,6 @@ impl RawDeltaTable {
root_url: table_uri.into(),
options,
},
_commit: None,
})
}
@ -356,31 +353,6 @@ impl RawDeltaTable {
Ok(serde_json::to_string(&metrics).unwrap())
}
#[pyo3(signature = ())]
pub fn commit_optimize(&mut self) -> PyResult<()> {
let cmd = OptimizeBuilder::new(
self._table.log_store(),
self._table.snapshot().map_err(PythonError::from)?.clone(),
)
.with_max_concurrent_tasks(num_cpus::get());
if self._commit.is_none() {
return Err(PyValueError::new_err("Cannot commit optimization, nothing to commit."))
}
let table = rt()?.block_on(
cmd.commit_writes(
self._commit.take().unwrap()
)
).map_err(PythonError::from)?;
self._table.state = table.state;
self._commit = None;
Ok(())
}
/// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
#[pyo3(signature = (
partition_filters = None,
@ -389,64 +361,49 @@ impl RawDeltaTable {
min_commit_interval = None,
writer_properties=None,
custom_metadata=None,
commit_writes=true
))]
#[allow(clippy::too_many_arguments)]
pub fn compact_optimize(
&mut self,
py: Python,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<i64>,
max_concurrent_tasks: Option<usize>,
min_commit_interval: Option<u64>,
writer_properties: Option<HashMap<String, Option<String>>>,
custom_metadata: Option<HashMap<String, String>>,
commit_writes: bool,
) -> PyResult<String> {
py.allow_threads(|| {
let mut cmd = OptimizeBuilder::new(
self._table.log_store(),
self._table.snapshot().map_err(PythonError::from)?.clone(),
)
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
let mut cmd = OptimizeBuilder::new(
self._table.log_store(),
self._table.snapshot().map_err(PythonError::from)?.clone(),
)
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
if let Some(commit_interval) = min_commit_interval {
cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
}
if let Some(commit_interval) = min_commit_interval {
cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
}
if let Some(writer_props) = writer_properties {
cmd = cmd.with_writer_properties(
set_writer_properties(writer_props).map_err(PythonError::from)?,
);
}
cmd = cmd.with_commit_writes(commit_writes);
if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
cmd = cmd.with_metadata(json_metadata);
};
if let Some(writer_props) = writer_properties {
cmd = cmd.with_writer_properties(
set_writer_properties(writer_props).map_err(PythonError::from)?,
);
}
let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);
if let Some(metadata) = custom_metadata {
let json_metadata: Map<String, Value> =
metadata.into_iter().map(|(k, v)| (k, v.into())).collect();
cmd = cmd.with_metadata(json_metadata);
};
let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);
let (table, metrics, commit_info) = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
if commit_info.is_some() {
self._commit = commit_info
}
Ok(serde_json::to_string(&metrics).unwrap())
})
let (table, metrics) = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
}
/// Run z-order variation of optimize
@ -500,7 +457,7 @@ impl RawDeltaTable {
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);
let (table, metrics, _commit_info) = rt()?
let (table, metrics) = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;

View File

@ -79,30 +79,6 @@ def test_z_order_optimize(
assert dt.version() == old_version + 1
assert len(dt.file_uris()) == 1
def test_optimize_deferred_write(
tmp_path: pathlib.Path,
sample_data: pa.Table,
):
write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
dt = DeltaTable(tmp_path)
old_version = dt.version()
dt.optimize.compact(commit_writes=False)
last_action = dt.history(1)[0]
# OPTIMIZE operation has not yet been committed
assert last_action["operation"] == "WRITE"
assert dt.version() == old_version
dt.optimize.commit()
assert dt.version() == old_version + 1
last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
def test_optimize_min_commit_interval(
tmp_path: pathlib.Path,