Compare commits
3 Commits
c8df042acd ... 6d1bc34b86

Author | SHA1 | Date
---|---|---
R Tyler Croy | 6d1bc34b86 |
R Tyler Croy | 84eba62314 |
R Tyler Croy | a39e49697a |

Cargo.toml
@@ -7,12 +7,12 @@ members = [
 resolver = "2"
 
 [workspace.package]
-version = "0.10.3"
+version = "0.11.0"
 edition = "2021"
-keywords = ["deltalake", "parquet", "lambda", "delta"]
+keywords = ["deltalake", "parquet", "lambda", "delta", "sqs"]
 homepage = "https://github.com/buoyant-data/oxbow"
 repository = "https://github.com/buoyant-data/oxbow"
-description = "Tooling for converting directories of Apache Parquet files into Delta Lake tables"
+description = "Toolbox for converting or generating Delta Lake tables with AWS Lambda and more"
 license-file = "LICENSE.txt"
 
 [workspace.dependencies]

crates/oxbow/Cargo.toml
@@ -6,6 +6,7 @@ repository.workspace = true
 homepage.workspace = true
 
 [dependencies]
+chrono = { workspace = true }
 deltalake = { workspace = true }
 tracing = { workspace = true }
 url = { workspace = true }

crates/oxbow/src/lib.rs
@@ -1,6 +1,6 @@
-/*
- * The lib module contains the business logic of oxbow, regardless of the interface implementation
- */
+///
+/// The lib module contains the business logic of oxbow, regardless of the interface implementation
+///
 use deltalake::arrow::datatypes::Schema as ArrowSchema;
 use deltalake::parquet::arrow::async_reader::{
     ParquetObjectReader, ParquetRecordBatchStreamBuilder,

@@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 pub mod lock;
+pub mod write;
 
 /**
  * convert is the main function to be called by the CLI or other "one shot" executors which just

@@ -333,7 +334,7 @@ pub fn remove_actions_for(files: &[ObjectMeta]) -> Vec<Action> {
  * This can be useful to find the smallest possible parquet file to load from the set in order to
  * discern schema information
  */
-fn find_smallest_file(files: &Vec<ObjectMeta>) -> Option<&ObjectMeta> {
+fn find_smallest_file(files: &[ObjectMeta]) -> Option<&ObjectMeta> {
     if files.is_empty() {
         return None;
     }
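
The signature change above is the idiomatic loosening from `&Vec<ObjectMeta>` to `&[ObjectMeta]`: a `&Vec<T>` coerces to `&[T]` at the call site, so existing callers keep working while slice-backed callers avoid borrowing a whole `Vec`. A minimal standalone sketch of the coercion, using a hypothetical `Meta` struct in place of `ObjectMeta`:

```rust
// Hypothetical stand-in for ObjectMeta, just to illustrate the coercion.
struct Meta {
    size: usize,
}

fn find_smallest(files: &[Meta]) -> Option<&Meta> {
    files.iter().min_by_key(|m| m.size)
}

fn main() {
    let owned: Vec<Meta> = vec![Meta { size: 10 }, Meta { size: 3 }];
    // A &Vec<Meta> coerces to &[Meta], so both call styles work:
    assert_eq!(find_smallest(&owned).map(|m| m.size), Some(3));
    assert_eq!(find_smallest(&owned[..1]).map(|m| m.size), Some(10));
}
```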

crates/oxbow/src/write.rs
@@ -0,0 +1,147 @@
+use chrono::prelude::*;
+use deltalake::arrow::array::RecordBatch;
+use deltalake::arrow::datatypes::Schema as ArrowSchema;
+use deltalake::arrow::json::reader::ReaderBuilder;
+use deltalake::writer::{record_batch::RecordBatchWriter, DeltaWriter};
+use deltalake::{DeltaResult, DeltaTable};
+
+use std::io::Cursor;
+use std::sync::Arc;
+use tracing::log::*;
+
+/// Function responsible for appending values to an opened [DeltaTable]
+pub async fn append_values(mut table: DeltaTable, jsonl: &str) -> DeltaResult<DeltaTable> {
+    let cursor = Cursor::new(jsonl);
+    let schema = table.get_schema()?;
+    let schema = ArrowSchema::try_from(schema)?;
+    let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
+
+    let mut writer = RecordBatchWriter::for_table(&table)?;
+
+    while let Some(Ok(batch)) = reader.next() {
+        let batch = augment_with_ds(&batch);
+        debug!("Augmented: {batch:?}");
+        writer.write(batch).await?;
+    }
+
+    let version = writer.flush_and_commit(&mut table).await?;
+    info!("Successfully flushed v{version} to Delta table");
+    if version % 10 == 0 {
+        // Reload the table to make sure we have the latest version to checkpoint
+        let _ = table.load().await;
+        if table.version() == version {
+            match deltalake::checkpoints::create_checkpoint(&table).await {
+                Ok(_) => info!("Successfully created checkpoint"),
+                Err(e) => {
+                    error!("Failed to create checkpoint for {table:?}: {e:?}")
+                }
+            }
+        } else {
+            error!(
+                "The table was reloaded to create a checkpoint but a new version already exists!"
+            );
+        }
+    }
+
+    Ok(table)
+}
+
+///
+/// Augment the given [RecordBatch] with another column that represents `ds`, treated
+/// as the date-stamp, e.g. `2024-01-01`.
+///
+fn augment_with_ds(batch: &RecordBatch) -> RecordBatch {
+    use deltalake::arrow::array::{Array, StringArray};
+
+    // If the schema doesn't have a `ds` then don't try to add one
+    if batch.column_by_name("ds").is_none() {
+        info!("The schema of the destination table doesn't have `ds` so not adding the value");
+        return batch.clone();
+    }
+
+    let mut ds = vec![];
+    let now = Utc::now();
+    let datestamp = now.format("%Y-%m-%d").to_string();
+
+    for _index in 0..batch.num_rows() {
+        ds.push(datestamp.clone());
+    }
+
+    let mut columns: Vec<Arc<dyn Array>> = vec![];
+
+    for field in &batch.schema().fields {
+        if field.name() != "ds" {
+            if let Some(column) = batch.column_by_name(field.name()) {
+                columns.push(column.clone());
+            }
+        } else {
+            columns.push(Arc::new(StringArray::from(ds.clone())));
+        }
+    }
+
+    RecordBatch::try_new(batch.schema(), columns).expect("Failed to transpose `ds` onto batch")
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use deltalake::*;
+
+    async fn setup_test_table() -> DeltaResult<DeltaTable> {
+        DeltaOps::try_from_uri("memory://")
+            .await?
+            .create()
+            .with_table_name("test")
+            .with_column(
+                "id",
+                SchemaDataType::primitive("integer".into()),
+                true,
+                None,
+            )
+            .with_column("ds", SchemaDataType::primitive("string".into()), true, None)
+            .with_column(
+                "name",
+                SchemaDataType::primitive("string".into()),
+                true,
+                None,
+            )
+            .await
+    }
+
+    #[tokio::test]
+    async fn test_append_values() -> DeltaResult<()> {
+        let table = setup_test_table().await?;
+
+        let jsonl = r#"
+            {"id" : 0, "name" : "Ben"}
+            {"id" : 1, "name" : "Chris"}
+        "#;
+        let table = append_values(table, jsonl)
+            .await
+            .expect("Failed to do nothing");
+
+        assert_eq!(table.version(), 1);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_augment_with_ds() -> DeltaResult<()> {
+        let table = setup_test_table().await?;
+        let jsonl = r#"
+            {"id" : 0, "name" : "Ben"}
+            {"id" : 1, "name" : "Chris"}
+        "#;
+
+        let cursor = Cursor::new(jsonl);
+        let schema = table.get_schema()?;
+        let schema = ArrowSchema::try_from(schema)?;
+        let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
+
+        while let Some(Ok(batch)) = reader.next() {
+            let batch = augment_with_ds(&batch);
+            println!("{:?}", batch);
+            assert_ne!(None, batch.column_by_name("ds"));
+        }
+        Ok(())
+    }
+}
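
With the new `write` module, `append_values` becomes reusable outside a single lambda. A hedged sketch of a one-shot caller, combining it with the crate's `lock::open_table` the way the lambdas below do (store credentials and locking configuration elided):

```rust
use oxbow::write::append_values;

// A sketch of a one-shot append, assuming `table_uri` points at an existing
// Delta table and the object store is already configured.
async fn ingest(table_uri: &str, jsonl: &str) -> Result<(), Box<dyn std::error::Error>> {
    let table = oxbow::lock::open_table(table_uri).await?;
    // Each line of `jsonl` becomes a row; a `ds` column is stamped on when
    // the destination schema has one (see `augment_with_ds` above).
    let table = append_values(table, jsonl).await?;
    println!("table is now at version {}", table.version());
    Ok(())
}
```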

@@ -19,7 +19,7 @@ oxbow = { path = "../../crates/oxbow" }
 oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }
 
 dynamodb_lock = "0.6.1"
-lambda_runtime = { version = "0.8" }
+lambda_runtime = { version = "0.11" }
 rusoto_core = { version = "0.47", default-features = false, features = ["rustls"]}
 rusoto_credential = { version = "0.47"}
 rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"] }

lambdas/sqs-ingest/.gitignore
@@ -0,0 +1 @@
+/target

lambdas/sqs-ingest/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "sqs-ingest"
+version.workspace = true
+edition.workspace = true
+repository.workspace = true
+homepage.workspace = true
+
+[dependencies]
+lambda_runtime = { version = "0.11" }
+oxbow = { path = "../../crates/oxbow" }
+oxbow-lambda-shared = { path = "../../crates/oxbow-lambda-shared" }
+
+anyhow = { workspace = true }
+aws_lambda_events = { workspace = true, default-features = false, features = ["sqs"] }
+chrono = { workspace = true }
+deltalake = { workspace = true }
+serde_json = { workspace = true }
+tokio = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }

lambdas/sqs-ingest/README.adoc
@@ -0,0 +1,48 @@
+ifdef::env-github[]
+:tip-caption: :bulb:
+:note-caption: :information_source:
+:important-caption: :heavy_exclamation_mark:
+:caution-caption: :fire:
+:warning-caption: :warning:
+endif::[]
+:toc: macro
+
+= SQS Ingest
+
+The `sqs-ingest` Lambda is the AWS SQS equivalent of
+link:https://github.com/delta-io/kafka-delta-ingest[kafka-delta-ingest] insofar
+as it allows the ingestion of JSON data from SQS and translates each record
+into a row appended onto a link:https://delta.io[Delta Lake] table.
+
+toc::[]
+
+For simplicity's sake this Lambda is intended to only work with a single Delta
+table. For ingestion to multiple Delta tables, deploy different instances of
+this Lambda triggered by different SQS queues.
+
+image::diagram.png[Flow]
+
+== Environment Variables
+
+|===
+
+| Name | Default Value | Notes
+
+| `RUST_LOG`
+| `error`
+| Set the log level, e.g. `info`, `warn`, `error`. Can be scoped to specific modules, i.e. `oxbow=debug`
+
+| `DELTA_TABLE_URI`
+|
+| Set to the `s3://` URL of the Delta table which should be appended to
+
+| `AWS_S3_LOCKING_PROVIDER`
+|
+| Set to `dynamodb` to enable safe concurrent writes to the table
+
+| `DYNAMO_LOCK_TABLE_NAME`
+|
+| Set to the DynamoDB table used for locking, required for safe concurrent writes.
+
+|===
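
To make the ingestion contract concrete, a small illustrative sketch (not part of the commit): each SQS message `body` is expected to hold one JSON object, and the non-empty bodies are joined into JSONL, one table row per line:

```rust
use aws_lambda_events::event::sqs::SqsMessage;

fn main() {
    // Two queue messages whose bodies are single JSON objects...
    let messages: Vec<SqsMessage> = vec![
        SqsMessage { body: Some(r#"{"id": 0, "name": "Ben"}"#.into()), ..Default::default() },
        SqsMessage { body: Some(r#"{"id": 1, "name": "Chris"}"#.into()), ..Default::default() },
    ];
    // ...become two JSONL lines, i.e. two rows appended to the Delta table.
    let jsonl: Vec<String> = messages.iter().filter_map(|m| m.body.clone()).collect();
    assert_eq!(
        jsonl.join("\n"),
        "{\"id\": 0, \"name\": \"Ben\"}\n{\"id\": 1, \"name\": \"Chris\"}"
    );
}
```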

lambdas/sqs-ingest/diagram.dot
@@ -0,0 +1,18 @@
+digraph {
+  label="SQS Ingest";
+  labelloc=t;
+  node[shape=rect];
+
+  app[label="Application", style=dashed];
+
+  {
+    rank=same;
+    rankdir=LR;
+    sqs[shape=cds, label="SQS queue"];
+    lambda[label="sqs-ingest Lambda"];
+    delta[shape=cylinder, label="Delta table"];
+  }
+  app -> sqs -> lambda -> delta;
+}
+
+// vim: ft=dot sw=2 ts=2 et

lambdas/sqs-ingest/diagram.png
Binary file not shown (new image, 11 KiB).

lambdas/sqs-ingest/src/main.rs
@@ -0,0 +1,96 @@
+///
+/// The sqs-ingest function is for ingesting simple JSON payloads from SQS
+/// and appending them to the configured Delta table
+///
+use aws_lambda_events::event::sqs::{SqsEvent, SqsMessage};
+use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
+use tracing::log::*;
+
+use oxbow::write::*;
+
+use std::env;
+
+/// This is the primary invocation point for the lambda and should do the heavy lifting
+async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
+    let table_uri = std::env::var("DELTA_TABLE_URI").expect("Failed to get `DELTA_TABLE_URI`");
+    debug!("payload received: {:?}", event.payload.records);
+
+    let jsonl = extract_json_from_records(&event.payload.records);
+    debug!("jsonl generated: {jsonl}");
+
+    if !jsonl.is_empty() {
+        let table = oxbow::lock::open_table(&table_uri).await?;
+        match append_values(table, &jsonl).await {
+            Ok(_) => {}
+            Err(e) => {
+                error!("Failed to append the values to configured Delta table: {e:?}");
+                return Err(Box::new(e));
+            }
+        }
+    } else {
+        error!("An empty payload was extracted which doesn't seem right!");
+    }
+
+    Ok(())
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+    tracing_subscriber::fmt()
+        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+        // disable printing the name of the module in every log line.
+        .with_target(false)
+        // disabling time is handy because CloudWatch will add the ingestion time.
+        .without_time()
+        .init();
+
+    let _ =
+        env::var("DELTA_TABLE_URI").expect("The `DELTA_TABLE_URI` must be set in the environment");
+    match env::var("DYNAMO_LOCK_TABLE_NAME") {
+        Ok(_) => {}
+        Err(_) => {
+            warn!("sqs-ingest SHOULD have `DYNAMO_LOCK_TABLE_NAME` set to a valid name, and should have AWS_S3_LOCKING_PROVIDER=dynamodb set so that concurrent writes can be performed safely.");
+        }
+    }
+
+    info!("Starting sqs-ingest");
+
+    run(service_fn(function_handler)).await
+}
+
+/// Convert the `body` payloads from [SqsMessage] entities into JSONL
+/// which can be passed into the [oxbow::write::append_values] function
+fn extract_json_from_records(records: &[SqsMessage]) -> String {
+    records
+        .iter()
+        .filter(|m| m.body.is_some())
+        .map(|m| m.body.as_ref().unwrap().clone())
+        .collect::<Vec<String>>()
+        .join("\n")
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_extract_data() {
+        let buf = r#"{
+            "body" : "{\"key\" : \"value\"}"
+        }"#;
+        let message: SqsMessage = serde_json::from_str(&buf).expect("Failed to deserialize");
+        let messages = vec![
+            message.clone(),
+            message.clone(),
+            message.clone(),
+            SqsMessage::default(),
+        ];
+
+        let jsonl = extract_json_from_records(&messages);
+
+        let expected = r#"{"key" : "value"}
+{"key" : "value"}
+{"key" : "value"}"#;
+        assert_eq!(expected, jsonl);
+    }
+}
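
For reference, a hedged sketch of the event shape the handler receives: `lambda_runtime` deserializes the Lambda payload into an `SqsEvent` whose `Records` array carries the message bodies consumed above; `aws_lambda_events` defaults any omitted fields, just as the test above relies on for `SqsMessage`:

```rust
use aws_lambda_events::event::sqs::SqsEvent;

fn main() {
    // A trimmed-down SQS event payload; real events carry many more
    // (optional) fields such as receiptHandle and messageAttributes.
    let raw = r#"{"Records": [{"body": "{\"id\": 0, \"name\": \"Ben\"}"}]}"#;
    let event: SqsEvent = serde_json::from_str(raw).expect("Failed to deserialize");
    assert_eq!(event.records.len(), 1);
    assert!(event.records[0].body.is_some());
}
```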

lambdas/webhook/README.adoc
@@ -35,5 +35,12 @@ toc::[]
 | _null_
 | **Required** the S3 location of the Delta table which should be appended to.
 
+| `AWS_S3_LOCKING_PROVIDER`
+|
+| Set to `dynamodb` to enable safe concurrent writes to the table
+
+| `DYNAMO_LOCK_TABLE_NAME`
+|
+| Set to the DynamoDB table used for locking, required for safe concurrent writes.
+
 |===

lambdas/webhook/src/main.rs
@@ -1,54 +1,10 @@
 ///
 /// The webhook lambda can receive JSONL formatted events and append them to a pre-configured Delta
 /// table
-use chrono::prelude::*;
-use deltalake::arrow::array::RecordBatch;
-use deltalake::arrow::datatypes::Schema as ArrowSchema;
-use deltalake::arrow::json::reader::ReaderBuilder;
-use deltalake::writer::{record_batch::RecordBatchWriter, DeltaWriter};
-use deltalake::DeltaTable;
 use lambda_http::{run, service_fn, tracing, Body, Error, Request, Response};
 use tracing::log::*;
 
-use std::io::Cursor;
-use std::sync::Arc;
-
-/// Function responsible for opening the table and actually appending values to it
-async fn append_values(mut table: DeltaTable, jsonl: &str) -> Result<DeltaTable, Error> {
-    let cursor = Cursor::new(jsonl);
-    let schema = table.get_schema()?;
-    let schema = ArrowSchema::try_from(schema)?;
-    let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
-
-    let mut writer = RecordBatchWriter::for_table(&table)?;
-
-    while let Some(Ok(batch)) = reader.next() {
-        let batch = augment_with_ds(&batch);
-        debug!("Augmented: {batch:?}");
-        writer.write(batch).await?;
-    }
-
-    let version = writer.flush_and_commit(&mut table).await?;
-    info!("Successfully flushed v{version} to Delta table");
-    if version % 10 == 0 {
-        // Reload the table to make sure we have the latest version to checkpoint
-        let _ = table.load().await;
-        if table.version() == version {
-            match deltalake::checkpoints::create_checkpoint(&table).await {
-                Ok(_) => info!("Successfully created checkpoint"),
-                Err(e) => {
-                    error!("Failed to create checkpoint for {table:?}: {e:?}")
-                }
-            }
-        } else {
-            error!(
-                "The table was reloaded to create a checkpoint but a new version already exists!"
-            );
-        }
-    }
-
-    Ok(table)
-}
+use oxbow::write::*;
 
 /// Main function handler which does the basics around HTTP management for the Lambda
 async fn function_handler(event: Request) -> Result<Response<Body>, Error> {

@@ -76,7 +32,7 @@ async fn function_handler(event: Request) -> Result<Response<Body>, Error> {
             Ok(_) => {}
             Err(e) => {
                 error!("Failed to append the values to configured Delta table: {e:?}");
-                return Err(e);
+                return Err(Box::new(e));
             }
         }
     }

@@ -112,106 +68,15 @@ async fn main() -> Result<(), Error> {
         .init();
     let _ = std::env::var("DELTA_TABLE_URI")
        .expect("The `DELTA_TABLE_URI` environment variable must be set in the environment");
 
     info!("Starting webhook lambda handler");
 
     match std::env::var("DYNAMO_LOCK_TABLE_NAME") {
         Ok(_) => {}
         Err(_) => {
             warn!("sqs-ingest SHOULD have `DYNAMO_LOCK_TABLE_NAME` set to a valid name, and should have AWS_S3_LOCKING_PROVIDER=dynamodb set so that concurrent writes can be performed safely.");
         }
     }
 
     run(service_fn(function_handler)).await
 }
-
-///
-/// Augment the given [RecordBatch] with another column that represents `ds`, treated
-/// as the date-stamp, e.g. `2024-01-01`.
-///
-fn augment_with_ds(batch: &RecordBatch) -> RecordBatch {
-    use deltalake::arrow::array::{Array, StringArray};
-
-    // If the schema doesn't have a `ds` then don't try to add one
-    if batch.column_by_name("ds").is_none() {
-        info!("The schema of the destination table doesn't have `ds` so not adding the value");
-        return batch.clone();
-    }
-
-    let mut ds = vec![];
-    let now = Utc::now();
-    let datestamp = now.format("%Y-%m-%d").to_string();
-
-    for _index in 0..batch.num_rows() {
-        ds.push(datestamp.clone());
-    }
-
-    let mut columns: Vec<Arc<dyn Array>> = vec![];
-
-    for field in &batch.schema().fields {
-        if field.name() != "ds" {
-            if let Some(column) = batch.column_by_name(field.name()) {
-                columns.push(column.clone());
-            }
-        } else {
-            columns.push(Arc::new(StringArray::from(ds.clone())));
-        }
-    }
-
-    RecordBatch::try_new(batch.schema(), columns).expect("Failed to transpose `ds` onto batch")
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use deltalake::*;
-
-    async fn setup_test_table() -> DeltaResult<DeltaTable> {
-        DeltaOps::try_from_uri("memory://")
-            .await?
-            .create()
-            .with_table_name("test")
-            .with_column(
-                "id",
-                SchemaDataType::primitive("integer".into()),
-                true,
-                None,
-            )
-            .with_column("ds", SchemaDataType::primitive("string".into()), true, None)
-            .with_column(
-                "name",
-                SchemaDataType::primitive("string".into()),
-                true,
-                None,
-            )
-            .await
-    }
-
-    #[tokio::test]
-    async fn test_append_values() -> DeltaResult<()> {
-        let table = setup_test_table().await?;
-
-        let jsonl = r#"
-            {"id" : 0, "name" : "Ben"}
-            {"id" : 1, "name" : "Chris"}
-        "#;
-        let table = append_values(table, jsonl)
-            .await
-            .expect("Failed to do nothing");
-
-        assert_eq!(table.version(), 1);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_augment_with_ds() -> DeltaResult<()> {
-        let table = setup_test_table().await?;
-        let jsonl = r#"
-            {"id" : 0, "name" : "Ben"}
-            {"id" : 1, "name" : "Chris"}
-        "#;
-
-        let cursor = Cursor::new(jsonl);
-        let schema = table.get_schema()?;
-        let schema = ArrowSchema::try_from(schema)?;
-        let mut reader = ReaderBuilder::new(schema.into()).build(cursor).unwrap();
-
-        while let Some(Ok(batch)) = reader.next() {
-            let batch = augment_with_ds(&batch);
-            println!("{:?}", batch);
-            assert_ne!(None, batch.column_by_name("ds"));
-        }
-        Ok(())
-    }
-}