diff --git a/src/lambda.rs b/src/lambda.rs index b5a16e3..eac5414 100644 --- a/src/lambda.rs +++ b/src/lambda.rs @@ -1,5 +1,6 @@ -use aws_lambda_events::event::s3::{S3Event, S3EventRecord}; +use aws_lambda_events::event::s3::S3Event; use lambda_runtime::{Context, Error}; +use log::*; /** * The s3_event_handler will be invoked with an S3Event which will need to be iterated upon and @@ -7,10 +8,6 @@ use lambda_runtime::{Context, Error}; * */ pub async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result { - // Unfortunately there's not a good way to avoid reload the configuration every time. At least - // as far as I can tell right now - let _conf = config::Config::from_file(&PathBuf::from("./config.yml"))?; - for record in event.records { if let Some(ref name) = record.event_name { trace!("Processing an event named: {}", name); @@ -30,87 +27,6 @@ pub async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result Option { - if let Some(bucket) = &record.s3.bucket.name { - for source in &conf.sources { - if bucket != &source.bucket { - continue; - } - - if let Some(key) = &record.s3.object.key { - if !source.prefix.is_match(key) { - continue; - } - - let expected_partitions = source.partitions.len(); - - /* If there are no partitions expected, then we've already matched - */ - if expected_partitions == 0 { - return Some(MatchedFile::new(source, key, vec![])); - } - - /* - * At this point the key and the prefix have matched, now to ensure - * that the data is partitioned according to the configuration - */ - let partitions = Partition::from_path_str(key); - debug!( - "Partitions identified from record: {} -- {:?}", - key, partitions - ); - - // No match if the discovered partitions don't match the count - if partitions.len() != expected_partitions { - continue; - } - - for index in 0..expected_partitions { - // The partition at the index doesn't match what we're looking for - if partitions[index].name != source.partitions[index] { - return None; - } - } - - return Some(MatchedFile::new(source, key, partitions)); - } - } - } - None -} - -/** - * A MatchedFile will contain all the necessary information for reading the object and allowing a - * Delta write to occur - */ -struct MatchedFile { - /// The source bucket containing the file to read - bucket: String, - /// The source object to read, expected to be JSON - object: String, - /// Partitions to include in the Delta write - partitions: Vec, - /// The output path for the Delta writer to use - output_path: String, -} - -impl MatchedFile { - fn new(source: &config::Source, key: &str, partitions: Vec) -> Self { - MatchedFile { - bucket: source.bucket.clone(), - object: key.to_string(), - partitions: partitions, - output_path: source.tablepath.clone(), - } - } -} - /** * A Partition is a simple struct to carry values from paths like: * just/a/path/year=2021/mydata.json @@ -162,67 +78,6 @@ mod tests { assert_eq!(partitions[0].value, "2021"); } - /** - * Make sure the sample event, which doesn't match a configured entry, doesn't return a source - */ - #[test] - fn test_match_record_fail() { - let event: S3Event = - serde_json::from_str(&sample_event()).expect("Failed to deserialize event"); - let conf = test_config(); - let matches = match_record(&event.records[0], &conf); - assert!(matches.is_none()); - } - - #[test] - fn test_match_record_ok() { - let event: S3Event = - serde_json::from_str(&sample_event()).expect("Failed to deserialize event"); - let conf = config::Config::new(vec![config::Source { - bucket: "my-bucket".to_string(), - prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"), - partitions: vec!["date".to_string()], - tablepath: "s3://test".to_string(), - }]); - let matches = match_record(&event.records[0], &conf); - assert!(matches.is_some()); - } - - #[test] - fn test_match_record_prefix_mismatch() { - let event: S3Event = - serde_json::from_str(&sample_event()).expect("Failed to deserialize event"); - let conf = config::Config::new(vec![config::Source { - bucket: "my-bucket".to_string(), - prefix: regex::Regex::new("^scoobydoo").expect("Failed to compile test regex"), - partitions: vec!["date".to_string()], - tablepath: "s3://test".to_string(), - }]); - let matches = match_record(&event.records[0], &conf); - assert!(matches.is_none()); - } - - #[test] - fn test_match_record_partition_mismatch() { - let event: S3Event = - serde_json::from_str(&sample_event()).expect("Failed to deserialize event"); - let conf = config::Config::new(vec![config::Source { - bucket: "my-bucket".to_string(), - prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"), - partitions: vec!["year".to_string()], - tablepath: "s3://test".to_string(), - }]); - let matches = match_record(&event.records[0], &conf); - assert!(matches.is_none()); - } - - /** - * Load the config.yml for tests - */ - fn test_config() -> config::Config { - config::Config::from_file(&PathBuf::from("./config.yml")) - .expect("Failed to load configuration for tests") - } fn sample_event() -> String { String::from( r#" diff --git a/src/main.rs b/src/main.rs index 766f0fe..f7fc498 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use clap::{App, Arg}; use log::*; -#[cfg(lambda)] +#[cfg(feature = "lambda")] mod lambda; mod writer; @@ -50,12 +50,12 @@ fn preboot() { .get_matches(); } -#[cfg(not(lambda))] +#[cfg(not(feature = "lambda"))] fn main() { preboot(); } -#[cfg(lambda)] +#[cfg(feature = "lambda")] #[tokio::main] async fn main() -> Result<(), lambda_runtime::Error> { preboot();