Checkpointing some work on the lambda side of things, processing inbound events

At this point I'm still spending a lot of time reading through some
kafka-delta-ingest and arrow code to figure out exactly how I want to do schema
inference and writes.
This commit is contained in:
R Tyler Croy 2021-04-14 22:20:51 -07:00
parent bf1fb592e3
commit a795083129
4 changed files with 188 additions and 41 deletions

61
Cargo.lock generated
View File

@ -112,6 +112,22 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "aws_lambda_events"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "663ef110325f68726f4bc51d12c610b9700f70e8bb8a55279cdeca01b12b5491"
dependencies = [
"base64 0.12.3",
"bytes 0.5.6",
"chrono",
"http",
"http-serde",
"serde",
"serde_derive",
"serde_json",
]
[[package]]
name = "base-x"
version = "0.2.8"
@ -190,6 +206,15 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
dependencies = [
"serde",
]
[[package]]
name = "bytes"
version = "1.0.1"
@ -336,11 +361,15 @@ dependencies = [
name = "delta-s3-loader"
version = "0.1.0"
dependencies = [
"arrow",
"aws_lambda_events",
"deltalake",
"lambda_runtime",
"log",
"parquet",
"pretty_env_logger",
"serde",
"serde_json",
"tokio",
]
@ -352,7 +381,7 @@ dependencies = [
"anyhow",
"arrow",
"async-trait",
"bytes",
"bytes 1.0.1",
"cfg-if",
"chrono",
"clap",
@ -705,7 +734,7 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11"
dependencies = [
"bytes",
"bytes 1.0.1",
"fnv",
"itoa",
]
@ -716,11 +745,21 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dfb77c123b4e2f72a2069aeae0b4b4949cc7e966df277813fc16347e7549737"
dependencies = [
"bytes",
"bytes 1.0.1",
"http",
"pin-project-lite",
]
[[package]]
name = "http-serde"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aaa9564ce1decf49edcbd2b8f4f732843b4df64eabb8dcfcf0085ff34dbc76a2"
dependencies = [
"http",
"serde",
]
[[package]]
name = "httparse"
version = "1.3.6"
@ -754,7 +793,7 @@ version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8bf09f61b52cfcf4c00de50df88ae423d6c02354e385a86341133b5338630ad1"
dependencies = [
"bytes",
"bytes 1.0.1",
"futures-channel",
"futures-core",
"futures-util",
@ -777,7 +816,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"bytes 1.0.1",
"hyper",
"native-tls",
"tokio",
@ -833,7 +872,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9b08856997d11ca8122121b26b17a27ef1dce689d71ccd754e051f2417aebdd"
dependencies = [
"async-stream",
"bytes",
"bytes 1.0.1",
"futures",
"http",
"hyper",
@ -1414,7 +1453,7 @@ checksum = "02aff20978970d47630f08de5f0d04799497818d16cafee5aec90c4b4d0806cf"
dependencies = [
"async-trait",
"base64 0.13.0",
"bytes",
"bytes 1.0.1",
"crc32fast",
"futures",
"http",
@ -1456,7 +1495,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abc3f56f14ccf91f880b9a9c2d0556d8523e8c155041c54db155b384a1dd1119"
dependencies = [
"async-trait",
"bytes",
"bytes 1.0.1",
"futures",
"rusoto_core",
"xml-rs",
@ -1469,7 +1508,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5486e6b1673ab3e0ba1ded284fb444845fe1b7f41d13989a54dd60f62a7b2baa"
dependencies = [
"base64 0.13.0",
"bytes",
"bytes 1.0.1",
"futures",
"hex",
"hmac",
@ -1494,7 +1533,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f93005e0c3b9e40a424b50ca71886d2445cc19bb6cdac3ac84c2daff482eb59"
dependencies = [
"async-trait",
"bytes",
"bytes 1.0.1",
"chrono",
"futures",
"rusoto_core",
@ -1920,7 +1959,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5"
dependencies = [
"autocfg",
"bytes",
"bytes 1.0.1",
"libc",
"memchr",
"mio",

View File

@ -5,9 +5,13 @@ authors = ["R. Tyler Croy <rtyler@brokenco.de>"]
edition = "2018"
[dependencies]
arrow = { git = "https://github.com/apache/arrow.git", rev = "05b36567bd8216bec71b796fe3bb6811c71abbec" }
aws_lambda_events = "0.4"
deltalake = { git = "https://github.com/delta-io/delta-rs", branch = "main", features = ["s3"] }
lambda_runtime = "0.3"
log = "0.4"
parquet = { git = "https://github.com/apache/arrow.git", rev = "05b36567bd8216bec71b796fe3bb6811c71abbec" }
pretty_env_logger = "0.4"
tokio = { version = "1.0", features = ["macros"]}
serde = { version = "1", features = ["rc", "derive"]}
serde_json = "1"

View File

@ -2,49 +2,97 @@
* The bulk of the application
*/
use deltalake::*;
use aws_lambda_events::event::s3::S3Event;
use lambda_runtime::{handler_fn, Context, Error};
use log::*;
use serde::{Deserialize, Serialize};
/// Made-up example request payload consumed by `my_handler`.
///
/// Requests come into the runtime as Unicode strings in JSON format, which can map to any
/// structure that implements `serde::Deserialize`. The runtime pays no attention to the
/// contents of the request payload.
#[derive(Deserialize)]
struct Request {
/// Command text; `my_handler` echoes it back inside the response message.
command: String,
}
/// Made-up example of what a response structure may look like.
///
/// There is no restriction on what it can be. The runtime requires responses to be
/// serialized into JSON, and pays no attention to the contents of the response payload.
#[derive(Serialize)]
struct Response {
/// Request identifier; `my_handler` fills it from `ctx.request_id`.
req_id: String,
/// Human-readable message; `my_handler` sets it to "Command <command> executed.".
msg: String,
}
mod writer;
/// Entry point: initialises the logger, logs the crate version, then hands the handler to
/// `lambda_runtime::run` and awaits it, propagating any error it returns.
#[tokio::main]
async fn main() -> Result<(), Error> {
pretty_env_logger::init();
info!("Initializing delta-s3-loader v{}", env!["CARGO_PKG_VERSION"]);
// NOTE(review): this binding is shadowed by the very next line and never used. It looks
// like the pre-change side of the diff this text was captured from -- confirm and drop.
let func = handler_fn(my_handler);
// The S3 event handler is the function actually passed to the runtime below.
let func = handler_fn(s3_event_handler);
lambda_runtime::run(func).await?;
Ok(())
}
// NOTE(review): from here to the closing brace the old example handler (`my_handler`) and
// its replacement (`s3_event_handler`) are interleaved -- two function headers share one
// closing brace and there are two `Ok(...)` tails. This appears to be both sides of a diff
// shown without +/- markers; confirm which lines are current before relying on this block.
async fn my_handler(event: Request, ctx: Context) -> Result<Response, Error> {
// extract some useful info from the request
let command = event.command;
/**
* The s3_event_handler is invoked with an S3Event and walks each of its records:
* <https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html>
*
* For now the records are only logged: a record named "ObjectCreated:Put" is traced, a
* record without an event name produces a warning, and any other event name is ignored
* after its name has been traced. The handler then returns the string "{}".
*/
async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result<String, Error> {
// prepare the response
// NOTE(review): `ctx` and `command` used below belong to `my_handler`, not to this
// function's `_ctx` parameter -- presumably removed lines from the old handler.
let resp = Response {
req_id: ctx.request_id,
msg: format!("Command {} executed.", command),
};
for record in event.records {
// `event_name` is optional, so records without one are handled in the `else` branch.
if let Some(ref name) = record.event_name {
trace!("Processing an event named: {}", name);
/*
* The only events that delta-s3-loader is interested in are new PUTs which
* indicate a new file must be processed.
*/
if name == "ObjectCreated:Put" {
trace!("Processing record: {:?}", record);
}
}
else {
warn!("Received a record without a name: {:?}", record);
}
}
// return `Response` (it will be serialized to JSON automatically by the runtime)
Ok(resp)
// Since this was triggered asynchronously, no need for a real response
Ok("{}".to_string())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a sample `ObjectCreated:Put` notification and checks that the handler
    /// accepts it and answers with the empty JSON object.
    #[tokio::test]
    async fn test_s3_event_handler() {
        let buf = r#"
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
"awsRegion": "us-east-2",
"eventTime": "2019-09-03T19:37:27.192Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AWS:AIDAINPONIXQXHT3IKHL2"
},
"requestParameters": {
"sourceIPAddress": "205.255.255.255"
},
"responseElements": {
"x-amz-request-id": "D82B88E5F771F645",
"x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo="
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1",
"bucket": {
"name": "lambda-artifacts-deafc19498e3f2df",
"ownerIdentity": {
"principalId": "A3I5XTEXAMAI3E"
},
"arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"
},
"object": {
"key": "b21b84d653bb07b05b1e6b33684dc11b",
"size": 1305107,
"eTag": "b21b84d653bb07b05b1e6b33684dc11b",
"sequencer": "0C0F6F405D6ED209E1"
}
}
}
]
}"#;
        // `buf` is already a `&str`, so it is handed over directly instead of re-borrowed.
        let event: S3Event = serde_json::from_str(buf).expect("Failed to deserialize event");
        let result = s3_event_handler(event, Context::default())
            .await
            .expect("Failed to run event handler");
        assert_eq!("{}", result);
    }
}

56
src/writer.rs Normal file
View File

@ -0,0 +1,56 @@
/**
* The writer module contains the important code for actually writing to a Delta Lake table
*
*/
use arrow::record_batch::RecordBatch;
use serde_json::Value;
/// Error type for the writer module.
///
/// Implements `Debug`, `Display` and `std::error::Error` so that it can be used with
/// `expect`/`unwrap`, printed in logs, and boxed as `Box<dyn std::error::Error>`.
#[derive(Debug, PartialEq, Eq)]
enum WriterError {
    /// Catch-all failure carrying no further detail.
    Generic,
}

impl std::fmt::Display for WriterError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            WriterError::Generic => write!(f, "generic writer error"),
        }
    }
}

impl std::error::Error for WriterError {}
/// Infers an Arrow schema from the given JSON values and prints the inference result.
///
/// Work in progress: nothing is returned yet.
///
/// TODO: return `Result<RecordBatch, WriterError>` once the batch is actually built.
fn json_to_batch(json: Vec<Value>) {
    use arrow::json::reader::infer_json_schema_from_iterator;

    // infer_json_schema_from_iterator expects every item to be wrapped in a Result, so each
    // value is lifted with `Ok` on the way in.
    let schema = infer_json_schema_from_iterator(json.into_iter().map(Ok));
    println!("schema: {:#?}", schema);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch test that only requests a storage backend for `./data`; ignored by default.
    #[ignore]
    #[test]
    fn demo() {
        // The backend is not used yet; the underscore keeps the binding warning-free.
        let _delta = deltalake::get_backend_for_uri("./data");
    }

    /// Runs two flat JSON objects through `json_to_batch` and passes if it does not panic.
    ///
    /// Run with `cargo test -- --nocapture` to see the schema that the function prints.
    #[test]
    fn json_to_arrow_success() {
        let value: Vec<serde_json::Value> = serde_json::from_str(
            r#"
[
{
"action" : "commit",
"actor" : "rtyler"
},
{
"action" : "update",
"actor" : "rtyler"
}
]
"#,
        )
        .expect("Failed to create JSON");

        // `json_to_batch` currently returns `()`, so there is nothing to bind or inspect.
        // TODO: assert `result.is_ok()` once it returns `Result<RecordBatch, WriterError>`.
        json_to_batch(value);
    }
}