diff --git a/config.yml b/config.yml
index 681aed4..ddebe1b 100644
--- a/config.yml
+++ b/config.yml
@@ -7,11 +7,16 @@ sources:
   #     └── simple
   #         └── date=2021-04-16
   #             └── auditlog-1234.json
-  - bucket: 'data'
+  # The name of the bucket should _just_ be the name of the bucket, since this
+  # will come from an AWS Lambda event
+  - bucket: 'raw'
     # Everything in the simple/ prefix will be considered part of this table
     # the prefix may be a regular expression that is compatible with the Rust
     # regex crate
-    prefix: '^raw/simple'
+    prefix: '^simple'
+    # When specifying partitions, the source data is expected to be partitioned,
+    # as it is in this example
     partitions:
       - 'date'
-    tablepath: 's3://data/delta/audit_logs'
+    # The table path is the destination for the written Delta table
+    tablepath: 's3://delta/audit_logs'
diff --git a/src/config.rs b/src/config.rs
index 937ca5b..f8dce73 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -2,45 +2,46 @@
  * The config module is largely responsible for just reading a configuration yaml file in.
  */
 use regex::Regex;
-use std::path::PathBuf;
 use serde::Deserialize;
+use std::path::PathBuf;
 
 /**
  * Root configuration
  */
 #[derive(Clone, Debug, Deserialize)]
