Properly compile the lambda feature in

This commit also removes some unnecessary code which I decided was no longer of
use, since S3 Event Notifications can have filters defined on them upstream,
which reduces the work that Delta S3 Loader itself has to do.
This commit is contained in:
R Tyler Croy 2021-06-11 15:42:47 -07:00
parent c20997b197
commit fe76b31955
2 changed files with 5 additions and 150 deletions

View File

@ -1,5 +1,6 @@
use aws_lambda_events::event::s3::{S3Event, S3EventRecord};
use aws_lambda_events::event::s3::S3Event;
use lambda_runtime::{Context, Error};
use log::*;
/**
* The s3_event_handler will be invoked with an S3Event which will need to be iterated upon and
@ -7,10 +8,6 @@ use lambda_runtime::{Context, Error};
* <https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html>
*/
pub async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error> {
// Unfortunately there's not a good way to avoid reloading the configuration every time, at
// least as far as I can tell right now
let _conf = config::Config::from_file(&PathBuf::from("./config.yml"))?;
for record in event.records {
if let Some(ref name) = record.event_name {
trace!("Processing an event named: {}", name);
@ -30,87 +27,6 @@ pub async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, E
Ok("{}".to_string())
}
/**
 * The match_record function will look at a given S3EventRecord to see if the contents of that
 * record match a specific source configured for the Lambda
 *
 * Each of the records passed is assumed to be an `ObjectCreated:Put`
 *
 * Sources are tried in the order they appear in `conf.sources` and the first one which matches
 * wins. A source matches when:
 *
 *   - its `bucket` equals the bucket name on the record,
 *   - its `prefix` regex matches the object key, and
 *   - either it declares no partitions, or the partitions discovered in the key have the same
 *     count and the same names (position by position) as the ones it declares.
 *
 * Returns `None` when the record carries no bucket name or no object key, or when no configured
 * source matches.
 */
fn match_record(record: &S3EventRecord, conf: &config::Config) -> Option<MatchedFile> {
    // Without both a bucket and a key there is nothing to compare against any source
    let bucket = record.s3.bucket.name.as_ref()?;
    let key = record.s3.object.key.as_ref()?;

    for source in &conf.sources {
        if bucket != &source.bucket {
            continue;
        }

        if !source.prefix.is_match(key) {
            continue;
        }

        /* If there are no partitions expected, then we've already matched
         */
        if source.partitions.is_empty() {
            return Some(MatchedFile::new(source, key, vec![]));
        }

        /*
         * At this point the key and the prefix have matched, now to ensure
         * that the data is partitioned according to the configuration
         */
        let partitions = Partition::from_path_str(key);
        debug!(
            "Partitions identified from record: {} -- {:?}",
            key, partitions
        );

        // No match if the discovered partitions don't match the count
        if partitions.len() != source.partitions.len() {
            continue;
        }

        // A partition name that differs only rules out *this* source; keep looking at the
        // remaining sources rather than giving up on the record entirely.
        let name_mismatch = partitions
            .iter()
            .zip(&source.partitions)
            .any(|(found, expected)| found.name != *expected);
        if name_mismatch {
            continue;
        }

        return Some(MatchedFile::new(source, key, partitions));
    }

    None
}
/**
* A MatchedFile will contain all the necessary information for reading the object and allowing a
* Delta write to occur
*
* Instances are built by `MatchedFile::new` from a configured source plus the key of the S3
* object which matched that source.
*/
struct MatchedFile {
/// The source bucket containing the file to read (copied from the source's `bucket`)
bucket: String,
/// The source object to read, expected to be JSON (this is the S3 object key)
object: String,
/// Partitions to include in the Delta write
partitions: Vec<Partition>,
/// The output path for the Delta writer to use (copied from the source's `tablepath`)
output_path: String,
}
impl MatchedFile {
    /**
     * Build a MatchedFile for the object at `key` which matched the given `source`
     *
     * The bucket and the output path are cloned out of `source` (`bucket` and `tablepath`
     * respectively), `key` is copied into an owned String, and `partitions` is moved in as-is.
     */
    fn new(source: &config::Source, key: &str, partitions: Vec<Partition>) -> Self {
        Self {
            bucket: source.bucket.clone(),
            object: key.to_string(),
            partitions,
            output_path: source.tablepath.clone(),
        }
    }
}
/**
* A Partition is a simple struct to carry values from paths like:
* just/a/path/year=2021/mydata.json
@ -162,67 +78,6 @@ mod tests {
assert_eq!(partitions[0].value, "2021");
}
/**
 * The first record of the sample event must not match any source in the configuration
 * returned by test_config()
 */
#[test]
fn test_match_record_fail() {
    let raw = sample_event();
    let parsed = serde_json::from_str::<S3Event>(&raw).expect("Failed to deserialize event");
    let settings = test_config();

    assert!(match_record(&parsed.records[0], &settings).is_none());
}
/**
 * A source for "my-bucket" with the prefix "^somepath" and a single "date" partition is
 * expected to match the first record of the sample event
 */
#[test]
fn test_match_record_ok() {
    let raw = sample_event();
    let parsed = serde_json::from_str::<S3Event>(&raw).expect("Failed to deserialize event");

    let source = config::Source {
        bucket: "my-bucket".to_string(),
        prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"),
        partitions: vec!["date".to_string()],
        tablepath: "s3://test".to_string(),
    };
    let settings = config::Config::new(vec![source]);

    assert!(match_record(&parsed.records[0], &settings).is_some());
}
/**
 * A source whose prefix regex ("^scoobydoo") does not match the object key must not match,
 * even though the bucket and partition settings are the same as in the passing case
 */
#[test]
fn test_match_record_prefix_mismatch() {
    let raw = sample_event();
    let parsed = serde_json::from_str::<S3Event>(&raw).expect("Failed to deserialize event");

    let source = config::Source {
        bucket: "my-bucket".to_string(),
        prefix: regex::Regex::new("^scoobydoo").expect("Failed to compile test regex"),
        partitions: vec!["date".to_string()],
        tablepath: "s3://test".to_string(),
    };
    let settings = config::Config::new(vec![source]);

    assert!(match_record(&parsed.records[0], &settings).is_none());
}
/**
 * A source which expects a "year" partition must not match the first record of the sample
 * event, even though the bucket and prefix are the same as in the passing case
 */
#[test]
fn test_match_record_partition_mismatch() {
    let raw = sample_event();
    let parsed = serde_json::from_str::<S3Event>(&raw).expect("Failed to deserialize event");

    let source = config::Source {
        bucket: "my-bucket".to_string(),
        prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"),
        partitions: vec!["year".to_string()],
        tablepath: "s3://test".to_string(),
    };
    let settings = config::Config::new(vec![source]);

    assert!(match_record(&parsed.records[0], &settings).is_none());
}
/**
 * Read ./config.yml into a Config for use by the tests, panicking if it cannot be loaded
 */
fn test_config() -> config::Config {
    let path = PathBuf::from("./config.yml");
    let loaded = config::Config::from_file(&path);
    loaded.expect("Failed to load configuration for tests")
}
fn sample_event() -> String {
String::from(
r#"

View File

@ -4,7 +4,7 @@
use clap::{App, Arg};
use log::*;
#[cfg(lambda)]
#[cfg(feature = "lambda")]
mod lambda;
mod writer;
@ -50,12 +50,12 @@ fn preboot() {
.get_matches();
}
#[cfg(not(lambda))]
#[cfg(not(feature = "lambda"))]
/**
* Entry point used when the "lambda" feature is not enabled: it only runs preboot() and then
* returns
*/
fn main() {
preboot();
}
#[cfg(lambda)]
#[cfg(feature = "lambda")]
#[tokio::main]
async fn main() -> Result<(), lambda_runtime::Error> {
preboot();