Compare commits

6 Commits

Author SHA1 Message Date
R Tyler Croy d26d27950c Playing around with reading the file from S3
I don't like this structure; I need to think a bit more about how to structure
the processor code
2021-06-13 09:43:26 -07:00
R Tyler Croy 2667088175 Properly build with the deltalake dependency and my features branch 2021-06-12 15:21:09 -07:00
R Tyler Croy 559c977798 Tinkering around but consuming messages from SQS in standalone mode
Needed to do some manual testing first to ensure that this would properly
consume from SQS and handle the events
2021-06-12 12:22:24 -07:00
R Tyler Croy 5f10b2d86d Disable the deltalake dependency and pin rusoto to rustls
Native TLS always becomes a huge pain in the ass when building lambda functions.
Rustls is a more reliable build, but it seems that I need to thread that
`rustls` feature through the deltalake crate
2021-06-12 10:24:14 -07:00
R Tyler Croy 2f5f68b35c Load environment variables optionally from .env
This is most helpful for local testing 😸
2021-06-12 09:48:34 -07:00
R Tyler Croy fe76b31955 Properly compile the lambda feature in
This commit also removes some unnecessary code which I decided was no longer of
use, since S3 Event Notifications can have filters described on them upstream,
reducing the work required of Delta S3 Loader
2021-06-11 15:42:47 -07:00
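
The upstream filtering that the fe76b31955 message leans on lives on the bucket's event notification configuration, not in this codebase. For context, a rough sketch of wiring a prefix-filtered S3-to-SQS notification with rusoto_s3 (the bucket name, queue ARN, and prefix here are illustrative, not values from this repository):

use rusoto_core::Region;
use rusoto_s3::{
    FilterRule, NotificationConfiguration, NotificationConfigurationFilter,
    PutBucketNotificationConfigurationRequest, QueueConfiguration, S3, S3Client, S3KeyFilter,
};

// Sketch: ask S3 to send ObjectCreated:Put events for keys under a prefix
// to an SQS queue, so unrelated objects never reach Delta S3 Loader.
async fn configure_notifications() -> Result<(), anyhow::Error> {
    let client = S3Client::new(Region::default());
    let request = PutBucketNotificationConfigurationRequest {
        bucket: "my-bucket".to_string(), // illustrative bucket name
        notification_configuration: NotificationConfiguration {
            queue_configurations: Some(vec![QueueConfiguration {
                queue_arn: "arn:aws:sqs:us-west-2:000000000000:my-queue".to_string(),
                events: vec!["s3:ObjectCreated:Put".to_string()],
                filter: Some(NotificationConfigurationFilter {
                    key: Some(S3KeyFilter {
                        filter_rules: Some(vec![FilterRule {
                            name: Some("prefix".to_string()),
                            value: Some("somepath/".to_string()),
                        }]),
                    }),
                }),
                ..Default::default()
            }]),
            ..Default::default()
        },
        ..Default::default()
    };
    client.put_bucket_notification_configuration(request).await?;
    Ok(())
}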
5 changed files with 510 additions and 505 deletions

.gitignore

@@ -1,2 +1,3 @@
/target
*.swp
.env

Cargo.lock (generated)

File diff suppressed because it is too large

Cargo.toml

@@ -5,16 +5,23 @@ authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
edition = "2018"
[dependencies]
anyhow = "*"
# This crate is not only useful for Lambda since it has an S3Event structure
# which is useful in the standalone mode as well
aws_lambda_events = "*"
arrow = "4"
clap = "2"
deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "mixing-in-deltalake_ext", features = ["s3"] }
deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "features-for-rustls", features = ["s3-rustls"] }
dotenv = "0.15"
form_urlencoded = "1"
log = "0.4"
parquet = "4"
pretty_env_logger = "0.4"
regex = "1"
rusoto_core = "*"
rusoto_core = { version = "*", default-features = false, features = ["rustls"] }
rusoto_credential = "*"
rusoto_s3 = "*"
rusoto_sqs = { version = "*", default-features = false, features = ["serialize_structs", "deserialize_structs", "rustls"]}
rusoto_s3 = { version = "*", default-features = false, features = ["rustls"]}
serde = { version = "1", features = ["rc", "derive"]}
serde_json = "1"
serde_yaml = "0.8"
@@ -23,8 +30,6 @@ tokio = { version = "1.0", features = ["macros"]}
# Needed for the lambda feature
lambda_runtime = { version = "0.3", optional = true }
aws_lambda_events = { version = "0.4", optional = true }
[features]
lambda = ["lambda_runtime", "aws_lambda_events"]
lambda = ["lambda_runtime"]

src/lambda.rs

@@ -1,5 +1,6 @@
use aws_lambda_events::event::s3::{S3Event, S3EventRecord};
use aws_lambda_events::event::s3::S3Event;
use lambda_runtime::{Context, Error};
use log::*;
/**
* The s3_event_handler will be invoked with an S3Event which will need to be iterated upon and
@@ -7,10 +8,6 @@ use lambda_runtime::{Context, Error};
* <https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html>
*/
pub async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error> {
// Unfortunately there's not a good way to avoid reloading the configuration every time. At least
// as far as I can tell right now
let _conf = config::Config::from_file(&PathBuf::from("./config.yml"))?;
for record in event.records {
if let Some(ref name) = record.event_name {
trace!("Processing an event named: {}", name);
@@ -30,87 +27,6 @@ pub async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, E
Ok("{}".to_string())
}
/**
* The match_record function will look at a given S3EventRecord to see if the contents of that
* record match a specific source configured for the Lambda
*
* Each of the records passed is assumed to be an `ObjectCreated:Put`
*/
fn match_record(record: &S3EventRecord, conf: &config::Config) -> Option<MatchedFile> {
if let Some(bucket) = &record.s3.bucket.name {
for source in &conf.sources {
if bucket != &source.bucket {
continue;
}
if let Some(key) = &record.s3.object.key {
if !source.prefix.is_match(key) {
continue;
}
let expected_partitions = source.partitions.len();
/* If there are no partitions expected, then we've already matched
*/
if expected_partitions == 0 {
return Some(MatchedFile::new(source, key, vec![]));
}
/*
* At this point the key and the prefix have matched, now to ensure
* that the data is partitioned according to the configuration
*/
let partitions = Partition::from_path_str(key);
debug!(
"Partitions identified from record: {} -- {:?}",
key, partitions
);
// No match if the discovered partitions don't match the count
if partitions.len() != expected_partitions {
continue;
}
for index in 0..expected_partitions {
// The partition at the index doesn't match what we're looking for
if partitions[index].name != source.partitions[index] {
return None;
}
}
return Some(MatchedFile::new(source, key, partitions));
}
}
}
None
}
/**
* A MatchedFile will contain all the necessary information for reading the object and allowing a
* Delta write to occur
*/
struct MatchedFile {
/// The source bucket containing the file to read
bucket: String,
/// The source object to read, expected to be JSON
object: String,
/// Partitions to include in the Delta write
partitions: Vec<Partition>,
/// The output path for the Delta writer to use
output_path: String,
}
impl MatchedFile {
fn new(source: &config::Source, key: &str, partitions: Vec<Partition>) -> Self {
MatchedFile {
bucket: source.bucket.clone(),
object: key.to_string(),
partitions: partitions,
output_path: source.tablepath.clone(),
}
}
}
/**
* A Partition is a simple struct to carry values from paths like:
* just/a/path/year=2021/mydata.json
@@ -162,67 +78,6 @@ mod tests {
assert_eq!(partitions[0].value, "2021");
}
/**
* Make sure the sample event, which doesn't match a configured entry, doesn't return a source
*/
#[test]
fn test_match_record_fail() {
let event: S3Event =
serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
let conf = test_config();
let matches = match_record(&event.records[0], &conf);
assert!(matches.is_none());
}
#[test]
fn test_match_record_ok() {
let event: S3Event =
serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
let conf = config::Config::new(vec![config::Source {
bucket: "my-bucket".to_string(),
prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"),
partitions: vec!["date".to_string()],
tablepath: "s3://test".to_string(),
}]);
let matches = match_record(&event.records[0], &conf);
assert!(matches.is_some());
}
#[test]
fn test_match_record_prefix_mismatch() {
let event: S3Event =
serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
let conf = config::Config::new(vec![config::Source {
bucket: "my-bucket".to_string(),
prefix: regex::Regex::new("^scoobydoo").expect("Failed to compile test regex"),
partitions: vec!["date".to_string()],
tablepath: "s3://test".to_string(),
}]);
let matches = match_record(&event.records[0], &conf);
assert!(matches.is_none());
}
#[test]
fn test_match_record_partition_mismatch() {
let event: S3Event =
serde_json::from_str(&sample_event()).expect("Failed to deserialize event");
let conf = config::Config::new(vec![config::Source {
bucket: "my-bucket".to_string(),
prefix: regex::Regex::new("^somepath").expect("Failed to compile test regex"),
partitions: vec!["year".to_string()],
tablepath: "s3://test".to_string(),
}]);
let matches = match_record(&event.records[0], &conf);
assert!(matches.is_none());
}
/**
* Load the config.yml for tests
*/
fn test_config() -> config::Config {
config::Config::from_file(&PathBuf::from("./config.yml"))
.expect("Failed to load configuration for tests")
}
fn sample_event() -> String {
String::from(
r#"

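The Partition::from_path_str implementation invoked by match_record sits outside the hunks shown here. Given the just/a/path/year=2021/mydata.json example in the doc comment above, a minimal sketch of the hive-style parsing it implies (hypothetical, not the crate's actual code):

// Sketch: pull name=value segments such as "year=2021" out of an object key.
#[derive(Debug, PartialEq)]
struct Partition {
    name: String,
    value: String,
}

fn partitions_from_path(path: &str) -> Vec<Partition> {
    path.split('/')
        .filter_map(|segment| {
            let mut parts = segment.splitn(2, '=');
            match (parts.next(), parts.next()) {
                (Some(name), Some(value)) => Some(Partition {
                    name: name.to_string(),
                    value: value.to_string(),
                }),
                _ => None,
            }
        })
        .collect()
}
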
src/main.rs

@@ -1,14 +1,22 @@
/*
* The bulk of the application
*/
use clap::{App, Arg};
use aws_lambda_events::event::s3::S3Event;
use clap::{App, Arg, ArgMatches};
use log::*;
use tokio::io::AsyncReadExt;
#[cfg(lambda)]
#[cfg(feature = "lambda")]
mod lambda;
mod writer;
fn preboot() {
/**
* Preboot is responsible for the processing of arguments and other initialization
* tasks common to both the standalone and lambda modes of operation
*/
fn preboot() -> ArgMatches<'static> {
dotenv::dotenv().ok();
#[cfg(debug_assertions)]
pretty_env_logger::init();
@@ -16,7 +24,8 @@ fn preboot() {
"Initializing delta-s3-loader v{}",
env!["CARGO_PKG_VERSION"]
);
let _matches = App::new("delta-s3-loader")
App::new("delta-s3-loader")
.version(env!["CARGO_PKG_VERSION"])
.author("rtyler@brokenco.de")
.about("Watch S3 buckets for new data to load into a Delta Table")
@@ -40,22 +49,90 @@ fn preboot() {
.takes_value(true),
)
.arg(
Arg::with_name("SQS queue ARN")
Arg::with_name("queue")
.short("q")
.long("queue")
.value_name("QUEUE_ARN")
.help("ARN of the SQS queue to consume, *required* in standalone mode")
.env("QUEUE_URL")
.value_name("URL")
.help("URL of the SQS queue to consume, *required* in standalone mode")
.takes_value(true),
)
.get_matches();
.get_matches()
}
#[cfg(not(lambda))]
fn main() {
preboot();
#[cfg(not(feature = "lambda"))]
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
use rusoto_core::*;
use rusoto_sqs::*;
let args = preboot();
if let Some(queue) = args.value_of("queue") {
info!("Running in standalone mode with the queue: {}", queue);
let client = SqsClient::new(Region::default());
loop {
let rcv = ReceiveMessageRequest {
queue_url: queue.to_string(),
// Always consuming the max number of available messages to reduce round trips
max_number_of_messages: Some(10),
// This wait time is being used as the effective loop interval for the main runloop
wait_time_seconds: Some(10),
..Default::default()
};
let res = client.receive_message(rcv).await?;
trace!("Message(s) received: {:?}", res);
if let Some(messages) = &res.messages {
for message in messages {
let mut should_delete = true;
if let Some(body) = &message.body {
match serde_json::from_str::<S3Event>(&body) {
Ok(event) => {
debug!("Parsed an event to do something with: {:?}", event);
should_delete = match process_event(&event).await {
Ok(_) => true,
Err(e) => {
error!("Failure when processing event: {}", e);
false
}
};
}
Err(err) => {
error!("Failed to deserialize what should have been an S3Event: {:?} - {}", &message.message_id, err);
}
}
} else {
warn!("Message had no body: {:?}", &message);
}
if should_delete {
match &message.receipt_handle {
Some(receipt) => {
let delete = DeleteMessageRequest {
queue_url: queue.to_string(),
receipt_handle: receipt.to_string(),
};
// TODO: More gracefully handle this error
client.delete_message(delete).await?;
}
None => {
warn!("Somehow a message without a receipt handle was received!");
}
}
}
}
}
}
} else {
panic!("When running in standalone mode the `queue` argument (or QUEUE_URL env variable) must be present");
}
}
#[cfg(lambda)]
#[cfg(feature = "lambda")]
#[tokio::main]
async fn main() -> Result<(), lambda_runtime::Error> {
preboot();
@@ -63,3 +140,65 @@ async fn main() -> Result<(), lambda_runtime::Error> {
lambda_runtime::run(func).await?;
Ok(())
}
/**
* URL decode the given string
*/
fn urldecode(buf: &str) -> String {
use form_urlencoded::parse;
parse(buf.as_bytes())
.map(|(key, val)| [key, val].concat())
.collect::<String>()
}
/**
* Process an event
*/
async fn process_event(event: &S3Event) -> Result<(), anyhow::Error> {
use rusoto_core::*;
use rusoto_s3::*;
let client = S3Client::new(Region::default());
for record in &event.records {
// NOTE: This is gross, there's got to be a better way
let bucket = record.s3.bucket.name.as_ref().unwrap().to_string();
// Key names with partitions will be url encoded which means they need to
// be processed first
let key = urldecode(record.s3.object.key.as_ref().unwrap());
let get = GetObjectRequest {
key,
bucket,
..Default::default()
};
trace!("request: {:?}", get);
let result = client.get_object(get).await?;
trace!("result: {:?}", result);
if let Some(stream) = result.body {
let mut buf = Vec::new();
stream.into_async_read().read_to_end(&mut buf).await;
} else {
warn!("Object had no body, somehow");
}
}
Err(anyhow::Error::msg("Fail"))
}
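The "gross" unwraps flagged in process_event could be tightened by matching on the Options instead; one possible shape, sketched outside this changeset:

// Sketch: the same iteration with the unwraps replaced by a match,
// skipping records that lack a bucket or key rather than panicking.
for record in &event.records {
    let (bucket, key) = match (&record.s3.bucket.name, &record.s3.object.key) {
        (Some(bucket), Some(key)) => (bucket.to_string(), urldecode(key)),
        _ => {
            warn!("Record missing bucket or key: {:?}", record);
            continue;
        }
    };
    // ...build the GetObjectRequest from `bucket` and `key` as above
}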
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_urldecode_with_encodes() {
let buf = "date%3D2021-03-12/auditlog-1235.json";
assert_eq!("date=2021-03-12/auditlog-1235.json", urldecode(&buf));
}
#[test]
fn test_urldecode_normal() {
let buf = "raw/auditlog-1235.json";
assert_eq!("raw/auditlog-1235.json", urldecode(&buf));
}
}