Merge pull request #19 from buoyant-data/webhook-support

Add a webhook lambda for appending JSONL
This commit is contained in:
R Tyler Croy 2024-03-12 11:11:41 -07:00 committed by GitHub
commit f163abb52e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 286 additions and 69 deletions

View File

@ -1,13 +1,13 @@
[workspace]
members = [
"cli",
"cli",
"crates/*",
"lambdas/*",
]
resolver = "2"
[workspace.package]
version = "0.9.1"
version = "0.10.0"
edition = "2021"
keywords = ["deltalake", "parquet", "lambda", "delta"]
homepage = "https://github.com/buoyant-data/oxbow"

View File

@ -10,7 +10,10 @@ deltalake = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
dynamodb_lock = "0.6.1"
futures = "0.3.29"
rusoto_core = { version = "0.47", default-features = false, features = ["rustls"]}
rusoto_credential = { version = "0.47"}
[dev-dependencies]
chrono = "0.4.31"

View File

@ -16,6 +16,8 @@ use url::Url;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
pub mod lock;
/**
* convert is the main function to be called by the CLI or other "one shot" executors which just
* need to take a given location and convert it all at once

80
crates/oxbow/src/lock.rs Normal file
View File

@ -0,0 +1,80 @@
//!
//! The lock module contains some simple helpers for handling table locks in DynamoDB. This is
//! something required of deltalake 0.16.x and earlier.
//!
use dynamodb_lock::Region;
use tracing::log::*;
use std::collections::HashMap;
/// Wrapper around [deltalake::open_table_with_storage_options] which opens the table at
/// `table_uri` using the options produced by [storage_options], i.e. the storage options needed
/// for locking.
///
/// # Errors
///
/// Propagates whatever error `deltalake` reports when the table cannot be opened.
pub async fn open_table(table_uri: &str) -> deltalake::DeltaResult<deltalake::DeltaTable> {
    // `table_uri` is already a `&str`; borrowing it again only adds a needless `&&str`.
    deltalake::open_table_with_storage_options(table_uri, storage_options(table_uri)).await
}
/// Build the default storage options to hand to `deltalake` calls for the table at `table_uri`.
///
/// The returned map holds a single entry, `DYNAMO_LOCK_PARTITION_KEY_VALUE`, whose value is the
/// table URI suffixed with `:delta`.
pub fn storage_options(table_uri: &str) -> HashMap<String, String> {
    // Ensure that the DeltaTable we get back uses the table-name as a partition key
    // when locking in DynamoDb: <https://github.com/buoyant-data/oxbow/issues/9>
    //
    // Without this setting each Lambda invocation will use the same default key `delta-rs`
    // when locking in DynamoDb.
    let partition_key = format!("{table_uri}:delta");

    HashMap::from([(
        String::from("DYNAMO_LOCK_PARTITION_KEY_VALUE"),
        partition_key,
    )])
}
///
/// Construct a DynamoDb lock client for the given `table_name`.
///
/// The client is created for the default [Region] and configured with a lease duration of 60 and
/// `table_name` as the partition key value; every other option keeps its default.
pub fn client_for(table_name: &str) -> dynamodb_lock::DynamoDbLockClient {
    let options = dynamodb_lock::DynamoDbOptions {
        partition_key_value: table_name.into(),
        lease_duration: 60,
        ..Default::default()
    };
    let client = dynamodb_lock::DynamoDbLockClient::for_region(Region::default());

    client.with_options(options)
}
/**
 * Acquire a lock for `key` from DynamoDb using the given `lock_client`.
 *
 * # Panics
 *
 * Panics when the lock cannot be acquired at all, and also in the bizarre condition where the
 * lock handed back is flagged as having already expired upon acquisition.
 */
