feat: add count() pushdown optimization in Iceberg datasource #5029
base: main
Conversation
Greptile Summary
This PR adds count pushdown optimization support to Iceberg data sources by implementing the supports_count_pushdown() method in the IcebergScanOperator class. The method returns True to indicate that this scan operator can handle count aggregations efficiently at the scan level rather than requiring data materialization.
Count pushdown is a query optimization technique that allows aggregate operations like COUNT() to be executed directly by the data source rather than loading all data into memory first. This is particularly valuable for Iceberg tables since they maintain rich metadata that can be used to compute counts without scanning actual data files.
The implementation follows the established pattern in Daft's codebase where the base ScanOperator class provides a default supports_count_pushdown() method returning False, and specific scan operators override it to return True when they support the optimization. This is consistent with other scan operators like LanceDBScanOperator that have already implemented this interface. The change integrates seamlessly with Daft's query planning infrastructure, which can now recognize that Iceberg scans support count pushdown and optimize queries accordingly.
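As a rough illustration of that opt-in pattern, the sketch below mirrors the class and method names mentioned in this summary; the bodies are simplified stand-ins, not the actual Daft source.

```python
# Simplified sketch of the opt-in pattern described above; not the actual Daft source.

class ScanOperator:
    def supports_count_pushdown(self) -> bool:
        # Conservative default: operators must explicitly opt in.
        return False


class IcebergScanOperator(ScanOperator):
    def supports_count_pushdown(self) -> bool:
        # Iceberg manifests record per-file row counts, so COUNT(*) can often be
        # answered from metadata without materializing data files.
        return True
```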
PR Description Notes:
- The PR description template is mostly empty and lacks details about the specific changes made
- No related issues are mentioned despite this being part of a broader optimization effort
- The checklist items remain unchecked
Confidence score: 4/5
- This PR is safe to merge with minimal risk as it's a simple additive optimization feature
- Score reflects the straightforward nature of the change and consistency with existing patterns
- Minor attention needed for the missing newline at EOF in iceberg_scan.py
1 file reviewed, 1 comment
Force-pushed from 4baa68a to cad91bd
Codecov Report
❌ Patch coverage is …
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #5029      +/-   ##
==========================================
+ Coverage   73.81%   74.27%   +0.46%
==========================================
  Files         957      957
  Lines      124278   123240    -1038
==========================================
- Hits        91740    91542     -198
+ Misses      32538    31698     -840
Hey @huleilei what's the latest update on this PR? Thank you!
Force-pushed from 5f4a9ce to 57bb363
Force-pushed from cb75b72 to 45667e0
Greptile Summary
This PR implements a count pushdown optimization for Iceberg tables that allows COUNT(*) queries to leverage Iceberg's manifest metadata instead of scanning actual data files. The optimization works by reading record counts directly from Iceberg's manifest files and aggregating them to produce the final count result.
The implementation consists of two main components:
- Core optimization logic (daft/io/iceberg/iceberg_scan.py): adds a _iceberg_count_result_function helper that constructs count results from metadata, modifies to_scan_tasks to detect count operations and use metadata-based counting, implements _create_count_scan_task for count-specific scan tasks, and adds _calculate_total_rows_from_metadata to aggregate row counts from manifest files while accounting for delete files in Merge-on-Read scenarios.
- Comprehensive test suite (tests/integration/iceberg/test_iceberg_reads.py): adds a TestIcebergCountPushdown class with 8 test methods covering basic count operations, partitioned tables, filtered queries (where pushdown should be disabled), column selection, limit operations, snapshot consistency, and tables with MOR deletes. The tests verify pushdown activation by checking for the presence of _iceberg_count_result_function in query execution plans.
The optimization integrates with Daft's existing pushdown framework by implementing the required support methods (supports_count_mode and multiline_display) and follows the established pattern of using metadata to avoid expensive I/O operations. This change provides significant performance improvements for count queries on large Iceberg tables.
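For intuition on the metadata-based counting described in the first component, the sketch below aggregates per-file record counts from a table's manifests with pyiceberg. The helper name metadata_row_count is hypothetical (the PR's equivalent is _calculate_total_rows_from_metadata), and this version simply returns None as soon as any delete file is seen rather than attempting to subtract deleted rows.

```python
# Illustrative sketch only; not the PR's code. Assumes a pyiceberg Table object.
from typing import Optional

from pyiceberg.manifest import DataFileContent
from pyiceberg.table import Table


def metadata_row_count(table: Table) -> Optional[int]:
    """Sum record counts from manifest entries; None means 'fall back to a real scan'."""
    snapshot = table.current_snapshot()
    if snapshot is None:
        return 0  # no snapshot yet, so the table is empty

    total = 0
    for manifest in snapshot.manifests(table.io):
        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=True):
            if entry.data_file.content != DataFileContent.DATA:
                # Position/equality delete files make a metadata-only count unreliable.
                return None
            total += entry.data_file.record_count
    return total
```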
Confidence score: 3/5
- This PR introduces a useful optimization but has potential accuracy issues with delete file handling that could affect correctness
- Score reflects concerns about the simplistic delete file subtraction logic in Merge-on-Read scenarios which may not accurately handle overlapping or complex delete patterns
- Pay close attention to the delete file handling logic in _calculate_total_rows_from_metadata and consider testing with complex MOR scenarios
2 files reviewed, 1 comment
Greptile Summary
This review covers only the changes made since the last review, not the entire PR. The recent changes implement count pushdown optimization for Iceberg datasources, which is a significant performance improvement that allows COUNT(*) operations to execute by reading metadata from manifest files instead of scanning actual data files.
The implementation adds sophisticated logic to the IcebergScanOperator class that can calculate row counts from Iceberg's metadata while properly handling complex deletion scenarios. This includes support for position deletes (which mark specific row positions as deleted) and equality deletes (which remove rows matching certain values), both using Iceberg's sequence number system to determine deletion validity.
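For context, the sequence-number rule referenced here comes from the Iceberg spec: a delete file only affects data files written earlier in the table's history, with a strict comparison for equality deletes and an inclusive one for position deletes. A minimal illustration (not code from this PR):

```python
def delete_applies_to(data_seq: int, delete_seq: int, is_equality_delete: bool) -> bool:
    """Whether a delete file can affect a data file, per the Iceberg spec's sequence-number rules."""
    if is_equality_delete:
        # Equality deletes apply only to data files written strictly before the delete.
        return data_seq < delete_seq
    # Position deletes apply to data files written at or before the delete.
    return data_seq <= delete_seq
```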
The changes also consolidate the test suite by replacing multiple individual test methods with a single parameterized test that covers all supported table types, expanding test coverage while reducing code duplication. New test tables were added to validate count accuracy in complex delete scenarios, including overlapping deletes and mixed delete types.
This optimization fits well within Daft's broader pushdown optimization strategy and leverages the existing ScanOperator interface that already supports count pushdown operations. The implementation includes proper fallback mechanisms when metadata reading fails, ensuring robustness in production environments.
Confidence score: 3/5
- This PR introduces complex logic for handling Iceberg deletions that may have edge cases in production
- Score reflects the sophisticated deletion handling logic that could lead to count inaccuracies if the position delete simulation or equality delete estimation has bugs
- Pay close attention to the deletion calculation logic in iceberg_scan.py, particularly the position delete simulation and sequence number handling
3 files reviewed, 1 comment
Force-pushed from 7283387 to 0b511b1
Greptile Summary
This review covers only the changes made since the last review, not the entire PR. The remaining issue from the previous reviews has been addressed.
The most recent changes focus on completing the count pushdown optimization feature for Iceberg data sources. The implementation adds the ability to compute COUNT(*) queries directly from Iceberg table metadata instead of scanning actual data files, which provides significant performance improvements for analytical workloads.
Key changes include:
- Implementation of count detection logic in IcebergScanOperator.to_scan_tasks() that identifies aggregation pushdowns and routes them to optimized execution paths
- Addition of a _create_count_scan_task() method that aggregates row counts from Iceberg metadata and constructs synthetic scan tasks with the results
- Safety mechanisms through a _has_delete_files() method that prevents count pushdown when delete files are present, ensuring correctness
- Interface compliance by implementing the supports_count_pushdown() and supported_count_modes() methods
- Comprehensive test coverage with new test tables and scenarios in the integration test suite
The optimization intelligently falls back to regular scanning when count pushdown cannot be safely applied, such as when delete files exist, filters are applied, or other conditions prevent the optimization. This ensures that the feature maintains correctness while providing performance benefits where possible.
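A condensed sketch of that routing and fallback behavior is shown below; the Pushdowns attributes and helper names (_to_regular_scan_tasks in particular) are simplified stand-ins for Daft's internal interfaces, not the PR's exact code.

```python
from typing import Any, List


class IcebergScanOperatorSketch:
    """Stand-in operator showing only the count-pushdown routing logic."""

    def to_scan_tasks(self, pushdowns: Any) -> List[Any]:
        # Only answer the count from metadata when it is safe to do so.
        count_requested = getattr(pushdowns, "aggregation", None) == "count"
        can_push_count = (
            count_requested
            and getattr(pushdowns, "filters", None) is None  # a filter would change the count
            and not self._has_delete_files()                 # MOR deletes make metadata counts unreliable
        )
        if can_push_count:
            # Emit a single synthetic task carrying the metadata-derived row count.
            return [self._create_count_scan_task()]
        # Otherwise fall back to ordinary file-based scan tasks.
        return self._to_regular_scan_tasks(pushdowns)
```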
Confidence score: 4/5
- This PR is generally safe to merge with good implementation and comprehensive testing
- Score reflects solid architecture with proper fallback mechanisms, though the WIP status suggests potential incompleteness
- Pay close attention to the test data setup and count optimization logic validation
3 files reviewed, no comments
@Jay-ju please help me review when convenient. Thanks
Changes Made
Following #4969 (comment), this adds count pushdown optimization support to Iceberg data sources.
However, a MOR table can have multiple delete operations, and calculating the row count from delete_files may produce incorrect results. Therefore, this PR does not currently apply the count pushdown to Iceberg tables with delete operations.
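As a rough illustration of that guard (the _has_delete_files() check mentioned in the review summary above), the sketch below asks pyiceberg whether the current snapshot carries any delete manifests; the self._table attribute and exact placement are assumptions, not the PR's verbatim code.

```python
from pyiceberg.manifest import ManifestContent


def _has_delete_files(self) -> bool:
    """Return True if the current snapshot references any delete manifests."""
    snapshot = self._table.current_snapshot()
    if snapshot is None:
        return False
    # A DELETES manifest implies position or equality delete files, in which
    # case a metadata-only count could overstate the true row count.
    return any(
        manifest.content == ManifestContent.DELETES
        for manifest in snapshot.manifests(self._table.io)
    )
```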
Related Issues
Checklist
docs/mkdocs.yml navigation