Compare commits

13 Commits

Author SHA1 Message Date
R Tyler Croy 61489c4ddc
Merge b2f55f441f into c7349269e0 2024-05-10 21:19:52 -04:00
Ion Koutsouris c7349269e0
fix(python): reuse state in `to_pyarrow_dataset` (#2485)
# Description
Reuse the existing state so we save time instead of reconstructing and
verifying it.
2024-05-10 16:19:44 -07:00
Alex Wilcoxson 8a8702fd9b
test: add test for concurrent checkpoint during table load (#2151)
# Description
This draft PR adds a unit test to demonstrate a concurrency issue with
checkpoints and an object store with a slow `list_with_offset`, e.g. Azure
when the log directory is large.

This test outputs the following (with some comments added for explanation):

```
### Initial table load, one checkpoint exists at version 2

Some("kernel::snapshot::tests::test_concurrent_checkpoint") - cp: CheckpointMetadata {
    version: 2,
    size: 1,
    parts: None,
    size_in_bytes: None,
    num_of_add_files: None,
    checkpoint_schema: None,
    checksum: None,
}, checkpoint_files: [
    ObjectMeta {
        location: Path {
            raw: "_delta_log/00000000000000000002.checkpoint.parquet",
        },
        last_modified: 2024-01-31T04:47:54.160159326Z,
        size: 12712,
        e_tag: Some(
            "3d46c38-6103694fd241f-31a8",
        ),
        version: None,
    },
]

### tokio task loading table with slow list

Some("tokio-runtime-worker") - getting: Path { raw: "_delta_log/_last_checkpoint" }
Some("tokio-runtime-worker") - slow list: Some(Path { raw: "_delta_log" })

### main thread loading table during checkpoint

Some("kernel::snapshot::tests::test_concurrent_checkpoint") - cp: CheckpointMetadata {
    version: 2,
    size: 1,
    parts: None,
    size_in_bytes: None,
    num_of_add_files: None,
    checkpoint_schema: None,
    checksum: None,
}, checkpoint_files: [
    ObjectMeta {
        location: Path {
            raw: "_delta_log/00000000000000000002.checkpoint.parquet",
        },
        last_modified: 2024-01-31T04:47:54.160159326Z,
        size: 12712,
        e_tag: Some(
            "3d46c38-6103694fd241f-31a8",
        ),
        version: None,
    },
]

### checkpoint created on main thread

Some("kernel::snapshot::tests::test_concurrent_checkpoint") - checkpoint created

### table load completes on the tokio thread; the in-memory checkpoint is still version 2, but after the slow list the checkpoint file list contains checkpoints 2 and 3, which causes the panic

Some("tokio-runtime-worker") - cp: CheckpointMetadata {
    version: 2,
    size: 1,
    parts: None,
    size_in_bytes: None,
    num_of_add_files: None,
    checkpoint_schema: None,
    checksum: None,
}, checkpoint_files: [
    ObjectMeta {
        location: Path {
            raw: "_delta_log/00000000000000000003.checkpoint.parquet",
        },
        last_modified: 2024-01-31T16:22:45.874545313Z,
        size: 21472,
        e_tag: Some(
            "3d46c3a-610404a023771-53e0",
        ),
        version: None,
    },
    ObjectMeta {
        location: Path {
            raw: "_delta_log/00000000000000000002.checkpoint.parquet",
        },
        last_modified: 2024-01-31T04:47:54.160159326Z,
        size: 12712,
        e_tag: Some(
            "3d46c38-6103694fd241f-31a8",
        ),
        version: None,
    },
]


thread 'tokio-runtime-worker' panicked at crates/core/src/kernel/snapshot/log_segment.rs:462:5:
assertion `left == right` failed
  left: 2
 right: 1
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Error: JoinError::Panic(Id(17), ...)
test kernel::snapshot::tests::test_concurrent_checkpoint ... FAILED
```
2024-05-10 19:03:59 +02:00
dependabot[bot] 81593e9194 chore(deps): update sqlparser requirement from 0.44 to 0.46
Updates the requirements on [sqlparser](https://github.com/sqlparser-rs/sqlparser-rs) to permit the latest version.
- [Changelog](https://github.com/sqlparser-rs/sqlparser-rs/blob/main/CHANGELOG.md)
- [Commits](https://github.com/sqlparser-rs/sqlparser-rs/compare/v0.44.0...v0.46.0)

---
updated-dependencies:
- dependency-name: sqlparser
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-05-07 08:53:54 -07:00
emcake 35664c0ef0 fix: Return unsupported error for merging schemas in the presence of partition columns 2024-05-07 07:12:38 -07:00
KyJah Keys cfb20f1795 applied cargo fmt 2024-05-07 06:47:12 -07:00
KyJah Keys 7192997604 fix(python, rust): region lookup wasn't working correctly for dynamo 2024-05-07 06:47:12 -07:00
Yijie Shen e370d34571
fix(rust): unable to read delta table when table contains both null and non-null add stats (#2476)
# Description
Fixes an issue where a delta table cannot be read when it contains an Add
action with `stats_parsed: null`.

As shown in the test case, `001.json` contains an Add action with stats,
while `002.json` contains an Add action with `stats_parsed: null`. Before
this fix, reading the table failed with:

```
Arrow { source: InvalidArgumentError("all columns in a record batch must have the same length") }
```

The issue is that the array for `num_records` has two values, while for
other stats such as `null_count` the `None` value is filtered out by
`flat_map`, leaving only one value in the array, so the stats columns end
up with different lengths.
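
That behavior can be shown with a minimal Rust sketch (illustrative values only, not the project's code): `flat_map` over `Option`s silently drops `None` entries, while `map` keeps one slot per file.

```rust
fn main() {
    // One entry per Add action; the second file has no parsed stats.
    let null_counts: Vec<Option<i64>> = vec![Some(0), None];

    // `flat_map` drops the `None`, so this column ends up shorter than
    // the `num_records` column built from the same two files...
    let filtered: Vec<i64> = null_counts.iter().flat_map(|s| *s).collect();
    assert_eq!(filtered.len(), 1);

    // ...while `map` keeps one (possibly null) slot per file, so every
    // stats column stays the same length as the record batch.
    let aligned: Vec<Option<i64>> = null_counts.iter().map(|s| *s).collect();
    assert_eq!(aligned.len(), 2);
}
```

The fix in the diff below replaces those `flat_map` calls with `map` followed by a `flatten`, so the `None` entries survive.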


# Related Issue(s)
closes #2477 

2024-05-06 21:47:48 +00:00
R Tyler Croy d7165cfef8
fix: check to see if the file exists before attempting to rename (#2482)
When /tmp lives on tmpfs (as with musl builds), the prior version of this
code would fail with a cross-device link error before bubbling up a
not-found error.
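
A minimal sketch of that check, using a hypothetical helper name rather than the crate's actual code: test whether the source exists before calling `rename`, so a missing file surfaces as a clear not-found error instead of whatever error the rename itself happens to produce.

```rust
use std::io;
use std::path::Path;

// Rename `from` to `to`, but report NotFound if the source is already gone
// instead of letting `rename` fail with a filesystem-specific error.
fn rename_if_exists(from: &str, to: &str) -> io::Result<()> {
    if Path::new(from).exists() {
        std::fs::rename(from, to)
    } else {
        Err(io::Error::new(
            io::ErrorKind::NotFound,
            format!("Could not find {from}"),
        ))
    }
}

fn main() {
    // A missing source now yields a clear NotFound error; previously the
    // rename attempt could fail first with a cross-device link error on tmpfs.
    assert!(rename_if_exists("missing.txt", "elsewhere.txt").is_err());
}
```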
2024-05-06 21:28:48 +00:00
Ion Koutsouris e25aed70a0
fix(python, rust): use new schema for stats parsing instead of old (#2480)
# Description
In some edge cases where the schema has evolved, the stats were parsed with
the old schema, resulting in errors like:
`Exception: Json error: whilst decoding field 'minValues': whilst decoding field 'foo': failed to parse 1000000000000 as Int8`

```python
import polars as pl
from deltalake import write_deltalake

pl.DataFrame({
    "foo": [1]
}, schema={"foo": pl.Int8}).write_delta("TEST_TABLE_BUG")


write_deltalake("TEST_TABLE_BUG", data = pl.DataFrame({
    "foo": [1000000000000]
}, schema={"foo": pl.Int64}).to_arrow(), mode='overwrite', overwrite_schema=True,engine='rust')
```

Instead of taking the old schema, I added an optional schema that can be
passed to the `LogMapper`.
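
The shape of that change can be sketched as an optional-override parameter (simplified stand-in types, not the real `LogMapper` signature): when a newer table schema is known it is used for stats parsing, otherwise the snapshot's schema remains the fallback.

```rust
// Simplified stand-ins for the real Snapshot / StructType types.
struct StructType {
    name: &'static str,
}

struct Snapshot {
    schema: StructType,
}

// Pick the schema used for parsing stats: an explicitly supplied newer
// schema wins, otherwise fall back to the schema stored in the snapshot.
fn stats_schema<'a>(snapshot: &'a Snapshot, table_schema: Option<&'a StructType>) -> &'a StructType {
    table_schema.unwrap_or(&snapshot.schema)
}

fn main() {
    let snapshot = Snapshot { schema: StructType { name: "old" } };
    let evolved = StructType { name: "new" };

    assert_eq!(stats_schema(&snapshot, None).name, "old");
    assert_eq!(stats_schema(&snapshot, Some(&evolved)).name, "new");
}
```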
2024-05-06 16:39:13 +00:00
Adrian Garcia Badaracco d0617b5ca1
feat(python): add parameter to DeltaTable.to_pyarrow_dataset() (#2465)
Otherwise there is no way to union this with another dataset.
2024-05-05 22:14:37 +00:00
R Tyler Croy b2f55f441f
Merge branch 'main' into optimize-spark-table-1648 2024-03-08 16:45:35 -08:00
R Tyler Croy 2acb62849a fix: optimize tables written by Spark
Fixes #1648
2024-02-02 21:58:23 -08:00
31 changed files with 579 additions and 86 deletions

View File

@ -22,15 +22,9 @@ pub struct ConfiguredCredentialChain {
#[derive(Debug)]
pub struct NoOpCredentials {}
pub fn new_region_provider(
configuration: &ProviderConfig,
disable_imds: bool,
imds_timeout: u64,
) -> RegionProviderChain {
pub fn new_region_provider(disable_imds: bool, imds_timeout: u64) -> RegionProviderChain {
let env_provider = EnvironmentVariableRegionProvider::new();
let profile_file = aws_config::profile::region::Builder::default()
.configure(configuration)
.build();
let profile_file = aws_config::profile::region::ProfileFileRegionProvider::default();
if disable_imds {
return RegionProviderChain::first_try(env_provider).or_else(profile_file);
}
@ -39,7 +33,6 @@ pub fn new_region_provider(
.or_else(profile_file)
.or_else(
aws_config::imds::region::Builder::default()
.configure(configuration)
.imds_client(
aws_config::imds::Client::builder()
.connect_timeout(Duration::from_millis(imds_timeout))
@ -61,6 +54,7 @@ impl ConfiguredCredentialChain {
let web_identity_token_provider = WebIdentityTokenCredentialsProvider::builder()
.configure(conf)
.build();
let ecs_provider = EcsCredentialsProvider::builder().configure(conf).build();
let provider_chain = CredentialsProviderChain::first_try("Environment", env_provider)

View File

@ -175,7 +175,10 @@ impl S3StorageOptions {
.unwrap_or(true);
let imds_timeout =
Self::u64_or_default(options, s3_constants::AWS_EC2_METADATA_TIMEOUT, 100);
let provider_config = ProviderConfig::default();
let region_provider = crate::credentials::new_region_provider(disable_imds, imds_timeout);
let region = execute_sdk_future(region_provider.region())?;
let provider_config = ProviderConfig::default().with_region(region);
let credentials_provider = crate::credentials::ConfiguredCredentialChain::new(
disable_imds,
imds_timeout,
@ -189,23 +192,15 @@ impl S3StorageOptions {
.map(|val| str_is_truthy(&val))
.unwrap_or(false),
))
.region(crate::credentials::new_region_provider(
&provider_config,
disable_imds,
imds_timeout,
))
.credentials_provider(credentials_provider)
.region(region_provider)
.load(),
)?;
#[cfg(feature = "rustls")]
let sdk_config = execute_sdk_future(
aws_config::from_env()
.credentials_provider(credentials_provider)
.region(crate::credentials::new_region_provider(
&provider_config,
disable_imds,
imds_timeout,
))
.region(region_provider)
.load(),
)?;
@ -252,16 +247,18 @@ impl S3StorageOptions {
}
}
fn execute_sdk_future<F: Future<Output = SdkConfig> + Send + 'static>(
future: F,
) -> DeltaResult<SdkConfig> {
fn execute_sdk_future<F, T>(future: F) -> DeltaResult<T>
where
T: Send,
F: Future<Output = T> + Send,
{
match tokio::runtime::Handle::try_current() {
Ok(handle) => match handle.runtime_flavor() {
tokio::runtime::RuntimeFlavor::MultiThread => {
Ok(tokio::task::block_in_place(move || handle.block_on(future)))
}
_ => {
let mut cfg: Option<SdkConfig> = None;
let mut cfg: Option<T> = None;
std::thread::scope(|scope| {
scope.spawn(|| {
cfg = Some(handle.block_on(future));

View File

@ -97,13 +97,14 @@ reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
], optional = true }
sqlparser = { version = "0.44", optional = true }
sqlparser = { version = "0.46", optional = true }
[dev-dependencies]
criterion = "0.5"
ctor = "0"
deltalake-test = { path = "../test", features = ["datafusion"] }
dotenvy = "0"
fs_extra = "1.2.0"
hyper = { version = "0.14", features = ["server"] }
maplit = "1"
pretty_assertions = "1.2.1"

View File

@ -518,6 +518,9 @@ pub(super) async fn list_log_files(
#[cfg(test)]
pub(super) mod tests {
use deltalake_test::utils::*;
use tokio::task::JoinHandle;
use crate::checkpoints::create_checkpoint_from_table_uri_and_cleanup;
use super::*;
@ -620,6 +623,115 @@ pub(super) mod tests {
Ok(())
}
pub(crate) async fn concurrent_checkpoint(context: &IntegrationContext) -> TestResult {
context
.load_table(TestTables::LatestNotCheckpointed)
.await?;
let table_to_checkpoint = context
.table_builder(TestTables::LatestNotCheckpointed)
.load()
.await?;
let store = context
.table_builder(TestTables::LatestNotCheckpointed)
.build_storage()?
.object_store();
let slow_list_store = Arc::new(slow_store::SlowListStore { store });
let version = table_to_checkpoint.version();
let load_task: JoinHandle<Result<LogSegment, DeltaTableError>> = tokio::spawn(async move {
let segment =
LogSegment::try_new(&Path::default(), Some(version), slow_list_store.as_ref())
.await?;
Ok(segment)
});
create_checkpoint_from_table_uri_and_cleanup(
&table_to_checkpoint.table_uri(),
version,
Some(false),
)
.await?;
let segment = load_task.await??;
assert_eq!(segment.version, version);
Ok(())
}
mod slow_store {
use std::sync::Arc;
use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutOptions, PutResult, Result,
};
use tokio::io::AsyncWrite;
#[derive(Debug)]
pub(super) struct SlowListStore {
pub store: Arc<dyn ObjectStore>,
}
impl std::fmt::Display for SlowListStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SlowListStore {{ store: {} }}", self.store)
}
}
#[async_trait::async_trait]
impl object_store::ObjectStore for SlowListStore {
async fn put_opts(
&self,
location: &Path,
bytes: Bytes,
opts: PutOptions,
) -> Result<PutResult> {
self.store.put_opts(location, bytes, opts).await
}
async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
self.store.put_multipart(location).await
}
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()> {
self.store.abort_multipart(location, multipart_id).await
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.store.get_opts(location, options).await
}
async fn delete(&self, location: &Path) -> Result<()> {
self.store.delete(location).await
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
std::thread::sleep(std::time::Duration::from_secs(1));
self.store.list(prefix)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.store.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.store.copy(from, to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.store.copy_if_not_exists(from, to).await
}
}
}
#[test]
pub fn is_commit_file_only_matches_commits() {
for path in [0, 1, 5, 10, 100, i64::MAX]

View File

@ -287,11 +287,13 @@ impl Snapshot {
}
/// Get the statistics schema of the snapshot
pub fn stats_schema(&self) -> DeltaResult<StructType> {
pub fn stats_schema(&self, table_schema: Option<&StructType>) -> DeltaResult<StructType> {
let schema = table_schema.unwrap_or_else(|| self.schema());
let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() {
stats_cols
.iter()
.map(|col| match self.schema().field_with_name(col) {
.map(|col| match schema.field_with_name(col) {
Ok(field) => match field.data_type() {
DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => {
Err(DeltaTableError::Generic(format!(
@ -314,7 +316,7 @@ impl Snapshot {
.collect::<Result<Vec<_>, _>>()?
} else {
let num_indexed_cols = self.table_config().num_indexed_cols();
self.schema()
schema
.fields
.iter()
.enumerate()
@ -362,7 +364,7 @@ impl EagerSnapshot {
let mut files = Vec::new();
let mut scanner = LogReplayScanner::new();
files.push(scanner.process_files_batch(&batch, true)?);
let mapper = LogMapper::try_new(&snapshot)?;
let mapper = LogMapper::try_new(&snapshot, None)?;
files = files
.into_iter()
.map(|b| mapper.map_batch(b))
@ -401,7 +403,7 @@ impl EagerSnapshot {
)
.boxed()
};
let mapper = LogMapper::try_new(&self.snapshot)?;
let mapper = LogMapper::try_new(&self.snapshot, None)?;
let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot)?
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
.try_collect()
@ -517,7 +519,13 @@ impl EagerSnapshot {
files.push(scanner.process_files_batch(&batch?, true)?);
}
let mapper = LogMapper::try_new(&self.snapshot)?;
let mapper = if let Some(metadata) = &metadata {
let new_schema: StructType = serde_json::from_str(&metadata.schema_string)?;
LogMapper::try_new(&self.snapshot, Some(&new_schema))?
} else {
LogMapper::try_new(&self.snapshot, None)?
};
self.files = files
.into_iter()
.chain(
@ -605,7 +613,7 @@ mod tests {
use futures::TryStreamExt;
use itertools::Itertools;
use super::log_segment::tests::test_log_segment;
use super::log_segment::tests::{concurrent_checkpoint, test_log_segment};
use super::replay::tests::test_log_replay;
use super::*;
use crate::kernel::Remove;
@ -627,6 +635,13 @@ mod tests {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_concurrent_checkpoint() -> TestResult {
let context = IntegrationContext::new(Box::<LocalStorageIntegration>::default())?;
concurrent_checkpoint(&context).await?;
Ok(())
}
async fn test_snapshot(context: &IntegrationContext) -> TestResult {
let store = context
.table_builder(TestTables::Simple)

View File

@ -21,6 +21,7 @@ use tracing::debug;
use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};
use crate::kernel::arrow::json;
use crate::kernel::StructType;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
use super::Snapshot;
@ -41,7 +42,7 @@ pin_project! {
impl<S> ReplayStream<S> {
pub(super) fn try_new(commits: S, checkpoint: S, snapshot: &Snapshot) -> DeltaResult<Self> {
let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?);
let stats_schema = Arc::new((&snapshot.stats_schema(None)?).try_into()?);
let mapper = Arc::new(LogMapper {
stats_schema,
config: snapshot.config.clone(),
@ -61,9 +62,12 @@ pub(super) struct LogMapper {
}
impl LogMapper {
pub(super) fn try_new(snapshot: &Snapshot) -> DeltaResult<Self> {
pub(super) fn try_new(
snapshot: &Snapshot,
table_schema: Option<&StructType>,
) -> DeltaResult<Self> {
Ok(Self {
stats_schema: Arc::new((&snapshot.stats_schema()?).try_into()?),
stats_schema: Arc::new((&snapshot.stats_schema(table_schema)?).try_into()?),
config: snapshot.config.clone(),
})
}

View File

@ -1561,5 +1561,30 @@ pub(super) mod zorder {
assert_eq!(data.value_data().len(), 3 * 16 * 3);
assert!(data.iter().all(|x| x.unwrap().len() == 3 * 16));
}
#[tokio::test]
async fn works_on_spark_table() {
use crate::DeltaOps;
use tempfile::TempDir;
// Create a temporary directory
let tmp_dir = TempDir::new().expect("Failed to make temp dir");
let table_name = "delta-1.2.1-only-struct-stats";
// Copy recursively from the test data directory to the temporary directory
let source_path = format!("../test/tests/data/{table_name}");
fs_extra::dir::copy(source_path, tmp_dir.path(), &Default::default()).unwrap();
// Run optimize
let (_, metrics) =
DeltaOps::try_from_uri(tmp_dir.path().join(table_name).to_str().unwrap())
.await
.unwrap()
.optimize()
.await
.unwrap();
// Verify it worked
assert_eq!(metrics.num_files_added, 1);
}
}
}

View File

@ -1190,6 +1190,32 @@ mod tests {
assert_eq!(expected, actions);
}
#[tokio::test]
async fn test_table_not_always_with_stats() {
let path = "../test/tests/data/delta-stats-optional";
let mut table = crate::open_table(path).await.unwrap();
table.load().await.unwrap();
let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
// get column-0 path, and column-4 num_records, and column_5 null_count.integer
let expected_path: ArrayRef = Arc::new(array::StringArray::from(vec![
"part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet",
"part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
]));
let expected_num_records: ArrayRef =
Arc::new(array::Int64Array::from(vec![None, Some(1)]));
let expected_null_count: ArrayRef =
Arc::new(array::Int64Array::from(vec![None, Some(0)]));
let path_column = actions.column(0);
let num_records_column = actions.column(4);
let null_count_column = actions.column(5);
assert_eq!(&expected_path, path_column);
assert_eq!(&expected_num_records, num_records_column);
assert_eq!(&expected_null_count, null_count_column);
}
#[tokio::test]
async fn test_only_struct_stats() {
// test table with no json stats

View File

@ -447,11 +447,12 @@ impl DeltaTableState {
.map(|(path, datatype)| -> Result<ColStats, DeltaTableError> {
let null_count = stats
.iter()
.flat_map(|maybe_stat| {
.map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_count_stat(&stat.null_count, &path))
})
.map(|null_count| null_count.flatten())
.collect::<Vec<Option<i64>>>();
let null_count = Some(value_vec_to_array(null_count, |values| {
Ok(Arc::new(arrow::array::Int64Array::from(values)))
@ -463,11 +464,12 @@ impl DeltaTableState {
let min_values = if matches!(datatype, DeltaDataType::Primitive(_)) {
let min_values = stats
.iter()
.flat_map(|maybe_stat| {
.map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_value_stat(&stat.min_values, &path))
})
.map(|min_value| min_value.flatten())
.collect::<Vec<Option<&serde_json::Value>>>();
Some(value_vec_to_array(min_values, |values| {
@ -480,11 +482,12 @@ impl DeltaTableState {
let max_values = if matches!(datatype, DeltaDataType::Primitive(_)) {
let max_values = stats
.iter()
.flat_map(|maybe_stat| {
.map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_value_stat(&stat.max_values, &path))
})
.map(|max_value| max_value.flatten())
.collect::<Vec<Option<&serde_json::Value>>>();
Some(value_vec_to_array(max_values, |values| {
json_value_to_array_general(&arrow_type, values.into_iter())

View File

@ -192,6 +192,12 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
values: RecordBatch,
mode: WriteMode,
) -> Result<(), DeltaTableError> {
if mode == WriteMode::MergeSchema && !self.partition_columns.is_empty() {
return Err(DeltaTableError::Generic(
"Merging Schemas with partition columns present is currently unsupported"
.to_owned(),
));
}
// Set the should_evolve flag for later in case the writer should perform schema evolution
// on its flush_and_commit
self.should_evolve = mode == WriteMode::MergeSchema;
@ -237,8 +243,12 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
if self.arrow_schema_ref != self.original_schema_ref && self.should_evolve {
let schema: StructType = self.arrow_schema_ref.clone().try_into()?;
// TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe
// this should just propagate the existing columns in the new action
if !self.partition_columns.is_empty() {
return Err(DeltaTableError::Generic(
"Merging Schemas with partition columns present is currently unsupported"
.to_owned(),
));
}
let part_cols: Vec<String> = vec![];
let metadata = Metadata::try_new(schema, part_cols, HashMap::new())?;
adds.push(Action::Metadata(metadata));
@ -662,6 +672,8 @@ mod tests {
// The following sets of tests are related to #1386 and mergeSchema support
// <https://github.com/delta-io/delta-rs/issues/1386>
mod schema_evolution {
use itertools::Itertools;
use super::*;
#[tokio::test]
@ -772,6 +784,80 @@ mod tests {
);
}
#[tokio::test]
async fn test_write_schema_evolution_with_partition_columns_should_fail_as_unsupported() {
let table_schema = get_delta_schema();
let table_dir = tempfile::tempdir().unwrap();
let table_path = table_dir.path();
let mut table = CreateBuilder::new()
.with_location(table_path.to_str().unwrap())
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(table_schema.fields().clone())
.with_partition_columns(["id"])
.await
.unwrap();
table.load().await.expect("Failed to load table");
assert_eq!(table.version(), 0);
let batch = get_record_batch(None, false);
let mut writer = RecordBatchWriter::for_table(&table).unwrap();
writer.write(batch).await.unwrap();
let version = writer.flush_and_commit(&mut table).await.unwrap();
assert_eq!(version, 1);
table.load().await.expect("Failed to load table");
assert_eq!(table.version(), 1);
// Create a second batch with appended columns
let second_batch = {
let second = get_record_batch(None, false);
let second_schema = ArrowSchema::new(
second
.schema()
.fields
.iter()
.cloned()
.chain([
Field::new("vid", DataType::Int32, true).into(),
Field::new("name", DataType::Utf8, true).into(),
])
.collect_vec(),
);
let len = second.num_rows();
let second_arrays = second
.columns()
.iter()
.cloned()
.chain([
Arc::new(Int32Array::from(vec![Some(1); len])) as _, // vid
Arc::new(StringArray::from(vec![Some("will"); len])) as _, // name
])
.collect_vec();
RecordBatch::try_new(second_schema.into(), second_arrays).unwrap()
};
let result = writer
.write_with_mode(second_batch, WriteMode::MergeSchema)
.await;
assert!(result.is_err());
match result.unwrap_err() {
DeltaTableError::Generic(s) => {
assert_eq!(
s,
"Merging Schemas with partition columns present is currently unsupported"
)
}
e => panic!("unexpected error: {e:?}"),
}
}
#[tokio::test]
async fn test_schema_evolution_column_type_mismatch() {
let batch = get_record_batch(None, false);

View File

@ -276,22 +276,19 @@ async fn regular_rename(from: &str, to: &str) -> Result<(), LocalFileSystemError
"Already exists",
)),
})
} else if std::path::Path::new(&from_path).exists() {
std::fs::rename(&from_path, &to_path).map_err(|err| LocalFileSystemError::Generic {
store: STORE_NAME,
source: Box::new(err),
})
} else {
std::fs::rename(&from_path, &to_path).map_err(|err| {
println!("err: {err:?}");
if err.kind() == std::io::ErrorKind::NotFound {
LocalFileSystemError::NotFound {
path: from_path.clone(),
source: Box::new(err),
}
} else {
LocalFileSystemError::Generic {
store: STORE_NAME,
source: Box::new(err),
}
}
})?;
Ok(())
Err(LocalFileSystemError::NotFound {
path: from_path.clone(),
source: Box::new(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Could not find {from_path}"),
)),
})
}
})
.await

View File

@ -154,6 +154,7 @@ pub enum TestTables {
Delta0_8_0Partitioned,
Delta0_8_0SpecialPartitioned,
Checkpoints,
LatestNotCheckpointed,
WithDvSmall,
Custom(String),
}
@ -189,6 +190,11 @@ impl TestTables {
.unwrap()
.to_owned(),
Self::Checkpoints => data_path.join("checkpoints").to_str().unwrap().to_owned(),
Self::LatestNotCheckpointed => data_path
.join("latest_not_checkpointed")
.to_str()
.unwrap()
.to_owned(),
Self::WithDvSmall => data_path
.join("table-with-dv-small")
.to_str()
@ -208,6 +214,7 @@ impl TestTables {
Self::Delta0_8_0Partitioned => "delta-0.8.0-partitioned".into(),
Self::Delta0_8_0SpecialPartitioned => "delta-0.8.0-special-partition".into(),
Self::Checkpoints => "checkpoints".into(),
Self::LatestNotCheckpointed => "latest_not_checkpointed".into(),
Self::WithDvSmall => "table-with-dv-small".into(),
Self::Custom(name) => name.to_owned(),
}

View File

@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1666652369577,"userId":"6114986638742036","userName":"dummy_username","operation":"CREATE OR REPLACE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.checkpoint.writeStatsAsJson\":\"false\",\"delta.checkpoint.writeStatsAsStruct\":\"true\"}"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"1007-161845-fa2h8e50","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"a8510a45-92dc-4e9f-9f7a-42bbcc9b752d"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"8d3d2b8a-f091-4d7d-8a37-432a9beaf17b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"integer\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsJson":"false","delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1666652369483}}

View File

@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1666652373383,"userId":"6114986638742036","userName":"dummy_username","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"1007-161845-fa2h8e50","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"5489"},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"35e88c76-9cfb-4e0e-bce8-2317f3c49c75"}}
{"metaData":{"id":"8d3d2b8a-f091-4d7d-8a37-432a9beaf17b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"integer\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"null\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"boolean\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal\",\"type\":\"decimal(8,5)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"binary\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"map\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"string\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"array\",\"type\":{\"type\":\"array\",\"elementType\":\"string\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"nested_struct_element\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_of_array_of_map\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"string\",\"valueContainsNull\":true},\"containsNull\":true},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsJson":"false","delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1666652369483}}
{"add":{"path":"part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet","partitionValues":{},"size":5489,"modificationTime":1666652373000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"integer\":0,\"double\":1.234,\"decimal\":-5.67800,\"string\":\"string\",\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:32.846Z\",\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"integer\":0,\"double\":1.234,\"decimal\":-5.67800,\"string\":\"string\",\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:32.846Z\",\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"nullCount\":{\"integer\":0,\"null\":1,\"boolean\":0,\"double\":0,\"decimal\":0,\"string\":0,\"binary\":0,\"date\":0,\"timestamp\":0,\"struct\":{\"struct_element\":0},\"map\":0,\"array\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct_of_array_of_map\":{\"struct_element\":0}}}","tags":{"INSERTION_TIME":"1666652373000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}

View File

@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1666652374424,"userId":"6114986638742036","userName":"dummy_username","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"1007-161845-fa2h8e50","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"5489"},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"efe25f5f-e03a-458d-8fbe-34ed2111b3c1"}}
{"add":{"path":"part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet","partitionValues":{},"size":5489,"modificationTime":1666652374000,"dataChange":true,"stats_parsed":null,"tags":{"INSERTION_TIME":"1666652374000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}

View File

@ -0,0 +1,3 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"84b09beb-329c-4b5e-b493-f58c6c78b8fd","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpointInterval":"2"},"createdTime":1674611455081}}
{"commitInfo":{"timestamp":1674611455099,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.checkpointInterval\":\"2\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"d87e63fb-7388-4b1c-9afc-750a561012b7"}}

View File

@ -0,0 +1,2 @@
{"add":{"path":"part-00000-ad1a4bb7-07e8-4f40-b50b-49910d209e0c-c000.snappy.parquet","partitionValues":{},"size":965,"modificationTime":1674611456921,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"b\",\"int\":288,\"date\":\"1978-02-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":988,\"date\":\"2020-05-01\"},\"nullCount\":{\"letter\":3,\"int\":0,\"date\":0}}"}}
{"commitInfo":{"timestamp":1674611457269,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"965"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"71d9bcd1-7f2b-46f8-bd1f-e0a8e872f3c3"}}

View File

@ -0,0 +1,3 @@
{"add":{"path":"part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet","partitionValues":{},"size":976,"modificationTime":1674611458901,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"a\",\"int\":120,\"date\":\"1971-07-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":667,\"date\":\"2018-02-01\"},\"nullCount\":{\"letter\":2,\"int\":0,\"date\":0}}"}}
{"remove":{"path":"part-00000-ad1a4bb7-07e8-4f40-b50b-49910d209e0c-c000.snappy.parquet","deletionTimestamp":1674611459307,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":965}}
{"commitInfo":{"timestamp":1674611459307,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"976"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"b08f5758-a8e9-4dd1-af7e-7b6e53928d7a"}}

View File

@ -0,0 +1,3 @@
{"add":{"path":"part-00000-70b1dcdf-0236-4f63-a072-124cdbafd8a0-c000.snappy.parquet","partitionValues":{},"size":1010,"modificationTime":1674611461541,"dataChange":true,"stats":"{\"numRecords\":5,\"minValues\":{\"letter\":\"a\",\"int\":93,\"date\":\"1975-06-01\"},\"maxValues\":{\"letter\":\"c\",\"int\":753,\"date\":\"2013-03-01\"},\"nullCount\":{\"letter\":1,\"int\":0,\"date\":0}}"}}
{"remove":{"path":"part-00000-a190be9e-e3df-439e-b366-06a863f51e99-c000.snappy.parquet","deletionTimestamp":1674611461982,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":976}}
{"commitInfo":{"timestamp":1674611461982,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"1010"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.1.1","txnId":"0403bbaf-a6f2-4543-9e6c-bd068e76670f"}}

View File

@ -0,0 +1 @@
{"version":2,"size":1}

View File

@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.17.3"
version = "0.17.4"
authors = ["Qingping Hou <dave2008713@gmail.com>", "Will Jones <willjones127@gmail.com>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"

View File

@ -721,10 +721,17 @@ class DeltaFileSystemHandler:
def __init__(
self,
root: str,
options: dict[str, str] | None = None,
known_sizes: dict[str, int] | None = None,
table_uri: str,
options: Dict[str, str] | None = None,
known_sizes: Dict[str, int] | None = None,
) -> None: ...
@classmethod
def from_table(
cls,
table: RawDeltaTable,
options: Dict[str, str] | None = None,
known_sizes: Dict[str, int] | None = None,
) -> "DeltaFileSystemHandler": ...
def get_type_name(self) -> str: ...
def copy_file(self, src: str, dst: str) -> None:
"""Copy a file.
@ -776,7 +783,7 @@ class DeltaFileSystemHandler:
def open_input_file(self, path: str) -> ObjectInputFile:
"""Open an input file for random access reading."""
def open_output_stream(
self, path: str, metadata: dict[str, str] | None = None
self, path: str, metadata: Dict[str, str] | None = None
) -> ObjectOutputStream:
"""Open an output stream for sequential writing."""

View File

@ -1,17 +1,102 @@
from typing import Dict, List, Optional
from typing import Any, Dict, List, Mapping, Optional
import pyarrow as pa
from pyarrow.fs import FileInfo, FileSelector, FileSystemHandler
from ._internal import DeltaFileSystemHandler
from ._internal import DeltaFileSystemHandler, RawDeltaTable
# NOTE we need to inherit form FileSystemHandler to pass pyarrow's internal type checks.
class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
class DeltaStorageHandler(FileSystemHandler):
"""
DeltaStorageHandler is a concrete implementations of a PyArrow FileSystemHandler.
"""
def __init__(
self,
table_uri: str,
options: Optional[Dict[str, str]] = None,
known_sizes: Optional[Dict[str, int]] = None,
):
self._handler = DeltaFileSystemHandler(
table_uri=table_uri, options=options, known_sizes=known_sizes
)
@classmethod
def from_table(
cls,
table: RawDeltaTable,
options: Optional[Dict[str, str]] = None,
known_sizes: Optional[Dict[str, int]] = None,
) -> "DeltaStorageHandler":
self = cls.__new__(cls)
self._handler = DeltaFileSystemHandler.from_table(table, options, known_sizes)
return self
def get_type_name(self) -> str:
return self._handler.get_type_name()
def copy_file(self, src: str, dst: str) -> None:
"""Copy a file.
If the destination exists and is a directory, an error is returned. Otherwise, it is replaced.
"""
return self._handler.copy_file(src=src, dst=dst)
def create_dir(self, path: str, recursive: bool = True) -> None:
"""Create a directory and subdirectories.
This function succeeds if the directory already exists.
"""
return self._handler.create_dir(path, recursive)
def delete_dir(self, path: str) -> None:
"""Delete a directory and its contents, recursively."""
return self._handler.delete_dir(path)
def delete_file(self, path: str) -> None:
"""Delete a file."""
return self._handler.delete_file(path)
def equals(self, other: Any) -> bool:
return self._handler.equals(other)
def delete_dir_contents(
self, path: str, *, accept_root_dir: bool = False, missing_dir_ok: bool = False
) -> None:
"""Delete a directory's contents, recursively.
Like delete_dir, but doesn't delete the directory itself.
"""
return self._handler.delete_dir_contents(
path=path, accept_root_dir=accept_root_dir, missing_dir_ok=missing_dir_ok
)
def delete_root_dir_contents(self) -> None:
"""Delete the root directory contents, recursively."""
return self._handler.delete_root_dir_contents()
def get_file_info(self, paths: List[str]) -> List[FileInfo]:
"""Get info for the given files.
A non-existing or unreachable file returns a FileStat object and has a FileType of value NotFound.
An exception indicates a truly exceptional condition (low-level I/O error, etc.).
"""
return self._handler.get_file_info(paths)
def move(self, src: str, dest: str) -> None:
"""Move / rename a file or directory.
If the destination exists: - if it is a non-empty directory, an error is returned - otherwise,
if it has the same type as the source, it is replaced - otherwise, behavior is
unspecified (implementation-dependent).
"""
self._handler.move_file(src=src, dest=dest)
def normalize_path(self, path: str) -> str:
"""Normalize filesystem path."""
return self._handler.normalize_path(path)
def open_input_file(self, path: str) -> pa.PythonFile:
"""
Open an input file for random access reading.
@ -22,7 +107,7 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
Returns:
NativeFile
"""
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))
return pa.PythonFile(self._handler.open_input_file(path))
def open_input_stream(self, path: str) -> pa.PythonFile:
"""
@ -34,7 +119,7 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
Returns:
NativeFile
"""
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))
return pa.PythonFile(self._handler.open_input_file(path))
def open_output_stream(
self, path: str, metadata: Optional[Dict[str, str]] = None
@ -51,11 +136,9 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
Returns:
NativeFile
"""
return pa.PythonFile(
DeltaFileSystemHandler.open_output_stream(self, path, metadata)
)
return pa.PythonFile(self._handler.open_output_stream(path, metadata))
def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # type: ignore
def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]:
"""
Get info for the files defined by FileSelector.
@ -65,6 +148,9 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
Returns:
list of file info objects
"""
return DeltaFileSystemHandler.get_file_info_selector(
self, selector.base_dir, selector.allow_not_found, selector.recursive
return self._handler.get_file_info_selector(
selector.base_dir, selector.allow_not_found, selector.recursive
)
def open_append_stream(self, path: str, metadata: Mapping[str, str]) -> None:
raise NotImplementedError

View File

@ -1030,6 +1030,8 @@ class DeltaTable:
partitions: Optional[List[Tuple[str, str, Any]]] = None,
filesystem: Optional[Union[str, pa_fs.FileSystem]] = None,
parquet_read_options: Optional[ParquetReadOptions] = None,
schema: Optional[pyarrow.Schema] = None,
as_large_types: bool = False,
) -> pyarrow.dataset.Dataset:
"""
Build a PyArrow Dataset using data from the DeltaTable.
@ -1038,6 +1040,12 @@ class DeltaTable:
partitions: A list of partition filters, see help(DeltaTable.files_by_partitions) for filter syntax
filesystem: A concrete implementation of the Pyarrow FileSystem or a fsspec-compatible interface. If None, the first file path will be used to determine the right FileSystem
parquet_read_options: Optional read options for Parquet. Use this to handle INT96 to timestamp conversion for edge cases like 0001-01-01 or 9999-12-31
schema: The schema to use for the dataset. If None, the schema of the DeltaTable will be used. This can be used to force reading of Parquet/Arrow datatypes
that DeltaLake can't represent in it's schema (e.g. LargeString).
If you only need to read the schema with large types (e.g. for compatibility with Polars) you may want to use the `as_large_types` parameter instead.
as_large_types: get schema with all variable size types (list, binary, string) as large variants (with int64 indices).
This is for compatibility with systems like Polars that only support the large versions of Arrow types.
If `schema` is passed it takes precedence over this option.
More info: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.ParquetReadOptions.html
@ -1087,8 +1095,10 @@ class DeltaTable:
x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"])
}
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler(
self._table.table_uri(), self._storage_options, file_sizes
DeltaStorageHandler.from_table(
self._table,
self._storage_options,
file_sizes,
)
)
format = ParquetFileFormat(
@ -1096,6 +1106,8 @@ class DeltaTable:
default_fragment_scan_options=ParquetFragmentScanOptions(pre_buffer=True),
)
schema = schema or self.schema().to_pyarrow(as_large_types=as_large_types)
fragments = [
format.make_fragment(
file,
@ -1103,12 +1115,10 @@ class DeltaTable:
partition_expression=part_expression,
)
for file, part_expression in self._table.dataset_partitions(
self.schema().to_pyarrow(), partitions
schema, partitions
)
]
schema = self.schema().to_pyarrow()
dictionary_columns = format.read_options.dictionary_columns or set()
if dictionary_columns:
for index, field in enumerate(schema):

View File

@ -339,13 +339,18 @@ def write_deltalake(
"schema_mode 'merge' is not supported in pyarrow engine. Use engine=rust"
)
# We need to write against the latest table version
filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))
def sort_arrow_schema(schema: pa.schema) -> pa.schema:
sorted_cols = sorted(iter(schema), key=lambda x: (x.name, str(x.type)))
return pa.schema(sorted_cols)
if table: # already exists
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler.from_table(
table=table._table, options=storage_options
)
)
if sort_arrow_schema(schema) != sort_arrow_schema(
table.schema().to_pyarrow(as_large_types=large_dtypes)
) and not (mode == "overwrite" and schema_mode == "overwrite"):
@ -370,6 +375,9 @@ def write_deltalake(
partition_by = table.metadata().partition_columns
else: # creating a new table
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler(table_uri, options=storage_options)
)
current_version = -1
dtype_map = {

View File

@ -1,14 +1,14 @@
use std::collections::HashMap;
use std::sync::Arc;
use crate::error::PythonError;
use crate::utils::{delete_dir, rt, walk_tree};
use crate::RawDeltaTable;
use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path};
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyBytes};
use pyo3::types::{IntoPyDict, PyBytes, PyType};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};
const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024;
@ -43,19 +43,39 @@ impl DeltaFileSystemHandler {
#[new]
#[pyo3(signature = (table_uri, options = None, known_sizes = None))]
fn new(
table_uri: &str,
table_uri: String,
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
) -> PyResult<Self> {
let storage = DeltaTableBuilder::from_uri(table_uri)
let storage = DeltaTableBuilder::from_uri(&table_uri)
.with_storage_options(options.clone().unwrap_or_default())
.build_storage()
.map_err(PythonError::from)?
.object_store();
Ok(Self {
inner: storage,
config: FsConfig {
root_url: table_uri.into(),
root_url: table_uri,
options: options.unwrap_or_default(),
},
known_sizes,
})
}
#[classmethod]
#[pyo3(signature = (table, options = None, known_sizes = None))]
fn from_table(
_cls: &PyType,
table: &RawDeltaTable,
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
) -> PyResult<Self> {
let storage = table._table.object_store();
Ok(Self {
inner: storage,
config: FsConfig {
root_url: table._table.table_uri(),
options: options.unwrap_or_default(),
},
known_sizes,

View File

@ -50,7 +50,7 @@ def test_s3_authenticated_read_write(s3_localstack_creds, monkeypatch):
# Create unauthenticated handler
storage_handler = DeltaStorageHandler(
"s3://deltars/",
{
options={
"AWS_ENDPOINT_URL": s3_localstack_creds["AWS_ENDPOINT_URL"],
# Grants anonymous access. If we don't do this, will timeout trying
# to reading from EC2 instance provider.

View File

@ -13,7 +13,7 @@ import pyarrow as pa
import pyarrow.compute as pc
import pytest
from packaging import version
from pyarrow.dataset import ParquetFileFormat, ParquetReadOptions
from pyarrow.dataset import ParquetFileFormat, ParquetReadOptions, dataset
from pyarrow.lib import RecordBatchReader
from deltalake import DeltaTable, Schema, write_deltalake
@ -1306,6 +1306,62 @@ def test_large_arrow_types(tmp_path: pathlib.Path):
assert table.schema == dt.schema().to_pyarrow(as_large_types=True)
@pytest.mark.skipif(
int(pa.__version__.split(".")[0]) < 10, reason="map casts require pyarrow >= 10"
)
def test_large_arrow_types_dataset_as_large_types(tmp_path: pathlib.Path):
pylist = [
{"name": "Joey", "gender": b"M", "arr_type": ["x", "y"], "dict": {"a": b"M"}},
{"name": "Ivan", "gender": b"F", "arr_type": ["x", "z"]},
]
schema = pa.schema(
[
pa.field("name", pa.large_string()),
pa.field("gender", pa.large_binary()),
pa.field("arr_type", pa.large_list(pa.large_string())),
pa.field("map_type", pa.map_(pa.large_string(), pa.large_binary())),
pa.field("struct", pa.struct([pa.field("sub", pa.large_string())])),
]
)
table = pa.Table.from_pylist(pylist, schema=schema)
write_deltalake(tmp_path, table)
dt = DeltaTable(tmp_path)
ds = dt.to_pyarrow_dataset(as_large_types=True)
union_ds = dataset([ds, dataset(table)])
assert union_ds.to_table().shape[0] == 4
@pytest.mark.skipif(
int(pa.__version__.split(".")[0]) < 10, reason="map casts require pyarrow >= 10"
)
def test_large_arrow_types_explicit_scan_schema(tmp_path: pathlib.Path):
pylist = [
{"name": "Joey", "gender": b"M", "arr_type": ["x", "y"], "dict": {"a": b"M"}},
{"name": "Ivan", "gender": b"F", "arr_type": ["x", "z"]},
]
schema = pa.schema(
[
pa.field("name", pa.large_string()),
pa.field("gender", pa.large_binary()),
pa.field("arr_type", pa.large_list(pa.large_string())),
pa.field("map_type", pa.map_(pa.large_string(), pa.large_binary())),
pa.field("struct", pa.struct([pa.field("sub", pa.large_string())])),
]
)
table = pa.Table.from_pylist(pylist, schema=schema)
write_deltalake(tmp_path, table)
dt = DeltaTable(tmp_path)
ds = dt.to_pyarrow_dataset(schema=schema)
union_ds = dataset([ds, dataset(table)])
assert union_ds.to_table().shape[0] == 4
def test_partition_large_arrow_types(tmp_path: pathlib.Path):
table = pa.table(
{
@ -1570,3 +1626,22 @@ def test_write_timestamp_ntz_on_table_with_features_not_enabled(tmp_path: pathli
write_deltalake(
tmp_path, data, mode="overwrite", engine="pyarrow", schema_mode="overwrite"
)
@pytest.mark.parametrize("engine", ["pyarrow", "rust"])
def test_parse_stats_with_new_schema(tmp_path, engine):
sample_data = pa.table(
{
"val": pa.array([1, 1], pa.int8()),
}
)
write_deltalake(tmp_path, sample_data)
sample_data = pa.table(
{
"val": pa.array([1000000000000, 1000000000000], pa.int64()),
}
)
write_deltalake(
tmp_path, sample_data, mode="overwrite", schema_mode="overwrite", engine=engine
)