pub async fn acquire(
    key: &str,
    lock_client: &dynamodb_lock::DynamoDbLockClient,
) -> dynamodb_lock::LockItem {
    debug!("Attempting to retrieve a lock for {key:?}");
    let attempt = lock_client.acquire_lock(Some(key)).await;
    let lock = attempt.expect("Failed to acquire a lock");
    debug!("Lock acquired");

    // Happy path first: a fresh lock is handed straight back to the caller.
    if !lock.acquired_expired_lock {
        return lock;
    }

    error!("Somehow oxbow acquired an already expired lock, failing");
    panic!("Failing in hopes of acquiring a fresh lock")
}
/**
 * Release the given [dynamodb_lock::LockItem] through `lock_client`.
 *
 * Returns `true` when the release call succeeds. When it fails, the error is logged and
 * `false` is returned instead.
 */
pub async fn release(
    lock: dynamodb_lock::LockItem,
    lock_client: &dynamodb_lock::DynamoDbLockClient,
) -> bool {
    match lock_client.release_lock(&lock).await {
        Ok(_) => true,
        Err(e) => {
            error!("Failing to properly release the lock {:?}: {:?}", lock, e);
            false
        }
    }
}

View File

@ -4,7 +4,6 @@
use aws_lambda_events::sqs::SqsEvent;
use deltalake::DeltaTableError;
use dynamodb_lock::Region;
use lambda_runtime::{service_fn, Error, LambdaEvent};
use serde_json::Value;
use tracing::log::*;
@ -12,8 +11,6 @@ use url::Url;
use oxbow_lambda_shared::*;
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
tracing_subscriber::fmt()
@ -55,27 +52,10 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
let table_mods = by_table
.get(table_name)
.expect("Failed to get the files for a table, impossible!");
let mut storage_options: HashMap<String, String> = HashMap::default();
// Ensure that the DeltaTable we get back uses the table-name as a partition key
// when locking in DynamoDb: <https://github.com/buoyant-data/oxbow/issues/9>
//
// Without this setting each Lambda invocation will use the same default key `delta-rs`
// when locking in DynamoDb.
storage_options.insert(
"DYNAMO_LOCK_PARTITION_KEY_VALUE".into(),
format!("{table_name}:delta"),
);
let lock_options = dynamodb_lock::DynamoDbOptions {
lease_duration: 60,
partition_key_value: table_name.into(),
..Default::default()
};
let lock_client = dynamodb_lock::DynamoDbLockClient::for_region(Region::default())
.with_options(lock_options);
let lock = acquire_lock(table_name, &lock_client).await;
let lock_client = oxbow::lock::client_for(&table_name);
let lock = oxbow::lock::acquire(table_name, &lock_client).await;
match deltalake::open_table_with_storage_options(&table_name, storage_options.clone()).await
{
match oxbow::lock::open_table(&table_name).await {
Ok(mut table) => {
info!("Opened table to append: {:?}", table);
@ -104,7 +84,7 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
}
Err(err) => {
error!("Failed to append to the table {}: {:?}", location, err);
let _ = release_lock(lock, &lock_client).await;
let _ = oxbow::lock::release(lock, &lock_client).await;
return Err(Box::new(err));
}
}
@ -127,7 +107,7 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
"Failed to create removes on the table {}: {:?}",
location, err
);
let _ = release_lock(lock, &lock_client).await;
let _ = oxbow::lock::release(lock, &lock_client).await;
return Err(Box::new(err));
}
}
@ -136,7 +116,9 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
Err(DeltaTableError::NotATable(_e)) => {
// create the table with our objects
info!("Creating new Delta table at: {location}");
let table = oxbow::convert(table_name, Some(storage_options)).await;
let table =
oxbow::convert(table_name, Some(oxbow::lock::storage_options(&table_name)))
.await;
info!("Created table at: {location}");
if table.is_err() {
@ -146,56 +128,17 @@ async fn func<'a>(event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
}
}
Err(source) => {
let _ = release_lock(lock, &lock_client).await;
let _ = oxbow::lock::release(lock, &lock_client).await;
error!("Failed to open the Delta table for some reason: {source:?}");
return Err(Box::new(source));
}
}
let _ = release_lock(lock, &lock_client).await;
let _ = oxbow::lock::release(lock, &lock_client).await;
}
Ok("[]".into())
}
/**
 * Acquire a lock for `key` from DynamoDb using the given `lock_client`.
 *
 * # Panics
 *
 * Panics when the lock cannot be acquired at all, and also in the bizarre condition where the
 * lock handed back is flagged as having already expired upon acquisition.
 */
