Compare commits

...

3 Commits

Author SHA1 Message Date
R Tyler Croy 6d1bc34b86 Introduce the bulk of sqs-ingest with some refactorings for the webhook
The webhook and sqs-ingest lambdas both effectively need to take strings
of data and append them to a configured Delta Lake table, so the shared
code comes "up" into the oxbow crate
2024-04-08 08:40:05 -07:00
R Tyler Croy 84eba62314 Create the scaffolding for sqs-ingest
Similar to kafka-delta-ingest, but just for SQS!
2024-04-05 15:43:01 -07:00
R Tyler Croy a39e49697a Update the webhook documentation with important settings for use 2024-04-05 15:42:43 -07:00
13 changed files with 359 additions and 154 deletions
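
Both the webhook and sqs-ingest lambdas now delegate their append work to the shared helpers introduced in the new `oxbow::write` module below. As a rough orientation, the following sketch is modelled on the tests added in this change set; the table and records are illustrative only, and in the lambdas the destination table is instead opened from `DELTA_TABLE_URI` via `oxbow::lock::open_table`.

    use deltalake::{DeltaOps, DeltaResult, DeltaTable, SchemaDataType};

    /// Illustrative only: create an in-memory table (as the new tests do), then
    /// append two JSONL records to it with the shared helper from oxbow::write.
    async fn append_example() -> DeltaResult<DeltaTable> {
        let table = DeltaOps::try_from_uri("memory://")
            .await?
            .create()
            .with_table_name("example")
            .with_column("id", SchemaDataType::primitive("integer".into()), true, None)
            .with_column("name", SchemaDataType::primitive("string".into()), true, None)
            .await?;

        // One JSON object per line; append_values writes one row per line.
        let jsonl = r#"
        {"id": 0, "name": "Ben"}
        {"id": 1, "name": "Chris"}
        "#;
        oxbow::write::append_values(table, jsonl).await
    }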


@@ -7,12 +7,12 @@ members = [
resolver = "2"
[workspace.package]
version = "0.10.3"
version = "0.11.0"
edition = "2021"
keywords = ["deltalake", "parquet", "lambda", "delta"]
keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
homepage = "https://github.com/buoyant-data/oxbow"
repository = "https://github.com/buoyant-data/oxbow"
description = "Tooling for converting directories of Apache Parquet files into Delta Lake tables"
description = "Toolbox for converting or generating Delta Lake tables with AWS Lambda and more"
license-file = "LICENSE.txt"
[workspace.dependencies]


@@ -6,6 +6,7 @@ repository.workspace = true
homepage.workspace = true
[dependencies]
chrono = { workspace = true }
deltalake = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }


@@ -1,6 +1,6 @@
/*
* The lib module contains the business logic of oxbow, regardless of the interface implementation
*/
///
/// The lib module contains the business logic of oxbow, regardless of the interface implementation
///
use deltalake::arrow::datatypes::Schema as ArrowSchema;
use deltalake::parquet::arrow::async_reader::{
ParquetObjectReader, ParquetRecordBatchStreamBuilder,
@@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
pub mod lock;
pub mod write;
/**
* convert is the main function to be called by the CLI or other "one shot" executors which just
@@ -333,7 +334,7 @@ pub fn remove_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
* This can be useful to find the smallest possible parquet file to load from the set in order to
* discern schema information
*/
fn find_smallest_file(files: &Vec<ObjectMeta>) -> Option<&ObjectMeta> {
fn find_smallest_file(files: &[ObjectMeta]) -> Option<&ObjectMeta> {
if files.is_empty() {
return None;
}

crates/oxbow/src/write.rs (new file, +147 lines)

@@ -0,0 +1,147 @@
use chrono::prelude::*;
use deltalake::arrow::array::RecordBatch;
use deltalake::arrow::datatypes::Schema as ArrowSchema;
use deltalake::arrow::json::reader::ReaderBuilder;
use deltalake::writer::{record_batch::RecordBatchWriter, DeltaWriter};
use deltalake::{DeltaResult, DeltaTable};
use std::io::Cursor;
use std::sync::Arc;
use tracing::log::*;
/// Function responsible for appending values to an opened [DeltaTable]
pub async fn append_values(mut table: DeltaTable, jsonl: &str) -> DeltaResult<DeltaTable> {
let cursor = Cursor::new(jsonl);
let schema = table.get_schema()?;
let schema = ArrowSchema::try_from(schema)?;
let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
let mut writer = RecordBatchWriter::for_table(&table)?;
while let Some(Ok(batch)) = reader.next() {
let batch = augment_with_ds(&batch);
debug!("Augmented: {batch:?}");
writer.write(batch).await?;
}
let version = writer.flush_and_commit(&mut table).await?;
info!("Successfully flushed v{version} to Delta table");
if version % 10 == 0 {
// Reload the table to make sure we have the latest version to checkpoint
let _ = table.load().await;
if table.version() == version {
match deltalake::checkpoints::create_checkpoint(&table).await {
Ok(_) => info!("Successfully created checkpoint"),
Err(e) => {
error!("Failed to create checkpoint for {table:?}: {e:?}")
}
}
} else {
error!(
"The table was reloaded to create a checkpoint but a new version already exists!"
);
}
}
Ok(table)
}
///
/// Augment the given [RecordBatch] with another column that represents `ds`, treated
/// as the date-stamp, e.g. `2024-01-01`.
///
fn augment_with_ds(batch: &RecordBatch) -> RecordBatch {
use deltalake::arrow::array::{Array, StringArray};
// If the schema doesn't have a `ds` then don't try to add one
if batch.column_by_name("ds").is_none() {
info!("The schema of the destination table doesn't have `ds` so not adding the value");
return batch.clone();
}
let mut ds = vec![];
let now = Utc::now();
let datestamp = now.format("%Y-%m-%d").to_string();
for _index in 0..batch.num_rows() {
ds.push(datestamp.clone());
}
let mut columns: Vec<Arc<dyn Array>> = vec![];
for field in &batch.schema().fields {
if field.name() != "ds" {
if let Some(column) = batch.column_by_name(field.name()) {
columns.push(column.clone());
}
} else {
columns.push(Arc::new(StringArray::from(ds.clone())));
}
}
RecordBatch::try_new(batch.schema(), columns).expect("Failed to transpose `ds` onto batch")
}
#[cfg(test)]
mod tests {
use super::*;
use deltalake::*;
async fn setup_test_table() -> DeltaResult<DeltaTable> {
DeltaOps::try_from_uri("memory://")
.await?
.create()
.with_table_name("test")
.with_column(
"id",
SchemaDataType::primitive("integer".into()),
true,
None,
)
.with_column("ds", SchemaDataType::primitive("string".into()), true, None)
.with_column(
"name",
SchemaDataType::primitive("string".into()),
true,
None,
)
.await
}
#[tokio::test]
async fn test_append_values() -> DeltaResult<()> {
let table = setup_test_table().await?;
let jsonl = r#"
{"id" : 0, "name" : "Ben"}
{"id" : 1, "name" : "Chris"}
"#;
let table = append_values(table, jsonl)
.await
.expect("Failed to do nothing");
assert_eq!(table.version(), 1);
Ok(())
}
#[tokio::test]
async fn test_augment_with_ds() -> DeltaResult<()> {
let table = setup_test_table().await?;
let jsonl = r#"
{"id" : 0, "name" : "Ben"}
{"id" : 1, "name" : "Chris"}
"#;
let cursor = Cursor::new(jsonl);
let schema = table.get_schema()?;
let schema = ArrowSchema::try_from(schema)?;
let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
while let Some(Ok(batch)) = reader.next() {
let batch = augment_with_ds(&batch);
println!("{:?}", batch);
assert_ne!(None, batch.column_by_name("ds"));
}
Ok(())
}
}


@@ -19,7 +19,7 @@ oxbow = { path = "../../crates/oxbow" }
oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }
dynamodb_lock = "0.6.1"
lambda_runtime = { version = "0.8" }
lambda_runtime = { version = "0.11" }
rusoto_core = { version = "0.47", default-features = false, features = ["rustls"]}
rusoto_credential = { version = "0.47"}
rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"] }

lambdas/sqs-ingest/.gitignore (new file, +1 line)

@@ -0,0 +1 @@
/target


@@ -0,0 +1,21 @@
[package]
name = "sqs-ingest"
version.workspace = true
edition.workspace = true
repository.workspace = true
homepage.workspace = true
[dependencies]
lambda_runtime = { version = "0.11" }
oxbow = { path = "../../crates/oxbow" }
oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }
anyhow = { workspace = true }
aws_lambda_events = { workspace = true, default-features = false, features = ["sqs"] }
chrono = { workspace = true }
deltalake = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }


@@ -0,0 +1,48 @@
ifdef::env-github[]
:tip-caption: :bulb:
:note-caption: :information_source:
:important-caption: :heavy_exclamation_mark:
:caution-caption: :fire:
:warning-caption: :warning:
endif::[]
:toc: macro
= SQS Ingest
The `sqs-ingest` Lambda is the AWS SQS equivalent of
link:https://github.com/delta-io/kafka-delta-ingest[kafka-delta-ingest] insofar
as it allows the ingestion of JSON data from SQS and translates each record
into a row appended to a link:https://delta.io[Delta Lake] table.
toc::[]
For simplicity's sake this Lambda is intended to work with only a single Delta
table. For ingestion to multiple Delta tables, deploy separate instances of
this Lambda triggered by different SQS queues.
image::diagram.png[Flow]
== Environment Variables
|===
| Name | Default Value | Notes
| `RUST_LOG`
| `error`
| Set the log level, e.g. `info`, `warn`, `error`. Can be scoped to specific modules, e.g. `oxbow=debug`
| `DELTA_TABLE_URI`
|
| Set to the `s3://` URL of the Delta table which should be appended to
| `AWS_S3_LOCKING_PROVIDER`
|
| Set to `dynamodb` to enable safe concurrent writes to the table
| `DYNAMO_LOCK_TABLE_NAME`
|
| Set to the DynamoDB table used for locking, required for safe concurrent writes.
|===
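
To make the ingestion model concrete: each SQS record body is expected to be a single JSON object, and the lambda joins the bodies into JSONL (one object per line) before appending, mirroring the `extract_json_from_records` helper further down in this diff. A small illustrative sketch with made-up payloads:

    /// Illustrative only: two SQS message bodies become two JSONL lines, and
    /// therefore two rows appended to the configured Delta table.
    fn main() {
        let bodies = [
            r#"{"id": 0, "name": "Ben"}"#,
            r#"{"id": 1, "name": "Chris"}"#,
        ];
        // Join with newlines, exactly as the lambda does before calling append_values.
        let jsonl = bodies.join("\n");
        assert_eq!(2, jsonl.lines().count());
        println!("{jsonl}");
    }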


@@ -0,0 +1,18 @@
digraph {
label="SQS Ingest";
labelloc=t;
node[shape=rect];
app[label="Application", style=dashed];
{
rank=same;
rankdir=LR;
sqs[shape=cds, label="SQS queue"];
lambda[label="sqs-ingest Lambda"];
delta[shape=cylinder, label="Delta table"];
}
app -> sqs -> lambda -> delta;
}
// vim: ft=dot sw=2 ts=2 et

Binary file not shown (new image file, 11 KiB).


@@ -0,0 +1,96 @@
///
/// The sqs-ingest function is for ingesting simple JSON payloads from SQS
/// and appending them to the configured Delta table
///
use aws_lambda_events::event::sqs::{SqsEvent, SqsMessage};
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use tracing::log::*;
use oxbow::write::*;
use std::env;
/// This is the primary invocation point for the lambda and should do the heavy lifting
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
let table_uri = std::env::var("DELTA_TABLE_URI").expect("Failed to get `DELTA_TABLE_URI`");
debug!("payload received: {:?}", event.payload.records);
let jsonl = extract_json_from_records(&event.payload.records);
debug!("jsonl generated: {jsonl}");
if !jsonl.is_empty() {
let table = oxbow::lock::open_table(&table_uri).await?;
match append_values(table, &jsonl).await {
Ok(_) => {}
Err(e) => {
error!("Failed to append the values to configured Delta table: {e:?}");
return Err(Box::new(e));
}
}
} else {
error!("An empty payload was extracted which doesn't seem right!");
}
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
let _ =
env::var("DELTA_TABLE_URI").expect("The `DELTA_TABLE_URI` must be set in the environment");
match env::var("DYNAMO_LOCK_TABLE_NAME") {
Ok(_) => {}
Err(_) => {
warn!("sqs-ingest SHOULD have `DYNAMO_LOCK_TABLE_NAME` set to a valid name, and should have AWS_S3_LOCKING_PROVIDER=dynamodb set so that concurrent writes can be performed safely.");
}
}
info!("Starting sqs-ingest");
run(service_fn(function_handler)).await
}
/// Convert the `body` payloads from [SqsMessage] entities into JSONL
/// which can be passed into the [oxbow::write::append_values] function
fn extract_json_from_records(records: &[SqsMessage]) -> String {
records
.iter()
.filter(|m| m.body.is_some())
.map(|m| m.body.as_ref().unwrap().clone())
.collect::<Vec<String>>()
.join("\n")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_extract_data() {
let buf = r#"{
"body" : "{\"key\" : \"value\"}"
}"#;
let message: SqsMessage = serde_json::from_str(&buf).expect("Failed to deserialize");
let messages = vec![
message.clone(),
message.clone(),
message.clone(),
SqsMessage::default(),
];
let jsonl = extract_json_from_records(&messages);
let expected = r#"{"key" : "value"}
{"key" : "value"}
{"key" : "value"}"#;
assert_eq!(expected, jsonl);
}
}


@@ -35,5 +35,12 @@ toc::[]
| _null_
| **Required** the S3 location of the Delta table which should be appended to.
| `AWS_S3_LOCKING_PROVIDER`
|
| Set to `dynamodb` to enable safe concurrent writes to the table
| `DYNAMO_LOCK_TABLE_NAME`
|
| Set to the DynamoDB table used for locking, required for safe concurrent writes.
|===


@@ -1,54 +1,10 @@
///
/// The webhook lambda can receive JSONL formatted events and append them to a pre-configured Delta
/// table
use chrono::prelude::*;
use deltalake::arrow::array::RecordBatch;
use deltalake::arrow::datatypes::Schema as ArrowSchema;
use deltalake::arrow::json::reader::ReaderBuilder;
use deltalake::writer::{record_batch::RecordBatchWriter, DeltaWriter};
use deltalake::DeltaTable;
use lambda_http::{run, service_fn, tracing, Body, Error, Request, Response};
use tracing::log::*;
use std::io::Cursor;
use std::sync::Arc;
/// Function responsible for opening the table and actually appending values to it
async fn append_values(mut table: DeltaTable, jsonl: &str) -> Result<DeltaTable, Error> {
let cursor = Cursor::new(jsonl);
let schema = table.get_schema()?;
let schema = ArrowSchema::try_from(schema)?;
let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
let mut writer = RecordBatchWriter::for_table(&table)?;
while let Some(Ok(batch)) = reader.next() {
let batch = augment_with_ds(&batch);
debug!("Augmented: {batch:?}");
writer.write(batch).await?;
}
let version = writer.flush_and_commit(&mut table).await?;
info!("Successfully flushed v{version} to Delta table");
if version % 10 == 0 {
// Reload the table to make sure we have the latest version to checkpoint
let _ = table.load().await;
if table.version() == version {
match deltalake::checkpoints::create_checkpoint(&table).await {
Ok(_) => info!("Successfully created checkpoint"),
Err(e) => {
error!("Failed to create checkpoint for {table:?}: {e:?}")
}
}
} else {
error!(
"The table was reloaded to create a checkpoint but a new version already exists!"
);
}
}
Ok(table)
}
use oxbow::write::*;
/// Main function handler which does the basics around HTTP management for the Lambda
async fn function_handler(event: Request) -> Result<Response<Body>, Error> {
@@ -76,7 +32,7 @@ async fn function_handler(event: Request) -> Result<Response<Body>, Error> {
Ok(_) => {}
Err(e) => {
error!("Failed to append the values to configured Delta table: {e:?}");
return Err(e);
return Err(Box::new(e));
}
}
}
@@ -112,106 +68,15 @@ async fn main() -> Result<(), Error> {
.init();
let _ = std::env::var("DELTA_TABLE_URI")
.expect("THe `DELTA_TABLE_URI` environment must be set in the environment");
info!("Starting webhook lambda handler");
match std::env::var("DYNAMO_LOCK_TABLE_NAME") {
Ok(_) => {}
Err(_) => {
warn!("sqs-ingest SHOULD have `DYNAMO_LOCK_TABLE_NAME` set to a valid name, and should have AWS_S3_LOCKING_PROVIDER=dynamodb set so that concurrent writes can be performed safely.");
}
}
run(service_fn(function_handler)).await
}
///
/// Augment the given [RecordBatch] with another column that represents `ds`, treated
/// as the date-stampe, e.g. `2024-01-01`.
///
fn augment_with_ds(batch: &RecordBatch) -> RecordBatch {
use deltalake::arrow::array::{Array, StringArray};
// If the schema doesn't have a `ds` then don't try to add one
if batch.column_by_name("ds").is_none() {
info!("The schema of the destination table doesn't have `ds` so not adding the value");
return batch.clone();
}
let mut ds = vec![];
let now = Utc::now();
let datestamp = now.format("%Y-%m-%d").to_string();
for _index in 0..batch.num_rows() {
ds.push(datestamp.clone());
}
let mut columns: Vec<Arc<dyn Array>> = vec![];
for field in &batch.schema().fields {
if field.name() != "ds" {
if let Some(column) = batch.column_by_name(field.name()) {
columns.push(column.clone());
}
} else {
columns.push(Arc::new(StringArray::from(ds.clone())));
}
}
RecordBatch::try_new(batch.schema(), columns).expect("Failed to transpose `ds` onto batch")
}
#[cfg(test)]
mod tests {
use super::*;
use deltalake::*;
async fn setup_test_table() -> DeltaResult<DeltaTable> {
DeltaOps::try_from_uri("memory://")
.await?
.create()
.with_table_name("test")
.with_column(
"id",
SchemaDataType::primitive("integer".into()),
true,
None,
)
.with_column("ds", SchemaDataType::primitive("string".into()), true, None)
.with_column(
"name",
SchemaDataType::primitive("string".into()),
true,
None,
)
.await
}
#[tokio::test]
async fn test_append_values() -> DeltaResult<()> {
let table = setup_test_table().await?;
let jsonl = r#"
{"id" : 0, "name" : "Ben"}
{"id" : 1, "name" : "Chris"}
"#;
let table = append_values(table, jsonl)
.await
.expect("Failed to do nothing");
assert_eq!(table.version(), 1);
Ok(())
}
#[tokio::test]
async fn test_augment_with_ds() -> DeltaResult<()> {
let table = setup_test_table().await?;
let jsonl = r#"
{"id" : 0, "name" : "Ben"}
{"id" : 1, "name" : "Chris"}
"#;
let cursor = Cursor::new(jsonl);
let schema = table.get_schema()?;
let schema = ArrowSchema::try_from(schema)?;
let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
while let Some(Ok(batch)) = reader.next() {
let batch = augment_with_ds(&batch);
println!("{:?}", batch);
assert_ne!(None, batch.column_by_name("ds"));
}
Ok(())
}
}