diff --git a/Cargo.lock b/Cargo.lock index 191cc48..049e948 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -368,8 +368,11 @@ dependencies = [ "log", "parquet", "pretty_env_logger", + "regex", "serde", "serde_json", + "serde_regex", + "serde_yaml", "tokio", ] @@ -910,6 +913,12 @@ version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41" +[[package]] +name = "linked-hash-map" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" + [[package]] name = "log" version = "0.4.14" @@ -1636,6 +1645,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_regex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8136f1a4ea815d7eac4101cfd0b16dc0cb5e1fe1b8609dfd728058656b7badf" +dependencies = [ + "regex", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.6.1" @@ -1648,6 +1667,18 @@ dependencies = [ "url", ] +[[package]] +name = "serde_yaml" +version = "0.8.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15654ed4ab61726bf918a39cb8d98a2e2995b002387807fa6ba58fdf7f59bb23" +dependencies = [ + "dtoa", + "linked-hash-map", + "serde", + "yaml-rust", +] + [[package]] name = "sha1" version = "0.6.0" @@ -2264,6 +2295,15 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "zeroize" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 06b9016..440acc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,9 @@ lambda_runtime = "0.3" log = "0.4" parquet = { git = "https://github.com/apache/arrow.git", rev = "05b36567bd8216bec71b796fe3bb6811c71abbec" } pretty_env_logger = "0.4" -tokio = { version = "1.0", features = ["macros"]} +regex = "1" serde = { version = "1", features = ["rc", "derive"]} serde_json = "1" +serde_yaml = "0.8" +serde_regex = "1" +tokio = { version = "1.0", features = ["macros"]} diff --git a/config.yml b/config.yml new file mode 100644 index 0000000..e7fdf1a --- /dev/null +++ b/config.yml @@ -0,0 +1,16 @@ +# This is an example configuration for the delta-s3-loader lambda +--- +sources: +# This entry maps the existing S3 structure into a delta table named +# `logs.audit_logs` which will be date stamp partitioned +# +# └── simple +# └── date=2021-04-16 +# └── auditlog-1234.json + - bucket: 'my-data-bucket' + # Everything in the simple/ prefix will be considered part of this table + prefix: 'simple' + partitions: + - 'date' + database: 'logs' + table: 'audit_logs' diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..21e1409 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,57 @@ +/** + * The config module is largely responsible for just reading a configuration yaml file in. + */ +use regex::Regex; +use std::path::PathBuf; +use serde::Deserialize; + +/** + * Root configuration + */ +#[derive(Clone, Debug, Deserialize)] +struct Config { + sources: Vec, +} + +impl Config { + /** + * from_file will attempt to load a configuration from the given path + * + * The path should be to a properly formatted YAML file: + */ + fn from_file(path: &PathBuf) -> Result { + use std::io::{Error, ErrorKind}; + use std::fs::File; + + let reader = File::open(path)?; + + serde_yaml::from_reader(reader) + .map_err(|e| Error::new(ErrorKind::Other, e)) + } +} + + +/** + * A source maps an existing S3 structure into a named Delta table + */ +#[derive(Clone, Debug, Deserialize)] +struct Source { + bucket: String, + #[serde(with = "serde_regex")] + prefix: Regex, + partitions: Vec, + database: String, + table: String, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_load_file() -> Result<(), std::io::Error> { + let config = Config::from_file(&PathBuf::from("./config.yml"))?; + assert_eq!(config.sources.len(), 1); + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index 2f7f478..459da10 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,11 +6,15 @@ use aws_lambda_events::event::s3::S3Event; use lambda_runtime::{handler_fn, Context, Error}; use log::*; +mod config; mod writer; + #[tokio::main] async fn main() -> Result<(), Error> { + #[cfg(debug_assertions)] pretty_env_logger::init(); + info!("Initializing delta-s3-loader v{}", env!["CARGO_PKG_VERSION"]); let func = handler_fn(s3_event_handler); @@ -49,9 +53,8 @@ async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result String { + String::from(r#" { "Records": [ { @@ -74,14 +77,14 @@ mod tests { "s3SchemaVersion": "1.0", "configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1", "bucket": { - "name": "lambda-artifacts-deafc19498e3f2df", + "name": "my-bucket", "ownerIdentity": { "principalId": "A3I5XTEXAMAI3E" }, "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df" }, "object": { - "key": "b21b84d653bb07b05b1e6b33684dc11b", + "key": "date=2021-04-16/afile.json", "size": 1305107, "eTag": "b21b84d653bb07b05b1e6b33684dc11b", "sequencer": "0C0F6F405D6ED209E1" @@ -89,8 +92,12 @@ mod tests { } } ] -}"#; - let event: S3Event = serde_json::from_str(&buf).expect("Failed to deserialize event"); +}"#) + } + + #[tokio::test] + async fn test_s3_event_handler() { + let event: S3Event = serde_json::from_str(&sample_event()).expect("Failed to deserialize event"); let result = s3_event_handler(event, Context::default()).await.expect("Failed to run event handler"); assert_eq!("{}", result); }