async fn acquire_lock(
    key: &str,
    lock_client: &dynamodb_lock::DynamoDbLockClient,
) -> dynamodb_lock::LockItem {
    debug!("Attempting to retrieve a lock for {key:?}");
    let attempt = lock_client.acquire_lock(Some(key)).await;
    let lock = attempt.expect("Failed to acquire a lock");
    debug!("Lock acquired");

    // Happy path first: a fresh lock is handed straight back to the caller.
    if !lock.acquired_expired_lock {
        return lock;
    }

    error!("Somehow oxbow acquired an already expired lock, failing");
    panic!("Failing in hopes of acquiring a fresh lock")
}
/**
 * Release the given [dynamodb_lock::LockItem] through `lock_client`.
 *
 * Returns `true` when the release call succeeds. When it fails, the error is logged and
 * `false` is returned instead.
 */
async fn release_lock(
    lock: dynamodb_lock::LockItem,
    lock_client: &dynamodb_lock::DynamoDbLockClient,
) -> bool {
    match lock_client.release_lock(&lock).await {
        Ok(_) => true,
        Err(e) => {
            error!("Failing to properly release the lock {:?}: {:?}", lock, e);
            false
        }
    }
}
#[cfg(test)]
mod tests {}

1
lambdas/webhook/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

View File

@ -0,0 +1,20 @@
[package]
name = "webhook"
version = "0.1.0"
edition.workspace = true
[dependencies]
http = "1.1.0"
lambda_http = "0.10.0"
oxbow = { path = "../../crates/oxbow" }
oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }
deltalake = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
arrow-json = "50.0.0"
arrow-schema = "50.0.0"

View File

@ -0,0 +1,39 @@
ifdef::env-github[]
:tip-caption: :bulb:
:note-caption: :information_source:
:important-caption: :heavy_exclamation_mark:
:caution-caption: :fire:
:warning-caption: :warning:
endif::[]
:toc: macro
= Webhook Lambda
The webhook lambda can help receive JSONL formatted webhook payloads and convert
those into append-only writes to pre-configured Delta tables. It is meant to be
deployed once per Delta table, that is to say that if you need to do ingestion
for multiple Delta tables the best option is to deploy multiple copies of the
Webhook lambda, each with a different configuration.
toc::[]
== Environment Variables
|===
| Name | Default Value | Notes
| `RUST_LOG`
| `error`
| Set the log level, e.g. `info`, `warn`, `error`. Can be scoped to specific modules, e.g. `oxbow=debug`
| `PRESHARED_WEBHOOK_SECRET`
| _null_
| **Required**. If this is not set the Lambda will not run properly! Incoming requests must send this value in the `Authorization` header.
| `DELTA_TABLE_URI`
| _null_
| **Required** the S3 location of the Delta table which should be appended to.
|===

129
lambdas/webhook/src/main.rs Normal file
View File

