On second thought pass through the DeltaTable and use that as truth for the table schema

This does not yet handle schema evolution
This commit is contained in:
R Tyler Croy 2023-01-04 21:09:46 -08:00
parent b03d1ad1cf
commit af9540198f
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
1 changed files with 9 additions and 5 deletions

View File

@ -54,7 +54,7 @@ async fn main() -> Result<(), anyhow::Error> {
RecordBatchWriter::for_table(&table).expect("Failed to make RecordBatchWriter");
let records = fetch_readings();
let batch = convert_to_batch(&records);
let batch = convert_to_batch(&table, &records);
writer.write(batch).await?;
@ -160,10 +160,14 @@ fn fetch_readings() -> Vec<WeatherRecord> {
* transformation is quite tricky in Rust, whereas in Python or another loosely
* typed language it might be simpler.
*/
fn convert_to_batch(records: &Vec<WeatherRecord>) -> RecordBatch {
let schema = WeatherRecord::schema();
let arrow_schema = <deltalake::arrow::datatypes::Schema as TryFrom<&Schema>>::try_from(&schema)
.expect("Failed to convert schema tro ArrowSchema");
fn convert_to_batch(table: &DeltaTable, records: &Vec<WeatherRecord>) -> RecordBatch {
let metadata = table
.get_metadata()
.expect("Failed to get metadata for the table");
let arrow_schema = <deltalake::arrow::datatypes::Schema as TryFrom<&Schema>>::try_from(
&metadata.schema.clone(),
)
.expect("Failed to convert to arrow schema");
let arrow_schema_ref = Arc::new(arrow_schema);
let mut ts = vec![];