From d26d27950cb2d80da127ea7cba7e06c30af8ee33 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Sun, 13 Jun 2021 09:43:26 -0700
Subject: [PATCH] Playing around with reading the file from S3

This structure I don't like, I need to think a bit more about how to
structure the processor code
---
 Cargo.lock  |   1 +
 Cargo.toml  |   1 +
 src/main.rs | 116 ++++++++++++++++++++++++++++++++++++++++++++++++-----
 3 files changed, 109 insertions(+), 9 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4806231..f3206ff 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -408,6 +408,7 @@ dependencies = [
  "clap 2.33.3",
  "deltalake",
  "dotenv",
+ "form_urlencoded",
  "lambda_runtime",
  "log",
  "parquet",
diff --git a/Cargo.toml b/Cargo.toml
index 61e6455..6a4c05a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,7 @@ arrow = "4"
 clap = "2"
 deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "features-for-rustls", features = ["s3-rustls"] }
 dotenv = "0.15"
+form_urlencoded = "1"
 log = "0.4"
 parquet = "4"
 pretty_env_logger = "0.4"
diff --git a/src/main.rs b/src/main.rs
index 38b3602..ce2e35a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,6 +4,7 @@
 use aws_lambda_events::event::s3::S3Event;
 use clap::{App, Arg, ArgMatches};
 use log::*;
+use tokio::io::AsyncReadExt;

 #[cfg(feature = "lambda")]
 mod lambda;
@@ -66,6 +67,7 @@ async fn main() -> Result<(), anyhow::Error> {
     use rusoto_sqs::*;

     let args = preboot();
+
     if let Some(queue) = args.value_of("queue") {
         info!("Running in standalone mode with the queue: {}", queue);
         let client = SqsClient::new(Region::default());
@@ -85,10 +87,19 @@ async fn main() -> Result<(), anyhow::Error> {

                 if let Some(messages) = &res.messages {
                     for message in messages {
+                        let mut should_delete = true;
+
                         if let Some(body) = &message.body {
                             match serde_json::from_str::<S3Event>(&body) {
                                 Ok(event) => {
                                     debug!("Parsed an event to do something with: {:?}", event);
+                                    should_delete = match process_event(&event).await {
+                                        Ok(_) => true,
+                                        Err(e) => {
+                                            error!("Failure when processing event: {}", e);
+                                            false
+                                        }
+                                    };
                                 }
                                 Err(err) => {
                                     error!("Failed to deserialize what should have been an S3Event: {:?} - {}", &message.message_id, err);
@@ -98,15 +109,20 @@ async fn main() -> Result<(), anyhow::Error> {
                             warn!("Message had no body: {:?}", &message);
                         }

-                        if let Some(receipt) = &message.receipt_handle {
-                            let delete = DeleteMessageRequest {
-                                queue_url: queue.to_string(),
-                                receipt_handle: receipt.to_string(),
-                            };
-                            // TODO: More gracefully handle this error
-                            client.delete_message(delete).await?;
-                        } else {
-                            warn!("Somehow a message without a receipt handle was received!");
+                        if should_delete {
+                            match &message.receipt_handle {
+                                Some(receipt) => {
+                                    let delete = DeleteMessageRequest {
+                                        queue_url: queue.to_string(),
+                                        receipt_handle: receipt.to_string(),
+                                    };
+                                    // TODO: More gracefully handle this error
+                                    client.delete_message(delete).await?;
+                                }
+                                None => {
+                                    warn!("Somehow a message without a receipt handle was received!");
+                                }
+                            }
                         }
                     }
                 }
@@ -124,3 +140,85 @@ async fn main() -> Result<(), lambda_runtime::Error> {
     lambda_runtime::run(func).await?;
     Ok(())
 }
+
+/**
+ * URL decode the given string
+ *
+ * The input is parsed as form-urlencoded pairs whose keys and values are glued
+ * back together: percent escapes are decoded, `+` becomes a space, and an
+ * unencoded `&` or the first unencoded `=` of a pair is dropped from the output
+ */
+fn urldecode(buf: &str) -> String {
+    use form_urlencoded::parse;
+    parse(buf.as_bytes())
+        .map(|(key, val)| [key, val].concat())
+        .collect::<String>()
+}
+
+/**
+ * Process an event by downloading every S3 object its records refer to
+ *
+ * Still a stub: the downloaded bytes are discarded and an error is always
+ * returned, which the caller in main() treats as "do not delete the message".
+ * Records lacking a bucket name or object key, failed requests and failed
+ * reads are returned as errors as well.
+ */
+async fn process_event(event: &S3Event) -> Result<(), anyhow::Error> {
+    use rusoto_core::*;
+    use rusoto_s3::*;
+
+    let client = S3Client::new(Region::default());
+
+    for record in &event.records {
+        // A malformed record is reported as an error instead of panicking
+        let bucket = record
+            .s3
+            .bucket
+            .name
+            .clone()
+            .ok_or_else(|| anyhow::Error::msg("S3 record has no bucket name"))?;
+        // Key names with partitions will be url encoded which means they need to
+        // be processed first
+        let key = record
+            .s3
+            .object
+            .key
+            .as_deref()
+            .map(urldecode)
+            .ok_or_else(|| anyhow::Error::msg("S3 record has no object key"))?;
+
+        let get = GetObjectRequest {
+            key,
+            bucket,
+            ..Default::default()
+        };
+
+        trace!("request: {:?}", get);
+        let result = client.get_object(get).await?;
+        trace!("result: {:?}", result);
+        if let Some(stream) = result.body {
+            let mut buf = Vec::new();
+            // Surface read failures instead of silently dropping the io::Result
+            stream.into_async_read().read_to_end(&mut buf).await?;
+        } else {
+            warn!("Object had no body, somehow");
+        }
+    }
+    Err(anyhow::Error::msg("Fail"))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_urldecode_with_encodes() {
+        let buf = "date%3D2021-03-12/auditlog-1235.json";
+        assert_eq!("date=2021-03-12/auditlog-1235.json", urldecode(&buf));
+    }
+
+    #[test]
+    fn test_urldecode_normal() {
+        let buf = "raw/auditlog-1235.json";
+        assert_eq!("raw/auditlog-1235.json", urldecode(&buf));
+    }
+}