Tinkering with wiring through the events into a processing/matching loop

This will help filter out events which are not important
This commit is contained in:
R Tyler Croy 2021-04-21 11:05:19 -07:00
parent 95109ae12d
commit 3386acc3b6
4 changed files with 105 additions and 37 deletions

View File

@ -7,11 +7,16 @@ sources:
# └── simple # └── simple
# └── date=2021-04-16 # └── date=2021-04-16
# └── auditlog-1234.json # └── auditlog-1234.json
- bucket: 'data' # The name of the bucket should _just_ be a name of the bucket since this
# will come from an AWS Lambda event
- bucket: 'raw'
# Everything in the simple/ prefix will be considered part of this table # Everything in the simple/ prefix will be considered part of this table
# the prefix may be a regular expression that is compatible with the Rust # the prefix may be a regular expression that is compatible with the Rust
# regex crate # regex crate
prefix: '^raw/simple' prefix: '^simple'
# When specifying partitions, the source data is expected to be parttioned,
# as it is in this example
partitions: partitions:
- 'date' - 'date'
tablepath: 's3://data/delta/audit_logs' # The table path is the destination for the written delta
tablepath: 's3://delta/audit_logs'

View File

@ -2,45 +2,46 @@
* The config module is largely responsible for just reading a configuration yaml file in. * The config module is largely responsible for just reading a configuration yaml file in.
*/ */
use regex::Regex; use regex::Regex;
use std::path::PathBuf;
use serde::Deserialize; use serde::Deserialize;
use std::path::PathBuf;
/** /**
* Root configuration * Root configuration
*/ */
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
struct Config { pub struct Config {
sources: Vec<Source>, pub sources: Vec<Source>,
} }
impl Config { impl Config {
pub fn new(sources: Vec<Source>) -> Config {
Config { sources }
}
/** /**
* from_file will attempt to load a configuration from the given path * from_file will attempt to load a configuration from the given path
* *
* The path should be to a properly formatted YAML file: * The path should be to a properly formatted YAML file:
*/ */
fn from_file(path: &PathBuf) -> Result<Config, std::io::Error> { pub fn from_file(path: &PathBuf) -> Result<Config, std::io::Error> {
use std::io::{Error, ErrorKind};
use std::fs::File; use std::fs::File;
use std::io::{Error, ErrorKind};
let reader = File::open(path)?; let reader = File::open(path)?;
serde_yaml::from_reader(reader) serde_yaml::from_reader(reader).map_err(|e| Error::new(ErrorKind::Other, e))
.map_err(|e| Error::new(ErrorKind::Other, e))
} }
} }
/** /**
* A source maps an existing S3 structure into a named Delta table * A source maps an existing S3 structure into a named Delta table
*/ */
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
struct Source { pub struct Source {
bucket: String, pub bucket: String,
#[serde(with = "serde_regex")] #[serde(with = "serde_regex")]
prefix: Regex, pub prefix: Regex,
partitions: Vec<String>, pub partitions: Vec<String>,
tablepath: String, pub tablepath: String,
} }
#[cfg(test)] #[cfg(test)]

View File

