diff --git a/src/main.rs b/src/main.rs index 808b9c2..789f4b0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -60,10 +60,7 @@ async fn s3_event_handler(event: S3Event, _ctx: Context) -> Result( - record: &'a S3EventRecord, - conf: &'a config::Config, -) -> Option<&'a config::Source> { +fn match_record(record: &S3EventRecord, conf: &config::Config) -> Option { if let Some(bucket) = &record.s3.bucket.name { for source in &conf.sources { if bucket != &source.bucket { @@ -80,7 +77,7 @@ fn match_record<'a>( /* If there are no partitions expected, then we've already matched */ if expected_partitions == 0 { - return Some(source); + return Some(MatchedFile::new(source, key, vec![])); } /* @@ -88,20 +85,24 @@ fn match_record<'a>( * that the data is partitioned according to the configuration */ let partitions = Partition::from_path_str(key); - info!("parts: {:?}", partitions); + debug!( + "Partitions identified from record: {} -- {:?}", + key, partitions + ); // No match if the discovered partitions don't match the count - if partitions.len() != source.partitions.len() { + if partitions.len() != expected_partitions { continue; } - for index in 0..source.partitions.len() { + for index in 0..expected_partitions { // The partition at the index doesn't match what we're looking for if partitions[index].name != source.partitions[index] { return None; } } - return Some(source); + + return Some(MatchedFile::new(source, key, partitions)); } } } @@ -109,6 +110,34 @@ fn match_record<'a>( } /** + * A MatchedFile will contain all the necessary information for reading the object and allowing a + * Delta write to occur + */ +struct MatchedFile { + /// The source bucket containing the file to read + bucket: String, + /// The source object to read, expected to be JSON + object: String, + /// Partitions to include in the Delta write + partitions: Vec, + /// The output path for the Delta writer to use + output_path: String, +} + +impl MatchedFile { + fn new(source: &config::Source, key: &str, partitions: Vec) -> Self { + MatchedFile { + bucket: source.bucket.clone(), + object: key.to_string(), + partitions: partitions, + output_path: source.tablepath.clone(), + } + } +} + +/** + * A Partition is a simple struct to carry values from paths like: + * just/a/path/year=2021/mydata.json */ #[derive(Clone, Debug)] struct Partition {