Compare commits
6 Commits: c20997b197 ... d26d27950c

Author | SHA1 | Date |
---|---|---|
R Tyler Croy | d26d27950c | |
R Tyler Croy | 2667088175 | |
R Tyler Croy | 559c977798 | |
R Tyler Croy | 5f10b2d86d | |
R Tyler Croy | 2f5f68b35c | |
R Tyler Croy | fe76b31955 | |
.gitignore

@@ -1,2 +1,3 @@
 /target
 *.swp
+.env
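The new `.env` entry keeps local configuration out of the repository: src/main.rs calls `dotenv::dotenv().ok()` at startup, which loads that file into the process environment. A minimal sketch of the pattern, using the QUEUE_URL variable the CLI reads further down:

```rust
fn main() {
    // Load `.env` if present; `.ok()` makes a missing file non-fatal.
    dotenv::dotenv().ok();
    // Values from `.env` are now ordinary environment variables, e.g. the
    // QUEUE_URL that clap's `.env("QUEUE_URL")` argument picks up below.
    if let Ok(queue) = std::env::var("QUEUE_URL") {
        println!("queue: {}", queue);
    }
}
```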
File diff suppressed because it is too large
Cargo.toml (17 changed lines)
@@ -5,16 +5,23 @@ authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
 edition = "2018"
 
 [dependencies]
 anyhow = "*"
+# This crate is not only useful for Lambda since it has an S3Event structure
+# which is useful in the standalone mode as well
+aws_lambda_events = "*"
 arrow = "4"
 clap = "2"
-deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "mixing-in-deltalake_ext", features = ["s3"] }
+deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "features-for-rustls", features = ["s3-rustls"] }
 dotenv = "0.15"
+form_urlencoded = "1"
 log = "0.4"
 parquet = "4"
 pretty_env_logger = "0.4"
 regex = "1"
-rusoto_core = "*"
+rusoto_core = { version = "*", default-features = false, features = ["rustls"] }
 rusoto_credential = "*"
-rusoto_s3 = "*"
+rusoto_sqs = { version = "*", default-features = false, features = ["serialize_structs", "deserialize_structs", "rustls"]}
+rusoto_s3 = { version = "*", default-features = false, features = ["rustls"]}
 serde = { version = "1", features = ["rc", "derive"]}
+serde_json = "1"
+serde_yaml = "0.8"
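The dependency changes above swap rusoto's default native-tls (OpenSSL-backed) stack for rustls via `default-features = false, features = ["rustls"]`, matching the `s3-rustls` feature on the deltalake branch. The calling code is unaffected because the TLS backend is chosen entirely at compile time; a minimal sketch (the function name here is illustrative, not from this repo):

```rust
use rusoto_core::Region;
use rusoto_sqs::SqsClient;

/// Constructing a client looks identical whichever TLS backend Cargo selected;
/// only the Cargo feature flags decide between rustls and native-tls.
fn build_sqs_client() -> SqsClient {
    // Region::default() honors AWS_REGION / AWS_DEFAULT_REGION, falling back to us-east-1.
    SqsClient::new(Region::default())
}
```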
@@ -23,8 +30,6 @@ tokio = { version = "1.0", features = ["macros"]}
 
 # Needed for the lambda feature
 lambda_runtime = { version = "0.3", optional = true }
-aws_lambda_events = { version = "0.4", optional = true }
-
 
 [features]
-lambda = ["lambda_runtime", "aws_lambda_events"]
+lambda = ["lambda_runtime"]
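With `aws_lambda_events` promoted to an unconditional dependency, the `lambda` feature now only pulls in `lambda_runtime`. This pairs with the `#[cfg]` fix in src/main.rs below: a bare `#[cfg(lambda)]` tests a raw rustc flag rather than a Cargo feature, so the old gate never fired under `cargo build --features lambda`. A minimal sketch of the difference:

```rust
// Only compiled when built with RUSTFLAGS="--cfg lambda"; a Cargo feature
// named "lambda" does NOT set this flag, which is why the old gate was dead code.
#[cfg(lambda)]
fn gated_by_raw_cfg() {}

// Only compiled when built with `cargo build --features lambda`.
#[cfg(feature = "lambda")]
fn gated_by_cargo_feature() {}

fn main() {}
```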
src/lambda.rs (149 changed lines)
@@ -1,5 +1,6 @@
-use aws_lambda_events::event::s3::{S3Event, S3EventRecord};
+use aws_lambda_events::event::s3::S3Event;
 use lambda_runtime::{Context, Error};
 use log::*;
 
 /**
  * The s3_event_handler will be invoked with an S3Event which will need to be iterated upon and
@@ -7,10 +8,6 @@ use lambda_runtime::{Context, Error};
  * <https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html>
  */
 pub async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error> {
-    // Unfortunately there's not a good way to avoid reloading the configuration every time. At least
-    // as far as I can tell right now
-    let _conf = config::Config::from_file(&PathBuf::from("./config.yml"))?;
-
     for record in event.records {
         if let Some(ref name) = record.event_name {
             trace!("Processing an event named: {}", name);
@@ -30,87 +27,6 @@ pub async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, E
     Ok("{}".to_string())
 }
 
-/**
- * The match_record function will look at a given S3EventRecord to see if the contents of that
- * record match a specific source configured for the Lambda
- *
- * Each of the records passed is assumed to be an `ObjectCreated:Put`
- */
-fn match_record(record: &S3EventRecord, conf: &config::Config) -> Option<MatchedFile> {
-    if let Some(bucket) = &record.s3.bucket.name {
-        for source in &conf.sources {
-            if bucket != &source.bucket {
-                continue;
-            }
-
-            if let Some(key) = &record.s3.object.key {
-                if !source.prefix.is_match(key) {
-                    continue;
-                }
-
-                let expected_partitions = source.partitions.len();
-
-                /* If there are no partitions expected, then we've already matched
-                 */
-                if expected_partitions == 0 {
-                    return Some(MatchedFile::new(source, key, vec![]));
-                }
-
-                /*
-                 * At this point the key and the prefix have matched, now to ensure
-                 * that the data is partitioned according to the configuration
-                 */
-                let partitions = Partition::from_path_str(key);
-                debug!(
-                    "Partitions identified from record: {} -- {:?}",
-                    key, partitions
-                );
-
-                // No match if the discovered partitions don't match the count
-                if partitions.len() != expected_partitions {
-                    continue;
-                }
-
-                for index in 0..expected_partitions {
-                    // The partition at the index doesn't match what we're looking for
-                    if partitions[index].name != source.partitions[index] {
-                        return None;
-                    }
-                }
-
-                return Some(MatchedFile::new(source, key, partitions));
-            }
-        }
-    }
-    None
-}
-
-/**
- * A MatchedFile will contain all the necessary information for reading the object and allowing a
- * Delta write to occur
- */
-struct MatchedFile {
-    /// The source bucket containing the file to read
-    bucket: String,
-    /// The source object to read, expected to be JSON
-    object: String,
-    /// Partitions to include in the Delta write
-    partitions: Vec<Partition>,
-    /// The output path for the Delta writer to use
-    output_path: String,
-}
-
-impl MatchedFile {
-    fn new(source: &config::Source, key: &str, partitions: Vec<Partition>) -> Self {
-        MatchedFile {
-            bucket: source.bucket.clone(),
-            object: key.to_string(),
-            partitions: partitions,
-            output_path: source.tablepath.clone(),
-        }
-    }
-}
-
-/**
- * A Partition is a simple struct to carry values from paths like:
- * just/a/path/year=2021/mydata.json
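The hunk above removes the Lambda-specific matching path: match_record, MatchedFile, and the Partition parsing they relied on. For reference, the removed parser consumed Hive-style `name=value` path segments; a standalone illustration of that convention (not the crate's own implementation):

```rust
// Illustration of the `name=value` path convention that Partition::from_path_str
// parsed: every '/'-separated segment containing '=' becomes a (name, value) pair.
fn parse_partitions(path: &str) -> Vec<(String, String)> {
    path.split('/')
        .filter_map(|segment| {
            let mut parts = segment.splitn(2, '=');
            match (parts.next(), parts.next()) {
                (Some(name), Some(value)) => Some((name.to_string(), value.to_string())),
                _ => None, // plain path segments carry no partition info
            }
        })
        .collect()
}

fn main() {
    assert_eq!(
        parse_partitions("just/a/path/year=2021/mydata.json"),
        vec![("year".to_string(), "2021".to_string())]
    );
}
```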
@@ -162,67 +78,6 @@ mod tests {
         assert_eq!(partitions[0].value, "2021");
     }
 
-    /**
-     * Make sure the sample event, which doesn't match a configured entry, doesn't return a source
-     */
-    #[test]
-    fn test_match_record_fail() {
-        let event: S3Event =
-            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
-        let conf = test_config();
-        let matches = match_record(&event.records[0], &conf);
-        assert!(matches.is_none());
-    }
-
-    #[test]
-    fn test_match_record_ok() {
-        let event: S3Event =
-            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
-        let conf = config::Config::new(vec![config::Source {
-            bucket: "my-bucket".to_string(),
-            prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"),
-            partitions: vec!["date".to_string()],
-            tablepath: "s3://test".to_string(),
-        }]);
-        let matches = match_record(&event.records[0], &conf);
-        assert!(matches.is_some());
-    }
-
-    #[test]
-    fn test_match_record_prefix_mismatch() {
-        let event: S3Event =
-            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
-        let conf = config::Config::new(vec![config::Source {
-            bucket: "my-bucket".to_string(),
-            prefix: regex::Regex::new("^scoobydoo").expect("Failed to compile test regex"),
-            partitions: vec!["date".to_string()],
-            tablepath: "s3://test".to_string(),
-        }]);
-        let matches = match_record(&event.records[0], &conf);
-        assert!(matches.is_none());
-    }
-
-    #[test]
-    fn test_match_record_partition_mismatch() {
-        let event: S3Event =
-            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
-        let conf = config::Config::new(vec![config::Source {
-            bucket: "my-bucket".to_string(),
-            prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"),
-            partitions: vec!["year".to_string()],
-            tablepath: "s3://test".to_string(),
-        }]);
-        let matches = match_record(&event.records[0], &conf);
-        assert!(matches.is_none());
-    }
-
-    /**
-     * Load the config.yml for tests
-     */
-    fn test_config() -> config::Config {
-        config::Config::from_file(&PathBuf::from("./config.yml"))
-            .expect("Failed to load configuration for tests")
-    }
 
     fn sample_event() -> String {
         String::from(
             r#"
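The removed tests built their S3Event fixtures by deserializing JSON with serde_json, the same path the standalone runloop in src/main.rs takes for SQS message bodies. A minimal sketch, with a hypothetical trimmed-down payload standing in for the sample_event() fixture:

```rust
use aws_lambda_events::event::s3::S3Event;

fn main() {
    // An empty "Records" array is about the smallest payload that deserializes;
    // real S3 notifications carry bucket/object/event-name details per record.
    let event: S3Event = serde_json::from_str(r#"{"Records": []}"#)
        .expect("Failed to deserialize event");
    assert!(event.records.is_empty());
}
```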
src/main.rs (163 changed lines)
@@ -1,14 +1,22 @@
 /*
  * The bulk of the application
  */
-use clap::{App, Arg};
+use aws_lambda_events::event::s3::S3Event;
+use clap::{App, Arg, ArgMatches};
 use log::*;
+use tokio::io::AsyncReadExt;
 
-#[cfg(lambda)]
+#[cfg(feature = "lambda")]
 mod lambda;
 mod writer;
 
-fn preboot() {
+/**
+ * Preboot is responsible for the processing of arguments and other initialization
+ * tasks common to both the standalone and lambda modes of operation
+ */
+fn preboot() -> ArgMatches<'static> {
     dotenv::dotenv().ok();
 
+    #[cfg(debug_assertions)]
     pretty_env_logger::init();
 
@@ -16,7 +24,8 @@ fn preboot() {
         "Initializing delta-s3-loader v{}",
         env!["CARGO_PKG_VERSION"]
     );
-    let _matches = App::new("delta-s3-loader")
+
+    App::new("delta-s3-loader")
         .version(env!["CARGO_PKG_VERSION"])
         .author("rtyler@brokenco.de")
         .about("Watch S3 buckets for new data to load into a Delta Table")
@@ -40,22 +49,90 @@ fn preboot() {
                 .takes_value(true),
         )
         .arg(
-            Arg::with_name("SQS queue ARN")
+            Arg::with_name("queue")
                 .short("q")
                 .long("queue")
-                .value_name("QUEUE_ARN")
-                .help("ARN of the SQS queue to consume, *required* in standalone mode")
+                .env("QUEUE_URL")
+                .value_name("URL")
+                .help("URL of the SQS queue to consume, *required* in standalone mode")
                 .takes_value(true),
         )
-        .get_matches();
+        .get_matches()
 }
 
-#[cfg(not(lambda))]
-fn main() {
-    preboot();
+#[cfg(not(feature = "lambda"))]
+#[tokio::main]
+async fn main() -> Result<(), anyhow::Error> {
+    use rusoto_core::*;
+    use rusoto_sqs::*;
+
+    let args = preboot();
+
+    if let Some(queue) = args.value_of("queue") {
+        info!("Running in standalone mode with the queue: {}", queue);
+        let client = SqsClient::new(Region::default());
+
+        loop {
+            let rcv = ReceiveMessageRequest {
+                queue_url: queue.to_string(),
+                // Always consuming the max number of available messages to reduce round trips
+                max_number_of_messages: Some(10),
+                // This wait time is being used as the effective loop interval for the main runloop
+                wait_time_seconds: Some(10),
+                ..Default::default()
+            };
+
+            let res = client.receive_message(rcv).await?;
+            trace!("Message(s) received: {:?}", res);
+
+            if let Some(messages) = &res.messages {
+                for message in messages {
+                    let mut should_delete = true;
+
+                    if let Some(body) = &message.body {
+                        match serde_json::from_str::<S3Event>(&body) {
+                            Ok(event) => {
+                                debug!("Parsed an event to do something with: {:?}", event);
+                                should_delete = match process_event(&event).await {
+                                    Ok(_) => true,
+                                    Err(e) => {
+                                        error!("Failure when processing event: {}", e);
+                                        false
+                                    }
+                                };
+                            }
+                            Err(err) => {
+                                error!("Failed to deserialize what should have been an S3Event: {:?} - {}", &message.message_id, err);
+                            }
+                        }
+                    } else {
+                        warn!("Message had no body: {:?}", &message);
+                    }
+
+                    if should_delete {
+                        match &message.receipt_handle {
+                            Some(receipt) => {
+                                let delete = DeleteMessageRequest {
+                                    queue_url: queue.to_string(),
+                                    receipt_handle: receipt.to_string(),
+                                };
+                                // TODO: More gracefully handle this error
+                                client.delete_message(delete).await?;
+                            }
+                            None => {
+                                warn!("Somehow a message without a receipt handle was received!");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    } else {
+        panic!("When running in standalone mode the `queue` argument (or QUEUE_URL env variable) must be present");
+    }
+}
 
-#[cfg(lambda)]
+#[cfg(feature = "lambda")]
 #[tokio::main]
 async fn main() -> Result<(), lambda_runtime::Error> {
     preboot();
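The runloop added above leans on SQS long polling: `wait_time_seconds: Some(10)` makes each ReceiveMessage call block for up to ten seconds, so the loop needs no explicit sleep, and deleting a message only after process_event succeeds yields at-least-once processing, since an unacknowledged message reappears after its visibility timeout. A condensed sketch of that delete-on-success contract, assuming the same rusoto_sqs types used in the diff (the function name is illustrative):

```rust
use rusoto_sqs::{DeleteMessageRequest, Message, Sqs, SqsClient};

/// Acknowledge a message only if it was fully processed; otherwise leave it
/// for SQS to redeliver after the visibility timeout expires.
async fn ack_if_processed(client: &SqsClient, queue_url: &str, message: &Message, processed: bool) {
    if !processed {
        return;
    }
    if let Some(receipt) = &message.receipt_handle {
        let delete = DeleteMessageRequest {
            queue_url: queue_url.to_string(),
            receipt_handle: receipt.to_string(),
        };
        if let Err(e) = client.delete_message(delete).await {
            eprintln!("failed to delete message: {}", e);
        }
    }
}
```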
@@ -63,3 +140,65 @@ async fn main() -> Result<(), lambda_runtime::Error> {
     lambda_runtime::run(func).await?;
     Ok(())
 }
+
+/**
+ * URL decode the given string
+ */
+fn urldecode(buf: &str) -> String {
+    use form_urlencoded::parse;
+    parse(buf.as_bytes())
+        .map(|(key, val)| [key, val].concat())
+        .collect::<String>()
+}
+
+/**
+ * Process an event
+ */
+async fn process_event(event: &S3Event) -> Result<(), anyhow::Error> {
+    use rusoto_core::*;
+    use rusoto_s3::*;
+
+    let client = S3Client::new(Region::default());
+
+    for record in &event.records {
+        // NOTE: This is gross, there's got to be a better way
+        let bucket = record.s3.bucket.name.as_ref().unwrap().to_string();
+        // Key names with partitions will be url encoded which means they need to
+        // be processed first
+        let key = urldecode(record.s3.object.key.as_ref().unwrap());
+
+        let get = GetObjectRequest {
+            key,
+            bucket,
+            ..Default::default()
+        };
+
+        trace!("request: {:?}", get);
+        let result = client.get_object(get).await?;
+        trace!("result: {:?}", result);
+        if let Some(stream) = result.body {
+            let mut buf = Vec::new();
+            stream.into_async_read().read_to_end(&mut buf).await;
+        } else {
+            warn!("Object had no body, somehow");
+        }
+    }
+    Err(anyhow::Error::msg("Fail"))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_urldecode_with_encodes() {
+        let buf = "date%3D2021-03-12/auditlog-1235.json";
+        assert_eq!("date=2021-03-12/auditlog-1235.json", urldecode(&buf));
+    }
+
+    #[test]
+    fn test_urldecode_normal() {
+        let buf = "raw/auditlog-1235.json";
+        assert_eq!("raw/auditlog-1235.json", urldecode(&buf));
+    }
+}
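A note on why the new urldecode concatenates `(key, val)` pairs: form_urlencoded::parse yields key/value tuples, and an input without `=` comes back as the whole string in the key position with an empty value, so concatenation recovers the decoded text either way. It also decodes `+` as a space, which matches the form-encoding convention S3 applies to object keys in event notifications. A small sketch of the behavior:

```rust
fn main() {
    // "%3D" decodes to '='; a segment with no '=' arrives as (key, "").
    let decoded: String = form_urlencoded::parse("date%3D2021-03-12/file.json".as_bytes())
        .map(|(key, val)| [key, val].concat())
        .collect();
    assert_eq!(decoded, "date=2021-03-12/file.json");
}
```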