Handle odd timestamp types when doing schema evolution

This refactors the code that supports goofy Airbyte-generated types
on both table creation and schema evolution.
R Tyler Croy 2024-04-20 21:27:50 +00:00
parent 924cb6855b
commit a5c501665e
2 changed files with 126 additions and 54 deletions
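In short, the commit extracts the Timestamp(millis) → Timestamp(micros) coercion into a shared `coerce_field` helper used by both `create_table_with` and the new `metadata_actions_for`. Below is a minimal, self-contained sketch of that coercion rule, assuming the `deltalake` re-export of arrow shown in the diff; the column name in `main` is illustrative, not taken from the commit:

```rust
use std::sync::Arc;
use deltalake::arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};

// Standalone restatement of the rule this commit centralizes:
// millisecond timestamps are rewritten as microsecond timestamps,
// since the Delta schema only models microsecond precision.
fn coerce_millis_to_micros(field: FieldRef) -> FieldRef {
    match field.data_type() {
        DataType::Timestamp(TimeUnit::Millisecond, tz) => Arc::new(Field::new(
            field.name(),
            DataType::Timestamp(TimeUnit::Microsecond, tz.clone()),
            field.is_nullable(),
        )),
        _ => field,
    }
}

fn main() {
    // A hypothetical Airbyte-style column written with millisecond precision.
    let raw: FieldRef = Arc::new(Field::new(
        "_airbyte_extracted_at",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    ));
    let coerced = coerce_millis_to_micros(raw);
    assert_eq!(
        coerced.data_type(),
        &DataType::Timestamp(TimeUnit::Microsecond, None)
    );
}
```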

View File

@@ -160,24 +160,7 @@ pub async fn create_table_with(
let mut conversions: Vec<Arc<deltalake::arrow::datatypes::Field>> = vec![];
for field in arrow_schema.fields().iter() {
match field.data_type() {
deltalake::arrow::datatypes::DataType::Timestamp(unit, tz) => match unit {
deltalake::arrow::datatypes::TimeUnit::Millisecond => {
warn!("I have been asked to create a table with a Timestamp(millis) column ({}) that I cannot handle. Cowardly setting the Delta schema to pretend it is a Timestamp(micros)", field.name());
let field = deltalake::arrow::datatypes::Field::new(
field.name(),
deltalake::arrow::datatypes::DataType::Timestamp(
deltalake::arrow::datatypes::TimeUnit::Microsecond,
tz.clone(),
),
field.is_nullable(),
);
conversions.push(Arc::new(field));
}
_ => conversions.push(field.clone()),
},
_ => conversions.push(field.clone()),
}
conversions.push(coerce_field(field.clone()));
}
let arrow_schema = ArrowSchema::new_with_metadata(conversions, arrow_schema.metadata.clone());
@@ -244,45 +227,62 @@ pub async fn actions_for(
let adds = add_actions_for(&new_files);
let removes = remove_actions_for(&mods.removes);
let mut metadata = vec![];
if evolve_schema {
if let Some(last_file) = new_files.last() {
debug!("Attempting to evolve the schema for {table:?}");
let table_schema = table.get_schema()?;
// Cloning here to take an owned version of [DeltaTableMetaData] for later modification
let mut table_metadata = table.get_metadata()?.clone();
let file_schema =
fetch_parquet_schema(table.object_store().clone(), last_file.clone()).await?;
let mut new_schema: Vec<SchemaField> = table_schema.get_fields().clone();
for file_field in file_schema.fields() {
let name = file_field.name();
// If the table's schema doesn't have the field, add to our new schema
if table_schema.get_field_with_name(name).is_err() {
debug!("Found a new column `{name}` which will be added");
new_schema.push(SchemaField::new(
name.to_string(),
file_field.data_type().try_into()?,
true,
HashMap::default(),
));
}
let metadata = match evolve_schema {
true => match metadata_actions_for(&new_files, &table).await {
Ok(axns) => axns,
Err(err) => {
error!("Attempted schema evolution but received an unhandled error!: {err:?}");
vec![]
}
if new_schema.len() > table_schema.get_fields().len() {
let new_schema = deltalake::schema::SchemaTypeStruct::new(new_schema);
table_metadata.schema = new_schema;
table_metadata.created_time = Some(chrono::Utc::now().timestamp_millis());
metadata.push(Action::metaData(table_metadata.try_into()?));
}
}
}
},
false => vec![],
};
Ok([adds, removes, metadata].concat())
}
/// Return the new metadata actions if the files given can result in a schema migration
///
/// If nothing should happen, this will return an empty set
async fn metadata_actions_for(
files: &[ObjectMeta],
table: &DeltaTable,
) -> DeltaResult<Vec<Action>> {
if let Some(last_file) = files.last() {
debug!("Attempting to evolve the schema for {table:?}");
let table_schema = table.get_schema()?;
// Cloning here to take an owned version of [DeltaTableMetaData] for later modification
let mut table_metadata = table.get_metadata()?.clone();
let file_schema =
fetch_parquet_schema(table.object_store().clone(), last_file.clone()).await?;
let mut new_schema: Vec<SchemaField> = table_schema.get_fields().clone();
for file_field in file_schema.fields() {
let name = file_field.name();
// If the table's schema doesn't have the field, add to our new schema
if table_schema.get_field_with_name(name).is_err() {
debug!("Found a new column `{name}` which will be added");
new_schema.push(SchemaField::new(
name.to_string(),
// These types can have timestamps in them, so coerce them properly
coerce_field(file_field.clone()).data_type().try_into()?,
true,
HashMap::default(),
));
}
}
if new_schema.len() > table_schema.get_fields().len() {
let new_schema = deltalake::schema::SchemaTypeStruct::new(new_schema);
table_metadata.schema = new_schema;
table_metadata.created_time = Some(chrono::Utc::now().timestamp_millis());
return Ok(vec![Action::metaData(table_metadata.try_into()?)]);
}
}
Ok(vec![])
}
/// Take an iterator of files and determine what looks like a partition column from it
fn partition_columns_from(files: &[ObjectMeta]) -> Vec<String> {
// The HashSet is only to prevent collecting redundant partitions
@@ -403,6 +403,30 @@ async fn load_parquet_metadata(
Ok(metadata.clone())
}
fn coerce_field(
field: deltalake::arrow::datatypes::FieldRef,
) -> deltalake::arrow::datatypes::FieldRef {
match field.data_type() {
deltalake::arrow::datatypes::DataType::Timestamp(unit, tz) => match unit {
deltalake::arrow::datatypes::TimeUnit::Millisecond => {
warn!("I have been asked to create a table with a Timestamp(millis) column ({}) that I cannot handle. Cowardly setting the Delta schema to pretend it is a Timestamp(micros)", field.name());
let field = deltalake::arrow::datatypes::Field::new(
field.name(),
deltalake::arrow::datatypes::DataType::Timestamp(
deltalake::arrow::datatypes::TimeUnit::Microsecond,
tz.clone(),
),
field.is_nullable(),
);
return Arc::new(field);
}
_ => {}
},
_ => {}
};
return field.clone();
}
#[cfg(test)]
mod tests {
use super::*;
@@ -423,6 +447,8 @@ mod tests {
use super::*;
/// Return a simple test table based on the fixtures
///
/// The table that is returned cannot be reloaded since the tempdir is dropped
pub(crate) async fn test_table() -> DeltaTable {
let (_tempdir, store) =
create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
@@ -461,7 +487,7 @@ mod tests {
#[allow(dead_code)]
/// Helper function to use when debugging to list temp and other directories recursively
fn list_directory(path: &std::path::Path) {
pub(crate) fn list_directory(path: &std::path::Path) {
if path.is_dir() {
for entry in std::fs::read_dir(path).unwrap() {
let entry = entry.unwrap();
@@ -997,4 +1023,49 @@ mod tests {
"Expected 4 add actions and a metadata action"
);
}
#[tokio::test]
async fn test_schema_evolution_with_timestamps() {
use fs_extra::{copy_items, dir::CopyOptions};
let (tempdir, _store) =
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
// The store that comes back is not properly prefixed to the delta table that this test
// needs to work with
let table_url = Url::from_file_path(tempdir.path().join("deltatbl-partitioned"))
.expect("Failed to parse local path");
let store = object_store_for(&table_url, None);
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
assert_eq!(files.len(), 4, "Expected to discover 4 parquet files");
let table = create_table_with(&files, store.clone())
.await
.expect("Failed to create table");
let options = CopyOptions::new();
// This file has some _alternative_ timestamps and can be used for testing :smile:
let new_file =
std::fs::canonicalize("../../tests/data/hive/faker_products/1701800282197_0.parquet")
.expect("Failed to canonicalize");
let _ =
copy_items(&[new_file], store.root_uri(), &options).expect("Failed to copy items over");
let files = vec![ObjectMeta {
location: Path::from("1701800282197_0.parquet"),
e_tag: None,
size: 11351,
last_modified: Utc::now(),
}];
let metadata = metadata_actions_for(&files, &table)
.await
.expect("Failed to generate metadata actions");
assert_eq!(
metadata.len(),
1,
"Execpted a scheam evolution metadata action"
);
}
}
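To make the evolution rule above concrete: columns present in an incoming file but missing from the table schema are appended as nullable fields, and a metadata action is produced only when the column list actually grows. A hedged sketch of that decision, using plain strings in place of the deltalake schema structs:

```rust
use std::collections::HashSet;

/// Sketch of the decision inside `metadata_actions_for`: return the merged
/// column list only when the incoming file introduced new columns.
fn evolved_columns(table_cols: &[String], file_cols: &[String]) -> Option<Vec<String>> {
    let known: HashSet<&String> = table_cols.iter().collect();
    let mut merged: Vec<String> = table_cols.to_vec();
    for col in file_cols {
        if !known.contains(col) {
            // In the real code these are appended as nullable SchemaFields.
            merged.push(col.clone());
        }
    }
    (merged.len() > table_cols.len()).then_some(merged)
}

fn main() {
    let table: Vec<String> = vec!["c1".into(), "c2".into()];
    let file: Vec<String> = vec!["c1".into(), "c2".into(), "created_at".into()];
    // A new `created_at` column triggers evolution...
    assert!(evolved_columns(&table, &file).is_some());
    // ...while an identical schema produces no metadata action.
    assert!(evolved_columns(&table, &table).is_none());
}
```

The real `metadata_actions_for` additionally runs each new field through `coerce_field`, so millisecond timestamps land in the evolved schema as microseconds.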

View File

@@ -65,7 +65,8 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
Ok(mut table) => {
info!("Opened table to append: {:?}", table);
let actions = oxbow::actions_for(table_mods, &table, can_evolve_schema).await
let actions = oxbow::actions_for(table_mods, &table, can_evolve_schema)
.await
.expect("Failed to generate actions for the table modifications");
match oxbow::commit_to_table(&actions, &mut table).await {
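For context, the lambda computes `can_evolve_schema` before this call. A hedged sketch of deriving such a flag from the environment; the variable name here is hypothetical and not confirmed by this diff:

```rust
// Hypothetical configuration gate; the real flag and its name may differ.
fn can_evolve_schema() -> bool {
    std::env::var("SCHEMA_EVOLUTION")
        .map(|v| v.eq_ignore_ascii_case("true"))
        .unwrap_or(false)
}
```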