Skip to content

Commit 32e2f7e

Browse files
xxchanchenzl25
authored and committed
feat: support incremental scan between 2 snapshots (#13)
1 parent b4da968 commit 32e2f7e

File tree

2 files changed

+173
-12
lines changed

2 files changed

+173
-12
lines changed

crates/iceberg/src/scan/context.rs

Lines changed: 130 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashSet;
1819
use std::sync::Arc;
1920

2021
use futures::channel::mpsc::Sender;
2122
use futures::{SinkExt, TryFutureExt};
23+
use itertools::Itertools;
2224

2325
use crate::delete_file_index::DeleteFileIndex;
2426
use crate::expr::{Bind, BoundPredicate, Predicate};
@@ -28,11 +30,12 @@ use crate::scan::{
2830
PartitionFilterCache,
2931
};
3032
use crate::spec::{
31-
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
32-
TableMetadataRef,
33+
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList,
34+
ManifestStatus, Operation, SchemaRef, SnapshotRef, TableMetadataRef,
3335
};
3436
use crate::{Error, ErrorKind, Result};
3537

38+
/// Thread-safe predicate over a manifest entry; entries for which it returns
/// `false` are skipped when a manifest's entries are streamed.
type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync;
3639
/// Wraps a [`ManifestFile`] alongside the objects that are needed
3740
/// to process it in a thread-safe manner
3841
pub(crate) struct ManifestFileContext {
@@ -46,6 +49,10 @@ pub(crate) struct ManifestFileContext {
4649
snapshot_schema: SchemaRef,
4750
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
4851
delete_file_index: DeleteFileIndex,
52+
53+
/// Filters manifest entries.
54+
/// Used for different kinds of scans, e.g., scanning only newly added data files and skipping delete files.
55+
filter_fn: Option<Arc<ManifestEntryFilterFn>>,
4956
}
5057

5158
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -74,12 +81,13 @@ impl ManifestFileContext {
7481
mut sender,
7582
expression_evaluator_cache,
7683
delete_file_index,
77-
..
84+
filter_fn,
7885
} = self;
86+
let filter_fn = filter_fn.unwrap_or_else(|| Arc::new(|_| true));
7987

8088
let manifest = object_cache.get_manifest(&manifest_file).await?;
8189

82-
for manifest_entry in manifest.entries() {
90+
for manifest_entry in manifest.entries().iter().filter(|e| filter_fn(e)) {
8391
let manifest_entry_context = ManifestEntryContext {
8492
// TODO: refactor to avoid the expensive ManifestEntry clone
8593
manifest_entry: manifest_entry.clone(),
@@ -149,6 +157,11 @@ pub(crate) struct PlanContext {
149157
pub partition_filter_cache: Arc<PartitionFilterCache>,
150158
pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
151159
pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
160+
161+
// for incremental scan.
162+
// If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
163+
pub from_snapshot_id: Option<i64>,
164+
pub to_snapshot_id: Option<i64>,
152165
}
153166

154167
impl PlanContext {
@@ -180,19 +193,72 @@ impl PlanContext {
180193
Ok(partition_filter)
181194
}
182195

183-
pub(crate) fn build_manifest_file_contexts(
196+
pub(crate) async fn build_manifest_file_contexts(
184197
&self,
185198
manifest_list: Arc<ManifestList>,
186199
tx_data: Sender<ManifestEntryContext>,
187200
delete_file_idx: DeleteFileIndex,
188201
delete_file_tx: Sender<ManifestEntryContext>,
189202
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>> + 'static>> {
190-
let manifest_files = manifest_list.entries().iter();
203+
let mut filter_fn: Option<Arc<ManifestEntryFilterFn>> = None;
204+
let manifest_files = {
205+
if let Some(to_snapshot_id) = self.to_snapshot_id {
206+
// Incremental scan mode:
207+
// Get all added files between two snapshots.
208+
// - data files in `Append` and `Overwrite` snapshots are included.
209+
// - delete files are ignored
210+
// - `Replace` snapshots (e.g., compaction) are ignored.
211+
//
212+
// `to_snapshot_id` is inclusive, `from_snapshot_id` is exclusive.
213+
214+
let snapshots =
215+
ancestors_between(&self.table_metadata, to_snapshot_id, self.from_snapshot_id)
216+
.filter(|snapshot| {
217+
matches!(
218+
snapshot.summary().operation,
219+
Operation::Append | Operation::Overwrite
220+
)
221+
})
222+
.collect_vec();
223+
let snapshot_ids: HashSet<i64> = snapshots
224+
.iter()
225+
.map(|snapshot| snapshot.snapshot_id())
226+
.collect();
227+
228+
let mut manifest_files = vec![];
229+
for snapshot in snapshots {
230+
let manifest_list = self
231+
.object_cache
232+
.get_manifest_list(&snapshot, &self.table_metadata)
233+
.await?;
234+
for entry in manifest_list.entries() {
235+
if !snapshot_ids.contains(&entry.added_snapshot_id) {
236+
continue;
237+
}
238+
manifest_files.push(entry.clone());
239+
}
240+
}
241+
242+
filter_fn = Some(Arc::new(move |entry: &ManifestEntryRef| {
243+
matches!(entry.status(), ManifestStatus::Added)
244+
&& matches!(entry.data_file().content_type(), DataContentType::Data)
245+
&& (
246+
// Is it possible that the snapshot id here is not contained?
247+
entry.snapshot_id().is_none()
248+
|| snapshot_ids.contains(&entry.snapshot_id().unwrap())
249+
)
250+
}));
251+
252+
manifest_files
253+
} else {
254+
manifest_list.entries().to_vec()
255+
}
256+
};
191257

192258
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
193259
let mut filtered_mfcs = vec![];
194260

195-
for manifest_file in manifest_files {
261+
for manifest_file in &manifest_files {
196262
let tx = if manifest_file.content == ManifestContentType::Deletes {
197263
delete_file_tx.clone()
198264
} else {
@@ -225,6 +291,7 @@ impl PlanContext {
225291
partition_bound_predicate,
226292
tx,
227293
delete_file_idx.clone(),
294+
filter_fn.clone(),
228295
);
229296

230297
filtered_mfcs.push(Ok(mfc));
@@ -239,6 +306,7 @@ impl PlanContext {
239306
partition_filter: Option<Arc<BoundPredicate>>,
240307
sender: Sender<ManifestEntryContext>,
241308
delete_file_index: DeleteFileIndex,
309+
filter_fn: Option<Arc<ManifestEntryFilterFn>>,
242310
) -> ManifestFileContext {
243311
let bound_predicates =
244312
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
@@ -261,6 +329,61 @@ impl PlanContext {
261329
field_ids: self.field_ids.clone(),
262330
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
263331
delete_file_index,
332+
filter_fn,
264333
}
265334
}
266335
}
336+
337+
struct Ancestors {
338+
next: Option<SnapshotRef>,
339+
get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
340+
}
341+
342+
impl Iterator for Ancestors {
343+
type Item = SnapshotRef;
344+
345+
fn next(&mut self) -> Option<Self::Item> {
346+
let snapshot = self.next.take()?;
347+
let result = snapshot.clone();
348+
self.next = snapshot
349+
.parent_snapshot_id()
350+
.and_then(|id| (self.get_snapshot)(id));
351+
Some(result)
352+
}
353+
}
354+
355+
/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
356+
fn ancestors_of(
357+
table_metadata: &TableMetadataRef,
358+
snapshot: i64,
359+
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
360+
if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
361+
let table_metadata = table_metadata.clone();
362+
Box::new(Ancestors {
363+
next: Some(snapshot.clone()),
364+
get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
365+
})
366+
} else {
367+
Box::new(std::iter::empty())
368+
}
369+
}
370+
371+
/// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
372+
fn ancestors_between(
373+
table_metadata: &TableMetadataRef,
374+
latest_snapshot_id: i64,
375+
oldest_snapshot_id: Option<i64>,
376+
) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
377+
let Some(oldest_snapshot_id) = oldest_snapshot_id else {
378+
return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
379+
};
380+
381+
if latest_snapshot_id == oldest_snapshot_id {
382+
return Box::new(std::iter::empty());
383+
}
384+
385+
Box::new(
386+
ancestors_of(table_metadata, latest_snapshot_id)
387+
.take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
388+
)
389+
}

crates/iceberg/src/scan/mod.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod cache;
2121
use cache::*;
2222
mod context;
2323
use context::*;
24+
pub use task::*;
2425
mod task;
2526

2627
use std::sync::Arc;
@@ -29,7 +30,6 @@ use arrow_array::RecordBatch;
2930
use futures::channel::mpsc::{Sender, channel};
3031
use futures::stream::BoxStream;
3132
use futures::{SinkExt, StreamExt, TryStreamExt};
32-
pub use task::*;
3333

3434
use crate::arrow::ArrowReaderBuilder;
3535
use crate::delete_file_index::DeleteFileIndex;
@@ -51,6 +51,10 @@ pub struct TableScanBuilder<'a> {
5151
// Defaults to none which means select all columns
5252
column_names: Option<Vec<String>>,
5353
snapshot_id: Option<i64>,
54+
/// Exclusive. Used for incremental scan.
55+
from_snapshot_id: Option<i64>,
56+
/// Inclusive. Used for incremental scan.
57+
to_snapshot_id: Option<i64>,
5458
batch_size: Option<usize>,
5559
case_sensitive: bool,
5660
filter: Option<Predicate>,
@@ -69,6 +73,8 @@ impl<'a> TableScanBuilder<'a> {
6973
table,
7074
column_names: None,
7175
snapshot_id: None,
76+
from_snapshot_id: None,
77+
to_snapshot_id: None,
7278
batch_size: None,
7379
case_sensitive: true,
7480
filter: None,
@@ -130,6 +136,18 @@ impl<'a> TableScanBuilder<'a> {
130136
self
131137
}
132138

139+
/// Set the starting snapshot id (exclusive) for incremental scan.
140+
pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self {
141+
self.from_snapshot_id = Some(from_snapshot_id);
142+
self
143+
}
144+
145+
/// Set the ending snapshot id (inclusive) for incremental scan.
146+
pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self {
147+
self.to_snapshot_id = Some(to_snapshot_id);
148+
self
149+
}
150+
133151
/// Sets the concurrency limit for both manifest files and manifest
134152
/// entries for this scan
135153
pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
@@ -185,6 +203,25 @@ impl<'a> TableScanBuilder<'a> {
185203

186204
/// Build the table scan.
187205
pub fn build(self) -> Result<TableScan> {
206+
// Validate that we have either a snapshot scan or an incremental scan configuration
207+
if self.from_snapshot_id.is_some() || self.to_snapshot_id.is_some() {
208+
// For incremental scan, we need to_snapshot_id to be set. from_snapshot_id is optional.
209+
if self.to_snapshot_id.is_none() {
210+
return Err(Error::new(
211+
ErrorKind::DataInvalid,
212+
"Incremental scan requires to_snapshot_id to be set",
213+
));
214+
}
215+
216+
// snapshot_id should not be set for incremental scan
217+
if self.snapshot_id.is_some() {
218+
return Err(Error::new(
219+
ErrorKind::DataInvalid,
220+
"snapshot_id should not be set for incremental scan. Use from_snapshot_id and to_snapshot_id instead.",
221+
));
222+
}
223+
}
224+
188225
let snapshot = match self.snapshot_id {
189226
Some(snapshot_id) => self
190227
.table
@@ -214,7 +251,6 @@ impl<'a> TableScanBuilder<'a> {
214251
current_snapshot_id.clone()
215252
}
216253
};
217-
218254
let schema = snapshot.schema(self.table.metadata())?;
219255

220256
// Check that all column names exist in the schema.
@@ -284,6 +320,8 @@ impl<'a> TableScanBuilder<'a> {
284320
snapshot_bound_predicate: snapshot_bound_predicate.map(Arc::new),
285321
object_cache: self.table.object_cache(),
286322
field_ids: Arc::new(field_ids),
323+
from_snapshot_id: self.from_snapshot_id,
324+
to_snapshot_id: self.to_snapshot_id,
287325
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
288326
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
289327
expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
@@ -360,7 +398,7 @@ impl TableScan {
360398
manifest_entry_data_ctx_tx,
361399
delete_file_idx.clone(),
362400
manifest_entry_delete_ctx_tx,
363-
)?;
401+
).await?;
364402

365403
let mut channel_for_manifest_error = file_scan_task_tx.clone();
366404

@@ -390,7 +428,7 @@ impl TableScan {
390428
spawn(async move {
391429
Self::process_delete_manifest_entry(manifest_entry_context, tx).await
392430
})
393-
.await
431+
.await
394432
},
395433
)
396434
.await;
@@ -413,7 +451,7 @@ impl TableScan {
413451
spawn(async move {
414452
Self::process_data_manifest_entry(manifest_entry_context, tx).await
415453
})
416-
.await
454+
.await
417455
},
418456
)
419457
.await;

0 commit comments

Comments
 (0)