chore: update datafusion and related crates (#1504)

# Description

Updates datafusion and related crates to their latest versions. With the
updated object store we unfortunately lose support for `aws-profile`.
Since object store now also contains logic for parsing URLs, which we
currently maintain here, I plan to adopt these new APIs and recover
profile support in a follow-up PR. That follow-up will also remove the
deprecations that are being ignored (`#[allow(deprecated)]`) in this PR.
Authored by Robert Pack on 2023-07-05 21:10:52 +02:00, committed by GitHub.
Commit 56dfd25a8e (parent 6650bd2bc6)
19 changed files with 123 additions and 100 deletions
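For context on the URL parsing mentioned in the description: newer `object_store` releases can resolve a URL directly into a store implementation plus a path, which is the API the follow-up would adopt. A rough sketch of that usage, assuming the `object_store::parse_url` entry point available in recent releases of the crate (the exact release it landed in is an assumption, not something this PR establishes):

```rust
use object_store::{parse_url, path::Path, ObjectStore};
use url::Url;

/// Resolve a storage URL to an object store and the path within it.
/// The scheme (s3://, gs://, az://, file://, memory://, ...) selects the
/// implementation; the matching crate feature must be enabled.
fn store_for(url_str: &str) -> Result<(Box<dyn ObjectStore>, Path), Box<dyn std::error::Error>> {
    let url = Url::parse(url_str)?;
    let (store, path) = parse_url(&url)?;
    Ok((store, path))
}
```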

View File

@ -18,8 +18,9 @@ services:
test: [ "CMD", "curl", "-f", "http://localhost:4566/health" ]
fake-gcs:
- image: fsouza/fake-gcs-server
- command: ["-scheme", "http", "-port", "4443", "-external-url", "http://[::]:4443", "-backend", "memory"]
+ # Custom image - see fsouza/fake-gcs-server#1164
+ image: tustvold/fake-gcs-server
+ command: ["-scheme", "http", "-public-host", "localhost:4443", "-backend", "memory"]
ports:
- 4443:4443

View File

@ -18,7 +18,7 @@ doc = false
name = "deltalake._internal"
[dependencies]
arrow-schema = { version = "40", features = ["serde"] }
arrow-schema = { version = "42", features = ["serde"] }
chrono = "0"
env_logger = "0"
futures = "0.3"
@ -35,7 +35,7 @@ num_cpus = "1"
reqwest = { version = "*", features = ["native-tls-vendored"] }
[dependencies.pyo3]
version = "0.18"
version = "0.19"
features = ["extension-module", "abi3", "abi3-py37"]
[dependencies.deltalake]

View File

@ -775,7 +775,7 @@ fn write_new_deltalake(
Ok(())
}
#[pyclass(name = "DeltaDataChecker", text_signature = "(invariants)")]
#[pyclass(name = "DeltaDataChecker")]
struct PyDeltaDataChecker {
inner: DeltaDataChecker,
rt: tokio::runtime::Runtime,
@ -784,6 +784,7 @@ struct PyDeltaDataChecker {
#[pymethods]
impl PyDeltaDataChecker {
#[new]
+ #[pyo3(signature = (invariants))]
fn new(invariants: Vec<(String, String)>) -> Self {
let invariants: Vec<Invariant> = invariants
.into_iter()

View File

@ -113,7 +113,7 @@ fn python_type_to_schema(ob: PyObject, py: Python) -> PyResult<SchemaDataType> {
/// * "decimal(<precision>, <scale>)"
///
/// :param data_type: string representation of the data type
#[pyclass(module = "deltalake.schema", text_signature = "(data_type)")]
#[pyclass(module = "deltalake.schema")]
#[derive(Clone)]
pub struct PrimitiveType {
inner_type: String,
@ -132,6 +132,7 @@ impl TryFrom<SchemaDataType> for PrimitiveType {
#[pymethods]
impl PrimitiveType {
#[new]
+ #[pyo3(signature = (data_type))]
fn new(data_type: String) -> PyResult<Self> {
if data_type.starts_with("decimal") {
if try_parse_decimal_type(&data_type).is_none() {
@ -246,10 +247,7 @@ impl PrimitiveType {
/// ArrayType(PrimitiveType("integer"), contains_null=True)
/// >>> ArrayType("integer", contains_null=False)
/// ArrayType(PrimitiveType("integer"), contains_null=False)
- #[pyclass(
- module = "deltalake.schema",
- text_signature = "(element_type, contains_null=True)"
- )]
+ #[pyclass(module = "deltalake.schema")]
#[derive(Clone)]
pub struct ArrayType {
inner_type: SchemaTypeArray,
@ -411,10 +409,7 @@ impl ArrayType {
/// MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=True)
/// >>> MapType("integer", "string", value_contains_null=False)
/// MapType(PrimitiveType("integer"), PrimitiveType("string"), value_contains_null=False)
- #[pyclass(
- module = "deltalake.schema",
- text_signature = "(key_type, value_type, value_contains_null=True)"
- )]
+ #[pyclass(module = "deltalake.schema")]
#[derive(Clone)]
pub struct MapType {
inner_type: SchemaTypeMap,
@ -597,10 +592,7 @@ impl MapType {
///
/// >>> Field("my_col", "integer", metadata={"custom_metadata": {"test": 2}})
/// Field("my_col", PrimitiveType("integer"), nullable=True, metadata={"custom_metadata": {"test": 2}})
- #[pyclass(
- module = "deltalake.schema",
- text_signature = "(name, type, nullable=True, metadata=None)"
- )]
+ #[pyclass(module = "deltalake.schema")]
#[derive(Clone)]
pub struct Field {
inner: SchemaField,
@ -778,7 +770,7 @@ impl Field {
///
/// >>> StructType([Field("x", "integer"), Field("y", "string")])
/// StructType([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)])
- #[pyclass(subclass, module = "deltalake.schema", text_signature = "(fields)")]
+ #[pyclass(subclass, module = "deltalake.schema")]
#[derive(Clone)]
pub struct StructType {
inner_type: SchemaTypeStruct,
@ -951,13 +943,13 @@ pub fn schema_to_pyobject(schema: &Schema, py: Python) -> PyResult<PyObject> {
/// >>> import pyarrow as pa
/// >>> Schema.from_pyarrow(pa.schema({"x": pa.int32(), "y": pa.string()}))
/// Schema([Field(x, PrimitiveType("integer"), nullable=True), Field(y, PrimitiveType("string"), nullable=True)])
- #[pyclass(extends = StructType, name = "Schema", module = "deltalake.schema",
- text_signature = "(fields)")]
+ #[pyclass(extends = StructType, name = "Schema", module = "deltalake.schema")]
pub struct PySchema;
#[pymethods]
impl PySchema {
#[new]
+ #[pyo3(signature = (fields))]
fn new(fields: Vec<PyRef<Field>>) -> PyResult<(Self, StructType)> {
let fields: Vec<SchemaField> = fields
.into_iter()

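The `text_signature` removals above follow pyo3 0.19, which deprecates the `text_signature` option on `#[pyclass]`; the Python-visible constructor signature is instead generated from a `#[pyo3(signature = ...)]` annotation on the `#[new]` method. A minimal, standalone sketch of the pattern (not code from this crate):

```rust
use pyo3::prelude::*;

#[pyclass(module = "example")]
struct Point {
    x: f64,
    y: f64,
}

#[pymethods]
impl Point {
    #[new]
    // pyo3 derives the signature shown in Python (help(Point), IDE hints)
    // from this attribute, including the default value for `y`.
    #[pyo3(signature = (x, y = 0.0))]
    fn new(x: f64, y: f64) -> Self {
        Point { x, y }
    }
}
```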
View File

@ -13,14 +13,14 @@ readme = "README.md"
edition = "2021"
[dependencies]
arrow = { version = "40", optional = true }
arrow-array = { version = "40", optional = true }
arrow-buffer = { version = "40", optional = true }
arrow-cast = { version = "40", optional = true }
arrow-ord = { version = "40", optional = true }
arrow-row = { version = "40", optional = true }
arrow-schema = { version = "40", optional = true }
arrow-select = { version = "40", optional = true }
arrow = { version = "42", optional = true }
arrow-array = { version = "42", optional = true }
arrow-buffer = { version = "42", optional = true }
arrow-cast = { version = "42", optional = true }
arrow-ord = { version = "42", optional = true }
arrow-row = { version = "42", optional = true }
arrow-schema = { version = "42", optional = true }
arrow-select = { version = "42", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
@ -38,10 +38,10 @@ libc = ">=0.2.90, <1"
num-bigint = "0.4"
num_cpus = "1"
num-traits = "0.2.15"
object_store = "0.5.6"
object_store = "0.6.1"
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "40", features = [
parquet = { version = "42", features = [
"async",
"object_store",
], optional = true }
@ -50,7 +50,7 @@ percent-encoding = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["macros", "rt", "parking_lot"] }
tokio = { version = "1.28", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
regex = "1"
uuid = { version = "1", features = ["serde", "v4"] }
url = "2.3"
@ -65,7 +65,7 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true
rusoto_glue = { version = "0.47", default-features = false, optional = true }
# Unity
reqwest = { version = "0.11", default-features = false, features = [
reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
], optional = true }
@ -74,15 +74,15 @@ reqwest-retry = { version = "0.2.2", optional = true }
# Datafusion
dashmap = { version = "5", optional = true }
datafusion = { version = "26", optional = true }
datafusion-expr = { version = "26", optional = true }
datafusion-common = { version = "26", optional = true }
datafusion-proto = { version = "26", optional = true }
datafusion-sql = { version = "26", optional = true }
datafusion-physical-expr = { version = "26", optional = true }
datafusion = { version = "27", optional = true }
datafusion-expr = { version = "27", optional = true }
datafusion-common = { version = "27", optional = true }
datafusion-proto = { version = "27", optional = true }
datafusion-sql = { version = "27", optional = true }
datafusion-physical-expr = { version = "27", optional = true }
sqlparser = { version = "0.34", optional = true }
sqlparser = { version = "0.35", optional = true }
# NOTE dependencies only for integration tests
fs_extra = { version = "1.2.0", optional = true }
@ -135,7 +135,6 @@ s3-native-tls = [
"rusoto_dynamodb/native-tls",
"dynamodb_lock/native-tls",
"object_store/aws",
"object_store/aws_profile",
]
s3 = [
"rusoto_core/rustls",
@ -144,7 +143,6 @@ s3 = [
"rusoto_dynamodb/rustls",
"dynamodb_lock/rustls",
"object_store/aws",
"object_store/aws_profile",
]
unity-experimental = ["reqwest", "reqwest-middleware", "reqwest-retry"]

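The widened tokio dependency above (now requiring `rt-multi-thread`, `sync`, and `fs`) makes those parts of the runtime available directly instead of relying on other crates enabling them transitively. A small illustrative sketch of what each added feature provides (standalone example, not code from this repo):

```rust
fn main() -> std::io::Result<()> {
    // `rt-multi-thread` provides the multi-threaded runtime builder.
    let rt = tokio::runtime::Builder::new_multi_thread().build()?;

    rt.block_on(async {
        // `sync` provides channels and async synchronization primitives.
        let (tx, rx) = tokio::sync::oneshot::channel::<u64>();
        tx.send(42).ok();

        // `fs` provides async filesystem helpers such as tokio::fs::metadata.
        let len = tokio::fs::metadata("Cargo.toml").await?.len();
        println!("received {:?}, Cargo.toml is {} bytes", rx.await.ok(), len);
        Ok::<(), std::io::Error>(())
    })
}
```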
View File

@ -211,6 +211,7 @@ pub async fn cleanup_expired_logs_for(
location: Path::from(""),
last_modified: DateTime::<Utc>::MIN_UTC,
size: 0,
+ e_tag: None,
},
);
let file_needs_time_adjustment =
@ -255,6 +256,7 @@ pub async fn cleanup_expired_logs_for(
location: current_file.1.location.clone(),
last_modified: last_file.1.last_modified.add(Duration::seconds(1)),
size: 0,
+ e_tag: None,
},
);
maybe_delete_files.push(updated);

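The `e_tag: None` lines added here (and at the other `ObjectMeta` construction sites below) come from `object_store` 0.6, where `ObjectMeta` gained an `e_tag` field; any code building the struct by hand now has to supply it. A minimal construction sketch with placeholder values:

```rust
use chrono::Utc;
use object_store::{path::Path, ObjectMeta};

fn placeholder_meta(size: usize) -> ObjectMeta {
    ObjectMeta {
        location: Path::from("_delta_log/00000000000000000000.json"),
        last_modified: Utc::now(),
        size,
        // New field in object_store 0.6: the object's ETag, when known.
        e_tag: None,
    }
}
```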
View File

@ -145,7 +145,7 @@ impl SchemaProvider for ListingSchemaProvider {
mod tests {
use super::*;
use datafusion::assert_batches_sorted_eq;
- use datafusion::catalog::catalog::{CatalogProvider, MemoryCatalogProvider};
+ use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
use datafusion::execution::context::SessionContext;
#[test]

View File

@ -36,8 +36,9 @@ use arrow_array::StringArray;
use arrow_schema::Field;
use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
- use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
+ use datafusion::datasource::physical_plan::FileScanConfig;
+ use datafusion::datasource::provider::TableProviderFactory;
use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::execution::runtime_env::RuntimeEnv;
@ -45,7 +46,6 @@ use datafusion::execution::FunctionRegistry;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
- use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::{
@ -1377,7 +1377,6 @@ mod tests {
use arrow::array::StructArray;
use arrow::datatypes::{DataType, Field, Schema};
use chrono::{TimeZone, Utc};
- use datafusion::from_slice::FromSlice;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
@ -1558,6 +1557,7 @@ mod tests {
location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
last_modified: Utc.timestamp_millis_opt(1660497727833).unwrap(),
size: 10644,
+ e_tag: None
},
partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
range: None,
@ -1575,8 +1575,8 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["a", "b", "c", "d"])),
Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
],
)
.unwrap();

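The array changes above (and in the hunks that follow) are needed because datafusion 27 removes the `from_slice` convenience trait (`datafusion::from_slice::FromSlice`); the tests switch to the constructors arrow itself provides via `From<Vec<_>>`. A standalone sketch of the replacement pattern:

```rust
use arrow::array::{Int32Array, StringArray};

fn build_columns() -> (StringArray, Int32Array) {
    // Previously: StringArray::from_slice(["a", "b", ...]) via datafusion's FromSlice.
    // The arrow-native equivalent takes an owned Vec; use Vec<Option<_>> when
    // null entries are needed.
    let names = StringArray::from(vec!["a", "b", "c", "d"]);
    let values = Int32Array::from(vec![1, 10, 10, 100]);
    (names, values)
}
```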
View File

@ -333,7 +333,6 @@ mod tests {
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::assert_batches_sorted_eq;
- use datafusion::from_slice::FromSlice;
use datafusion::prelude::*;
use std::sync::Arc;
@ -358,9 +357,9 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from_slice([
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-02",
"2021-02-02",
"2021-02-02",
@ -411,9 +410,9 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from_slice([
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-02",
"2021-02-02",
"2021-02-02",
@ -435,9 +434,9 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from_slice([0, 20, 10, 100])),
Arc::new(arrow::array::StringArray::from_slice([
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-02",
"2021-02-02",
"2021-02-02",
@ -586,9 +585,9 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from_slice([0, 20, 10, 100])),
Arc::new(arrow::array::StringArray::from_slice([
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-02",
"2021-02-03",
"2021-02-02",
@ -644,9 +643,9 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from_slice([0, 20, 10, 100])),
Arc::new(arrow::array::StringArray::from_slice([
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-02",
"2021-02-03",
"2021-02-02",

View File

@ -4,9 +4,9 @@ use arrow::array::ArrayRef;
use arrow::datatypes::{
DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
+ use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
- use datafusion::physical_plan::file_format::wrap_partition_type_in_dict;
use datafusion_common::config::ConfigOptions;
use datafusion_common::scalar::ScalarValue;
use datafusion_common::{Column, DFSchema, Result as DFResult, TableReference};
@ -362,6 +362,10 @@ impl ContextProvider for DummyContextProvider {
fn options(&self) -> &ConfigOptions {
&self.options
}
+ fn get_window_meta(&self, _name: &str) -> Option<Arc<datafusion_expr::WindowUDF>> {
+ unimplemented!()
+ }
}
#[cfg(test)]

View File

@ -596,7 +596,6 @@ mod tests {
use arrow::record_batch::RecordBatch;
use arrow_array::Int32Array;
use datafusion::assert_batches_sorted_eq;
- use datafusion::from_slice::FromSlice;
use datafusion::prelude::*;
use std::sync::Arc;
@ -650,9 +649,9 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from_slice([
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-02",
"2021-02-02",
"2021-02-02",
@ -701,9 +700,9 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from_slice([
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-02",
"2021-02-02",
"2021-02-03",
@ -756,9 +755,9 @@ mod tests {
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from_slice([1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from_slice([
Arc::new(arrow::array::StringArray::from(vec!["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
Arc::new(arrow::array::StringArray::from(vec![
"2021-02-02",
"2021-02-02",
"2021-02-03",

View File

@ -149,6 +149,7 @@ fn try_configure_memory(storage_url: &Url) -> DeltaResult<Arc<DynObjectStore>> {
}
#[cfg(feature = "gcs")]
+ #[allow(deprecated)]
fn try_configure_gcs(
storage_url: &Url,
options: &StorageOptions,
@ -172,6 +173,7 @@ fn try_configure_gcs(
}
#[cfg(feature = "azure")]
+ #[allow(deprecated)]
fn try_configure_azure(
storage_url: &Url,
options: &StorageOptions,
@ -196,6 +198,7 @@ fn try_configure_azure(
}
#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
+ #[allow(deprecated)]
fn try_configure_s3(
storage_url: &Url,
options: &StorageOptions,

View File

@ -5,8 +5,8 @@
use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::{
- local::LocalFileSystem, path::Path as ObjectStorePath, Error as ObjectStoreError, GetResult,
- ListResult, MultipartId, ObjectMeta as ObjStoreObjectMeta, ObjectStore,
+ local::LocalFileSystem, path::Path as ObjectStorePath, Error as ObjectStoreError, GetOptions,
+ GetResult, ListResult, MultipartId, ObjectMeta as ObjStoreObjectMeta, ObjectStore,
Result as ObjectStoreResult,
};
use std::ops::Range;
@ -153,6 +153,14 @@ impl ObjectStore for FileStorageBackend {
self.inner.get(location).await
}
+ async fn get_opts(
+ &self,
+ location: &ObjectStorePath,
+ options: GetOptions,
+ ) -> ObjectStoreResult<GetResult> {
+ self.inner.get_opts(location, options).await
+ }
async fn get_range(
&self,
location: &ObjectStorePath,

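`ObjectStore` in `object_store` 0.6 gains a `get_opts` method taking `GetOptions` (conditional-request style options), which is why each wrapper store in this crate adds a forwarding implementation like the one above. A small usage sketch against any store; the field name is from `GetOptions` as of 0.6, and the path/ETag values are placeholders:

```rust
use object_store::{path::Path, GetOptions, ObjectStore};

async fn read_if_changed(
    store: &dyn ObjectStore,
    etag: Option<String>,
) -> object_store::Result<Vec<u8>> {
    let location = Path::from("data/file.parquet");
    let options = GetOptions {
        // Skip the download when the cached ETag still matches.
        if_none_match: etag,
        ..Default::default()
    };
    let result = store.get_opts(&location, options).await?;
    let bytes = result.bytes().await?;
    Ok(bytes.to_vec())
}
```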
View File

@ -8,6 +8,7 @@ use std::sync::Arc;
use bytes::Bytes;
use futures::{stream::BoxStream, StreamExt};
use lazy_static::lazy_static;
+ use object_store::GetOptions;
use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@ -210,6 +211,13 @@ impl ObjectStore for DeltaObjectStore {
self.storage.get(location).await
}
+ /// Perform a get request with options
+ ///
+ /// Note: options.range will be ignored if [`GetResult::File`]
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
+ self.storage.get_opts(location, options).await
+ }
/// Return the bytes that are stored at the specified location
/// in the given byte range
async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {

View File

@ -7,8 +7,8 @@ use dynamodb_lock::{DynamoError, LockClient, LockItem, DEFAULT_MAX_RETRY_ACQUIRE
use futures::stream::BoxStream;
use object_store::path::Path;
use object_store::{
- DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta,
- ObjectStore, Result as ObjectStoreResult,
+ DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId,
+ ObjectMeta, ObjectStore, Result as ObjectStoreResult,
};
use rusoto_core::{HttpClient, Region};
use rusoto_credential::AutoRefreshingProvider;
@ -451,6 +451,10 @@ impl ObjectStore for S3StorageBackend {
self.inner.get(location).await
}
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
+ self.inner.get_opts(location, options).await
+ }
async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
self.inner.get_range(location, range).await
}

View File

@ -94,6 +94,7 @@ impl TryFrom<&Add> for ObjectMeta {
location: Path::parse(value.path.as_str())?,
last_modified,
size: value.size as usize,
+ e_tag: None,
})
}
}

View File

@ -310,27 +310,25 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
async fn write(&mut self, values: Vec<Value>) -> Result<(), DeltaTableError> {
let mut partial_writes: Vec<(Value, ParquetError)> = Vec::new();
let arrow_schema = self.arrow_schema();
+ let divided = self.divide_by_partition_values(values)?;
+ let partition_columns = self.partition_columns.clone();
+ let writer_properties = self.writer_properties.clone();
- for (key, values) in self.divide_by_partition_values(values)? {
+ for (key, values) in divided {
match self.arrow_writers.get_mut(&key) {
- Some(writer) => collect_partial_write_failure(
- &mut partial_writes,
- writer
- .write_values(&self.partition_columns, arrow_schema.clone(), values)
- .await,
- )?,
+ Some(writer) => {
+ let result = writer
+ .write_values(&partition_columns, arrow_schema.clone(), values)
+ .await;
+ collect_partial_write_failure(&mut partial_writes, result)?;
+ }
None => {
- let schema =
- arrow_schema_without_partitions(&arrow_schema, &self.partition_columns);
- let mut writer = DataArrowWriter::new(schema, self.writer_properties.clone())?;
- collect_partial_write_failure(
- &mut partial_writes,
- writer
- .write_values(&self.partition_columns, self.arrow_schema(), values)
- .await,
- )?;
+ let schema = arrow_schema_without_partitions(&arrow_schema, &partition_columns);
+ let mut writer = DataArrowWriter::new(schema, writer_properties.clone())?;
+ let result = writer
+ .write_values(&partition_columns, arrow_schema.clone(), values)
+ .await;
+ collect_partial_write_failure(&mut partial_writes, result)?;
self.arrow_writers.insert(key, writer);
}
}

View File

@ -11,10 +11,11 @@ use arrow::datatypes::{
use arrow::record_batch::RecordBatch;
use common::datafusion::context_with_delta_table_factory;
use datafusion::assert_batches_sorted_eq;
+ use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
- use datafusion::physical_plan::{common::collect, file_format::ParquetExec, metrics::Label};
+ use datafusion::physical_plan::{common::collect, metrics::Label};
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
use datafusion_common::scalar::ScalarValue;
use datafusion_common::ScalarValue::*;

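Taken together, the datafusion 27 import-path moves visible in this PR's hunks amount to the following; this is only a summary of the `use` statements already shown above, with the old paths noted in comments:

```rust
// datafusion 26 -> datafusion 27
// datafusion::catalog::catalog::{CatalogProvider, MemoryCatalogProvider}
//     -> datafusion::catalog::{CatalogProvider, MemoryCatalogProvider}
// datafusion::datasource::datasource::TableProviderFactory
//     -> datafusion::datasource::provider::TableProviderFactory
// datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec, wrap_partition_type_in_dict}
//     -> datafusion::datasource::physical_plan::{...}
use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
use datafusion::datasource::physical_plan::{wrap_partition_type_in_dict, FileScanConfig, ParquetExec};
use datafusion::datasource::provider::TableProviderFactory;
```

(`datafusion::from_slice::FromSlice` is removed entirely; see the array construction note earlier.)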
View File

@ -9,8 +9,8 @@ use deltalake::{storage::s3::S3StorageBackend, DeltaTableBuilder, ObjectStore};
use futures::stream::BoxStream;
use object_store::path::Path;
use object_store::{
- DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta,
- Result as ObjectStoreResult,
+ DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId,
+ ObjectMeta, Result as ObjectStoreResult,
};
use serial_test::serial;
use std::ops::Range;
@ -177,6 +177,10 @@ impl ObjectStore for DelayedObjectStore {
self.inner.get(location).await
}
+ async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
+ self.inner.get_opts(location, options).await
+ }
async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
self.inner.get_range(location, range).await
}