chore(rust): bump arrow v51 and datafusion v37.1 (#2395)

# Description
Update the arrow crates to v51 and datafusion to v37.1, and adapt the codebase to the accompanying breaking API changes: `SchemaProvider::table` is now fallible, `ExecutionPlan` exposes a single `properties()` accessor instead of `output_partitioning`/`output_ordering`, the `TreeNode` visitor and transform APIs were reworked, `arrow_cast` and the array functions are now `ScalarUDF`s, and `PruningStatistics` gained a `row_counts` method.

# Related Issue(s)
- closes #2328

# Documentation


---------

Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
Co-authored-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Authored by Luis on 2024-04-26 20:28:50 +02:00, committed by GitHub
parent 6a7c684d9b
commit 9d3ecbeb62
20 changed files with 284 additions and 135 deletions


@ -1,9 +1,5 @@
[workspace]
members = [
"crates/*",
"delta-inspect",
"python",
]
members = ["crates/*", "delta-inspect", "python"]
exclude = ["proofs"]
resolver = "2"
@ -31,28 +27,29 @@ debug = "line-tables-only"
[workspace.dependencies]
# arrow
arrow = { version = "50" }
arrow-arith = { version = "50" }
arrow-array = { version = "50", features = ["chrono-tz"]}
arrow-buffer = { version = "50" }
arrow-cast = { version = "50" }
arrow-ipc = { version = "50" }
arrow-json = { version = "50" }
arrow-ord = { version = "50" }
arrow-row = { version = "50" }
arrow-schema = { version = "50" }
arrow-select = { version = "50" }
arrow = { version = "51" }
arrow-arith = { version = "51" }
arrow-array = { version = "51", features = ["chrono-tz"] }
arrow-buffer = { version = "51" }
arrow-cast = { version = "51" }
arrow-ipc = { version = "51" }
arrow-json = { version = "51" }
arrow-ord = { version = "51" }
arrow-row = { version = "51" }
arrow-schema = { version = "51" }
arrow-select = { version = "51" }
object_store = { version = "0.9" }
parquet = { version = "50" }
parquet = { version = "51" }
# datafusion
datafusion = { version = "36" }
datafusion-expr = { version = "36" }
datafusion-common = { version = "36" }
datafusion-proto = { version = "36" }
datafusion-sql = { version = "36" }
datafusion-physical-expr = { version = "36" }
datafusion-functions = { version = "36" }
datafusion = { version = "37.1" }
datafusion-expr = { version = "37.1" }
datafusion-common = { version = "37.1" }
datafusion-proto = { version = "37.1" }
datafusion-sql = { version = "37.1" }
datafusion-physical-expr = { version = "37.1" }
datafusion-functions = { version = "37.1" }
datafusion-functions-array = { version = "37.1" }
# serde
serde = { version = "1.0.194", features = ["derive"] }


@ -41,6 +41,7 @@ datafusion-proto = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
datafusion-functions = { workspace = true, optional = true }
datafusion-functions-array = { workspace = true, optional = true }
# serde
serde = { workspace = true, features = ["derive"] }
@ -123,6 +124,7 @@ datafusion = [
"datafusion-physical-expr",
"datafusion-sql",
"datafusion-functions",
"datafusion-functions-array",
"sqlparser",
]
datafusion-ext = ["datafusion"]


@ -110,12 +110,13 @@ impl SchemaProvider for ListingSchemaProvider {
self.tables.iter().map(|t| t.key().clone()).collect()
}
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let location = self.tables.get(name).map(|t| t.clone())?;
let provider = open_table_with_storage_options(location, self.storage_options.0.clone())
.await
.ok()?;
Some(Arc::new(provider) as Arc<dyn TableProvider>)
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let Some(location) = self.tables.get(name).map(|t| t.clone()) else {
return Ok(None);
};
let provider =
open_table_with_storage_options(location, self.storage_options.0.clone()).await?;
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
}
fn register_table(
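
DataFusion 37 makes `SchemaProvider::table` fallible: it now returns `datafusion_common::Result<Option<Arc<dyn TableProvider>>>` instead of a bare `Option`, so errors while opening the Delta table surface to the caller instead of being swallowed by `.ok()?`. A minimal caller-side sketch of the new shape (the helper, provider reference, and table name are hypothetical):

```rust
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::TableProvider;

// Hypothetical lookup helper: under DataFusion 37 both the lookup error and the
// "table not registered" case must be handled explicitly.
async fn describe_table(schema: &dyn SchemaProvider, name: &str) -> datafusion_common::Result<()> {
    match schema.table(name).await? {
        Some(provider) => println!("{name} has schema {:?}", provider.schema()),
        None => println!("{name} is not registered in this schema"),
    }
    Ok(())
}
```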


@ -8,6 +8,7 @@ use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::datasource::TableProvider;
use datafusion_common::DataFusionError;
use tracing::error;
use super::models::{GetTableResponse, ListCatalogsResponse, ListTableSummariesResponse};
@ -180,25 +181,24 @@ impl SchemaProvider for UnitySchemaProvider {
self.table_names.clone()
}
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let maybe_table = self
.client
.get_table(&self.catalog_name, &self.schema_name, name)
.await
.ok()?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;
match maybe_table {
GetTableResponse::Success(table) => {
let table = DeltaTableBuilder::from_uri(table.storage_location)
.with_storage_options(self.storage_options.clone())
.load()
.await
.ok()?;
Some(Arc::new(table))
.await?;
Ok(Some(Arc::new(table)))
}
GetTableResponse::Error(err) => {
error!("failed to fetch table from unity catalog: {}", err.message);
None
Err(DataFusionError::External(Box::new(err)))
}
}
}


@ -1,17 +1,24 @@
//! Api models for databricks unity catalog APIs
use core::fmt;
use std::collections::HashMap;
use serde::Deserialize;
/// Error response from unity API
#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
pub struct ErrorResponse {
/// The error code
pub error_code: String,
/// The error message
pub message: String,
}
impl fmt::Display for ErrorResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "[{}] {}", self.error_code, self.message)
}
}
impl std::error::Error for ErrorResponse {}
/// List catalogs response
#[derive(Deserialize)]
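
The `Debug`/`Display`/`Error` impls added to `ErrorResponse` above exist so the Unity catalog error can be boxed into `DataFusionError::External`, which the now-fallible `SchemaProvider::table` returns; `External` expects a `std::error::Error + Send + Sync` value. A small sketch of that conversion (the helper name is illustrative, not from this PR):

```rust
use datafusion_common::DataFusionError;

// Any error type that is Error + Send + Sync + 'static can be wrapped this way;
// ErrorResponse now qualifies thanks to the impls above.
fn to_datafusion_error<E>(err: E) -> DataFusionError
where
    E: std::error::Error + Send + Sync + 'static,
{
    DataFusionError::External(Box::new(err))
}
```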


@ -22,7 +22,7 @@
//! Utility functions for Datafusion's Expressions
use std::{
fmt::{self, format, Display, Error, Formatter, Write},
fmt::{self, Display, Error, Formatter, Write},
sync::Arc,
};
@ -76,6 +76,18 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
unimplemented!()
}
fn udfs_names(&self) -> Vec<String> {
unimplemented!()
}
fn udafs_names(&self) -> Vec<String> {
unimplemented!()
}
fn udwfs_names(&self) -> Vec<String> {
unimplemented!()
}
}
/// Parse a string predicate into an `Expr`
@ -416,8 +428,13 @@ mod test {
use arrow_schema::DataType as ArrowDataType;
use datafusion::prelude::SessionContext;
use datafusion_common::{Column, ScalarValue, ToDFSchema};
use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{
col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
};
use datafusion_functions::core::arrow_cast;
use datafusion_functions::encoding::expr_fn::decode;
use datafusion_functions_array::expr_fn::cardinality;
use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType};
@ -539,13 +556,24 @@ mod test {
// String expression that we output must be parsable for conflict resolution.
let tests = vec![
simple!(
Expr::Cast(Cast {
ParseTest {
expr: Expr::Cast(Cast {
expr: Box::new(lit(1_i64)),
data_type: ArrowDataType::Int32
}),
"arrow_cast(1, 'Int32')".to_string()
),
expected: "arrow_cast(1, 'Int32')".to_string(),
override_expected_expr: Some(
datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
lit(ScalarValue::Int64(Some(1))),
lit(ScalarValue::Utf8(Some("Int32".into())))
]
}
)
),
},
simple!(
Expr::Column(Column::from_qualified_name_ignore_case("Value3")).eq(lit(3_i64)),
"Value3 = 3".to_string()
@ -624,9 +652,8 @@ mod test {
substring(col("modified"), lit(0_i64), lit(4_i64)).eq(lit("2021")),
"substr(modified, 0, 4) = '2021'".to_string()
),
simple!(
col("value")
.cast_to(
ParseTest {
expr: col("value").cast_to(
&arrow_schema::DataType::Utf8,
&table
.snapshot()
@ -640,8 +667,23 @@ mod test {
)
.unwrap()
.eq(lit("1")),
"arrow_cast(value, 'Utf8') = '1'".to_string()
),
expected: "arrow_cast(value, 'Utf8') = '1'".to_string(),
override_expected_expr: Some(
datafusion_expr::Expr::BinaryExpr(BinaryExpr {
left: Box::new(datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
col("value"),
lit(ScalarValue::Utf8(Some("Utf8".into())))
]
}
)),
op: datafusion_expr::Operator::Eq,
right: Box::new(lit(ScalarValue::Utf8(Some("1".into()))))
})
),
},
simple!(
col("_struct").field("a").eq(lit(20_i64)),
"_struct['a'] = 20".to_string()
@ -662,11 +704,16 @@ mod test {
expr: col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond(Some(1262304000000000), None))),
expected: "_timestamp_ntz > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, None)')".to_string(),
override_expected_expr: Some(col("_timestamp_ntz").gt(
datafusion_expr::Expr::Cast( Cast {
expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))),
data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
}
))),
datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))),
lit(ScalarValue::Utf8(Some("Timestamp(Microsecond, None)".into())))
]
}
)
)),
},
ParseTest {
expr: col("_timestamp").gt(lit(ScalarValue::TimestampMicrosecond(
@ -675,10 +722,16 @@ mod test {
))),
expected: "_timestamp > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, Some(\"UTC\"))')".to_string(),
override_expected_expr: Some(col("_timestamp").gt(
datafusion_expr::Expr::Cast( Cast {
expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))),
data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into()))
}))),
datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))),
lit(ScalarValue::Utf8(Some("Timestamp(Microsecond, Some(\"UTC\"))".into())))
]
}
)
)),
},
];
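
With DataFusion 37, `arrow_cast` is no longer a built-in but a `ScalarUDF` exposed by `datafusion-functions` (and `cardinality`, used elsewhere in these tests, moved to `datafusion-functions-array`), so the parsed form of a SQL-level cast is a `ScalarFunction` node rather than `Expr::Cast`. A condensed sketch of the expected expression used in the overrides above:

```rust
use datafusion_common::ScalarValue;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{lit, Expr, ScalarFunctionDefinition};
use datafusion_functions::core::arrow_cast;

// What `arrow_cast(1, 'Int32')` parses to under DataFusion 37: a call to the
// arrow_cast UDF with the value and the target type name as arguments.
fn expected_arrow_cast() -> Expr {
    Expr::ScalarFunction(ScalarFunction {
        func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
        args: vec![
            lit(ScalarValue::Int64(Some(1))),
            lit(ScalarValue::Utf8(Some("Int32".into()))),
        ],
    })
}
```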


@ -17,7 +17,6 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, Phy
use datafusion::prelude::SessionContext;
use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode};
use datafusion_physical_expr::create_physical_expr;
use lazy_static::lazy_static;
use crate::delta_datafusion::find_files::logical::FindFilesNode;
@ -29,6 +28,8 @@ use crate::logstore::LogStoreRef;
use crate::table::state::DeltaTableState;
use crate::DeltaTableError;
use super::create_physical_expr_fix;
pub mod logical;
pub mod physical;
@ -160,8 +161,8 @@ async fn scan_table_by_files(
let input_schema = scan.logical_schema.as_ref().to_owned();
let input_dfschema = input_schema.clone().try_into()?;
let predicate_expr = create_physical_expr(
&Expr::IsTrue(Box::new(expression.clone())),
let predicate_expr = create_physical_expr_fix(
Expr::IsTrue(Box::new(expression.clone())),
&input_dfschema,
state.execution_props(),
)?;


@ -9,11 +9,13 @@ use arrow_schema::SchemaRef;
use datafusion::error::Result;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
};
use datafusion::prelude::SessionContext;
use datafusion_common::tree_node::TreeNode;
use datafusion_expr::Expr;
use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use futures::stream::BoxStream;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
@ -28,6 +30,7 @@ pub struct FindFilesExec {
predicate: Expr,
state: DeltaTableState,
log_store: LogStoreRef,
plan_properties: PlanProperties,
}
impl FindFilesExec {
@ -36,6 +39,11 @@ impl FindFilesExec {
predicate,
log_store,
state,
plan_properties: PlanProperties::new(
EquivalenceProperties::new(ONLY_FILES_SCHEMA.clone()),
Partitioning::RoundRobinBatch(num_cpus::get()),
ExecutionMode::Bounded,
),
})
}
}
@ -85,12 +93,8 @@ impl ExecutionPlan for FindFilesExec {
ONLY_FILES_SCHEMA.clone()
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::RoundRobinBatch(num_cpus::get())
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
fn properties(&self) -> &PlanProperties {
&self.plan_properties
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
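
DataFusion 37 folds `output_partitioning`, `output_ordering`, and execution-mode information into a single `properties(&self) -> &PlanProperties` method on `ExecutionPlan`, so plans now precompute a `PlanProperties` value (as `FindFilesExec` does above) and hand out a reference. A minimal sketch of that construction, assuming an unordered, bounded plan partitioned round-robin across CPUs:

```rust
use arrow_schema::SchemaRef;
use datafusion::physical_plan::{ExecutionMode, PlanProperties};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

// Build the properties once (typically in the plan's constructor) and return
// `&self.plan_properties` from ExecutionPlan::properties().
fn make_plan_properties(schema: SchemaRef) -> PlanProperties {
    PlanProperties::new(
        EquivalenceProperties::new(schema),             // no orderings or equivalences known
        Partitioning::RoundRobinBatch(num_cpus::get()), // unchanged partitioning scheme
        ExecutionMode::Bounded,                         // finite input, no unbounded streams
    )
}
```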


@ -49,16 +49,15 @@ use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider,
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
Statistics,
};
use datafusion_common::scalar::ScalarValue;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{
config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
ToDFSchema,
@ -66,9 +65,14 @@ use datafusion_common::{
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::logical_plan::CreateExternalTable;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
use datafusion_expr::{
col, Expr, Extension, GetFieldAccess, GetIndexedField, LogicalPlan,
TableProviderFilterPushDown, Volatility,
};
use datafusion_functions::expr_fn::get_field;
use datafusion_functions_array::extract::{array_element, array_slice};
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_sql::planner::ParserOptions;
@ -248,7 +252,7 @@ pub(crate) fn files_matching_predicate<'a>(
if let Some(Some(predicate)) =
(!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
{
let expr = logical_expr_to_physical_expr(&predicate, snapshot.arrow_schema()?.as_ref());
let expr = logical_expr_to_physical_expr(predicate, snapshot.arrow_schema()?.as_ref());
let pruning_predicate = PruningPredicate::try_new(expr, snapshot.arrow_schema()?)?;
Ok(Either::Left(
snapshot
@ -528,7 +532,7 @@ impl<'a> DeltaScanBuilder<'a> {
let logical_filter = self
.filter
.map(|expr| logical_expr_to_physical_expr(&expr, &logical_schema));
.map(|expr| logical_expr_to_physical_expr(expr, &logical_schema));
// Perform Pruning of files to scan
let files = match self.files {
@ -820,12 +824,8 @@ impl ExecutionPlan for DeltaScan {
self.parquet_scan.schema()
}
fn output_partitioning(&self) -> Partitioning {
self.parquet_scan.output_partitioning()
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.parquet_scan.output_ordering()
fn properties(&self) -> &PlanProperties {
self.parquet_scan.properties()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@ -934,6 +934,10 @@ pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarVal
| ArrowDataType::Duration(_)
| ArrowDataType::Interval(_)
| ArrowDataType::RunEndEncoded(_, _)
| ArrowDataType::BinaryView
| ArrowDataType::Utf8View
| ArrowDataType::LargeListView(_)
| ArrowDataType::ListView(_)
| ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
"Unsupported data type for Delta Lake {}",
t
@ -1039,20 +1043,64 @@ pub(crate) fn to_correct_scalar_value(
}
pub(crate) fn logical_expr_to_physical_expr(
expr: &Expr,
expr: Expr,
schema: &ArrowSchema,
) -> Arc<dyn PhysicalExpr> {
let df_schema = schema.clone().to_dfschema().unwrap();
let execution_props = ExecutionProps::new();
create_physical_expr(expr, &df_schema, &execution_props).unwrap()
create_physical_expr_fix(expr, &df_schema, &execution_props).unwrap()
}
// TODO This should be removed after datafusion v38
pub(crate) fn create_physical_expr_fix(
expr: Expr,
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
// Support Expr::struct by rewriting expressions.
let expr = expr
.transform_up(&|expr| {
// see https://github.com/apache/datafusion/issues/10181
// This is part of the function rewriter code in DataFusion inlined here temporarily
Ok(match expr {
Expr::GetIndexedField(GetIndexedField {
expr,
field: GetFieldAccess::NamedStructField { name },
}) => {
let name = Expr::Literal(name);
Transformed::yes(get_field(*expr, name))
}
// expr[idx] ==> array_element(expr, idx)
Expr::GetIndexedField(GetIndexedField {
expr,
field: GetFieldAccess::ListIndex { key },
}) => Transformed::yes(array_element(*expr, *key)),
// expr[start, stop, stride] ==> array_slice(expr, start, stop, stride)
Expr::GetIndexedField(GetIndexedField {
expr,
field:
GetFieldAccess::ListRange {
start,
stop,
stride,
},
}) => Transformed::yes(array_slice(*expr, *start, *stop, *stride)),
_ => Transformed::no(expr),
})
})?
.data;
datafusion_physical_expr::create_physical_expr(&expr, input_dfschema, execution_props)
}
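
`create_physical_expr_fix` works around apache/datafusion#10181: in DataFusion 37 the lowering of `Expr::GetIndexedField` into the `get_field`, `array_element`, and `array_slice` UDFs lives in DataFusion's function rewriter, which (per the inline comment) is bypassed when expressions are planned directly with `create_physical_expr`, so the rewrite is inlined here until the crate moves to DataFusion 38. A hedged usage sketch (the schema and predicate are hypothetical, modeled on the `_struct['a'] = 20` expression test):

```rust
use datafusion_common::DFSchema;
use datafusion_expr::{col, lit};
use datafusion_physical_expr::execution_props::ExecutionProps;

// Hypothetical use: a struct-field predicate that would otherwise fail to plan
// when handed straight to datafusion_physical_expr::create_physical_expr; the
// fix rewrites the field access into get_field() first.
fn plan_struct_predicate(df_schema: &DFSchema) -> datafusion_common::Result<()> {
    let predicate = col("_struct").field("a").eq(lit(20_i64));
    let _physical_expr = create_physical_expr_fix(predicate, df_schema, &ExecutionProps::new())?;
    Ok(())
}
```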
pub(crate) async fn execute_plan_to_batch(
state: &SessionState,
plan: Arc<dyn ExecutionPlan>,
) -> DeltaResult<arrow::record_batch::RecordBatch> {
let data =
futures::future::try_join_all((0..plan.output_partitioning().partition_count()).map(|p| {
let data = futures::future::try_join_all(
(0..plan.properties().output_partitioning().partition_count()).map(|p| {
let plan_copy = plan.clone();
let task_context = state.task_ctx().clone();
async move {
@ -1064,8 +1112,9 @@ pub(crate) async fn execute_plan_to_batch(
DataFusionResult::<_>::Ok(arrow::compute::concat_batches(&schema, batches.iter())?)
}
}))
.await?;
}),
)
.await?;
let batch = arrow::compute::concat_batches(&plan.schema(), data.iter())?;
@ -1315,9 +1364,9 @@ pub(crate) struct FindFilesExprProperties {
/// non-deterministic functions, and determine if the expression only contains
/// partition columns
impl TreeNodeVisitor for FindFilesExprProperties {
type N = Expr;
type Node = Expr;
fn pre_visit(&mut self, expr: &Self::N) -> datafusion_common::Result<VisitRecursion> {
fn f_down(&mut self, expr: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
// TODO: We can likely relax the volatility to STABLE. Would require further
// research to confirm the same value is generated during the scan and
// rewrite phases.
@ -1358,7 +1407,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
self.result = Err(DeltaTableError::Generic(format!(
"Cannot determine volatility of find files predicate function {n}",
)));
return Ok(VisitRecursion::Stop);
return Ok(TreeNodeRecursion::Stop);
}
};
if v > Volatility::Immutable {
@ -1366,7 +1415,7 @@ impl TreeNodeVisitor for FindFilesExprProperties {
"Find files predicate contains nondeterministic function {}",
func_def.name()
)));
return Ok(VisitRecursion::Stop);
return Ok(TreeNodeRecursion::Stop);
}
}
_ => {
@ -1374,11 +1423,11 @@ impl TreeNodeVisitor for FindFilesExprProperties {
"Find files predicate contains unsupported expression {}",
expr
)));
return Ok(VisitRecursion::Stop);
return Ok(TreeNodeRecursion::Stop);
}
}
Ok(VisitRecursion::Continue)
Ok(TreeNodeRecursion::Continue)
}
}
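
The `TreeNodeVisitor` changes above track DataFusion 37's tree-node rework: the associated type is now `Node`, the pre-order hook is `f_down` (with `f_up` for post-order), and traversal is steered with `TreeNodeRecursion` instead of `VisitRecursion`. A minimal sketch of a visitor under the new trait shape, assuming `TreeNode::visit` drives the traversal as before (the counter itself is illustrative, not from this PR):

```rust
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_expr::Expr;

// Illustrative visitor: count how many column references appear in an expression.
#[derive(Default)]
struct ColumnCounter {
    columns: usize,
}

impl TreeNodeVisitor for ColumnCounter {
    type Node = Expr;

    fn f_down(&mut self, node: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
        if matches!(node, Expr::Column(_)) {
            self.columns += 1;
        }
        Ok(TreeNodeRecursion::Continue)
    }
}

fn count_columns(expr: &Expr) -> datafusion_common::Result<usize> {
    let mut visitor = ColumnCounter::default();
    expr.visit(&mut visitor)?;
    Ok(visitor.columns)
}
```
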
@ -1478,8 +1527,8 @@ pub(crate) async fn find_files_scan<'a>(
let input_schema = scan.logical_schema.as_ref().to_owned();
let input_dfschema = input_schema.clone().try_into()?;
let predicate_expr = create_physical_expr(
&Expr::IsTrue(Box::new(expression.clone())),
let predicate_expr = create_physical_expr_fix(
Expr::IsTrue(Box::new(expression.clone())),
&input_dfschema,
state.execution_props(),
)?;


@ -82,12 +82,8 @@ impl ExecutionPlan for MetricObserverExec {
self.parent.schema()
}
fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
self.parent.output_partitioning()
}
fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
self.parent.output_ordering()
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
self.parent.properties()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {


@ -273,6 +273,10 @@ impl Scalar {
| Dictionary(_, _)
| RunEndEncoded(_, _)
| Union(_, _)
| Utf8View
| BinaryView
| ListView(_)
| LargeListView(_)
| Null => None,
}
}


@ -128,7 +128,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
let plan: Arc<dyn ExecutionPlan> = Arc::new(scan);
let mut tasks = vec![];
for p in 0..plan.output_partitioning().partition_count() {
for p in 0..plan.properties().output_partitioning().partition_count() {
let inner_plan = plan.clone();
let inner_checker = checker.clone();
let task_ctx = Arc::new(TaskContext::from(&state));


@ -23,7 +23,6 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
use crate::logstore::LogStoreRef;
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
@ -37,7 +36,8 @@ use super::datafusion_utils::Expression;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaSessionContext,
create_physical_expr_fix, find_files, register_store, DataFusionMixins, DeltaScanBuilder,
DeltaSessionContext,
};
use crate::errors::DeltaResult;
use crate::kernel::{Action, Add, Remove};
@ -148,11 +148,8 @@ async fn excute_non_empty_expr(
// Apply the negation of the filter and rewrite files
let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
let predicate_expr = create_physical_expr(
&negated_expression,
&input_dfschema,
state.execution_props(),
)?;
let predicate_expr =
create_physical_expr_fix(negated_expression, &input_dfschema, state.execution_props())?;
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);


@ -75,18 +75,14 @@ impl ExecutionPlan for MergeBarrierExec {
self.input.schema()
}
fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning {
self.input.output_partitioning()
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
self.input.properties()
}
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::HashPartitioned(vec![self.expr.clone()]); 1]
}
fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
None
}
fn children(&self) -> Vec<std::sync::Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}


@ -850,11 +850,12 @@ fn replace_placeholders(expr: Expr, placeholders: &HashMap<String, ScalarValue>)
Expr::Placeholder(Placeholder { id, .. }) => {
let value = placeholders[&id].clone();
// Replace the placeholder with the value
Ok(Transformed::Yes(Expr::Literal(value)))
Ok(Transformed::yes(Expr::Literal(value)))
}
_ => Ok(Transformed::No(expr)),
_ => Ok(Transformed::no(expr)),
})
.unwrap()
.data
}
async fn try_construct_early_filter(
@ -1468,16 +1469,17 @@ async fn execute(
fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr {
expr.transform(&|expr| match expr {
Expr::Column(c) => match c.relation {
Some(rel) if rel.table() == table_alias => Ok(Transformed::Yes(Expr::Column(
Some(rel) if rel.table() == table_alias => Ok(Transformed::yes(Expr::Column(
Column::new_unqualified(c.name),
))),
_ => Ok(Transformed::No(Expr::Column(Column::new(
_ => Ok(Transformed::no(Expr::Column(Column::new(
c.relation, c.name,
)))),
},
_ => Ok(Transformed::No(expr)),
_ => Ok(Transformed::no(expr)),
})
.unwrap()
.data
}
// TODO: Abstract MergePlanner into DeltaPlanner to support other delta operations in the future.
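
The same tree-node rework changes `TreeNode::transform`: closures now return `Transformed<Expr>` built with the lowercase `Transformed::yes`/`Transformed::no` constructors, and the rewritten expression sits in the result's `.data` field (alongside a flag recording whether anything changed). A small sketch of the pattern, using an illustrative rewrite rather than the merge-specific ones above:

```rust
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_expr::{lit, Expr};

// Illustrative rewrite: replace every `true` literal with `false`, showing the
// DataFusion 37 Transformed::yes / Transformed::no API and the `.data` accessor.
fn flip_true_literals(expr: Expr) -> datafusion_common::Result<Expr> {
    let result = expr.transform(&|e| {
        Ok(match e {
            Expr::Literal(ScalarValue::Boolean(Some(true))) => Transformed::yes(lit(false)),
            other => Transformed::no(other),
        })
    })?;
    Ok(result.data)
}
```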


@ -144,7 +144,7 @@ impl<'a> AddContainer<'a> {
/// so evaluating expressions is inexact. However, excluded files are guaranteed (for a correct log)
/// to not contain matches by the predicate expression.
pub fn predicate_matches(&self, predicate: Expr) -> DeltaResult<impl Iterator<Item = &Add>> {
let expr = logical_expr_to_physical_expr(&predicate, &self.schema);
let expr = logical_expr_to_physical_expr(predicate, &self.schema);
let pruning_predicate = PruningPredicate::try_new(expr, self.schema.clone())?;
Ok(self
.inner
@ -214,6 +214,21 @@ impl<'a> PruningStatistics for AddContainer<'a> {
ScalarValue::iter_to_array(values).ok()
}
/// return the number of rows for the named column in each container
/// as an `Option<UInt64Array>`.
///
/// Note: the returned array must contain `num_containers()` rows
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
let values = self.inner.iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
ScalarValue::UInt64(Some(statistics.num_records as u64))
} else {
ScalarValue::UInt64(None)
}
});
ScalarValue::iter_to_array(values).ok()
}
// This function is required since DataFusion 35.0, but is implemented as a no-op
// https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550
fn contained(&self, _column: &Column, _value: &HashSet<ScalarValue>) -> Option<BooleanArray> {
@ -257,6 +272,17 @@ impl PruningStatistics for EagerSnapshot {
container.null_counts(column)
}
/// return the number of rows for the named column in each container
/// as an `Option<UInt64Array>`.
///
/// Note: the returned array must contain `num_containers()` rows
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
let files = self.file_actions().ok()?.collect_vec();
let partition_columns = &self.metadata().partition_columns;
let container = AddContainer::new(&files, partition_columns, self.arrow_schema().ok()?);
container.row_counts(column)
}
// This function is required since DataFusion 35.0, but is implemented as a no-op
// https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550
fn contained(&self, _column: &Column, _value: &HashSet<ScalarValue>) -> Option<BooleanArray> {
@ -281,6 +307,10 @@ impl PruningStatistics for DeltaTableState {
self.snapshot.null_counts(column)
}
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
self.snapshot.row_counts(column)
}
fn contained(&self, column: &Column, values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
self.snapshot.contained(column, values)
}
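
`row_counts` is a new method on DataFusion 37's `PruningStatistics`, symmetrical to `null_counts`; here it is answered from the `numRecords` field of each Add action's stats. A minimal stand-alone sketch of the trait surface as it appears in this diff, with fabricated fixed statistics:

```rust
use std::collections::HashSet;
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, UInt64Array};
use datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::{Column, ScalarValue};

// Fabricated statistics source: one entry per file, every column shares the
// same per-file row count, and min/max/null statistics are unknown.
struct FixedStats {
    rows_per_file: Vec<u64>,
}

impl PruningStatistics for FixedStats {
    fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
        None // unknown: the pruning predicate keeps the container
    }

    fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }

    fn num_containers(&self) -> usize {
        self.rows_per_file.len()
    }

    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }

    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
        // One u64 per container, mirroring what delta-rs derives from numRecords.
        Some(Arc::new(UInt64Array::from(self.rows_per_file.clone())))
    }

    fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
        None
    }
}
```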


@ -34,7 +34,6 @@ use datafusion::{
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::{case, col, lit, when, Expr};
use datafusion_physical_expr::{
create_physical_expr,
expressions::{self},
PhysicalExpr,
};
@ -49,8 +48,8 @@ use super::{
transaction::{CommitBuilder, CommitProperties},
};
use crate::delta_datafusion::{
expr::fmt_expr_to_sql, physical::MetricObserverExec, DataFusionMixins, DeltaColumn,
DeltaSessionContext,
create_physical_expr_fix, expr::fmt_expr_to_sql, physical::MetricObserverExec,
DataFusionMixins, DeltaColumn, DeltaSessionContext,
};
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
use crate::kernel::{Action, Remove};
@ -265,7 +264,8 @@ async fn execute(
let predicate_null =
when(predicate.clone(), lit(true)).otherwise(lit(ScalarValue::Boolean(None)))?;
let predicate_expr = create_physical_expr(&predicate_null, &input_dfschema, execution_props)?;
let predicate_expr =
create_physical_expr_fix(predicate_null, &input_dfschema, execution_props)?;
expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string()));
let projection_predicate: Arc<dyn ExecutionPlan> =
@ -312,7 +312,7 @@ async fn execute(
let expr = case(col("__delta_rs_update_predicate"))
.when(lit(true), expr.to_owned())
.otherwise(col(column.to_owned()))?;
let predicate_expr = create_physical_expr(&expr, &input_dfschema, execution_props)?;
let predicate_expr = create_physical_expr_fix(expr, &input_dfschema, execution_props)?;
map.insert(column.name.clone(), expressions.len());
let c = "__delta_rs_".to_string() + &column.name;
expressions.push((predicate_expr, c.clone()));


@ -34,7 +34,6 @@ use arrow_array::RecordBatch;
use arrow_cast::can_cast_types;
use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef};
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
use datafusion_common::DFSchema;
@ -49,7 +48,9 @@ use super::writer::{DeltaWriter, WriterConfig};
use super::CreateBuilder;
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
use crate::delta_datafusion::{
create_physical_expr_fix, find_files, register_store, DeltaScanBuilder,
};
use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, StructType};
@ -375,7 +376,7 @@ async fn write_execution_plan_with_predicate(
// Write data to disk
let mut tasks = vec![];
for i in 0..plan.output_partitioning().partition_count() {
for i in 0..plan.properties().output_partitioning().partition_count() {
let inner_plan = plan.clone();
let inner_schema = schema.clone();
let task_ctx = Arc::new(TaskContext::from(&state));
@ -478,11 +479,8 @@ async fn execute_non_empty_expr(
// Apply the negation of the filter and rewrite files
let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
let predicate_expr = create_physical_expr(
&negated_expression,
&input_dfschema,
state.execution_props(),
)?;
let predicate_expr =
create_physical_expr_fix(negated_expression, &input_dfschema, state.execution_props())?;
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);


@ -440,7 +440,7 @@ mod local {
) -> Result<ExecutionMetricsCollector> {
let mut metrics = ExecutionMetricsCollector::default();
let scan = table.scan(state, None, e, None).await?;
if scan.output_partitioning().partition_count() > 0 {
if scan.properties().output_partitioning().partition_count() > 0 {
let plan = CoalescePartitionsExec::new(scan);
let task_ctx = Arc::new(TaskContext::from(state));
let _result = collect(plan.execute(0, task_ctx)?).await?;


@ -155,6 +155,18 @@ mod tests {
fn get_window_meta(&self, _name: &str) -> Option<Arc<datafusion_expr::WindowUDF>> {
None
}
fn udfs_names(&self) -> Vec<String> {
Vec::new()
}
fn udafs_names(&self) -> Vec<String> {
Vec::new()
}
fn udwfs_names(&self) -> Vec<String> {
Vec::new()
}
}
fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {