Restructure the workspace a bit more to pave the way for shared tooling

This commit is contained in:
R Tyler Croy 2023-11-24 15:09:42 -08:00
parent ff295e05af
commit f5b7c98cd0
10 changed files with 79 additions and 15 deletions

View File

@ -1,8 +1,8 @@
[workspace]
members = [
"cli",
"crates/*",
"lambdas/*",
"shared",
]
resolver = "2"

View File

@ -12,4 +12,4 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
gumdrop = "=0.8"
oxbow = { path = "../shared" }
oxbow = { path = "../crates/oxbow" }

View File

@ -362,7 +362,7 @@ mod tests {
#[tokio::test]
async fn discover_parquet_files_full_dir() {
let path = std::fs::canonicalize("../tests/data/hive/deltatbl-non-partitioned")
let path = std::fs::canonicalize("../../tests/data/hive/deltatbl-non-partitioned")
.expect("Failed to canonicalize");
let url = Url::from_file_path(path).expect("Failed to parse local path");
let store = object_store_for(&url, None);
@ -500,7 +500,7 @@ mod tests {
#[tokio::test]
async fn create_schema_for_partitioned_path() {
let (_tempdir, store) =
util::create_temp_path_with("../tests/data/hive/deltatbl-partitioned");
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
.expect("Failed to discover parquet files");
@ -529,7 +529,7 @@ mod tests {
#[tokio::test]
async fn create_table_without_files() {
let (_tempdir, store) =
util::create_temp_path_with("../tests/data/hive/deltatbl-partitioned");
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = vec![];
@ -556,7 +556,7 @@ mod tests {
#[tokio::test]
async fn test_avoid_discovering_checkpoints() {
let test_dir =
std::fs::canonicalize("../tests/data/hive/deltatbl-non-partitioned-with-checkpoint")
std::fs::canonicalize("../../tests/data/hive/deltatbl-non-partitioned-with-checkpoint")
.expect("Failed to canonicalize");
let url = Url::from_file_path(test_dir).expect("Failed to parse local path");
let store = object_store_for(&url, None);
@ -576,7 +576,7 @@ mod tests {
#[tokio::test]
async fn test_avoiding_adding_duplicate_files() {
let (_tempdir, store) =
util::create_temp_path_with("../tests/data/hive/deltatbl-partitioned");
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await
@ -611,7 +611,7 @@ mod tests {
#[tokio::test]
async fn test_avoid_appending_empty_list() {
let (_tempdir, store) =
util::create_temp_path_with("../tests/data/hive/deltatbl-partitioned");
util::create_temp_path_with("../../tests/data/hive/deltatbl-partitioned");
let files = discover_parquet_files(store.clone())
.await

1
lambdas/group-events/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

View File

@ -0,0 +1,18 @@
# Manifest for the `group-events` lambda: an optional intermediary that
# re-groups unordered SQS events into FIFO message groups keyed by prefix,
# reducing Delta table lock contention for oxbow-lambda (see its README).
[package]
name = "group-events"
# Package metadata is inherited from the workspace root manifest.
version.workspace = true
edition.workspace = true
repository.workspace = true
homepage.workspace = true
[dependencies]
# Shared workspace dependencies for async runtime, logging, and errors.
anyhow = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
# Core oxbow library, now located under crates/ after the restructure.
oxbow = { path = "../../crates/oxbow" }
# Only the SQS event payloads are needed; default features are disabled
# to keep the lambda binary small.
aws_lambda_events = { version = "0.10.0", default-features = false, features = ["sqs"]}
lambda_runtime = { version = "0.8" }

View File

@ -0,0 +1,16 @@
= Events grouping
This lambda function exists as an optional add-on for Oxbow when there are
large numbers of events being generated by S3 Event Notifications which share a
common prefix. For example, if an external system produces a substantial number
of `PutObject` calls for `.parquet` files with the prefix
`databases/external/dw_sync/` this can result in the `oxbow-lambda` being
invoked with high amounts of concurrency for the same Delta table, leading to
lock contention in AWS S3.
The events grouping function aims to address this issue by acting as an
intermediary on the events pipeline and translating unordered SQS messages from
its trigger, into FIFO messages with a specific group identifier for an SQS
FIFO destination. By grouping on the prefix (e.g.
`databases/external/dw_sync/`) the `oxbow-lambda` can then be triggered from
the SQS FIFO queue where concurrent function invocations will have a far lower likelihood for lock contention.

View File

@ -0,0 +1,34 @@
use aws_lambda_events::event::sqs::SqsEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use tracing::log::*;
/// Main Lambda entrypoint: receives batches of SQS messages so that they can
/// later be re-emitted to a FIFO queue with message group identifiers.
///
/// Currently a stub: it only logs the incoming event and returns `Ok(())`;
/// the grouping/forwarding logic has not been implemented yet.
async fn func(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
    debug!("Receiving event: {:?}", event);
    // Extract some useful information from the request
    Ok(())
}
/// Binary entrypoint: wires up tracing output suitable for CloudWatch and
/// hands control to the Lambda runtime with [`func`] as the handler.
#[tokio::main]
async fn main() -> Result<(), Error> {
    // Respect RUST_LOG-style filtering from the environment.
    let env_filter = tracing_subscriber::EnvFilter::from_default_env();
    tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        // Module targets are noise in single-binary Lambda logs.
        .with_target(false)
        // CloudWatch stamps an ingestion time, so local timestamps are redundant.
        .without_time()
        .init();
    info!("Starting events grouping lambda");
    run(service_fn(func)).await
}
#[cfg(test)]
mod tests {
    // NOTE(review): empty placeholder test module — no tests exist yet for
    // this lambda; `use super::*` is here for when handler tests are added.
    use super::*;
}

View File

@ -14,7 +14,7 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
oxbow = { path = "../../shared" }
oxbow = { path = "../../crates/oxbow" }
aws_lambda_events = { version = "0.10.0", default-features = false, features = ["s3", "sqs"]}
dynamodb_lock = "0.6.1"
@ -25,6 +25,3 @@ rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"]
serde = { version = "=1", features = ["rc"] }
serde_json = "=1"
urlencoding = "=2"
# lambda requirements
# <https://github.com/cargo-lambda/cargo-lambda/issues/566>
#cc = "=1.0.83"

View File

@ -1,7 +1,5 @@
/*
* The lambda module contains the Lambda specific implementation of oxbox.
*
* This can be compiled with the `lambda` feature
* The lambda crate contains the Lambda specific implementation of oxbow.
*/
use aws_lambda_events::s3::{S3Event, S3EventRecord, S3Object};