feat(datafusion): implement the partitioning node for DataFusion to define the partitioning #1620
Conversation
…the best partition strategy for Iceberg for writing
- Implement hash partitioning for partitioned/bucketed tables
- Use round-robin partitioning for unpartitioned tables
- Support range distribution mode approximation via sort columns
crates/integrations/datafusion/src/physical_plan/repartition.rs
///
/// If no suitable hash columns are found (e.g., unpartitioned, non-bucketed table),
/// falls back to round-robin batch partitioning for even load distribution.
fn determine_partitioning_strategy(
This is interesting to see. At first I thought there were just two cases:
- If it's a partitioned table, we should just hash partition.
- If it's not partitioned, we should just use round-robin partitioning.
However, this reminds me of another case: a range-only partition, e.g. we only have partitions like date or time. I think in this case we should also use round-robin partitioning, since most of the data is concentrated in a few partitions.
Also, I don't think we should take write.distribution-mode into account for now. The example you use is for Spark, but it is not applicable to DataFusion.
> However, this reminds me of another case: a range-only partition, e.g. we only have partitions like date or time. I think in this case we should also use round-robin partitioning, since most of the data is concentrated in a few partitions.
Hmm, you are right. Range partitions concentrate data in the most recent partitions, making hash partitioning counterproductive (consider a date column with a temporal partition).
Since DataFusion doesn't provide range partitioning, the fallback is round-robin rather than hashing.
Briefly:
- Hash partitioning: only on bucket columns (partition spec + sort order)
- Round-robin: everything else (unpartitioned, range, identity, and temporal transforms)
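As a rough sketch of this rule, with simplified stand-in types (the `Transform` and `Strategy` enums below are hypothetical, not iceberg-rust's actual API), the selection could look like:

```rust
// Simplified stand-ins for Iceberg partition-spec transforms; the real
// iceberg-rust types are richer. This only illustrates the decision rule.
#[derive(Debug, PartialEq)]
enum Transform {
    Bucket(u32), // bucket[N](col): a good hash-partitioning key
    Identity,    // raw column value
    Day,         // temporal transform: data skews toward recent partitions
}

#[derive(Debug, PartialEq)]
enum Strategy {
    Hash(Vec<String>), // hash rows on these columns
    RoundRobin,        // spread batches evenly across outputs
}

// Hash-partition only when the spec contains bucket transforms; fall back
// to round-robin for everything else (unpartitioned, identity, temporal),
// where hashing would concentrate data in a few output partitions.
fn determine_strategy(spec: &[(String, Transform)]) -> Strategy {
    let bucket_cols: Vec<String> = spec
        .iter()
        .filter(|(_, t)| matches!(t, Transform::Bucket(_)))
        .map(|(name, _)| name.clone())
        .collect();
    if bucket_cols.is_empty() {
        Strategy::RoundRobin
    } else {
        Strategy::Hash(bucket_cols)
    }
}
```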
> Also, I don't think we should take write.distribution-mode into account for now. The example you use is for Spark, but it is not applicable to DataFusion.
Oh, good point, I misunderstood this. I thought it was an iceberg-rust table property.
…robin for range partitions Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
…rtitioning strategy Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
force-pushed the …t-into-datafusion branch from e8eb255 to 48ffd26
let idx = input_schema
    .index_of(name)
    .map_err(|e| datafusion::error::DataFusionError::Plan(e.to_string()))?;
Ok(Arc::new(Column::new(name, idx)))
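For context, the snippet resolves a column name to its ordinal index in the input schema, converting a lookup miss into a planner error. A dependency-free stand-in (`index_of` here is a hypothetical helper modelled on that behavior, not DataFusion's API) would be:

```rust
// Stand-in for schema name-to-index resolution: find the column's ordinal
// position, turning a miss into a descriptive error string, analogous to
// wrapping the failure in DataFusionError::Plan in the snippet above.
fn index_of(schema: &[&str], name: &str) -> Result<usize, String> {
    schema
        .iter()
        .position(|&col| col == name)
        .ok_or_else(|| format!("column '{}' not found in input schema", name))
}
```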
Is this correct? I assume the project node should happen before repartition.
Adding more information to the error log.
What I'm saying is that when inserting into a partitioned table in Iceberg, the physical plan should look like:

WriteExec
|
RepartitionExec
|
ProjectExec
|
TableScanExec

where ProjectExec will add an extra column called _partition, which holds the partition value of each row. And when RepartitionExec chooses to use HashPartition, it should repartition on the _partition field rather than the source field.
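To make the point concrete, here is a minimal std-only sketch (a day() truncation stands in for the real Iceberg partition transform; the names are illustrative): the _partition value is derived per row, and the hash routing uses that derived value, not the raw source column.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-in for the derived `_partition` column: a day(ts) transform
// truncating a Unix timestamp (seconds) to a day ordinal.
fn partition_value(ts_seconds: i64) -> i64 {
    ts_seconds.div_euclid(86_400)
}

// Route a row by hashing the derived partition value, so all rows of the
// same day land in the same output partition regardless of raw timestamp.
fn target_partition(partition_value: i64, num_outputs: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    partition_value.hash(&mut hasher);
    (hasher.finish() as usize) % num_outputs
}
```

Hashing the raw timestamp instead would scatter rows of the same day across all outputs, which is exactly what the _partition column avoids.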
Clearer, thanks! Indeed, this part needs to be reworked when ProjectExec is merged.
So we should resolve #1602 first?
Yes!
force-pushed from f24ba1a to 77ca951
…e unused parameters
force-pushed from 77ca951 to 1a8d65d
Which issue does this PR close?
What changes are included in this PR?
Implement a physical execution repartition node that determines the relevant DataFusion partitioning strategy based on the Iceberg table schema and metadata.
Minor change: I created a new schema_ref() helper method.
Are these changes tested?
Yes, with unit tests