Contributor

@CTTY CTTY commented Sep 7, 2025

Which issue does this PR close?

What changes are included in this PR?

Refactored the writer layers; from a bird’s-eye view, the structure now looks like this:

flowchart TD
    subgraph PartitioningWriter
        PW[PartitioningWriter]

        subgraph DataFileWriter
            RW[DataFileWriter]

            subgraph RollingWriter
                DFW[RollingWriter]

                subgraph FileWriter
                    FW[FileWriter]
                end

                DFW --> FW
            end

            RW --> DFW
        end

        PW --> RW
    end



Key Changes

  1. Enhanced Partition Handling

    • Added PartitionKey accessor methods and utility functions
    • Introduced PartitioningWriter and PartitioningWriterBuilder traits
  2. Refactored File Writer Architecture

    • Modified RollingFileWriter to handle location generator, file name generator, and partition keys directly
    • Simplified ParquetWriterBuilder interface to accept output files during build
    • Restructured DataFileWriterBuilder to use RollingFileWriter with partition keys
    • Updated DataFusion integration to work with the new writer architecture
  • NOTE: Technically DataFusion or any engine should use TaskWriter -> PartitioningWriter -> RollingWriter -> ..., but TaskWriter and PartitioningWriter are not included in this draft so far
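The layering in the flowchart can be sketched roughly as follows. This is only an illustration under simplifying assumptions: every trait, struct, and method signature below is a hypothetical, synchronous stand-in (the real traits are async and generic), and `CountingFileWriter` exists only for this example.

```rust
// Hypothetical, synchronous stand-ins for the layers in the flowchart.
// None of these signatures are taken from the PR; the real traits are async.
use std::collections::BTreeMap;

/// Innermost layer: writes rows into a single file.
trait FileWriter {
    fn write(&mut self, row: &str);
    /// Finishes the file and returns how many rows it holds.
    fn close(self) -> usize;
}

/// Toy file writer that only counts rows.
#[derive(Default)]
struct CountingFileWriter {
    rows: usize,
}

impl FileWriter for CountingFileWriter {
    fn write(&mut self, _row: &str) {
        self.rows += 1;
    }
    fn close(self) -> usize {
        self.rows
    }
}

/// Starts a new file once the current one holds `max_rows` rows.
struct RollingWriter {
    max_rows: usize,
    current: CountingFileWriter,
    closed: Vec<usize>,
}

impl RollingWriter {
    fn new(max_rows: usize) -> Self {
        Self { max_rows, current: CountingFileWriter::default(), closed: Vec::new() }
    }
    fn write(&mut self, row: &str) {
        if self.current.rows >= self.max_rows {
            // Swap in a fresh file writer and close the full one.
            let full = std::mem::take(&mut self.current);
            self.closed.push(full.close());
        }
        self.current.write(row);
    }
    /// Returns the row count of every file that was produced.
    fn close(mut self) -> Vec<usize> {
        if self.current.rows > 0 {
            let rows = self.current.close();
            self.closed.push(rows);
        }
        self.closed
    }
}

/// Writes the files of exactly one partition and tags them on close.
struct DataFileWriter {
    partition: String,
    inner: RollingWriter,
}

impl DataFileWriter {
    fn write(&mut self, row: &str) {
        self.inner.write(row);
    }
    fn close(self) -> Vec<(String, usize)> {
        let partition = self.partition;
        self.inner.close().into_iter().map(|rows| (partition.clone(), rows)).collect()
    }
}

/// Outermost layer: routes each row to the writer of its partition.
struct PartitioningWriter {
    max_rows: usize,
    writers: BTreeMap<String, DataFileWriter>,
}

impl PartitioningWriter {
    fn new(max_rows: usize) -> Self {
        Self { max_rows, writers: BTreeMap::new() }
    }
    fn write(&mut self, partition: &str, row: &str) {
        let max_rows = self.max_rows;
        let writer = self.writers.entry(partition.to_string()).or_insert_with(|| DataFileWriter {
            partition: partition.to_string(),
            inner: RollingWriter::new(max_rows),
        });
        writer.write(row);
    }
    /// Closes every partition writer and returns (partition, row count) per file.
    fn close(self) -> Vec<(String, usize)> {
        self.writers.into_values().flat_map(DataFileWriter::close).collect()
    }
}
```

In this sketch each layer owns the layer below it, so closing the outermost writer closes everything underneath and collects one entry per produced file.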

Are these changes tested?

Not yet, but updating the existing tests accordingly should be enough.

/// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size.
impl<B, L, F> RollingWriter<B, L, F>
where
B: IcebergWriterBuilder,
Contributor

One thing to note is that IcebergWriterBuilder looks like the following.

#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
    Send + Clone + 'static
{
    /// The associated writer type.
    type R: IcebergWriter<I, O>;
    /// Build the iceberg writer.
    async fn build(self) -> Result<Self::R>;
}

A writer such as the position delete writer takes a different input type, as shown below; see #704.

#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder<Vec<PositionDeleteInput>>
    for PositionDeleteWriterBuilder<B>
{
    type R = PositionDeleteWriter<B>;

    async fn build(self) -> Result<Self::R> {
        Ok(PositionDeleteWriter {
            inner_writer: Some(self.inner.build().await?),
            partition_value: self.partition_value.unwrap_or(Struct::empty()),
        })
    }
}

That's why the rolling writer was a FileWriter in the first place. After we adopt this design, how can we express something like

RollingWriter<PositionDeleteWriter>

Contributor Author

@CTTY CTTY Sep 8, 2025

I think your concern is valid. We may need to expose I and O in the RollingWriter as well; would that solve this problem?

pub struct RollingWriter<B, L, F, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    L: LocationGenerator,
    F: FileNameGenerator,

Meanwhile, I've been wondering how useful the IcebergWriter abstraction is... If we separate RollingWriter into RollingPositionalDeletesWriter and RollingXXXWriter and have them use concrete types, this would be a lot easier.
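For reference, here is a minimal sketch of the "expose I and O" option. It assumes simplified, synchronous versions of `IcebergWriter` and `IcebergWriterBuilder`, marker-only generator traits, and a toy position delete writer that returns a count; none of these bodies come from the PR, and the rollover logic is left out. Rust rejects type parameters that no field mentions (error E0392), so the sketch carries a `PhantomData` marker to make the use of `I` and `O` explicit.

```rust
use std::marker::PhantomData;

// Simplified, synchronous stand-ins (assumptions); the real traits are async.
trait IcebergWriter<I, O> {
    fn write(&mut self, input: I);
    fn close(self) -> O;
}

trait IcebergWriterBuilder<I, O>: Clone {
    type R: IcebergWriter<I, O>;
    fn build(self) -> Self::R;
}

// Marker-only stand-ins for the generators; their methods are omitted here.
trait LocationGenerator {}
trait FileNameGenerator {}

struct RollingWriter<B, L, F, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    L: LocationGenerator,
    F: FileNameGenerator,
{
    builder: B,
    location_generator: L,
    file_name_generator: F,
    current: Option<B::R>,
    // Records the use of `I` and `O` without storing values of those types.
    _io: PhantomData<fn(I) -> O>,
}

impl<B, L, F, I, O> RollingWriter<B, L, F, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    L: LocationGenerator,
    F: FileNameGenerator,
{
    fn new(builder: B, location_generator: L, file_name_generator: F) -> Self {
        Self { builder, location_generator, file_name_generator, current: None, _io: PhantomData }
    }

    /// Lazily builds the inner writer, then forwards the input unchanged.
    fn write(&mut self, input: I) {
        if self.current.is_none() {
            self.current = Some(self.builder.clone().build());
        }
        if let Some(writer) = self.current.as_mut() {
            writer.write(input);
        }
    }

    /// Closes the inner writer, if one was ever built.
    fn close(self) -> Option<O> {
        self.current.map(|writer| writer.close())
    }
}

// Toy position delete writer: its input type differs from the default one.
struct PositionDeleteInput {
    path: String,
    pos: u64,
}

struct PositionDeleteWriter {
    buffered: Vec<PositionDeleteInput>,
}

impl IcebergWriter<Vec<PositionDeleteInput>, usize> for PositionDeleteWriter {
    fn write(&mut self, input: Vec<PositionDeleteInput>) {
        self.buffered.extend(input);
    }
    fn close(self) -> usize {
        self.buffered.len()
    }
}

#[derive(Clone)]
struct PositionDeleteWriterBuilder;

impl IcebergWriterBuilder<Vec<PositionDeleteInput>, usize> for PositionDeleteWriterBuilder {
    type R = PositionDeleteWriter;
    fn build(self) -> Self::R {
        PositionDeleteWriter { buffered: Vec::new() }
    }
}

struct DefaultLocation;
impl LocationGenerator for DefaultLocation {}

struct DefaultFileName;
impl FileNameGenerator for DefaultFileName {}
```

The cost of this option is visible in the type: every user of `RollingWriter` now has to name or infer five type parameters, which is the extra complexity weighed against the concrete-type alternative.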

Contributor

Meanwhile, I've been wondering how useful the IcebergWriter abstraction is

E.g., a user may want to customize their own writer to track some metrics, like the following:

RollingWriter<TrackPositionalDeletesWriter>

Contributor Author

I think custom writers can either implement FileWriter (lightweight, file-level customization) or PartitioningWriter (heavier, customization across multiple partitions).

In your example, the custom writer can implement FileWriter and be used like this:

RollingPositionalDeletesWriter<TrackWriter>
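A rough sketch of that idea, assuming a simplified synchronous `FileWriter` trait and a toy line-based encoding. `InMemoryWriter`, the `bytes_written` counter, and all method signatures are hypothetical and only serve the illustration:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Simplified stand-in (assumption); the real FileWriter is async.
trait FileWriter {
    fn write(&mut self, bytes: &[u8]);
    /// Finishes the file and returns its size in bytes.
    fn close(self) -> usize;
}

/// Toy file writer that keeps everything in memory.
#[derive(Default)]
struct InMemoryWriter {
    buf: Vec<u8>,
}

impl FileWriter for InMemoryWriter {
    fn write(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
    }
    fn close(self) -> usize {
        self.buf.len()
    }
}

/// Custom file-level writer: counts bytes, then delegates to the wrapped writer.
struct TrackWriter<W: FileWriter> {
    inner: W,
    // Shared counter so the metric stays readable after `close` consumes the writer.
    bytes_written: Arc<AtomicU64>,
}

impl<W: FileWriter> FileWriter for TrackWriter<W> {
    fn write(&mut self, bytes: &[u8]) {
        self.bytes_written.fetch_add(bytes.len() as u64, Ordering::Relaxed);
        self.inner.write(bytes);
    }
    fn close(self) -> usize {
        self.inner.close()
    }
}

/// Concrete writer for positional deletes; rollover logic is omitted.
struct RollingPositionalDeletesWriter<W: FileWriter> {
    inner: W,
}

impl<W: FileWriter> RollingPositionalDeletesWriter<W> {
    /// Encodes one delete as a "path,pos" line and hands the bytes down.
    fn write(&mut self, path: &str, pos: u64) {
        let line = format!("{},{}\n", path, pos);
        self.inner.write(line.as_bytes());
    }
    fn close(self) -> usize {
        self.inner.close()
    }
}
```

Note that a wrapper at this level only sees what the upper layer passes down to the file writer, not the typed delete input itself.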

Contributor

I think custom writers can either implement FileWriter (lightweight, file-level customization) or PartitioningWriter (heavier, customization across multiple partitions).

E.g., a user may want to access PositionDeleteInput directly.

pub struct RollingWriter<B, L, F, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    L: LocationGenerator,
    F: FileNameGenerator,

I think this approach is easier to extend in the future and gives users more flexibility to customize. But either way looks good to me if this one introduces too much unnecessary complication.


/// The builder for iceberg writer.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
Contributor Author

I believe we will also need to change the DefaultOutput for IcebergWriter from Vec<DataFile> to Vec<DataFileBuilder>, since IcebergWriter is no longer the outermost writer.
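To illustrate why a builder output helps, here is a small sketch in which an inner writer returns partially filled builders and an outer layer adds the partition before building. The `DataFile` and `DataFileBuilder` types below are trimmed-down, hypothetical stand-ins with made-up fields and setters, not the crate's actual types; `close_inner_writer` and `finish_partition` are likewise invented for the example.

```rust
/// Hypothetical, trimmed-down stand-in for a finished data file entry.
#[derive(Debug, PartialEq)]
struct DataFile {
    file_path: String,
    record_count: u64,
    partition: String,
}

/// Hypothetical builder: fields stay optional until `build` is called.
#[derive(Debug, Default)]
struct DataFileBuilder {
    file_path: Option<String>,
    record_count: Option<u64>,
    partition: Option<String>,
}

impl DataFileBuilder {
    fn file_path(mut self, path: &str) -> Self {
        self.file_path = Some(path.to_string());
        self
    }
    fn record_count(mut self, count: u64) -> Self {
        self.record_count = Some(count);
        self
    }
    fn partition(mut self, partition: &str) -> Self {
        self.partition = Some(partition.to_string());
        self
    }
    /// Fails if any field was never set.
    fn build(self) -> Result<DataFile, String> {
        Ok(DataFile {
            file_path: self.file_path.ok_or("file_path is not set")?,
            record_count: self.record_count.ok_or("record_count is not set")?,
            partition: self.partition.ok_or("partition is not set")?,
        })
    }
}

/// Inner layer: knows paths and counts, but not the partition.
fn close_inner_writer() -> Vec<DataFileBuilder> {
    vec![
        DataFileBuilder::default().file_path("data/00000.parquet").record_count(100),
        DataFileBuilder::default().file_path("data/00001.parquet").record_count(42),
    ]
}

/// Outer layer: completes each builder with the partition it owns.
fn finish_partition(
    builders: Vec<DataFileBuilder>,
    partition: &str,
) -> Result<Vec<DataFile>, String> {
    builders.into_iter().map(|b| b.partition(partition).build()).collect()
}
```

With this split, an inner layer never has to produce a complete `DataFile` from information it does not have.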

@liurenjie1024
Contributor

Hi @CTTY, it seems this has not been updated following our discussion?

@CTTY
Contributor Author

CTTY commented Sep 17, 2025

Hi @liurenjie1024, do you mean that we should also include TaskWriter and have TaskWriter split batches by partition? This draft mainly focuses on refactoring the existing layers and making RollingWriter the top-level writer for now, and I haven't incorporated this with an actual partitioning writer or task writer yet. Or do you think it's better to have everything in one draft?

type DefaultOutput = Vec<DataFile>;

/// The partitioning writer used to write data to multiple partitions.
pub trait PartitioningWriter {
Contributor

Add #[async_trait] annotation?

/// Build the iceberg writer.
async fn build(self) -> Result<Self::R>;
/// Build the iceberg writer with the provided output file.
async fn build(self, output_file: OutputFile) -> Result<Self::R>;
Contributor

We don't need this change per our discussion?

@liurenjie1024
Contributor

Hi @liurenjie1024, do you mean that we should also include TaskWriter and have TaskWriter split batches by partition? This draft mainly focuses on refactoring the existing layers and making RollingWriter the top-level writer for now, and I haven't incorporated this with an actual partitioning writer or task writer yet. Or do you think it's better to have everything in one draft?

Hi @CTTY, I'm not saying we should include TaskWriter. Per our discussion, we should have the following dependency:

PartitionedWriter
        |
DataFileWriter (EqDeleteWriter, PositionDeleteWriter)  -> This layer is IcebergWriter
        |
RollingFileWriter
        |
FileWriter (Parquet, ORC)  -> This layer is the file format writer

@CTTY CTTY force-pushed the ctty/idk-partition branch from ac264fc to 2ac588f Compare September 21, 2025 03:26
// ///
// /// Once a partition has been written to and closed, any further attempts
// /// to write to that partition will result in an error.
// pub struct ClusteredWriter<B: IcebergWriterBuilder, I: Default + Send = DefaultInput, O: Default + Send = DefaultOutput>
Contributor Author

Please ignore this for now. I think it's better to keep this draft/round of changes focused on the interface changes to the existing writers.

@CTTY CTTY requested a review from liurenjie1024 September 21, 2025 03:37
Contributor

@liurenjie1024 liurenjie1024 left a comment

Thanks @CTTY for this PR. I think we are on the right track.

partition_value: Struct,
partition_spec_id: i32,
pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
inner_writer: Option<RollingFileWriter<B, L, F>>,
Contributor

This doesn't need to be Option?

Contributor Author

Yeah, I agree. The issue is that we need to take the rolling writer when closing the DataFileWriter, and I haven't found a nice way to do that without Option. I also don't think it makes much sense to implement Default for RollingFileWriter.
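The constraint can be shown with a small sketch. It assumes that the outer `close` only gets `&mut self` while the inner `close` consumes the writer by value; both types are simplified, synchronous stand-ins with string errors, not the PR's code.

```rust
/// Simplified stand-in: closing consumes the writer.
struct RollingFileWriter {
    files: Vec<String>,
}

impl RollingFileWriter {
    fn close(self) -> Vec<String> {
        self.files
    }
}

/// Outer writer whose `close` only has `&mut self` (assumption).
struct DataFileWriter {
    inner: Option<RollingFileWriter>,
}

impl DataFileWriter {
    fn write(&mut self, file_name: &str) -> Result<(), String> {
        match self.inner.as_mut() {
            Some(writer) => {
                writer.files.push(file_name.to_string());
                Ok(())
            }
            None => Err("writer already closed".to_string()),
        }
    }

    fn close(&mut self) -> Result<Vec<String>, String> {
        // `take` moves the inner writer out from behind `&mut self`, leaving `None`.
        let inner = self
            .inner
            .take()
            .ok_or_else(|| "writer already closed".to_string())?;
        Ok(inner.close())
    }
}
```

If the outer `close` could take `self` by value instead, the field could be a plain `RollingFileWriter` and the "already closed" state would not need to be represented at runtime.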

use crate::writer::{DefaultInput, DefaultOutput};

#[async_trait::async_trait]
pub trait PartitioningWriterBuilder<I = DefaultInput, O = DefaultOutput>:
Contributor

I don't think we need a builder for partition writer?

Development

Successfully merging this pull request may close these issues.

Decouple ParquetWriter and LocationGenerator
3 participants