Sketching out the design and implementation of Delta S3 Loader

There is still more to think about but this is a reasonable checkpoint to commit
and push for others to look at.
This commit is contained in:
R Tyler Croy 2021-06-11 11:50:22 -07:00
parent 88264d83c9
commit c20997b197
6 changed files with 207 additions and 90 deletions

49
Cargo.lock generated
View File

@ -30,6 +30,15 @@ dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "ansi_term"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.40"
@ -257,6 +266,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "clap"
version = "2.33.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
dependencies = [
"ansi_term",
"atty",
"bitflags",
"strsim 0.8.0",
"textwrap 0.11.0",
"unicode-width",
"vec_map",
]
[[package]]
name = "clap"
version = "3.0.0-beta.2"
@ -269,9 +293,9 @@ dependencies = [
"indexmap",
"lazy_static",
"os_str_bytes",
"strsim",
"strsim 0.10.0",
"termcolor",
"textwrap",
"textwrap 0.12.1",
"unicode-width",
"vec_map",
]
@ -364,12 +388,16 @@ version = "0.1.0"
dependencies = [
"arrow",
"aws_lambda_events",
"clap 2.33.3",
"deltalake",
"lambda_runtime",
"log",
"parquet",
"pretty_env_logger",
"regex",
"rusoto_core",
"rusoto_credential",
"rusoto_s3",
"serde",
"serde_json",
"serde_regex",
@ -388,7 +416,7 @@ dependencies = [
"bytes 1.0.1",
"cfg-if",
"chrono",
"clap",
"clap 3.0.0-beta.2",
"env_logger 0.8.3",
"errno",
"futures",
@ -1817,6 +1845,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
[[package]]
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "strsim"
version = "0.10.0"
@ -1863,6 +1897,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
"unicode-width",
]
[[package]]
name = "textwrap"
version = "0.12.1"

View File

@ -6,11 +6,15 @@ edition = "2018"
[dependencies]
arrow = "4"
clap = "2"
deltalake = { git = "https://github.com/rtyler/delta-rs", branch = "mixing-in-deltalake_ext", features = ["s3"] }
log = "0.4"
parquet = "4"
pretty_env_logger = "0.4"
regex = "1"
rusoto_core = "*"
rusoto_credential = "*"
rusoto_s3 = "*"
serde = { version = "1", features = ["rc", "derive"]}
serde_json = "1"
serde_yaml = "0.8"

View File

@ -1,13 +1,133 @@
= Delta S3 Loader
ifdef::env-github[]
:tip-caption: :bulb:
:note-caption: :information_source:
:important-caption: :heavy_exclamation_mark:
:caution-caption: :fire:
:warning-caption: :warning:
endif::[]
:toc: macro
= Delta S3 Loader
Delta S3 Loader is a project to quickly and cheaply bring JSON files added to
S3 buckets into Delta Lake. This can be highly useful for legacy or external
processes which rely on uploading JSON to an S3 bucket and cannot be properly
updated to write directly to link:https://delta.io[Delta Lake].
Delta S3 Loader can be built into a stnadalone binary or an
link:https://aws.amazon.com/lambda/[AWS Lambda].
toc::[]
== Modes
Delta S3 Loader can be built into a standalone binary or an
link:https://aws.amazon.com/lambda/[AWS Lambda]. While both modes are
functionally identical they have different configuration requirements as
mentioned below.
=== Standalone
A standalone instance of Delta S3 Loader requires:
* Destination Delta table path
* SQS queue ARN
=== Lambda
When deployed with AWS Lambda, the Lambda function should be configured with an
AWS SQS trigger. This causes AWS to manage the queue on behalf of Delta S3
Loader. Learn more in the
link:https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-lambda-function-trigger.html[AWS
Lambda trigger documentation].
== Design
This project is designed to work in a Lambda or packaged up into a container.
It relies on
link:https://docs.aws.amazon.com/AmazonS3/latest/userguide/NotificationHowTo.html[S3
Event Notifications] which are delivered into an
link:http://aws.amazon.com/sqs/[Amazon SQS] queue. The S3 Event Notifications
should be configured to funnel events to a single SQS queue per table. The
Delta S3 Loader will take **all** messages from a single queue and insert those
into a single table.
Additionally, for source buckets which have _multiple_ types of data in them you may use link:https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-filtering.html[filtering on event notifications] to specify different object prefixes, etc.
For example, in a bucket named `audit_logs` bucket that has data prefixed with:
* `databricks/workspaceId=123/*.json`
* `tableau/*.json`
* `admin_console/domain=github.com/*.json`
A deployment of Delta S3 Loader to _only_ process the `admin_console` events
into an Delta table would require the following event configuration:
.SQS Event Configuration
[source,xml]
----
<NotificationConfiguration>
<QueueConfiguration>
<Id>1</Id>
<Filter>
<S3Key>
<FilterRule>
<Name>prefix</Name>
<Value>admin_console/</Value>
</FilterRule>
<FilterRule>
<Name>suffix</Name>
<Value>json</Value>
</FilterRule>
</S3Key>
</Filter>
<Queue>arn:aws:sqs:us-west-2:444455556666:admin_console_audit_queue</Queue>
<Event>s3:ObjectCreated:Put</Event>
</QueueConfiguration>
</NotificationConfiguration>
----
[CAUTION]
====
Always use different source and destination S3 buckets to avoid infinite loops!
====
A standalone Delta S3 Loader invocation for the above queue might look something like:
[source,bash]
----
delta-s3-loader -t s3://warehouse/audit_logs_raw/databricks \ # <1>
-p domain \ # <2>
-q "arn:aws:sqs:us-west-2:444455556666:admin_console_audit_queue" # <3>
----
<1> Specify a destination Delta Lake table path in S3.
<2> Annotate the partition columns to help the loader partition data properly.
<3> Specify the input SQS queue by ARN
== Environment Variables
When running in an AWS Lambda, Delta S3 Loader should be configured solely with environment variables. In a standalone mode the daemon can be configured with command line options _or_ environment variables
|===
| Name | Required | Description
| `RUST_LOG`
| No
| Define the log level for the process: `error`, `warn`, `info`, `debug`.
|===
=== Authentication/Authorization
Delta S3 Loader assumes that the right AWS environment variables, such as
`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are defined in the environment.
Under the hood the Delta S3 Loader is not responsible for
authentication/authorization so please consult the
link:https://github.com/rusoto/rusoto/blob/master/AWS-CREDENTIALS.md[Rusoto AWS
credentials documentation] for more information.
== Related work

View File

@ -1,22 +0,0 @@
# This is an example configuration for the delta-s3-loader lambda
---
sources:
# This entry maps the existing S3 structure into a delta table named
# `logs.audit_logs` which will be date stamp partitioned
#
# └── simple
# └── date=2021-04-16
# └── auditlog-1234.json
# The name of the bucket should _just_ be a name of the bucket since this
# will come from an AWS Lambda event
- bucket: 'raw'
# Everything in the simple/ prefix will be considered part of this table
# the prefix may be a regular expression that is compatible with the Rust
# regex crate
prefix: '^simple'
# When specifying partitions, the source data is expected to be parttioned,
# as it is in this example
partitions:
- 'date'
# The table path is the destination for the written delta
tablepath: 's3://delta/audit_logs'

View File

@ -1,57 +0,0 @@
/**
* The config module is largely responsible for just reading a configuration yaml file in.
*/
use regex::Regex;
use serde::Deserialize;
use std::path::PathBuf;
/**
* Root configuration
*/
#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub sources: Vec<Source>,
}
impl Config {
pub fn new(sources: Vec<Source>) -> Config {
Config { sources }
}
/**
* from_file will attempt to load a configuration from the given path
*
* The path should be to a properly formatted YAML file:
*/
pub fn from_file(path: &PathBuf) -> Result<Config, std::io::Error> {
use std::fs::File;
use std::io::{Error, ErrorKind};
let reader = File::open(path)?;
serde_yaml::from_reader(reader).map_err(|e| Error::new(ErrorKind::Other, e))
}
}
/**
* A source maps an existing S3 structure into a named Delta table
*/
#[derive(Clone, Debug, Deserialize)]
pub struct Source {
pub bucket: String,
#[serde(with = "serde_regex")]
pub prefix: Regex,
pub partitions: Vec<String>,
pub tablepath: String,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_load_file() -> Result<(), std::io::Error> {
let config = Config::from_file(&PathBuf::from("./config.yml"))?;
assert_eq!(config.sources.len(), 1);
Ok(())
}
}

View File

@ -1,14 +1,12 @@
/*
* The bulk of the application
*/
use clap::{App, Arg};
use log::*;
use std::path::PathBuf;
mod config;
mod writer;
#[cfg(lambda)]
mod lambda;
mod writer;
fn preboot() {
#[cfg(debug_assertions)]
@ -18,6 +16,38 @@ fn preboot() {
"Initializing delta-s3-loader v{}",
env!["CARGO_PKG_VERSION"]
);
let _matches = App::new("delta-s3-loader")
.version(env!["CARGO_PKG_VERSION"])
.author("rtyler@brokenco.de")
.about("Watch S3 buckets for new data to load into a Delta Table")
.arg(
Arg::with_name("table")
.required(true)
.short("t")
.long("table")
.env("TABLE_PATH")
.value_name("TABLE_PATH")
.help("Sets the destination Delta Table for the ingested data")
.takes_value(true),
)
.arg(
Arg::with_name("partitions")
.short("p")
.long("partitions")
.env("CSV_PARTITIONS")
.value_name("CSV_PARTITIONS")
.help("Ordered list of partition from the source data, e.g. year,month")
.takes_value(true),
)
.arg(
Arg::with_name("SQS queue ARN")
.short("q")
.long("queue")
.value_name("QUEUE_ARN")
.help("ARN of the SQS queue to consume, *required* in standalone mode")
.takes_value(true),
)
.get_matches();
}
#[cfg(not(lambda))]
@ -25,7 +55,6 @@ fn main() {
preboot();
}
#[cfg(lambda)]
#[tokio::main]
async fn main() -> Result<(), lambda_runtime::Error> {