Refactor commits to accept Actions directly and prepare for schema evolution

Using actions directly for the commit also ensures that adds and removes
happen in the same commit rather than the two separate commits as was
done prior.
This commit is contained in:
R Tyler Croy 2024-04-20 08:17:48 -07:00
parent 795f095094
commit 46a1c10835
7 changed files with 296 additions and 284 deletions

View File

@ -7,7 +7,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.11.2"
version = "0.12.0"
edition = "2021"
keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
homepage = "https://github.com/buoyant-data/oxbow"

View File

@ -17,3 +17,5 @@ tracing = { workspace = true }
serde = { version = "=1", features = ["rc"] }
serde_json = "=1"
urlencoding = "=2"
oxbow = { path = "../oxbow" }

View File

@ -8,14 +8,14 @@ use chrono::prelude::*;
use deltalake::{DeltaResult, ObjectMeta, Path};
use tracing::log::*;
use oxbow::TableMods;
use std::collections::HashMap;
/**
* Return wholly new [`S3EventRecord`] objects with their the [`S3Object`] `url_decoded_key`
* properly filled in
*
* For whatever reason `aws_lambda_events` does not properly handle this
*/
///
/// Return wholly new [`S3EventRecord`] objects with their the [`S3Object`] `url_decoded_key`
/// properly filled in
///
/// For whatever reason `aws_lambda_events` does not properly handle this
pub fn records_with_url_decoded_keys(records: &[S3EventRecord]) -> Vec<S3EventRecord> {
use urlencoding::decode;
@ -37,22 +37,12 @@ pub fn records_with_url_decoded_keys(records: &[S3EventRecord]) -> Vec<S3EventRe
.collect()
}
/// Struct to keep track of the table modifications needing to be made based on
/// [S3EventRecord] objects .
#[derive(Debug, Clone, Default)]
pub struct TableMods {
pub adds: Vec<ObjectMeta>,
pub removes: Vec<ObjectMeta>,
}
/**
* Group the objects from the notification based on the delta tables they should be added to.
*
* There's a possibility that an S3 bucket notification will have objects mixed in which should be
* destined for different delta tables. Rather than re-opening/loading the table for each object as
* we iterate the records, we can group them based on the delta table and then create the
* appropriate transactions
*/
/// Group the objects from the notification based on the delta tables they should be added to.
///
/// There's a possibility that an S3 bucket notification will have objects mixed in which should be
/// destined for different delta tables. Rather than re-opening/loading the table for each object as
/// we iterate the records, we can group them based on the delta table and then create the
/// appropriate transactions
pub fn objects_by_table(records: &[S3EventRecord]) -> HashMap<String, TableMods> {
let mut mods = HashMap::new();
@ -81,13 +71,11 @@ pub fn objects_by_table(records: &[S3EventRecord]) -> HashMap<String, TableMods>
mods
}
/**
* Infer the log path from the given object path.
*
* The location of `_delta_log/` can technically be _anywhere_ but for convention's
* sake oxbow will attempt to put the `_delta_log/` some place predictable to ensure that
* `add` actions in the log can use relative file paths for newly added objects
*/
/// Infer the log path from the given object path.
///
/// The location of `_delta_log/` can technically be _anywhere_ but for convention's
/// sake oxbow will attempt to put the `_delta_log/` some place predictable to ensure that
/// `add` actions in the log can use relative file paths for newly added objects
pub fn infer_log_path_from(path: &str) -> String {
use std::path::{Component, Path};
@ -189,12 +177,11 @@ pub fn s3_from_sns(event: SqsEvent) -> DeltaResult<Vec<S3EventRecord>> {
Ok(records)
}
/**
* Convert an [`S3Object`] into an [`ObjectMeta`] for use in the creation of Delta transactions
*
* This is a _lossy_ conversion since the two structs do not share the same set of information,
* therefore this conversion is really only taking the path of the object and the size
*/
///
/// Convert an [`S3Object`] into an [`ObjectMeta`] for use in the creation of Delta transactions
///
/// This is a _lossy_ conversion since the two structs do not share the same set of information,
/// therefore this conversion is really only taking the path of the object and the size
fn into_object_meta(s3object: &S3Object, prune_prefix: Option<&str>) -> ObjectMeta {
let location = s3object.url_decoded_key.clone().unwrap_or("".to_string());

View File

@ -6,7 +6,7 @@ repository.workspace = true
homepage.workspace = true
[dependencies]
chrono = { workpace = true }
chrono = { workspace = true }
deltalake = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
@ -17,7 +17,6 @@ rusoto_core = { version = "0.47", default-features = false, features = ["rustls"
rusoto_credential = { version = "0.47"}
[dev-dependencies]
chrono = "0.4.31"
fs_extra = "=1"
tempfile = "*"
tokio = { workspace = true }

View File

@ -5,6 +5,7 @@ use deltalake::arrow::datatypes::Schema as ArrowSchema;
use deltalake::parquet::arrow::async_reader::{
ParquetObjectReader, ParquetRecordBatchStreamBuilder,
};
use deltalake::parquet::file::metadata::ParquetMetaData;
use deltalake::partitions::DeltaTablePartition;
use deltalake::protocol::*;
use deltalake::storage::DeltaObjectStore;
@ -19,10 +20,19 @@ use std::sync::Arc;
pub mod lock;
pub mod write;
/**
* convert is the main function to be called by the CLI or other "one shot" executors which just
* need to take a given location and convert it all at once
*/
/// TableMods helps to keep track of files which should be added or removed in a given
/// invocation of Oxbow-based utilities.
///
/// Typically this struct is populated by S3 events which would indicate a file(s) has been added
/// or removed.
#[derive(Debug, Clone, Default)]
pub struct TableMods {
pub adds: Vec<ObjectMeta>,
pub removes: Vec<ObjectMeta>,
}
/// convert is the main function to be called by the CLI or other "one shot" executors which just
/// need to take a given location and convert it all at once
pub async fn convert(
location: &str,
storage_options: Option<HashMap<String, String>>,
@ -63,9 +73,9 @@ pub async fn convert(
}
}
/**
* Create the ObjectStore for the given location
*/
///
/// Create the [ObjectStore] for the given location
///
pub fn object_store_for(
location: &Url,
storage_options: Option<HashMap<String, String>>,
@ -77,9 +87,13 @@ pub fn object_store_for(
Arc::new(DeltaObjectStore::try_new(location.clone(), options).expect("Failed to make store"))
}
/**
* Discover `.parquet` files which are present in the location
*/
///
/// Discover `.parquet` files which are present in the location
///
/// **NOTE**: This will not _stop_ at any particular point and can list an entire
/// prefix. While it will not recursively list, it should only be used in cases
/// where the potential number of parquet files is quite small.
///
pub async fn discover_parquet_files(
store: Arc<DeltaObjectStore>,
) -> deltalake::DeltaResult<Vec<ObjectMeta>> {
@ -100,6 +114,8 @@ pub async fn discover_parquet_files(
if !filename.ends_with(".checkpoint.parquet") {
debug!("Discovered file: {:?}", meta);
result.push(meta);
} else {
warn!("Was asked to discover parquet files on what appears to already be a table, and found checkpoint files: {filename}");
}
}
}
@ -111,11 +127,11 @@ pub async fn discover_parquet_files(
Ok(result)
}
/**
* Create a Delta table with the given series of files at the specified location
*/
///
/// Create a Delta table with a given series of files at the specified location
///
pub async fn create_table_with(
files: &Vec<ObjectMeta>,
files: &[ObjectMeta],
store: Arc<DeltaObjectStore>,
) -> DeltaResult<DeltaTable> {
use deltalake::operations::create::CreateBuilder;
@ -197,59 +213,38 @@ pub async fn create_table_with(
.await
}
/**
* Append the given files to an already existing and initialized Delta Table
*/
pub async fn append_to_table(files: &[ObjectMeta], table: &mut DeltaTable) -> DeltaResult<i64> {
/// Commit the given [Action]s to the [DeltaTable]
pub async fn commit_to_table(actions: &Vec<Action>, table: &mut DeltaTable) -> DeltaResult<i64> {
if actions.is_empty() {
return Ok(table.version());
}
deltalake::operations::transaction::commit(
table.object_store().as_ref(),
actions,
DeltaOperation::Update { predicate: None },
table.get_state(),
None,
)
.await
}
/// Generate the list of [Action]s based on the given [TableMods]
pub fn actions_for(mods: &TableMods, table: &DeltaTable) -> DeltaResult<Vec<Action>> {
let existing_files = table.get_files();
let new_files: Vec<ObjectMeta> = files
let new_files: Vec<ObjectMeta> = mods
.adds
.iter()
.filter(|f| !existing_files.contains(&f.location))
.cloned()
.collect();
if new_files.is_empty() {
debug!("No new files to add on {table:?}, skipping a commit");
return Ok(table.version());
}
let adds = add_actions_for(&new_files);
let removes = remove_actions_for(&mods.removes);
let actions = add_actions_for(&new_files);
deltalake::operations::transaction::commit(
table.object_store().as_ref(),
&actions,
DeltaOperation::Write {
mode: SaveMode::Append,
partition_by: Some(partition_columns_from(files)),
predicate: None,
},
table.get_state(),
None,
)
.await
Ok([adds, removes].concat())
}
/// Remove the given files from an already existing and initialized [DeltaTable]
pub async fn remove_from_table(files: &[ObjectMeta], table: &mut DeltaTable) -> DeltaResult<i64> {
let actions = remove_actions_for(files);
if actions.is_empty() {
return Ok(table.version());
}
deltalake::operations::transaction::commit(
table.object_store().as_ref(),
&actions,
DeltaOperation::Delete { predicate: None },
table.get_state(),
None,
)
.await
}
/**
* Take an iterator of files and determine what looks like a partition column from it
*/
/// Take an iterator of files and determine what looks like a partition column from it
fn partition_columns_from(files: &[ObjectMeta]) -> Vec<String> {
// The HashSet is only to prevent collecting redundant partitions
let mut results = HashSet::new();
@ -270,9 +265,7 @@ fn partition_columns_from(files: &[ObjectMeta]) -> Vec<String> {
results.into_iter().collect()
}
/**
* Return all the partitions from the given path buf
*/
/// Return all the partitions from the given path buf
fn partitions_from(path_str: &str) -> Vec<DeltaTablePartition> {
path_str
.split('/')
@ -280,12 +273,10 @@ fn partitions_from(path_str: &str) -> Vec<DeltaTablePartition> {
.collect()
}
/**
* Provide a series of Add actions for the given ObjectMeta entries
*
* This is a critical translation layer between discovered parquet files and how those would be
* represented inside of the log
*/
/// Provide a series of Add actions for the given ObjectMeta entries
///
/// This is a critical translation layer between discovered parquet files and how those would be
/// represented inside of the log
pub fn add_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
files
.iter()
@ -328,12 +319,10 @@ pub fn remove_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
.collect()
}
/**
* Return the smallest file from the given set of files.
*
* This can be useful to find the smallest possible parquet file to load from the set in order to
* discern schema information
*/
/// Return the smallest file from the given set of files.
///
/// This can be useful to find the smallest possible parquet file to load from the set in order to
/// discern schema information
fn find_smallest_file(files: &[ObjectMeta]) -> Option<&ObjectMeta> {
if files.is_empty() {
return None;
@ -349,6 +338,32 @@ fn find_smallest_file(files: &[ObjectMeta]) -> Option<&ObjectMeta> {
Some(smallest)
}
/// Retrieve and return the schema of the given [ObjectMeta] parquet file based on
/// its metadata
pub async fn fetch_parquet_schema(
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
) -> DeltaResult<ArrowSchema> {
let metadata = load_parquet_metadata(store.clone(), file).await?;
let schema = metadata.file_metadata().schema_descr();
Ok(deltalake::parquet::arrow::parquet_to_arrow_schema(
schema, None,
)?)
}
/// Retrieve a the metadata for a given parquet file referenced by [ObjectMeta]
async fn load_parquet_metadata(
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
) -> DeltaResult<Arc<ParquetMetaData>> {
let reader = ParquetObjectReader::new(store.clone(), file);
let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
let metadata = builder.metadata();
deltalake::parquet::schema::printer::print_parquet_metadata(&mut std::io::stdout(), metadata);
Ok(metadata.clone())
}
#[cfg(test)]
mod tests {
use super::*;
@ -368,6 +383,33 @@ mod tests {
use super::*;
/// Return a simple test table for based on the fixtures
pub(crate) async fn test_table() -> DeltaTable {
let (_tempdir, store) =
create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");
create_table_with(&files, store.clone())
.await
.expect("Failed to create table")
}
pub(crate) fn paths_to_objectmetas(slice: Vec<Path>) -> Vec<ObjectMeta> {
slice
.into_iter()
.map(|location| ObjectMeta {
location,
last_modified: Utc::now(),
size: 1,
e_tag: None,
})
.collect()
}
pub(crate) fn assert_unordered_eq<T>(left: &[T], right: &[T])
where
T: Eq + Hash + std::fmt::Debug,
@ -379,9 +421,7 @@ mod tests {
}
#[allow(dead_code)]
/**
* Helper function to use when debugging to list temp and other directories recursively
*/
/// Helper function to use when debugging to list temp and other directories recursively
fn list_directory(path: &std::path::Path) {
if path.is_dir() {
for entry in std::fs::read_dir(path).unwrap() {
@ -669,81 +709,6 @@ mod tests {
assert_eq!(files.len(), 2, "No files discovered");
}
/*
* Ensure that the append_to_table() function does not add redundant files already added to the
* Delta Table
*
* <https://github.com/buoyant-data/oxbow/issues/3>
*/
#[tokio::test]
async fn test_avoiding_adding_duplicate_files() {
let (_tempdir, store) =
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");
let mut table = create_table_with(&files, store.clone())
.await
.expect("Failed to create table");
let schema = table.get_schema().expect("Failed to get schema");
assert!(
schema.get_field_with_name("c2").is_ok(),
"The schema does not include the expected partition key `c2`"
);
assert_eq!(
table.get_files().len(),
4,
"Did not find the right number of tables"
);
append_to_table(&files, &mut table)
.await
.expect("Failed to append files");
table.load().await.expect("Failed to reload the table");
assert_eq!(table.get_files().len(), 4, "Found redundant files!");
}
/*
* Discovered an issue where append_to_table can create a new empty commit if no files are
* provided. That's silly!
*/
#[tokio::test]
async fn test_avoid_appending_empty_list() {
let (_tempdir, store) =
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");
let mut table = create_table_with(&files, store.clone())
.await
.expect("Failed to create table");
let schema = table.get_schema().expect("Failed to get schema");
assert!(
schema.get_field_with_name("c2").is_ok(),
"The schema does not include the expected partition key `c2`"
);
assert_eq!(0, table.version(), "Unexpected version");
// Start with an empty set
let files = vec![];
append_to_table(&files, &mut table)
.await
.expect("Failed to append files");
table.load().await.expect("Failed to reload the table");
assert_eq!(
0,
table.version(),
"Unexpected version, should not have created a commit"
);
assert_eq!(table.get_files().len(), 4, "Found redundant files!");
}
/*
* There are some cases where data will be laid out in a hive partition scheme but the parquet
* files may not contain the partitioning information. When using EXPORT DATA from BigQuery it
@ -775,40 +740,33 @@ mod tests {
);
}
/// This test is mostly to validate an approach for reading and loading schema
/// from a parquet file into an Arrow schema for evolution
#[tokio::test]
async fn test_remove_from_table() {
let (_tempdir, store) =
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");
let mut table = create_table_with(&files, store.clone())
.await
.expect("Failed to create table");
let schema = table.get_schema().expect("Failed to get schema");
assert!(
schema.get_field_with_name("c2").is_ok(),
"The schema does not include the expected partition key `c2`"
async fn test_load_parquet_metadata() -> DeltaResult<()> {
let location = Path::from(
"c2=foo1/part-00001-1c702e73-89b5-465a-9c6a-25f7559cd150.c000.snappy.parquet",
);
assert_eq!(0, table.version(), "Unexpected version");
let url = Url::from_file_path(std::fs::canonicalize(
"../../tests/data/hive/deltatbl-partitioned",
)?)
.expect("Failed to parse");
let storage = object_store_for(&url, None);
let meta = storage.head(&location).await.unwrap();
remove_from_table(&files, &mut table)
let schema = fetch_parquet_schema(storage.clone(), meta)
.await
.expect("Failed to append files");
table.load().await.expect("Failed to reload the table");
.expect("Failed to load parquet file schema");
assert_eq!(
schema.fields.len(),
1,
table.version(),
"Unexpected version, should have created a commit"
"Expected only to find one field on the Parquet file's schema"
);
assert_eq!(table.get_files().len(), 0, "Found redundant files!");
Ok(())
}
#[tokio::test]
async fn test_remove_unknown_from_table() {
async fn test_actions_for() -> DeltaResult<()> {
let (_tempdir, store) =
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
@ -817,58 +775,136 @@ mod tests {
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");
// Only creating the table with some of the files
let mut table = create_table_with(&files[0..1].to_vec(), store.clone())
// Creating the table with one of the discovered files, so the remaining three should be
// added later
let table = create_table_with(&vec![files[0].clone()], store.clone())
.await
.expect("Failed to create table");
let schema = table.get_schema().expect("Failed to get schema");
assert!(
schema.get_field_with_name("c2").is_ok(),
"The schema does not include the expected partition key `c2`"
);
assert_eq!(0, table.version(), "Unexpected version");
// Removing _all_ the files we know about, this should not error
remove_from_table(&files, &mut table)
.await
.expect("Failed to append files");
table.load().await.expect("Failed to reload the table");
let mods = TableMods {
adds: files,
..Default::default()
};
let actions = actions_for(&mods, &table).expect("Failed to curate actions");
assert_eq!(
1,
table.version(),
"Unexpected version, should have created a commit"
actions.len(),
3,
"Expected an add action for every new file discovered"
);
assert_eq!(table.get_files().len(), 0, "Found redundant files!");
Ok(())
}
#[tokio::test]
async fn test_remove_empty_set() {
let (_tempdir, store) =
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
async fn test_actions_for_with_redundant_files() {
let table = util::test_table().await;
let files = util::paths_to_objectmetas(table.get_files());
let mods = TableMods {
adds: files,
..Default::default()
};
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");
let mut table = create_table_with(&files, store.clone())
.await
.expect("Failed to create table");
let schema = table.get_schema().expect("Failed to get schema");
assert!(
schema.get_field_with_name("c2").is_ok(),
"The schema does not include the expected partition key `c2`"
);
assert_eq!(0, table.version(), "Unexpected version");
remove_from_table(&[], &mut table)
.await
.expect("Failed to append files");
table.load().await.expect("Failed to reload the table");
let actions = actions_for(&mods, &table).expect("Failed to curate actions");
assert_eq!(
actions.len(),
0,
table.version(),
"Unexpected version, should not have created a commit"
"Expected no add actions for redundant files"
);
}
#[tokio::test]
async fn test_actions_for_with_removes() -> DeltaResult<()> {
let table = util::test_table().await;
let files = util::paths_to_objectmetas(table.get_files());
let mods = TableMods {
adds: vec![],
removes: files,
};
let actions = actions_for(&mods, &table).expect("Failed to curate actions");
assert_eq!(
actions.len(),
4,
"Expected an add action for every new file discovered"
);
Ok(())
}
#[tokio::test]
async fn test_commit_with_no_actions() {
let mut table = util::test_table().await;
let initial_version = table.version();
let result = commit_to_table(&vec![], &mut table).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), initial_version);
}
#[tokio::test]
async fn test_commit_with_remove_actions() {
let mut table = util::test_table().await;
let initial_version = table.version();
let files = util::paths_to_objectmetas(table.get_files());
let mods = TableMods {
adds: vec![],
removes: files,
};
let actions = actions_for(&mods, &table).expect("Failed to curate actions");
assert_eq!(
actions.len(),
4,
"Expected an add action for every new file discovered"
);
let result = commit_to_table(&actions, &mut table).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
initial_version + 1,
"Should have incremented the version"
);
}
#[tokio::test]
async fn test_commit_with_all_actions() {
let (_tempdir, store) =
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "No files discovered");
// Creating the table with one of the discovered files, so the remaining three should be
// added later
let mut table = create_table_with(&vec![files[0].clone()], store.clone())
.await
.expect("Failed to create table");
let initial_version = table.version();
let mods = TableMods {
adds: files.clone(),
removes: vec![files[0].clone()],
};
let actions = actions_for(&mods, &table).expect("Failed to curate actions");
assert_eq!(
actions.len(),
4,
"Expected an 3 add actions and 1 remove action"
);
let result = commit_to_table(&actions, &mut table).await;
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
initial_version + 1,
"Should have incremented the version"
);
table.load().await.expect("Failed to reload table");
assert_eq!(
table.get_files().len(),
3,
"Expected to only find three files on the table at this state"
);
}
}

View File

@ -27,5 +27,8 @@ toc::[]
| _null_
| Should only be used if S3 Event Notifications are first going to SNS and then routing to SQS for Oxbow
| `SCHEMA_EVOLUTION`
| _null_
| When set to any value this enables schema evolution by the Oxbow Lambda which will cause it to retrieve metadata for the last `.parquet` it reteives in an event notification and add new columns if necessary
|===

View File

@ -1,7 +1,8 @@
/*
* The lambda crate contains the Lambda specific implementation of oxbow.
*/
///
/// The oxbow lambda crate contains the Lambda-specific handling of the Oxbow lambda
///
/// While most of the key logic does exist in the oxbow and oxbow-lambda-shared crates, this
/// function glues that into the Lambda runtime
use aws_lambda_events::sqs::SqsEvent;
use deltalake::DeltaTableError;
use lambda_runtime::{service_fn, Error, LambdaEvent};
@ -20,8 +21,8 @@ async fn main() -> Result<(), anyhow::Error> {
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
info!("Starting oxbow");
info!("Starting the Lambda runtime");
info!("Starting oxbow");
let func = service_fn(func);
lambda_runtime::run(func)
.await
@ -31,6 +32,11 @@ async fn main() -> Result<(), anyhow::Error> {
async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
debug!("Receiving event: {:?}", event);
let can_evolve_schema: bool = std::env::var("SCHEMA_EVOLUTION").is_ok();
if can_evolve_schema {
info!("Schema evolution has been enabled based on the environment variable");
}
let records = match std::env::var("UNWRAP_SNS_ENVELOPE") {
Ok(_) => s3_from_sns(event.payload)?,
Err(_) => s3_from_sqs(event.payload)?,
@ -59,7 +65,10 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
Ok(mut table) => {
info!("Opened table to append: {:?}", table);
match oxbow::append_to_table(table_mods.adds.as_slice(), &mut table).await {
let actions = oxbow::actions_for(table_mods, &table)
.expect("Failed to generate actions for the table modifications");
match oxbow::commit_to_table(&actions, &mut table).await {
Ok(version) => {
info!(
"Successfully appended version {} to table at {}",
@ -88,30 +97,6 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
return Err(Box::new(err));
}
}
if !table_mods.removes.is_empty() {
info!(
"{} Remove actions are expected in this operation",
table_mods.removes.len()
);
match oxbow::remove_from_table(table_mods.removes.as_slice(), &mut table).await
{
Ok(version) => {
info!(
"Successfully created version {} with remove actions",
version
);
}
Err(err) => {
error!(
"Failed to create removes on the table {}: {:?}",
location, err
);
let _ = oxbow::lock::release(lock, &lock_client).await;
return Err(Box::new(err));
}
}
}
}
Err(DeltaTableError::NotATable(_e)) => {
// create the table with our objects