-
Notifications
You must be signed in to change notification settings - Fork 319
feat: support merge append action #902
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
crates/iceberg/src/spec/manifest.rs
Outdated
&self.data_file | ||
} | ||
|
||
/// get file sequence number |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// get file sequence number | |
/// File sequence number indicating when the file was added. Inherited when null and status is 1 (added). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall just some small nits. implementing the defaults values for new fields can be done after this pr (#737)
// Enable merge append for table | ||
let tx = Transaction::new(&table); | ||
table = tx | ||
.set_properties(HashMap::from([ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also try adding a test here for the MANIFEST_TARGET_SIZE_BYTES
property?
crates/iceberg/src/transaction.rs
Outdated
/// Finished building the action and apply it to the transaction. | ||
pub async fn apply(self) -> Result<Transaction<'a>> { | ||
if self.merge_enabled { | ||
let process = MergeManifsetProcess { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let process = MergeManifsetProcess { | |
let process = MergeManifestProcess { |
crates/iceberg/src/transaction.rs
Outdated
} | ||
} | ||
|
||
impl ManifestProcess for MergeManifsetProcess { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impl ManifestProcess for MergeManifsetProcess { | |
impl ManifestProcess for MergeManifestProcess { |
crates/iceberg/src/transaction.rs
Outdated
struct MergeManifsetProcess { | ||
target_size_bytes: u32, | ||
min_count_to_merge: u32, | ||
} | ||
|
||
impl MergeManifsetProcess { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
struct MergeManifsetProcess { | |
target_size_bytes: u32, | |
min_count_to_merge: u32, | |
} | |
impl MergeManifsetProcess { | |
struct MergeManifestProcess { | |
target_size_bytes: u32, | |
min_count_to_merge: u32, | |
} | |
impl MergeManifestProcess { |
crates/iceberg/src/transaction.rs
Outdated
for manifset_file in manifest_bin { | ||
let manifest_file = manifset_file.load_manifest(&file_io).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for manifset_file in manifest_bin { | |
let manifest_file = manifset_file.load_manifest(&file_io).await?; | |
for manifest_file in manifest_bin { | |
let manifest_file = manifest_file.load_manifest(&file_io).await?; |
crates/iceberg/src/transaction.rs
Outdated
Ok(merged_bins.into_iter().flatten().collect()) | ||
} | ||
|
||
async fn merge_manifeset<'a>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn merge_manifeset<'a>( | |
async fn merge_manifest<'a>( |
crates/iceberg/src/transaction.rs
Outdated
impl ManifestProcess for DefaultManifestProcess { | ||
fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> { | ||
manifests | ||
async fn process_manifeset<'a>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn process_manifeset<'a>( | |
async fn process_manifest<'a>( |
crates/iceberg/src/transaction.rs
Outdated
|
||
trait ManifestProcess: Send + Sync { | ||
fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>; | ||
fn process_manifeset<'a>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fn process_manifeset<'a>( | |
fn process_manifest<'a>( |
crates/iceberg/src/transaction.rs
Outdated
} | ||
|
||
impl ManifestProcess for MergeManifsetProcess { | ||
async fn process_manifeset<'a>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn process_manifeset<'a>( | |
async fn process_manifest<'a>( |
crates/iceberg/src/transaction.rs
Outdated
return Ok(manifests); | ||
} | ||
|
||
let first_manifest = manifests[0].clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this be an expensive clone (not wrapped in arc)? is there a way to avoid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
async fn merge_group<'a>(
&self,
snapshot_produce: &mut SnapshotProduceAction<'a>,
first_manifest: &ManifestFile,
group_manifests: Vec<ManifestFile>,
)
This function will take ownership of the manifests, so we have to clone the first one out. Looks like hard to avoid it. Welcome for any suggestion to avoid this.
2e6c75e
to
82d73d4
Compare
Thanks for review! @jonathanc-n @kevinjqliu. I have refined the code and fixed the test. I think this PR is ready to review again. also cc @Fokko @liurenjie1024 @Xuanwo @sdd |
// For this first manifest, it will be pack with the first additional manifest and | ||
// the count(2) is less than the min merge count(4), so these two will not merge. | ||
// See detail: `MergeManifestProcess::merge_group` | ||
if idx == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure whether it's expected behaviour. This manifest will not be merged, so the status will remain Added
. cc @Fokko @kevinjqliu
## Which issue does this PR close? This PR add new_manifest_writer in SnapshotProducer and this function can be used to create different manifset writer for different action in the future, e.g. MergeAppend #902 ## What changes are included in this PR? ## Are these changes tested? Co-authored-by: ZENOTME <[email protected]>
261cbfd
to
cc00f8e
Compare
I have fix this PR to adapt new transcaction framework and ready to review. Recently I will keep up this PR. cc @liurenjie1024 @Xuanwo @Fokko @kevinjqliu @CTTY @jonathanc-n |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @ZENOTME , thanks for this awesome work! LGTM overall, I've left some questions in the comments
.metadata() | ||
.properties() | ||
.get(MANIFEST_TARGET_SIZE_BYTES) | ||
.and_then(|s| s.parse().ok()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should expose the parse error here
If users set MANIFEST_TARGET_SIZE_BYTES=1024+
by mistake, ok()
here will silently fall back to the default value. And it will be very hard to find out what happened
I've used something like below instead in another PR:
let total_delay = match props.get(PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS) {
Some(value_str) => value_str.parse::<u64>().map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"Invalid value for commit.retry.total-timeout-ms",
)
.with_source(e)
})?,
None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
};
It would be so much easier if we had something like a PropertyUtil
to define the orthogonal way of getting and parsing properties
} | ||
|
||
struct FastAppendOperation; | ||
pub(crate) struct FastAppendOperation; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Maybe we should just put MergeAppendAction under append.rs
as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That will make append.rs file too big I think.
|
||
if self.merge_enabled { | ||
snapshot_producer | ||
.commit(FastAppendOperation, MergeManifsetProcess { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we change FastAppendOperation
to AppendOperation
?
.merge_manifest(snapshot_produce, unmerg_data_manifests) | ||
.await? | ||
}; | ||
data_manifests.extend(unmerge_delete_manifest); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I'm a bit confused while reading this, could you please help me understand:
- why do we care about
delete_manifest
? I think MergeAppend should not introduce delete manifest but maybe I'm wrong - Why should we leave
delete_manifest
unmerged?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we care about delete_manifest? I think MergeAppend should not introduce delete manifest but maybe I'm wrong
Yes, we will not process delete manifest. In here, we:
- fitler out all data manifest and merge them
- keep delete manifest but not process them (We can't drop them)
- concat processed data manifest and delete manifest and return them
Why should we leave delete_manifest unmerged?
This implementation refer from pyiceberg: https://github.com/apache/iceberg-python/blob/e9c025318787bfd34b98a3fc41544e0f168904ba/pyiceberg/table/update/snapshot.py#L553. I guess the reason is that in MergeAppend we only append data file, so we assume the delete file will not change so we don't need to process them.
But I notice that here is different from iceberg-java. In iceberg-java, merge append action use MergeSnapshotProducer. And MergeSnapshotProducer will merge both data manifest and delete manifest.
}) | ||
} | ||
|
||
pub mod bin { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, an iterable packer with lookback would be more memory efficient. This can be completed as a follow-up
pub fn file_sequence_number(&self) -> Option<i64> { | ||
self.file_sequence_number | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see this used anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In merge_append_test.rs
|
||
/// Add data files to the snapshot. | ||
pub fn add_data_files( | ||
&mut self, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK add_data_files on FastAppendAction consumes mut self, not mut &self -- is there a reason why we're not keeping these consistent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @jdcasale I don't noticed that actually. Both way looks good me, but I prefer &mut self because add_data_files actually more like push_back
, if we consume mut self which means that the user always need to get self back after add_data_files like following:
let action = action.add_data_files(...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tend to agree with you in the abstract, but most of the other action methods work this way, and I'd argue that it's more valuable to be consistent across the API than it is to provide the optimally-ergonomic individual method here.
Another example is UpdateProperties action (here) -- set() and remove() take mut self and require the
let action = action.set(...)
pattern
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me make them consistent first and see if we can refine them at all later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have fix it cc @jdcasale
- rename FastAppendOperation to AppendOperation - use iterable input for pack to be more memory efficient
1b8ec03
to
de57b76
Compare
de57b76
to
af5c4e7
Compare
} | ||
} | ||
|
||
struct MergeManifsetProcess { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
struct MergeManifsetProcess { | |
struct MergeManifestProcess { |
#[tokio::test] | ||
async fn test_append_data_file() { | ||
let fixture = get_shared_containers(); | ||
let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will fail due to the recent RestCatalogBuilder change, you can switch to
let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); | |
let rest_catalog = RestCatalogBuilder::default() | |
.load("rest", fixture.catalog_config.clone()) | |
.await | |
.unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @CTTY!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like all test use this way. Maybe we can modify them all at another pr.
- refine comment
fe7295b
to
59e52f6
Compare
This PR complete #736