refactor(writer): Refactor writers for the future partitioning writers #1657
/// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size.
impl<B, L, F> RollingWriter<B, L, F>
where
    B: IcebergWriterBuilder,
One thing to note is what IcebergWriterBuilder looks like:
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
    Send + Clone + 'static
{
    /// The associated writer type.
    type R: IcebergWriter<I, O>;
    /// Build the iceberg writer.
    async fn build(self) -> Result<Self::R>;
}
A writer such as the position delete writer takes a different input type, as shown below (see #704):
#[async_trait::async_trait]
impl<B: FileWriterBuilder> IcebergWriterBuilder<Vec<PositionDeleteInput>>
    for PositionDeleteWriterBuilder<B>
{
    type R = PositionDeleteWriter<B>;
    async fn build(self) -> Result<Self::R> {
        Ok(PositionDeleteWriter {
            inner_writer: Some(self.inner.build().await?),
            partition_value: self.partition_value.unwrap_or(Struct::empty()),
        })
    }
}
That is why the rolling writer was a FileWriter in the first place. If we adopt this design, how can we support something like
RollingWriter<PositionDeleteWriter>
I think your concern is valid. We may need to expose I and O in the RollingWriter as well; would that solve this problem?
pub struct RollingWriter<B, L, F, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    L: LocationGenerator,
    F: FileNameGenerator,
Meanwhile, I've been wondering how useful the IcebergWriter abstraction is... If we split RollingWriter into RollingPositionalDeletesWriter and RollingXXXWriter and have them use concrete types, this would be a lot easier.
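The first option above, exposing I and O on RollingWriter, might look roughly like the sketch below. It is not the PR's code: the traits are simplified synchronous stand-ins for the real async ones, the location and file name generators are left out, and `PositionDelete`, the `usize` output, and the `roll` method are hypothetical names used only for illustration.

```rust
use std::marker::PhantomData;

// Simplified, synchronous stand-ins for the real async traits (assumption).
trait IcebergWriter<I, O> {
    fn write(&mut self, input: I);
    fn close(self) -> O;
}

trait IcebergWriterBuilder<I, O>: Clone {
    type R: IcebergWriter<I, O>;
    fn build(&self) -> Self::R;
}

// Hypothetical input type standing in for PositionDeleteInput.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct PositionDelete {
    path: String,
    pos: u64,
}

// Hypothetical writer: buffers deletes and reports how many it saw on close.
struct PositionDeleteWriter {
    rows: Vec<PositionDelete>,
}

impl IcebergWriter<Vec<PositionDelete>, usize> for PositionDeleteWriter {
    fn write(&mut self, input: Vec<PositionDelete>) {
        self.rows.extend(input);
    }
    fn close(self) -> usize {
        self.rows.len()
    }
}

#[derive(Clone)]
struct PositionDeleteWriterBuilder;

impl IcebergWriterBuilder<Vec<PositionDelete>, usize> for PositionDeleteWriterBuilder {
    type R = PositionDeleteWriter;
    fn build(&self) -> PositionDeleteWriter {
        PositionDeleteWriter { rows: Vec::new() }
    }
}

// RollingWriter carries I and O so it can wrap a builder with any input type.
struct RollingWriter<B, I, O>
where
    B: IcebergWriterBuilder<I, O>,
{
    builder: B,
    current: Option<B::R>,
    closed: Vec<O>,
    _input: PhantomData<I>,
}

impl<B, I, O> RollingWriter<B, I, O>
where
    B: IcebergWriterBuilder<I, O>,
{
    fn new(builder: B) -> Self {
        Self { builder, current: None, closed: Vec::new(), _input: PhantomData }
    }

    // Lazily build the inner writer, then forward the input unchanged.
    fn write(&mut self, input: I) {
        let builder = &self.builder;
        self.current.get_or_insert_with(|| builder.build()).write(input);
    }

    // Close the current inner writer so the next write starts a new one.
    fn roll(&mut self) {
        if let Some(writer) = self.current.take() {
            self.closed.push(writer.close());
        }
    }

    fn close(mut self) -> Vec<O> {
        self.roll();
        self.closed
    }
}

fn main() {
    let mut rolling: RollingWriter<_, Vec<PositionDelete>, usize> =
        RollingWriter::new(PositionDeleteWriterBuilder);
    rolling.write(vec![PositionDelete { path: "a.parquet".into(), pos: 1 }]);
    rolling.roll();
    rolling.write(vec![
        PositionDelete { path: "a.parquet".into(), pos: 7 },
        PositionDelete { path: "b.parquet".into(), pos: 3 },
    ]);
    println!("{:?}", rolling.close());
}
```

The cost of this option is visible in the type annotation in `main`: every layer that wraps the builder has to carry the two extra type parameters.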
Meanwhile I've been wondering how useful is the abstraction of IcebergWriter
E.g., a user may want to write a custom writer that tracks some metrics, like the following:
RollingWriter<TrackPositionalDeletesWriter>
I think custom writers can either implement FileWriter (lightweight, file-level customization) or PartitioningWriter (heavier, customization across multiple partitions).
In your example, the custom writer can implement FileWriter and be used like this:
RollingPositionalDeletesWriter<TrackWriter>
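A minimal sketch of this concrete-type alternative, under stated assumptions: `FileWriter` here is a simplified synchronous stand-in for the real trait, `MemoryFileWriter` and the `bytes_seen` metric are hypothetical, and the rolling logic itself is omitted so that only the layering is shown.

```rust
// Simplified, synchronous stand-in for the real FileWriter trait (assumption).
trait FileWriter {
    fn write(&mut self, row: &str);
    fn close(self) -> usize; // number of rows written
}

// Hypothetical base writer that keeps rows in memory.
#[derive(Default)]
struct MemoryFileWriter {
    rows: Vec<String>,
}

impl FileWriter for MemoryFileWriter {
    fn write(&mut self, row: &str) {
        self.rows.push(row.to_string());
    }
    fn close(self) -> usize {
        self.rows.len()
    }
}

// The custom layer: wraps any FileWriter and tracks a metric.
#[derive(Default)]
struct TrackWriter<W: FileWriter> {
    inner: W,
    bytes_seen: usize,
}

impl<W: FileWriter> FileWriter for TrackWriter<W> {
    fn write(&mut self, row: &str) {
        self.bytes_seen += row.len();
        self.inner.write(row);
    }
    fn close(self) -> usize {
        println!("tracked {} bytes", self.bytes_seen);
        self.inner.close()
    }
}

// Concrete positional-delete writer: the input type is fixed,
// only the underlying file writer is generic. Rolling is omitted here.
struct RollingPositionalDeletesWriter<W: FileWriter> {
    inner: W,
}

impl<W: FileWriter> RollingPositionalDeletesWriter<W> {
    fn new(inner: W) -> Self {
        Self { inner }
    }
    fn delete(&mut self, path: &str, pos: u64) {
        self.inner.write(&format!("{path},{pos}"));
    }
    fn close(self) -> usize {
        self.inner.close()
    }
}

fn main() {
    let tracked = TrackWriter::<MemoryFileWriter>::default();
    let mut writer = RollingPositionalDeletesWriter::new(tracked);
    writer.delete("a.parquet", 1);
    writer.delete("a.parquet", 7);
    println!("rows written: {}", writer.close());
}
```

Note that in this shape the custom writer only ever sees what the file-level trait exposes (serialized rows in this sketch), not the typed delete input.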
I think custom writers can either implement FileWriter (lightweight, file-level customization) or PartitioningWriter (heavier, customization across multiple partitions).
E.g., a user may want to access PositionDeleteInput directly.
pub struct RollingWriter<B, L, F, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    L: LocationGenerator,
    F: FileNameGenerator,
I think this approach is easier to extend in the future and gives users more flexibility to customize. But either way looks good to me if this one introduces too much unnecessary complication.
ad66fa5 to ac264fc
/// The builder for iceberg writer.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
I believe we will also need to change the DefaultOutput for IcebergWriter from Vec<DataFile> to Vec<DataFileBuilder>, since IcebergWriter is no longer the outermost writer.
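To illustrate why an inner writer would return builders rather than finished files, here is a small sketch. The `DataFile` and `DataFileBuilder` types below are hypothetical, pared-down stand-ins (the real ones have many more fields), and `close_inner_writer` / `close_outer_writer` are made-up helper names.

```rust
// Hypothetical, pared-down stand-ins for the real DataFile / DataFileBuilder.
#[derive(Debug, PartialEq)]
struct DataFile {
    file_path: String,
    record_count: u64,
    partition: String,
}

#[derive(Debug, Default)]
struct DataFileBuilder {
    file_path: Option<String>,
    record_count: Option<u64>,
    partition: Option<String>,
}

impl DataFileBuilder {
    fn file_path(mut self, path: impl Into<String>) -> Self {
        self.file_path = Some(path.into());
        self
    }
    fn record_count(mut self, count: u64) -> Self {
        self.record_count = Some(count);
        self
    }
    fn partition(mut self, partition: impl Into<String>) -> Self {
        self.partition = Some(partition.into());
        self
    }
    // Fails if any required field is still missing.
    fn build(self) -> Result<DataFile, String> {
        Ok(DataFile {
            file_path: self.file_path.ok_or("file_path is required")?,
            record_count: self.record_count.ok_or("record_count is required")?,
            partition: self.partition.ok_or("partition is required")?,
        })
    }
}

// Inner writer: knows about files and row counts, not about partitions.
fn close_inner_writer() -> Vec<DataFileBuilder> {
    vec![DataFileBuilder::default()
        .file_path("data/00000.parquet")
        .record_count(3)]
}

// Outermost writer: fills in what only it knows, then finalizes.
fn close_outer_writer(partition: &str) -> Result<Vec<DataFile>, String> {
    close_inner_writer()
        .into_iter()
        .map(|builder| builder.partition(partition).build())
        .collect()
}

fn main() {
    println!("{:?}", close_outer_writer("day=2024-01-01"));
}
```

Returning a builder lets each layer add the metadata it owns and defers validation to the outermost layer.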
Hi @CTTY, it seems this has not been updated following our discussion?
Hi @liurenjie1024, do you mean that we should also include
crates/iceberg/src/writer/mod.rs
Outdated
type DefaultOutput = Vec<DataFile>;

/// The partitioning writer used to write data to multiple partitions.
pub trait PartitioningWriter {
Add the #[async_trait] annotation?
crates/iceberg/src/writer/mod.rs
Outdated
/// Build the iceberg writer.
async fn build(self) -> Result<Self::R>;
/// Build the iceberg writer with the provided output file.
async fn build(self, output_file: OutputFile) -> Result<Self::R>;
We don't need this change per our discussion?
Hi @CTTY, I'm not saying we should include
ac264fc to 2ac588f
// ///
// /// Once a partition has been written to and closed, any further attempts
// /// to write to that partition will result in an error.
// pub struct ClusteredWriter<B: IcebergWriterBuilder, I: Default + Send = DefaultInput, O: Default + Send = DefaultOutput>
Please ignore this for now. I think it's better to keep this draft/round of changes focused on the interface changes to the existing writers.
Thanks @CTTY for this PR. I think we are on the right track.
partition_value: Struct,
partition_spec_id: i32,
pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
    inner_writer: Option<RollingFileWriter<B, L, F>>,
This doesn't need to be Option?
Yeah, I agree. The issue is that we need to take the rolling writer when closing the DataFileWriter, and I haven't found a nice way to do that without Option. I also don't think it makes much sense to implement Default for RollingFileWriter.
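The trade-off described here can be sketched as follows, assuming the outer `close` takes `&mut self` while the inner writer's `close` consumes `self`. The types are simplified, synchronous stand-ins, and `OwnedDataFileWriter` is a hypothetical name for the variant that avoids Option.

```rust
// Hypothetical stand-in for RollingFileWriter: closing consumes the writer.
struct RollingFileWriter {
    files: Vec<String>,
}

impl RollingFileWriter {
    fn close(self) -> Vec<String> {
        self.files
    }
}

// Variant 1: `close(&mut self)` cannot move a field out of `self`,
// so the field is wrapped in Option and moved out with `take()`.
struct DataFileWriter {
    inner_writer: Option<RollingFileWriter>,
}

impl DataFileWriter {
    fn close(&mut self) -> Result<Vec<String>, String> {
        let writer = self
            .inner_writer
            .take()
            .ok_or_else(|| "writer already closed".to_string())?;
        Ok(writer.close())
    }
}

// Variant 2: if `close` consumed `self`, no Option (and no Default) is needed,
// and closing twice is rejected at compile time instead of at runtime.
struct OwnedDataFileWriter {
    inner_writer: RollingFileWriter,
}

impl OwnedDataFileWriter {
    fn close(self) -> Vec<String> {
        self.inner_writer.close()
    }
}

fn main() {
    let mut writer = DataFileWriter {
        inner_writer: Some(RollingFileWriter { files: vec!["a.parquet".into()] }),
    };
    println!("{:?}", writer.close());
    println!("{:?}", writer.close()); // second call reports an error

    let owned = OwnedDataFileWriter {
        inner_writer: RollingFileWriter { files: vec!["b.parquet".into()] },
    };
    println!("{:?}", owned.close());
}
```

`std::mem::take` would be a third route, but it requires the Default implementation that the comment above argues against.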
use crate::writer::{DefaultInput, DefaultOutput};

#[async_trait::async_trait]
pub trait PartitioningWriterBuilder<I = DefaultInput, O = DefaultOutput>:
I don't think we need a builder for the partitioning writer?
Which issue does this PR close?
What changes are included in this PR?
Refactored the writer layers; from a bird’s-eye view, the structure now looks like this:
Key Changes
Enhanced Partition Handling
- PartitionKey accessor methods and utility functions
- PartitioningWriter and PartitioningWriterBuilder traits
Refactored File Writer Architecture
- RollingFileWriter to handle location generator, file name generator, and partition keys directly
- ParquetWriterBuilder interface to accept output files during build
- DataFileWriterBuilder to use RollingFileWriter with partition keys
TaskWriter -> PartitioningWriter -> RollingWriter -> ..., but TaskWriter and PartitioningWriter are not included in this draft so far
Are these changes tested?
Not yet, but changing the existing tests accordingly should be enough
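For readers unfamiliar with the rolling layer named in the description, the sketch below shows the general idea of a rolling file writer that owns its own file naming. It is an illustration only, not this PR's implementation: the closure stands in for a file name generator, and sizes are tracked as plain byte counts instead of writing real files.

```rust
// Hypothetical rolling writer: starts a new file once the current one
// has reached `target_size` bytes.
struct RollingFileWriter<F: FnMut() -> String> {
    target_size: usize,
    next_file_name: F,
    current: Option<(String, usize)>, // (file name, bytes written so far)
    closed: Vec<(String, usize)>,
}

impl<F: FnMut() -> String> RollingFileWriter<F> {
    fn new(target_size: usize, next_file_name: F) -> Self {
        Self { target_size, next_file_name, current: None, closed: Vec::new() }
    }

    fn write(&mut self, bytes: &[u8]) {
        // Roll before writing if the current file is already full.
        if matches!(&self.current, Some((_, size)) if *size >= self.target_size) {
            self.closed.extend(self.current.take());
        }
        // Lazily open a file, asking the generator for its name.
        let next = &mut self.next_file_name;
        let current = self.current.get_or_insert_with(|| (next(), 0));
        current.1 += bytes.len();
    }

    // Close the last open file and return every file with its size.
    fn close(mut self) -> Vec<(String, usize)> {
        self.closed.extend(self.current.take());
        self.closed
    }
}

fn main() {
    let mut n = 0;
    let mut writer = RollingFileWriter::new(10, move || {
        n += 1;
        format!("part-{n:05}.parquet")
    });
    let chunks: [&[u8]; 3] = [b"aaaaaa", b"bbbbbb", b"cccccc"];
    for chunk in chunks {
        writer.write(chunk);
    }
    for (name, size) in writer.close() {
        println!("{name}: {size} bytes");
    }
}
```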