Skip to content

Commit 5125721

Browse files
authored
feat: support write to branch (#62)
* feat: support to branch * fix: fix ref name * fix: current_snapshot_id * refactor: refactor interface * fmt
1 parent e1b5835 commit 5125721

File tree

4 files changed

+286
-7
lines changed

4 files changed

+286
-7
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ impl<'a> FastAppendAction<'a> {
6868
self
6969
}
7070

71+
pub fn with_to_branch(mut self, to_branch: String) -> Self {
72+
self.snapshot_produce_action.set_target_branch(to_branch);
73+
self
74+
}
75+
7176
/// Add data files to the snapshot.
7277
pub fn add_data_files(
7378
&mut self,
@@ -262,6 +267,11 @@ impl<'a> MergeAppendAction<'a> {
262267
})
263268
}
264269

270+
pub fn with_to_branch(mut self, to_branch: String) -> Result<MergeAppendAction<'a>> {
271+
self.snapshot_produce_action.set_target_branch(to_branch);
272+
Ok(self)
273+
}
274+
265275
/// Add data files to the snapshot.
266276
pub fn add_data_files(
267277
&mut self,
@@ -384,6 +394,94 @@ mod tests {
384394
assert_eq!(data_file, *manifest.entries()[0].data_file());
385395
}
386396

397+
#[tokio::test]
398+
async fn test_fast_append_with_branch() {
399+
let table = make_v2_minimal_table();
400+
let tx = Transaction::new(&table);
401+
402+
// Test creating new branch
403+
let branch_name = "test_branch";
404+
let mut action = tx
405+
.fast_append(None, None, vec![])
406+
.unwrap()
407+
.with_to_branch(branch_name.to_string());
408+
409+
let data_file = DataFileBuilder::default()
410+
.partition_spec_id(0)
411+
.content(DataContentType::Data)
412+
.file_path("test/3.parquet".to_string())
413+
.file_format(DataFileFormat::Parquet)
414+
.file_size_in_bytes(100)
415+
.record_count(1)
416+
.partition_spec_id(table.metadata().default_partition_spec_id())
417+
.partition(Struct::from_iter([Some(Literal::long(300))]))
418+
.build()
419+
.unwrap();
420+
action.add_data_files(vec![data_file.clone()]).unwrap();
421+
let tx = action.apply().await.unwrap();
422+
423+
// Check branch reference was created
424+
assert!(
425+
matches!(&tx.updates[1], TableUpdate::SetSnapshotRef { ref_name, .. }
426+
if ref_name == branch_name)
427+
);
428+
429+
// Test updating existing branch
430+
let tx2 = Transaction::new(&table);
431+
let mut action2 = tx2
432+
.fast_append(None, None, vec![])
433+
.unwrap()
434+
.with_to_branch(branch_name.to_string());
435+
action2.add_data_files(vec![data_file.clone()]).unwrap();
436+
let tx2 = action2.apply().await.unwrap();
437+
438+
// Check requirements contain branch validation
439+
assert!(tx2.requirements.iter().any(
440+
|r| matches!(r, TableRequirement::RefSnapshotIdMatch { r#ref, .. }
441+
if r#ref == branch_name)
442+
));
443+
}
444+
445+
#[tokio::test]
446+
async fn test_branch_operations() {
447+
let table = make_v2_minimal_table();
448+
449+
// Test creating new branch
450+
let branch_name = "test_branch";
451+
let tx = Transaction::new(&table);
452+
let mut action = tx
453+
.fast_append(None, None, vec![])
454+
.unwrap()
455+
.with_to_branch(branch_name.to_string());
456+
457+
let data_file = DataFileBuilder::default()
458+
.partition_spec_id(0)
459+
.content(DataContentType::Data)
460+
.file_path("test/3.parquet".to_string())
461+
.file_format(DataFileFormat::Parquet)
462+
.file_size_in_bytes(100)
463+
.record_count(1)
464+
.partition_spec_id(table.metadata().default_partition_spec_id())
465+
.partition(Struct::from_iter([Some(Literal::long(300))]))
466+
.build()
467+
.unwrap();
468+
469+
action.add_data_files(vec![data_file.clone()]).unwrap();
470+
let tx = action.apply().await.unwrap();
471+
472+
// Verify branch was created
473+
assert!(matches!(
474+
&tx.updates[1],
475+
TableUpdate::SetSnapshotRef { ref_name, .. } if ref_name == branch_name
476+
));
477+
478+
// Verify requirements
479+
assert!(tx.requirements.iter().any(
480+
|r| matches!(r, TableRequirement::RefSnapshotIdMatch { r#ref, .. }
481+
if r#ref == branch_name)
482+
));
483+
}
484+
387485
#[tokio::test]
388486
async fn test_add_existing_parquet_files_to_unpartitioned_table() {
389487
let mut fixture = TableTestFixture::new_unpartitioned();

crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ impl<'a> RewriteFilesAction<'a> {
118118
}
119119

120120
/// Add data files to the snapshot.
121-
122121
pub fn add_data_files(
123122
mut self,
124123
data_files: impl IntoIterator<Item = DataFile>,
@@ -137,6 +136,11 @@ impl<'a> RewriteFilesAction<'a> {
137136
Ok(self)
138137
}
139138

139+
pub fn with_to_branch(mut self, to_branch: String) -> Result<Self> {
140+
self.snapshot_produce_action.set_target_branch(to_branch);
141+
Ok(self)
142+
}
143+
140144
/// Finished building the action and apply it to the transaction.
141145
pub async fn apply(self) -> Result<Transaction<'a>> {
142146
if self.merge_enabled {

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ pub(crate) struct SnapshotProduceAction<'a> {
9191
manifest_counter: RangeFrom<u64>,
9292

9393
new_data_file_sequence_number: Option<i64>,
94+
95+
target_branch: String,
9496
}
9597

9698
impl<'a> SnapshotProduceAction<'a> {
@@ -115,6 +117,7 @@ impl<'a> SnapshotProduceAction<'a> {
115117
manifest_counter: (0..),
116118
key_metadata,
117119
new_data_file_sequence_number: None,
120+
target_branch: MAIN_BRANCH.to_string(),
118121
})
119122
}
120123

@@ -426,28 +429,39 @@ impl<'a> SnapshotProduceAction<'a> {
426429
let new_manifests = self
427430
.manifest_file(&snapshot_produce_operation, &process)
428431
.await?;
432+
429433
let next_seq_num = self.tx.current_table.metadata().next_sequence_number();
430434

431435
let summary = self.summary(&snapshot_produce_operation);
432436

433437
let manifest_list_path = self.generate_manifest_list_file_path(0);
434438

439+
let parent_snapshot = self
440+
.tx
441+
.current_table
442+
.metadata()
443+
.snapshot_for_ref(&self.target_branch);
444+
445+
let parent_snapshot_id = parent_snapshot
446+
.map(|s| Some(s.snapshot_id()))
447+
.unwrap_or(None);
448+
435449
let mut manifest_list_writer = match self.tx.current_table.metadata().format_version() {
436450
FormatVersion::V1 => ManifestListWriter::v1(
437451
self.tx
438452
.current_table
439453
.file_io()
440454
.new_output(manifest_list_path.clone())?,
441455
self.snapshot_id,
442-
self.tx.current_table.metadata().current_snapshot_id(),
456+
parent_snapshot_id,
443457
),
444458
FormatVersion::V2 => ManifestListWriter::v2(
445459
self.tx
446460
.current_table
447461
.file_io()
448462
.new_output(manifest_list_path.clone())?,
449463
self.snapshot_id,
450-
self.tx.current_table.metadata().current_snapshot_id(),
464+
parent_snapshot_id,
451465
next_seq_num,
452466
),
453467
};
@@ -458,7 +472,7 @@ impl<'a> SnapshotProduceAction<'a> {
458472
let new_snapshot = Snapshot::builder()
459473
.with_manifest_list(manifest_list_path)
460474
.with_snapshot_id(self.snapshot_id)
461-
.with_parent_snapshot_id(self.tx.current_table.metadata().current_snapshot_id())
475+
.with_parent_snapshot_id(parent_snapshot_id)
462476
.with_sequence_number(next_seq_num)
463477
.with_summary(summary)
464478
.with_schema_id(self.tx.current_table.metadata().current_schema_id())
@@ -471,7 +485,7 @@ impl<'a> SnapshotProduceAction<'a> {
471485
snapshot: new_snapshot,
472486
},
473487
TableUpdate::SetSnapshotRef {
474-
ref_name: MAIN_BRANCH.to_string(),
488+
ref_name: self.target_branch.clone(),
475489
reference: SnapshotReference::new(
476490
self.snapshot_id,
477491
SnapshotRetention::branch(None, None, None),
@@ -483,8 +497,8 @@ impl<'a> SnapshotProduceAction<'a> {
483497
uuid: self.tx.current_table.metadata().uuid(),
484498
},
485499
TableRequirement::RefSnapshotIdMatch {
486-
r#ref: MAIN_BRANCH.to_string(),
487-
snapshot_id: self.tx.current_table.metadata().current_snapshot_id(),
500+
r#ref: self.target_branch.clone(),
501+
snapshot_id: parent_snapshot_id,
488502
},
489503
],
490504
)?;
@@ -494,6 +508,10 @@ impl<'a> SnapshotProduceAction<'a> {
494508
pub fn set_new_data_file_sequence_number(&mut self, new_data_file_sequence_number: i64) {
495509
self.new_data_file_sequence_number = Some(new_data_file_sequence_number);
496510
}
511+
512+
pub fn set_target_branch(&mut self, target_branch: String) {
513+
self.target_branch = target_branch;
514+
}
497515
}
498516

499517
pub(crate) struct MergeManifestProcess {
@@ -519,6 +537,7 @@ impl ManifestProcess for MergeManifestProcess {
519537
let (unmerge_data_manifest, unmerge_delete_manifest): (Vec<_>, Vec<_>) = manifests
520538
.into_iter()
521539
.partition(|manifest| matches!(manifest.content, ManifestContentType::Data));
540+
522541
let mut data_manifest = {
523542
let manifest_merge_manager = MergeManifestManager::new(
524543
self.target_size_bytes,
@@ -529,6 +548,7 @@ impl ManifestProcess for MergeManifestProcess {
529548
.merge_manifest(snapshot_produce, unmerge_data_manifest)
530549
.await?
531550
};
551+
532552
data_manifest.extend(unmerge_delete_manifest);
533553
Ok(data_manifest)
534554
}

0 commit comments

Comments
 (0)