Playing around with reading the file from S3
I don't like this structure; I need to think a bit more about how to structure the processor code.
This commit is contained in:
parent
2667088175
commit
d26d27950c
|
@ -408,6 +408,7 @@ dependencies = [
|
|||
"clap 2.33.3",
|
||||
"deltalake",
|
||||
"dotenv",
|
||||
"form_urlencoded",
|
||||
"lambda_runtime",
|
||||
"log",
|
||||
"parquet",
|
||||
|
|
|
@ -13,6 +13,7 @@ arrow = "4"
|
|||
clap = "2"
|
||||
deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "features-for-rustls", features = ["s3-rustls"] }
|
||||
dotenv = "0.15"
|
||||
form_urlencoded = "1"
|
||||
log = "0.4"
|
||||
parquet = "4"
|
||||
pretty_env_logger = "0.4"
|
||||
|
|
96
src/main.rs
96
src/main.rs
|
@ -4,6 +4,7 @@
|
|||
use aws_lambda_events::event::s3::S3Event;
|
||||
use clap::{App, Arg, ArgMatches};
|
||||
use log::*;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
#[cfg(feature = "lambda")]
|
||||
mod lambda;
|
||||
|
@ -66,6 +67,7 @@ async fn main() -> Result<(), anyhow::Error> {
|
|||
use rusoto_sqs::*;
|
||||
|
||||
let args = preboot();
|
||||
|
||||
if let Some(queue) = args.value_of("queue") {
|
||||
info!("Running in standalone mode with the queue: {}", queue);
|
||||
let client = SqsClient::new(Region::default());
|
||||
|
@ -85,10 +87,19 @@ async fn main() -> Result<(), anyhow::Error> {
|
|||
|
||||
if let Some(messages) = &res.messages {
|
||||
for message in messages {
|
||||
let mut should_delete = true;
|
||||
|
||||
if let Some(body) = &message.body {
|
||||
match serde_json::from_str::<S3Event>(&body) {
|
||||
Ok(event) => {
|
||||
debug!("Parsed an event to do something with: {:?}", event);
|
||||
should_delete = match process_event(&event).await {
|
||||
Ok(_) => true,
|
||||
Err(e) => {
|
||||
error!("Failure when processing event: {}", e);
|
||||
false
|
||||
}
|
||||
};
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Failed to deserialize what should have been an S3Event: {:?} - {}", &message.message_id, err);
|
||||
|
@ -98,15 +109,20 @@ async fn main() -> Result<(), anyhow::Error> {
|
|||
warn!("Message had no body: {:?}", &message);
|
||||
}
|
||||
|
||||
if let Some(receipt) = &message.receipt_handle {
|
||||
let delete = DeleteMessageRequest {
|
||||
queue_url: queue.to_string(),
|
||||
receipt_handle: receipt.to_string(),
|
||||
};
|
||||
// TODO: More gracefully handle this error
|
||||
client.delete_message(delete).await?;
|
||||
} else {
|
||||
warn!("Somehow a message without a receipt handle was received!");
|
||||
if should_delete {
|
||||
match &message.receipt_handle {
|
||||
Some(receipt) => {
|
||||
let delete = DeleteMessageRequest {
|
||||
queue_url: queue.to_string(),
|
||||
receipt_handle: receipt.to_string(),
|
||||
};
|
||||
// TODO: More gracefully handle this error
|
||||
client.delete_message(delete).await?;
|
||||
}
|
||||
None => {
|
||||
warn!("Somehow a message without a receipt handle was received!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -124,3 +140,65 @@ async fn main() -> Result<(), lambda_runtime::Error> {
|
|||
lambda_runtime::run(func).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/**
 * URL decode the given string
 *
 * Every `%XX` escape (two hexadecimal digits) is replaced by the byte it
 * encodes and every `+` is replaced by a space. All other bytes, including
 * literal `=` and `&`, are copied through unchanged. A `%` that is not
 * followed by two hexadecimal digits is kept as-is.
 *
 * The decoded bytes are converted to a `String` lossily, so byte sequences
 * that are not valid UTF-8 become U+FFFD replacement characters instead of
 * causing a failure.
 */
fn urldecode(buf: &str) -> String {
    /// Numeric value of an ASCII hexadecimal digit, `None` for anything else.
    fn hex_value(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            b'A'..=b'F' => Some(byte - b'A' + 10),
            _ => None,
        }
    }

    let bytes = buf.as_bytes();
    // Decoding never grows the input, so the input length is an upper bound.
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut idx = 0;

    while idx < bytes.len() {
        match bytes[idx] {
            b'+' => {
                decoded.push(b' ');
                idx += 1;
            }
            b'%' => {
                let high = bytes.get(idx + 1).copied().and_then(hex_value);
                let low = bytes.get(idx + 2).copied().and_then(hex_value);
                if let (Some(high), Some(low)) = (high, low) {
                    decoded.push((high << 4) | low);
                    idx += 3;
                } else {
                    // Not a well-formed escape: keep the '%' and carry on
                    // with whatever follows it.
                    decoded.push(b'%');
                    idx += 1;
                }
            }
            other => {
                decoded.push(other);
                idx += 1;
            }
        }
    }

    String::from_utf8_lossy(&decoded).into_owned()
}
|
||||
|
||||
/**
|
||||
* Process an event
|
||||
*/
|
||||
async fn process_event(event: &S3Event) -> Result<(), anyhow::Error> {
|
||||
use rusoto_core::*;
|
||||
use rusoto_s3::*;
|
||||
|
||||
let client = S3Client::new(Region::default());
|
||||
|
||||
for record in &event.records {
|
||||
// NOTE: This is gross, there's got to be a better way
|
||||
let bucket = record.s3.bucket.name.as_ref().unwrap().to_string();
|
||||
// Key names with partitions will be url encoded which means they need to
|
||||
// be processed first
|
||||
let key = urldecode(record.s3.object.key.as_ref().unwrap());
|
||||
|
||||
let get = GetObjectRequest {
|
||||
key,
|
||||
bucket,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
trace!("request: {:?}", get);
|
||||
let result = client.get_object(get).await?;
|
||||
trace!("result: {:?}", result);
|
||||
if let Some(stream) = result.body {
|
||||
let mut buf = Vec::new();
|
||||
stream.into_async_read().read_to_end(&mut buf).await;
|
||||
} else {
|
||||
warn!("Object had no body, somehow");
|
||||
}
|
||||
}
|
||||
Err(anyhow::Error::msg("Fail"))
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /**
     * A key containing a percent-encoded partition separator (`%3D`) must
     * come back with a literal `=`
     */
    #[test]
    fn test_urldecode_with_encodes() {
        let encoded_key = "date%3D2021-03-12/auditlog-1235.json";
        let decoded_key = urldecode(encoded_key);
        assert_eq!("date=2021-03-12/auditlog-1235.json", decoded_key);
    }

    /**
     * A key without any encoded characters must be returned unchanged
     */
    #[test]
    fn test_urldecode_normal() {
        let plain_key = "raw/auditlog-1235.json";
        let decoded_key = urldecode(plain_key);
        assert_eq!("raw/auditlog-1235.json", decoded_key);
    }
}
|
||||
|
|
Loading…
Reference in New Issue