Working on the means of actually configuring the desired behavior

This commit is contained in:
R Tyler Croy 2021-04-17 15:37:47 -07:00
parent 922a08efdc
commit 4fe1319e27
5 changed files with 131 additions and 8 deletions

40
Cargo.lock generated
View File

@ -368,8 +368,11 @@ dependencies = [
"log",
"parquet",
"pretty_env_logger",
"regex",
"serde",
"serde_json",
"serde_regex",
"serde_yaml",
"tokio",
]
@ -910,6 +913,12 @@ version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41"
[[package]]
name = "linked-hash-map"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "log"
version = "0.4.14"
@ -1636,6 +1645,16 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_regex"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8136f1a4ea815d7eac4101cfd0b16dc0cb5e1fe1b8609dfd728058656b7badf"
dependencies = [
"regex",
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.6.1"
@ -1648,6 +1667,18 @@ dependencies = [
"url",
]
[[package]]
name = "serde_yaml"
version = "0.8.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15654ed4ab61726bf918a39cb8d98a2e2995b002387807fa6ba58fdf7f59bb23"
dependencies = [
"dtoa",
"linked-hash-map",
"serde",
"yaml-rust",
]
[[package]]
name = "sha1"
version = "0.6.0"
@ -2264,6 +2295,15 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
[[package]]
name = "yaml-rust"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85"
dependencies = [
"linked-hash-map",
]
[[package]]
name = "zeroize"
version = "1.2.0"

View File

@ -12,6 +12,9 @@ lambda_runtime = "0.3"
log = "0.4"
parquet = { git = "https://github.com/apache/arrow.git", rev = "05b36567bd8216bec71b796fe3bb6811c71abbec" }
pretty_env_logger = "0.4"
tokio = { version = "1.0", features = ["macros"]}
regex = "1"
serde = { version = "1", features = ["rc", "derive"]}
serde_json = "1"
serde_yaml = "0.8"
serde_regex = "1"
tokio = { version = "1.0", features = ["macros"]}

16
config.yml Normal file
View File

@ -0,0 +1,16 @@
# This is an example configuration for the delta-s3-loader lambda
---
sources:
# This entry maps the existing S3 structure into a delta table named
# `logs.audit_logs` which will be date stamp partitioned
#
# └── simple
# └── date=2021-04-16
# └── auditlog-1234.json
- bucket: 'my-data-bucket'
# Everything in the simple/ prefix will be considered part of this table
prefix: 'simple'
partitions:
- 'date'
database: 'logs'
table: 'audit_logs'

57
src/config.rs Normal file
View File

@ -0,0 +1,57 @@
/**
* The config module is largely responsible for just reading a configuration yaml file in.
*/
use regex::Regex;
use std::path::PathBuf;
use serde::Deserialize;
/**
* Root configuration
*/
#[derive(Clone, Debug, Deserialize)]
struct Config {
sources: Vec<Source>,
}
impl Config {
/**
* from_file will attempt to load a configuration from the given path
*
* The path should be to a properly formatted YAML file:
*/
fn from_file(path: &PathBuf) -> Result<Config, std::io::Error> {
use std::io::{Error, ErrorKind};
use std::fs::File;
let reader = File::open(path)?;
serde_yaml::from_reader(reader)
.map_err(|e| Error::new(ErrorKind::Other, e))
}
}
/**
* A source maps an existing S3 structure into a named Delta table
*/
#[derive(Clone, Debug, Deserialize)]
struct Source {
bucket: String,
#[serde(with = "serde_regex")]
prefix: Regex,
partitions: Vec<String>,
database: String,
table: String,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_load_file() -> Result<(), std::io::Error> {
let config = Config::from_file(&PathBuf::from("./config.yml"))?;
assert_eq!(config.sources.len(), 1);
Ok(())
}
}

View File

@ -6,11 +6,15 @@ use aws_lambda_events::event::s3::S3Event;
use lambda_runtime::{handler_fn, Context, Error};
use log::*;
mod config;
mod writer;
#[tokio::main]
async fn main() -> Result<(), Error> {
#[cfg(debug_assertions)]
pretty_env_logger::init();
info!("Initializing delta-s3-loader v{}", env!["CARGO_PKG_VERSION"]);
let func = handler_fn(s3_event_handler);
@ -49,9 +53,8 @@ async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error
mod tests {
use super::*;
#[tokio::test]
async fn test_s3_event_handler() {
let buf = r#"
fn sample_event() -> String {
String::from(r#"
{
"Records": [
{
@ -74,14 +77,14 @@ mod tests {
"s3SchemaVersion": "1.0",
"configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
"bucket": {
"name": "lambda-artifacts-deafc19498e3f2df",
"name": "my-bucket",
"ownerIdentity": {
"principalId": "A3I5XTEXAMAI3E"
},
"arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
},
"object": {
"key": "b21b84d653bb07b05b1e6b33684dc11b",
"key": "date=2021-04-16/afile.json",
"size": 1305107,
"eTag": "b21b84d653bb07b05b1e6b33684dc11b",
"sequencer": "0C0F6F405D6ED209E1"
@ -89,8 +92,12 @@ mod tests {
}
}
]
}"#;
let event: S3Event = serde_json::from_str(&buf).expect("Failed to deserialize event");
}"#)
}
#[tokio::test]
async fn test_s3_event_handler() {
let event: S3Event = serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
let result = s3_event_handler(event, Context::default()).await.expect("Failed to run event handler");
assert_eq!("{}", result);
}