diff --git a/Cargo.lock b/Cargo.lock index 1c8ff02..f0f5431 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,6 +39,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "anyhow" +version = "1.0.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15af2628f6890fe2609a3b91bef4c83450512802e59489f9c1cb1fa5df064a61" + [[package]] name = "arrayvec" version = "0.5.2" @@ -364,6 +370,7 @@ dependencies = [ name = "delta-s3-loader" version = "0.1.0" dependencies = [ + "anyhow", "arrow", "aws_lambda_events", "clap", diff --git a/Cargo.toml b/Cargo.toml index 831830f..30024cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,10 @@ authors = ["R. Tyler Croy "] edition = "2018" [dependencies] +anyhow = "*" +# This crate is not only useful for Lambda since it has an S3Event structure +# which is useful in the standalone mode as well +aws_lambda_events = "*" arrow = "4" clap = "2" #deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "main", features = ["s3"] } @@ -25,8 +29,6 @@ tokio = { version = "1.0", features = ["macros"]} # Needed for the lambda feature lambda_runtime = { version = "0.3", optional = true } -aws_lambda_events = { version = "0.4", optional = true } - [features] -lambda = ["lambda_runtime", "aws_lambda_events"] +lambda = ["lambda_runtime"] diff --git a/src/main.rs b/src/main.rs index f3135a5..4031bb6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,24 +1,30 @@ /* * The bulk of the application */ -use clap::{App, Arg}; +use aws_lambda_events::event::s3::S3Event; +use clap::{App, Arg, ArgMatches}; use log::*; #[cfg(feature = "lambda")] mod lambda; mod writer; -fn preboot() { +/** + * Preboot is responsible for the processing of arguments and other initialization + * tasks common to both the standalone and lambda modes of operation + */ +fn preboot() -> ArgMatches<'static> { + dotenv::dotenv().ok(); + #[cfg(debug_assertions)] pretty_env_logger::init(); - dotenv::dotenv().ok(); - info!( "Initializing delta-s3-loader 
v{}", env!["CARGO_PKG_VERSION"] ); - let _matches = App::new("delta-s3-loader") + + App::new("delta-s3-loader") .version(env!["CARGO_PKG_VERSION"]) .author("rtyler@brokenco.de") .about("Watch S3 buckets for new data to load into a Delta Table") @@ -42,19 +48,72 @@ fn preboot() { .takes_value(true), ) .arg( - Arg::with_name("SQS queue ARN") + Arg::with_name("queue") .short("q") .long("queue") - .value_name("QUEUE_ARN") - .help("ARN of the SQS queue to consume, *required* in standalone mode") + .env("QUEUE_URL") + .value_name("URL") + .help("URL of the SQS queue to consume, *required* in standalone mode") .takes_value(true), ) - .get_matches(); + .get_matches() } #[cfg(not(feature = "lambda"))] -fn main() { - preboot(); +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + use rusoto_core::*; + use rusoto_sqs::*; + + let args = preboot(); + if let Some(queue) = args.value_of("queue") { + info!("Running in standalone mode with the queue: {}", queue); + let client = SqsClient::new(Region::default()); + + loop { + let rcv = ReceiveMessageRequest { + queue_url: queue.to_string(), + max_number_of_messages: Some(10), + wait_time_seconds: Some(5), + ..Default::default() + }; + + let res = client.receive_message(rcv).await?; + debug!("Message(s) received: {:?}", res); + + if let Some(messages) = &res.messages { + for message in messages { + if let Some(body) = &message.body { + match serde_json::from_str::<S3Event>(&body) { + Ok(event) => { + debug!("Parsed an event to do something with: {:?}", event); + } + Err(err) => { + error!("Failed to deserialize what should have been an S3Event: {:?} - {}", &message.message_id, err); + } + } + } else { + debug!("Message had no body: {:?}", &message); + } + + if let Some(receipt) = &message.receipt_handle { + let delete = DeleteMessageRequest { + queue_url: queue.to_string(), + receipt_handle: receipt.to_string(), + }; + // TODO: More gracefully handle this error + client.delete_message(delete).await?; + } else { + warn!("Somehow a 
message without a receipt handle was received!"); + } + } + } + break; + } + } else { + panic!("When running in standalone mode the `queue` argument (or QUEUE_URL env variable) must be present"); + } + Ok(()) } #[cfg(feature = "lambda")]