Compare commits

...

5 Commits

Author SHA1 Message Date
R Tyler Croy bb5caee2f7 Introduce glue-create to handle initial table creation with AWS Athena 2024-04-21 23:07:14 +00:00
R Tyler Croy d8cdbdaf95 Update the GitHub Action to include glue-sync 2024-04-21 22:36:32 +00:00
R Tyler Croy 30f65c4713 Introduce a glue table updating lambda which is triggered by S3 Event Notifications
This introduces some logic to attempt to add additional columns which
appear on the Delta table into the Glue Data Catalog schema.

Unfortunately the AWS Glue Data Catalog is eventually consistent, so
operations are not immediate.

Gee whiz is this kind of a pain to test.
2024-04-21 22:33:13 +00:00
R Tyler Croy 9d88d396f1 Add the scaffolding for glue-sync 2024-04-21 17:29:52 +00:00
R Tyler Croy 13167d7b18 Refactor the build scripts to all point at the same origin 2024-04-21 17:25:01 +00:00
16 changed files with 857 additions and 13 deletions

View File

@@ -126,3 +126,23 @@ jobs:
asset_path: ./target/lambda/sqs-ingest/bootstrap.zip
asset_name: sqs-ingest-lambda-bootstrap-${{ github.ref_name }}.zip
asset_content_type: application/zip
- name: Upload glue-create lambda
uses: actions/upload-release-asset@v1
env:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.create_release.outputs.upload_url }}
asset_path: ./target/lambda/glue-create/bootstrap.zip
asset_name: glue-create-lambda-bootstrap-${{ github.ref_name }}.zip
asset_content_type: application/zip
- name: Upload glue-sync lambda
uses: actions/upload-release-asset@v1
env:
GITHUB_TOKEN: ${{ github.token }}
with:
upload_url: ${{ steps.create_release.outputs.upload_url }}
asset_path: ./target/lambda/glue-sync/bootstrap.zip
asset_name: glue-sync-lambda-bootstrap-${{ github.ref_name }}.zip
asset_content_type: application/zip

View File

@@ -21,7 +21,8 @@ chrono = "0.4.31"
aws_lambda_events = { version = "0.12.0", default-features = false, features = ["sns", "sqs", "s3"] }
deltalake = { version = "0.16.5", features = ["s3", "json"]}
tokio = { version = "=1", features = ["macros"] }
serde_json = "1"
regex = "=1"
serde_json = "=1"
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter", "tracing-log"] }
url = { version = "2.3", features = ["serde"] }

View File

@@ -12,16 +12,16 @@ check: ## Ensure that the crate meets the basic formatting and structure
(cd deployment && terraform fmt -check)
build: ## Build the crate with each set of features
cargo build
./ci/build.sh
build-release: check test ## Build the release versions of Lambdas
cargo lambda build --release --output-format zip
./ci/build-release.sh
deploy: check ## Deploy the examples
(cd deployment && terraform apply)
test: ## Run the crate's tests with each set of features
cargo test
./ci/test.sh
clean: ## Clean up resources from build
cargo clean

View File

@@ -1,4 +1,3 @@
#!/bin/sh
cargo lambda build --release --output-format zip
exec cargo lambda build --release --output-format zip

View File

@@ -1,3 +1,7 @@
#!/bin/sh
cargo lambda build
set -xe
cargo fmt --check
exec cargo build

View File

@@ -1,3 +1,3 @@
#!/bin/sh
cargo test --verbose
exec cargo test --verbose

View File

@@ -228,7 +228,7 @@ pub async fn actions_for(
let adds = add_actions_for(&new_files);
let removes = remove_actions_for(&mods.removes);
let metadata = match evolve_schema {
true => match metadata_actions_for(&new_files, &table).await {
true => match metadata_actions_for(&new_files, table).await {
Ok(axns) => axns,
Err(err) => {
error!("Attempted schema evolution but received an unhandled error!: {err:?}");
@@ -424,7 +424,7 @@ fn coerce_field(
},
_ => {}
};
return field.clone();
field.clone()
}
#[cfg(test)]
@@ -842,7 +842,7 @@ mod tests {
// Creating the table with one of the discovered files, so the remaining three should be
// added later
let table = create_table_with(&vec![files[0].clone()], store.clone())
let table = create_table_with(&[files[0].clone()], store.clone())
.await
.expect("Failed to create table");
@@ -950,7 +950,7 @@
assert_eq!(files.len(), 4, "No files discovered");
// Creating the table with one of the discovered files, so the remaining three should be
// added later
let mut table = create_table_with(&vec![files[0].clone()], store.clone())
let mut table = create_table_with(&[files[0].clone()], store.clone())
.await
.expect("Failed to create table");
let initial_version = table.version();

lambdas/glue-create/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

View File

@@ -0,0 +1,23 @@
[package]
name = "glue-create"
version.workspace = true
edition.workspace = true
repository.workspace = true
homepage.workspace = true
[dependencies]
anyhow = { workspace = true }
aws_lambda_events = { workspace = true }
regex = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }
aws-sdk-athena = "=1"
aws-sdk-glue = "=1"
aws-config = { version = "=1", features = ["behavior-version-latest"] }
lambda_runtime = { version = "0.11" }

View File

@@ -0,0 +1,38 @@
ifdef::env-github[]
:tip-caption: :bulb:
:note-caption: :information_source:
:important-caption: :heavy_exclamation_mark:
:caution-caption: :fire:
:warning-caption: :warning:
endif::[]
:toc: macro
= Glue Create
This is a simple Lambda which receives S3 bucket notifications when a new Delta
Lake table is created and creates the appropriate Glue table entry in the data
catalog.
The function exists to handle some of the custom renaming logic to accommodate
the two-layer hierarchy in the AWS Glue Data Catalog (`database` and `table
name`).
toc::[]
== Environment Variables
|===
| Name | Default Value | Notes
| `RUST_LOG`
| `error`
| Set the log level, e.g. `info`, `warn`, `error`. Can be scoped to specific modules, e.g. `oxbow=debug`
| `UNWRAP_SNS_ENVELOPE`
| _null_
| Should only be used if S3 Event Notifications are first going to SNS and then routed to SQS for Oxbow
| `GLUE_PATH_REGEX`
| _null_ (required)
| Regular expression with named `database` and `table` captures used to map object keys onto catalog entries
| `ATHENA_WORKGROUP`
| _null_ (required)
| Athena workgroup to use when creating external tables in the catalog
| `ATHENA_DATA_SOURCE`
| _null_ (required)
| Data Source in Athena backed by the Glue Data Catalog
|===
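
To make the flow concrete, here is a minimal sketch (not part of the Lambda itself) of how a key is parsed and the shape of the `CREATE EXTERNAL TABLE` statement handed to Athena. The bucket name is hypothetical and the key layout and pattern are borrowed from this crate's tests; it depends only on the `regex` crate.

[source,rust]
----
use regex::Regex;

fn main() {
    // Hypothetical key layout: data-lake/<database>/<table>/_delta_log/...
    let pattern =
        Regex::new(r"data-lake/(?P<database>\w+)/(?P<table>\w+)/_delta_log/.*\.json").unwrap();
    let key = "data-lake/testdb/testtable/_delta_log/00000000000000000000.json";

    let caps = pattern.captures(key).expect("key should match the pattern");
    let database = &caps["database"];
    let table = &caps["table"];

    // The table location is the Delta root, i.e. everything above _delta_log/
    let uri = format!("s3://example-bucket/data-lake/{database}/{table}");

    // The same DDL shape glue-create hands to Athena
    println!(
        "CREATE EXTERNAL TABLE {table} LOCATION '{uri}' TBLPROPERTIES ('table_type' = 'DELTA')"
    );
}
----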

View File

@@ -0,0 +1,264 @@
use aws_lambda_events::s3::S3EventRecord;
use aws_lambda_events::sqs::SqsEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use regex::Regex;
use tracing::log::*;
use oxbow_lambda_shared::*;
/// Environment variable which defines the regular expression used to extract a database and
/// table name from object keys
const GLUE_REGEX_ENV: &str = "GLUE_PATH_REGEX";
/// Environment variable of the Athena workgroup to use when creating external tables in the
/// catalog
const ATHENA_WORKGROUP_ENV: &str = "ATHENA_WORKGROUP";
/// Environment variable for the Data Source to use in Athena; this should be a Glue catalog
/// configured Data Source for Athena
const ATHENA_DATA_SOURCE_ENV: &str = "ATHENA_DATA_SOURCE";
const QUERY_TIMEOUT_SECS: i32 = 60;
#[derive(Debug, PartialEq)]
struct GlueTable {
name: String,
database: String,
}
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
info!("Invoking glue-create");
let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
let glue = aws_sdk_glue::Client::new(&config);
// The athena client is used for creating the table in the catalog with the
// appropriate metadata.
//
// Rather than processing the Delta Table ourselves, Athena will:
//
// Unlike traditional Hive tables, Delta Lake table metadata are inferred from
// the Delta Lake transaction log and synchronized directly to AWS Glue.
//
// <https://docs.aws.amazon.com/athena/latest/ug/delta-lake-tables.html>
let athena = aws_sdk_athena::Client::new(&config);
let _ = std::env::var(GLUE_REGEX_ENV)
.expect("The Lambda must have GLUE_PATH_REGEX defined in the environment");
let _ = std::env::var(ATHENA_WORKGROUP_ENV)
.expect("The Lambda must have ATHENA_WORKGROUP defined in the environment");
let _ = std::env::var(ATHENA_DATA_SOURCE_ENV)
.expect("The Lambda must have ATHENA_DATA_SOURCE defined in the environment");
let records = match std::env::var("UNWRAP_SNS_ENVELOPE") {
Ok(_) => s3_from_sns(event.payload)?,
Err(_) => s3_from_sqs(event.payload)?,
};
debug!("processing records: {records:?}");
for record in records {
process_record(record, &glue, &athena).await?;
}
Ok(())
}
async fn process_record(
record: S3EventRecord,
glue: &aws_sdk_glue::Client,
athena: &aws_sdk_athena::Client,
) -> Result<(), Error> {
debug!("Parsing keys out of {record:?}");
let bucket = record
.s3
.bucket
.name
.expect("Failed to get bucket name out of payload");
let key = record
.s3
.object
.key
.expect("Failed to get object key out of payload");
let pattern = std::env::var(GLUE_REGEX_ENV)
.expect("Must have GLUE_PATH_REGEX defined in the environment");
let pattern = Regex::new(&pattern).expect("Failed to compile the defined GLUE_PATH_REGEX");
// Map the key to a GlueTable; currently only bucket notifications from the
// aurora_raw/ space are supported.
let glue_table = extract_table_from_key(&key, &pattern).expect(&format!(
"Expected to be able to parse out a table name: {key:?}"
));
match glue
.get_table()
.database_name(&glue_table.database)
.name(&glue_table.name)
.send()
.await
{
Ok(_output) => {
info!("The table {glue_table:?} already exists, nice.");
Ok(())
}
Err(_e) => {
info!("The table {glue_table:?} does not exist, creating it!");
// Determine if the key is pointing to a delta table and then start loading
if let Some(table_path) = key_is_delta_table(&key) {
let uri = format!("s3://{bucket}/{table_path}");
let work_group =
std::env::var(ATHENA_WORKGROUP_ENV).expect("Failed to get work group");
let catalog = std::env::var(ATHENA_DATA_SOURCE_ENV).ok();
let exec_context = aws_sdk_athena::types::QueryExecutionContext::builder()
.database(glue_table.database)
.set_catalog(catalog)
.build();
let query = format!(
r#"CREATE EXTERNAL TABLE
{0}
LOCATION '{uri}'
TBLPROPERTIES ('table_type' = 'DELTA')"#,
glue_table.name
);
info!("Triggering athena with: {query}");
let result = athena
.start_query_execution()
.work_group(work_group)
.query_execution_context(exec_context)
.query_string(&query)
.send()
.await?;
debug!("Query response: {result:?}");
if let Some(query_execution_id) = result.query_execution_id {
debug!("Waiting for results from Athena");
for _i in 0..QUERY_TIMEOUT_SECS {
let execution = athena
.get_query_execution()
.query_execution_id(&query_execution_id)
.send()
.await?;
if let Some(execution) = execution.query_execution {
if let Some(status) = execution.status {
debug!("Current status of query execution: {status:?}");
match status.state {
Some(aws_sdk_athena::types::QueryExecutionState::Succeeded) => {
break;
}
Some(aws_sdk_athena::types::QueryExecutionState::Failed) => {
error!("Query failed! {status:?}");
return Err(
"Failed to query Athena for some reason!".into()
);
}
// Safely ignore Running, Queued, and other statuses
_ => {}
}
}
}
debug!("Incompleted query, waiting a sec");
let _ = tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
Ok(())
} else {
error!("Invoked with a key that didn't match a delta table! {key}");
Ok(())
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
///
/// Use the `GLUE_PATH_REGEX` to identify whether the key is consistent with a table
///
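/// For example, with the pattern from the tests below, the key
/// `data-lake/testdb/testtable/_delta_log/0000000000.json` yields
/// `GlueTable { database: "testdb", name: "testtable" }`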
fn extract_table_from_key(key: &str, regex: &Regex) -> Option<GlueTable> {
if let Some(captured) = regex.captures(key) {
let database = captured.name("database");
let table = captured.name("table");
if database.is_some() && table.is_some() {
let database = database.unwrap().as_str();
let name = table.unwrap().as_str();
return Some(GlueTable {
database: database.into(),
name: name.into(),
});
}
}
None
}
/// Match the key against a regular expression of what a delta table looks like
/// This will make it easier to figure out whether the modified bucket is a Delta Table so we can
/// load it and look for it in the Glue catalog
///
/// Returns the key name for the delta table so you can open it
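/// e.g. `prefix/testdb/testtable/_delta_log/00000000000000000000.json` returns
/// `Some("prefix/testdb/testtable")`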
fn key_is_delta_table(key: &str) -> Option<String> {
let regex = Regex::new(r#"(?P<root>.*)\/_delta_log\/.*\.json"#)
.expect("Failed to compile the regular expression");
if let Some(caps) = regex.captures(key) {
if let Some(root) = caps.name("root") {
return Some(root.as_str().into());
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_extract_invalid_table() {
let key = "data-lake/foo.parquet";
let pattern = Regex::new("").expect("Failed to compile regex");
assert_eq!(None, extract_table_from_key(key, &pattern));
}
#[test]
fn test_extract_valid_table() {
let key = r#"data-lake/testdb/testtable/_delta_log/0000000000.json"#;
let pattern = r#"data-lake\/(?P<database>\w+)\/(?P<table>\w+\.?\w+)\/_delta_log\/.*.json"#;
let pattern = Regex::new(pattern).expect("Failed to compile regex");
let result = extract_table_from_key(key, &pattern);
assert!(result.is_some());
let info = result.unwrap();
assert_eq!(info.database, "testdb");
assert_eq!(info.name, "testtable");
}
#[test]
fn test_unrelated_key() {
let key = "prefix/some/key/that/don/match.parquet";
let result = key_is_delta_table(key);
assert!(result.is_none());
}
#[test]
fn test_matching_key() {
let key = "prefix/testdb/testtable/_delta_log/00000000000000000000.json";
let result = key_is_delta_table(key);
assert!(result.is_some());
if let Some(root) = result {
assert_eq!(root, "prefix/testdb/testtable");
}
}
}

lambdas/glue-sync/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

View File

@@ -0,0 +1,23 @@
[package]
name = "glue-sync"
version.workspace = true
edition.workspace = true
repository.workspace = true
homepage.workspace = true
[dependencies]
anyhow = { workspace = true }
aws_lambda_events = { workspace = true }
deltalake = { workspace = true }
regex = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }
aws-sdk-glue = "1.26.0"
aws-config = { version = "1.2.0", features = ["behavior-version-latest"] }
lambda_runtime = { version = "0.11" }

View File

@@ -0,0 +1,36 @@
ifdef::env-github[]
:tip-caption: :bulb:
:note-caption: :information_source:
:important-caption: :heavy_exclamation_mark:
:caution-caption: :fire:
:warning-caption: :warning:
endif::[]
:toc: macro
= Glue Sync
The `glue-sync` Lambda helps keep an AWS Glue Data Catalog in sync with the schema
of the Delta tables for which it receives S3 Event Notifications. Like most
other Lambdas in the oxbow ecosystem it is intended to be triggered by S3
Event Notifications; when a table schema changes it will load the Delta
table, compare its schema to the Glue Data Catalog entry, and modify the
entry where necessary.
toc::[]
== Environment Variables
|===
| Name | Default Value | Notes
| `RUST_LOG`
| `error`
| Set the log level, e.g. `info`, `warn`, `error`. Can be scoped to specific modules, e.g. `oxbow=debug`
| `UNWRAP_SNS_ENVELOPE`
| _null_
| Should only be used if S3 Event Notifications are first going to SNS and then routed to SQS for Oxbow
| `GLUE_PATH_REGEX`
| _null_ (required)
| Regular expression with named `database` and `table` captures used to map object keys onto catalog entries
|===
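
At its core the sync is a set difference between the Delta table's columns and the columns Glue already knows about. The following is a simplified, dependency-free sketch of that comparison (types elided, column names hypothetical), mirroring the filtering done in `storage_descriptor_from` in `src/main.rs`:

[source,rust]
----
use std::collections::HashSet;

/// Return the Delta columns that are not yet present in the Glue schema.
fn missing_columns<'a>(glue: &'a [&'a str], delta: &'a [&'a str]) -> Vec<&'a str> {
    let known: HashSet<&str> = glue.iter().copied().collect();
    delta.iter().copied().filter(|c| !known.contains(c)).collect()
}

fn main() {
    let glue = ["c1", "c2"];
    let delta = ["c1", "c2", "c3"]; // schema evolution added c3
    assert_eq!(missing_columns(&glue, &delta), vec!["c3"]);
}
----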

View File

@@ -0,0 +1,434 @@
///
/// The glue-sync Lambda is triggered off of S3 Event Notifications and can read a Delta table
/// and attempt to add additional columns to the Glue Data Catalog schema
///
use aws_lambda_events::event::sqs::SqsEvent;
use aws_sdk_glue::types::{Column, StorageDescriptor, Table, TableInput};
use deltalake::{DeltaTable, SchemaDataType};
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use regex::Regex;
use tracing::log::*;
use oxbow_lambda_shared::*;
use std::collections::HashMap;
/// Environment variable which defines the regular expression used to extract a database and
/// table name from object keys
const GLUE_REGEX_ENV: &str = "GLUE_PATH_REGEX";
/// Helper struct to carry around a glue table's
/// database and table name
#[derive(Debug, PartialEq)]
struct GlueTable {
name: String,
database: String,
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
let _ = std::env::var(GLUE_REGEX_ENV)
.expect("The Lambda must have GLUE_PATH_REGEX defined in the environment");
info!("Starting the Lambda runtime");
info!("Starting glue-sync");
run(service_fn(function_handler)).await
}
/// Main lambda function handler
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
let pattern = std::env::var(GLUE_REGEX_ENV)
.expect("Must have GLUE_PATH_REGEX defined in the environment");
let pattern = Regex::new(&pattern).expect("Failed to compile the defined GLUE_PATH_REGEX");
let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
let client = aws_sdk_glue::Client::new(&config);
debug!("Receiving event: {:?}", event);
let records = match std::env::var("UNWRAP_SNS_ENVELOPE") {
Ok(_) => s3_from_sns(event.payload)?,
Err(_) => s3_from_sqs(event.payload)?,
};
debug!("processing records: {records:?}");
for record in records {
let bucket = record
.s3
.bucket
.name
.expect("Failed to get bucket name out of payload");
let key = record
.s3
.object
.key
.expect("Failed to get object key out of payload");
// Map the key to a GlueTable; currently only bucket notifications from the
// aurora_raw/ space are supported.
let parsed_table = extract_table_from_key(&key, &pattern).expect(&format!(
"Expected to be able to parse out a table name: {key:?}"
));
if let Some(glue_table) =
fetch_table(&parsed_table.database, &parsed_table.name, &client).await
{
if let Some(table_path) = key_is_delta_table(&key) {
debug!("loading table from {table_path}");
let uri = format!("s3://{bucket}/{table_path}");
let delta_table = deltalake::open_table(&uri).await?;
if let Some(descriptor) = storage_descriptor_from(&delta_table, &glue_table) {
let parameters = match glue_table.parameters {
None => None,
Some(ref original) => {
let mut updated = original.clone();
updated.insert(
"delta.lastUpdateVersion".into(),
delta_table.version().to_string(),
);
Some(updated)
}
};
let update = client
.update_table()
.database_name(&parsed_table.database)
.table_input(
TableInput::builder()
.set_description(glue_table.description)
.name(glue_table.name)
.set_owner(glue_table.owner)
.set_parameters(parameters)
.set_partition_keys(glue_table.partition_keys)
.set_retention(Some(glue_table.retention))
.set_table_type(glue_table.table_type)
.set_target_table(glue_table.target_table)
.storage_descriptor(descriptor)
.build()
.expect("Failed to build table input"),
)
.send()
.await
.expect("Failed to update table");
info!(
"Update table for {}.{} sent: {update:?}",
parsed_table.database, parsed_table.name
);
} else {
debug!("No new StorageDescriptor created, nothing to do!");
}
}
} else {
info!(
"The table {:?} did not exist in Glue already, skipping",
parsed_table
);
}
}
Ok(())
}
///
/// Compute the new [StorageDescriptor] for the given [DeltaTable]
///
fn storage_descriptor_from(
delta_table: &DeltaTable,
glue_table: &Table,
) -> Option<StorageDescriptor> {
if let Some(descriptor) = glue_table.storage_descriptor() {
if let Some(cols) = &descriptor.columns {
let delta_columns_for_glue = glue_columns_for(delta_table);
let m: HashMap<String, Option<String>> =
HashMap::from_iter(cols.iter().map(|c| (c.name.clone(), c.r#type.clone())));
let delta_columns_for_glue: Vec<Column> = delta_columns_for_glue
.into_iter()
.filter(|c| !m.contains_key(&c.name))
.collect();
if delta_columns_for_glue.is_empty() {
debug!("There are no columns to add to glue, bailing");
return None;
}
let descriptor = StorageDescriptor::builder()
.set_location(descriptor.location.clone())
.set_input_format(descriptor.input_format.clone())
.set_output_format(descriptor.output_format.clone())
.set_bucket_columns(descriptor.bucket_columns.clone())
.set_serde_info(descriptor.serde_info.clone())
.set_sort_columns(descriptor.sort_columns.clone())
.set_parameters(descriptor.parameters.clone())
.set_additional_locations(descriptor.additional_locations.clone())
.set_columns(Some([cols.clone(), delta_columns_for_glue].concat()))
.build();
debug!("Computed descriptor: {descriptor:?}");
return Some(descriptor);
}
}
None
}
/// Utility function to attempt to fetch a table from Glue with the provided [aws_sdk_glue::Client]
async fn fetch_table(database: &str, table: &str, client: &aws_sdk_glue::Client) -> Option<Table> {
let response = client
.get_table()
.database_name(database)
.name(table)
.send()
.await;
match response {
Ok(output) => {
return output.table;
}
Err(err) => match err {
aws_sdk_glue::error::SdkError::ServiceError(e) => match e.err() {
aws_sdk_glue::operation::get_table::GetTableError::EntityNotFoundException(_e) => {
return None;
}
inner => {
error!("Unexpected error while looking up {database}.{table}: {inner:?}");
}
},
_ => {
error!("Unexpected failure while looking up {database}.{table}");
}
},
}
None
}
/// Return the primitive type mapping for glue
///
/// This is a separate function to accommodate reuse for arrays and structs
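/// For example, the Delta primitive `long` maps to the Glue type `bigint`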
fn glue_type_for(delta_type_name: &str) -> String {
match delta_type_name {
"integer" => "int".into(),
"long" => "bigint".into(),
"short" => "smallint".into(),
"byte" => "char".into(),
"date" => "string".into(),
_ => delta_type_name.into(),
}
}
/// Convert a given [SchemaDataType] to the Glue type equivalent
///
/// This calls itself recursively to handle both primitives and structs/maps
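/// e.g. a struct with fields `a: integer` and `b: string` becomes `struct<a:int,b:string>`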
fn to_glue_type(data_type: &SchemaDataType) -> String {
match data_type {
SchemaDataType::primitive(name) => glue_type_for(name),
SchemaDataType::r#struct(s) => {
format!(
"struct<{}>",
s.get_fields()
.iter()
.map(|f| format!("{}:{}", f.get_name(), to_glue_type(f.get_type())))
.collect::<Vec<String>>()
.join(",")
)
}
SchemaDataType::map(m) => {
format!(
"map<{},{}>",
to_glue_type(m.get_key_type()),
to_glue_type(m.get_value_type())
)
}
_ => {
// Default type which hopefully doesn't cause too many issues with unknown types
"string".into()
}
}
}
/// Generate the [Column] for Glue [aws_sdk_glue::types::StorageDescriptor] based on the provided
/// [DeltaTable]
fn glue_columns_for(table: &DeltaTable) -> Vec<Column> {
if let Ok(schema) = table.get_schema() {
return schema
.get_fields()
.iter()
.map(|field| {
Column::builder()
.name(field.get_name())
.r#type(to_glue_type(field.get_type()))
.build()
.expect("Failed to build column from Delta schema!")
})
.collect();
}
vec![]
}
///
/// Use the `GLUE_PATH_REGEX` to identify whether the key is consistent with a table
///
fn extract_table_from_key(key: &str, regex: &Regex) -> Option<GlueTable> {
if let Some(captured) = regex.captures(key) {
let database = captured.name("database");
let table = captured.name("table");
if database.is_some() && table.is_some() {
let database = database.unwrap().as_str();
let name = table.unwrap().as_str();
return Some(GlueTable {
database: database.into(),
name: name.into(),
});
}
}
None
}
/// Match the key against a regular expression of what a delta table looks like
/// This will make it easier to figure out whether the modified bucket is a Delta Table so we can
/// load it and look for it in the Glue catalog
///
/// Returns the key name for the delta table so you can open it
fn key_is_delta_table(key: &str) -> Option<String> {
let regex = Regex::new(r#"(?P<root>.*)\/_delta_log\/.*\.json"#)
.expect("Failed to compile the regular expression");
if let Some(caps) = regex.captures(key) {
if let Some(root) = caps.name("root") {
return Some(root.as_str().into());
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_glue_columns() {
let table = "../../tests/data/hive/deltatbl-non-partitioned";
let table = deltalake::open_table(&table)
.await
.expect("Failed to open table");
let columns = glue_columns_for(&table);
assert_eq!(columns.len(), 2);
assert_eq!(columns[0].name, "c1");
assert_eq!(columns[1].name, "c2");
}
/// This function is mostly used as a manual integration test since there are a lot of moving
/// parts to validate with AWS Glue Data Catalog and it's very very eventually consistent
#[ignore]
#[tokio::test]
async fn test_basic_integration() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
let buf = r#"
{
"Records": [
{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "us-east-1",
"eventTime": "1970-01-01T00:00:00.000Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "EXAMPLE"
},
"requestParameters": {
"sourceIPAddress": "127.0.0.1"
},
"responseElements": {
"x-amz-request-id": "EXAMPLE123456789",
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "testConfigRule",
"bucket": {
"name": "oxbow-simple",
"ownerIdentity": {
"principalId": "EXAMPLE"
},
"arn": "arn:aws:s3:::example-bucket"
},
"object": {
"key": "evolution/cincotest/_delta_log/0000.json",
"size": 1024,
"eTag": "0123456789abcdef0123456789abcdef",
"sequencer": "0A1B2C3D4E5F678901"
}
}
}
]
}
"#;
use aws_lambda_events::sqs::SqsMessage;
let message: SqsMessage = SqsMessage {
body: Some(buf.into()),
..Default::default()
};
let event = SqsEvent {
records: vec![message],
};
let lambda_event = LambdaEvent::new(event, lambda_runtime::Context::default());
std::env::set_var(
GLUE_REGEX_ENV,
r#"(?P<database>\w+)\/(?P<table>\w+)\/_delta_log\/.*.json"#,
);
let result = function_handler(lambda_event).await;
assert!(result.is_ok());
}
#[test]
fn test_extract_invalid_table() {
let key = "data-lake/foo.parquet";
let pattern = Regex::new("").expect("Failed to compile regex");
assert_eq!(None, extract_table_from_key(key, &pattern));
}
#[test]
fn test_extract_valid_table() {
let key = r#"data-lake/test/my_big_long_table_name/_delta_log/0000000000.json"#;
let pattern = r#"data-lake\/(?P<database>\w+)\/(?P<table>\w+\.?\w+)\/_delta_log\/.*.json"#;
let pattern = Regex::new(pattern).expect("Failed to compile regex");
let result = extract_table_from_key(key, &pattern);
assert!(result.is_some());
let info = result.unwrap();
assert_eq!(info.database, "test");
assert_eq!(info.name, "my_big_long_table_name");
}
#[test]
fn test_unrelated_key() {
let key = "beep/some/key/that/don/match.parquet";
let result = key_is_delta_table(key);
assert!(result.is_none());
}
#[test]
fn test_matching_key() {
let key = "data-lake/some-other-prefix/test/my_big_long_table_name/_delta_log/00000000000000000000.json";
let result = key_is_delta_table(key);
assert!(result.is_some());
if let Some(root) = result {
assert_eq!(
root,
"data-lake/some-other-prefix/test/my_big_long_table_name"
);
}
}
}

View File

@@ -78,7 +78,7 @@ mod tests {
let buf = r#"{
"body" : "{\"key\" : \"value\"}"
}"#;
let message: SqsMessage = serde_json::from_str(&buf).expect("Failed to deserialize");
let message: SqsMessage = serde_json::from_str(buf).expect("Failed to deserialize");
let messages = vec![
message.clone(),
message.clone(),