Prevent duplicate column definitions from showing up in the delta schema

In some scenarios BigQuery can inline a partition column in the output
parquet files, so the columns need to be deduplicated before the initial
commit on the table gets created.
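
For illustration, a hypothetical layout of the failure mode (the bucket,
paths, and non-partition column names here are made up; the test below uses
a `ds` partition): a hive-style export where BigQuery has already inlined
the partition column into the files themselves:

    gs://bucket/export/ds=2023-05-01/000000000000.parquet   (parquet schema: id, value, ds)
    gs://bucket/export/ds=2023-05-02/000000000001.parquet   (parquet schema: id, value, ds)

Naively appending a string-typed `ds` column for the discovered partition
would then produce a delta schema of id, value, ds, ds.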

Sponsored-by: Scribd, Inc.
R Tyler Croy 2023-12-12 12:57:19 -08:00
parent bfac6c2dd0
commit 114c1b6b51
3 changed files with 42 additions and 7 deletions

@@ -167,14 +167,18 @@ pub async fn create_table_with(
         .expect("Failed to convert the schema for creating the table");
     let mut columns = schema.get_fields().clone();
     for partition in &partitions {
-        let field = SchemaField::new(
-            partition.into(),
-            SchemaDataType::primitive("string".into()),
-            true,
-            HashMap::new(),
-        );
-        columns.push(field);
+        // Only add the partition if it does not already exist in the schema
+        if schema.get_field_with_name(partition).is_err() {
+            let field = SchemaField::new(
+                partition.into(),
+                SchemaDataType::primitive("string".into()),
+                true,
+                HashMap::new(),
+            );
+            columns.push(field);
+        }
     }
     /*
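
The guard works because `get_field_with_name` returns an `Err` for a column
that is absent from the schema. A minimal standalone sketch of the same
deduplication pattern, using plain column-name lists instead of the crate's
`SchemaField` type (the function name and signature are illustrative, not
part of the actual API):

    use std::collections::HashSet;

    fn dedup_partition_columns(existing: &[String], partitions: &[String]) -> Vec<String> {
        // Column names already present in the file schema, e.g. inlined by BigQuery
        let known: HashSet<&str> = existing.iter().map(String::as_str).collect();
        let mut columns = existing.to_vec();
        for partition in partitions {
            // Skip any partition column the schema already defines
            if !known.contains(partition.as_str()) {
                columns.push(partition.clone());
            }
        }
        columns
    }

Checking before pushing also means the definition from the parquet schema
wins, and the string-typed partition fallback is only used when the file
schema lacks the column entirely.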
@@ -684,4 +688,35 @@ mod tests {
         );
         assert_eq!(table.get_files().len(), 4, "Found redundant files!");
     }
+
+    /*
+     * There are some cases where data will be laid out in a hive partition scheme but the
+     * parquet files may not contain the partitioning information. When using EXPORT DATA
+     * from BigQuery, however, the hive-style partition column is automatically inserted
+     * into the parquet schema as well.
+     */
+    #[tokio::test]
+    async fn test_avoid_duplicate_partition_columns() {
+        let (_tempdir, store) = util::create_temp_path_with("../../tests/data/hive/gcs-export");
+        let files = discover_parquet_files(store.clone())
+            .await
+            .expect("Failed to discover parquet files");
+        assert_eq!(files.len(), 2, "Expected to discover two parquet files");
+        let table = create_table_with(&files, store.clone())
+            .await
+            .expect("Failed to create table");
+        let schema = table.get_schema().expect("Failed to get schema");
+        let fields: Vec<&str> = schema.get_fields().iter().map(|f| f.get_name()).collect();
+        let mut uniq = HashSet::new();
+        for field in &fields {
+            uniq.insert(field.clone());
+        }
+        assert_eq!(
+            uniq.len(),
+            fields.len(),
+            "There were non-unique fields, which probably means a `ds` column is doubled up"
+        );
+    }
 }
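
The manual `HashSet` loop in the new test could also be collapsed into a
single collect; a sketch of an equivalent uniqueness check, assuming the
same `fields: Vec<&str>` and `HashSet` import as the test above:

    let unique: HashSet<&&str> = fields.iter().collect();
    assert_eq!(unique.len(), fields.len(), "duplicate column names in the delta schema");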