Implement rudimentary schema evolution based on parquet file schema discovery

This commit is contained in:
R Tyler Croy 2024-04-20 09:16:18 -07:00
parent 46a1c10835
commit 924cb6855b
2 changed files with 98 additions and 8 deletions

View File

@ -229,7 +229,11 @@ pub async fn commit_to_table(actions: &Vec<Action>, table: &mut DeltaTable) -> D
}
/// Generate the list of [Action]s based on the given [TableMods]
pub fn actions_for(mods: &TableMods, table: &DeltaTable) -> DeltaResult<Vec<Action>> {
pub async fn actions_for(
mods: &TableMods,
table: &DeltaTable,
evolve_schema: bool,
) -> DeltaResult<Vec<Action>> {
let existing_files = table.get_files();
let new_files: Vec<ObjectMeta> = mods
.adds
@ -240,8 +244,43 @@ pub fn actions_for(mods: &TableMods, table: &DeltaTable) -> DeltaResult<Vec<Acti
let adds = add_actions_for(&new_files);
let removes = remove_actions_for(&mods.removes);
let mut metadata = vec![];
Ok([adds, removes].concat())
// Optional schema evolution: this branch only runs when the caller passed
// `evolve_schema = true`. When it finds new columns it pushes a single metaData
// action onto `metadata`; otherwise `metadata` is left untouched.
if evolve_schema {
// Only the LAST entry of `new_files` is inspected. The schemas of any other files
// in this batch are not read here, so columns that appear only in those files
// will not be picked up by this pass.
if let Some(last_file) = new_files.last() {
debug!("Attempting to evolve the schema for {table:?}");
let table_schema = table.get_schema()?;
// Cloning here to take an owned version of [DeltaTableMetaData] for later modification
let mut table_metadata = table.get_metadata()?.clone();
// Read the schema out of the parquet file itself; any failure is propagated to
// the caller via `?`.
// NOTE(review): the returned value exposes `fields()` / `name()` / `data_type()`,
// presumably an Arrow schema — confirm against `fetch_parquet_schema`.
let file_schema =
fetch_parquet_schema(table.object_store().clone(), last_file.clone()).await?;
// Start from a copy of the table's current fields so existing columns keep
// their position; newly discovered columns are appended after them.
let mut new_schema: Vec<SchemaField> = table_schema.get_fields().clone();
for file_field in file_schema.fields() {
let name = file_field.name();
// If the table's schema doesn't have the field, add to our new schema
// The lookup is by name only: a column that exists in both places but with a
// different data type is left as the table currently defines it.
if table_schema.get_field_with_name(name).is_err() {
debug!("Found a new column `{name}` which will be added");
// The file's data type is converted with `try_into`; a type that cannot be
// converted aborts the whole call with an error.
// NOTE(review): the `true` argument is presumably the nullable flag and the
// empty map the field metadata — confirm against `SchemaField::new`.
new_schema.push(SchemaField::new(
name.to_string(),
file_field.data_type().try_into()?,
true,
HashMap::default(),
));
}
}
// Fields are only ever appended above, so a longer field list means at least
// one new column was found. Only then is a metaData action emitted.
if new_schema.len() > table_schema.get_fields().len() {
let new_schema = deltalake::schema::SchemaTypeStruct::new(new_schema);
table_metadata.schema = new_schema;
// Stamp the modified metadata with the current UTC time in milliseconds.
table_metadata.created_time = Some(chrono::Utc::now().timestamp_millis());
metadata.push(Action::metaData(table_metadata.try_into()?));
}
}
}
Ok([adds, removes, metadata].concat())
}
/// Take an iterator of files and determine what looks like a partition column from it
@ -786,7 +825,9 @@ mod tests {
..Default::default()
};
let actions = actions_for(&mods, &table).expect("Failed to curate actions");
let actions = actions_for(&mods, &table, false)
.await
.expect("Failed to curate actions");
assert_eq!(
actions.len(),
3,
@ -805,7 +846,9 @@ mod tests {
..Default::default()
};
let actions = actions_for(&mods, &table).expect("Failed to curate actions");
let actions = actions_for(&mods, &table, false)
.await
.expect("Failed to curate actions");
assert_eq!(
actions.len(),
0,
@ -822,7 +865,9 @@ mod tests {
removes: files,
};
let actions = actions_for(&mods, &table).expect("Failed to curate actions");
let actions = actions_for(&mods, &table, false)
.await
.expect("Failed to curate actions");
assert_eq!(
actions.len(),
4,
@ -851,7 +896,9 @@ mod tests {
adds: vec![],
removes: files,
};
let actions = actions_for(&mods, &table).expect("Failed to curate actions");
let actions = actions_for(&mods, &table, false)
.await
.expect("Failed to curate actions");
assert_eq!(
actions.len(),
4,
@ -886,7 +933,9 @@ mod tests {
adds: files.clone(),
removes: vec![files[0].clone()],
};
let actions = actions_for(&mods, &table).expect("Failed to curate actions");
let actions = actions_for(&mods, &table, false)
.await
.expect("Failed to curate actions");
assert_eq!(
actions.len(),
4,
@ -907,4 +956,45 @@ mod tests {
"Expected to only find three files on the table at this state"
);
}
/// Exercises `actions_for` with schema evolution enabled: a freshly created table
/// declaring only a `party_time` column is handed the parquet files discovered in
/// the fixture directory, and the resulting action list is expected to hold one
/// add per file plus one metadata action.
#[tokio::test]
async fn test_schema_evolution() {
    use deltalake::operations::create::CreateBuilder;

    let (_tempdir, store) =
        util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");

    // Describe the table first, then await the builder to actually create it.
    let builder = CreateBuilder::new()
        .with_object_store(store.clone())
        .with_column(
            "party_time",
            SchemaDataType::primitive("string".into()),
            true,
            None,
        )
        .with_save_mode(SaveMode::Ignore);
    let table = builder.await.expect("Failed to create a test table");

    // A newly created table must start at version zero.
    assert_eq!(table.version(), 0);

    let discovered = discover_parquet_files(store.clone())
        .await
        .expect("Failed to discover parquet files");
    assert_eq!(discovered.len(), 4, "No files discovered");

    // Only additions are supplied; every other modification stays at its default.
    let mods = TableMods {
        adds: discovered,
        ..Default::default()
    };

    let generated = actions_for(&mods, &table, true)
        .await
        .expect("Failed to get actions");
    let action_count = generated.len();
    assert_eq!(
        action_count, 5,
        "Expected 4 add actions and a metadata action"
    );
}
}

View File

@ -65,7 +65,7 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
Ok(mut table) => {
info!("Opened table to append: {:?}", table);
let actions = oxbow::actions_for(table_mods, &table)
let actions = oxbow::actions_for(table_mods, &table, can_evolve_schema).await
.expect("Failed to generate actions for the table modifications");
match oxbow::commit_to_table(&actions, &mut table).await {