mirror of https://github.com/delta-io/delta-rs
Compare commits: acf0989845...61489c4ddc (13 commits)

Author | SHA1 | Date |
---|---|---|
R Tyler Croy | 61489c4ddc | |
Ion Koutsouris | c7349269e0 | |
Alex Wilcoxson | 8a8702fd9b | |
dependabot[bot] | 81593e9194 | |
emcake | 35664c0ef0 | |
KyJah Keys | cfb20f1795 | |
KyJah Keys | 7192997604 | |
Yijie Shen | e370d34571 | |
R Tyler Croy | d7165cfef8 | |
Ion Koutsouris | e25aed70a0 | |
Adrian Garcia Badaracco | d0617b5ca1 | |
R Tyler Croy | b2f55f441f | |
R Tyler Croy | 2acb62849a | |
@@ -22,15 +22,9 @@ pub struct ConfiguredCredentialChain {
#[derive(Debug)]
pub struct NoOpCredentials {}

pub fn new_region_provider(
configuration: &ProviderConfig,
disable_imds: bool,
imds_timeout: u64,
) -> RegionProviderChain {
pub fn new_region_provider(disable_imds: bool, imds_timeout: u64) -> RegionProviderChain {
let env_provider = EnvironmentVariableRegionProvider::new();
let profile_file = aws_config::profile::region::Builder::default()
.configure(configuration)
.build();
let profile_file = aws_config::profile::region::ProfileFileRegionProvider::default();
if disable_imds {
return RegionProviderChain::first_try(env_provider).or_else(profile_file);
}

@@ -39,7 +33,6 @@ pub fn new_region_provider(
.or_else(profile_file)
.or_else(
aws_config::imds::region::Builder::default()
.configure(configuration)
.imds_client(
aws_config::imds::Client::builder()
.connect_timeout(Duration::from_millis(imds_timeout))

@@ -61,6 +54,7 @@ impl ConfiguredCredentialChain {
let web_identity_token_provider = WebIdentityTokenCredentialsProvider::builder()
.configure(conf)
.build();

let ecs_provider = EcsCredentialsProvider::builder().configure(conf).build();

let provider_chain = CredentialsProviderChain::first_try("Environment", env_provider)
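The hunks above drop the `ProviderConfig` argument from `new_region_provider`; the lookup order itself is unchanged: environment first, then the profile file, and IMDS only as a last resort when it is not disabled. A self-contained sketch of that fallback order (the `Region` type and function here are illustrative stand-ins, not the aws_config API):

```rust
/// Illustrative stand-in for the resolved region value.
#[derive(Debug, Clone, PartialEq)]
struct Region(String);

/// Sketch of the provider-chain order: environment, then profile, then IMDS.
fn resolve_region(
    disable_imds: bool,
    profile_region: Option<&str>,
    imds_region: Option<&str>,
) -> Option<Region> {
    // 1. The environment variable takes precedence.
    if let Ok(r) = std::env::var("AWS_REGION") {
        return Some(Region(r));
    }
    // 2. Fall back to the shared config / profile file.
    if let Some(r) = profile_region {
        return Some(Region(r.to_string()));
    }
    // 3. Only consult the instance metadata service when it is enabled.
    if !disable_imds {
        return imds_region.map(|r| Region(r.to_string()));
    }
    None
}

fn main() {
    std::env::remove_var("AWS_REGION");
    // With IMDS disabled and nothing else configured, no region resolves.
    assert_eq!(resolve_region(true, None, Some("us-east-1")), None);
    // With IMDS enabled, the metadata value is used as the last resort.
    assert_eq!(
        resolve_region(false, None, Some("us-east-1")),
        Some(Region("us-east-1".into()))
    );
}
```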
@@ -175,7 +175,10 @@ impl S3StorageOptions {
.unwrap_or(true);
let imds_timeout =
Self::u64_or_default(options, s3_constants::AWS_EC2_METADATA_TIMEOUT, 100);
let provider_config = ProviderConfig::default();

let region_provider = crate::credentials::new_region_provider(disable_imds, imds_timeout);
let region = execute_sdk_future(region_provider.region())?;
let provider_config = ProviderConfig::default().with_region(region);
let credentials_provider = crate::credentials::ConfiguredCredentialChain::new(
disable_imds,
imds_timeout,

@@ -189,23 +192,15 @@ impl S3StorageOptions {
.map(|val| str_is_truthy(&val))
.unwrap_or(false),
))
.region(crate::credentials::new_region_provider(
&provider_config,
disable_imds,
imds_timeout,
))
.credentials_provider(credentials_provider)
.region(region_provider)
.load(),
)?;
#[cfg(feature = "rustls")]
let sdk_config = execute_sdk_future(
aws_config::from_env()
.credentials_provider(credentials_provider)
.region(crate::credentials::new_region_provider(
&provider_config,
disable_imds,
imds_timeout,
))
.region(region_provider)
.load(),
)?;

@@ -252,16 +247,18 @@ impl S3StorageOptions {
}
}

fn execute_sdk_future<F: Future<Output = SdkConfig> + Send + 'static>(
future: F,
) -> DeltaResult<SdkConfig> {
fn execute_sdk_future<F, T>(future: F) -> DeltaResult<T>
where
T: Send,
F: Future<Output = T> + Send,
{
match tokio::runtime::Handle::try_current() {
Ok(handle) => match handle.runtime_flavor() {
tokio::runtime::RuntimeFlavor::MultiThread => {
Ok(tokio::task::block_in_place(move || handle.block_on(future)))
}
_ => {
let mut cfg: Option<SdkConfig> = None;
let mut cfg: Option<T> = None;
std::thread::scope(|scope| {
scope.spawn(|| {
cfg = Some(handle.block_on(future));
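The last hunk generalizes `execute_sdk_future` from returning only `SdkConfig` to any `T: Send`, so the same helper can also drive `region_provider.region()` synchronously. A stand-alone sketch of the runtime-flavor handling it relies on (error handling reduced to panics; this assumes tokio with the rt-multi-thread feature and is not the delta-rs signature):

```rust
use std::future::Future;

/// Block on `future` from synchronous code, whether or not a Tokio runtime
/// is already running on the current thread.
fn block_on_any_runtime<F, T>(future: F) -> T
where
    T: Send,
    F: Future<Output = T> + Send,
{
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => match handle.runtime_flavor() {
            // A multi-threaded runtime lets us block this worker in place.
            tokio::runtime::RuntimeFlavor::MultiThread => {
                tokio::task::block_in_place(move || handle.block_on(future))
            }
            // On a current-thread runtime, block on a scoped OS thread so the
            // reactor thread itself is never parked.
            _ => {
                let mut out: Option<T> = None;
                std::thread::scope(|scope| {
                    scope.spawn(|| {
                        out = Some(handle.block_on(future));
                    });
                });
                out.expect("scoped thread completed before scope returned")
            }
        },
        // No ambient runtime: create a throwaway one.
        Err(_) => tokio::runtime::Runtime::new()
            .expect("failed to build runtime")
            .block_on(future),
    }
}

fn main() {
    let value = block_on_any_runtime(async { 41 + 1 });
    assert_eq!(value, 42);
}
```

The scoped-thread branch matters because `block_in_place` is only allowed on a worker of a multi-threaded runtime.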
@@ -97,13 +97,14 @@ reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
], optional = true }
sqlparser = { version = "0.44", optional = true }
sqlparser = { version = "0.46", optional = true }

[dev-dependencies]
criterion = "0.5"
ctor = "0"
deltalake-test = { path = "../test", features = ["datafusion"] }
dotenvy = "0"
fs_extra = "1.2.0"
hyper = { version = "0.14", features = ["server"] }
maplit = "1"
pretty_assertions = "1.2.1"
@@ -518,6 +518,9 @@ pub(super) async fn list_log_files(
#[cfg(test)]
pub(super) mod tests {
use deltalake_test::utils::*;
use tokio::task::JoinHandle;

use crate::checkpoints::create_checkpoint_from_table_uri_and_cleanup;

use super::*;

@@ -620,6 +623,115 @@ pub(super) mod tests {
Ok(())
}

pub(crate) async fn concurrent_checkpoint(context: &IntegrationContext) -> TestResult {
context
.load_table(TestTables::LatestNotCheckpointed)
.await?;
let table_to_checkpoint = context
.table_builder(TestTables::LatestNotCheckpointed)
.load()
.await?;
let store = context
.table_builder(TestTables::LatestNotCheckpointed)
.build_storage()?
.object_store();
let slow_list_store = Arc::new(slow_store::SlowListStore { store });

let version = table_to_checkpoint.version();
let load_task: JoinHandle<Result<LogSegment, DeltaTableError>> = tokio::spawn(async move {
let segment =
LogSegment::try_new(&Path::default(), Some(version), slow_list_store.as_ref())
.await?;
Ok(segment)
});

create_checkpoint_from_table_uri_and_cleanup(
&table_to_checkpoint.table_uri(),
version,
Some(false),
)
.await?;

let segment = load_task.await??;
assert_eq!(segment.version, version);

Ok(())
}

mod slow_store {
use std::sync::Arc;

use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutOptions, PutResult, Result,
};
use tokio::io::AsyncWrite;

#[derive(Debug)]
pub(super) struct SlowListStore {
pub store: Arc<dyn ObjectStore>,
}

impl std::fmt::Display for SlowListStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SlowListStore {{ store: {} }}", self.store)
}
}

#[async_trait::async_trait]
impl object_store::ObjectStore for SlowListStore {
async fn put_opts(
&self,
location: &Path,
bytes: Bytes,
opts: PutOptions,
) -> Result<PutResult> {
self.store.put_opts(location, bytes, opts).await
}
async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
self.store.put_multipart(location).await
}

async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()> {
self.store.abort_multipart(location, multipart_id).await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.store.get_opts(location, options).await
}

async fn delete(&self, location: &Path) -> Result<()> {
self.store.delete(location).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
std::thread::sleep(std::time::Duration::from_secs(1));
self.store.list(prefix)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.store.list_with_delimiter(prefix).await
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.store.copy(from, to).await
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.store.copy_if_not_exists(from, to).await
}
}
}

#[test]
pub fn is_commit_file_only_matches_commits() {
for path in [0, 1, 5, 10, 100, i64::MAX]
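`SlowListStore` above only delays `list` and delegates everything else, which is what lets `concurrent_checkpoint` overlap a log listing with a checkpoint write. The same decorator shape, reduced to a toy trait so the sketch compiles on its own (the `Storage` trait here is a stand-in, not the object_store API):

```rust
use std::sync::Arc;
use std::time::Duration;

/// Toy stand-in for an object store: just enough surface to decorate.
trait Storage: Send + Sync {
    fn list(&self, prefix: &str) -> Vec<String>;
    fn get(&self, path: &str) -> Option<Vec<u8>>;
}

struct InMemory {
    objects: Vec<(String, Vec<u8>)>,
}

impl Storage for InMemory {
    fn list(&self, prefix: &str) -> Vec<String> {
        self.objects
            .iter()
            .filter(|(k, _)| k.starts_with(prefix))
            .map(|(k, _)| k.clone())
            .collect()
    }
    fn get(&self, path: &str) -> Option<Vec<u8>> {
        self.objects.iter().find(|(k, _)| k == path).map(|(_, v)| v.clone())
    }
}

/// Decorator that delegates everything but injects latency into `list`,
/// mirroring how SlowListStore widens the race window in the test above.
struct SlowList {
    inner: Arc<dyn Storage>,
}

impl Storage for SlowList {
    fn list(&self, prefix: &str) -> Vec<String> {
        std::thread::sleep(Duration::from_millis(50));
        self.inner.list(prefix)
    }
    fn get(&self, path: &str) -> Option<Vec<u8>> {
        self.inner.get(path)
    }
}

fn main() {
    let store: Arc<dyn Storage> = Arc::new(InMemory {
        objects: vec![("_delta_log/00000.json".into(), b"{}".to_vec())],
    });
    let slow = SlowList { inner: store };
    assert_eq!(slow.list("_delta_log/"), vec!["_delta_log/00000.json".to_string()]);
}
```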
@@ -287,11 +287,13 @@ impl Snapshot {
}

/// Get the statistics schema of the snapshot
pub fn stats_schema(&self) -> DeltaResult<StructType> {
pub fn stats_schema(&self, table_schema: Option<&StructType>) -> DeltaResult<StructType> {
let schema = table_schema.unwrap_or_else(|| self.schema());

let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() {
stats_cols
.iter()
.map(|col| match self.schema().field_with_name(col) {
.map(|col| match schema.field_with_name(col) {
Ok(field) => match field.data_type() {
DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => {
Err(DeltaTableError::Generic(format!(

@@ -314,7 +316,7 @@ impl Snapshot {
.collect::<Result<Vec<_>, _>>()?
} else {
let num_indexed_cols = self.table_config().num_indexed_cols();
self.schema()
schema
.fields
.iter()
.enumerate()

@@ -362,7 +364,7 @@ impl EagerSnapshot {
let mut files = Vec::new();
let mut scanner = LogReplayScanner::new();
files.push(scanner.process_files_batch(&batch, true)?);
let mapper = LogMapper::try_new(&snapshot)?;
let mapper = LogMapper::try_new(&snapshot, None)?;
files = files
.into_iter()
.map(|b| mapper.map_batch(b))

@@ -401,7 +403,7 @@ impl EagerSnapshot {
)
.boxed()
};
let mapper = LogMapper::try_new(&self.snapshot)?;
let mapper = LogMapper::try_new(&self.snapshot, None)?;
let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot)?
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
.try_collect()

@@ -517,7 +519,13 @@ impl EagerSnapshot {
files.push(scanner.process_files_batch(&batch?, true)?);
}

let mapper = LogMapper::try_new(&self.snapshot)?;
let mapper = if let Some(metadata) = &metadata {
let new_schema: StructType = serde_json::from_str(&metadata.schema_string)?;
LogMapper::try_new(&self.snapshot, Some(&new_schema))?
} else {
LogMapper::try_new(&self.snapshot, None)?
};

self.files = files
.into_iter()
.chain(

@@ -605,7 +613,7 @@ mod tests {
use futures::TryStreamExt;
use itertools::Itertools;

use super::log_segment::tests::test_log_segment;
use super::log_segment::tests::{concurrent_checkpoint, test_log_segment};
use super::replay::tests::test_log_replay;
use super::*;
use crate::kernel::Remove;

@@ -627,6 +635,13 @@ mod tests {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_concurrent_checkpoint() -> TestResult {
let context = IntegrationContext::new(Box::<LocalStorageIntegration>::default())?;
concurrent_checkpoint(&context).await?;
Ok(())
}

async fn test_snapshot(context: &IntegrationContext) -> TestResult {
let store = context
.table_builder(TestTables::Simple)
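`stats_schema` now takes an optional table schema so stats can be parsed against a schema other than the snapshot's current one, as in the hunk above where a metadata action's `schema_string` is deserialized and handed to `LogMapper::try_new`. The field-selection rule is unchanged: the configured `stats_columns` win, otherwise the first `num_indexed_cols` fields. A simplified sketch of that selection over a plain field list (illustrative types, not the delta-rs kernel API):

```rust
/// Pick the fields statistics are collected for: an explicit stats-column
/// list wins, otherwise the first `num_indexed_cols` fields of the schema.
fn stats_fields<'a>(
    schema: &'a [(&'a str, &'a str)], // (field name, data type)
    stats_columns: Option<&[&str]>,
    num_indexed_cols: usize,
) -> Vec<&'a (&'a str, &'a str)> {
    match stats_columns {
        Some(cols) => schema
            .iter()
            .filter(|(name, _)| cols.contains(name))
            .collect(),
        None => schema.iter().take(num_indexed_cols).collect(),
    }
}

fn main() {
    // Passing a different schema (e.g. one parsed from a metadata action's
    // schema_string) only changes which field list is consulted, not the rule.
    let schema = [("id", "long"), ("ts", "timestamp"), ("payload", "binary")];
    assert_eq!(stats_fields(&schema, None, 2).len(), 2);
    assert_eq!(stats_fields(&schema, Some(["ts"].as_slice()), 2).len(), 1);
}
```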
@@ -21,6 +21,7 @@ use tracing::debug;

use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};
use crate::kernel::arrow::json;
use crate::kernel::StructType;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

use super::Snapshot;

@@ -41,7 +42,7 @@ pin_project! {

impl<S> ReplayStream<S> {
pub(super) fn try_new(commits: S, checkpoint: S, snapshot: &Snapshot) -> DeltaResult<Self> {
let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?);
let stats_schema = Arc::new((&snapshot.stats_schema(None)?).try_into()?);
let mapper = Arc::new(LogMapper {
stats_schema,
config: snapshot.config.clone(),

@@ -61,9 +62,12 @@ pub(super) struct LogMapper {
}

impl LogMapper {
pub(super) fn try_new(snapshot: &Snapshot) -> DeltaResult<Self> {
pub(super) fn try_new(
snapshot: &Snapshot,
table_schema: Option<&StructType>,
) -> DeltaResult<Self> {
Ok(Self {
stats_schema: Arc::new((&snapshot.stats_schema()?).try_into()?),
stats_schema: Arc::new((&snapshot.stats_schema(table_schema)?).try_into()?),
config: snapshot.config.clone(),
})
}
@@ -1561,5 +1561,30 @@ pub(super) mod zorder {
assert_eq!(data.value_data().len(), 3 * 16 * 3);
assert!(data.iter().all(|x| x.unwrap().len() == 3 * 16));
}

#[tokio::test]
async fn works_on_spark_table() {
use crate::DeltaOps;
use tempfile::TempDir;
// Create a temporary directory
let tmp_dir = TempDir::new().expect("Failed to make temp dir");
let table_name = "delta-1.2.1-only-struct-stats";

// Copy recursively from the test data directory to the temporary directory
let source_path = format!("../test/tests/data/{table_name}");
fs_extra::dir::copy(source_path, tmp_dir.path(), &Default::default()).unwrap();

// Run optimize
let (_, metrics) =
DeltaOps::try_from_uri(tmp_dir.path().join(table_name).to_str().unwrap())
.await
.unwrap()
.optimize()
.await
.unwrap();

// Verify it worked
assert_eq!(metrics.num_files_added, 1);
}
}
}
@@ -1190,6 +1190,32 @@ mod tests {
assert_eq!(expected, actions);
}

#[tokio::test]
async fn test_table_not_always_with_stats() {
let path = "../test/tests/data/delta-stats-optional";
let mut table = crate::open_table(path).await.unwrap();
table.load().await.unwrap();
let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
// get column-0 path, and column-4 num_records, and column_5 null_count.integer
let expected_path: ArrayRef = Arc::new(array::StringArray::from(vec![
"part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet",
"part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
]));
let expected_num_records: ArrayRef =
Arc::new(array::Int64Array::from(vec![None, Some(1)]));
let expected_null_count: ArrayRef =
Arc::new(array::Int64Array::from(vec![None, Some(0)]));

let path_column = actions.column(0);
let num_records_column = actions.column(4);
let null_count_column = actions.column(5);

assert_eq!(&expected_path, path_column);
assert_eq!(&expected_num_records, num_records_column);
assert_eq!(&expected_null_count, null_count_column);
}

#[tokio::test]
async fn test_only_struct_stats() {
// test table with no json stats
@@ -447,11 +447,12 @@ impl DeltaTableState {
.map(|(path, datatype)| -> Result<ColStats, DeltaTableError> {
let null_count = stats
.iter()
.flat_map(|maybe_stat| {
.map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_count_stat(&stat.null_count, &path))
})
.map(|null_count| null_count.flatten())
.collect::<Vec<Option<i64>>>();
let null_count = Some(value_vec_to_array(null_count, |values| {
Ok(Arc::new(arrow::array::Int64Array::from(values)))

@@ -463,11 +464,12 @@ impl DeltaTableState {
let min_values = if matches!(datatype, DeltaDataType::Primitive(_)) {
let min_values = stats
.iter()
.flat_map(|maybe_stat| {
.map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_value_stat(&stat.min_values, &path))
})
.map(|min_value| min_value.flatten())
.collect::<Vec<Option<&serde_json::Value>>>();

Some(value_vec_to_array(min_values, |values| {

@@ -480,11 +482,12 @@ impl DeltaTableState {
let max_values = if matches!(datatype, DeltaDataType::Primitive(_)) {
let max_values = stats
.iter()
.flat_map(|maybe_stat| {
.map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_value_stat(&stat.max_values, &path))
})
.map(|max_value| max_value.flatten())
.collect::<Vec<Option<&serde_json::Value>>>();
Some(value_vec_to_array(max_values, |values| {
json_value_to_array_general(&arrow_type, values.into_iter())
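The switch from `flat_map` to `map` plus `flatten` above is what keeps one entry per file even when a file carries no stats: `flat_map` silently dropped the `None`s, so the min/max/null-count arrays drifted out of alignment with the file list. A minimal illustration of the difference:

```rust
/// Stand-in for a per-file stats struct; `null_count` may itself be absent.
struct Stats {
    null_count: Option<i64>,
}

fn main() {
    // One entry per file; the middle file carries no stats at all.
    let stats: Vec<Option<Stats>> = vec![
        Some(Stats { null_count: Some(3) }),
        None,
        Some(Stats { null_count: Some(7) }),
    ];

    // flat_map drops files without stats: 2 values for 3 files (misaligned).
    let dropped: Vec<Option<i64>> = stats
        .iter()
        .flat_map(|maybe| maybe.as_ref().map(|s| s.null_count))
        .collect();
    assert_eq!(dropped.len(), 2);

    // map + flatten keeps a null slot for the file without stats,
    // so the column stays aligned with the file list.
    let aligned: Vec<Option<i64>> = stats
        .iter()
        .map(|maybe| maybe.as_ref().map(|s| s.null_count))
        .map(|nc| nc.flatten())
        .collect();
    assert_eq!(aligned, vec![Some(3), None, Some(7)]);
}
```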
@@ -192,6 +192,12 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
values: RecordBatch,
mode: WriteMode,
) -> Result<(), DeltaTableError> {
if mode == WriteMode::MergeSchema && !self.partition_columns.is_empty() {
return Err(DeltaTableError::Generic(
"Merging Schemas with partition columns present is currently unsupported"
.to_owned(),
));
}
// Set the should_evolve flag for later in case the writer should perform schema evolution
// on its flush_and_commit
self.should_evolve = mode == WriteMode::MergeSchema;

@@ -237,8 +243,12 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {

if self.arrow_schema_ref != self.original_schema_ref && self.should_evolve {
let schema: StructType = self.arrow_schema_ref.clone().try_into()?;
// TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe
// this should just propagate the existing columns in the new action
if !self.partition_columns.is_empty() {
return Err(DeltaTableError::Generic(
"Merging Schemas with partition columns present is currently unsupported"
.to_owned(),
));
}
let part_cols: Vec<String> = vec![];
let metadata = Metadata::try_new(schema, part_cols, HashMap::new())?;
adds.push(Action::Metadata(metadata));

@@ -662,6 +672,8 @@ mod tests {
// The following sets of tests are related to #1386 and mergeSchema support
// <https://github.com/delta-io/delta-rs/issues/1386>
mod schema_evolution {
use itertools::Itertools;

use super::*;

#[tokio::test]

@@ -772,6 +784,80 @@ mod tests {
);
}

#[tokio::test]
async fn test_write_schema_evolution_with_partition_columns_should_fail_as_unsupported() {
let table_schema = get_delta_schema();
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();

let mut table = CreateBuilder::new()
.with_location(table_path.to_str().unwrap())
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(table_schema.fields().clone())
.with_partition_columns(["id"])
.await
.unwrap();
table.load().await.expect("Failed to load table");
assert_eq!(table.version(), 0);

let batch = get_record_batch(None, false);
let mut writer = RecordBatchWriter::for_table(&table).unwrap();

writer.write(batch).await.unwrap();
let version = writer.flush_and_commit(&mut table).await.unwrap();
assert_eq!(version, 1);
table.load().await.expect("Failed to load table");
assert_eq!(table.version(), 1);

// Create a second batch with appended columns
let second_batch = {
let second = get_record_batch(None, false);
let second_schema = ArrowSchema::new(
second
.schema()
.fields
.iter()
.cloned()
.chain([
Field::new("vid", DataType::Int32, true).into(),
Field::new("name", DataType::Utf8, true).into(),
])
.collect_vec(),
);

let len = second.num_rows();

let second_arrays = second
.columns()
.iter()
.cloned()
.chain([
Arc::new(Int32Array::from(vec![Some(1); len])) as _, // vid
Arc::new(StringArray::from(vec![Some("will"); len])) as _, // name
])
.collect_vec();

RecordBatch::try_new(second_schema.into(), second_arrays).unwrap()
};

let result = writer
.write_with_mode(second_batch, WriteMode::MergeSchema)
.await;

assert!(result.is_err());

match result.unwrap_err() {
DeltaTableError::Generic(s) => {
assert_eq!(
s,
"Merging Schemas with partition columns present is currently unsupported"
)
}
e => panic!("unexpected error: {e:?}"),
}
}

#[tokio::test]
async fn test_schema_evolution_column_type_mismatch() {
let batch = get_record_batch(None, false);
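For reference, a toy model of the flow the two writer hunks above add: `write_with_mode` rejects schema merging on partitioned tables up front and records a `should_evolve` flag, and the commit path only appends a metadata action when the schema actually changed. The `Writer` type and the string "schemas" here are illustrative stand-ins, not the delta-rs writer:

```rust
#[derive(Debug, PartialEq)]
enum WriteMode {
    Default,
    MergeSchema,
}

#[derive(Debug)]
enum Action {
    Add(String),
    Metadata(String),
}

/// Illustrative writer: the string schemas stand in for the Arrow schema
/// references the real writer tracks.
struct Writer {
    partition_columns: Vec<String>,
    original_schema: String,
    current_schema: String,
    should_evolve: bool,
}

impl Writer {
    /// Merging schemas is refused up front when partition columns exist.
    fn write_with_mode(&mut self, new_schema: &str, mode: WriteMode) -> Result<(), String> {
        if mode == WriteMode::MergeSchema && !self.partition_columns.is_empty() {
            return Err(
                "Merging Schemas with partition columns present is currently unsupported".into(),
            );
        }
        self.should_evolve = mode == WriteMode::MergeSchema;
        self.current_schema = new_schema.to_string();
        Ok(())
    }

    /// At commit time a metadata action is appended only when the schema
    /// actually changed and evolution was requested.
    fn commit_actions(&self) -> Vec<Action> {
        let mut actions = vec![Action::Add("part-00000.parquet".into())];
        if self.current_schema != self.original_schema && self.should_evolve {
            actions.push(Action::Metadata(self.current_schema.clone()));
        }
        actions
    }
}

fn main() {
    let mut partitioned = Writer {
        partition_columns: vec!["id".into()],
        original_schema: "v1".into(),
        current_schema: "v1".into(),
        should_evolve: false,
    };
    // Partitioned table + MergeSchema is rejected, as in the test above.
    assert!(partitioned.write_with_mode("v2", WriteMode::MergeSchema).is_err());

    let mut unpartitioned = Writer {
        partition_columns: vec![],
        original_schema: "v1".into(),
        current_schema: "v1".into(),
        should_evolve: false,
    };
    unpartitioned.write_with_mode("v2", WriteMode::MergeSchema).unwrap();
    // The evolved schema produces an extra Metadata action at commit time.
    assert_eq!(unpartitioned.commit_actions().len(), 2);
}
```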
@@ -276,22 +276,19 @@ async fn regular_rename(from: &str, to: &str) -> Result<(), LocalFileSystemError>
"Already exists",
)),
})
} else if std::path::Path::new(&from_path).exists() {
std::fs::rename(&from_path, &to_path).map_err(|err| LocalFileSystemError::Generic {
store: STORE_NAME,
source: Box::new(err),
})
} else {
std::fs::rename(&from_path, &to_path).map_err(|err| {
println!("err: {err:?}");
if err.kind() == std::io::ErrorKind::NotFound {
LocalFileSystemError::NotFound {
path: from_path.clone(),
source: Box::new(err),
}
} else {
LocalFileSystemError::Generic {
store: STORE_NAME,
source: Box::new(err),
}
}
})?;
Ok(())
Err(LocalFileSystemError::NotFound {
path: from_path.clone(),
source: Box::new(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Could not find {from_path}"),
)),
})
}
})
.await
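The rewritten `regular_rename` above no longer pre-checks `exists()`; it attempts the rename and classifies the resulting `io::Error`, which removes the race between the existence check and the rename. A stand-alone sketch of that shape (the error enum is a stand-in for `LocalFileSystemError`):

```rust
use std::io;

#[derive(Debug)]
enum RenameError {
    NotFound { path: String, source: io::Error },
    Generic { source: io::Error },
}

/// Attempt the rename and classify the failure afterwards instead of
/// pre-checking existence (which would be racy).
fn rename_classified(from: &str, to: &str) -> Result<(), RenameError> {
    std::fs::rename(from, to).map_err(|err| {
        if err.kind() == io::ErrorKind::NotFound {
            RenameError::NotFound { path: from.to_string(), source: err }
        } else {
            RenameError::Generic { source: err }
        }
    })
}

fn main() {
    // Renaming a path that does not exist surfaces as the typed NotFound error.
    match rename_classified("definitely-missing-fixture", "target") {
        Err(RenameError::NotFound { path, .. }) => assert_eq!(path, "definitely-missing-fixture"),
        other => panic!("expected NotFound, got {other:?}"),
    }
}
```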
@@ -154,6 +154,7 @@ pub enum TestTables {
Delta0_8_0Partitioned,
Delta0_8_0SpecialPartitioned,
Checkpoints,
LatestNotCheckpointed,
WithDvSmall,
Custom(String),
}

@@ -189,6 +190,11 @@ impl TestTables {
.unwrap()
.to_owned(),
Self::Checkpoints => data_path.join("checkpoints").to_str().unwrap().to_owned(),
Self::LatestNotCheckpointed => data_path
.join("latest_not_checkpointed")
.to_str()
.unwrap()
.to_owned(),
Self::WithDvSmall => data_path
.join("table-with-dv-small")
.to_str()

@@ -208,6 +214,7 @@ impl TestTables {
Self::Delta0_8_0Partitioned => "delta-0.8.0-partitioned".into(),
Self::Delta0_8_0SpecialPartitioned => "delta-0.8.0-special-partition".into(),
Self::Checkpoints => "checkpoints".into(),
Self::LatestNotCheckpointed => "latest_not_checkpointed".into(),
Self::WithDvSmall => "table-with-dv-small".into(),
Self::Custom(name) => name.to_owned(),
}
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1666652369577,"userId":"6114986638742036","userName":"dummy_username","operation":"CREATE OR REPLACE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.checkpoint.writeStatsAsJson\":\"false\",\"delta.checkpoint.writeStatsAsStruct\":\"true\"}"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"1007-161845-fa2h8e50","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"a8510a45-92dc-4e9f-9f7a-42bbcc9b752d"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"8d3d2b8a-f091-4d7d-8a37-432a9beaf17b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"integer\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsJson":"false","delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1666652369483}}

@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1666652373383,"userId":"6114986638742036","userName":"dummy_username","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"1007-161845-fa2h8e50","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"5489"},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"35e88c76-9cfb-4e0e-bce8-2317f3c49c75"}}
{"metaData":{"id":"8d3d2b8a-f091-4d7d-8a37-432a9beaf17b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"integer\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"null\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"boolean\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal\",\"type\":\"decimal(8,5)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"binary\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"map\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"string\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"array\",\"type\":{\"type\":\"array\",\"elementType\":\"string\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"nested_struct_element\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_of_array_of_map\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"string\",\"valueContainsNull\":true},\"containsNull\":true},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsJson":"false","delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1666652369483}}
{"add":{"path":"part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet","partitionValues":{},"size":5489,"modificationTime":1666652373000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"integer\":0,\"double\":1.234,\"decimal\":-5.67800,\"string\":\"string\",\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:32.846Z\",\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"integer\":0,\"double\":1.234,\"decimal\":-5.67800,\"string\":\"string\",\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:32.846Z\",\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"nullCount\":{\"integer\":0,\"null\":1,\"boolean\":0,\"double\":0,\"decimal\":0,\"string\":0,\"binary\":0,\"date\":0,\"timestamp\":0,\"struct\":{\"struct_element\":0},\"map\":0,\"array\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct_of_array_of_map\":{\"struct_element\":0}}}","tags":{"INSERTION_TIME":"1666652373000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}

@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1666652374424,"userId":"6114986638742036","userName":"dummy_username","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"1007-161845-fa2h8e50","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"5489"},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"efe25f5f-e03a-458d-8fbe-34ed2111b3c1"}}
{"add":{"path":"part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet","partitionValues":{},"size":5489,"modificationTime":1666652374000,"dataChange":true,"stats_parsed":null,"tags":{"INSERTION_TIME":"1666652374000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
Binary file not shown.
Binary file not shown.

@@ -0,0 +1,3 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"84b09beb-329c-4b5e-b493-f58c6c78b8fd","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpointInterval":"2"},"createdTime":1674611455081}}
{"commitInfo":{"timestamp":1674611455099,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.checkpointInterval\":\"2\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"d87e63fb-7388-4b1c-9afc-750a561012b7"}}

@@ -0,0 +1,2 @@
{"add":{"path":"part-00000-ad1a4bb7-07e8-4f40-b50b-49910d209e0c-c000.snappy.parquet","partitionValues":{},"size":965,"modificationTime":1674611456921,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"b\",\"int\":288,\"date\":\"1978-02-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":988,\"date\":\"2020-05-01\"},\"nullCount\":{\"letter\":3,\"int\":0,\"date\":0}}"}}
{"commitInfo":{"timestamp":1674611457269,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"965"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"71d9bcd1-7f2b-46f8-bd1f-e0a8e872f3c3"}}
Binary file not shown.

@@ -0,0 +1,3 @@
{"add":{"path":"part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet","partitionValues":{},"size":976,"modificationTime":1674611458901,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"a\",\"int\":120,\"date\":\"1971-07-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":667,\"date\":\"2018-02-01\"},\"nullCount\":{\"letter\":2,\"int\":0,\"date\":0}}"}}
{"remove":{"path":"part-00000-ad1a4bb7-07e8-4f40-b50b-49910d209e0c-c000.snappy.parquet","deletionTimestamp":1674611459307,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":965}}
{"commitInfo":{"timestamp":1674611459307,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"976"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"b08f5758-a8e9-4dd1-af7e-7b6e53928d7a"}}

@@ -0,0 +1,3 @@
{"add":{"path":"part-00000-70b1dcdf-0236-4f63-a072-124cdbafd8a0-c000.snappy.parquet","partitionValues":{},"size":1010,"modificationTime":1674611461541,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"a\",\"int\":93,\"date\":\"1975-06-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":753,\"date\":\"2013-03-01\"},\"nullCount\":{\"letter\":1,\"int\":0,\"date\":0}}"}}
{"remove":{"path":"part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet","deletionTimestamp":1674611461982,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":976}}
{"commitInfo":{"timestamp":1674611461982,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"1010"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"0403bbaf-a6f2-4543-9e6c-bd068e76670f"}}

@@ -0,0 +1 @@
{"version":2,"size":1}
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.17.3"
version = "0.17.4"
authors = ["Qingping Hou <dave2008713@gmail.com>", "Will Jones <willjones127@gmail.com>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
@@ -721,10 +721,17 @@ class DeltaFileSystemHandler:

def __init__(
self,
root: str,
options: dict[str, str] | None = None,
known_sizes: dict[str, int] | None = None,
table_uri: str,
options: Dict[str, str] | None = None,
known_sizes: Dict[str, int] | None = None,
) -> None: ...
@classmethod
def from_table(
cls,
table: RawDeltaTable,
options: Dict[str, str] | None = None,
known_sizes: Dict[str, int] | None = None,
) -> "DeltaFileSystemHandler": ...
def get_type_name(self) -> str: ...
def copy_file(self, src: str, dst: str) -> None:
"""Copy a file.

@@ -776,7 +783,7 @@ class DeltaFileSystemHandler:
def open_input_file(self, path: str) -> ObjectInputFile:
"""Open an input file for random access reading."""
def open_output_stream(
self, path: str, metadata: dict[str, str] | None = None
self, path: str, metadata: Dict[str, str] | None = None
) -> ObjectOutputStream:
"""Open an output stream for sequential writing."""
@@ -1,17 +1,102 @@
from typing import Dict, List, Optional
from typing import Any, Dict, List, Mapping, Optional

import pyarrow as pa
from pyarrow.fs import FileInfo, FileSelector, FileSystemHandler

from ._internal import DeltaFileSystemHandler
from ._internal import DeltaFileSystemHandler, RawDeltaTable


# NOTE we need to inherit form FileSystemHandler to pass pyarrow's internal type checks.
class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
class DeltaStorageHandler(FileSystemHandler):
"""
DeltaStorageHandler is a concrete implementations of a PyArrow FileSystemHandler.
"""

def __init__(
self,
table_uri: str,
options: Optional[Dict[str, str]] = None,
known_sizes: Optional[Dict[str, int]] = None,
):
self._handler = DeltaFileSystemHandler(
table_uri=table_uri, options=options, known_sizes=known_sizes
)

@classmethod
def from_table(
cls,
table: RawDeltaTable,
options: Optional[Dict[str, str]] = None,
known_sizes: Optional[Dict[str, int]] = None,
) -> "DeltaStorageHandler":
self = cls.__new__(cls)
self._handler = DeltaFileSystemHandler.from_table(table, options, known_sizes)
return self

def get_type_name(self) -> str:
return self._handler.get_type_name()

def copy_file(self, src: str, dst: str) -> None:
"""Copy a file.

If the destination exists and is a directory, an error is returned. Otherwise, it is replaced.
"""
return self._handler.copy_file(src=src, dst=dst)

def create_dir(self, path: str, recursive: bool = True) -> None:
"""Create a directory and subdirectories.

This function succeeds if the directory already exists.
"""
return self._handler.create_dir(path, recursive)

def delete_dir(self, path: str) -> None:
"""Delete a directory and its contents, recursively."""
return self._handler.delete_dir(path)

def delete_file(self, path: str) -> None:
"""Delete a file."""
return self._handler.delete_file(path)

def equals(self, other: Any) -> bool:
return self._handler.equals(other)

def delete_dir_contents(
self, path: str, *, accept_root_dir: bool = False, missing_dir_ok: bool = False
) -> None:
"""Delete a directory's contents, recursively.

Like delete_dir, but doesn't delete the directory itself.
"""
return self._handler.delete_dir_contents(
path=path, accept_root_dir=accept_root_dir, missing_dir_ok=missing_dir_ok
)

def delete_root_dir_contents(self) -> None:
"""Delete the root directory contents, recursively."""
return self._handler.delete_root_dir_contents()

def get_file_info(self, paths: List[str]) -> List[FileInfo]:
"""Get info for the given files.

A non-existing or unreachable file returns a FileStat object and has a FileType of value NotFound.
An exception indicates a truly exceptional condition (low-level I/O error, etc.).
"""
return self._handler.get_file_info(paths)

def move(self, src: str, dest: str) -> None:
"""Move / rename a file or directory.

If the destination exists: - if it is a non-empty directory, an error is returned - otherwise,
if it has the same type as the source, it is replaced - otherwise, behavior is
unspecified (implementation-dependent).
"""
self._handler.move_file(src=src, dest=dest)

def normalize_path(self, path: str) -> str:
"""Normalize filesystem path."""
return self._handler.normalize_path(path)

def open_input_file(self, path: str) -> pa.PythonFile:
"""
Open an input file for random access reading.

@@ -22,7 +107,7 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
Returns:
NativeFile
"""
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))
return pa.PythonFile(self._handler.open_input_file(path))

def open_input_stream(self, path: str) -> pa.PythonFile:
"""

@@ -34,7 +119,7 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
Returns:
NativeFile
"""
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))
return pa.PythonFile(self._handler.open_input_file(path))

def open_output_stream(
self, path: str, metadata: Optional[Dict[str, str]] = None

@@ -51,11 +136,9 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
Returns:
NativeFile
"""
return pa.PythonFile(
DeltaFileSystemHandler.open_output_stream(self, path, metadata)
)
return pa.PythonFile(self._handler.open_output_stream(path, metadata))

def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]:  # type: ignore
def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]:
"""
Get info for the files defined by FileSelector.

@@ -65,6 +148,9 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
Returns:
list of file info objects
"""
return DeltaFileSystemHandler.get_file_info_selector(
self, selector.base_dir, selector.allow_not_found, selector.recursive
return self._handler.get_file_info_selector(
selector.base_dir, selector.allow_not_found, selector.recursive
)

def open_append_stream(self, path: str, metadata: Mapping[str, str]) -> None:
raise NotImplementedError
@@ -1030,6 +1030,8 @@ class DeltaTable:
partitions: Optional[List[Tuple[str, str, Any]]] = None,
filesystem: Optional[Union[str, pa_fs.FileSystem]] = None,
parquet_read_options: Optional[ParquetReadOptions] = None,
schema: Optional[pyarrow.Schema] = None,
as_large_types: bool = False,
) -> pyarrow.dataset.Dataset:
"""
Build a PyArrow Dataset using data from the DeltaTable.

@@ -1038,6 +1040,12 @@ class DeltaTable:
partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
filesystem: A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
parquet_read_options: Optional read options for Parquet. Use this to handle INT96 to timestamp conversion for edge cases like 0001-01-01 or 9999-12-31
schema: The schema to use for the dataset. If None, the schema of the DeltaTable will be used. This can be used to force reading of Parquet/Arrow datatypes
that DeltaLake can't represent in it's schema (e.g. LargeString).
If you only need to read the schema with large types (e.g. for compatibility with Polars) you may want to use the `as_large_types` parameter instead.
as_large_types: get schema with all variable size types (list, binary, string) as large variants (with int64 indices).
This is for compatibility with systems like Polars that only support the large versions of Arrow types.
If `schema` is passed it takes precedence over this option.

More info: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.ParquetReadOptions.html

@@ -1087,8 +1095,10 @@ class DeltaTable:
x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"])
}
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler(
self._table.table_uri(), self._storage_options, file_sizes
DeltaStorageHandler.from_table(
self._table,
self._storage_options,
file_sizes,
)
)
format = ParquetFileFormat(

@@ -1096,6 +1106,8 @@ class DeltaTable:
default_fragment_scan_options=ParquetFragmentScanOptions(pre_buffer=True),
)

schema = schema or self.schema().to_pyarrow(as_large_types=as_large_types)

fragments = [
format.make_fragment(
file,

@@ -1103,12 +1115,10 @@ class DeltaTable:
partition_expression=part_expression,
)
for file, part_expression in self._table.dataset_partitions(
self.schema().to_pyarrow(), partitions
schema, partitions
)
]

schema = self.schema().to_pyarrow()

dictionary_columns = format.read_options.dictionary_columns or set()
if dictionary_columns:
for index, field in enumerate(schema):
@@ -339,13 +339,18 @@ def write_deltalake(
"schema_mode 'merge' is not supported in pyarrow engine. Use engine=rust"
)
# We need to write against the latest table version
filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))

def sort_arrow_schema(schema: pa.schema) -> pa.schema:
sorted_cols = sorted(iter(schema), key=lambda x: (x.name, str(x.type)))
return pa.schema(sorted_cols)

if table:  # already exists
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler.from_table(
table=table._table, options=storage_options
)
)

if sort_arrow_schema(schema) != sort_arrow_schema(
table.schema().to_pyarrow(as_large_types=large_dtypes)
) and not (mode == "overwrite" and schema_mode == "overwrite"):

@@ -370,6 +375,9 @@ def write_deltalake(
partition_by = table.metadata().partition_columns

else:  # creating a new table
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler(table_uri, options=storage_options)
)
current_version = -1

dtype_map = {
@@ -1,14 +1,14 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::error::PythonError;
use crate::utils::{delete_dir, rt, walk_tree};
use crate::RawDeltaTable;
use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path};
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyBytes};
use pyo3::types::{IntoPyDict, PyBytes, PyType};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};

const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024;

@@ -43,19 +43,39 @@ impl DeltaFileSystemHandler {
#[new]
#[pyo3(signature = (table_uri, options = None, known_sizes = None))]
fn new(
table_uri: &str,
table_uri: String,
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
) -> PyResult<Self> {
let storage = DeltaTableBuilder::from_uri(table_uri)
let storage = DeltaTableBuilder::from_uri(&table_uri)
.with_storage_options(options.clone().unwrap_or_default())
.build_storage()
.map_err(PythonError::from)?
.object_store();

Ok(Self {
inner: storage,
config: FsConfig {
root_url: table_uri.into(),
root_url: table_uri,
options: options.unwrap_or_default(),
},
known_sizes,
})
}

#[classmethod]
#[pyo3(signature = (table, options = None, known_sizes = None))]
fn from_table(
_cls: &PyType,
table: &RawDeltaTable,
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
) -> PyResult<Self> {
let storage = table._table.object_store();
Ok(Self {
inner: storage,
config: FsConfig {
root_url: table._table.table_uri(),
options: options.unwrap_or_default(),
},
known_sizes,
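The Rust binding above gains a `#[classmethod]` constructor so the handler can reuse an already-open table's object store instead of re-resolving the URI. A minimal pyo3 sketch of the same two-constructor shape (class and field names are illustrative; it assumes the GIL-ref `&PyType` signature used in this diff, i.e. pyo3 before the Bound API):

```rust
use pyo3::prelude::*;
use pyo3::types::PyType;

#[pyclass]
struct Handler {
    root_url: String,
}

#[pymethods]
impl Handler {
    /// Default constructor: resolve everything from a URI string.
    #[new]
    #[pyo3(signature = (table_uri))]
    fn new(table_uri: String) -> PyResult<Self> {
        Ok(Handler { root_url: table_uri })
    }

    /// Alternate constructor exposed to Python as a classmethod,
    /// mirroring DeltaFileSystemHandler.from_table in the diff.
    #[classmethod]
    fn from_table(_cls: &PyType, root_url: String) -> PyResult<Self> {
        Ok(Handler { root_url })
    }

    fn get_type_name(&self) -> String {
        format!("handler({})", self.root_url)
    }
}

#[pymodule]
fn handler_demo(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_class::<Handler>()?;
    Ok(())
}
```

From Python this surfaces as `Handler("s3://bucket/table")` or `Handler.from_table(...)`, matching how `DeltaStorageHandler.from_table` is wired up in the Python changes above.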
@@ -50,7 +50,7 @@ def test_s3_authenticated_read_write(s3_localstack_creds, monkeypatch):
# Create unauthenticated handler
storage_handler = DeltaStorageHandler(
"s3://deltars/",
{
options={
"AWS_ENDPOINT_URL": s3_localstack_creds["AWS_ENDPOINT_URL"],
# Grants anonymous access. If we don't do this, will timeout trying
# to reading from EC2 instance provider.
@@ -13,7 +13,7 @@ import pyarrow as pa
import pyarrow.compute as pc
import pytest
from packaging import version
from pyarrow.dataset import ParquetFileFormat, ParquetReadOptions
from pyarrow.dataset import ParquetFileFormat, ParquetReadOptions, dataset
from pyarrow.lib import RecordBatchReader

from deltalake import DeltaTable, Schema, write_deltalake

@@ -1306,6 +1306,62 @@ def test_large_arrow_types(tmp_path: pathlib.Path):
assert table.schema == dt.schema().to_pyarrow(as_large_types=True)


@pytest.mark.skipif(
int(pa.__version__.split(".")[0]) < 10, reason="map casts require pyarrow >= 10"
)
def test_large_arrow_types_dataset_as_large_types(tmp_path: pathlib.Path):
pylist = [
{"name": "Joey", "gender": b"M", "arr_type": ["x", "y"], "dict": {"a": b"M"}},
{"name": "Ivan", "gender": b"F", "arr_type": ["x", "z"]},
]
schema = pa.schema(
[
pa.field("name", pa.large_string()),
pa.field("gender", pa.large_binary()),
pa.field("arr_type", pa.large_list(pa.large_string())),
pa.field("map_type", pa.map_(pa.large_string(), pa.large_binary())),
pa.field("struct", pa.struct([pa.field("sub", pa.large_string())])),
]
)
table = pa.Table.from_pylist(pylist, schema=schema)

write_deltalake(tmp_path, table)

dt = DeltaTable(tmp_path)

ds = dt.to_pyarrow_dataset(as_large_types=True)
union_ds = dataset([ds, dataset(table)])
assert union_ds.to_table().shape[0] == 4


@pytest.mark.skipif(
int(pa.__version__.split(".")[0]) < 10, reason="map casts require pyarrow >= 10"
)
def test_large_arrow_types_explicit_scan_schema(tmp_path: pathlib.Path):
pylist = [
{"name": "Joey", "gender": b"M", "arr_type": ["x", "y"], "dict": {"a": b"M"}},
{"name": "Ivan", "gender": b"F", "arr_type": ["x", "z"]},
]
schema = pa.schema(
[
pa.field("name", pa.large_string()),
pa.field("gender", pa.large_binary()),
pa.field("arr_type", pa.large_list(pa.large_string())),
pa.field("map_type", pa.map_(pa.large_string(), pa.large_binary())),
pa.field("struct", pa.struct([pa.field("sub", pa.large_string())])),
]
)
table = pa.Table.from_pylist(pylist, schema=schema)

write_deltalake(tmp_path, table)

dt = DeltaTable(tmp_path)

ds = dt.to_pyarrow_dataset(schema=schema)
union_ds = dataset([ds, dataset(table)])
assert union_ds.to_table().shape[0] == 4


def test_partition_large_arrow_types(tmp_path: pathlib.Path):
table = pa.table(
{

@@ -1570,3 +1626,22 @@ def test_write_timestamp_ntz_on_table_with_features_not_enabled(tmp_path: pathli
write_deltalake(
tmp_path, data, mode="overwrite", engine="pyarrow", schema_mode="overwrite"
)


@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
def test_parse_stats_with_new_schema(tmp_path, engine):
sample_data = pa.table(
{
"val": pa.array([1, 1], pa.int8()),
}
)
write_deltalake(tmp_path, sample_data)

sample_data = pa.table(
{
"val": pa.array([1000000000000, 1000000000000], pa.int64()),
}
)
write_deltalake(
tmp_path, sample_data, mode="overwrite", schema_mode="overwrite", engine=engine
)