
Commit ac264fc

expose IO in rollingwriter
1 parent: d7b4eeb

2 files changed: +32 -26 lines

crates/iceberg/src/writer/file_writer/rolling_writer.rs (22 additions, 16 deletions)

@@ -18,9 +18,11 @@
 use arrow_array::RecordBatch;

 use crate::io::FileIO;
-use crate::spec::{DataFile, PartitionKey};
+use crate::spec::PartitionKey;
 use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
-use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
+use crate::writer::{
+    CurrentFileStatus, DefaultInput, DefaultOutput, IcebergWriter, IcebergWriterBuilder,
+};
 use crate::{Error, ErrorKind, Result};

 /// A writer that automatically rolls over to a new file when the data size
@@ -29,11 +31,12 @@ use crate::{Error, ErrorKind, Result};
 /// This writer wraps another writer that tracks the amount of data written.
 /// When the data size exceeds the target size, it closes the current file and
 /// starts writing to a new one.
-pub struct RollingWriter<B, L, F>
+pub struct RollingWriter<B, L, F, I = DefaultInput, O = DefaultOutput>
 where
-    B: IcebergWriterBuilder,
+    B: IcebergWriterBuilder<I, O>,
     L: LocationGenerator,
     F: FileNameGenerator,
+    O: IntoIterator,
 {
     inner: Option<B::R>,
     inner_builder: B,
@@ -42,15 +45,17 @@ where
     file_name_generator: F,
     file_io: FileIO,
     partition_key: Option<PartitionKey>,
-    data_files: Vec<DataFile>, // todo this should be B::R::O? DefaultOutput?
+    data_files: Vec<O::Item>,
 }

-impl<B, L, F> RollingWriter<B, L, F>
+impl<B, L, F, I, O> RollingWriter<B, L, F, I, O>
 where
-    B: IcebergWriterBuilder,
+    B: IcebergWriterBuilder<I, O>,
     B::R: CurrentFileStatus,
     L: LocationGenerator,
     F: FileNameGenerator,
+    O: IntoIterator,
+    O::Item: Clone,
 {
     /// Creates a new `RollingWriter` with the specified inner builder and target size.
     ///
@@ -108,12 +113,12 @@ where
         Ok(writer)
     }

-    /// Write a record batch to the current file, rolling over to a new file if necessary.
+    /// Write input data to the current file, rolling over to a new file if necessary.
     ///
     /// # Arguments
     ///
-    /// * `batch` - The record batch to write
-    pub async fn write(&mut self, batch: RecordBatch) -> Result<()> {
+    /// * `input` - The input data to write
+    pub async fn write(&mut self, input: I) -> Result<()> {
         if self.inner.is_none() {
             // initialize inner writer
             self.inner = Some(self.create_new_writer().await?);
@@ -131,7 +136,7 @@ where

         // write the input
         if let Some(writer) = &mut self.inner {
-            writer.write(batch).await
+            writer.write(input).await
         } else {
             Err(Error::new(
                 ErrorKind::Unexpected,
@@ -140,8 +145,8 @@
         }
     }

-    /// Close the writer and return the data files for all files.
-    pub async fn close(&mut self) -> Result<Vec<DataFile>> {
+    /// Close the writer and return the output items for all files.
+    pub async fn close(&mut self) -> Result<Vec<O::Item>> {
         // close the current writer and merge the output
         if let Some(mut current_writer) = self.inner.take() {
             self.data_files.extend(current_writer.close().await?);
@@ -151,12 +156,13 @@ where
     }
 }

-impl<B, L, F> CurrentFileStatus for RollingWriter<B, L, F>
+impl<B, L, F, I, O> CurrentFileStatus for RollingWriter<B, L, F, I, O>
 where
-    B: IcebergWriterBuilder,
-    B::R: IcebergWriter + CurrentFileStatus,
+    B: IcebergWriterBuilder<I, O>,
+    B::R: IcebergWriter<I, O> + CurrentFileStatus,
     L: LocationGenerator,
     F: FileNameGenerator,
+    O: IntoIterator,
 {
     fn current_file_path(&self) -> String {
         if let Some(inner) = &self.inner {

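What this hunk amounts to: RollingWriter used to hard-code RecordBatch in and Vec<DataFile> out; it is now generic over an input type I and an output collection O (defaulting to DefaultInput and DefaultOutput), and it accumulates O::Item values instead of DataFile. The standalone sketch below mirrors that shape with simplified stand-in traits and names (Sink, SinkBuilder, Rolling are illustrative placeholders, not the iceberg-rust API), leaving out the async, size-tracking, and file-IO details:

    /// Stand-in for `IcebergWriter<I, O>`: consumes `I` values, yields `O` on close.
    trait Sink<I, O> {
        fn write(&mut self, input: I) -> Result<(), String>;
        fn close(&mut self) -> Result<O, String>;
    }

    /// Stand-in for `IcebergWriterBuilder<I, O>`.
    trait SinkBuilder<I, O> {
        type S: Sink<I, O>;
        fn build(&self) -> Self::S;
    }

    /// Stand-in for the rolling wrapper: instead of `Vec<DataFile>`, it collects
    /// whatever item type the inner writer's output iterates over.
    struct Rolling<B, I, O>
    where
        B: SinkBuilder<I, O>,
        O: IntoIterator,
    {
        builder: B,
        inner: Option<B::S>,
        outputs: Vec<O::Item>, // previously hard-coded as Vec<DataFile>
        _input: std::marker::PhantomData<I>,
    }

    impl<B, I, O> Rolling<B, I, O>
    where
        B: SinkBuilder<I, O>,
        O: IntoIterator,
    {
        fn new(builder: B) -> Self {
            Self {
                builder,
                inner: None,
                outputs: Vec::new(),
                _input: std::marker::PhantomData,
            }
        }

        fn write(&mut self, input: I) -> Result<(), String> {
            // Lazily create the inner writer, as the real code does.
            if self.inner.is_none() {
                self.inner = Some(self.builder.build());
            }
            self.inner
                .as_mut()
                .expect("inner writer was just initialized")
                .write(input)
        }

        fn close(&mut self) -> Result<Vec<O::Item>, String> {
            // Flush the current inner writer and return everything collected so far.
            if let Some(mut sink) = self.inner.take() {
                self.outputs.extend(sink.close()?);
            }
            Ok(std::mem::take(&mut self.outputs))
        }
    }

With O left at its default of Vec<DataFile>, Vec<O::Item> is still Vec<DataFile>, so a RollingWriter built with the default parameters keeps the same concrete close() return type as before; only writers that plug in a different output type see a change.
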
crates/iceberg/src/writer/mod.rs (10 additions, 10 deletions)

@@ -230,11 +230,20 @@ use arrow_array::RecordBatch;

 use crate::Result;
 use crate::io::OutputFile;
-use crate::spec::DataFile;
+use crate::spec::{DataFile, PartitionKey};

 type DefaultInput = RecordBatch;
 type DefaultOutput = Vec<DataFile>;

+/// The partitioning writer used to write data to multiple partitions.
+pub trait PartitioningWriter {
+    /// Write a record batch, all rows from this record batch should come from one partition
+    fn write(&mut self, partition_key: PartitionKey, batch: RecordBatch) -> Result<()>;
+
+    /// Close all writers and return the data files.
+    fn close(&mut self) -> Result<Vec<DataFile>>;
+}
+
 /// The builder for iceberg writer.
 #[async_trait::async_trait]
 pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
@@ -269,15 +278,6 @@ pub trait CurrentFileStatus {
     fn current_written_size(&self) -> usize;
 }

-/// The partitioning writer used to write data to multiple partitions.
-pub trait PartitioningWriter {
-    /// Write a record batch, which may contain rows for multiple partitions.
-    fn write(&mut self, batch: RecordBatch) -> Result<()>;
-
-    /// Close all writers and return the data files.
-    fn close(&mut self) -> Result<Vec<DataFile>>;
-}
-
 #[cfg(test)]
 mod tests {
     use arrow_array::RecordBatch;

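The mod.rs change moves PartitioningWriter ahead of IcebergWriterBuilder and revises its contract: write now takes an explicit PartitionKey, and every row in the batch is expected to belong to that one partition, so row-to-partition routing becomes the caller's job. Below is a minimal standalone sketch of a writer with that shape, using simplified stand-in type aliases (PartitionKey, Batch, and DataFile here are placeholders, not the real crate types), and sync rather than async, as the trait in this diff is declared without #[async_trait]:

    use std::collections::HashMap;

    // Simplified placeholders for the real crate types.
    type PartitionKey = String; // stands in for crate::spec::PartitionKey
    type Batch = Vec<i64>;      // stands in for arrow_array::RecordBatch
    type DataFile = String;     // stands in for crate::spec::DataFile

    trait PartitioningWriter {
        /// Every row in `batch` is assumed to belong to `partition_key`;
        /// the caller has already done the routing.
        fn write(&mut self, partition_key: PartitionKey, batch: Batch) -> Result<(), String>;
        /// Close all per-partition writers and return the produced files.
        fn close(&mut self) -> Result<Vec<DataFile>, String>;
    }

    /// Toy fan-out writer: one buffer per partition key. A real implementation
    /// would hold one data-file writer per partition instead of a Vec of batches.
    #[derive(Default)]
    struct FanoutSketch {
        partitions: HashMap<PartitionKey, Vec<Batch>>,
    }

    impl PartitioningWriter for FanoutSketch {
        fn write(&mut self, partition_key: PartitionKey, batch: Batch) -> Result<(), String> {
            // No row-level splitting needed: the whole batch goes to one partition.
            self.partitions.entry(partition_key).or_default().push(batch);
            Ok(())
        }

        fn close(&mut self) -> Result<Vec<DataFile>, String> {
            // Pretend each partition flushed exactly one file.
            Ok(self
                .partitions
                .drain()
                .map(|(key, batches)| format!("{key}.data ({} batches)", batches.len()))
                .collect())
        }
    }

Compared with the removed version, whose write accepted a batch that "may contain rows for multiple partitions", this keeps the writer a simple per-key dispatcher and pushes partition fan-out to the call site.
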