Skip to content

Commit 1d4b1d7

Browse files
authored
feat: support overwrite files action (#63)
* fix: fix snapshot for entries * refactor: refactor starting_sequence_number * feat: OverwriteFiles Action * fix: fix compile * fix: add more test * typo * fix: fix ut * fix: fix check
1 parent 5125721 commit 1d4b1d7

File tree

10 files changed

+1940
-71
lines changed

10 files changed

+1940
-71
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ impl SnapshotProduceOperation for FastAppendOperation {
193193
.tx
194194
.current_table
195195
.metadata()
196-
.current_snapshot()
196+
.snapshot_for_ref(snapshot_produce.target_branch())
197197
else {
198198
return Ok(vec![]);
199199
};

crates/iceberg/src/transaction/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! This module contains transaction api.
1919
2020
mod append;
21+
mod overwrite_files;
2122
mod remove_snapshots;
2223
mod rewrite_files;
2324
mod snapshot;
@@ -37,6 +38,7 @@ use crate::error::Result;
3738
use crate::spec::FormatVersion;
3839
use crate::table::Table;
3940
use crate::transaction::append::{FastAppendAction, MergeAppendAction};
41+
use crate::transaction::overwrite_files::OverwriteFilesAction;
4042
use crate::transaction::sort_order::ReplaceSortOrderAction;
4143
use crate::TableUpdate::UpgradeFormatVersion;
4244
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
@@ -255,6 +257,22 @@ impl<'a> Transaction<'a> {
255257
Ok(self)
256258
}
257259

260+
/// Creates an overwrite files action.
261+
pub fn overwrite_files(
262+
self,
263+
commit_uuid: Option<Uuid>,
264+
key_metadata: Vec<u8>,
265+
) -> Result<OverwriteFilesAction<'a>> {
266+
let snapshot_id = self.generate_unique_snapshot_id();
267+
OverwriteFilesAction::new(
268+
self,
269+
snapshot_id,
270+
commit_uuid.unwrap_or_else(Uuid::now_v7),
271+
key_metadata,
272+
HashMap::new(),
273+
)
274+
}
275+
258276
/// Commit transaction.
259277
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
260278
let table_commit = TableCommit::builder()
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
20+
use uuid::Uuid;
21+
22+
use super::snapshot::{
23+
DefaultManifestProcess, MergeManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
24+
};
25+
use super::{
26+
Transaction, MANIFEST_MERGE_ENABLED, MANIFEST_MERGE_ENABLED_DEFAULT, MANIFEST_MIN_MERGE_COUNT,
27+
MANIFEST_MIN_MERGE_COUNT_DEFAULT, MANIFEST_TARGET_SIZE_BYTES,
28+
MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
29+
};
30+
use crate::error::Result;
31+
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
32+
use crate::transaction::rewrite_files::RewriteFilesOperation;
33+
34+
pub const USE_STARTING_SEQUENCE_NUMBER: &str = "use-starting-sequence-number";
35+
pub const USE_STARTING_SEQUENCE_NUMBER_DEFAULT: bool = true;
36+
37+
/// Transaction action for rewriting files.
38+
pub struct OverwriteFilesAction<'a> {
39+
snapshot_produce_action: SnapshotProduceAction<'a>,
40+
target_size_bytes: u32,
41+
min_count_to_merge: u32,
42+
merge_enabled: bool,
43+
}
44+
45+
struct OverwriteFilesOperation {
46+
inner: RewriteFilesOperation,
47+
}
48+
49+
impl<'a> OverwriteFilesAction<'a> {
50+
pub fn new(
51+
tx: Transaction<'a>,
52+
snapshot_id: i64,
53+
commit_uuid: Uuid,
54+
key_metadata: Vec<u8>,
55+
snapshot_properties: HashMap<String, String>,
56+
) -> Result<Self> {
57+
let target_size_bytes: u32 = tx
58+
.current_table
59+
.metadata()
60+
.properties()
61+
.get(MANIFEST_TARGET_SIZE_BYTES)
62+
.and_then(|s| s.parse().ok())
63+
.unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
64+
65+
let min_count_to_merge: u32 = tx
66+
.current_table
67+
.metadata()
68+
.properties()
69+
.get(MANIFEST_MIN_MERGE_COUNT)
70+
.and_then(|s| s.parse().ok())
71+
.unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT);
72+
73+
let merge_enabled = tx
74+
.current_table
75+
.metadata()
76+
.properties()
77+
.get(MANIFEST_MERGE_ENABLED)
78+
.and_then(|s| s.parse().ok())
79+
.unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);
80+
81+
let snapshot_produce_action = SnapshotProduceAction::new(
82+
tx,
83+
snapshot_id,
84+
key_metadata,
85+
commit_uuid,
86+
snapshot_properties,
87+
)
88+
.unwrap();
89+
90+
Ok(Self {
91+
snapshot_produce_action,
92+
target_size_bytes,
93+
min_count_to_merge,
94+
merge_enabled,
95+
})
96+
}
97+
98+
/// Add data files to the snapshot.
99+
100+
pub fn add_data_files(
101+
mut self,
102+
data_files: impl IntoIterator<Item = DataFile>,
103+
) -> Result<Self> {
104+
self.snapshot_produce_action.add_data_files(data_files)?;
105+
Ok(self)
106+
}
107+
108+
/// Add remove files to the snapshot.
109+
pub fn delete_files(
110+
mut self,
111+
remove_data_files: impl IntoIterator<Item = DataFile>,
112+
) -> Result<Self> {
113+
self.snapshot_produce_action
114+
.delete_files(remove_data_files)?;
115+
Ok(self)
116+
}
117+
118+
/// Finished building the action and apply it to the transaction.
119+
pub async fn apply(self) -> Result<Transaction<'a>> {
120+
let inner = RewriteFilesOperation;
121+
122+
if self.merge_enabled {
123+
let process =
124+
MergeManifestProcess::new(self.target_size_bytes, self.min_count_to_merge);
125+
self.snapshot_produce_action
126+
.apply(OverwriteFilesOperation { inner }, process)
127+
.await
128+
} else {
129+
self.snapshot_produce_action
130+
.apply(OverwriteFilesOperation { inner }, DefaultManifestProcess)
131+
.await
132+
}
133+
}
134+
135+
pub fn with_starting_sequence_number(mut self, seq: i64) -> Result<Self> {
136+
// If the compaction should use the sequence number of the snapshot at compaction start time for
137+
// new data files, instead of using the sequence number of the newly produced snapshot.
138+
// This avoids commit conflicts with updates that add newer equality deletes at a higher sequence number.
139+
let use_starting_sequence_number = self
140+
.snapshot_produce_action
141+
.tx
142+
.current_table
143+
.metadata()
144+
.properties()
145+
.get(USE_STARTING_SEQUENCE_NUMBER)
146+
.and_then(|s| s.parse().ok())
147+
.unwrap_or(USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
148+
149+
if !use_starting_sequence_number {
150+
return Err(crate::error::Error::new(
151+
crate::ErrorKind::Unexpected,
152+
"Cannot set data file sequence number when use-starting-sequence-number is false"
153+
.to_string(),
154+
));
155+
}
156+
157+
self.snapshot_produce_action
158+
.set_new_data_file_sequence_number(seq);
159+
160+
Ok(self)
161+
}
162+
163+
pub fn with_starting_sequence_number_from_branch(mut self, branch: &str) -> Result<Self> {
164+
// If the compaction should use the sequence number of the snapshot at compaction start time for
165+
// new data files, instead of using the sequence number of the newly produced snapshot.
166+
// This avoids commit conflicts with updates that add newer equality deletes at a higher sequence number.
167+
let use_starting_sequence_number = self
168+
.snapshot_produce_action
169+
.tx
170+
.current_table
171+
.metadata()
172+
.properties()
173+
.get(USE_STARTING_SEQUENCE_NUMBER)
174+
.and_then(|s| s.parse().ok())
175+
.unwrap_or(USE_STARTING_SEQUENCE_NUMBER_DEFAULT);
176+
177+
if !use_starting_sequence_number {
178+
return Err(crate::error::Error::new(
179+
crate::ErrorKind::Unexpected,
180+
"Cannot set data file sequence number when use-starting-sequence-number is false"
181+
.to_string(),
182+
));
183+
}
184+
185+
if let Some(snapshot) = self
186+
.snapshot_produce_action
187+
.tx
188+
.current_table
189+
.metadata()
190+
.snapshot_for_ref(branch)
191+
{
192+
self.snapshot_produce_action
193+
.set_new_data_file_sequence_number(snapshot.sequence_number());
194+
}
195+
196+
Ok(self)
197+
}
198+
199+
pub fn with_to_branch(mut self, to_branch: String) -> Self {
200+
self.snapshot_produce_action.set_target_branch(to_branch);
201+
self
202+
}
203+
}
204+
205+
impl SnapshotProduceOperation for OverwriteFilesOperation {
206+
fn operation(&self) -> Operation {
207+
Operation::Overwrite
208+
}
209+
210+
async fn delete_entries(
211+
&self,
212+
snapshot_produce: &SnapshotProduceAction<'_>,
213+
) -> Result<Vec<ManifestEntry>> {
214+
self.inner.delete_entries(snapshot_produce).await
215+
}
216+
217+
async fn existing_manifest(
218+
&self,
219+
snapshot_produce: &mut SnapshotProduceAction<'_>,
220+
) -> Result<Vec<ManifestFile>> {
221+
self.inner.existing_manifest(snapshot_produce).await
222+
}
223+
}

