Introduce the meat of the group-events function

This helps in situations where individual tables are receiving a large influx of events.
R Tyler Croy 2023-12-01 21:12:44 -08:00
parent 3e27d1c014
commit 45584233ba
6 changed files with 224 additions and 84 deletions
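A note on what "the meat" amounts to: group-events takes the S3 records it receives, buckets them by the table prefix they belong to, and relays each bucket to a FIFO queue with that prefix as the message group id, so a busy table's events are serialized instead of competing for the same lock. The sketch below only illustrates that grouping idea and is not the crate's code; the helper name group_by_table_prefix and the assumption that a table's prefix is simply the parent directory of the object key are mine.

use std::collections::HashMap;

use aws_lambda_events::s3::S3EventRecord;

// Hypothetical sketch: derive a "table prefix" from each object key and
// bucket the records by it, mirroring what group-events does conceptually.
fn group_by_table_prefix(records: Vec<S3EventRecord>) -> HashMap<String, Vec<S3EventRecord>> {
    let mut segmented: HashMap<String, Vec<S3EventRecord>> = HashMap::new();
    for record in records {
        let key = record.s3.object.key.clone().unwrap_or_default();
        // e.g. "databases/sales/orders/part-0001.parquet" -> "databases/sales/orders"
        let prefix = key
            .rsplit_once('/')
            .map(|(parent, _file)| parent.to_string())
            .unwrap_or_default();
        segmented.entry(prefix).or_default().push(record);
    }
    segmented
}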


@@ -3,12 +3,13 @@
help: ## Show this help
@egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
.PHONY: all build build-release check test clean
.PHONY: all build build-release check test clean deployment
all: check build test ## Perform all the checks, builds, and testing
check: ## Ensure that the crate meets the basic formatting and structure
cargo fmt --check
cargo clippy
(cd deployment && terraform fmt -check)
build: ## Build the crate with each set of features
cargo build
@@ -16,6 +17,9 @@ build: ## Build the crate with each set of features
build-release: check test ## Build the release versions of Lambdas
cargo lambda build --release --output-format zip
deployment: check ## Deploy the examples
(cd deployment && terraform apply)
test: ## Run the crate's tests with each set of features
cargo test

deployment/advanced.tf (new file, 129 additions)

@@ -0,0 +1,129 @@
# This terraform file implements a more advanced pattern for Oxbow which
# involves multiple queues to reduce lock contention with high traffic S3
# buckets
#
# Unlike the Simple example, the flow for events is slightly different for the
# advanced use-case:
#
# S3 Event Notifications -> SQS -> group-events -> SQS FIFO -> oxbow
resource "aws_s3_bucket" "parquets-advanced" {
bucket = "oxbow-advanced"
}
resource "aws_s3_bucket_notification" "advanced-bucket-notifications" {
bucket = aws_s3_bucket.parquets-advanced.id
queue {
queue_arn = aws_sqs_queue.group-events.arn
events = ["s3:ObjectCreated:*"]
filter_suffix = ".parquet"
}
depends_on = [aws_lambda_permission.advanced-allow-bucket]
}
resource "aws_lambda_function" "oxbow-advanced" {
description = "An advanced lambda for converting parquet files to delta tables"
filename = "../target/lambda/oxbow-lambda/bootstrap.zip"
function_name = "oxbow-advanced"
role = aws_iam_role.iam_for_lambda.arn
handler = "provided"
runtime = "provided.al2"
environment {
variables = {
AWS_S3_LOCKING_PROVIDER = "dynamodb"
RUST_LOG = "deltalake=debug,oxbow=debug"
DYNAMO_LOCK_TABLE_NAME = aws_dynamodb_table.oxbow_advanced_locking.name
}
}
}
resource "aws_lambda_function" "group-events" {
description = "Group events for oxbow based on the table prefix"
filename = "../target/lambda/group-events/bootstrap.zip"
function_name = "group-events"
role = aws_iam_role.iam_for_lambda.arn
handler = "provided"
runtime = "provided.al2"
environment {
variables = {
RUST_LOG = "group-events=debug"
QUEUE_URL = aws_sqs_queue.oxbow-advanced-fifo.url
}
}
}
resource "aws_lambda_event_source_mapping" "group-events-trigger" {
event_source_arn = aws_sqs_queue.group-events.arn
function_name = aws_lambda_function.group-events.arn
}
resource "aws_lambda_event_source_mapping" "oxbow-advanced-trigger" {
event_source_arn = aws_sqs_queue.oxbow-advanced-fifo.arn
function_name = aws_lambda_function.oxbow-advanced.arn
}
resource "aws_sqs_queue" "oxbow-advanced-fifo" {
name = "oxbow-advanced.fifo"
policy = data.aws_iam_policy_document.queue.json
content_based_deduplication = true
fifo_queue = true
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.oxbow-advanced-dlq.arn
maxReceiveCount = 8
})
}
resource "aws_sqs_queue" "oxbow-advanced-dlq" {
name = "obxow-advanced-dlq.fifo"
fifo_queue = true
}
resource "aws_sqs_queue" "group-events" {
name = "group-events"
policy = data.aws_iam_policy_document.queue.json
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.group-events-dlq.arn
maxReceiveCount = 8
})
}
resource "aws_sqs_queue" "group-events-dlq" {
name = "group-events-dlq"
}
resource "aws_lambda_permission" "advanced-allow-bucket" {
statement_id = "AllowExecutionFromS3Bucket"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.group-events.arn
principal = "s3.amazonaws.com"
source_arn = aws_s3_bucket.parquets-advanced.arn
}
# The DynamoDb table is used for providing safe concurrent writes to delta
# tables.
resource "aws_dynamodb_table" "oxbow_advanced_locking" {
name = "oxbow_advanced_lock_table"
billing_mode = "PROVISIONED"
# Default name of the partition key hard-coded in delta-rs
hash_key = "key"
read_capacity = 10
write_capacity = 10
attribute {
name = "key"
type = "S"
}
ttl {
attribute_name = "leaseDuration"
enabled = true
}
}
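The flow comment at the top of deployment/advanced.tf (S3 Event Notifications -> SQS -> group-events -> SQS FIFO -> oxbow) implies a step the Terraform cannot show: by the time a notification reaches group-events, the S3 event is serialized JSON inside each SQS message body, so the lambda has to deserialize that body before it can regroup the records. Below is a minimal sketch of that unwrapping, assuming every body is a plain S3 notification (S3 test events and empty bodies would need extra handling); the function name s3_records_from is mine, not the crate's.

use aws_lambda_events::event::sqs::SqsEvent;
use aws_lambda_events::s3::{S3Event, S3EventRecord};

// Hypothetical sketch: flatten the S3 records wrapped inside each SQS message body.
fn s3_records_from(event: &SqsEvent) -> Result<Vec<S3EventRecord>, serde_json::Error> {
    let mut records = vec![];
    for message in &event.records {
        if let Some(body) = &message.body {
            let parsed: S3Event = serde_json::from_str(body)?;
            records.extend(parsed.records);
        }
    }
    Ok(records)
}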

deployment/shared.tf (new file, 52 additions)

@@ -0,0 +1,52 @@
### Bootstrapping/configuration
###############################
variable "s3_bucket_arn" {
type = string
default = "*"
description = "The ARN for the S3 bucket that the oxbow function will watch"
}
variable "aws_access_key" {
type = string
default = ""
}
variable "aws_secret_key" {
type = string
default = ""
}
provider "aws" {
region = "us-west-2"
access_key = var.aws_access_key
secret_key = var.aws_secret_key
default_tags {
tags = {
ManagedBy = "Terraform"
environment = terraform.workspace
workspace = terraform.workspace
}
}
}
data "aws_iam_policy_document" "queue" {
statement {
effect = "Allow"
principals {
type = "*"
identifiers = ["*"]
}
actions = ["sqs:SendMessage"]
resources = ["arn:aws:sqs:*:*:*"]
condition {
test = "ArnEquals"
variable = "aws:SourceArn"
# Allow all S3 buckets to send notifications
values = ["arn:aws:s3:::*"]
}
}
}


@@ -1,9 +1,24 @@
# This Terraform file is necessary to configure the basic
# infrastructure around the Optimize lambda function
# infrastructure around the Oxbow lambda function
resource "aws_s3_bucket" "parquets" {
bucket = "oxbow-simple"
}
resource "aws_s3_bucket_notification" "bucket-notifications" {
bucket = aws_s3_bucket.parquets.id
queue {
queue_arn = aws_sqs_queue.oxbow.arn
events = ["s3:ObjectCreated:*"]
filter_suffix = ".parquet"
}
depends_on = [aws_lambda_permission.allow-bucket]
}
resource "aws_lambda_function" "oxbow" {
description = "A simple lambda for converting parquet files to delta tables"
filename = "target/lambda/oxbow-lambda/bootstrap.zip"
filename = "../target/lambda/oxbow-lambda/bootstrap.zip"
function_name = "oxbow-delta-lake-conversion"
role = aws_iam_role.iam_for_lambda.arn
handler = "provided"
@@ -37,11 +52,7 @@ resource "aws_sqs_queue" "oxbow_dlq" {
name = "obxow-notification-dlq"
}
resource "aws_s3_bucket" "parquets" {
bucket = "oxbow-dev-parquet"
}
resource "aws_lambda_permission" "allow_bucket" {
resource "aws_lambda_permission" "allow-bucket" {
statement_id = "AllowExecutionFromS3Bucket"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.oxbow.arn
@@ -49,18 +60,6 @@ resource "aws_lambda_permission" "allow_bucket" {
source_arn = aws_s3_bucket.parquets.arn
}
resource "aws_s3_bucket_notification" "bucket_notification" {
bucket = aws_s3_bucket.parquets.id
queue {
queue_arn = aws_sqs_queue.oxbow.arn
events = ["s3:ObjectCreated:*"]
filter_suffix = ".parquet"
}
depends_on = [aws_lambda_permission.allow_bucket]
}
data "aws_iam_policy_document" "assume_role" {
statement {
effect = "Allow"
@@ -83,7 +82,7 @@ resource "aws_iam_policy" "lambda_permissions" {
Statement = [
{
Action = ["dynamodb:*"]
Resource = aws_dynamodb_table.oxbow_locking.arn
Resource = ["*"]
Effect = "Allow"
},
{
@@ -93,34 +92,13 @@ resource "aws_iam_policy" "lambda_permissions" {
},
{
Action = ["sqs:*"]
Resource = aws_sqs_queue.oxbow.arn
Resource = "*"
Effect = "Allow"
}
]
})
}
data "aws_iam_policy_document" "queue" {
statement {
effect = "Allow"
principals {
type = "*"
identifiers = ["*"]
}
actions = ["sqs:SendMessage"]
# Hard-coding an ARN like syntax here because of the dependency cycle
resources = ["arn:aws:sqs:*:*:oxbow-notification-queue"]
condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [aws_s3_bucket.parquets.arn]
}
}
}
resource "aws_iam_role" "iam_for_lambda" {
name = "iam_for_oxbow_lambda"
assume_role_policy = data.aws_iam_policy_document.assume_role.json
@@ -151,34 +129,3 @@ resource "aws_dynamodb_table" "oxbow_locking" {
}
}
### Bootstrapping/configuration
variable "s3_bucket_arn" {
type = string
default = "*"
description = "The ARN for the S3 bucket that the optimize function will optimize"
}
variable "aws_access_key" {
type = string
default = ""
}
variable "aws_secret_key" {
type = string
default = ""
}
provider "aws" {
region = "us-west-2"
access_key = var.aws_access_key
secret_key = var.aws_secret_key
default_tags {
tags = {
ManagedBy = "Terraform"
environment = terraform.workspace
workspace = terraform.workspace
}
}
}


@@ -20,3 +20,4 @@ aws-config = { version = "1.0.1", features = ["behavior-version-latest"] }
aws_lambda_events = { version = "0.10.0", default-features = false, features = ["sqs"]}
aws-sdk-sqs = "1.1.0"
lambda_runtime = { version = "0.8" }
uuid = { version = "1.6.1", features = ["v4"] }


@@ -1,7 +1,9 @@
use aws_lambda_events::event::sqs::SqsEvent;
use aws_lambda_events::s3::{S3Event, S3EventRecord};
use aws_sdk_sqs::types::SendMessageBatchRequestEntry;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use tracing::log::*;
use uuid::Uuid;
use std::collections::HashMap;
@@ -22,7 +24,6 @@ async fn func(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
let queue_url = std::env::var("QUEUE_URL").expect("Failed to get the FIFO output queue");
use aws_sdk_sqs::types::SendMessageBatchRequestEntry;
let mut entries: Vec<SendMessageBatchRequestEntry> = vec![];
for (group_id, records) in segmented.iter() {
info!(
@@ -30,11 +31,12 @@
records.len(),
group_id
);
let uuid = Uuid::new_v4();
let body = S3Event {
records: records.to_vec(),
};
let entry = SendMessageBatchRequestEntry::builder()
.id(group_id.to_string())
.id(uuid.simple().to_string())
.message_body(serde_json::to_string(&body)?)
.message_group_id(group_id.to_string())
.build()?;
@@ -42,14 +44,19 @@
}
debug!("Ordered entries to send: {entries:?}");
let response = client
.send_message_batch()
.queue_url(queue_url.clone())
.set_entries(Some(entries))
.send()
.await?;
debug!("SQS response: {response:?}");
info!("Successfully batched events into SQS FIFO at {queue_url}");
if !entries.is_empty() {
info!("Relaying {} entries to {queue_url}", entries.len());
let response = client
.send_message_batch()
.queue_url(queue_url.clone())
.set_entries(Some(entries))
.send()
.await?;
debug!("SQS response: {response:?}");
info!("Successfully batched events into SQS FIFO at {queue_url}");
} else {
info!("No entries to send");
}
Ok(())
}
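Two side notes on the batching above. The switch to a UUID for the batch entry id is likely down to SQS's constraint that entry ids contain only alphanumeric characters, hyphens, and underscores (a table prefix with slashes would be rejected), while the prefix still survives as the message group id. Separately, SendMessageBatch accepts at most ten entries per call, so a burst producing more than ten table groups would fail the whole call. A minimal sketch of chunking the sends follows; the helper name send_in_chunks and its signature are my own, not part of this commit.

use aws_sdk_sqs::types::SendMessageBatchRequestEntry;
use aws_sdk_sqs::Client;
use lambda_runtime::Error;
use tracing::log::*;

// Hypothetical sketch: relay grouped entries in chunks of at most ten,
// since SQS rejects SendMessageBatch calls with more than ten entries.
async fn send_in_chunks(
    client: &Client,
    queue_url: &str,
    entries: Vec<SendMessageBatchRequestEntry>,
) -> Result<(), Error> {
    for chunk in entries.chunks(10) {
        let response = client
            .send_message_batch()
            .queue_url(queue_url)
            .set_entries(Some(chunk.to_vec()))
            .send()
            .await?;
        debug!("SQS response: {response:?}");
    }
    Ok(())
}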