FIX #249 Rename camelCased fields to snake_case (#256)

Serge Smertin 2021-05-24 23:17:03 +02:00 committed by GitHub
parent 0c3853ea13
commit 414e971d24
12 changed files with 126 additions and 112 deletions
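
The rename is a source-level refactor only: each renamed struct gains `#[serde(rename_all = "camelCase")]`, so the JSON written to and read from the Delta transaction log keeps its original camelCase keys. A minimal round-trip sketch of that serde attribute, using a trimmed, hypothetical stand-in for the crate's `Stats` (assumes `serde` with the derive feature and `serde_json` as dependencies):

```rust
use serde::{Deserialize, Serialize};

// Trimmed stand-in for the crate's `Stats`: the Rust field is snake_case,
// but serde maps it to/from the camelCase key used in the Delta log.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
struct Stats {
    num_records: i64,
}

fn main() {
    let stats = Stats { num_records: 22 };
    // Serialization still emits the camelCase key, so log output is unchanged.
    let json = serde_json::to_string(&stats).unwrap();
    assert_eq!(json, r#"{"numRecords":22}"#);
    // Existing camelCase log entries still deserialize into the renamed field.
    assert_eq!(serde_json::from_str::<Stats>(&json).unwrap(), stats);
}
```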

.cargo/config (new file)

@@ -0,0 +1,5 @@
+[target.x86_64-apple-darwin]
+rustflags = [
+"-C", "link-arg=-undefined",
+"-C", "link-arg=dynamic_lookup",
+]

.gitignore

@@ -7,3 +7,4 @@ tlaplus/*.toolbox/*/*.out
 tlaplus/*.toolbox/*/*.tla
 tlaplus/*.toolbox/*/MC.cfg
 tlaplus/*.toolbox/*/[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*/
+/.idea


@@ -1,3 +1,5 @@
+# This file is used by cargo-watch
 ruby/spec
 ruby/lib
+.idea


@@ -112,43 +112,45 @@ impl ColumnCountStat {
 /// Statistics associated with Add actions contained in the Delta log.
 #[derive(Serialize, Deserialize, Debug, Default)]
+#[serde(rename_all = "camelCase")]
 pub struct Stats {
 /// Number of records in the file associated with the log action.
-pub numRecords: DeltaDataTypeLong,
+pub num_records: DeltaDataTypeLong,
 // start of per column stats
 /// Contains a value smaller than all values present in the file for all columns.
-pub minValues: HashMap<String, ColumnValueStat>,
+pub min_values: HashMap<String, ColumnValueStat>,
 /// Contains a value larger than all values present in the file for all columns.
-pub maxValues: HashMap<String, ColumnValueStat>,
+pub max_values: HashMap<String, ColumnValueStat>,
 /// The number of null values for all columns.
-pub nullCount: HashMap<String, ColumnCountStat>,
+pub null_count: HashMap<String, ColumnCountStat>,
 }
 /// File stats parsed from raw parquet format.
 #[derive(Debug, Default)]
 pub struct StatsParsed {
 /// Number of records in the file associated with the log action.
-pub numRecords: DeltaDataTypeLong,
+pub num_records: DeltaDataTypeLong,
 // start of per column stats
 /// Contains a value smaller than all values present in the file for all columns.
-pub minValues: HashMap<String, parquet::record::Field>,
+pub min_values: HashMap<String, parquet::record::Field>,
 /// Contains a value larger than all values present in the file for all columns.
-pub maxValues: HashMap<String, parquet::record::Field>,
+pub max_values: HashMap<String, parquet::record::Field>,
 /// The number of null values for all columns.
-pub nullCount: HashMap<String, DeltaDataTypeLong>,
+pub null_count: HashMap<String, DeltaDataTypeLong>,
 }
 /// Delta log action that describes a parquet data file that is part of the table.
 #[derive(Serialize, Deserialize, Clone, Debug, Default)]
+#[serde(rename_all = "camelCase")]
 pub struct Add {
 /// A relative path, from the root of the table, to a file that should be added to the table
 pub path: String,
 /// The size of this file in bytes
 pub size: DeltaDataTypeLong,
 /// A map from partition column to value for this file
-pub partitionValues: HashMap<String, String>,
+pub partition_values: HashMap<String, String>,
 /// Partition values stored in raw parquet struct format. In this struct, the column names
 /// correspond to the partition columns and the values are stored in their corresponding data
 /// type. This is a required field when the table is partitioned and the table property
@@ -157,15 +159,15 @@ pub struct Add {
 ///
 /// This field is only available in add action records read from checkpoints
 #[serde(skip_serializing, skip_deserializing)]
-pub partitionValues_parsed: Option<parquet::record::Row>,
+pub partition_values_parsed: Option<parquet::record::Row>,
 /// The time this file was created, as milliseconds since the epoch
-pub modificationTime: DeltaDataTypeTimestamp,
+pub modification_time: DeltaDataTypeTimestamp,
 /// When false the file must already be present in the table or the records in the added file
 /// must be contained in one or more remove actions in the same version
 ///
 /// streaming queries that are tailing the transaction log can use this flag to skip actions
 /// that would not affect the final results.
-pub dataChange: bool,
+pub data_change: bool,
 /// Contains statistics (e.g., count, min/max values for columns) about the data in this file
 pub stats: Option<String>,
 /// Contains statistics (e.g., count, min/max values for columns) about the data in this file in
@@ -199,12 +201,12 @@ impl Add {
 .map_err(|_| gen_action_type_error("add", "size", "long"))?;
 }
 "modificationTime" => {
-re.modificationTime = record
+re.modification_time = record
 .get_long(i)
 .map_err(|_| gen_action_type_error("add", "modificationTime", "long"))?;
 }
 "dataChange" => {
-re.dataChange = record
+re.data_change = record
 .get_bool(i)
 .map_err(|_| gen_action_type_error("add", "dataChange", "bool"))?;
 }
@@ -212,7 +214,7 @@ impl Add {
 let parquetMap = record
 .get_map(i)
 .map_err(|_| gen_action_type_error("add", "partitionValues", "map"))?;
-populate_hashmap_from_parquet_map(&mut re.partitionValues, parquetMap)
+populate_hashmap_from_parquet_map(&mut re.partition_values, parquetMap)
 .map_err(|estr| {
 ActionError::InvalidField(format!(
 "Invalid partitionValues for add action: {}",
@@ -221,7 +223,7 @@ impl Add {
 })?;
 }
 "partitionValues_parsed" => {
-re.partitionValues_parsed = Some(
+re.partition_values_parsed = Some(
 record
 .get_group(i)
 .map_err(|_| {
@@ -292,7 +294,7 @@ impl Add {
 match name.as_str() {
 "numRecords" => match record.get_long(i) {
 Ok(v) => {
-stats.numRecords = v;
+stats.num_records = v;
 }
 _ => {
 log::error!("Expect type of stats_parsed field numRecords to be long, got: {}", record);
@@ -301,7 +303,7 @@ impl Add {
 "minValues" => match record.get_group(i) {
 Ok(row) => {
 for (name, field) in row.get_column_iter() {
-stats.minValues.insert(name.clone(), field.clone());
+stats.min_values.insert(name.clone(), field.clone());
 }
 }
 _ => {
@@ -311,7 +313,7 @@ impl Add {
 "maxValues" => match record.get_group(i) {
 Ok(row) => {
 for (name, field) in row.get_column_iter() {
-stats.maxValues.insert(name.clone(), field.clone());
+stats.max_values.insert(name.clone(), field.clone());
 }
 }
 _ => {
@@ -323,7 +325,7 @@ impl Add {
 for (i, (name, _)) in row.get_column_iter().enumerate() {
 match row.get_long(i) {
 Ok(v) => {
-stats.nullCount.insert(name.clone(), v);
+stats.null_count.insert(name.clone(), v);
 }
 _ => {
 log::error!("Expect type of stats_parsed.nullRecords value to be struct, got: {}", row);
@@ -363,6 +365,7 @@ pub struct Format {
 /// Action that describes the metadata of the table.
 /// This is a top-level action in Delta log entries.
 #[derive(Serialize, Deserialize, Debug, Default)]
+#[serde(rename_all = "camelCase")]
 pub struct MetaData {
 /// Unique identifier for this table
 pub id: Guid,
@@ -373,11 +376,11 @@ pub struct MetaData {
 /// Specification of the encoding for the files stored in the table
 pub format: Format,
 /// Schema of the table
-pub schemaString: String,
+pub schema_string: String,
 /// An array containing the names of columns by which the data should be partitioned
-pub partitionColumns: Vec<String>,
+pub partition_columns: Vec<String>,
 /// The time when this metadata action is created, in milliseconds since the Unix epoch
-pub createdTime: DeltaDataTypeTimestamp,
+pub created_time: DeltaDataTypeTimestamp,
 /// A map containing configuration options for the table
 pub configuration: HashMap<String, String>,
 }
@@ -409,7 +412,7 @@ impl MetaData {
 gen_action_type_error("metaData", "partitionColumns", "list")
 })?;
 for j in 0..columns_list.len() {
-re.partitionColumns.push(
+re.partition_columns.push(
 columns_list
 .get_string(j)
 .map_err(|_| {
@@ -424,13 +427,13 @@ impl MetaData {
 }
 }
 "schemaString" => {
-re.schemaString = record
+re.schema_string = record
 .get_string(i)
 .map_err(|_| gen_action_type_error("metaData", "schemaString", "string"))?
 .clone();
 }
 "createdTime" => {
-re.createdTime = record
+re.created_time = record
 .get_long(i)
 .map_err(|_| gen_action_type_error("metaData", "createdTime", "long"))?;
 }
@@ -491,25 +494,26 @@ impl MetaData {
 /// Returns the table schema from the embedded schema string contained within the metadata
 /// action.
 pub fn get_schema(&self) -> Result<Schema, serde_json::error::Error> {
-serde_json::from_str(&self.schemaString)
+serde_json::from_str(&self.schema_string)
 }
 }
 /// Represents a tombstone (deleted file) in the Delta log.
 /// This is a top-level action in Delta log entries.
 #[derive(Serialize, Deserialize, Clone, PartialEq, Debug, Default)]
+#[serde(rename_all = "camelCase")]
 pub struct Remove {
 /// The path of the file that is removed from the table.
 pub path: String,
 /// The timestamp when the remove was added to table state.
-pub deletionTimestamp: DeltaDataTypeTimestamp,
+pub deletion_timestamp: DeltaDataTypeTimestamp,
 /// Whether data is changed by the remove. A table optimize will report this as false for
 /// example, since it adds and removes files by combining many files into one.
-pub dataChange: bool,
+pub data_change: bool,
 /// When true the fields partitionValues, size, and tags are present
-pub extendedFileMetadata: Option<bool>,
+pub extended_file_metadata: Option<bool>,
 /// A map from partition column to value for this file.
-pub partitionValues: Option<HashMap<String, String>>,
+pub partition_values: Option<HashMap<String, String>>,
 /// Size of this file in bytes
 pub size: Option<DeltaDataTypeLong>,
 /// Map containing metadata about this file
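
The `get_schema` accessor in the hunk above does a second level of JSON parsing: `schema_string` is itself an embedded JSON document inside the metadata action. A sketch of that two-step parse, using `serde_json::Value` in place of the crate's `Schema` type and a hypothetical trimmed `MetaData`:

```rust
use serde::Deserialize;

// Hypothetical trimmed metadata action; rename_all maps the log's
// "schemaString" key onto the snake_case field.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct MetaData {
    schema_string: String,
}

fn main() {
    // First parse: the metadata action itself.
    let raw = r#"{"schemaString":"{\"type\":\"struct\",\"fields\":[]}"}"#;
    let meta: MetaData = serde_json::from_str(raw).unwrap();
    // Second parse: the schema embedded in `schema_string`.
    let schema: serde_json::Value = serde_json::from_str(&meta.schema_string).unwrap();
    assert_eq!(schema["type"], "struct");
}
```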
@@ -531,17 +535,17 @@ impl Remove {
 .clone();
 }
 "dataChange" => {
-re.dataChange = record
+re.data_change = record
 .get_bool(i)
 .map_err(|_| gen_action_type_error("remove", "dataChange", "bool"))?;
 }
 "extendedFileMetadata" => {
-re.extendedFileMetadata = Some(record.get_bool(i).map_err(|_| {
+re.extended_file_metadata = Some(record.get_bool(i).map_err(|_| {
 gen_action_type_error("remove", "extendedFileMetadata", "bool")
 })?);
 }
 "deletionTimestamp" => {
-re.deletionTimestamp = record.get_long(i).map_err(|_| {
+re.deletion_timestamp = record.get_long(i).map_err(|_| {
 gen_action_type_error("remove", "deletionTimestamp", "long")
 })?;
 }
@@ -558,9 +562,9 @@ impl Remove {
 estr,
 ))
 })?;
-re.partitionValues = Some(partitionValues);
+re.partition_values = Some(partitionValues);
 }
-_ => re.partitionValues = None,
+_ => re.partition_values = None,
 },
 "tags" => match record.get_map(i) {
 Ok(tags_map) => {
@@ -601,13 +605,14 @@ impl Remove {
 /// Action used by streaming systems to track progress using application-specific versions to
 /// enable idempotency.
 #[derive(Serialize, Deserialize, Debug, Default)]
+#[serde(rename_all = "camelCase")]
 pub struct Txn {
 /// A unique identifier for the application performing the transaction.
-pub appId: String,
+pub app_id: String,
 /// An application-specific numeric identifier for this transaction.
 pub version: DeltaDataTypeVersion,
 /// The time when this transaction action was created in milliseconds since the Unix epoch.
-pub lastUpdated: DeltaDataTypeTimestamp,
+pub last_updated: DeltaDataTypeTimestamp,
 }
 impl Txn {
@@ -619,7 +624,7 @@ impl Txn {
 for (i, (name, _)) in record.get_column_iter().enumerate() {
 match name.as_str() {
 "appId" => {
-re.appId = record
+re.app_id = record
 .get_string(i)
 .map_err(|_| gen_action_type_error("txn", "appId", "string"))?
 .clone();
@@ -630,7 +635,7 @@ impl Txn {
 .map_err(|_| gen_action_type_error("txn", "version", "long"))?;
 }
 "lastUpdated" => {
-re.lastUpdated = record
+re.last_updated = record
 .get_long(i)
 .map_err(|_| gen_action_type_error("txn", "lastUpdated", "long"))?;
 }
@@ -651,13 +656,14 @@ impl Txn {
 /// Action used to increase the version of the Delta protocol required to read or write to the
 /// table.
 #[derive(Serialize, Deserialize, Debug, Default)]
+#[serde(rename_all = "camelCase")]
 pub struct Protocol {
 /// Minimum version of the Delta read protocol a client must implement to correctly read the
 /// table.
-pub minReaderVersion: DeltaDataTypeInt,
+pub min_reader_version: DeltaDataTypeInt,
 /// Minimum version of the Delta write protocol a client must implement to correctly read the
 /// table.
-pub minWriterVersion: DeltaDataTypeInt,
+pub min_writer_version: DeltaDataTypeInt,
 }
 impl Protocol {
@@ -669,12 +675,12 @@ impl Protocol {
 for (i, (name, _)) in record.get_column_iter().enumerate() {
 match name.as_str() {
 "minReaderVersion" => {
-re.minReaderVersion = record.get_int(i).map_err(|_| {
+re.min_reader_version = record.get_int(i).map_err(|_| {
 gen_action_type_error("protocol", "minReaderVersion", "int")
 })?;
 }
 "minWriterVersion" => {
-re.minWriterVersion = record.get_int(i).map_err(|_| {
+re.min_writer_version = record.get_int(i).map_err(|_| {
 gen_action_type_error("protocol", "minWriterVersion", "int")
 })?;
 }
@@ -833,7 +839,7 @@ mod tests {
 let add_record = record.get_group(1).unwrap();
 let add_action = Add::from_parquet_record(&add_record).unwrap();
-assert_eq!(add_action.partitionValues.len(), 0);
+assert_eq!(add_action.partition_values.len(), 0);
 assert_eq!(add_action.stats, None);
 }
@@ -854,51 +860,51 @@
 let stats = action.get_stats().unwrap().unwrap();
-assert_eq!(stats.numRecords, 22);
+assert_eq!(stats.num_records, 22);
 assert_eq!(
-stats.minValues["a"].as_value().unwrap(),
+stats.min_values["a"].as_value().unwrap(),
 &serde_json::json!(1)
 );
 assert_eq!(
-stats.minValues["nested"].as_column().unwrap()["b"]
+stats.min_values["nested"].as_column().unwrap()["b"]
 .as_value()
 .unwrap(),
 &serde_json::json!(2)
 );
 assert_eq!(
-stats.minValues["nested"].as_column().unwrap()["c"]
+stats.min_values["nested"].as_column().unwrap()["c"]
 .as_value()
 .unwrap(),
 &serde_json::json!("a")
 );
 assert_eq!(
-stats.maxValues["a"].as_value().unwrap(),
+stats.max_values["a"].as_value().unwrap(),
 &serde_json::json!(10)
 );
 assert_eq!(
-stats.maxValues["nested"].as_column().unwrap()["b"]
+stats.max_values["nested"].as_column().unwrap()["b"]
 .as_value()
 .unwrap(),
 &serde_json::json!(20)
 );
 assert_eq!(
-stats.maxValues["nested"].as_column().unwrap()["c"]
+stats.max_values["nested"].as_column().unwrap()["c"]
 .as_value()
 .unwrap(),
 &serde_json::json!("z")
 );
-assert_eq!(stats.nullCount["a"].as_value().unwrap(), 1);
+assert_eq!(stats.null_count["a"].as_value().unwrap(), 1);
 assert_eq!(
-stats.nullCount["nested"].as_column().unwrap()["b"]
+stats.null_count["nested"].as_column().unwrap()["b"]
 .as_value()
 .unwrap(),
 0
 );
 assert_eq!(
-stats.nullCount["nested"].as_column().unwrap()["c"]
+stats.null_count["nested"].as_column().unwrap()["c"]
 .as_value()
 .unwrap(),
 1


@@ -735,7 +735,7 @@ impl DeltaTable {
 Ok(self
 .get_tombstones()
 .iter()
-.filter(|tombstone| tombstone.deletionTimestamp < delete_before_timestamp)
+.filter(|tombstone| tombstone.deletion_timestamp < delete_before_timestamp)
 .map(|tombstone| self.storage.join_path(&self.table_path, &tombstone.path))
 .collect::<Vec<String>>())
 }
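
This vacuum helper keeps only tombstones whose `deletion_timestamp` falls before the retention cutoff. A self-contained sketch of the same filter over a simplified stand-in for `Remove`, leaving out the storage-backend path joining:

```rust
// Simplified stand-in for action::Remove, just enough for the filter.
struct Remove {
    path: String,
    deletion_timestamp: i64,
}

// Mirrors the iterator chain above: keep tombstones older than the cutoff
// and collect their file paths.
fn files_to_vacuum(tombstones: &[Remove], delete_before_timestamp: i64) -> Vec<String> {
    tombstones
        .iter()
        .filter(|t| t.deletion_timestamp < delete_before_timestamp)
        .map(|t| t.path.clone())
        .collect()
}

fn main() {
    let tombstones = vec![
        Remove { path: "old.parquet".to_string(), deletion_timestamp: 100 },
        Remove { path: "new.parquet".to_string(), deletion_timestamp: 900 },
    ];
    assert_eq!(files_to_vacuum(&tombstones, 500), vec!["old.parquet".to_string()]);
}
```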
@@ -1118,11 +1118,11 @@ impl<'a> DeltaTransaction<'a> {
 self.actions.push(Action::add(action::Add {
 path,
+partition_values,
+modification_time,
 size: bytes.len() as i64,
-partitionValues: partition_values,
-partitionValues_parsed: None,
-modificationTime: modification_time,
-dataChange: true,
+partition_values_parsed: None,
+data_change: true,
 stats: None,
 stats_parsed: None,
 tags: None,
@@ -1309,8 +1309,8 @@ fn process_action(
 state.tombstones.push(v.clone());
 }
 Action::protocol(v) => {
-state.min_reader_version = v.minReaderVersion;
-state.min_writer_version = v.minWriterVersion;
+state.min_reader_version = v.min_reader_version;
+state.min_writer_version = v.min_writer_version;
 }
 Action::metaData(v) => {
 state.current_metadata = Some(DeltaTableMetaData {
@@ -1319,15 +1319,15 @@ fn process_action(
 description: v.description.clone(),
 format: v.format.clone(),
 schema: v.get_schema()?,
-partition_columns: v.partitionColumns.clone(),
-created_time: v.createdTime,
+partition_columns: v.partition_columns.clone(),
+created_time: v.created_time,
 configuration: v.configuration.clone(),
 });
 }
 Action::txn(v) => {
 *state
 .app_transaction_version
-.entry(v.appId.clone())
+.entry(v.app_id.clone())
 .or_insert(v.version) = v.version;
 }
 Action::commitInfo(v) => {
@@ -1402,9 +1402,9 @@ mod tests {
 };
 let txn_action = Action::txn(action::Txn {
-appId: "abc".to_string(),
+app_id: "abc".to_string(),
 version: 2,
-lastUpdated: 0,
+last_updated: 0,
 });
 let _ = process_action(&mut state, &txn_action).unwrap();
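
In `process_action` above, the `txn` arm records the latest version per `app_id` with an upsert: `HashMap::entry(...).or_insert(...)` yields a mutable reference, and assigning through it overwrites any existing value. A minimal sketch of that pattern with illustrative values:

```rust
use std::collections::HashMap;

fn main() {
    let mut app_transaction_version: HashMap<String, i64> = HashMap::new();
    // First txn for "abc": the entry is vacant, so or_insert inserts 2, and
    // the assignment through the returned reference writes 2 again (a no-op).
    *app_transaction_version.entry("abc".to_string()).or_insert(2) = 2;
    // Later txn: the entry is occupied, or_insert skips its default, and the
    // assignment overwrites the stored version with 5.
    *app_transaction_version.entry("abc".to_string()).or_insert(0) = 5;
    assert_eq!(app_transaction_version["abc"], 5);
}
```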


@@ -36,8 +36,8 @@ mod azure {
 deltalake::action::Remove {
 path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet"
 .to_string(),
-deletionTimestamp: 1587968596250,
-dataChange: true,
+deletion_timestamp: 1587968596250,
+data_change: true,
 ..Default::default()
 }
 );


@@ -103,10 +103,10 @@ impl Worker {
 tx.add_action(action::Action::add(action::Add {
 path: format!("{}.parquet", name),
 size: 396,
-partitionValues: HashMap::new(),
-partitionValues_parsed: None,
-modificationTime: 1564524294000,
-dataChange: true,
+partition_values: HashMap::new(),
+partition_values_parsed: None,
+modification_time: 1564524294000,
+data_change: true,
 stats: None,
 stats_parsed: None,
 tags: None,


@@ -28,8 +28,8 @@ async fn read_delta_2_0_table_without_version() {
 tombstones[0],
 deltalake::action::Remove {
 path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(),
-deletionTimestamp: 1564524298213,
-dataChange: false,
+deletion_timestamp: 1564524298213,
+data_change: false,
 ..Default::default()
 }
 );
@@ -115,10 +115,10 @@ async fn read_delta_8_0_table_without_version() {
 tombstones[0],
 deltalake::action::Remove {
 path: "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet".to_string(),
-deletionTimestamp: 1615043776198,
-dataChange: true,
-extendedFileMetadata: Some(true),
-partitionValues: Some(HashMap::new()),
+deletion_timestamp: 1615043776198,
+data_change: true,
+extended_file_metadata: Some(true),
+partition_values: Some(HashMap::new()),
 size: Some(445),
 ..Default::default()
 }


@@ -30,8 +30,8 @@ async fn read_simple_table() {
 tombstones[0],
 deltalake::action::Remove {
 path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(),
-deletionTimestamp: 1587968596250,
-dataChange: true,
+deletion_timestamp: 1587968596250,
+data_change: true,
 ..Default::default()
 }
 );


@@ -41,8 +41,8 @@ mod s3 {
 deltalake::action::Remove {
 path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet"
 .to_string(),
-deletionTimestamp: 1587968596250,
-dataChange: true,
+deletion_timestamp: 1587968596250,
+data_change: true,
 ..Default::default()
 }
 );
@@ -77,8 +77,8 @@ mod s3 {
 deltalake::action::Remove {
 path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet"
 .to_string(),
-deletionTimestamp: 1587968596250,
-dataChange: true,
+deletion_timestamp: 1587968596250,
+data_change: true,
 ..Default::default()
 }
 );


@@ -158,10 +158,10 @@ fn tx1_actions() -> Vec<action::Action> {
 "part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet",
 ),
 size: 396,
-partitionValues: HashMap::new(),
-partitionValues_parsed: None,
-modificationTime: 1564524294000,
-dataChange: true,
+partition_values: HashMap::new(),
+partition_values_parsed: None,
+modification_time: 1564524294000,
+data_change: true,
 stats: None,
 stats_parsed: None,
 tags: None,
@@ -171,10 +171,10 @@ fn tx1_actions() -> Vec<action::Action> {
 "part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet",
 ),
 size: 400,
-partitionValues: HashMap::new(),
-partitionValues_parsed: None,
-modificationTime: 1564524294000,
-dataChange: true,
+partition_values: HashMap::new(),
+partition_values_parsed: None,
+modification_time: 1564524294000,
+data_change: true,
 stats: None,
 stats_parsed: None,
 tags: None,
@@ -189,10 +189,10 @@ fn tx2_actions() -> Vec<action::Action> {
 "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet",
 ),
 size: 396,
-partitionValues: HashMap::new(),
-partitionValues_parsed: None,
-modificationTime: 1564524296000,
-dataChange: true,
+partition_values: HashMap::new(),
+partition_values_parsed: None,
+modification_time: 1564524296000,
+data_change: true,
 stats: None,
 stats_parsed: None,
 tags: None,
@@ -202,10 +202,10 @@ fn tx2_actions() -> Vec<action::Action> {
 "part-00001-4327c977-2734-4477-9507-7ccf67924649-c000.snappy.parquet",
 ),
 size: 400,
-partitionValues: HashMap::new(),
-partitionValues_parsed: None,
-modificationTime: 1564524296000,
-dataChange: true,
+partition_values: HashMap::new(),
+partition_values_parsed: None,
+modification_time: 1564524296000,
+data_change: true,
 stats: None,
 stats_parsed: None,
 tags: None,


@@ -272,12 +272,12 @@ pub fn create_add(
 record_batch: &RecordBatch,
 ) -> Result<Add, DeltaWriterError> {
 let stats = Stats {
-numRecords: record_batch.num_rows() as i64,
+num_records: record_batch.num_rows() as i64,
 // TODO: calculate additional stats
 // look at https://github.com/apache/arrow/blob/master/rust/arrow/src/compute/kernels/aggregate.rs for pulling these stats
-minValues: HashMap::new(),
-maxValues: HashMap::new(),
-nullCount: HashMap::new(),
+min_values: HashMap::new(),
+max_values: HashMap::new(),
+null_count: HashMap::new(),
 };
 let stats_string = serde_json::to_string(&stats).unwrap();
@@ -288,11 +288,11 @@ pub fn create_add(
 path,
 size,
-partitionValues: partition_values.to_owned(),
-partitionValues_parsed: None,
-modificationTime: modification_time,
-dataChange: true,
+partition_values: partition_values.to_owned(),
+partition_values_parsed: None,
+modification_time: modification_time,
+data_change: true,
 // TODO: calculate additional stats
 stats: Some(stats_string),
@@ -310,9 +310,9 @@ pub fn create_remove(path: String) -> Remove {
 Remove {
 path,
-deletionTimestamp: deletion_timestamp,
-dataChange: true,
-extendedFileMetadata: Some(false),
+deletion_timestamp: deletion_timestamp,
+data_change: true,
+extended_file_metadata: Some(false),
 ..Default::default()
 }
 }
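
Call sites like `create_remove` stay terse after the rename because every action derives `Default`: struct-update syntax names only the fields that matter and `..Default::default()` fills in the rest. A self-contained sketch with a trimmed, hypothetical `Remove`:

```rust
// Trimmed stand-in for action::Remove with derived defaults.
#[derive(Debug, Default)]
struct Remove {
    path: String,
    deletion_timestamp: i64,
    data_change: bool,
    extended_file_metadata: Option<bool>,
    size: Option<i64>,
}

fn main() {
    // Name only the renamed fields; `..Default::default()` supplies the rest.
    let remove = Remove {
        path: "part-00000.parquet".to_string(),
        deletion_timestamp: 1587968596250,
        data_change: true,
        ..Default::default()
    };
    assert_eq!(remove.extended_file_metadata, None);
    assert_eq!(remove.size, None);
}
```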