crates/iceberg/src/transaction/remove_snapshots.rs

Lines changed: 13 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -252,15 +252,9 @@ impl<'a> RemoveSnapshotAction<'a> {
252252
)?;
253253
}
254254

255-
self.tx.apply(vec![], vec![
256-
TableRequirement::UuidMatch {
257-
uuid: self.tx.current_table.metadata().uuid(),
258-
},
259-
TableRequirement::RefSnapshotIdMatch {
260-
r#ref: MAIN_BRANCH.to_string(),
261-
snapshot_id: self.tx.current_table.metadata().current_snapshot_id(),
262-
},
263-
])?;
255+
self.tx.apply(vec![], vec![TableRequirement::UuidMatch {
256+
uuid: self.tx.current_table.metadata().uuid(),
257+
}])?;
264258

265259
Ok(RemoveSnapshotApplyResult {
266260
tx: self.tx,
@@ -460,7 +454,7 @@ mod tests {
460454
use std::io::BufReader;
461455

462456
use crate::io::FileIOBuilder;
463-
use crate::spec::{TableMetadata, MAIN_BRANCH};
457+
use crate::spec::TableMetadata;
464458
use crate::table::Table;
465459
use crate::transaction::Transaction;
466460
use crate::{TableIdent, TableRequirement};
@@ -495,15 +489,9 @@ mod tests {
495489
assert_eq!(4, tx.updates.len());
496490

497491
assert_eq!(
498-
vec![
499-
TableRequirement::UuidMatch {
500-
uuid: tx.current_table.metadata().uuid()
501-
},
502-
TableRequirement::RefSnapshotIdMatch {
503-
r#ref: MAIN_BRANCH.to_string(),
504-
snapshot_id: tx.current_table.metadata().current_snapshot_id
505-
}
506-
],
492+
vec![TableRequirement::UuidMatch {
493+
uuid: tx.current_table.metadata().uuid()
494+
},],
507495
tx.requirements
508496
);
509497
}
@@ -514,15 +502,9 @@ mod tests {
514502
assert_eq!(3, tx.updates.len());
515503

516504
assert_eq!(
517-
vec![
518-
TableRequirement::UuidMatch {
519-
uuid: tx.current_table.metadata().uuid()
520-
},
521-
TableRequirement::RefSnapshotIdMatch {
522-
r#ref: MAIN_BRANCH.to_string(),
523-
snapshot_id: tx.current_table.metadata().current_snapshot_id
524-
}
525-
],
505+
vec![TableRequirement::UuidMatch {
506+
uuid: tx.current_table.metadata().uuid()
507+
},],
526508
tx.requirements
527509
);
528510
}
@@ -538,15 +520,9 @@ mod tests {
538520
.unwrap();
539521
assert_eq!(0, tx.updates.len());
540522
assert_eq!(
541-
vec![
542-
TableRequirement::UuidMatch {
543-
uuid: tx.current_table.metadata().uuid()
544-
},
545-
TableRequirement::RefSnapshotIdMatch {
546-
r#ref: MAIN_BRANCH.to_string(),
547-
snapshot_id: tx.current_table.metadata().current_snapshot_id
548-
}
549-
],
523+
vec![TableRequirement::UuidMatch {
524+
uuid: tx.current_table.metadata().uuid()
525+
},],
550526
tx.requirements
551527
);
552528
}

0 commit comments

Comments
 (0)