@ -2,20 +2,23 @@
* The bulk of the application * The bulk of the application
*/ */
use aws_lambda_events::event::s3::S3Event; use aws_lambda_events::event::s3::{S3Event, S3EventRecord};
use lambda_runtime::{handler_fn, Context, Error}; use lambda_runtime::{handler_fn, Context, Error};
use log::*; use log::*;
use std::path::PathBuf;
mod config; mod config;
mod writer; mod writer;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Error> { async fn main() -> Result<(), Error> {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
pretty_env_logger::init(); pretty_env_logger::init();
info!("Initializing delta-s3-loader v{}", env!["CARGO_PKG_VERSION"]); info!(
"Initializing delta-s3-loader v{}",
env!["CARGO_PKG_VERSION"]
);
let func = handler_fn(s3_event_handler); let func = handler_fn(s3_event_handler);
lambda_runtime::run(func).await?; lambda_runtime::run(func).await?;
@ -28,19 +31,21 @@ async fn main() -> Result<(), Error> {
* <https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html> * <https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html>
*/ */
async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error> { async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error> {
// Unfortunately there's not a good way to avoid reload the configuration every time. At least
// as far as I can tell right now
let _conf = config::Config::from_file(&PathBuf::from("./config.yml"))?;
for record in event.records { for record in event.records {
if let Some(ref name) = record.event_name { if let Some(ref name) = record.event_name {
trace!("Processing an event named: {}", name); trace!("Processing an event named: {}", name);
/* /*
* The only events that delta-s3-loader is interested in are new PUTs which * The only events that delta-s3-loader is interested in are new PUTs which
* indicate a new file must be processed. * indicate a new file must be processed.
*/ */
if name == "ObjectCreated:Put" { if name == "ObjectCreated:Put" {
trace!("Processing record: {:?}", record); trace!("Processing record: {:?}", record);
} }
} } else {
else {
warn!("Received a record without a name: {:?}", record); warn!("Received a record without a name: {:?}", record);
} }
} }
@ -49,12 +54,64 @@ async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error
Ok("{}".to_string()) Ok("{}".to_string())
} }
/**
* The match_record function will look at a given S3EventRecord to see if the contents of that
* record match a specific source configured for the Lambda
*/
fn match_record<'a>(
record: &'a S3EventRecord,
conf: &'a config::Config,
) -> Option<&'a config::Source> {
if let Some(bucket) = &record.s3.bucket.name {
for source in &conf.sources {
if bucket == &source.bucket {
return Some(source);
}
}
}
None
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
/**
* Make sure the sample event, which doesn't match a configured entry, doesn't return a source
*/
#[test]
fn test_match_record_fail() {
let event: S3Event =
serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
let conf = test_config();
let matches = match_record(&event.records[0], &conf);
assert!(matches.is_none());
}
#[test]
fn test_match_record_ok() {
let event: S3Event =
serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
let conf = config::Config::new(vec![config::Source {
bucket: "my-bucket".to_string(),
prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"),
partitions: vec!["date".to_string()],
tablepath: "s3://test".to_string(),
}]);
let matches = match_record(&event.records[0], &conf);
assert!(matches.is_some());
}
/**
* Load the config.yml for tests
*/
fn test_config() -> config::Config {
config::Config::from_file(&PathBuf::from("./config.yml"))
.expect("Failed to load configuration for tests")
}
fn sample_event() -> String { fn sample_event() -> String {
String::from(r#" String::from(
r#"
{ {
"Records": [ "Records": [
{ {
@ -84,7 +141,7 @@ mod tests {
"arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df" "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
}, },
"object": { "object": {
"key": "date=2021-04-16/afile.json", "key": "somepath/date=2021-04-16/afile.json",
"size": 1305107, "size": 1305107,
"eTag": "b21b84d653bb07b05b1e6b33684dc11b", "eTag": "b21b84d653bb07b05b1e6b33684dc11b",
"sequencer": "0C0F6F405D6ED209E1" "sequencer": "0C0F6F405D6ED209E1"
@ -92,14 +149,17 @@ mod tests {
} }
} }
] ]
}"#) }"#,
)
} }
#[tokio::test] #[tokio::test]
async fn test_s3_event_handler() { async fn test_s3_event_handler() {
let event: S3Event = serde_json::from_str(&sample_event()).expect("Failed to deserialize event"); let event: S3Event =
let result = s3_event_handler(event, Context::default()).await.expect("Failed to run event handler"); serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
let result = s3_event_handler(event, Context::default())
.await
.expect("Failed to run event handler");
assert_eq!("{}", result); assert_eq!("{}", result);
} }
} }

View File

@ -2,7 +2,6 @@
* The writer module contains the important code for actually writing to a Delta Lake table * The writer module contains the important code for actually writing to a Delta Lake table
* *
*/ */
use arrow::record_batch::RecordBatch; use arrow::record_batch::RecordBatch;
use serde_json::Value; use serde_json::Value;
@ -10,13 +9,12 @@ enum WriterError {
Generic, Generic,
} }
fn json_to_batch(json: Vec<Value>) -> Result<RecordBatch, WriterError> { fn json_to_batch(json: Vec<Value>) -> Result<RecordBatch, WriterError> {
use arrow::json::reader::*; use arrow::json::reader::*;
// infer_json_schema_from_iterator is weird in that it expects each value to be wrapped in a // infer_json_schema_from_iterator is weird in that it expects each value to be wrapped in a
// Result // Result
let schema = infer_json_schema_from_iterator( let schema = infer_json_schema_from_iterator(json.into_iter().map(|v| Ok(v)));
json.into_iter().map(|v| Ok(v)));
println!("schema: {:#?}", schema); println!("schema: {:#?}", schema);
@ -27,15 +25,18 @@ fn json_to_batch(json: Vec<Value>) -> Result<RecordBatch, WriterError> {
mod tests { mod tests {
use super::*; use super::*;
#[ignore]
#[test] #[test]
fn demo() { fn demo() {
let _delta = deltalake::get_backend_for_uri("./data/simple"); let _delta = deltalake::get_backend_for_uri("./data/simple");
todo!("Still need ta high level writer test"); todo!("Still need ta high level writer test");
} }
#[ignore]
#[test] #[test]
fn json_to_arrow_success() { fn json_to_arrow_success() {
let value: Vec<serde_json::Value> = serde_json::from_str(r#" let value: Vec<serde_json::Value> = serde_json::from_str(
r#"
[ [
{ {
"action" : "commit", "action" : "commit",
@ -46,8 +47,9 @@ mod tests {
"actor" : "rtyler" "actor" : "rtyler"
} }
] ]
"#).expect("Failed to create JSON"); "#,
)
.expect("Failed to create JSON");
let result = json_to_batch(value); let result = json_to_batch(value);
assert!(result.is_ok()); assert!(result.is_ok());