On table creation modify the timestamp data type for simplicity's sake

The `deltalake` crate should likely be improved to avoid having issues
with Timestamps with millisecond precision since the protocol supports
them
(https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#instant-semantics-timestamps-normalized-to-utc)
but this unblocks behavior now. 🤔
This commit is contained in:
R Tyler Croy 2023-12-05 16:40:05 -08:00
parent 03caba50f8
commit c2d6f27b0c
3 changed files with 45 additions and 2 deletions

View File

@ -7,7 +7,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.8.0"
version = "0.8.1"
edition = "2021"
keywords = ["deltalake", "parquet", "lambda", "delta"]
homepage = "https://github.com/buoyant-data/oxbow"

View File

@ -1,6 +1,7 @@
/*
* The lib module contains the business logic of oxbow, regardless of the interface implementation
*/
use deltalake::arrow::datatypes::Schema as ArrowSchema;
use deltalake::parquet::arrow::async_reader::{
ParquetObjectReader, ParquetRecordBatchStreamBuilder,
};
@ -137,7 +138,32 @@ pub async fn create_table_with(
.clone();
debug!("Read schema from Parquet file: {:?}", arrow_schema);
let schema: deltalake::schema::Schema = Schema::try_from(arrow_schema)
let mut conversions: Vec<Arc<deltalake::arrow::datatypes::Field>> = vec![];
for field in arrow_schema.fields().iter() {
match field.data_type() {
deltalake::arrow::datatypes::DataType::Timestamp(unit, tz) => match unit {
deltalake::arrow::datatypes::TimeUnit::Millisecond => {
warn!("I have been asked to create a table with a Timestamp(millis) column ({}) that I cannot handle. Cowardly setting the Delta schema to pretend it is a Timestamp(micros)", field.name());
let field = deltalake::arrow::datatypes::Field::new(
field.name(),
deltalake::arrow::datatypes::DataType::Timestamp(
deltalake::arrow::datatypes::TimeUnit::Microsecond,
tz.clone(),
),
field.is_nullable(),
);
conversions.push(Arc::new(field));
}
_ => conversions.push(field.clone()),
},
_ => conversions.push(field.clone()),
}
}
let arrow_schema = ArrowSchema::new_with_metadata(conversions, arrow_schema.metadata.clone());
let schema: deltalake::schema::Schema = Schema::try_from(&arrow_schema)
.expect("Failed to convert the schema for creating the table");
let mut columns = schema.get_fields().clone();
@ -537,6 +563,23 @@ mod tests {
assert!(result.is_err());
}
/*
* Attempt to create a table with a parquet file containing a timestamp column with millisecond
* precision
*/
#[tokio::test]
async fn create_table_with_millis_timestamp() {
    // A Parquet fixture whose schema includes a Timestamp(millis) column;
    // table creation should succeed by coercing it to Timestamp(micros).
    let (_tempdir, store) = util::create_temp_path_with("../../tests/data/hive/faker_products");

    let discovered = discover_parquet_files(store.clone())
        .await
        .expect("Failed to discover parquet files");
    assert_eq!(discovered.len(), 1, "No files discovered");

    // The coercion inside create_table_with must keep this from erroring.
    let result = create_table_with(&discovered, store.clone()).await;
    assert!(result.is_ok(), "Failed to create: {result:?}");
}
#[tokio::test]
async fn attempt_to_convert_without_auth() {
let region = std::env::var("AWS_REGION");

Binary file not shown.