diff --git a/Cargo.lock b/Cargo.lock index c2cdd04..191cc48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,6 +112,22 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "aws_lambda_events" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "663ef110325f68726f4bc51d12c610b9700f70e8bb8a55279cdeca01b12b5491" +dependencies = [ + "base64 0.12.3", + "bytes 0.5.6", + "chrono", + "http", + "http-serde", + "serde", + "serde_derive", + "serde_json", +] + [[package]] name = "base-x" version = "0.2.8" @@ -190,6 +206,15 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" +dependencies = [ + "serde", +] + [[package]] name = "bytes" version = "1.0.1" @@ -336,11 +361,15 @@ dependencies = [ name = "delta-s3-loader" version = "0.1.0" dependencies = [ + "arrow", + "aws_lambda_events", "deltalake", "lambda_runtime", "log", + "parquet", "pretty_env_logger", "serde", + "serde_json", "tokio", ] @@ -352,7 +381,7 @@ dependencies = [ "anyhow", "arrow", "async-trait", - "bytes", + "bytes 1.0.1", "cfg-if", "chrono", "clap", @@ -705,7 +734,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" dependencies = [ - "bytes", + "bytes 1.0.1", "fnv", "itoa", ] @@ -716,11 +745,21 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dfb77c123b4e2f72a2069aeae0b4b4949cc7e966df277813fc16347e7549737" dependencies = [ - "bytes", + "bytes 1.0.1", "http", "pin-project-lite", ] +[[package]] +name = "http-serde" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaa9564ce1decf49edcbd2b8f4f732843b4df64eabb8dcfcf0085ff34dbc76a2" +dependencies = [ + "http", + "serde", +] + [[package]] name = "httparse" version = "1.3.6" @@ -754,7 +793,7 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bf09f61b52cfcf4c00de50df88ae423d6c02354e385a86341133b5338630ad1" dependencies = [ - "bytes", + "bytes 1.0.1", "futures-channel", "futures-core", "futures-util", @@ -777,7 +816,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes", + "bytes 1.0.1", "hyper", "native-tls", "tokio", @@ -833,7 +872,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9b08856997d11ca8122121b26b17a27ef1dce689d71ccd754e051f2417aebdd" dependencies = [ "async-stream", - "bytes", + "bytes 1.0.1", "futures", "http", "hyper", @@ -1414,7 +1453,7 @@ checksum = "02aff20978970d47630f08de5f0d04799497818d16cafee5aec90c4b4d0806cf" dependencies = [ "async-trait", "base64 0.13.0", - "bytes", + "bytes 1.0.1", "crc32fast", "futures", "http", @@ -1456,7 +1495,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abc3f56f14ccf91f880b9a9c2d0556d8523e8c155041c54db155b384a1dd1119" dependencies = [ "async-trait", - "bytes", + "bytes 1.0.1", "futures", "rusoto_core", "xml-rs", @@ -1469,7 +1508,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5486e6b1673ab3e0ba1ded284fb444845fe1b7f41d13989a54dd60f62a7b2baa" dependencies = [ "base64 0.13.0", - "bytes", + "bytes 1.0.1", "futures", "hex", "hmac", @@ -1494,7 +1533,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f93005e0c3b9e40a424b50ca71886d2445cc19bb6cdac3ac84c2daff482eb59" dependencies = [ "async-trait", - "bytes", + "bytes 1.0.1", "chrono", "futures", "rusoto_core", @@ -1920,7 +1959,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" dependencies = [ "autocfg", - "bytes", + "bytes 1.0.1", "libc", "memchr", "mio", diff --git a/Cargo.toml b/Cargo.toml index b173e1b..06b9016 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,13 @@ authors = ["R. Tyler Croy "] edition = "2018" [dependencies] +arrow = { git = "https://github.com/apache/arrow.git", rev = "05b36567bd8216bec71b796fe3bb6811c71abbec" } +aws_lambda_events = "0.4" deltalake = { git = "https://github.com/delta-io/delta-rs", branch = "main", features = ["s3"] } lambda_runtime = "0.3" log = "0.4" +parquet = { git = "https://github.com/apache/arrow.git", rev = "05b36567bd8216bec71b796fe3bb6811c71abbec" } pretty_env_logger = "0.4" tokio = { version = "1.0", features = ["macros"]} serde = { version = "1", features = ["rc", "derive"]} +serde_json = "1" diff --git a/src/main.rs b/src/main.rs index 4c5ddc9..2f7f478 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,49 +2,97 @@ * The bulk of the application */ -use deltalake::*; +use aws_lambda_events::event::s3::S3Event; use lambda_runtime::{handler_fn, Context, Error}; use log::*; -use serde::{Deserialize, Serialize}; -/// This is also a made-up example. Requests come into the runtime as unicode -/// strings in json format, which can map to any structure that implements `serde::Deserialize` -/// The runtime pays no attention to the contents of the request payload. -#[derive(Deserialize)] -struct Request { - command: String, -} - -/// This is a made-up example of what a response structure may look like. -/// There is no restriction on what it can be. The runtime requires responses -/// to be serialized into json. The runtime pays no attention -/// to the contents of the response payload. -#[derive(Serialize)] -struct Response { - req_id: String, - msg: String, -} +mod writer; #[tokio::main] async fn main() -> Result<(), Error> { pretty_env_logger::init(); info!("Initializing delta-s3-loader v{}", env!["CARGO_PKG_VERSION"]); - let func = handler_fn(my_handler); + let func = handler_fn(s3_event_handler); lambda_runtime::run(func).await?; Ok(()) } -async fn my_handler(event: Request, ctx: Context) -> Result { - // extract some useful info from the request - let command = event.command; +/** + * The s3_event_handler will be invoked with an S3Event which will need to be iterated upon and + * each S3EventRecord processed: + * + */ +async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result { - // prepare the response - let resp = Response { - req_id: ctx.request_id, - msg: format!("Command {} executed.", command), - }; + for record in event.records { + if let Some(ref name) = record.event_name { + trace!("Processing an event named: {}", name); + /* + * The only events that delta-s3-loader is interested in are new PUTs which + * indicate a new file must be processed. + */ + if name == "ObjectCreated:Put" { + trace!("Processing record: {:?}", record); + } + } + else { + warn!("Received a record without a name: {:?}", record); + } + } - // return `Response` (it will be serialized to JSON automatically by the runtime) - Ok(resp) + // Since this was triggered asynchronously, no need for a real response + Ok("{}".to_string()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_s3_event_handler() { + let buf = r#" +{ + "Records": [ + { + "eventVersion": "2.1", + "eventSource": "aws:s3", + "awsRegion": "us-east-2", + "eventTime": "2019-09-03T19:37:27.192Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "AWS:AIDAINPONIXQXHT3IKHL2" + }, + "requestParameters": { + "sourceIPAddress": "205.255.255.255" + }, + "responseElements": { + "x-amz-request-id": "D82B88E5F771F645", + "x-amz-id-2": "vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "828aa6fc-f7b5-4305-8584-487c791949c1", + "bucket": { + "name": "lambda-artifacts-deafc19498e3f2df", + "ownerIdentity": { + "principalId": "A3I5XTEXAMAI3E" + }, + "arn": "arn:aws:s3:::lambda-artifacts-deafc19498e3f2df" + }, + "object": { + "key": "b21b84d653bb07b05b1e6b33684dc11b", + "size": 1305107, + "eTag": "b21b84d653bb07b05b1e6b33684dc11b", + "sequencer": "0C0F6F405D6ED209E1" + } + } + } + ] +}"#; + let event: S3Event = serde_json::from_str(&buf).expect("Failed to deserialize event"); + let result = s3_event_handler(event, Context::default()).await.expect("Failed to run event handler"); + assert_eq!("{}", result); + } +} + diff --git a/src/writer.rs b/src/writer.rs new file mode 100644 index 0000000..396ca22 --- /dev/null +++ b/src/writer.rs @@ -0,0 +1,56 @@ +/** + * The writer module contains the important code for actually writing to a Delta Lake table + * + */ + +use arrow::record_batch::RecordBatch; +use serde_json::Value; + +enum WriterError { + Generic, +} + +fn json_to_batch(json: Vec) { //-> Result { + use arrow::json::reader::*; + + // infer_json_schema_from_iterator is weird in that it expects each value to be wrapped in a + // Result + let schema = infer_json_schema_from_iterator( + json.into_iter().map(|v| Ok(v))); + + println!("schema: {:#?}", schema); + + //Err(WriterError::Generic) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[ignore] + #[test] + fn demo() { + let delta = deltalake::get_backend_for_uri("./data"); + } + + #[test] + fn json_to_arrow_success() { + let value: Vec = serde_json::from_str(r#" + [ + { + "action" : "commit", + "actor" : "rtyler" + }, + { + "action" : "update", + "actor" : "rtyler" + } + ] + "#).expect("Failed to create JSON"); + + + let result = json_to_batch(value); + assert!(false); + //assert!(result.is_ok()); + } +}