Add an integration test for the creation of tables
This commit is contained in:
parent
6fb337f96a
commit
cadd6da179
|
@ -17,3 +17,6 @@ tokio = { version = "1", features = ["macros"] }
|
|||
serde_json = "1"
|
||||
serde_yaml = "0"
|
||||
url = "2"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "*"
|
||||
|
|
49
src/lib.rs
49
src/lib.rs
|
@ -1,18 +1,37 @@
|
|||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use deltalake::action::Protocol;
|
||||
use deltalake::schema::SchemaTypeStruct;
|
||||
use deltalake::{DeltaTableBuilder, DeltaTableMetaData};
|
||||
use log::*;
|
||||
use url::Url;
|
||||
|
||||
/*
|
||||
* Create a Delta Table with a schema
|
||||
*/
|
||||
pub async fn create_table_with_schema(
|
||||
output_url: Url,
|
||||
table_schema: SchemaTypeStruct,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
debug!("Attempting to create table in: {}", output_url.as_str());
|
||||
let mut table = DeltaTableBuilder::from_uri(output_url.as_str()).build()?;
|
||||
|
||||
let protocol = Protocol {
|
||||
min_reader_version: 1,
|
||||
min_writer_version: 1,
|
||||
};
|
||||
|
||||
let metadata = DeltaTableMetaData::new(None, None, None, table_schema, vec![], HashMap::new());
|
||||
table.create(metadata, protocol, None, None).await.unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/*
|
||||
* Let's createa a Delta Table
|
||||
*/
|
||||
pub async fn create_table(output_url: Url, schema_path: PathBuf) -> Result<(), anyhow::Error> {
|
||||
use deltalake::action::Protocol;
|
||||
use deltalake::{DeltaTableBuilder, DeltaTableMetaData};
|
||||
use std::collections::HashMap;
|
||||
|
||||
debug!(
|
||||
"Creating table at {} with the schema from: {:?}",
|
||||
output_url, schema_path
|
||||
|
@ -26,22 +45,10 @@ pub async fn create_table(output_url: Url, schema_path: PathBuf) -> Result<(), a
|
|||
}
|
||||
|
||||
let reader = std::fs::File::open(schema_path.as_path())?;
|
||||
let table_schema: deltalake::schema::SchemaTypeStruct = serde_yaml::from_reader(reader)?;
|
||||
let table_schema: SchemaTypeStruct = serde_yaml::from_reader(reader)?;
|
||||
debug!("Read YAML in: {:?}", table_schema);
|
||||
|
||||
debug!("Attempting to create table in: {}", output_url.as_str());
|
||||
let mut table = DeltaTableBuilder::from_uri(output_url.as_str()).build()?;
|
||||
|
||||
let protocol = Protocol {
|
||||
min_reader_version: 1,
|
||||
min_writer_version: 1,
|
||||
};
|
||||
|
||||
let metadata = DeltaTableMetaData::new(None, None, None, table_schema, vec![], HashMap::new());
|
||||
|
||||
table.create(metadata, protocol, None, None).await.unwrap();
|
||||
|
||||
Ok(())
|
||||
create_table_with_schema(output_url, table_schema).await
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -106,7 +113,7 @@ pub fn yamlpath_to_table(root: &str, local_path: &Path) -> Result<PathBuf, anyho
|
|||
/*
|
||||
* This function takes a file path and returns a full file:// URL
|
||||
*/
|
||||
fn filepath_to_url(path: &PathBuf) -> Result<Url, anyhow::Error> {
|
||||
pub fn filepath_to_url(path: &PathBuf) -> Result<Url, anyhow::Error> {
|
||||
match Url::from_file_path(&path) {
|
||||
Ok(url) => Ok(url),
|
||||
Err(_) => {
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
use carto::*;
|
||||
use url::Url;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_table() -> Result<(), anyhow::Error> {
|
||||
let table_dir = tempfile::tempdir()?;
|
||||
let table_url = carto::filepath_to_url(&table_dir.into_path())?;
|
||||
|
||||
let yaml = r#"
|
||||
---
|
||||
type: struct
|
||||
fields:
|
||||
- name: timestamp
|
||||
type: timestamp
|
||||
nullable: false
|
||||
# Make this optional
|
||||
metadata:
|
||||
"#;
|
||||
|
||||
let schema = serde_yaml::from_str(yaml)?;
|
||||
let _table = carto::create_table_with_schema(table_url.clone(), schema).await?;
|
||||
let _loaded = deltalake::open_table(table_url).await?;
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in New Issue