
Conversation

Contributor

@fvaleye fvaleye commented Aug 21, 2025

Which issue does this PR close?

What changes are included in this PR?

Implement a physical execution repartition node that selects the appropriate DataFusion partitioning strategy based on the Iceberg table schema and metadata.

  • Implement hash partitioning for partitioned/bucketed tables
  • Use round-robin partitioning for unpartitioned tables
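The two cases above can be sketched roughly as follows. This is a minimal illustration using hypothetical stand-in types (`TableInfo`, `Strategy`), not the actual iceberg-rust or DataFusion APIs:

```rust
/// Hypothetical stand-in for the table metadata the node inspects.
struct TableInfo {
    /// Names of columns appearing in the partition spec.
    partition_columns: Vec<String>,
}

/// Simplified stand-in for DataFusion's partitioning choice.
#[derive(Debug, PartialEq)]
enum Strategy {
    /// Hash-partition on the given columns.
    Hash(Vec<String>),
    /// Distribute batches evenly across output partitions.
    RoundRobin,
}

/// Hash when the table is partitioned/bucketed; otherwise fall back to
/// round-robin for even load distribution.
fn determine_strategy(table: &TableInfo) -> Strategy {
    if table.partition_columns.is_empty() {
        Strategy::RoundRobin
    } else {
        Strategy::Hash(table.partition_columns.clone())
    }
}
```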

Minor change: I created a new schema_ref() helper method.

Are these changes tested?

Yes, with unit tests

…the best partition strategy for Iceberg for writing

- Implement hash partitioning for partitioned/bucketed tables
- Use round-robin partitioning for unpartitioned tables
- Support range distribution mode approximation via sort columns
@fvaleye fvaleye changed the title feat(datafusion): implement repartition node for DataFusion with feat(datafusion): implement the partitioning node for DataFusion to define the partitioning Aug 21, 2025
///
/// If no suitable hash columns are found (e.g., unpartitioned, non-bucketed table),
/// falls back to round-robin batch partitioning for even load distribution.
fn determine_partitioning_strategy(
Contributor

This is interesting to see. At first I thought there were just two cases:

  1. If it's a partitioned table, we should just hash partition.
  2. If it's not partitioned, we should just use round-robin partitioning.

However, this reminds me of another case: range-only partitioning, e.g. where we only have partitions like date or time. I think in this case we should also use round-robin partitioning, since most data is concentrated in a few partitions.

Also, I don't think we should take write.distribution-mode into account for now. The examples you use are for Spark, and are not applicable to DataFusion.

Contributor Author

@fvaleye fvaleye Aug 25, 2025

However, this reminds me of another case: range-only partitioning, e.g. where we only have partitions like date or time. I think in this case we should also use round-robin partitioning, since most data is concentrated in a few partitions.

Hmm. You are right. Range partitions concentrate data in the most recent partitions, making hash partitioning counterproductive (e.g. a date column with a temporal partition transform).
Since DataFusion doesn't provide range partitioning, the fallback is round-robin rather than hash.

Briefly:

  • Hash partition: Only on bucket columns (partition spec + sort order)
  • Round-robin: Everything else (unpartitioned, range, identity, temporal transforms)
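That refined rule can be sketched with a simplified stand-in for Iceberg's partition transforms (the type and function names below are illustrative, not the real iceberg-rust types):

```rust
/// Simplified stand-in for Iceberg partition transforms.
#[allow(dead_code)]
enum Transform {
    Bucket(u32),
    Identity,
    /// Temporal transforms (year/month/day/hour) all behave alike here.
    Day,
}

#[derive(Debug, PartialEq)]
enum PartitionStrategy {
    Hash(Vec<String>),
    RoundRobin,
}

/// Hash only on bucket-transformed columns; everything else
/// (unpartitioned, identity, temporal/range-like transforms) falls back
/// to round-robin, since those concentrate rows in a few partitions.
fn pick_strategy(spec: &[(String, Transform)]) -> PartitionStrategy {
    let bucket_cols: Vec<String> = spec
        .iter()
        .filter(|(_, t)| matches!(t, Transform::Bucket(_)))
        .map(|(name, _)| name.clone())
        .collect();
    if bucket_cols.is_empty() {
        PartitionStrategy::RoundRobin
    } else {
        PartitionStrategy::Hash(bucket_cols)
    }
}
```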

Also, I don't think we should take write.distribution-mode into account for now. The examples you use are for Spark, and are not applicable to DataFusion.

Oh, good point, I misunderstood this. I thought it was an iceberg-rust table property.

…robin for range partitions

Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
…rtitioning strategy

Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
@fvaleye fvaleye force-pushed the feature/implement-repartition-node-for-insert-into-datafusion branch from e8eb255 to 48ffd26 Compare August 29, 2025 12:10
let idx = input_schema
    .index_of(name)
    .map_err(|e| datafusion::error::DataFusionError::Plan(e.to_string()))?;
Ok(Arc::new(Column::new(name, idx)))
Contributor

Is this correct? I assume the project node should happen before repartition.

Contributor Author

Adding more information to the error log.

Contributor

What I'm saying is that when inserting into a partitioned table in Iceberg, the physical plan should look like

WriteExec
    |
RepartitionExec
    |
ProjectExec
    |
TableScanExec

Here ProjectExec adds an extra column called _partition, holding the partition value of each row. When RepartitionExec chooses hash partitioning, it should repartition on the _partition field rather than on the source columns.
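A toy sketch of that idea: the projection stage derives a `_partition` value per row (here, a day-truncated timestamp), and the repartition stage routes rows by hashing that derived value rather than the source column. The helper names below are hypothetical, not part of the actual plan nodes:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// What a ProjectExec-style node would compute per row as `_partition`:
/// here, a day transform over a timestamp in seconds.
fn partition_value(ts_seconds: i64) -> i64 {
    ts_seconds.div_euclid(86_400)
}

/// What a hash RepartitionExec would then do: route each row by hashing
/// the derived `_partition` value, so all rows belonging to one Iceberg
/// partition land in the same output stream.
fn target_stream(ts_seconds: i64, num_streams: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    partition_value(ts_seconds).hash(&mut hasher);
    (hasher.finish() as usize) % num_streams
}
```

Two timestamps falling on the same day produce the same `_partition` value and therefore hash to the same stream, which is exactly the property the source field alone would not guarantee.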

Contributor Author

Clearer, thanks!
Indeed, this part needs to be reworked when ProjectExec is merged.

Contributor

So we should resolve #1602 first?

Contributor Author

Yes!

@fvaleye fvaleye force-pushed the feature/implement-repartition-node-for-insert-into-datafusion branch from f24ba1a to 77ca951 Compare September 1, 2025 18:20
@fvaleye fvaleye force-pushed the feature/implement-repartition-node-for-insert-into-datafusion branch from 77ca951 to 1a8d65d Compare September 1, 2025 18:28

Successfully merging this pull request may close these issues.

Implement Repartition Node: Decide when the partitioning mode for the best parallelism