From 114c1b6b511db6b5cb7fa72ab86db6f76e181f9a Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Tue, 12 Dec 2023 12:57:19 -0800
Subject: [PATCH] Prevent duplicate column definitions showing up in the delta
 schema

In some scenarios BigQuery can inline a partition column in the output
parquet files, so the columns need to be deduplicated before the initial
commit on the table is created

Sponsored-by: Scribd, Inc.
---
 crates/oxbow/src/lib.rs                       |  49 +++++++++++++++---
 ...tioned2_ds=2023-12-12_000000000000.parquet | Bin 0 -> 924 bytes
 ...tioned2_ds=2023-12-12_000000000001.parquet | Bin 0 -> 945 bytes
 3 files changed, 42 insertions(+), 7 deletions(-)
 create mode 100644 tests/data/hive/gcs-export/ds=2023-12-12/testing_oxbow-partitioned2_ds=2023-12-12_000000000000.parquet
 create mode 100644 tests/data/hive/gcs-export/ds=2023-12-12/testing_oxbow-partitioned2_ds=2023-12-12_000000000001.parquet
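Note for reviewers: `git am` ignores text between the `---` above and the
first `diff --git` line, so this annotation does not alter the patch. The
core of the change is "append a hive partition column only when the parquet
schema does not already define it." Below is a minimal, self-contained
sketch of that deduplication idea; `merge_partition_columns` is a made-up
helper over plain `String` column names, not the crate's `SchemaField` API:

    // Sketch only: append each partition column unless the file schema
    // already defines a column with the same name.
    fn merge_partition_columns(mut columns: Vec<String>, partitions: &[String]) -> Vec<String> {
        for partition in partitions {
            // EXPORT DATA from BigQuery can inline the partition column into
            // the parquet schema, in which case it is already in `columns`.
            if !columns.iter().any(|existing| existing == partition) {
                columns.push(partition.clone());
            }
        }
        columns
    }

    fn main() {
        // "ds" arrives both inlined in the file schema and via the hive path.
        let file_columns = vec!["id".to_string(), "ds".to_string()];
        let merged = merge_partition_columns(file_columns, &["ds".to_string()]);
        assert_eq!(merged.len(), 2); // no duplicated "ds" column
    }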
diff --git a/crates/oxbow/src/lib.rs b/crates/oxbow/src/lib.rs
index 9829023..60d458e 100644
--- a/crates/oxbow/src/lib.rs
+++ b/crates/oxbow/src/lib.rs
@@ -167,14 +167,18 @@ pub async fn create_table_with(
         .expect("Failed to convert the schema for creating the table");
 
     let mut columns = schema.get_fields().clone();
+
     for partition in &partitions {
-        let field = SchemaField::new(
-            partition.into(),
-            SchemaDataType::primitive("string".into()),
-            true,
-            HashMap::new(),
-        );
-        columns.push(field);
+        // Only add the partition if it does not already exist in the schema
+        if schema.get_field_with_name(partition).is_err() {
+            let field = SchemaField::new(
+                partition.into(),
+                SchemaDataType::primitive("string".into()),
+                true,
+                HashMap::new(),
+            );
+            columns.push(field);
+        }
     }
 
     /*
@@ -684,4 +688,35 @@ mod tests {
         );
         assert_eq!(table.get_files().len(), 4, "Found redundant files!");
     }
+
+    /*
+     * Data may be laid out in a hive partition scheme while the parquet files also contain the
+     * partitioning column: EXPORT DATA from BigQuery automatically inlines the hive-style
+     * partition value into the parquet schema, so the column must not be added a second time.
+     */
+    #[tokio::test]
+    async fn test_avoid_duplicate_partition_columns() {
+        let (_tempdir, store) = util::create_temp_path_with("../../tests/data/hive/gcs-export");
+
+        let files = discover_parquet_files(store.clone())
+            .await
+            .expect("Failed to discover parquet files");
+        assert_eq!(files.len(), 2, "Expected to discover two parquet files");
+
+        let table = create_table_with(&files, store.clone())
+            .await
+            .expect("Failed to create table");
+        let schema = table.get_schema().expect("Failed to get schema");
+        let fields: Vec<&str> = schema.get_fields().iter().map(|f| f.get_name()).collect();
+
+        let mut uniq = HashSet::new();
+        for field in &fields {
+            uniq.insert(field.clone());
+        }
+        assert_eq!(
+            uniq.len(),
+            fields.len(),
+            "The fields were not unique, which probably means the `ds` column is doubled up"
+        );
+    }
 }
diff --git a/tests/data/hive/gcs-export/ds=2023-12-12/testing_oxbow-partitioned2_ds=2023-12-12_000000000000.parquet b/tests/data/hive/gcs-export/ds=2023-12-12/testing_oxbow-partitioned2_ds=2023-12-12_000000000000.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..942104ae2807d3246ef2ff43e112bc258a6237f7
GIT binary patch
literal 924
[base85-encoded parquet payload omitted; the remainder of the binary patch is truncated]
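The regression test above can be exercised on its own with cargo's test name
filter. Assuming the package is named `oxbow`, as the crates/oxbow path
suggests:

    cargo test -p oxbow test_avoid_duplicate_partition_columns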