Handle ObjectRemoved:Delete events and translate those into Delta table removals

This change handles deleted files correctly, and also ensures that
removed files don't incorrectly show up as additions.

With this change, S3 Lifecycle configurations should _just work_ with
Delta tables.

Fixes #10
R Tyler Croy 2023-12-18 10:52:07 -08:00
parent 6c34f4ed81
commit b3f45b2b2d
4 changed files with 226 additions and 12 deletions
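
At a high level, the notification records are grouped into per-table TableMods, with ObjectCreated events collected as adds and ObjectRemoved:Delete events as removes, which are then committed to the table as Delta Remove actions. Below is a rough sketch of how the pieces fit together; the apply_mods driver is hypothetical, the module paths are assumed, and the real Lambda handler adds locking and error reporting around these calls:

    use aws_lambda_events::event::s3::S3EventRecord;
    use deltalake::DeltaResult;
    use oxbow::{append_to_table, objects_by_table, remove_from_table};

    // Hypothetical driver illustrating the intended flow
    async fn apply_mods(records: &[S3EventRecord]) -> DeltaResult<()> {
        // Group records by table URL, splitting them into adds and removes
        for (table_url, mods) in objects_by_table(records).iter() {
            let mut table = deltalake::open_table(table_url).await?;
            if !mods.adds.is_empty() {
                // ObjectCreated:* events become Add actions
                append_to_table(mods.adds.as_slice(), &mut table).await?;
            }
            if !mods.removes.is_empty() {
                // ObjectRemoved:Delete events become Remove actions
                remove_from_table(mods.removes.as_slice(), &mut table).await?;
            }
        }
        Ok(())
    }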

View File

@@ -35,6 +35,14 @@ pub fn records_with_url_decoded_keys(records: &[S3EventRecord]) -> Vec<S3EventRe
.collect()
}
/// Struct to keep track of the table modifications that need to be made based on
/// [S3EventRecord] objects.
#[derive(Debug, Clone, Default)]
pub struct TableMods {
pub adds: Vec<ObjectMeta>,
pub removes: Vec<ObjectMeta>,
}
/**
* Group the objects from the notification based on the delta tables they should be added to.
*
@@ -43,8 +51,8 @@ pub fn records_with_url_decoded_keys(records: &[S3EventRecord]) -> Vec<S3EventRe
* we iterate the records, we can group them based on the delta table and then create the
* appropriate transactions
*/
pub fn objects_by_table(records: &[S3EventRecord]) -> HashMap<String, Vec<ObjectMeta>> {
let mut result = HashMap::new();
pub fn objects_by_table(records: &[S3EventRecord]) -> HashMap<String, TableMods> {
let mut mods = HashMap::new();
for record in records.iter() {
if let Some(bucket) = &record.s3.bucket.name {
@@ -53,16 +61,22 @@ pub fn objects_by_table(records: &[S3EventRecord]) -> HashMap<String, Vec<Object
let key = format!("s3://{}/{}", bucket, log_path);
if !result.contains_key(&key) {
result.insert(key.clone(), vec![]);
if !mods.contains_key(&key) {
mods.insert(key.clone(), TableMods::default());
}
if let Some(objects) = result.get_mut(&key) {
objects.push(om);
if let Some(objects) = mods.get_mut(&key) {
if let Some(event_name) = &record.event_name {
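// Only ObjectCreated:* and ObjectRemoved:Delete events are translated into
// table modifications; any other event type is ignored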
if event_name.starts_with("ObjectCreated") {
objects.adds.push(om);
} else if event_name == "ObjectRemoved:Delete" {
objects.removes.push(om);
}
}
}
}
}
result
mods
}
/**
@@ -315,7 +329,7 @@ mod tests {
.expect("Failed to get the first table");
assert_eq!(
1,
table_one.len(),
table_one.adds.len(),
"Shoulid only be one object in table one"
);
@@ -324,7 +338,7 @@ mod tests {
.expect("Failed to get the second table");
assert_eq!(
2,
table_two.len(),
table_two.adds.len(),
"Shoulid only be two objects in table two"
);
}
@@ -345,6 +359,33 @@ mod tests {
assert_eq!(4, events.len(), "Unexpected number of entries");
}
#[test]
fn test_s3_from_sqs_with_delete() {
let buf = r#"{"Records":[{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"us-west-2","eventTime":"2023-12-18T00:22:24.292Z","eventName":"ObjectRemoved:Delete","userIdentity":{"principalId":"A16S3A764ZBGJN"},"requestParameters":{"sourceIPAddress":"76.218.225.124"},"responseElements":{"x-amz-request-id":"CWK6W9YANZBH6SK4","x-amz-id-2":"H7P6nIKhchv9soZ4pnX0GsAj3zqqdrShFddk4kX9UpSbC2C5FL9XNvNtSxtTD1Nt0ZtTnREeZIMqO1IsSpkebocjUTRJkumh"},"s3":{"s3SchemaVersion":"1.0","configurationId":"test-delete","bucket":{"name":"oxbow-simple","ownerIdentity":{"principalId":"A16S3A764ZBGJN"},"arn":"arn:aws:s3:::oxbow-simple"},"object":{"key":"gcs-export/ds%3D2023-12-12/testing_oxbow-partitioned2_ds%3D2023-12-12_000000000000.parquet","sequencer":"00657F90C047858AE9"}}}]}"#;
let message = SqsMessage {
body: Some(buf.into()),
..Default::default()
};
let event = SqsEvent {
records: vec![message],
};
let events = s3_from_sqs(event).expect("Failed to get events");
assert_eq!(1, events.len(), "Unexpected number of entries");
let records = records_with_url_decoded_keys(&events);
let tables = objects_by_table(records.as_slice());
if let Some(mods) = tables.get("s3://oxbow-simple/gcs-export") {
assert_eq!(
mods.removes.len(),
1,
"Should have recorded a removes table modification"
);
} else {
assert!(false, "Failed to find the right key on {tables:?}");
}
}
#[test]
fn test_s3_from_sqs_with_invalid() {
let message = SqsMessage {

View File

@@ -226,6 +226,24 @@ pub async fn append_to_table(files: &[ObjectMeta], table: &mut DeltaTable) -> De
.await
}
/// Remove the given files from an already existing and initialized [DeltaTable]
pub async fn remove_from_table(files: &[ObjectMeta], table: &mut DeltaTable) -> DeltaResult<i64> {
let actions = remove_actions_for(files);
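// Committing an empty set of actions would still create a new table version,
// so short-circuit and return the current version instead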
if actions.is_empty() {
return Ok(table.version());
}
deltalake::operations::transaction::commit(
table.object_store().as_ref(),
&actions,
DeltaOperation::Delete { predicate: None },
table.get_state(),
None,
)
.await
}
/**
* Take an iterator of files and determine what looks like a partition column from it
*/
@@ -287,6 +305,26 @@ pub fn add_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
.collect()
}
/// Provide a series of Remove actions for the given [ObjectMeta] entries
pub fn remove_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
files
.iter()
.map(|om| Remove {
path: om.location.to_string(),
data_change: true,
size: Some(om.size as i64),
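// Recover hive-style partition values from the object's path so the Remove
// carries the same partition_values as the original Add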
partition_values: Some(
partitions_from(om.location.as_ref())
.into_iter()
.map(|p| (p.key, Some(p.value)))
.collect(),
),
..Default::default()
})
.map(Action::remove)
.collect()
}
/**
* Return the smallest file from the given set of files.
*
@@ -456,6 +494,20 @@ mod tests {
assert_eq!(1, result.len());
}
#[test]
fn remove_actions_for_not_empty() {
let files = vec![ObjectMeta {
location: Path::from(
"part-00001-f2126b8d-1594-451b-9c89-c4c2481bfd93-c000.snappy.parquet",
),
last_modified: Utc::now(),
size: 689,
e_tag: None,
}];
let result = remove_actions_for(&files);
assert_eq!(1, result.len());
}
#[test]
fn partition_columns_from_empty() {
let expected: Vec<String> = vec![];
@@ -719,4 +771,101 @@ mod tests {
"There were not unique fields, that probably means a `ds` column is doubled up"
);
}
#[tokio::test]
async fn test_remove_from_table() {
let (_tempdir, store) =
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");
let mut table = create_table_with(&files, store.clone())
.await
.expect("Failed to create table");
let schema = table.get_schema().expect("Failed to get schema");
assert!(
schema.get_field_with_name("c2").is_ok(),
"The schema does not include the expected partition key `c2`"
);
assert_eq!(0, table.version(), "Unexpected version");
remove_from_table(&files, &mut table)
.await
.expect("Failed to append files");
table.load().await.expect("Failed to reload the table");
assert_eq!(
1,
table.version(),
"Unexpected version, should have created a commit"
);
assert_eq!(table.get_files().len(), 0, "Found redundant files!");
}
#[tokio::test]
async fn test_remove_unknown_from_table() {
let (_tempdir, store) =
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");
// Only creating the table with some of the files
let mut table = create_table_with(&files[0..1].to_vec(), store.clone())
.await
.expect("Failed to create table");
let schema = table.get_schema().expect("Failed to get schema");
assert!(
schema.get_field_with_name("c2").is_ok(),
"The schema does not include the expected partition key `c2`"
);
assert_eq!(0, table.version(), "Unexpected version");
// Removing _all_ the files we know about; this should not error
remove_from_table(&files, &mut table)
.await
.expect("Failed to append files");
table.load().await.expect("Failed to reload the table");
assert_eq!(
1,
table.version(),
"Unexpected version, should have created a commit"
);
assert_eq!(table.get_files().len(), 0, "Found redundant files!");
}
#[tokio::test]
async fn test_remove_empty_set() {
let (_tempdir, store) =
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");
let mut table = create_table_with(&files, store.clone())
.await
.expect("Failed to create table");
let schema = table.get_schema().expect("Failed to get schema");
assert!(
schema.get_field_with_name("c2").is_ok(),
"The schema does not include the expected partition key `c2`"
);
assert_eq!(0, table.version(), "Unexpected version");
remove_from_table(&vec![], &mut table)
.await
.expect("Failed to append files");
table.load().await.expect("Failed to reload the table");
assert_eq!(
0,
table.version(),
"Unexpected version, should not have created a commit"
);
}
}
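
For a file under a hive-style partitioned path, each generated Remove action carries data_change = true, the file size, and the partition values parsed back out of the key, mirroring what add_actions_for records. A small sketch of the expected shape; the path and size here are illustrative:

    let files = vec![ObjectMeta {
        // Illustrative key: c2 is the partition column used by the test data
        location: Path::from("c2=foo0/part-00001-aaa-c000.snappy.parquet"),
        last_modified: Utc::now(),
        size: 689,
        e_tag: None,
    }];
    let actions = remove_actions_for(&files);
    // One Remove per file, with partition_values of {"c2": Some("foo0")}
    assert_eq!(1, actions.len());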

View File

@@ -9,7 +9,7 @@ resource "aws_s3_bucket_notification" "bucket-notifications" {
queue {
queue_arn = aws_sqs_queue.oxbow.arn
events = ["s3:ObjectCreated:*"]
events = ["s3:ObjectCreated:*", "s3:ObjectRemoved:Delete"]
filter_suffix = ".parquet"
}

View File

@@ -49,7 +49,7 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
for table_name in by_table.keys() {
let location = Url::parse(table_name).expect("Failed to turn a table into a URL");
debug!("Handling table: {:?}", location);
let files = by_table
let table_mods = by_table
.get(table_name)
.expect("Failed to get the files for a table, impossible!");
let mut storage_options: HashMap<String, String> = HashMap::default();
@@ -76,7 +76,7 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
Ok(mut table) => {
info!("Opened table to append: {:?}", table);
match oxbow::append_to_table(files, &mut table).await {
match oxbow::append_to_table(table_mods.adds.as_slice(), &mut table).await {
Ok(version) => {
info!(
"Successfully appended version {} to table at {}",
@@ -105,6 +105,30 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
return Err(Box::new(err));
}
}
if !table_mods.removes.is_empty() {
info!(
"{} Remove actions are expected in this operation",
table_mods.removes.len()
);
match oxbow::remove_from_table(table_mods.removes.as_slice(), &mut table).await
{
Ok(version) => {
info!(
"Successfully created version {} with remove actions",
version
);
}
Err(err) => {
error!(
"Failed to create removes on the table {}: {:?}",
location, err
);
let _ = release_lock(lock, &lock_client).await;
return Err(Box::new(err));
}
}
}
}
Err(DeltaTableError::NotATable(_e)) => {
// create the table with our objects