@ -0,0 +1,129 @@
use deltalake::arrow::datatypes::Schema as ArrowSchema;
///
/// The webhook lambda can receive JSONL formatted events and append them to a pre-configured Delta
/// table
use deltalake::arrow::json::reader::ReaderBuilder;
use deltalake::writer::{record_batch::RecordBatchWriter, DeltaWriter};
use deltalake::DeltaTable;
use lambda_http::{run, service_fn, tracing, Body, Error, Request, Response};
use tracing::log::*;
use std::io::Cursor;
/// Function responsible for opening the table and actually appending values to it
async fn append_values(mut table: DeltaTable, jsonl: &str) -> Result<DeltaTable, Error> {
let cursor = Cursor::new(jsonl);
let schema = table.get_schema()?;
let schema = ArrowSchema::try_from(schema)?;
let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
let mut writer = RecordBatchWriter::for_table(&table)?;
while let Some(Ok(batch)) = reader.next() {
println!("batch: {batch:?}");
writer.write(batch).await?;
}
writer.flush_and_commit(&mut table).await?;
Ok(table)
}
/// Main function handler which does the basics around HTTP management for the Lambda
///
/// Every request must carry the value of `PRESHARED_WEBHOOK_SECRET` in its `Authorization`
/// header; requests with a missing or different value are answered with a `401`.
///
/// Only `POST` requests with a text body are processed: the body is appended to the Delta table
/// at `DELTA_TABLE_URI` and a `200` is returned. Any other body type or HTTP method is answered
/// with a `400`.
///
/// # Errors
///
/// Returns an error when the table cannot be opened, when appending the payload fails, or when a
/// response cannot be built.
///
/// # Panics
///
/// Panics if `DELTA_TABLE_URI` or `PRESHARED_WEBHOOK_SECRET` cannot be read from the environment.
async fn function_handler(event: Request) -> Result<Response<Body>, Error> {
    let table_uri = std::env::var("DELTA_TABLE_URI").expect("Failed to get `DELTA_TABLE_URI`");
    let secret = std::env::var("PRESHARED_WEBHOOK_SECRET")
        .expect("Failed to get `PRESHARED_WEBHOOK_SECRET`");

    // Look the header up fallibly: indexing the header map panics when the header is absent,
    // and a request without credentials must be rejected rather than crash the handler.
    let authorized = event
        .headers()
        .get("Authorization")
        .map(|value| value.as_bytes() == secret.as_bytes())
        .unwrap_or(false);

    if !authorized {
        return Ok(Response::builder()
            .status(401)
            .body("Invalid authorization".into())
            .map_err(Box::new)?);
    }

    let response = match event.method() {
        &http::method::Method::POST => {
            debug!("Processing POST");

            match event.body() {
                lambda_http::Body::Text(buf) => {
                    debug!("Deserializing body text into a Value");
                    let table = oxbow::lock::open_table(&table_uri).await?;
                    append_values(table, buf).await?;

                    Response::builder()
                        .status(200)
                        .header("content-type", "application/json")
                        .body("{}".into())
                        .map_err(Box::new)?
                }
                others => {
                    // Nothing was written, so do not report success to the sender.
                    warn!("Unsupported body payload type: {others:?}");
                    Response::builder()
                        .status(400)
                        .body("Unsupported body payload type".into())
                        .map_err(Box::new)?
                }
            }
        }
        others => {
            warn!("Received method I cannot support: {others:?}");
            Response::builder()
                .status(400)
                .body("I only speak POST".into())
                .map_err(Box::new)?
        }
    };

    Ok(response)
}
/// Entry point: configures logging, verifies the required configuration, and then runs the
/// Lambda HTTP runtime with [function_handler].
///
/// # Panics
///
/// Panics at startup if `DELTA_TABLE_URI` or `PRESHARED_WEBHOOK_SECRET` cannot be read from the
/// environment, since the handler needs both of them on every request.
#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    // Fail fast on missing configuration rather than on the first incoming request.
    for name in ["DELTA_TABLE_URI", "PRESHARED_WEBHOOK_SECRET"] {
        if let Err(e) = std::env::var(name) {
            panic!("The `{name}` variable must be set in the environment: {e:?}");
        }
    }

    info!("Starting webhook lambda handler");
    run(service_fn(function_handler)).await
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates an in-memory table with nullable `id` and `name` columns, appends two JSONL rows
    /// to it, and checks that the table ends up at version 1.
    #[tokio::test]
    async fn test_append_values() -> deltalake::DeltaResult<()> {
        use deltalake::schema::SchemaDataType;
        use deltalake::*;

        let id_type = SchemaDataType::primitive("integer".into());
        let name_type = SchemaDataType::primitive("string".into());

        let created = DeltaOps::try_from_uri("memory://")
            .await?
            .create()
            .with_table_name("test")
            .with_column("id", id_type, true, None)
            .with_column("name", name_type, true, None)
            .await?;

        let payload = r#"
{"id" : 0, "name" : "Ben"}
{"id" : 1, "name" : "Chris"}
"#;

        let appended = append_values(created, payload)
            .await
            .expect("Failed to do nothing");

        assert_eq!(appended.version(), 1);
        Ok(())
    }
}