feat(rust): advance state in post commit (#2396)

# Description
We advance the state in the post commit now, so it's done in a single
location as per suggestion from @Blajda here:
https://github.com/delta-io/delta-rs/pull/2391#issuecomment-2041500757

This PR also supersedes this one:
https://github.com/delta-io/delta-rs/pull/2280

# Related Issue(s)
- fixes #2279
- fixes #2262
This commit is contained in:
Ion Koutsouris 2024-04-27 19:08:43 +02:00 committed by GitHub
parent 9d3ecbeb62
commit 28ad3950d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 111 additions and 151 deletions

View File

@ -198,9 +198,10 @@ impl std::future::IntoFuture for ConstraintBuilder {
.build(Some(&this.snapshot), this.log_store.clone(), operation)?
.await?;
this.snapshot
.merge(commit.data.actions, &commit.data.operation, commit.version)?;
Ok(DeltaTable::new_with_state(this.log_store, this.snapshot))
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}

View File

@ -186,16 +186,16 @@ async fn excute_non_empty_expr(
async fn execute(
predicate: Option<Expr>,
log_store: LogStoreRef,
snapshot: &DeltaTableState,
snapshot: DeltaTableState,
state: SessionState,
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
) -> DeltaResult<((Vec<Action>, i64, Option<DeltaOperation>), DeleteMetrics)> {
) -> DeltaResult<(DeltaTableState, DeleteMetrics)> {
let exec_start = Instant::now();
let mut metrics = DeleteMetrics::default();
let scan_start = Instant::now();
let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?;
let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?;
metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis();
let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
@ -205,7 +205,7 @@ async fn execute(
} else {
let write_start = Instant::now();
let add = excute_non_empty_expr(
snapshot,
&snapshot,
log_store.clone(),
&state,
&predicate,
@ -258,21 +258,14 @@ async fn execute(
predicate: Some(fmt_expr_to_sql(&predicate)?),
};
if actions.is_empty() {
return Ok(((actions, snapshot.version(), None), metrics));
return Ok((snapshot.clone(), metrics));
}
let commit = CommitBuilder::from(commit_properties)
.with_actions(actions)
.build(Some(snapshot), log_store, operation)?
.build(Some(&snapshot), log_store, operation)?
.await?;
Ok((
(
commit.data.actions,
commit.version,
Some(commit.data.operation),
),
metrics,
))
Ok((commit.snapshot(), metrics))
}
impl std::future::IntoFuture for DeleteBuilder {
@ -305,22 +298,20 @@ impl std::future::IntoFuture for DeleteBuilder {
None => None,
};
let ((actions, version, operation), metrics) = execute(
let (new_snapshot, metrics) = execute(
predicate,
this.log_store.clone(),
&this.snapshot,
this.snapshot,
state,
this.writer_properties,
this.commit_properties,
)
.await?;
if let Some(op) = &operation {
this.snapshot.merge(actions, op, version)?;
}
let table = DeltaTable::new_with_state(this.log_store, this.snapshot);
Ok((table, metrics))
Ok((
DeltaTable::new_with_state(this.log_store, new_snapshot),
metrics,
))
})
}
}

View File

@ -91,9 +91,10 @@ impl std::future::IntoFuture for DropConstraintBuilder {
.build(Some(&this.snapshot), this.log_store.clone(), operation)?
.await?;
this.snapshot
.merge(commit.data.actions, &commit.data.operation, commit.version)?;
Ok(DeltaTable::new_with_state(this.log_store, this.snapshot))
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}

View File

@ -932,7 +932,7 @@ async fn execute(
predicate: Expression,
source: DataFrame,
log_store: LogStoreRef,
snapshot: &DeltaTableState,
snapshot: DeltaTableState,
state: SessionState,
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
@ -942,7 +942,7 @@ async fn execute(
match_operations: Vec<MergeOperationConfig>,
not_match_target_operations: Vec<MergeOperationConfig>,
not_match_source_operations: Vec<MergeOperationConfig>,
) -> DeltaResult<((Vec<Action>, i64, Option<DeltaOperation>), MergeMetrics)> {
) -> DeltaResult<(DeltaTableState, MergeMetrics)> {
let mut metrics = MergeMetrics::default();
let exec_start = Instant::now();
@ -987,7 +987,7 @@ async fn execute(
let scan_config = DeltaScanConfigBuilder::default()
.with_file_column(true)
.with_parquet_pushdown(false)
.build(snapshot)?;
.build(&snapshot)?;
let target_provider = Arc::new(DeltaTableProvider::try_new(
snapshot.clone(),
@ -1017,7 +1017,7 @@ async fn execute(
} else {
try_construct_early_filter(
predicate.clone(),
snapshot,
&snapshot,
&state,
&source,
&source_name,
@ -1370,7 +1370,7 @@ async fn execute(
let rewrite_start = Instant::now();
let add_actions = write_execution_plan(
Some(snapshot),
Some(&snapshot),
state.clone(),
write,
table_partition_cols.clone(),
@ -1449,21 +1449,14 @@ async fn execute(
};
if actions.is_empty() {
return Ok(((actions, snapshot.version(), None), metrics));
return Ok((snapshot, metrics));
}
let commit = CommitBuilder::from(commit_properties)
.with_actions(actions)
.build(Some(snapshot), log_store.clone(), operation)?
.build(Some(&snapshot), log_store.clone(), operation)?
.await?;
Ok((
(
commit.data.actions,
commit.version,
Some(commit.data.operation),
),
metrics,
))
Ok((commit.snapshot(), metrics))
}
fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr {
@ -1521,11 +1514,11 @@ impl std::future::IntoFuture for MergeBuilder {
session.state()
});
let ((actions, version, operation), metrics) = execute(
let (snapshot, metrics) = execute(
this.predicate,
this.source,
this.log_store.clone(),
&this.snapshot,
this.snapshot,
state,
this.writer_properties,
this.commit_properties,
@ -1538,12 +1531,10 @@ impl std::future::IntoFuture for MergeBuilder {
)
.await?;
if let Some(op) = &operation {
this.snapshot.merge(actions, op, version)?;
}
let table = DeltaTable::new_with_state(this.log_store, this.snapshot);
Ok((table, metrics))
Ok((
DeltaTable::new_with_state(this.log_store, snapshot),
metrics,
))
})
}
}

View File

@ -207,7 +207,7 @@ pub trait TableReference: Send + Sync {
fn metadata(&self) -> &Metadata;
/// Try to cast this table reference to a `EagerSnapshot`
fn eager_snapshot(&self) -> Option<&EagerSnapshot>;
fn eager_snapshot(&self) -> &EagerSnapshot;
}
impl TableReference for EagerSnapshot {
@ -223,8 +223,8 @@ impl TableReference for EagerSnapshot {
self.table_config()
}
fn eager_snapshot(&self) -> Option<&EagerSnapshot> {
Some(self)
fn eager_snapshot(&self) -> &EagerSnapshot {
self
}
}
@ -241,8 +241,8 @@ impl TableReference for DeltaTableState {
self.snapshot.metadata()
}
fn eager_snapshot(&self) -> Option<&EagerSnapshot> {
Some(&self.snapshot)
fn eager_snapshot(&self) -> &EagerSnapshot {
&self.snapshot
}
}
@ -512,13 +512,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
// unwrap() is safe here due to the above check
// TODO: refactor to only depend on TableReference Trait
let read_snapshot =
this.table_data
.unwrap()
.eager_snapshot()
.ok_or(DeltaTableError::Generic(
"Expected an instance of EagerSnapshot".to_owned(),
))?;
let read_snapshot = this.table_data.unwrap().eager_snapshot();
let mut attempt_number = 1;
while attempt_number <= this.max_retries {
@ -595,36 +589,49 @@ pub struct PostCommit<'a> {
impl<'a> PostCommit<'a> {
/// Runs the post commit activities
async fn run_post_commit_hook(
&self,
version: i64,
commit_data: &CommitData,
) -> DeltaResult<()> {
if self.create_checkpoint {
self.create_checkpoint(&self.table_data, &self.log_store, version, commit_data)
.await?
async fn run_post_commit_hook(&self) -> DeltaResult<DeltaTableState> {
if let Some(table) = self.table_data {
let mut snapshot = table.eager_snapshot().clone();
if self.version - snapshot.version() > 1 {
// This may only occur during concurrent write actions. We need to update the state first to - 1
// then we can advance.
snapshot
.update(self.log_store.clone(), Some(self.version - 1))
.await?;
snapshot.advance(vec![&self.data])?;
} else {
snapshot.advance(vec![&self.data])?;
}
let state = DeltaTableState {
app_transaction_version: HashMap::new(),
snapshot,
};
// Execute each hook
if self.create_checkpoint {
self.create_checkpoint(&state, &self.log_store, self.version)
.await?;
}
Ok(state)
} else {
let state = DeltaTableState::try_new(
&Path::default(),
self.log_store.object_store(),
Default::default(),
Some(self.version),
)
.await?;
Ok(state)
}
Ok(())
}
async fn create_checkpoint(
&self,
table: &Option<&'a dyn TableReference>,
table_state: &DeltaTableState,
log_store: &LogStoreRef,
version: i64,
commit_data: &CommitData,
) -> DeltaResult<()> {
if let Some(table) = table {
let checkpoint_interval = table.config().checkpoint_interval() as i64;
if ((version + 1) % checkpoint_interval) == 0 {
// We have to advance the snapshot otherwise we can't create a checkpoint
let mut snapshot = table.eager_snapshot().unwrap().clone();
snapshot.advance(vec![commit_data])?;
let state = DeltaTableState {
app_transaction_version: HashMap::new(),
snapshot,
};
create_checkpoint_for(version, &state, log_store.as_ref()).await?
}
let checkpoint_interval = table_state.config().checkpoint_interval() as i64;
if ((version + 1) % checkpoint_interval) == 0 {
create_checkpoint_for(version, table_state, log_store.as_ref()).await?
}
Ok(())
}
@ -632,22 +639,22 @@ impl<'a> PostCommit<'a> {
/// A commit that successfully completed
pub struct FinalizedCommit {
/// The winning version number of the commit
/// The new table state after a commmit
pub snapshot: DeltaTableState,
/// Version of the finalized commit
pub version: i64,
/// The data that was comitted to the log store
pub data: CommitData,
}
impl FinalizedCommit {
/// The materialized version of the commit
/// The new table state after a commmit
pub fn snapshot(&self) -> DeltaTableState {
self.snapshot.clone()
}
/// Version of the finalized commit
pub fn version(&self) -> i64 {
self.version
}
/// Data used to write the commit
pub fn data(&self) -> &CommitData {
&self.data
}
}
impl<'a> std::future::IntoFuture for PostCommit<'a> {
@ -658,15 +665,13 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> {
let this = self;
Box::pin(async move {
match this.run_post_commit_hook(this.version, &this.data).await {
Ok(_) => {
return Ok(FinalizedCommit {
version: this.version,
data: this.data,
})
}
Err(err) => return Err(err),
};
match this.run_post_commit_hook().await {
Ok(snapshot) => Ok(FinalizedCommit {
snapshot,
version: this.version,
}),
Err(err) => Err(err),
}
})
}
}

View File

@ -41,7 +41,7 @@ use futures::future::BoxFuture;
use parquet::file::properties::WriterProperties;
use serde::Serialize;
use super::transaction::PROTOCOL;
use super::transaction::{FinalizedCommit, PROTOCOL};
use super::write::write_execution_plan;
use super::{
datafusion_utils::Expression,
@ -168,12 +168,12 @@ async fn execute(
predicate: Option<Expression>,
updates: HashMap<Column, Expression>,
log_store: LogStoreRef,
snapshot: &DeltaTableState,
snapshot: DeltaTableState,
state: SessionState,
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
safe_cast: bool,
) -> DeltaResult<((Vec<Action>, i64, Option<DeltaOperation>), UpdateMetrics)> {
) -> DeltaResult<(DeltaTableState, UpdateMetrics)> {
// Validate the predicate and update expressions.
//
// If the predicate is not set, then all files need to be updated.
@ -189,7 +189,7 @@ async fn execute(
let version = snapshot.version();
if updates.is_empty() {
return Ok(((Vec::new(), version, None), metrics));
return Ok((snapshot, metrics));
}
let predicate = match predicate {
@ -214,11 +214,11 @@ async fn execute(
let table_partition_cols = current_metadata.partition_columns.clone();
let scan_start = Instant::now();
let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?;
let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?;
metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64;
if candidates.candidates.is_empty() {
return Ok(((Vec::new(), version, None), metrics));
return Ok((snapshot, metrics));
}
let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
@ -226,7 +226,7 @@ async fn execute(
let execution_props = state.execution_props();
// For each rewrite evaluate the predicate and then modify each expression
// to either compute the new value or obtain the old one then write these batches
let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
let scan = DeltaScanBuilder::new(&snapshot, log_store.clone(), &state)
.with_files(&candidates.candidates)
.build()
.await?;
@ -350,7 +350,7 @@ async fn execute(
)?);
let add_actions = write_execution_plan(
Some(snapshot),
Some(&snapshot),
state.clone(),
projection.clone(),
table_partition_cols.clone(),
@ -416,17 +416,10 @@ async fn execute(
let commit = CommitBuilder::from(commit_properties)
.with_actions(actions)
.build(Some(snapshot), log_store, operation)?
.build(Some(&snapshot), log_store, operation)?
.await?;
Ok((
(
commit.data.actions,
commit.version,
Some(commit.data.operation),
),
metrics,
))
Ok((commit.snapshot(), metrics))
}
impl std::future::IntoFuture for UpdateBuilder {
@ -449,11 +442,11 @@ impl std::future::IntoFuture for UpdateBuilder {
session.state()
});
let ((actions, version, operation), metrics) = execute(
let (snapshot, metrics) = execute(
this.predicate,
this.updates,
this.log_store.clone(),
&this.snapshot,
this.snapshot,
state,
this.writer_properties,
this.commit_properties,
@ -461,12 +454,10 @@ impl std::future::IntoFuture for UpdateBuilder {
)
.await?;
if let Some(op) = &operation {
this.snapshot.merge(actions, op, version)?;
}
let table = DeltaTable::new_with_state(this.log_store, this.snapshot);
Ok((table, metrics))
Ok((
DeltaTable::new_with_state(this.log_store, snapshot),
metrics,
))
})
}
}

View File

@ -808,17 +808,7 @@ impl std::future::IntoFuture for WriteBuilder {
)?
.await?;
// TODO we do not have the table config available, but since we are merging only our newly
// created actions, it may be safe to assume, that we want to include all actions.
// then again, having only some tombstones may be misleading.
if let Some(mut snapshot) = this.snapshot {
snapshot.merge(commit.data.actions, &commit.data.operation, commit.version)?;
Ok(DeltaTable::new_with_state(this.log_store, snapshot))
} else {
let mut table = DeltaTable::new(this.log_store, Default::default());
table.update().await?;
Ok(table)
}
Ok(DeltaTable::new_with_state(this.log_store, commit.snapshot))
})
}
}

View File

@ -177,17 +177,7 @@ async fn test_merge_concurrent_different_partition() {
// TODO: Currently it throws a Version mismatch error, but the merge commit was successfully
// This bug needs to be fixed, see pull request #2280
assert!(!matches!(
result.as_ref().unwrap_err(),
DeltaTableError::Transaction { .. }
));
assert!(matches!(
result.as_ref().unwrap_err(),
DeltaTableError::Generic(_)
));
if let DeltaTableError::Generic(msg) = result.unwrap_err() {
assert_eq!(msg, "Version mismatch");
}
assert!(matches!(result.as_ref().is_ok(), true));
}
#[tokio::test]