diff --git a/crates/oxbow-lambda-shared/src/lib.rs b/crates/oxbow-lambda-shared/src/lib.rs
index c045f0a..f3bcbee 100644
--- a/crates/oxbow-lambda-shared/src/lib.rs
+++ b/crates/oxbow-lambda-shared/src/lib.rs
@@ -35,6 +35,14 @@ pub fn records_with_url_decoded_keys(records: &[S3EventRecord]) -> Vec<S3EventRecord> {
+pub struct TableMods {
+    pub adds: Vec<ObjectMeta>,
+    pub removes: Vec<ObjectMeta>,
+}
+
 /**
  * Group the objects from the notification based on the delta tables they should be added to.
  *
@@ -43,8 +51,8 @@ pub fn records_with_url_decoded_keys(records: &[S3EventRecord]) -> Vec<S3EventRecord> {
  */
-pub fn objects_by_table(records: &[S3EventRecord]) -> HashMap<String, Vec<ObjectMeta>> {
-    let mut result = HashMap::new();
+pub fn objects_by_table(records: &[S3EventRecord]) -> HashMap<String, TableMods> {
+    let mut mods = HashMap::new();
 
     for record in records.iter() {
         if let Some(bucket) = &record.s3.bucket.name {
@@ -53,16 +61,22 @@ pub fn objects_by_table(records: &[S3EventRecord]) -> HashMap<String, Vec<ObjectMeta>> {
     .await
 }
 
+/// Remove the given files from an already existing and initialized [DeltaTable]
+pub async fn remove_from_table(files: &[ObjectMeta], table: &mut DeltaTable) -> DeltaResult<i64> {
+    let actions = remove_actions_for(files);
+
+    if actions.is_empty() {
+        return Ok(table.version());
+    }
+
+    deltalake::operations::transaction::commit(
+        table.object_store().as_ref(),
+        &actions,
+        DeltaOperation::Delete { predicate: None },
+        table.get_state(),
+        None,
+    )
+    .await
+}
+
 /**
  * Take an iterator of files and determine what looks like a partition column from it
  */
@@ -287,6 +305,26 @@ pub fn add_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
         .collect()
 }
 
+/// Provide a series of Remove actions for the given [ObjectMeta] entries
+pub fn remove_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
+    files
+        .iter()
+        .map(|om| Remove {
+            path: om.location.to_string(),
+            data_change: true,
+            size: Some(om.size as i64),
+            partition_values: Some(
+                partitions_from(om.location.as_ref())
+                    .into_iter()
+                    .map(|p| ((p.key), Some(p.value)))
+                    .collect(),
+            ),
+            ..Default::default()
+        })
+        .map(Action::remove)
+        .collect()
+}
+
 /**
  * Return the smallest file from the given set of files.
  *
@@ -456,6 +494,20 @@ mod tests {
         assert_eq!(1, result.len());
     }
 
+    #[test]
+    fn remove_actions_for_not_empty() {
+        let files = vec![ObjectMeta {
+            location: Path::from(
+                "part-00001-f2126b8d-1594-451b-9c89-c4c2481bfd93-c000.snappy.parquet",
+            ),
+            last_modified: Utc::now(),
+            size: 689,
+            e_tag: None,
+        }];
+        let result = remove_actions_for(&files);
+        assert_eq!(1, result.len());
+    }
+
     #[test]
     fn partition_columns_from_empty() {
         let expected: Vec<String> = vec![];
@@ -719,4 +771,101 @@ mod tests {
             "There were not unique fields, that probably means a `ds` column is doubled up"
         );
     }
+
+    #[tokio::test]
+    async fn test_remove_from_table() {
+        let (_tempdir, store) =
+            util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
+
+        let files = discover_parquet_files(store.clone())
+            .await
+            .expect("Failed to discover parquet files");
+        assert_eq!(files.len(), 4, "No files discovered");
+
+        let mut table = create_table_with(&files, store.clone())
+            .await
+            .expect("Failed to create table");
+        let schema = table.get_schema().expect("Failed to get schema");
+        assert!(
+            schema.get_field_with_name("c2").is_ok(),
+            "The schema does not include the expected partition key `c2`"
+        );
+        assert_eq!(0, table.version(), "Unexpected version");
+
+        remove_from_table(&files, &mut table)
+            .await
+            .expect("Failed to remove files");
+        table.load().await.expect("Failed to reload the table");
+        assert_eq!(
+            1,
+            table.version(),
+            "Unexpected version, should have created a commit"
+        );
+        assert_eq!(table.get_files().len(), 0, "Found redundant files!");
+    }
+
+    #[tokio::test]
+    async fn test_remove_unknown_from_table() {
+        let (_tempdir, store) =
+            util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
+
+        let files = discover_parquet_files(store.clone())
+            .await
+            .expect("Failed to discover parquet files");
+        assert_eq!(files.len(), 4, "No files discovered");
+
+        // Only creating the table with some of the files
+        let mut table = create_table_with(&files[0..1].to_vec(), store.clone())
+            .await
+            .expect("Failed to create table");
+        let schema = table.get_schema().expect("Failed to get schema");
+        assert!(
+            schema.get_field_with_name("c2").is_ok(),
+            "The schema does not include the expected partition key `c2`"
+        );
+        assert_eq!(0, table.version(), "Unexpected version");
+
+        // Removing _all_ the files we know about, this should not error
+        remove_from_table(&files, &mut table)
+            .await
+            .expect("Failed to remove files");
+        table.load().await.expect("Failed to reload the table");
+        assert_eq!(
+            1,
+            table.version(),
+            "Unexpected version, should have created a commit"
+        );
+        assert_eq!(table.get_files().len(), 0, "Found redundant files!");
+    }
+
+    #[tokio::test]
+    async fn test_remove_empty_set() {
+        let (_tempdir, store) =
+            util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
+
+        let files = discover_parquet_files(store.clone())
+            .await
+            .expect("Failed to discover parquet files");
+        assert_eq!(files.len(), 4, "No files discovered");
+
+        let mut table = create_table_with(&files, store.clone())
+            .await
+            .expect("Failed to create table");
+        let schema = table.get_schema().expect("Failed to get schema");
+        assert!(
+            schema.get_field_with_name("c2").is_ok(),
+            "The schema does not include the expected partition key `c2`"
+        );
+        assert_eq!(0, table.version(), "Unexpected version");
+
+        remove_from_table(&vec![], &mut table)
+            .await
+            .expect("Failed to remove files");
+        table.load().await.expect("Failed to reload the table");
+        assert_eq!(
+            0,
+            table.version(),
+            "Unexpected version, should not have created a commit"
+        );
+    }
 }
diff --git a/deployment/simple.tf b/deployment/simple.tf
index e6c19b0..3e4704a 100644
--- a/deployment/simple.tf
+++ b/deployment/simple.tf
@@ -9,7 +9,7 @@ resource "aws_s3_bucket_notification" "bucket-notifications" {
 
   queue {
     queue_arn     = aws_sqs_queue.oxbow.arn
-    events        = ["s3:ObjectCreated:*"]
+    events        = ["s3:ObjectCreated:*", "s3:ObjectRemoved:Delete"]
     filter_suffix = ".parquet"
   }
 
diff --git a/lambdas/oxbow/src/main.rs b/lambdas/oxbow/src/main.rs
index 4a61275..3edcc13 100644
--- a/lambdas/oxbow/src/main.rs
+++ b/lambdas/oxbow/src/main.rs
@@ -49,7 +49,7 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
     for table_name in by_table.keys() {
         let location = Url::parse(table_name).expect("Failed to turn a table into a URL");
         debug!("Handling table: {:?}", location);
-        let files = by_table
+        let table_mods = by_table
             .get(table_name)
             .expect("Failed to get the files for a table, impossible!");
         let mut storage_options: HashMap<String, String> = HashMap::default();
@@ -76,7 +76,7 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
             Ok(mut table) => {
                 info!("Opened table to append: {:?}", table);
 
-                match oxbow::append_to_table(files, &mut table).await {
+                match oxbow::append_to_table(table_mods.adds.as_slice(), &mut table).await {
                     Ok(version) => {
                         info!(
                             "Successfully appended version {} to table at {}",
@@ -105,6 +105,30 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
                         return Err(Box::new(err));
                     }
                 }
+
+                if !table_mods.removes.is_empty() {
+                    info!(
+                        "{} Remove actions are expected in this operation",
+                        table_mods.removes.len()
+                    );
+                    match oxbow::remove_from_table(table_mods.removes.as_slice(), &mut table).await
+                    {
+                        Ok(version) => {
+                            info!(
+                                "Successfully created version {} with remove actions",
+                                version
+                            );
+                        }
+                        Err(err) => {
+                            error!(
+                                "Failed to create removes on the table {}: {:?}",
+                                location, err
+                            );
+                            let _ = release_lock(lock, &lock_client).await;
+                            return Err(Box::new(err));
+                        }
+                    }
+                }
             }
             Err(DeltaTableError::NotATable(_e)) => {
                 // create the table with our objects