From c20997b19765c3910cc8e63d069aca868d31fdf4 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 11 Jun 2021 11:50:22 -0700 Subject: [PATCH] Sketching out the design and implementation of Delta S3 Loader There is still more to think about but this is a reasonable checkpoint to commit and push for others to look at. --- Cargo.lock | 49 ++++++++++++++++++-- Cargo.toml | 4 ++ README.adoc | 126 ++++++++++++++++++++++++++++++++++++++++++++++++-- config.yml | 22 --------- src/config.rs | 57 ----------------------- src/main.rs | 39 ++++++++++++++-- 6 files changed, 207 insertions(+), 90 deletions(-) delete mode 100644 config.yml delete mode 100644 src/config.rs diff --git a/Cargo.lock b/Cargo.lock index 2e64ae3..b2fd252 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,6 +30,15 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi", +] + [[package]] name = "anyhow" version = "1.0.40" @@ -257,6 +266,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "clap" +version = "2.33.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" +dependencies = [ + "ansi_term", + "atty", + "bitflags", + "strsim 0.8.0", + "textwrap 0.11.0", + "unicode-width", + "vec_map", +] + [[package]] name = "clap" version = "3.0.0-beta.2" @@ -269,9 +293,9 @@ dependencies = [ "indexmap", "lazy_static", "os_str_bytes", - "strsim", + "strsim 0.10.0", "termcolor", - "textwrap", + "textwrap 0.12.1", "unicode-width", "vec_map", ] @@ -364,12 +388,16 @@ version = "0.1.0" dependencies = [ "arrow", "aws_lambda_events", + "clap 2.33.3", "deltalake", "lambda_runtime", "log", "parquet", "pretty_env_logger", "regex", + "rusoto_core", + "rusoto_credential", + "rusoto_s3", "serde", "serde_json", "serde_regex", @@ -388,7 +416,7 @@ dependencies = [ "bytes 1.0.1", "cfg-if", "chrono", - "clap", + "clap 3.0.0-beta.2", "env_logger 0.8.3", "errno", "futures", @@ -1817,6 +1845,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + [[package]] name = "strsim" version = "0.10.0" @@ -1863,6 +1897,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + [[package]] name = "textwrap" version = "0.12.1" diff --git a/Cargo.toml b/Cargo.toml index 48c08b0..6fd6f20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,11 +6,15 @@ edition = "2018" [dependencies] arrow = "4" +clap = "2" deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "mixing-in-deltalake_ext", features = ["s3"] } log = "0.4" parquet = "4" pretty_env_logger = "0.4" regex = "1" +rusoto_core = "*" +rusoto_credential = "*" +rusoto_s3 = "*" serde = { version = "1", features = ["rc", "derive"]} serde_json = "1" serde_yaml = "0.8" diff --git a/README.adoc b/README.adoc index 8818fd8..e5ee678 100644 --- a/README.adoc +++ b/README.adoc @@ -1,13 +1,133 @@ -= Delta S3 Loader +ifdef::env-github[] +:tip-caption: :bulb: +:note-caption: :information_source: +:important-caption: :heavy_exclamation_mark: +:caution-caption: :fire: +:warning-caption: :warning: +endif::[] +:toc: macro + += Delta S3 Loader Delta S3 Loader is a project to quickly and cheaply bring JSON files added to S3 buckets into Delta Lake. This can be highly useful for legacy or external processes which rely on uploading JSON to an S3 bucket and cannot be properly updated to write directly to link:https://delta.io[Delta Lake]. -Delta S3 Loader can be built into a stnadalone binary or an -link:https://aws.amazon.com/lambda/[AWS Lambda]. +toc::[] + +== Modes + +Delta S3 Loader can be built into a standalone binary or an +link:https://aws.amazon.com/lambda/[AWS Lambda]. While both modes are +functionally identical they have different configuration requirements as +mentioned below. + +=== Standalone + +A standalone instance of Delta S3 Loader requires: + +* Destination Delta table path +* SQS queue ARN + +=== Lambda + +When deployed with AWS Lambda, the Lambda function should be configured with an +AWS SQS trigger. This causes AWS to manage the queue on behalf of Delta S3 +Loader. Learn more in the +link:https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-lambda-function-trigger.html[AWS +Lambda trigger documentation]. + +== Design + +This project is designed to work in a Lambda or packaged up into a container. +It relies on +link:https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html[S3 +Event Notifications] which are delivered into an +link:http://aws.amazon.com/sqs/[Amazon SQS] queue. The S3 Event Notifications +should be configured to funnel events to a single SQS queue per table. The +Delta S3 Loader will take **all** messages from a single queue and insert those +into a single table. + +Additionally, for source buckets which have _multiple_ types of data in them you may use link:https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-filtering.html[filtering on event notifications] to specify different object prefixes, etc. +For example, in a bucket named `audit_logs` bucket that has data prefixed with: + +* `databricks/workspaceId=123/*.json` +* `tableau/*.json` +* `admin_console/domain=github.com/*.json` + +A deployment of Delta S3 Loader to _only_ process the `admin_console` events +into an Delta table would require the following event configuration: + +.SQS Event Configuration +[source,xml] +---- + + + 1 + + + + prefix + admin_console/ + + + suffix + json + + + + arn:aws:sqs:us-west-2:444455556666:admin_console_audit_queue + s3:ObjectCreated:Put + + +---- + +[CAUTION] +==== +Always use different source and destination S3 buckets to avoid infinite loops! +==== + + +A standalone Delta S3 Loader invocation for the above queue might look something like: + + +[source,bash] +---- +delta-s3-loader -t s3://warehouse/audit_logs_raw/databricks \ # <1> + -p domain \ # <2> + -q "arn:aws:sqs:us-west-2:444455556666:admin_console_audit_queue" # <3> +---- +<1> Specify a destination Delta Lake table path in S3. +<2> Annotate the partition columns to help the loader partition data properly. +<3> Specify the input SQS queue by ARN + + +== Environment Variables + +When running in an AWS Lambda, Delta S3 Loader should be configured solely with environment variables. In a standalone mode the daemon can be configured with command line options _or_ environment variables + + +|=== +| Name | Required | Description + +| `RUST_LOG` +| No +| Define the log level for the process: `error`, `warn`, `info`, `debug`. + +|=== + + +=== Authentication/Authorization + +Delta S3 Loader assumes that the right AWS environment variables, such as +`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are defined in the environment. +Under the hood the Delta S3 Loader is not responsible for +authentication/authorization so please consult the +link:https://github.com/rusoto/rusoto/blob/master/AWS-CREDENTIALS.md[Rusoto AWS +credentials documentation] for more information. + == Related work diff --git a/config.yml b/config.yml deleted file mode 100644 index ddebe1b..0000000 --- a/config.yml +++ /dev/null @@ -1,22 +0,0 @@ -# This is an example configuration for the delta-s3-loader lambda ---- -sources: -# This entry maps the existing S3 structure into a delta table named -# `logs.audit_logs` which will be date stamp partitioned -# -# └── simple -# └── date=2021-04-16 -# └── auditlog-1234.json - # The name of the bucket should _just_ be a name of the bucket since this - # will come from an AWS Lambda event - - bucket: 'raw' - # Everything in the simple/ prefix will be considered part of this table - # the prefix may be a regular expression that is compatible with the Rust - # regex crate - prefix: '^simple' - # When specifying partitions, the source data is expected to be parttioned, - # as it is in this example - partitions: - - 'date' - # The table path is the destination for the written delta - tablepath: 's3://delta/audit_logs' diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index f8dce73..0000000 --- a/src/config.rs +++ /dev/null @@ -1,57 +0,0 @@ -/** - * The config module is largely responsible for just reading a configuration yaml file in. - */ -use regex::Regex; -use serde::Deserialize; -use std::path::PathBuf; - -/** - * Root configuration - */ -#[derive(Clone, Debug, Deserialize)] -pub struct Config { - pub sources: Vec, -} - -impl Config { - pub fn new(sources: Vec) -> Config { - Config { sources } - } - /** - * from_file will attempt to load a configuration from the given path - * - * The path should be to a properly formatted YAML file: - */ - pub fn from_file(path: &PathBuf) -> Result { - use std::fs::File; - use std::io::{Error, ErrorKind}; - - let reader = File::open(path)?; - - serde_yaml::from_reader(reader).map_err(|e| Error::new(ErrorKind::Other, e)) - } -} - -/** - * A source maps an existing S3 structure into a named Delta table - */ -#[derive(Clone, Debug, Deserialize)] -pub struct Source { - pub bucket: String, - #[serde(with = "serde_regex")] - pub prefix: Regex, - pub partitions: Vec, - pub tablepath: String, -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_load_file() -> Result<(), std::io::Error> { - let config = Config::from_file(&PathBuf::from("./config.yml"))?; - assert_eq!(config.sources.len(), 1); - Ok(()) - } -} diff --git a/src/main.rs b/src/main.rs index 0674bbd..766f0fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,12 @@ /* * The bulk of the application */ - +use clap::{App, Arg}; use log::*; -use std::path::PathBuf; -mod config; -mod writer; #[cfg(lambda)] mod lambda; +mod writer; fn preboot() { #[cfg(debug_assertions)] @@ -18,6 +16,38 @@ fn preboot() { "Initializing delta-s3-loader v{}", env!["CARGO_PKG_VERSION"] ); + let _matches = App::new("delta-s3-loader") + .version(env!["CARGO_PKG_VERSION"]) + .author("rtyler@brokenco.de") + .about("Watch S3 buckets for new data to load into a Delta Table") + .arg( + Arg::with_name("table") + .required(true) + .short("t") + .long("table") + .env("TABLE_PATH") + .value_name("TABLE_PATH") + .help("Sets the destination Delta Table for the ingested data") + .takes_value(true), + ) + .arg( + Arg::with_name("partitions") + .short("p") + .long("partitions") + .env("CSV_PARTITIONS") + .value_name("CSV_PARTITIONS") + .help("Ordered list of partition from the source data, e.g. year,month") + .takes_value(true), + ) + .arg( + Arg::with_name("SQS queue ARN") + .short("q") + .long("queue") + .value_name("QUEUE_ARN") + .help("ARN of the SQS queue to consume, *required* in standalone mode") + .takes_value(true), + ) + .get_matches(); } #[cfg(not(lambda))] @@ -25,7 +55,6 @@ fn main() { preboot(); } - #[cfg(lambda)] #[tokio::main] async fn main() -> Result<(), lambda_runtime::Error> {