From 559c9777985c24a7c5c72f1dd579d9efa949fc5e Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 12 Jun 2021 12:22:24 -0700 Subject: [PATCH] Tinkering around but consuming messages from SQS in standalone mode Needed to do some manual testing first to ensure that this would properly consume from SQS and handle the events properly --- Cargo.lock | 7 +++++ Cargo.toml | 8 ++++-- src/main.rs | 81 +++++++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 82 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1c8ff02..f0f5431 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,6 +39,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "anyhow" +version = "1.0.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15af2628f6890fe2609a3b91bef4c83450512802e59489f9c1cb1fa5df064a61" + [[package]] name = "arrayvec" version = "0.5.2" @@ -364,6 +370,7 @@ dependencies = [ name = "delta-s3-loader" version = "0.1.0" dependencies = [ + "anyhow", "arrow", "aws_lambda_events", "clap", diff --git a/Cargo.toml b/Cargo.toml index 831830f..30024cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,10 @@ authors = ["R. 
Tyler Croy "] edition = "2018" [dependencies] +anyhow = "*" +# This crate is not only useful for Lambda since it has an S3Event structure +# which is useful in the standalone mode as well +aws_lambda_events = "*" arrow = "4" clap = "2" #deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "main", features = ["s3"] } @@ -25,8 +29,6 @@ tokio = { version = "1.0", features = ["macros"]} # Needed for the lambda feature lambda_runtime = { version = "0.3", optional = true } -aws_lambda_events = { version = "0.4", optional = true } - [features] -lambda = ["lambda_runtime", "aws_lambda_events"] +lambda = ["lambda_runtime"] diff --git a/src/main.rs b/src/main.rs index f3135a5..4031bb6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,24 +1,30 @@ /* * The bulk of the application */ -use clap::{App, Arg}; +use aws_lambda_events::event::s3::S3Event; +use clap::{App, Arg, ArgMatches}; use log::*; #[cfg(feature = "lambda")] mod lambda; mod writer; -fn preboot() { +/** + * Preboot is responsible for the processing of arguments and other initialization + * tasks common to both the standalone and lambda modes of operation + */ +fn preboot() -> ArgMatches<'static> { + dotenv::dotenv().ok(); + #[cfg(debug_assertions)] pretty_env_logger::init(); - dotenv::dotenv().ok(); - info!( "Initializing delta-s3-loader v{}", env!["CARGO_PKG_VERSION"] ); - let _matches = App::new("delta-s3-loader") + + App::new("delta-s3-loader") .version(env!["CARGO_PKG_VERSION"]) .author("rtyler@brokenco.de") .about("Watch S3 buckets for new data to load into a Delta Table") @@ -42,19 +48,72 @@ fn preboot() { .takes_value(true), ) .arg( - Arg::with_name("SQS queue ARN") + Arg::with_name("queue") .short("q") .long("queue") - .value_name("QUEUE_ARN") - .help("ARN of the SQS queue to consume, *required* in standalone mode") + .env("QUEUE_URL") + .value_name("URL") + .help("URL of the SQS queue to consume, *required* in standalone mode") .takes_value(true), ) - .get_matches(); + .get_matches() } 
#[cfg(not(feature = "lambda"))] -fn main() { - preboot(); +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + use rusoto_core::*; + use rusoto_sqs::*; + + let args = preboot(); + if let Some(queue) = args.value_of("queue") { + info!("Running in standalone mode with the queue: {}", queue); + let client = SqsClient::new(Region::default()); + + loop { + let rcv = ReceiveMessageRequest { + queue_url: queue.to_string(), + max_number_of_messages: Some(10), + wait_time_seconds: Some(5), + ..Default::default() + }; + + let res = client.receive_message(rcv).await?; + debug!("Message(s) received: {:?}", res); + + if let Some(messages) = &res.messages { + for message in messages { + if let Some(body) = &message.body { + match serde_json::from_str::<S3Event>(&body) { + Ok(event) => { + debug!("Parsed an event to do something with: {:?}", event); + } + Err(err) => { + error!("Failed to deserialize what should have been an S3Event: {:?} - {}", &message.message_id, err); + } + } + } else { + debug!("Message had no body: {:?}", &message); + } + + if let Some(receipt) = &message.receipt_handle { + let delete = DeleteMessageRequest { + queue_url: queue.to_string(), + receipt_handle: receipt.to_string(), + }; + // TODO: More gracefully handle this error + client.delete_message(delete).await?; + } else { + warn!("Somehow a message without a receipt handle was received!"); + } + } + } + break; + } + } else { + panic!("When running in standalone mode the `queue` argument (or QUEUE_URL env variable) must be present"); + } + Ok(()) } #[cfg(feature = "lambda")]