diff --git a/Cargo.lock b/Cargo.lock
index 049e948..2e64ae3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -44,8 +44,9 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
 
 [[package]]
 name = "arrow"
-version = "4.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=05b36567bd8216bec71b796fe3bb6811c71abbec#05b36567bd8216bec71b796fe3bb6811c71abbec"
+version = "4.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "014c7490f839d9dd4ce28f8232d4731f8b1fb93fa477d69e6fe881814ac2b6bb"
 dependencies = [
  "cfg_aliases",
  "chrono",
@@ -378,8 +379,8 @@ dependencies = [
 
 [[package]]
 name = "deltalake"
-version = "0.2.1"
-source = "git+https://github.com/delta-io/delta-rs?branch=main#e956f08edcace2dbc3247884d82039140a087f22"
+version = "0.4.0"
+source = "git+https://github.com/rtyler/delta-rs?branch=mixing-in-deltalake_ext#c43ac478fe3dedf63386ce18c193fc52293642f2"
 dependencies = [
  "anyhow",
  "arrow",
@@ -396,6 +397,7 @@ dependencies = [
  "libc",
  "log",
  "parquet",
+ "parquet-format",
  "regex",
  "rusoto_core",
  "rusoto_credential",
@@ -1173,8 +1175,9 @@ checksum = "afb2e1c3ee07430c2cf76151675e583e0f19985fa6efae47d6848a3e2c824f85"
 
 [[package]]
 name = "parquet"
-version = "4.0.0-SNAPSHOT"
-source = "git+https://github.com/apache/arrow.git?rev=05b36567bd8216bec71b796fe3bb6811c71abbec#05b36567bd8216bec71b796fe3bb6811c71abbec"
+version = "4.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "32858ae16bd61fda406be4b76af617d2f632fed4f0093810245aa4f5316ac865"
 dependencies = [
  "arrow",
  "base64 0.12.3",
@@ -2312,18 +2315,18 @@ checksum = "81a974bcdd357f0dca4d41677db03436324d45a4c9ed2d0b873a5a360ce41c36"
 
 [[package]]
 name = "zstd"
-version = "0.6.1+zstd.1.4.9"
+version = "0.7.0+zstd.1.4.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5de55e77f798f205d8561b8fe2ef57abfb6e0ff2abe7fd3c089e119cdb5631a3"
+checksum = "9428752481d8372e15b1bf779ea518a179ad6c771cca2d2c60e4fbff3cc2cd52"
 dependencies = [
  "zstd-safe",
 ]
 
 [[package]]
 name = "zstd-safe"
-version = "3.0.1+zstd.1.4.9"
+version = "3.1.0+zstd.1.4.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1387cabcd938127b30ce78c4bf00b30387dddf704e3f0881dbc4ff62b5566f8c"
+checksum = "5aa1926623ad7fe406e090555387daf73db555b948134b4d73eac5eb08fb666d"
 dependencies = [
  "libc",
  "zstd-sys",
@@ -2331,9 +2334,9 @@ dependencies = [
 
 [[package]]
 name = "zstd-sys"
-version = "1.4.20+zstd.1.4.9"
+version = "1.5.0+zstd.1.4.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ebd5b733d7cf2d9447e2c3e76a5589b4f5e5ae065c22a2bc0b023cbc331b6c8e"
+checksum = "4e6c094340240369025fc6b731b054ee2a834328fa584310ac96aa4baebdc465"
 dependencies = [
  "cc",
  "libc",
diff --git a/Cargo.toml b/Cargo.toml
index 440acc7..48c08b0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,12 +5,10 @@ authors = ["R. Tyler Croy "]
 edition = "2018"
 
 [dependencies]
-arrow = { git = "https://github.com/apache/arrow.git", rev = "05b36567bd8216bec71b796fe3bb6811c71abbec" }
-aws_lambda_events = "0.4"
-deltalake = { git = "https://github.com/delta-io/delta-rs", branch = "main", features = ["s3"] }
-lambda_runtime = "0.3"
+arrow = "4"
+deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "mixing-in-deltalake_ext", features = ["s3"] }
 log = "0.4"
-parquet = { git = "https://github.com/apache/arrow.git", rev = "05b36567bd8216bec71b796fe3bb6811c71abbec" }
+parquet = "4"
 pretty_env_logger = "0.4"
 regex = "1"
 serde = { version = "1", features = ["rc", "derive"]}
@@ -18,3 +16,11 @@ serde_json = "1"
 serde_yaml = "0.8"
 serde_regex = "1"
 tokio = { version = "1.0", features = ["macros"]}
+
+# Needed for the lambda feature
+lambda_runtime = { version = "0.3", optional = true }
+aws_lambda_events = { version = "0.4", optional = true }
+
+
+[features]
+lambda = ["lambda_runtime", "aws_lambda_events"]
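With lambda_runtime and aws_lambda_events now optional dependencies, the Lambda entry
point is only compiled when the new `lambda` feature is enabled, e.g. with
`cargo build --features lambda`. A minimal sketch of the gating this table enables
(mirroring the src/main.rs changes further below):

    // Only compiled when built with `--features lambda`.
    #[cfg(feature = "lambda")]
    mod lambda;

    // Non-Lambda builds fall back to a plain binary entry point.
    #[cfg(not(feature = "lambda"))]
    fn main() {}
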
diff --git a/src/lambda.rs b/src/lambda.rs
new file mode 100644
index 0000000..b5a16e3
--- /dev/null
+++ b/src/lambda.rs
@@ -0,0 +1,279 @@
+use aws_lambda_events::event::s3::{S3Event, S3EventRecord};
+use lambda_runtime::{Context, Error};
+use log::*;
+use std::path::PathBuf;
+
+use crate::config;
+
+/**
+ * The s3_event_handler will be invoked with an S3Event, which must be iterated over
+ * so that each S3EventRecord can be processed.
+ */
+pub async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error> {
+    // Unfortunately there's not a good way to avoid reloading the configuration on
+    // every invocation. At least as far as I can tell right now
+    let _conf = config::Config::from_file(&PathBuf::from("./config.yml"))?;
+
+    for record in event.records {
+        if let Some(ref name) = record.event_name {
+            trace!("Processing an event named: {}", name);
+            /*
+             * The only events that delta-s3-loader is interested in are new PUTs, which
+             * indicate that a new file must be processed.
+             */
+            if name == "ObjectCreated:Put" {
+                trace!("Processing record: {:?}", record);
+            }
+        } else {
+            warn!("Received a record without a name: {:?}", record);
+        }
+    }
+
+    // Since this was triggered asynchronously, no need for a real response
+    Ok("{}".to_string())
+}
+
+/**
+ * The match_record function will look at a given S3EventRecord to see if the contents of
+ * that record match a specific source configured for the Lambda.
+ *
+ * Each of the records passed is assumed to be an `ObjectCreated:Put`.
+ */
+fn match_record(record: &S3EventRecord, conf: &config::Config) -> Option<MatchedFile> {
+    if let Some(bucket) = &record.s3.bucket.name {
+        for source in &conf.sources {
+            if bucket != &source.bucket {
+                continue;
+            }
+
+            if let Some(key) = &record.s3.object.key {
+                if !source.prefix.is_match(key) {
+                    continue;
+                }
+
+                let expected_partitions = source.partitions.len();
+
+                /* If there are no partitions expected, then we've already matched
+                 */
+                if expected_partitions == 0 {
+                    return Some(MatchedFile::new(source, key, vec![]));
+                }
+
+                /*
+                 * At this point the key and the prefix have matched; now to ensure
+                 * that the data is partitioned according to the configuration
+                 */
+                let partitions = Partition::from_path_str(key);
+                debug!(
+                    "Partitions identified from record: {} -- {:?}",
+                    key, partitions
+                );
+
+                // No match if the discovered partitions don't match the count
+                if partitions.len() != expected_partitions {
+                    continue;
+                }
+
+                for index in 0..expected_partitions {
+                    // The partition at the index doesn't match what we're looking for
+                    if partitions[index].name != source.partitions[index] {
+                        return None;
+                    }
+                }
+
+                return Some(MatchedFile::new(source, key, partitions));
+            }
+        }
+    }
+    None
+}
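+
+// A sketch of the matching rules above, with values mirroring the tests below: a
+// record for key "somepath/date=2021-04-16/afile.json" in bucket "my-bucket" matches
+// this source, since the key yields exactly one Partition named "date"; a source
+// expecting ["year"] would not match.
+//
+//     let source = config::Source {
+//         bucket: "my-bucket".to_string(),
+//         prefix: regex::Regex::new("^somepath").expect("Failed to compile regex"),
+//         partitions: vec!["date".to_string()],
+//         tablepath: "s3://test".to_string(),
+//     };
+//     assert!(match_record(&record, &config::Config::new(vec![source])).is_some());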
+
+/**
+ * A MatchedFile will contain all the necessary information for reading the object and
+ * allowing a Delta write to occur
+ */
+struct MatchedFile {
+    /// The source bucket containing the file to read
+    bucket: String,
+    /// The source object to read, expected to be JSON
+    object: String,
+    /// Partitions to include in the Delta write
+    partitions: Vec<Partition>,
+    /// The output path for the Delta writer to use
+    output_path: String,
+}
+
+impl MatchedFile {
+    fn new(source: &config::Source, key: &str, partitions: Vec<Partition>) -> Self {
+        MatchedFile {
+            bucket: source.bucket.clone(),
+            object: key.to_string(),
+            partitions,
+            output_path: source.tablepath.clone(),
+        }
+    }
+}
+
+/**
+ * A Partition is a simple struct to carry values from paths like:
+ *   just/a/path/year=2021/mydata.json
+ */
+#[derive(Clone, Debug)]
+struct Partition {
+    name: String,
+    value: String,
+}
+
+impl Partition {
+    /**
+     * Convert the given path string (e.g. root/year=2021/month=04/afile.json)
+     * into an ordered vector of the partitions contained within the path string
+     */
+    fn from_path_str(pathstr: &str) -> Vec<Partition> {
+        pathstr
+            .split('/')
+            .filter_map(|part| {
+                let sides: Vec<&str> = part.split('=').collect();
+                if sides.len() == 2 {
+                    Some(Partition {
+                        name: sides[0].to_string(),
+                        value: sides[1].to_string(),
+                    })
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+}
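+
+// Quick usage sketch for from_path_str (the unit tests below give full coverage):
+//
+//     let parts = Partition::from_path_str("root/year=2021/month=04/afile.json");
+//     // parts[0] is ("year", "2021") and parts[1] is ("month", "04"); segments
+//     // without an '=', such as "root" and "afile.json", are filtered out.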
"bucket": { + "name": "my-bucket", + "ownerIdentity": { + "principalId": "A3I5XTEXAMAI3E" + }, + "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df" + }, + "object": { + "key": "somepath/date=2021-04-16/afile.json", + "size": 1305107, + "eTag": "b21b84d653bb07b05b1e6b33684dc11b", + "sequencer": "0C0F6F405D6ED209E1" + } + } + } + ] +}"#, + ) + } + + #[tokio::test] + async fn test_s3_event_handler() { + let event: S3Event = + serde_json::from_str(&sample_event()).expect("Failed to deserialize event"); + let result = s3_event_handler(event, Context::default()) + .await + .expect("Failed to run event handler"); + assert_eq!("{}", result); + } +} diff --git a/src/main.rs b/src/main.rs index 789f4b0..0674bbd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,16 +2,15 @@ * The bulk of the application */ -use aws_lambda_events::event::s3::{S3Event, S3EventRecord}; -use lambda_runtime::{handler_fn, Context, Error}; use log::*; use std::path::PathBuf; mod config; mod writer; +#[cfg(lambda)] +mod lambda; -#[tokio::main] -async fn main() -> Result<(), Error> { +fn preboot() { #[cfg(debug_assertions)] pretty_env_logger::init(); @@ -19,285 +18,19 @@ async fn main() -> Result<(), Error> { "Initializing delta-s3-loader v{}", env!["CARGO_PKG_VERSION"] ); +} - let func = handler_fn(s3_event_handler); +#[cfg(not(lambda))] +fn main() { + preboot(); +} + + +#[cfg(lambda)] +#[tokio::main] +async fn main() -> Result<(), lambda_runtime::Error> { + preboot(); + let func = lambda_runtime::handler_fn(crate::lambda::s3_event_handler); lambda_runtime::run(func).await?; Ok(()) } - -/** - * The s3_event_handler will be invoked with an S3Event which will need to be iterated upon and - * each S3EventRecord processed: - * - */ -async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result { - // Unfortunately there's not a good way to avoid reload the configuration every time. At least - // as far as I can tell right now - let _conf = config::Config::from_file(&PathBuf::from("./config.yml"))?; - - for record in event.records { - if let Some(ref name) = record.event_name { - trace!("Processing an event named: {}", name); - /* - * The only events that delta-s3-loader is interested in are new PUTs which - * indicate a new file must be processed. 
diff --git a/src/main.rs b/src/main.rs
index 789f4b0..0674bbd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,16 +2,15 @@
  * The bulk of the application
  */
 
-use aws_lambda_events::event::s3::{S3Event, S3EventRecord};
-use lambda_runtime::{handler_fn, Context, Error};
 use log::*;
 use std::path::PathBuf;
 
 mod config;
 mod writer;
+#[cfg(feature = "lambda")]
+mod lambda;
 
-#[tokio::main]
-async fn main() -> Result<(), Error> {
+fn preboot() {
     #[cfg(debug_assertions)]
     pretty_env_logger::init();
 
@@ -19,285 +18,19 @@
         "Initializing delta-s3-loader v{}",
         env!["CARGO_PKG_VERSION"]
     );
+}
 
-    let func = handler_fn(s3_event_handler);
+#[cfg(not(feature = "lambda"))]
+fn main() {
+    preboot();
+}
+
+
+#[cfg(feature = "lambda")]
+#[tokio::main]
+async fn main() -> Result<(), lambda_runtime::Error> {
+    preboot();
+    let func = lambda_runtime::handler_fn(crate::lambda::s3_event_handler);
     lambda_runtime::run(func).await?;
 
     Ok(())
 }
-
-/**
- * The s3_event_handler will be invoked with an S3Event which will need to be iterated upon and
- * each S3EventRecord processed:
- *
- */
-async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error> {
-    // Unfortunately there's not a good way to avoid reload the configuration every time. At least
-    // as far as I can tell right now
-    let _conf = config::Config::from_file(&PathBuf::from("./config.yml"))?;
-
-    for record in event.records {
-        if let Some(ref name) = record.event_name {
-            trace!("Processing an event named: {}", name);
-            /*
-             * The only events that delta-s3-loader is interested in are new PUTs which
-             * indicate a new file must be processed.
-             */
-            if name == "ObjectCreated:Put" {
-                trace!("Processing record: {:?}", record);
-            }
-        } else {
-            warn!("Received a record without a name: {:?}", record);
-        }
-    }
-
-    // Since this was triggered asynchronously, no need for a real response
-    Ok("{}".to_string())
-}
-
-/**
- * The match_record function will look at a given S3EventRecord to see if the contents of that
- * record match a specific source configured for the Lambda
- *
- * Each of the records passed is assumed to be an `ObjectCreated:Put`
- */
-fn match_record(record: &S3EventRecord, conf: &config::Config) -> Option<MatchedFile> {
-    if let Some(bucket) = &record.s3.bucket.name {
-        for source in &conf.sources {
-            if bucket != &source.bucket {
-                continue;
-            }
-
-            if let Some(key) = &record.s3.object.key {
-                if !source.prefix.is_match(key) {
-                    continue;
-                }
-
-                let expected_partitions = source.partitions.len();
-
-                /* If there are no partitions expected, then we've already matched
-                 */
-                if expected_partitions == 0 {
-                    return Some(MatchedFile::new(source, key, vec![]));
-                }
-
-                /*
-                 * At this point the key and the prefix have matched, now to ensure
-                 * that the data is partitioned according to the configuration
-                 */
-                let partitions = Partition::from_path_str(key);
-                debug!(
-                    "Partitions identified from record: {} -- {:?}",
-                    key, partitions
-                );
-
-                // No match if the discovered partitions don't match the count
-                if partitions.len() != expected_partitions {
-                    continue;
-                }
-
-                for index in 0..expected_partitions {
-                    // The partition at the index doesn't match what we're looking for
-                    if partitions[index].name != source.partitions[index] {
-                        return None;
-                    }
-                }
-
-                return Some(MatchedFile::new(source, key, partitions));
-            }
-        }
-    }
-    None
-}
-
-/**
- * A MatchedFile will contain all the necessary information for reading the object and allowing a
- * Delta write to occur
- */
-struct MatchedFile {
-    /// The source bucket containing the file to read
-    bucket: String,
-    /// The source object to read, expected to be JSON
-    object: String,
-    /// Partitions to include in the Delta write
-    partitions: Vec<Partition>,
-    /// The output path for the Delta writer to use
-    output_path: String,
-}
-
-impl MatchedFile {
-    fn new(source: &config::Source, key: &str, partitions: Vec<Partition>) -> Self {
-        MatchedFile {
-            bucket: source.bucket.clone(),
-            object: key.to_string(),
-            partitions: partitions,
-            output_path: source.tablepath.clone(),
-        }
-    }
-}
-
-/**
- * A Partition is a simple struct to carry values from paths like:
- *   just/a/path/year=2021/mydata.json
- */
-#[derive(Clone, Debug)]
-struct Partition {
-    name: String,
-    value: String,
-}
-
-impl Partition {
-    /**
-     * Convert the given path string (e.g. root/year=2021/month=04/afile.json)
-     * into an ordered vector of the partitions contained within the path string
-     */
-    fn from_path_str(pathstr: &str) -> Vec<Partition> {
-        pathstr
-            .split("/")
-            .filter_map(|part| {
-                let sides: Vec<&str> = part.split("=").collect();
-                if sides.len() == 2 {
-                    Some(Partition {
-                        name: sides[0].to_string(),
-                        value: sides[1].to_string(),
-                    })
-                } else {
-                    None
-                }
-            })
-            .collect()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn from_path_str_no_partitions() {
-        let result = Partition::from_path_str("just/a/path");
-        assert_eq!(0, result.len());
-    }
-
-    #[test]
-    fn from_path_str_single() {
-        let partitions = Partition::from_path_str("some/year=2021/file.json");
-        assert_eq!(1, partitions.len());
-        assert_eq!(partitions[0].name, "year");
-        assert_eq!(partitions[0].value, "2021");
-    }
-
-    /**
-     * Make sure the sample event, which doesn't match a configured entry, doesn't return a source
-     */
-    #[test]
-    fn test_match_record_fail() {
-        let event: S3Event =
-            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
-        let conf = test_config();
-        let matches = match_record(&event.records[0], &conf);
-        assert!(matches.is_none());
-    }
-
-    #[test]
-    fn test_match_record_ok() {
-        let event: S3Event =
-            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
-        let conf = config::Config::new(vec![config::Source {
-            bucket: "my-bucket".to_string(),
-            prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"),
-            partitions: vec!["date".to_string()],
-            tablepath: "s3://test".to_string(),
-        }]);
-        let matches = match_record(&event.records[0], &conf);
-        assert!(matches.is_some());
-    }
-
-    #[test]
-    fn test_match_record_prefix_mismatch() {
-        let event: S3Event =
-            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
-        let conf = config::Config::new(vec![config::Source {
-            bucket: "my-bucket".to_string(),
-            prefix: regex::Regex::new("^scoobydoo").expect("Failed to compile test regex"),
-            partitions: vec!["date".to_string()],
-            tablepath: "s3://test".to_string(),
-        }]);
-        let matches = match_record(&event.records[0], &conf);
-        assert!(matches.is_none());
-    }
-
-    #[test]
-    fn test_match_record_partition_mismatch() {
-        let event: S3Event =
-            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
-        let conf = config::Config::new(vec![config::Source {
-            bucket: "my-bucket".to_string(),
-            prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"),
-            partitions: vec!["year".to_string()],
-            tablepath: "s3://test".to_string(),
-        }]);
-        let matches = match_record(&event.records[0], &conf);
-        assert!(matches.is_none());
-    }
-
-    /**
-     * Load the config.yml for tests
-     */
-    fn test_config() -> config::Config {
-        config::Config::from_file(&PathBuf::from("./config.yml"))
-            .expect("Failed to load configuration for tests")
-    }
-
-    fn sample_event() -> String {
-        String::from(
-            r#"
-{
-  "Records": [
-    {
-      "eventVersion": "2.1",
-      "eventSource": "aws:s3",
-      "awsRegion": "us-east-2",
-      "eventTime": "2019-09-03T19:37:27.192Z",
-      "eventName": "ObjectCreated:Put",
-      "userIdentity": {
-        "principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
-      },
-      "requestParameters": {
-        "sourceIPAddress": "205.255.255.255"
-      },
-      "responseElements": {
-        "x-amz-request-id": "D82B88E5F771F645",
-        "x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo="
-      },
-      "s3": {
-        "s3SchemaVersion": "1.0",
-        "configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
"bucket": { - "name": "my-bucket", - "ownerIdentity": { - "principalId": "A3I5XTEXAMAI3E" - }, - "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df" - }, - "object": { - "key": "somepath/date=2021-04-16/afile.json", - "size": 1305107, - "eTag": "b21b84d653bb07b05b1e6b33684dc11b", - "sequencer": "0C0F6F405D6ED209E1" - } - } - } - ] -}"#, - ) - } - - #[tokio::test] - async fn test_s3_event_handler() { - let event: S3Event = - serde_json::from_str(&sample_event()).expect("Failed to deserialize event"); - let result = s3_event_handler(event, Context::default()) - .await - .expect("Failed to run event handler"); - assert_eq!("{}", result); - } -}