Compare commits
4 Commits
795f095094
...
f922e93d30
Author | SHA1 | Date |
---|---|---|
R Tyler Croy | f922e93d30 | |
R Tyler Croy | a5c501665e | |
R Tyler Croy | 924cb6855b | |
R Tyler Croy | 46a1c10835 |
|
@ -26,8 +26,7 @@ jobs:
|
|||
repo: cargo-lambda/cargo-lambda
|
||||
platform: linux # Other valid options: 'windows' or 'darwin'
|
||||
arch: x86_64 # Other valid options for linux: 'aarch64'
|
||||
# Add your build steps below
|
||||
- name: Build
|
||||
run: cargo lambda build
|
||||
run: ./ci/build.sh
|
||||
- name: Run tests
|
||||
run: cargo test --verbose
|
||||
run: ./ci/test.sh
|
||||
|
|
|
@ -7,7 +7,7 @@ members = [
|
|||
resolver = "2"
|
||||
|
||||
[workspace.package]
|
||||
version = "0.11.2"
|
||||
version = "0.12.0"
|
||||
edition = "2021"
|
||||
keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
|
||||
homepage = "https://github.com/buoyant-data/oxbow"
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* This Jenkinsfile is for internal use
|
||||
*/
|
||||
|
||||
pipeline {
|
||||
agent {
|
||||
label 'rust'
|
||||
}
|
||||
|
||||
stages {
|
||||
stage('Checkout') {
|
||||
steps {
|
||||
checkout scm
|
||||
}
|
||||
}
|
||||
stage('Prepare') {
|
||||
steps {
|
||||
sh './ci/setup.sh'
|
||||
}
|
||||
}
|
||||
stage('Build') {
|
||||
steps {
|
||||
sh './ci/build.sh'
|
||||
}
|
||||
}
|
||||
stage('Test') {
|
||||
steps {
|
||||
sh './ci/test.sh'
|
||||
}
|
||||
}
|
||||
stage('Release') {
|
||||
steps {
|
||||
sh './ci/build-release.sh'
|
||||
archiveArtifacts artifacts: 'target/lambda/**/*.zip', fingerprint: true, onlyIfSuccessful: true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// vim: ft=groovy sw=2 ts=2 et
|
|
@ -0,0 +1,4 @@
|
|||
#!/bin/sh
|
||||
|
||||
|
||||
cargo lambda build --release --output-format zip
|
|
@ -0,0 +1,3 @@
|
|||
#!/bin/sh
|
||||
|
||||
cargo lambda build
|
|
@ -0,0 +1,16 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
which cargo-lambda
|
||||
|
||||
if [ $? -ne 0 ]; then
|
||||
cargo install cargo-lambda
|
||||
fi;
|
||||
|
||||
which virtualenv
|
||||
|
||||
if [ $? -ne 0 ]; then
|
||||
echo ">> Virtualenv is required in order to setup cargo-lambda here!"
|
||||
exit 1;
|
||||
fi;
|
||||
|
||||
virtualenv venv
|
|
@ -0,0 +1,3 @@
|
|||
#!/bin/sh
|
||||
|
||||
cargo test --verbose
|
|
@ -17,3 +17,5 @@ tracing = { workspace = true }
|
|||
serde = { version = "=1", features = ["rc"] }
|
||||
serde_json = "=1"
|
||||
urlencoding = "=2"
|
||||
|
||||
oxbow = { path = "../oxbow" }
|
||||
|
|
|
@ -8,14 +8,14 @@ use chrono::prelude::*;
|
|||
use deltalake::{DeltaResult, ObjectMeta, Path};
|
||||
use tracing::log::*;
|
||||
|
||||
use oxbow::TableMods;
|
||||
use std::collections::HashMap;
|
||||
|
||||
/**
|
||||
* Return wholly new [`S3EventRecord`] objects with their the [`S3Object`] `url_decoded_key`
|
||||
* properly filled in
|
||||
*
|
||||
* For whatever reason `aws_lambda_events` does not properly handle this
|
||||
*/
|
||||
///
|
||||
/// Return wholly new [`S3EventRecord`] objects with their the [`S3Object`] `url_decoded_key`
|
||||
/// properly filled in
|
||||
///
|
||||
/// For whatever reason `aws_lambda_events` does not properly handle this
|
||||
pub fn records_with_url_decoded_keys(records: &[S3EventRecord]) -> Vec<S3EventRecord> {
|
||||
use urlencoding::decode;
|
||||
|
||||
|
@ -37,22 +37,12 @@ pub fn records_with_url_decoded_keys(records: &[S3EventRecord]) -> Vec<S3EventRe
|
|||
.collect()
|
||||
}
|
||||
|
||||
/// Struct to keep track of the table modifications needing to be made based on
|
||||
/// [S3EventRecord] objects .
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct TableMods {
|
||||
pub adds: Vec<ObjectMeta>,
|
||||
pub removes: Vec<ObjectMeta>,
|
||||
}
|
||||
|
||||
/**
|
||||
* Group the objects from the notification based on the delta tables they should be added to.
|
||||
*
|
||||
* There's a possibility that an S3 bucket notification will have objects mixed in which should be
|
||||
* destined for different delta tables. Rather than re-opening/loading the table for each object as
|
||||
* we iterate the records, we can group them based on the delta table and then create the
|
||||
* appropriate transactions
|
||||
*/
|
||||
/// Group the objects from the notification based on the delta tables they should be added to.
|
||||
///
|
||||
/// There's a possibility that an S3 bucket notification will have objects mixed in which should be
|
||||
/// destined for different delta tables. Rather than re-opening/loading the table for each object as
|
||||
/// we iterate the records, we can group them based on the delta table and then create the
|
||||
/// appropriate transactions
|
||||
pub fn objects_by_table(records: &[S3EventRecord]) -> HashMap<String, TableMods> {
|
||||
let mut mods = HashMap::new();
|
||||
|
||||
|
@ -81,13 +71,11 @@ pub fn objects_by_table(records: &[S3EventRecord]) -> HashMap<String, TableMods>
|
|||
mods
|
||||
}
|
||||
|
||||
/**
|
||||
* Infer the log path from the given object path.
|
||||
*
|
||||
* The location of `_delta_log/` can technically be _anywhere_ but for convention's
|
||||
* sake oxbow will attempt to put the `_delta_log/` some place predictable to ensure that
|
||||
* `add` actions in the log can use relative file paths for newly added objects
|
||||
*/
|
||||
/// Infer the log path from the given object path.
|
||||
///
|
||||
/// The location of `_delta_log/` can technically be _anywhere_ but for convention's
|
||||
/// sake oxbow will attempt to put the `_delta_log/` some place predictable to ensure that
|
||||
/// `add` actions in the log can use relative file paths for newly added objects
|
||||
pub fn infer_log_path_from(path: &str) -> String {
|
||||
use std::path::{Component, Path};
|
||||
|
||||
|
@ -189,12 +177,11 @@ pub fn s3_from_sns(event: SqsEvent) -> DeltaResult<Vec<S3EventRecord>> {
|
|||
Ok(records)
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert an [`S3Object`] into an [`ObjectMeta`] for use in the creation of Delta transactions
|
||||
*
|
||||
* This is a _lossy_ conversion since the two structs do not share the same set of information,
|
||||
* therefore this conversion is really only taking the path of the object and the size
|
||||
*/
|
||||
///
|
||||
/// Convert an [`S3Object`] into an [`ObjectMeta`] for use in the creation of Delta transactions
|
||||
///
|
||||
/// This is a _lossy_ conversion since the two structs do not share the same set of information,
|
||||
/// therefore this conversion is really only taking the path of the object and the size
|
||||
fn into_object_meta(s3object: &S3Object, prune_prefix: Option<&str>) -> ObjectMeta {
|
||||
let location = s3object.url_decoded_key.clone().unwrap_or("".to_string());
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ repository.workspace = true
|
|||
homepage.workspace = true
|
||||
|
||||
[dependencies]
|
||||
chrono = { workpace = true }
|
||||
chrono = { workspace = true }
|
||||
deltalake = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
url = { workspace = true }
|
||||
|
@ -17,7 +17,6 @@ rusoto_core = { version = "0.47", default-features = false, features = ["rustls"
|
|||
rusoto_credential = { version = "0.47"}
|
||||
|
||||
[dev-dependencies]
|
||||
chrono = "0.4.31"
|
||||
fs_extra = "=1"
|
||||
tempfile = "*"
|
||||
tokio = { workspace = true }
|
||||
|
|
|
@ -5,6 +5,7 @@ use deltalake::arrow::datatypes::Schema as ArrowSchema;
|
|||
use deltalake::parquet::arrow::async_reader::{
|
||||
ParquetObjectReader, ParquetRecordBatchStreamBuilder,
|
||||
};
|
||||
use deltalake::parquet::file::metadata::ParquetMetaData;
|
||||
use deltalake::partitions::DeltaTablePartition;
|
||||
use deltalake::protocol::*;
|
||||
use deltalake::storage::DeltaObjectStore;
|
||||
|
@ -19,10 +20,19 @@ use std::sync::Arc;
|
|||
pub mod lock;
|
||||
pub mod write;
|
||||
|
||||
/**
|
||||
* convert is the main function to be called by the CLI or other "one shot" executors which just
|
||||
* need to take a given location and convert it all at once
|
||||
*/
|
||||
/// TableMods helps to keep track of files which should be added or removed in a given
|
||||
/// invocation of Oxbow-based utilities.
|
||||
///
|
||||
/// Typically this struct is populated by S3 events which would indicate a file(s) has been added
|
||||
/// or removed.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct TableMods {
|
||||
pub adds: Vec<ObjectMeta>,
|
||||
pub removes: Vec<ObjectMeta>,
|
||||
}
|
||||
|
||||
/// convert is the main function to be called by the CLI or other "one shot" executors which just
|
||||
/// need to take a given location and convert it all at once
|
||||
pub async fn convert(
|
||||
location: &str,
|
||||
storage_options: Option<HashMap<String, String>>,
|
||||
|
@ -63,9 +73,9 @@ pub async fn convert(
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the ObjectStore for the given location
|
||||
*/
|
||||
///
|
||||
/// Create the [ObjectStore] for the given location
|
||||
///
|
||||
pub fn object_store_for(
|
||||
location: &Url,
|
||||
storage_options: Option<HashMap<String, String>>,
|
||||
|
@ -77,9 +87,13 @@ pub fn object_store_for(
|
|||
Arc::new(DeltaObjectStore::try_new(location.clone(), options).expect("Failed to make store"))
|
||||
}
|
||||
|
||||
/**
|
||||
* Discover `.parquet` files which are present in the location
|
||||
*/
|
||||
///
|
||||
/// Discover `.parquet` files which are present in the location
|
||||
///
|
||||
/// **NOTE**: This will not _stop_ at any particular point and can list an entire
|
||||
/// prefix. While it will not recursively list, it should only be used in cases
|
||||
/// where the potential number of parquet files is quite small.
|
||||
///
|
||||
pub async fn discover_parquet_files(
|
||||
store: Arc<DeltaObjectStore>,
|
||||
) -> deltalake::DeltaResult<Vec<ObjectMeta>> {
|
||||
|
@ -100,6 +114,8 @@ pub async fn discover_parquet_files(
|
|||
if !filename.ends_with(".checkpoint.parquet") {
|
||||
debug!("Discovered file: {:?}", meta);
|
||||
result.push(meta);
|
||||
} else {
|
||||
warn!("Was asked to discover parquet files on what appears to already be a table, and found checkpoint files: {filename}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -111,11 +127,11 @@ pub async fn discover_parquet_files(
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Delta table with the given series of files at the specified location
|
||||
*/
|
||||
///
|
||||
/// Create a Delta table with a given series of files at the specified location
|
||||
///
|
||||
pub async fn create_table_with(
|
||||
files: &Vec<ObjectMeta>,
|
||||
files: &[ObjectMeta],
|
||||
store: Arc<DeltaObjectStore>,
|
||||
) -> DeltaResult<DeltaTable> {
|
||||
use deltalake::operations::create::CreateBuilder;
|
||||
|
@ -144,24 +160,7 @@ pub async fn create_table_with(
|
|||
let mut conversions: Vec<Arc<deltalake::arrow::datatypes::Field>> = vec![];
|
||||
|
||||
for field in arrow_schema.fields().iter() {
|
||||
match field.data_type() {
|
||||
deltalake::arrow::datatypes::DataType::Timestamp(unit, tz) => match unit {
|
||||
deltalake::arrow::datatypes::TimeUnit::Millisecond => {
|
||||
warn!("I have been asked to create a table with a Timestamp(millis) column ({}) that I cannot handle. Cowardly setting the Delta schema to pretend it is a Timestamp(micros)", field.name());
|
||||
let field = deltalake::arrow::datatypes::Field::new(
|
||||
field.name(),
|
||||
deltalake::arrow::datatypes::DataType::Timestamp(
|
||||
deltalake::arrow::datatypes::TimeUnit::Microsecond,
|
||||
tz.clone(),
|
||||
),
|
||||
field.is_nullable(),
|
||||
);
|
||||
conversions.push(Arc::new(field));
|
||||
}
|
||||
_ => conversions.push(field.clone()),
|
||||
},
|
||||
_ => conversions.push(field.clone()),
|
||||
}
|
||||
conversions.push(coerce_field(field.clone()));
|
||||
}
|
||||
|
||||
let arrow_schema = ArrowSchema::new_with_metadata(conversions, arrow_schema.metadata.clone());
|
||||
|
@ -197,59 +196,94 @@ pub async fn create_table_with(
|
|||
.await
|
||||
}
|
||||
|
||||
/**
|
||||
* Append the given files to an already existing and initialized Delta Table
|
||||
*/
|
||||
pub async fn append_to_table(files: &[ObjectMeta], table: &mut DeltaTable) -> DeltaResult<i64> {
|
||||
/// Commit the given [Action]s to the [DeltaTable]
|
||||
pub async fn commit_to_table(actions: &Vec<Action>, table: &mut DeltaTable) -> DeltaResult<i64> {
|
||||
if actions.is_empty() {
|
||||
return Ok(table.version());
|
||||
}
|
||||
deltalake::operations::transaction::commit(
|
||||
table.object_store().as_ref(),
|
||||
actions,
|
||||
DeltaOperation::Update { predicate: None },
|
||||
table.get_state(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Generate the list of [Action]s based on the given [TableMods]
|
||||
pub async fn actions_for(
|
||||
mods: &TableMods,
|
||||
table: &DeltaTable,
|
||||
evolve_schema: bool,
|
||||
) -> DeltaResult<Vec<Action>> {
|
||||
let existing_files = table.get_files();
|
||||
let new_files: Vec<ObjectMeta> = files
|
||||
let new_files: Vec<ObjectMeta> = mods
|
||||
.adds
|
||||
.iter()
|
||||
.filter(|f| !existing_files.contains(&f.location))
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
if new_files.is_empty() {
|
||||
debug!("No new files to add on {table:?}, skipping a commit");
|
||||
return Ok(table.version());
|
||||
}
|
||||
|
||||
let actions = add_actions_for(&new_files);
|
||||
|
||||
deltalake::operations::transaction::commit(
|
||||
table.object_store().as_ref(),
|
||||
&actions,
|
||||
DeltaOperation::Write {
|
||||
mode: SaveMode::Append,
|
||||
partition_by: Some(partition_columns_from(files)),
|
||||
predicate: None,
|
||||
let adds = add_actions_for(&new_files);
|
||||
let removes = remove_actions_for(&mods.removes);
|
||||
let metadata = match evolve_schema {
|
||||
true => match metadata_actions_for(&new_files, &table).await {
|
||||
Ok(axns) => axns,
|
||||
Err(err) => {
|
||||
error!("Attempted schema evolution but received an unhandled error!: {err:?}");
|
||||
vec![]
|
||||
}
|
||||
},
|
||||
table.get_state(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
false => vec![],
|
||||
};
|
||||
|
||||
Ok([adds, removes, metadata].concat())
|
||||
}
|
||||
|
||||
/// Remove the given files from an already existing and initialized [DeltaTable]
|
||||
pub async fn remove_from_table(files: &[ObjectMeta], table: &mut DeltaTable) -> DeltaResult<i64> {
|
||||
let actions = remove_actions_for(files);
|
||||
/// Return the new metadata actions if the files given can result in a schema migration
|
||||
///
|
||||
/// If nothing should happen then this will end up returning an empty set
|
||||
async fn metadata_actions_for(
|
||||
files: &[ObjectMeta],
|
||||
table: &DeltaTable,
|
||||
) -> DeltaResult<Vec<Action>> {
|
||||
if let Some(last_file) = files.last() {
|
||||
debug!("Attempting to evolve the schema for {table:?}");
|
||||
let table_schema = table.get_schema()?;
|
||||
// Cloning here to take an owned version of [DeltaTableMetaData] for later modification
|
||||
let mut table_metadata = table.get_metadata()?.clone();
|
||||
|
||||
if actions.is_empty() {
|
||||
return Ok(table.version());
|
||||
let file_schema =
|
||||
fetch_parquet_schema(table.object_store().clone(), last_file.clone()).await?;
|
||||
let mut new_schema: Vec<SchemaField> = table_schema.get_fields().clone();
|
||||
|
||||
for file_field in file_schema.fields() {
|
||||
let name = file_field.name();
|
||||
// If the table's schema doesn't have the field, add to our new schema
|
||||
if table_schema.get_field_with_name(name).is_err() {
|
||||
debug!("Found a new column `{name}` which will be added");
|
||||
new_schema.push(SchemaField::new(
|
||||
name.to_string(),
|
||||
// These types can have timestmaps in them, so coerce them properly
|
||||
coerce_field(file_field.clone()).data_type().try_into()?,
|
||||
true,
|
||||
HashMap::default(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
if new_schema.len() > table_schema.get_fields().len() {
|
||||
let new_schema = deltalake::schema::SchemaTypeStruct::new(new_schema);
|
||||
table_metadata.schema = new_schema;
|
||||
table_metadata.created_time = Some(chrono::Utc::now().timestamp_millis());
|
||||
return Ok(vec![Action::metaData(table_metadata.try_into()?)]);
|
||||
}
|
||||
}
|
||||
|
||||
deltalake::operations::transaction::commit(
|
||||
table.object_store().as_ref(),
|
||||
&actions,
|
||||
DeltaOperation::Delete { predicate: None },
|
||||
table.get_state(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
/**
|
||||
* Take an iterator of files and determine what looks like a partition column from it
|
||||
*/
|
||||
/// Take an iterator of files and determine what looks like a partition column from it
|
||||
fn partition_columns_from(files: &[ObjectMeta]) -> Vec<String> {
|
||||
// The HashSet is only to prevent collecting redundant partitions
|
||||
let mut results = HashSet::new();
|
||||
|
@ -270,9 +304,7 @@ fn partition_columns_from(files: &[ObjectMeta]) -> Vec<String> {
|
|||
results.into_iter().collect()
|
||||
}
|
||||
|
||||
/**
|
||||
* Return all the partitions from the given path buf
|
||||
*/
|
||||
/// Return all the partitions from the given path buf
|
||||
fn partitions_from(path_str: &str) -> Vec<DeltaTablePartition> {
|
||||
path_str
|
||||
.split('/')
|
||||
|
@ -280,12 +312,10 @@ fn partitions_from(path_str: &str) -> Vec<DeltaTablePartition> {
|
|||
.collect()
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a series of Add actions for the given ObjectMeta entries
|
||||
*
|
||||
* This is a critical translation layer between discovered parquet files and how those would be
|
||||
* represented inside of the log
|
||||
*/
|
||||
/// Provide a series of Add actions for the given ObjectMeta entries
|
||||
///
|
||||
/// This is a critical translation layer between discovered parquet files and how those would be
|
||||
/// represented inside of the log
|
||||
pub fn add_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
|
||||
files
|
||||
.iter()
|
||||
|
@ -328,12 +358,10 @@ pub fn remove_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
|
|||
.collect()
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the smallest file from the given set of files.
|
||||
*
|
||||
* This can be useful to find the smallest possible parquet file to load from the set in order to
|
||||
* discern schema information
|
||||
*/
|
||||
/// Return the smallest file from the given set of files.
|
||||
///
|
||||
/// This can be useful to find the smallest possible parquet file to load from the set in order to
|
||||
/// discern schema information
|
||||
fn find_smallest_file(files: &[ObjectMeta]) -> Option<&ObjectMeta> {
|
||||
if files.is_empty() {
|
||||
return None;
|
||||
|
@ -349,6 +377,56 @@ fn find_smallest_file(files: &[ObjectMeta]) -> Option<&ObjectMeta> {
|
|||
Some(smallest)
|
||||
}
|
||||
|
||||
/// Retrieve and return the schema of the given [ObjectMeta] parquet file based on
|
||||
/// its metadata
|
||||
pub async fn fetch_parquet_schema(
|
||||
store: Arc<dyn ObjectStore>,
|
||||
file: ObjectMeta,
|
||||
) -> DeltaResult<ArrowSchema> {
|
||||
let metadata = load_parquet_metadata(store.clone(), file).await?;
|
||||
let schema = metadata.file_metadata().schema_descr();
|
||||
Ok(deltalake::parquet::arrow::parquet_to_arrow_schema(
|
||||
schema, None,
|
||||
)?)
|
||||
}
|
||||
|
||||
/// Retrieve a the metadata for a given parquet file referenced by [ObjectMeta]
|
||||
async fn load_parquet_metadata(
|
||||
store: Arc<dyn ObjectStore>,
|
||||
file: ObjectMeta,
|
||||
) -> DeltaResult<Arc<ParquetMetaData>> {
|
||||
let reader = ParquetObjectReader::new(store.clone(), file);
|
||||
let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
|
||||
let metadata = builder.metadata();
|
||||
|
||||
deltalake::parquet::schema::printer::print_parquet_metadata(&mut std::io::stdout(), metadata);
|
||||
Ok(metadata.clone())
|
||||
}
|
||||
|
||||
fn coerce_field(
|
||||
field: deltalake::arrow::datatypes::FieldRef,
|
||||
) -> deltalake::arrow::datatypes::FieldRef {
|
||||
match field.data_type() {
|
||||
deltalake::arrow::datatypes::DataType::Timestamp(unit, tz) => match unit {
|
||||
deltalake::arrow::datatypes::TimeUnit::Millisecond => {
|
||||
warn!("I have been asked to create a table with a Timestamp(millis) column ({}) that I cannot handle. Cowardly setting the Delta schema to pretend it is a Timestamp(micros)", field.name());
|
||||
let field = deltalake::arrow::datatypes::Field::new(
|
||||
field.name(),
|
||||
deltalake::arrow::datatypes::DataType::Timestamp(
|
||||
deltalake::arrow::datatypes::TimeUnit::Microsecond,
|
||||
tz.clone(),
|
||||
),
|
||||
field.is_nullable(),
|
||||
);
|
||||
return Arc::new(field);
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
};
|
||||
return field.clone();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
@ -368,6 +446,35 @@ mod tests {
|
|||
|
||||
use super::*;
|
||||
|
||||
/// Return a simple test table for based on the fixtures
|
||||
///
|
||||
/// The table that is returned cannot be reloaded since the tempdir is dropped
|
||||
pub(crate) async fn test_table() -> DeltaTable {
|
||||
let (_tempdir, store) =
|
||||
create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
|
||||
|
||||
let files = discover_parquet_files(store.clone())
|
||||
.await
|
||||
.expect("Failed to discover parquet files");
|
||||
assert_eq!(files.len(), 4, "No files discovered");
|
||||
|
||||
create_table_with(&files, store.clone())
|
||||
.await
|
||||
.expect("Failed to create table")
|
||||
}
|
||||
|
||||
pub(crate) fn paths_to_objectmetas(slice: Vec<Path>) -> Vec<ObjectMeta> {
|
||||
slice
|
||||
.into_iter()
|
||||
.map(|location| ObjectMeta {
|
||||
location,
|
||||
last_modified: Utc::now(),
|
||||
size: 1,
|
||||
e_tag: None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn assert_unordered_eq<T>(left: &[T], right: &[T])
|
||||
where
|
||||
T: Eq + Hash + std::fmt::Debug,
|
||||
|
@ -379,10 +486,8 @@ mod tests {
|
|||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
/**
|
||||
* Helper function to use when debugging to list temp and other directories recursively
|
||||
*/
|
||||
fn list_directory(path: &std::path::Path) {
|
||||
/// Helper function to use when debugging to list temp and other directories recursively
|
||||
pub(crate) fn list_directory(path: &std::path::Path) {
|
||||
if path.is_dir() {
|
||||
for entry in std::fs::read_dir(path).unwrap() {
|
||||
let entry = entry.unwrap();
|
||||
|
@ -669,81 +774,6 @@ mod tests {
|
|||
assert_eq!(files.len(), 2, "No files discovered");
|
||||
}
|
||||
|
||||
/*
|
||||
* Ensure that the append_to_table() function does not add redundant files already added to the
|
||||
* Delta Table
|
||||
*
|
||||
* <https://github.com/buoyant-data/oxbow/issues/3>
|
||||
*/
|
||||
#[tokio::test]
|
||||
async fn test_avoiding_adding_duplicate_files() {
|
||||
let (_tempdir, store) =
|
||||
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
|
||||
|
||||
let files = discover_parquet_files(store.clone())
|
||||
.await
|
||||
.expect("Failed to discover parquet files");
|
||||
assert_eq!(files.len(), 4, "No files discovered");
|
||||
|
||||
let mut table = create_table_with(&files, store.clone())
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
let schema = table.get_schema().expect("Failed to get schema");
|
||||
assert!(
|
||||
schema.get_field_with_name("c2").is_ok(),
|
||||
"The schema does not include the expected partition key `c2`"
|
||||
);
|
||||
assert_eq!(
|
||||
table.get_files().len(),
|
||||
4,
|
||||
"Did not find the right number of tables"
|
||||
);
|
||||
|
||||
append_to_table(&files, &mut table)
|
||||
.await
|
||||
.expect("Failed to append files");
|
||||
table.load().await.expect("Failed to reload the table");
|
||||
assert_eq!(table.get_files().len(), 4, "Found redundant files!");
|
||||
}
|
||||
|
||||
/*
|
||||
* Discovered an issue where append_to_table can create a new empty commit if no files are
|
||||
* provided. That's silly!
|
||||
*/
|
||||
#[tokio::test]
|
||||
async fn test_avoid_appending_empty_list() {
|
||||
let (_tempdir, store) =
|
||||
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
|
||||
|
||||
let files = discover_parquet_files(store.clone())
|
||||
.await
|
||||
.expect("Failed to discover parquet files");
|
||||
assert_eq!(files.len(), 4, "No files discovered");
|
||||
|
||||
let mut table = create_table_with(&files, store.clone())
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
let schema = table.get_schema().expect("Failed to get schema");
|
||||
assert!(
|
||||
schema.get_field_with_name("c2").is_ok(),
|
||||
"The schema does not include the expected partition key `c2`"
|
||||
);
|
||||
assert_eq!(0, table.version(), "Unexpected version");
|
||||
// Start with an empty set
|
||||
let files = vec![];
|
||||
|
||||
append_to_table(&files, &mut table)
|
||||
.await
|
||||
.expect("Failed to append files");
|
||||
table.load().await.expect("Failed to reload the table");
|
||||
assert_eq!(
|
||||
0,
|
||||
table.version(),
|
||||
"Unexpected version, should not have created a commit"
|
||||
);
|
||||
assert_eq!(table.get_files().len(), 4, "Found redundant files!");
|
||||
}
|
||||
|
||||
/*
|
||||
* There are some cases where data will be laid out in a hive partition scheme but the parquet
|
||||
* files may not contain the partitioning information. When using EXPORT DATA from BigQuery it
|
||||
|
@ -775,40 +805,33 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
/// This test is mostly to validate an approach for reading and loading schema
|
||||
/// from a parquet file into an Arrow schema for evolution
|
||||
#[tokio::test]
|
||||
async fn test_remove_from_table() {
|
||||
let (_tempdir, store) =
|
||||
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
|
||||
|
||||
let files = discover_parquet_files(store.clone())
|
||||
.await
|
||||
.expect("Failed to discover parquet files");
|
||||
assert_eq!(files.len(), 4, "No files discovered");
|
||||
|
||||
let mut table = create_table_with(&files, store.clone())
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
let schema = table.get_schema().expect("Failed to get schema");
|
||||
assert!(
|
||||
schema.get_field_with_name("c2").is_ok(),
|
||||
"The schema does not include the expected partition key `c2`"
|
||||
async fn test_load_parquet_metadata() -> DeltaResult<()> {
|
||||
let location = Path::from(
|
||||
"c2=foo1/part-00001-1c702e73-89b5-465a-9c6a-25f7559cd150.c000.snappy.parquet",
|
||||
);
|
||||
assert_eq!(0, table.version(), "Unexpected version");
|
||||
let url = Url::from_file_path(std::fs::canonicalize(
|
||||
"../../tests/data/hive/deltatbl-partitioned",
|
||||
)?)
|
||||
.expect("Failed to parse");
|
||||
let storage = object_store_for(&url, None);
|
||||
let meta = storage.head(&location).await.unwrap();
|
||||
|
||||
remove_from_table(&files, &mut table)
|
||||
let schema = fetch_parquet_schema(storage.clone(), meta)
|
||||
.await
|
||||
.expect("Failed to append files");
|
||||
table.load().await.expect("Failed to reload the table");
|
||||
.expect("Failed to load parquet file schema");
|
||||
assert_eq!(
|
||||
schema.fields.len(),
|
||||
1,
|
||||
table.version(),
|
||||
"Unexpected version, should have created a commit"
|
||||
"Expected only to find one field on the Parquet file's schema"
|
||||
);
|
||||
assert_eq!(table.get_files().len(), 0, "Found redundant files!");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_unknown_from_table() {
|
||||
async fn test_actions_for() -> DeltaResult<()> {
|
||||
let (_tempdir, store) =
|
||||
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
|
||||
|
||||
|
@ -817,58 +840,232 @@ mod tests {
|
|||
.expect("Failed to discover parquet files");
|
||||
assert_eq!(files.len(), 4, "No files discovered");
|
||||
|
||||
// Only creating the table with some of the files
|
||||
let mut table = create_table_with(&files[0..1].to_vec(), store.clone())
|
||||
// Creating the table with one of the discovered files, so the remaining three should be
|
||||
// added later
|
||||
let table = create_table_with(&vec![files[0].clone()], store.clone())
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
let schema = table.get_schema().expect("Failed to get schema");
|
||||
assert!(
|
||||
schema.get_field_with_name("c2").is_ok(),
|
||||
"The schema does not include the expected partition key `c2`"
|
||||
);
|
||||
assert_eq!(0, table.version(), "Unexpected version");
|
||||
|
||||
// Removing _all_ the files we know about, this should not error
|
||||
remove_from_table(&files, &mut table)
|
||||
let mods = TableMods {
|
||||
adds: files,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let actions = actions_for(&mods, &table, false)
|
||||
.await
|
||||
.expect("Failed to append files");
|
||||
table.load().await.expect("Failed to reload the table");
|
||||
.expect("Failed to curate actions");
|
||||
assert_eq!(
|
||||
1,
|
||||
table.version(),
|
||||
"Unexpected version, should have created a commit"
|
||||
actions.len(),
|
||||
3,
|
||||
"Expected an add action for every new file discovered"
|
||||
);
|
||||
assert_eq!(table.get_files().len(), 0, "Found redundant files!");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_empty_set() {
|
||||
let (_tempdir, store) =
|
||||
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
|
||||
async fn test_actions_for_with_redundant_files() {
|
||||
let table = util::test_table().await;
|
||||
let files = util::paths_to_objectmetas(table.get_files());
|
||||
let mods = TableMods {
|
||||
adds: files,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let files = discover_parquet_files(store.clone())
|
||||
let actions = actions_for(&mods, &table, false)
|
||||
.await
|
||||
.expect("Failed to discover parquet files");
|
||||
assert_eq!(files.len(), 4, "No files discovered");
|
||||
|
||||
let mut table = create_table_with(&files, store.clone())
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
let schema = table.get_schema().expect("Failed to get schema");
|
||||
assert!(
|
||||
schema.get_field_with_name("c2").is_ok(),
|
||||
"The schema does not include the expected partition key `c2`"
|
||||
);
|
||||
assert_eq!(0, table.version(), "Unexpected version");
|
||||
|
||||
remove_from_table(&[], &mut table)
|
||||
.await
|
||||
.expect("Failed to append files");
|
||||
table.load().await.expect("Failed to reload the table");
|
||||
.expect("Failed to curate actions");
|
||||
assert_eq!(
|
||||
actions.len(),
|
||||
0,
|
||||
table.version(),
|
||||
"Unexpected version, should not have created a commit"
|
||||
"Expected no add actions for redundant files"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_actions_for_with_removes() -> DeltaResult<()> {
|
||||
let table = util::test_table().await;
|
||||
let files = util::paths_to_objectmetas(table.get_files());
|
||||
let mods = TableMods {
|
||||
adds: vec![],
|
||||
removes: files,
|
||||
};
|
||||
|
||||
let actions = actions_for(&mods, &table, false)
|
||||
.await
|
||||
.expect("Failed to curate actions");
|
||||
assert_eq!(
|
||||
actions.len(),
|
||||
4,
|
||||
"Expected an add action for every new file discovered"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_commit_with_no_actions() {
|
||||
let mut table = util::test_table().await;
|
||||
let initial_version = table.version();
|
||||
let result = commit_to_table(&vec![], &mut table).await;
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap(), initial_version);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_commit_with_remove_actions() {
|
||||
let mut table = util::test_table().await;
|
||||
let initial_version = table.version();
|
||||
|
||||
let files = util::paths_to_objectmetas(table.get_files());
|
||||
let mods = TableMods {
|
||||
adds: vec![],
|
||||
removes: files,
|
||||
};
|
||||
let actions = actions_for(&mods, &table, false)
|
||||
.await
|
||||
.expect("Failed to curate actions");
|
||||
assert_eq!(
|
||||
actions.len(),
|
||||
4,
|
||||
"Expected an add action for every new file discovered"
|
||||
);
|
||||
|
||||
let result = commit_to_table(&actions, &mut table).await;
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(
|
||||
result.unwrap(),
|
||||
initial_version + 1,
|
||||
"Should have incremented the version"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_commit_with_all_actions() {
|
||||
let (_tempdir, store) =
|
||||
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
|
||||
let files = discover_parquet_files(store.clone())
|
||||
.await
|
||||
.expect("Failed to discover parquet files");
|
||||
assert_eq!(files.len(), 4, "No files discovered");
|
||||
// Creating the table with one of the discovered files, so the remaining three should be
|
||||
// added later
|
||||
let mut table = create_table_with(&vec![files[0].clone()], store.clone())
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
let initial_version = table.version();
|
||||
|
||||
let mods = TableMods {
|
||||
adds: files.clone(),
|
||||
removes: vec![files[0].clone()],
|
||||
};
|
||||
let actions = actions_for(&mods, &table, false)
|
||||
.await
|
||||
.expect("Failed to curate actions");
|
||||
assert_eq!(
|
||||
actions.len(),
|
||||
4,
|
||||
"Expected an 3 add actions and 1 remove action"
|
||||
);
|
||||
|
||||
let result = commit_to_table(&actions, &mut table).await;
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(
|
||||
result.unwrap(),
|
||||
initial_version + 1,
|
||||
"Should have incremented the version"
|
||||
);
|
||||
table.load().await.expect("Failed to reload table");
|
||||
assert_eq!(
|
||||
table.get_files().len(),
|
||||
3,
|
||||
"Expected to only find three files on the table at this state"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schema_evolution() {
|
||||
use deltalake::operations::create::CreateBuilder;
|
||||
|
||||
let (_tempdir, store) =
|
||||
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
|
||||
|
||||
let table = CreateBuilder::new()
|
||||
.with_object_store(store.clone())
|
||||
.with_column(
|
||||
"party_time",
|
||||
SchemaDataType::primitive("string".into()),
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.with_save_mode(SaveMode::Ignore)
|
||||
.await
|
||||
.expect("Failed to create a test table");
|
||||
|
||||
let initial_version = table.version();
|
||||
assert_eq!(initial_version, 0);
|
||||
|
||||
let adds = discover_parquet_files(store.clone())
|
||||
.await
|
||||
.expect("Failed to discover parquet files");
|
||||
assert_eq!(adds.len(), 4, "No files discovered");
|
||||
let mods = TableMods {
|
||||
adds,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let actions = actions_for(&mods, &table, true)
|
||||
.await
|
||||
.expect("Failed to get actions");
|
||||
assert_eq!(
|
||||
actions.len(),
|
||||
5,
|
||||
"Expected 4 add actions and a metadata action"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schema_evolution_with_timestamps() {
|
||||
use fs_extra::{copy_items, dir::CopyOptions};
|
||||
let (tempdir, _store) =
|
||||
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
|
||||
|
||||
// The store that comes back is not properly prefixed to the delta table that this test
|
||||
// needs to work with
|
||||
let table_url = Url::from_file_path(tempdir.path().join("deltatbl-partitioned"))
|
||||
.expect("Failed to parse local path");
|
||||
let store = object_store_for(&table_url, None);
|
||||
|
||||
let files = discover_parquet_files(store.clone())
|
||||
.await
|
||||
.expect("Failed to discover parquet files");
|
||||
assert_eq!(files.len(), 4, "No files discovered");
|
||||
|
||||
let table = create_table_with(&files, store.clone())
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
|
||||
let options = CopyOptions::new();
|
||||
// This file has some _alternative_ timestamps and can be used for testing :smile:
|
||||
let new_file =
|
||||
std::fs::canonicalize("../../tests/data/hive/faker_products/1701800282197_0.parquet")
|
||||
.expect("Failed to canonicalize");
|
||||
let _ =
|
||||
copy_items(&[new_file], store.root_uri(), &options).expect("Failed to copy items over");
|
||||
|
||||
let files = vec![ObjectMeta {
|
||||
location: Path::from("1701800282197_0.parquet"),
|
||||
e_tag: None,
|
||||
size: 11351,
|
||||
last_modified: Utc::now(),
|
||||
}];
|
||||
let metadata = metadata_actions_for(&files, &table)
|
||||
.await
|
||||
.expect("Failed to generate metadata actions");
|
||||
assert_eq!(
|
||||
metadata.len(),
|
||||
1,
|
||||
"Execpted a scheam evolution metadata action"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,5 +27,8 @@ toc::[]
|
|||
| _null_
|
||||
| Should only be used if S3 Event Notifications are first going to SNS and then routing to SQS for Oxbow
|
||||
|
||||
| `SCHEMA_EVOLUTION`
|
||||
| _null_
|
||||
| When set to any value this enables schema evolution by the Oxbow Lambda which will cause it to retrieve metadata for the last `.parquet` it reteives in an event notification and add new columns if necessary
|
||||
|
||||
|===
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
/*
|
||||
* The lambda crate contains the Lambda specific implementation of oxbow.
|
||||
*/
|
||||
|
||||
///
|
||||
/// The oxbow lambda crate contains the Lambda-specific handling of the Oxbow lambda
|
||||
///
|
||||
/// While most of the key logic does exist in the oxbow and oxbow-lambda-shared crates, this
|
||||
/// function glues that into the Lambda runtime
|
||||
use aws_lambda_events::sqs::SqsEvent;
|
||||
use deltalake::DeltaTableError;
|
||||
use lambda_runtime::{service_fn, Error, LambdaEvent};
|
||||
|
@ -20,8 +21,8 @@ async fn main() -> Result<(), anyhow::Error> {
|
|||
// disabling time is handy because CloudWatch will add the ingestion time.
|
||||
.without_time()
|
||||
.init();
|
||||
info!("Starting oxbow");
|
||||
info!("Starting the Lambda runtime");
|
||||
info!("Starting oxbow");
|
||||
let func = service_fn(func);
|
||||
lambda_runtime::run(func)
|
||||
.await
|
||||
|
@ -31,6 +32,11 @@ async fn main() -> Result<(), anyhow::Error> {
|
|||
|
||||
async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
|
||||
debug!("Receiving event: {:?}", event);
|
||||
let can_evolve_schema: bool = std::env::var("SCHEMA_EVOLUTION").is_ok();
|
||||
if can_evolve_schema {
|
||||
info!("Schema evolution has been enabled based on the environment variable");
|
||||
}
|
||||
|
||||
let records = match std::env::var("UNWRAP_SNS_ENVELOPE") {
|
||||
Ok(_) => s3_from_sns(event.payload)?,
|
||||
Err(_) => s3_from_sqs(event.payload)?,
|
||||
|
@ -59,7 +65,11 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
|
|||
Ok(mut table) => {
|
||||
info!("Opened table to append: {:?}", table);
|
||||
|
||||
match oxbow::append_to_table(table_mods.adds.as_slice(), &mut table).await {
|
||||
let actions = oxbow::actions_for(table_mods, &table, can_evolve_schema)
|
||||
.await
|
||||
.expect("Failed to generate actions for the table modifications");
|
||||
|
||||
match oxbow::commit_to_table(&actions, &mut table).await {
|
||||
Ok(version) => {
|
||||
info!(
|
||||
"Successfully appended version {} to table at {}",
|
||||
|
@ -88,30 +98,6 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
|
|||
return Err(Box::new(err));
|
||||
}
|
||||
}
|
||||
|
||||
if !table_mods.removes.is_empty() {
|
||||
info!(
|
||||
"{} Remove actions are expected in this operation",
|
||||
table_mods.removes.len()
|
||||
);
|
||||
match oxbow::remove_from_table(table_mods.removes.as_slice(), &mut table).await
|
||||
{
|
||||
Ok(version) => {
|
||||
info!(
|
||||
"Successfully created version {} with remove actions",
|
||||
version
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to create removes on the table {}: {:?}",
|
||||
location, err
|
||||
);
|
||||
let _ = oxbow::lock::release(lock, &lock_client).await;
|
||||
return Err(Box::new(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(DeltaTableError::NotATable(_e)) => {
|
||||
// create the table with our objects
|
||||
|
|
Loading…
Reference in New Issue