Demonstrate converting a Schema to an ArrowSchema and avoid passing a writer to convert_to_batch

This commit is contained in:
R Tyler Croy 2023-01-04 21:06:52 -08:00
parent d00ca8354f
commit b03d1ad1cf
No known key found for this signature in database
GPG Key ID: E5C92681BEF6CEA2
1 changed files with 42 additions and 36 deletions

View File

@ -54,7 +54,7 @@ async fn main() -> Result<(), anyhow::Error> {
RecordBatchWriter::for_table(&table).expect("Failed to make RecordBatchWriter");
let records = fetch_readings();
let batch = convert_to_batch(&writer, &records);
let batch = convert_to_batch(&records);
writer.write(batch).await?;
@ -67,39 +67,6 @@ async fn main() -> Result<(), anyhow::Error> {
Ok(())
}
/*
 * Pilfered from writer/test_utils.rs in delta-rs.
 *
 * Builds a DeltaTable for `table_path`, then issues a `create` call carrying the
 * WeatherRecord schema, reader/writer protocol version 1, and a commit-info map
 * describing a "CREATE TABLE" operation by "test user".
 *
 * Panics if the path is not valid UTF-8, or if building or creating the table fails.
 */
async fn create_initialized_table(table_path: &Path) -> DeltaTable {
    let uri = table_path.to_str().unwrap();
    let mut delta_table = DeltaTableBuilder::from_uri(uri).build().unwrap();
    let schema = WeatherRecord::schema();

    // Commit info recorded alongside the initial table creation
    let mut info = serde_json::Map::<String, serde_json::Value>::new();
    for (key, value) in [("operation", "CREATE TABLE"), ("userName", "test user")] {
        info.insert(
            key.to_string(),
            serde_json::Value::String(value.to_string()),
        );
    }

    let protocol = Protocol {
        min_reader_version: 1,
        min_writer_version: 1,
    };
    // No id, name, or description; no partition columns; empty configuration map
    let metadata = DeltaTableMetaData::new(None, None, None, schema, vec![], HashMap::new());

    delta_table
        .create(metadata, protocol, Some(info), None)
        .await
        .unwrap();
    delta_table
}
// Readability alias: a value typed `Fahrenheit` is a plain `i32`.
// NOTE(review): presumably whole degrees Fahrenheit — confirm against where it is used.
type Fahrenheit = i32;
@ -193,7 +160,12 @@ fn fetch_readings() -> Vec<WeatherRecord> {
* transformation is quite tricky in Rust, whereas in Python or another loosely
* typed language it might be simpler.
*/
fn convert_to_batch(writer: &RecordBatchWriter, records: &Vec<WeatherRecord>) -> RecordBatch {
fn convert_to_batch(records: &Vec<WeatherRecord>) -> RecordBatch {
let schema = WeatherRecord::schema();
let arrow_schema = <deltalake::arrow::datatypes::Schema as TryFrom<&Schema>>::try_from(&schema)
.expect("Failed to convert schema tro ArrowSchema");
let arrow_schema_ref = Arc::new(arrow_schema);
let mut ts = vec![];
let mut temp = vec![];
let mut lat = vec![];
@ -213,7 +185,41 @@ fn convert_to_batch(writer: &RecordBatchWriter, records: &Vec<WeatherRecord>) ->
Arc::new(Float64Array::from(long)),
];
RecordBatch::try_new(writer.arrow_schema(), arrow_array).expect("Failed to create RecordBatch")
RecordBatch::try_new(arrow_schema_ref, arrow_array).expect("Failed to create RecordBatch")
}
/*
 * Pilfered from writer/test_utils.rs in delta-rs.
 *
 * Creates a new Delta Table in an existing directory that doesn't currently
 * contain one. The table is created with the WeatherRecord schema,
 * reader/writer protocol version 1, and commit info describing a
 * "CREATE TABLE" operation by "test user".
 *
 * Panics if the path is not valid UTF-8, or if building or creating the table fails.
 */
async fn create_initialized_table(table_path: &Path) -> DeltaTable {
    let uri = table_path.to_str().unwrap();
    let mut delta_table = DeltaTableBuilder::from_uri(uri).build().unwrap();
    let schema = WeatherRecord::schema();

    // Commit info recorded alongside the initial table creation
    let mut info = serde_json::Map::<String, serde_json::Value>::new();
    for (key, value) in [("operation", "CREATE TABLE"), ("userName", "test user")] {
        info.insert(
            key.to_string(),
            serde_json::Value::String(value.to_string()),
        );
    }

    let protocol = Protocol {
        min_reader_version: 1,
        min_writer_version: 1,
    };
    // No id, name, or description; no partition columns; empty configuration map
    let metadata = DeltaTableMetaData::new(None, None, None, schema, vec![], HashMap::new());

    delta_table
        .create(metadata, protocol, Some(info), None)
        .await
        .unwrap();
    delta_table
}
#[cfg(test)]