From b03d1ad1cf4ebb5a8a854a09f31b797ae64fc16c Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Wed, 4 Jan 2023 21:06:52 -0800 Subject: [PATCH] Demonstrate converting a Schema to an ArrowSchema and avoiding the pass of a writer to convert_to_batch --- src/main.rs | 78 ++++++++++++++++++++++++++++------------------------- 1 file changed, 42 insertions(+), 36 deletions(-) diff --git a/src/main.rs b/src/main.rs index bcfcc6d..c3e61b0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,7 +54,7 @@ async fn main() -> Result<(), anyhow::Error> { RecordBatchWriter::for_table(&table).expect("Failed to make RecordBatchWriter"); let records = fetch_readings(); - let batch = convert_to_batch(&writer, &records); + let batch = convert_to_batch(&records); writer.write(batch).await?; @@ -67,39 +67,6 @@ async fn main() -> Result<(), anyhow::Error> { Ok(()) } -/* - * Pilfered from writer/test_utils.rs in delta-rs - */ -async fn create_initialized_table(table_path: &Path) -> DeltaTable { - let mut table = DeltaTableBuilder::from_uri(table_path.to_str().unwrap()) - .build() - .unwrap(); - let table_schema = WeatherRecord::schema(); - let mut commit_info = serde_json::Map::::new(); - commit_info.insert( - "operation".to_string(), - serde_json::Value::String("CREATE TABLE".to_string()), - ); - commit_info.insert( - "userName".to_string(), - serde_json::Value::String("test user".to_string()), - ); - - let protocol = Protocol { - min_reader_version: 1, - min_writer_version: 1, - }; - - let metadata = DeltaTableMetaData::new(None, None, None, table_schema, vec![], HashMap::new()); - - table - .create(metadata, protocol, Some(commit_info), None) - .await - .unwrap(); - - table -} - // Creating a simple type alias for improved readability type Fahrenheit = i32; @@ -193,7 +160,12 @@ fn fetch_readings() -> Vec { * transformation is quite tricky in Rust, whereas in Python or another loosely * typed language it might be simpler. */ -fn convert_to_batch(writer: &RecordBatchWriter, records: &Vec) -> RecordBatch { +fn convert_to_batch(records: &Vec) -> RecordBatch { + let schema = WeatherRecord::schema(); + let arrow_schema = >::try_from(&schema) + .expect("Failed to convert schema tro ArrowSchema"); + let arrow_schema_ref = Arc::new(arrow_schema); + let mut ts = vec![]; let mut temp = vec![]; let mut lat = vec![]; @@ -213,7 +185,41 @@ fn convert_to_batch(writer: &RecordBatchWriter, records: &Vec) -> Arc::new(Float64Array::from(long)), ]; - RecordBatch::try_new(writer.arrow_schema(), arrow_array).expect("Failed to create RecordBatch") + RecordBatch::try_new(arrow_schema_ref, arrow_array).expect("Failed to create RecordBatch") +} + +/* + * Pilfered from writer/test_utils.rs in delta-rs. This code will basically create a new Delta + * Table in an existing directory that doesn't currently contain a Delta table + */ +async fn create_initialized_table(table_path: &Path) -> DeltaTable { + let mut table = DeltaTableBuilder::from_uri(table_path.to_str().unwrap()) + .build() + .unwrap(); + let table_schema = WeatherRecord::schema(); + let mut commit_info = serde_json::Map::::new(); + commit_info.insert( + "operation".to_string(), + serde_json::Value::String("CREATE TABLE".to_string()), + ); + commit_info.insert( + "userName".to_string(), + serde_json::Value::String("test user".to_string()), + ); + + let protocol = Protocol { + min_reader_version: 1, + min_writer_version: 1, + }; + + let metadata = DeltaTableMetaData::new(None, None, None, table_schema, vec![], HashMap::new()); + + table + .create(metadata, protocol, Some(commit_info), None) + .await + .unwrap(); + + table } #[cfg(test)]