Compare commits: f922e93d30 ... bb5caee2f7 (5 commits)

Author       | SHA1
-------------|-----------
R Tyler Croy | bb5caee2f7
R Tyler Croy | d8cdbdaf95
R Tyler Croy | 30f65c4713
R Tyler Croy | 9d88d396f1
R Tyler Croy | 13167d7b18
@@ -126,3 +126,23 @@ jobs:
           asset_path: ./target/lambda/sqs-ingest/bootstrap.zip
           asset_name: sqs-ingest-lambda-bootstrap-${{ github.ref_name }}.zip
           asset_content_type: application/zip
+
+      - name: Upload glue-create lambda
+        uses: actions/upload-release-asset@v1
+        env:
+          GITHUB_TOKEN: ${{ github.token }}
+        with:
+          upload_url: ${{ steps.create_release.outputs.upload_url }}
+          asset_path: ./target/lambda/glue-create/bootstrap.zip
+          asset_name: glue-create-lambda-bootstrap-${{ github.ref_name }}.zip
+          asset_content_type: application/zip
+
+      - name: Upload glue-sync lambda
+        uses: actions/upload-release-asset@v1
+        env:
+          GITHUB_TOKEN: ${{ github.token }}
+        with:
+          upload_url: ${{ steps.create_release.outputs.upload_url }}
+          asset_path: ./target/lambda/glue-sync/bootstrap.zip
+          asset_name: glue-sync-lambda-bootstrap-${{ github.ref_name }}.zip
+          asset_content_type: application/zip
@@ -21,7 +21,8 @@ chrono = "0.4.31"
 aws_lambda_events = { version = "0.12.0", default-features = false, features = ["sns", "sqs", "s3"] }
 deltalake = { version = "0.16.5", features = ["s3", "json"]}
 tokio = { version = "=1", features = ["macros"] }
-serde_json = "1"
+regex = "=1"
+serde_json = "=1"
 tracing = { version = "0.1", features = ["log"] }
 tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "env-filter", "tracing-log"] }
 url = { version = "2.3", features = ["serde"] }
Makefile
@@ -12,16 +12,16 @@ check: ## Ensure that the crate meets the basic formatting and structure
 	(cd deployment && terraform fmt -check)
 
 build: ## Build the crate with each set of features
-	cargo build
+	./ci/build.sh
 
 build-release: check test ## Build the release versions of Lambdas
-	cargo lambda build --release --output-format zip
+	./ci/build-release.sh
 
 deploy: check ## Deploy the examples
 	(cd deployment && terraform apply)
 
 test: ## Run the crate's tests with each set of features
-	cargo test
+	./ci/test.sh
 
 clean: ## Clean up resources from build
 	cargo clean
@@ -1,4 +1,3 @@
 #!/bin/sh
 
-
-cargo lambda build --release --output-format zip
+exec cargo lambda build --release --output-format zip
@@ -1,3 +1,7 @@
 #!/bin/sh
 
-cargo lambda build
+set -xe
+
+cargo fmt --check
+
+exec cargo build
@@ -1,3 +1,3 @@
 #!/bin/sh
 
-cargo test --verbose
+exec cargo test --verbose
@@ -228,7 +228,7 @@ pub async fn actions_for(
     let adds = add_actions_for(&new_files);
     let removes = remove_actions_for(&mods.removes);
     let metadata = match evolve_schema {
-        true => match metadata_actions_for(&new_files, &table).await {
+        true => match metadata_actions_for(&new_files, table).await {
             Ok(axns) => axns,
             Err(err) => {
                 error!("Attempted schema evolution but received an unhandled error!: {err:?}");
@@ -424,7 +424,7 @@ fn coerce_field(
         },
         _ => {}
     };
-    return field.clone();
+    field.clone()
 }
 
 #[cfg(test)]
@@ -842,7 +842,7 @@ mod tests {
 
         // Creating the table with one of the discovered files, so the remaining three should be
         // added later
-        let table = create_table_with(&vec![files[0].clone()], store.clone())
+        let table = create_table_with(&[files[0].clone()], store.clone())
             .await
             .expect("Failed to create table");
 
@@ -950,7 +950,7 @@ mod tests {
         assert_eq!(files.len(), 4, "No files discovered");
         // Creating the table with one of the discovered files, so the remaining three should be
        // added later
-        let mut table = create_table_with(&vec![files[0].clone()], store.clone())
+        let mut table = create_table_with(&[files[0].clone()], store.clone())
             .await
             .expect("Failed to create table");
         let initial_version = table.version();
@@ -0,0 +1 @@
/target
@@ -0,0 +1,23 @@
[package]
name = "glue-create"
version.workspace = true
edition.workspace = true
repository.workspace = true
homepage.workspace = true

[dependencies]
anyhow = { workspace = true }
aws_lambda_events = { workspace = true }
regex = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }

oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }

aws-sdk-athena = "=1"
aws-sdk-glue = "=1"
aws-config = { version = "=1", features = ["behavior-version-latest"] }
lambda_runtime = { version = "0.11" }
@@ -0,0 +1,38 @@
ifdef::env-github[]
:tip-caption: :bulb:
:note-caption: :information_source:
:important-caption: :heavy_exclamation_mark:
:caution-caption: :fire:
:warning-caption: :warning:
endif::[]
:toc: macro

= Glue Create

This is a simple Lambda which receives S3 bucket notifications when a new Delta
Lake table is created and creates the appropriate Glue table entry in the data
catalog.

The function exists to handle some of the custom renaming logic to accommodate
the two-layer hierarchy in the AWS Glue Data Catalog (`database` and `table
name`).
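A minimal sketch of the key-to-table mapping, assuming a `GLUE_PATH_REGEX` with
named `database` and `table` capture groups like the pattern used in this
Lambda's tests:

[source,rust]
----
use regex::Regex;

fn main() {
    // Hypothetical key and pattern; in the Lambda the pattern comes from GLUE_PATH_REGEX.
    let key = "data-lake/testdb/testtable/_delta_log/00000000000000000000.json";
    let pattern =
        Regex::new(r"data-lake/(?P<database>\w+)/(?P<table>\w+)/_delta_log/.*\.json").unwrap();

    if let Some(caps) = pattern.captures(key) {
        // Prints "testdb.testtable"
        println!("{}.{}", &caps["database"], &caps["table"]);
    }
}
----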
toc::[]

== Environment Variables

|===
| Name | Default Value | Notes

| `RUST_LOG`
| `error`
| Set the log level, e.g. `info`, `warn`, `error`. Can be scoped to specific modules, e.g. `oxbow=debug`

| `UNWRAP_SNS_ENVELOPE`
| _null_
| Should only be used if S3 Event Notifications first go to SNS and are then routed to SQS for Oxbow
|===
@@ -0,0 +1,264 @@
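///
/// The glue-create Lambda is triggered off of S3 Event Notifications and uses Athena to
/// register newly created Delta tables in the AWS Glue Data Catalog
///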
use aws_lambda_events::s3::S3EventRecord;
use aws_lambda_events::sqs::SqsEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use regex::Regex;
use tracing::log::*;

use oxbow_lambda_shared::*;

/// Environment variable which defines the regular expression to parse a database and table from
/// the key names
const GLUE_REGEX_ENV: &str = "GLUE_PATH_REGEX";
/// Environment variable of the Athena workgroup to use when creating external tables in the
/// catalog
const ATHENA_WORKGROUP_ENV: &str = "ATHENA_WORKGROUP";
/// Environment variable for the Data Source to use in Athena; this should be a Glue catalog
/// configured as a Data Source for Athena
const ATHENA_DATA_SOURCE_ENV: &str = "ATHENA_DATA_SOURCE";
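/// Maximum number of seconds to poll Athena for query completion before the handler stops waiting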
const QUERY_TIMEOUT_SECS: i32 = 60;

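/// Helper struct to carry around a glue table's
/// database and table name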
#[derive(Debug, PartialEq)]
struct GlueTable {
    name: String,
    database: String,
}

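/// Main lambda function handler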
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
    info!("Invoking glue-create");
    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let glue = aws_sdk_glue::Client::new(&config);
    // The athena client is used for creating the table in the catalog with the
    // appropriate metadata.
    //
    // Rather than processing the Delta Table ourselves, Athena will:
    //
    //   Unlike traditional Hive tables, Delta Lake table metadata are inferred from
    //   the Delta Lake transaction log and synchronized directly to AWS Glue.
    //
    // <https://docs.aws.amazon.com/athena/latest/ug/delta-lake-tables.html>
    let athena = aws_sdk_athena::Client::new(&config);

    let _ = std::env::var(GLUE_REGEX_ENV)
        .expect("The Lambda must have GLUE_PATH_REGEX defined in the environment");
    let _ = std::env::var(ATHENA_WORKGROUP_ENV)
        .expect("The Lambda must have ATHENA_WORKGROUP defined in the environment");
    let _ = std::env::var(ATHENA_DATA_SOURCE_ENV)
        .expect("The Lambda must have ATHENA_DATA_SOURCE defined in the environment");

    let records = match std::env::var("UNWRAP_SNS_ENVELOPE") {
        Ok(_) => s3_from_sns(event.payload)?,
        Err(_) => s3_from_sqs(event.payload)?,
    };
    debug!("processing records: {records:?}");

    for record in records {
        process_record(record, &glue, &athena).await?;
    }

    Ok(())
}

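/// Process a single S3 event record: look the table up in Glue and, if missing, create it via Athena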
async fn process_record(
    record: S3EventRecord,
    glue: &aws_sdk_glue::Client,
    athena: &aws_sdk_athena::Client,
) -> Result<(), Error> {
    debug!("Parsing keys out of {record:?}");
    let bucket = record
        .s3
        .bucket
        .name
        .expect("Failed to get bucket name out of payload");
    let key = record
        .s3
        .object
        .key
        .expect("Failed to get object key out of payload");

    let pattern = std::env::var(GLUE_REGEX_ENV)
        .expect("Must have GLUE_PATH_REGEX defined in the environment");
    let pattern = Regex::new(&pattern).expect("Failed to compile the defined GLUE_PATH_REGEX");

    // Map the key to a GlueTable properly; currently only bucket notifications from the
    // aurora_raw/ space are supported.
    let glue_table = extract_table_from_key(&key, &pattern).expect(&format!(
        "Expected to be able to parse out a table name: {key:?}"
    ));

    match glue
        .get_table()
        .database_name(&glue_table.database)
        .name(&glue_table.name)
        .send()
        .await
    {
        Ok(_output) => {
            info!("The table {glue_table:?} already exists, nice.");
            Ok(())
        }
        Err(_e) => {
            info!("The table {glue_table:?} does not exist, creating it!");
            // Determine if the key is pointing to a delta table and then start loading
            if let Some(table_path) = key_is_delta_table(&key) {
                let uri = format!("s3://{bucket}/{table_path}");
                let work_group =
                    std::env::var(ATHENA_WORKGROUP_ENV).expect("Failed to get work group");
                let catalog = match std::env::var(ATHENA_DATA_SOURCE_ENV) {
                    Ok(val) => Some(val.to_string()),
                    Err(_) => None,
                };

                let exec_context = aws_sdk_athena::types::QueryExecutionContext::builder()
                    .database(glue_table.database)
                    .set_catalog(catalog)
                    .build();

                let query = format!(
                    r#"CREATE EXTERNAL TABLE
                        {0}
                        LOCATION '{uri}'
                        TBLPROPERTIES ('table_type' = 'DELTA')"#,
                    glue_table.name
                );
                info!("Triggering athena with: {query}");
                let result = athena
                    .start_query_execution()
                    .work_group(work_group)
                    .query_execution_context(exec_context)
                    .query_string(&query)
                    .send()
                    .await?;

                debug!("Query response: {result:?}");
                if let Some(query_execution_id) = result.query_execution_id {
                    debug!("Waiting for results from Athena");

                    for _i in 0..QUERY_TIMEOUT_SECS {
                        let execution = athena
                            .get_query_execution()
                            .query_execution_id(&query_execution_id)
                            .send()
                            .await?;

                        if let Some(execution) = execution.query_execution {
                            if let Some(status) = execution.status {
                                debug!("Current status of query execution: {status:?}");
                                match status.state {
                                    Some(aws_sdk_athena::types::QueryExecutionState::Succeeded) => {
                                        break;
                                    }
                                    Some(aws_sdk_athena::types::QueryExecutionState::Failed) => {
                                        error!("Query failed! {status:?}");
                                        return Err(
                                            "Failed to query Athena for some reason!".into()
                                        );
                                    }
                                    // Safely ignore Running, Queued, and other statuses
                                    _ => {}
                                }
                            }
                        }

debug!("Incompleted query, waiting a sec");
|
||||
let _ = tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
error!("Invoked with a key that didn't match a delta table! {key}");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}

///
/// Use the `GLUE_PATH_REGEX` to identify whether the key is consistent with a table
///
fn extract_table_from_key(key: &str, regex: &Regex) -> Option<GlueTable> {
    if let Some(captured) = regex.captures(key) {
        let database = captured.name("database");
        let table = captured.name("table");
        if database.is_some() && table.is_some() {
            let database = database.unwrap().as_str();
            let name = table.unwrap().as_str();
            return Some(GlueTable {
                database: database.into(),
                name: name.into(),
            });
        }
    }
    None
}

/// Match the key against a regular expression of what a delta table looks like
/// This will make it easier to figure out whether the modified bucket is a Delta Table so we can
/// load it and look for it in the Glue catalog
///
/// Returns the key name for the delta table so you can open it
fn key_is_delta_table(key: &str) -> Option<String> {
    let regex = Regex::new(r#"(?P<root>.*)\/_delta_log\/.*\.json"#)
        .expect("Failed to compile the regular expression");

    if let Some(caps) = regex.captures(key) {
        if let Some(root) = caps.name("root") {
            return Some(root.as_str().into());
        }
    }
    None
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_invalid_table() {
        let key = "data-lake/foo.parquet";
        let pattern = Regex::new("").expect("Failed to compile regex");
        assert_eq!(None, extract_table_from_key(key, &pattern));
    }

    #[test]
    fn test_extract_valid_table() {
        let key = r#"data-lake/testdb/testtable/_delta_log/0000000000.json"#;
        let pattern = r#"data-lake\/(?P<database>\w+)\/(?P<table>\w+\.?\w+)\/_delta_log\/.*.json"#;
        let pattern = Regex::new(pattern).expect("Failed to compile regex");
        let result = extract_table_from_key(key, &pattern);
        assert!(result.is_some());
        let info = result.unwrap();
        assert_eq!(info.database, "testdb");
        assert_eq!(info.name, "testtable");
    }

    #[test]
    fn test_unrelated_key() {
        let key = "prefix/some/key/that/don/match.parquet";
        let result = key_is_delta_table(key);
        assert!(result.is_none());
    }

    #[test]
    fn test_matching_key() {
        let key = "prefix/testdb/testtable/_delta_log/00000000000000000000.json";
        let result = key_is_delta_table(key);
        assert!(result.is_some());
        if let Some(root) = result {
            assert_eq!(root, "prefix/testdb/testtable");
        }
    }
}
@@ -0,0 +1 @@
/target
@@ -0,0 +1,23 @@
[package]
name = "glue-sync"
version.workspace = true
edition.workspace = true
repository.workspace = true
homepage.workspace = true

[dependencies]
anyhow = { workspace = true }
aws_lambda_events = { workspace = true }
deltalake = { workspace = true }
regex = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }

oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }

aws-sdk-glue = "1.26.0"
aws-config = { version = "1.2.0", features = ["behavior-version-latest"] }
lambda_runtime = { version = "0.11" }
@@ -0,0 +1,36 @@
ifdef::env-github[]
:tip-caption: :bulb:
:note-caption: :information_source:
:important-caption: :heavy_exclamation_mark:
:caution-caption: :fire:
:warning-caption: :warning:
endif::[]
:toc: macro

= Glue Sync

The `glue-sync` Lambda helps keep an AWS Glue Data Catalog in sync with the
schema of the Delta tables for which it receives S3 Event Notifications. Like
most other Lambdas in the oxbow ecosystem it is intended to be triggered by S3
Event Notifications; when a table's schema changes it will load the Delta
table, compare the schema against the Glue Data Catalog entry, and modify the
entry where necessary.
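A minimal sketch of the column reconciliation at the heart of `glue-sync`,
assuming simple `(name, type)` pairs rather than the real Glue and Delta types:

[source,rust]
----
use std::collections::HashSet;

/// Given (name, glue_type) pairs derived from the Delta schema and the column
/// names already present in Glue, keep only the columns Glue is missing.
fn columns_to_add<'a>(
    delta: &'a [(String, String)],
    glue: &[String],
) -> Vec<&'a (String, String)> {
    let existing: HashSet<&str> = glue.iter().map(String::as_str).collect();
    delta
        .iter()
        .filter(|(name, _)| !existing.contains(name.as_str()))
        .collect()
}
----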
toc::[]

== Environment Variables

|===
| Name | Default Value | Notes

| `RUST_LOG`
| `error`
| Set the log level, e.g. `info`, `warn`, `error`. Can be scoped to specific modules, e.g. `oxbow=debug`

| `UNWRAP_SNS_ENVELOPE`
| _null_
| Should only be used if S3 Event Notifications first go to SNS and are then routed to SQS for Oxbow
|===
@@ -0,0 +1,434 @@
///
/// The glue-sync Lambda is triggered off of S3 Event Notifications and can read a Delta table
/// and attempt to add additional columns to the Glue Data Catalog schema
///
use aws_lambda_events::event::sqs::SqsEvent;
use aws_sdk_glue::types::{Column, StorageDescriptor, Table, TableInput};
use deltalake::{DeltaTable, SchemaDataType};
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use regex::Regex;
use tracing::log::*;

use oxbow_lambda_shared::*;
use std::collections::HashMap;

/// Environment variable which defines the regular expression to parse a database and table from
/// the key names
const GLUE_REGEX_ENV: &str = "GLUE_PATH_REGEX";

/// Helper struct to carry around a glue table's
/// database and table name
#[derive(Debug, PartialEq)]
struct GlueTable {
    name: String,
    database: String,
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();
    let _ = std::env::var(GLUE_REGEX_ENV)
        .expect("The Lambda must have GLUE_PATH_REGEX defined in the environment");

    info!("Starting the Lambda runtime");
    info!("Starting glue-sync");

    run(service_fn(function_handler)).await
}

/// Main lambda function handler
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
    let pattern = std::env::var(GLUE_REGEX_ENV)
        .expect("Must have GLUE_PATH_REGEX defined in the environment");
    let pattern = Regex::new(&pattern).expect("Failed to compile the defined GLUE_PATH_REGEX");

    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let client = aws_sdk_glue::Client::new(&config);

    debug!("Receiving event: {:?}", event);

    let records = match std::env::var("UNWRAP_SNS_ENVELOPE") {
        Ok(_) => s3_from_sns(event.payload)?,
        Err(_) => s3_from_sqs(event.payload)?,
    };
    debug!("processing records: {records:?}");

    for record in records {
        let bucket = record
            .s3
            .bucket
            .name
            .expect("Failed to get bucket name out of payload");
        let key = record
            .s3
            .object
            .key
            .expect("Failed to get object key out of payload");

        // Map the key to a GlueTable properly; currently only bucket notifications from the
        // aurora_raw/ space are supported.
        let parsed_table = extract_table_from_key(&key, &pattern).expect(&format!(
            "Expected to be able to parse out a table name: {key:?}"
        ));

        if let Some(glue_table) =
            fetch_table(&parsed_table.database, &parsed_table.name, &client).await
        {
            if let Some(table_path) = key_is_delta_table(&key) {
                debug!("loading table from {table_path}");
                let uri = format!("s3://{bucket}/{table_path}");
                let delta_table = deltalake::open_table(&uri).await?;

                if let Some(descriptor) = storage_descriptor_from(&delta_table, &glue_table) {
                    let parameters = match glue_table.parameters {
                        None => None,
                        Some(ref original) => {
                            let mut updated = original.clone();
                            updated.insert(
                                "delta.lastUpdateVersion".into(),
                                delta_table.version().to_string(),
                            );
                            Some(updated)
                        }
                    };
                    let update = client
                        .update_table()
                        .database_name(&parsed_table.database)
                        .table_input(
                            TableInput::builder()
                                .set_description(glue_table.description)
                                .name(glue_table.name)
                                .set_owner(glue_table.owner)
                                .set_parameters(parameters)
                                .set_partition_keys(glue_table.partition_keys)
                                .set_retention(Some(glue_table.retention))
                                .set_table_type(glue_table.table_type)
                                .set_target_table(glue_table.target_table)
                                .storage_descriptor(descriptor)
                                .build()
                                .expect("Failed to build table input"),
                        )
                        .send()
                        .await
                        .expect("Failed to update table");
                    info!(
                        "Update table for {}.{} sent: {update:?}",
                        parsed_table.database, parsed_table.name
                    );
                } else {
                    debug!("No new StorageDescriptor created, nothing to do!");
                }
            }
        } else {
            info!(
                "The table {:?} did not exist in Glue already, skipping",
                parsed_table
            );
        }
    }

    Ok(())
}

///
/// Compute the new [StorageDescriptor] for the given [DeltaTable]
///
fn storage_descriptor_from(
    delta_table: &DeltaTable,
    glue_table: &Table,
) -> Option<StorageDescriptor> {
    if let Some(descriptor) = glue_table.storage_descriptor() {
        if let Some(cols) = &descriptor.columns {
            let delta_columns_for_glue = glue_columns_for(delta_table);
            let m: HashMap<String, Option<String>> =
                HashMap::from_iter(cols.iter().map(|c| (c.name.clone(), c.r#type.clone())));
            let delta_columns_for_glue: Vec<Column> = delta_columns_for_glue
                .into_iter()
                .filter(|c| !m.contains_key(&c.name))
                .collect();

            if delta_columns_for_glue.is_empty() {
                debug!("There are no columns to add to glue, bailing");
                return None;
            }
            let descriptor = StorageDescriptor::builder()
                .set_location(descriptor.location.clone())
                .set_input_format(descriptor.input_format.clone())
                .set_output_format(descriptor.output_format.clone())
                .set_bucket_columns(descriptor.bucket_columns.clone())
                .set_serde_info(descriptor.serde_info.clone())
                .set_sort_columns(descriptor.sort_columns.clone())
                .set_parameters(descriptor.parameters.clone())
                .set_additional_locations(descriptor.additional_locations.clone())
                .set_columns(Some([cols.clone(), delta_columns_for_glue].concat()))
                .build();
            debug!("Computed descriptor: {descriptor:?}");
            return Some(descriptor);
        }
    }
    None
}

/// Utility function to attempt to fetch a table from Glue with the provided [aws_sdk_glue::Client]
async fn fetch_table(database: &str, table: &str, client: &aws_sdk_glue::Client) -> Option<Table> {
    let response = client
        .get_table()
        .database_name(database)
        .name(table)
        .send()
        .await;

    match response {
        Ok(output) => {
            return output.table;
        }
        Err(err) => match err {
            aws_sdk_glue::error::SdkError::ServiceError(e) => match e.err() {
                aws_sdk_glue::operation::get_table::GetTableError::EntityNotFoundException(_e) => {
                    return None;
                }
                inner => {
                    error!("Unexpected error while looking up {database}.{table}: {inner:?}");
                }
            },
            _ => {
                error!("Unexpected fail oh no");
            }
        },
    }
    None
}

/// Return the primitive type mapping for glue
///
/// This is a separate function to accommodate reuse for arrays and structs
fn glue_type_for(delta_type_name: &str) -> String {
    match delta_type_name {
        "integer" => "int".into(),
        "long" => "bigint".into(),
        "short" => "smallint".into(),
        "byte" => "char".into(),
        "date" => "string".into(),
        _ => delta_type_name.into(),
    }
}

/// Convert a given [SchemaDataType] to the Glue type equivalent
///
/// This calls itself recursively to handle both primitives and structs/maps
fn to_glue_type(data_type: &SchemaDataType) -> String {
    match data_type {
        SchemaDataType::primitive(name) => glue_type_for(name),
        SchemaDataType::r#struct(s) => {
            format!(
                "struct<{}>",
                s.get_fields()
                    .iter()
                    .map(|f| format!("{}:{}", f.get_name(), to_glue_type(f.get_type())))
                    .collect::<Vec<String>>()
                    .join(",")
            )
        }
        SchemaDataType::map(m) => {
            format!(
                "map<{},{}>",
                to_glue_type(m.get_key_type()),
                to_glue_type(m.get_value_type())
            )
        }
        _ => {
            // Default type which hopefully doesn't cause too many issues with unknown types
            "string".into()
        }
    }
}
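// For example, a hypothetical Delta field of type map<string, long> converts to
// the Glue type string "map<string,bigint>" via the rules above.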

/// Generate the [Column] for Glue [aws_sdk_glue::types::StorageDescriptor] based on the provided
/// [DeltaTable]
fn glue_columns_for(table: &DeltaTable) -> Vec<Column> {
    if let Ok(schema) = table.get_schema() {
        return schema
            .get_fields()
            .iter()
            .map(|field| {
                Column::builder()
                    .name(field.get_name())
                    .r#type(to_glue_type(field.get_type()))
                    .build()
                    .expect("Failed to build column from Delta schema!")
            })
            .collect();
    }
    vec![]
}

///
/// Use the `GLUE_PATH_REGEX` to identify whether the key is consistent with a table
///
fn extract_table_from_key(key: &str, regex: &Regex) -> Option<GlueTable> {
    if let Some(captured) = regex.captures(key) {
        let database = captured.name("database");
        let table = captured.name("table");
        if database.is_some() && table.is_some() {
            let database = database.unwrap().as_str();
            let name = table.unwrap().as_str();
            return Some(GlueTable {
                database: database.into(),
                name: name.into(),
            });
        }
    }
    None
}

/// Match the key against a regular expression of what a delta table looks like
/// This will make it easier to figure out whether the modified bucket is a Delta Table so we can
/// load it and look for it in the Glue catalog
///
/// Returns the key name for the delta table so you can open it
fn key_is_delta_table(key: &str) -> Option<String> {
    let regex = Regex::new(r#"(?P<root>.*)\/_delta_log\/.*\.json"#)
        .expect("Failed to compile the regular expression");

    if let Some(caps) = regex.captures(key) {
        if let Some(root) = caps.name("root") {
            return Some(root.as_str().into());
        }
    }
    None
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_glue_columns() {
        let table = "../../tests/data/hive/deltatbl-non-partitioned";
        let table = deltalake::open_table(&table)
            .await
            .expect("Failed to open table");

        let columns = glue_columns_for(&table);
        assert_eq!(columns.len(), 2);
        assert_eq!(columns[0].name, "c1");
        assert_eq!(columns[1].name, "c2");
    }

    /// This function is mostly used as a manual integration test since there are a lot of moving
    /// parts to validate with AWS Glue Data Catalog and it's very very eventually consistent
    #[ignore]
    #[tokio::test]
    async fn test_basic_integration() {
        tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            // disable printing the name of the module in every log line.
            .with_target(false)
            // disabling time is handy because CloudWatch will add the ingestion time.
            .without_time()
            .init();

        let buf = r#"
        {
            "Records": [
                {
                    "eventVersion": "2.0",
                    "eventSource": "aws:s3",
                    "awsRegion": "us-east-1",
                    "eventTime": "1970-01-01T00:00:00.000Z",
                    "eventName": "ObjectCreated:Put",
                    "userIdentity": {
                        "principalId": "EXAMPLE"
                    },
                    "requestParameters": {
                        "sourceIPAddress": "127.0.0.1"
                    },
                    "responseElements": {
                        "x-amz-request-id": "EXAMPLE123456789",
                        "x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
                    },
                    "s3": {
                        "s3SchemaVersion": "1.0",
                        "configurationId": "testConfigRule",
                        "bucket": {
                            "name": "oxbow-simple",
                            "ownerIdentity": {
                                "principalId": "EXAMPLE"
                            },
                            "arn": "arn:aws:s3:::example-bucket"
                        },
                        "object": {
                            "key": "evolution/cincotest/_delta_log/0000.json",
                            "size": 1024,
                            "eTag": "0123456789abcdef0123456789abcdef",
                            "sequencer": "0A1B2C3D4E5F678901"
                        }
                    }
                }
            ]
        }
        "#;

        use aws_lambda_events::sqs::SqsMessage;
        let message: SqsMessage = SqsMessage {
            body: Some(buf.into()),
            ..Default::default()
        };
        let event = SqsEvent {
            records: vec![message],
        };
        let lambda_event = LambdaEvent::new(event, lambda_runtime::Context::default());
        std::env::set_var(
            GLUE_REGEX_ENV,
            r#"(?P<database>\w+)\/(?P<table>\w+)\/_delta_log\/.*.json"#,
        );
        let result = function_handler(lambda_event).await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_extract_invalid_table() {
        let key = "data-lake/foo.parquet";
        let pattern = Regex::new("").expect("Failed to compile regex");
        assert_eq!(None, extract_table_from_key(key, &pattern));
    }

    #[test]
    fn test_extract_valid_table() {
        let key = r#"data-lake/test/my_big_long_table_name/_delta_log/0000000000.json"#;
        let pattern = r#"data-lake\/(?P<database>\w+)\/(?P<table>\w+\.?\w+)\/_delta_log\/.*.json"#;
        let pattern = Regex::new(pattern).expect("Failed to compile regex");
        let result = extract_table_from_key(key, &pattern);
        assert!(result.is_some());

        let info = result.unwrap();
        assert_eq!(info.database, "test");
        assert_eq!(info.name, "my_big_long_table_name");
    }

    #[test]
    fn test_unrelated_key() {
        let key = "beep/some/key/that/don/match.parquet";
        let result = key_is_delta_table(key);
        assert!(result.is_none());
    }

    #[test]
    fn test_matching_key() {
        let key = "data-lake/some-other-prefix/test/my_big_long_table_name/_delta_log/00000000000000000000.json";
        let result = key_is_delta_table(key);
        assert!(result.is_some());
        if let Some(root) = result {
            assert_eq!(
                root,
                "data-lake/some-other-prefix/test/my_big_long_table_name"
            );
        }
    }
}
@@ -78,7 +78,7 @@ mod tests {
         let buf = r#"{
             "body" : "{\"key\" : \"value\"}"
         }"#;
-        let message: SqsMessage = serde_json::from_str(&buf).expect("Failed to deserialize");
+        let message: SqsMessage = serde_json::from_str(buf).expect("Failed to deserialize");
         let messages = vec![
             message.clone(),
             message.clone(),