-struct Config {
-    sources: Vec<Source>,
+pub struct Config {
+    pub sources: Vec<Source>,
 }
 
 impl Config {
+    pub fn new(sources: Vec<Source>) -> Config {
+        Config { sources }
+    }
     /**
      * from_file will attempt to load a configuration from the given path
      *
     * The path should be to a properly formatted YAML file:
      */
-    fn from_file(path: &PathBuf) -> Result<Config, std::io::Error> {
-        use std::io::{Error, ErrorKind};
+    pub fn from_file(path: &PathBuf) -> Result<Config, std::io::Error> {
         use std::fs::File;
+        use std::io::{Error, ErrorKind};
 
         let reader = File::open(path)?;
-        serde_yaml::from_reader(reader)
-            .map_err(|e| Error::new(ErrorKind::Other, e))
+        serde_yaml::from_reader(reader).map_err(|e| Error::new(ErrorKind::Other, e))
     }
 }
-
 
 /**
  * A source maps an existing S3 structure into a named Delta table
  */
 #[derive(Clone, Debug, Deserialize)]
-struct Source {
-    bucket: String,
+pub struct Source {
+    pub bucket: String,
     #[serde(with = "serde_regex")]
-    prefix: Regex,
-    partitions: Vec<String>,
-    tablepath: String,
+    pub prefix: Regex,
+    pub partitions: Vec<String>,
+    pub tablepath: String,
 }
 
 #[cfg(test)]
diff --git a/src/main.rs b/src/main.rs
index 459da10..c1305f6 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,20 +2,23 @@
  * The bulk of the application
  */
 
-use aws_lambda_events::event::s3::S3Event;
+use aws_lambda_events::event::s3::{S3Event, S3EventRecord};
 use lambda_runtime::{handler_fn, Context, Error};
 use log::*;
+use std::path::PathBuf;
 
 mod config;
 mod writer;
 
-
 #[tokio::main]
 async fn main() -> Result<(), Error> {
     #[cfg(debug_assertions)]
     pretty_env_logger::init();
 
-    info!("Initializing delta-s3-loader v{}", env!["CARGO_PKG_VERSION"]);
+    info!(
+        "Initializing delta-s3-loader v{}",
+        env!["CARGO_PKG_VERSION"]
+    );
 
     let func = handler_fn(s3_event_handler);
     lambda_runtime::run(func).await?;
@@ -28,19 +31,21 @@ async fn main() -> Result<(), Error> {
  *
  */
 async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error> {
+    // Unfortunately there's not a good way to avoid reloading the configuration every time. At least
+    // as far as I can tell right now
+    let _conf = config::Config::from_file(&PathBuf::from("./config.yml"))?;
     for record in event.records {
         if let Some(ref name) = record.event_name {
             trace!("Processing an event named: {}", name);
             /*
-            * The only events that delta-s3-loader is interested in are new PUTs which
-            * indicate a new file must be processed.
-            */
+             * The only events that delta-s3-loader is interested in are new PUTs which
+             * indicate a new file must be processed.
+             */
             if name == "ObjectCreated:Put" {
                 trace!("Processing record: {:?}", record);
             }
-        }
-        else {
+        } else {
             warn!("Received a record without a name: {:?}", record);
         }
     }
@@ -49,12 +54,64 @@ async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error> {
     Ok("{}".to_string())
 }
 
+/**
+ * match_record will attempt to find the Source configured for the bucket referenced in the
+ * given S3EventRecord
+ */
+fn match_record<'a>(
+    record: &'a S3EventRecord,
+    conf: &'a config::Config,
+) -> Option<&'a config::Source> {
+    if let Some(bucket) = &record.s3.bucket.name {
+        for source in &conf.sources {
+            if bucket == &source.bucket {
+                return Some(source);
+            }
+        }
+    }
+    None
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
 
+    /**
+     * Make sure the sample event, which doesn't match a configured entry, doesn't return a source
+     */
+    #[test]
+    fn test_match_record_fail() {
+        let event: S3Event =
+            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
+        let conf = test_config();
+        let matches = match_record(&event.records[0], &conf);
+        assert!(matches.is_none());
+    }
+
+    #[test]
+    fn test_match_record_ok() {
+        let event: S3Event =
+            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
+        let conf = config::Config::new(vec![config::Source {
+            bucket: "my-bucket".to_string(),
+            prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"),
+            partitions: vec!["date".to_string()],
+            tablepath: "s3://test".to_string(),
+        }]);
+        let matches = match_record(&event.records[0], &conf);
+        assert!(matches.is_some());
+    }
+
+    /**
+     * Load the config.yml for tests
+     */
+    fn test_config() -> config::Config {
+        config::Config::from_file(&PathBuf::from("./config.yml"))
+            .expect("Failed to load configuration for tests")
+    }
 
     fn sample_event() -> String {
-        String::from(r#"
+        String::from(
+            r#"
 {
   "Records": [
     {
@@ -84,7 +141,7 @@ mod tests {
           "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
         },
         "object": {
-          "key": "date=2021-04-16/afile.json",
+          "key": "somepath/date=2021-04-16/afile.json",
           "size": 1305107,
           "eTag": "b21b84d653bb07b05b1e6b33684dc11b",
           "sequencer": "0C0F6F405D6ED209E1"
@@ -92,14 +149,17 @@
       }
     }
   ]
-}"#)
+}"#,
+        )
     }
 
     #[tokio::test]
     async fn test_s3_event_handler() {
-        let event: S3Event = serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
-        let result = s3_event_handler(event, Context::default()).await.expect("Failed to run event handler");
+        let event: S3Event =
+            serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
+        let result = s3_event_handler(event, Context::default())
+            .await
+            .expect("Failed to run event handler");
         assert_eq!("{}", result);
     }
 }
-
diff --git a/src/writer.rs b/src/writer.rs
index 07ea97b..8181eb9 100644
--- a/src/writer.rs
+++ b/src/writer.rs
@@ -2,7 +2,6 @@
  * The writer module contains the important code for actually writing to a Delta Lake table
  *
  */
-
 use arrow::record_batch::RecordBatch;
 use serde_json::Value;
 
@@ -10,13 +9,12 @@ enum WriterError {
     Generic,
 }
 
-fn json_to_batch(json: Vec<Value>) -> Result<RecordBatch,WriterError> {
+fn json_to_batch(json: Vec<Value>) -> Result<RecordBatch, WriterError> {
     use arrow::json::reader::*;
 
     // infer_json_schema_from_iterator is weird in that it expects each value to be wrapped in a
    // Result
-    let schema = infer_json_schema_from_iterator(
-        json.into_iter().map(|v| Ok(v)));
+    let schema = infer_json_schema_from_iterator(json.into_iter().map(|v| Ok(v)));
 
     println!("schema: {:#?}", schema);
 
@@ -27,15 +25,18 @@ fn json_to_batch(json: Vec<Value>) -> Result<RecordBatch, WriterError> {
 mod tests {
     use super::*;
 
+    #[ignore]
     #[test]
     fn demo() {
         let _delta = deltalake::get_backend_for_uri("./data/simple");
         todo!("Still need a high level writer test");
     }
 
+    #[ignore]
     #[test]
     fn json_to_arrow_success() {
-        let value: Vec<Value> = serde_json::from_str(r#"
+        let value: Vec<Value> = serde_json::from_str(
+            r#"
 [
   {
     "action" : "commit",
@@ -46,8 +47,9 @@
     "actor" : "rtyler"
   }
 ]
-        "#).expect("Failed to create JSON");
-
+            "#,
+        )
+        .expect("Failed to create JSON");
         let result = json_to_batch(value);
         assert!(result.is_ok());