Enable the group-events lambda to use UNWRAP_SNS_ENVELOPE like the others

This commit is contained in:
R Tyler Croy 2024-04-10 15:27:01 -07:00
parent 24edaba873
commit 795f095094
3 changed files with 23 additions and 2 deletions

View File

@ -7,7 +7,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.11.1"
version = "0.11.2"
edition = "2021"
keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
homepage = "https://github.com/buoyant-data/oxbow"

View File

@ -14,3 +14,21 @@ its trigger, into FIFO messages with a specific group identifier for an SQS
FIFO destination. By grouping on the prefix (e.g.
`databases/external/dw_sync/`) the `oxbow-lambda` can then be triggered from
the SQS FIFO queue where concurrent function invocations will have a far lower likelihood for lock contention.
== Environment Variables
|===
| Name | Default Value | Notes
| `RUST_LOG`
| `error`
| Set the log level, e.g. `info`, `warn`, `error`. Can be scoped to specific modules, i.e. `oxbow=debug`
| `UNWRAP_SNS_ENVELOPE`
| _null_
| Should only be used if S3 Event Notifications are first going to SNS and then routing to SQS for Oxbow
|===

View File

@ -18,7 +18,10 @@ async fn func(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
let client = aws_sdk_sqs::Client::new(&config);
debug!("Receiving event: {:?}", event);
let records = s3_from_sqs(event.payload)?;
let records = match std::env::var("UNWRAP_SNS_ENVELOPE") {
Ok(_) => s3_from_sns(event.payload)?,
Err(_) => s3_from_sqs(event.payload)?,
};
let segmented = segmented_by_prefix(&records)?;
debug!("Segmented into the following keys: {:?}", segmented.keys());