
Commit 57bb363

feat: add count() pushdown optimization in Iceberg datasource
1 parent: cad91bd
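
The tests in this diff verify the optimization in two ways: they compare Daft's count against a PyIceberg scan, and they inspect the optimized plan for the marker string `daft.io.iceberg.iceberg_scan:_iceberg_count_result_function`, which shows up in `explain(True)` output when the count() has been pushed down into the Iceberg scan. Below is a minimal sketch of that plan-inspection pattern outside pytest; the table name is a placeholder, and stdout is captured with `contextlib.redirect_stdout` in place of the `capsys` fixture the tests use:

    import contextlib
    import io

    import daft

    # Build a bare count() query over an Iceberg-backed catalog table
    # (placeholder name; assumes the table is already attached to a catalog).
    df = daft.read_table("my_catalog.default.my_table").count()

    # explain(True) prints the plans to stdout; capture them as capsys would.
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain(True)

    # The marker appears only when the count() pushdown rewrote the plan.
    marker = "daft.io.iceberg.iceberg_scan:_iceberg_count_result_function"
    print("count pushdown applied:", marker in buf.getvalue())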

File tree

1 file changed: +69 −23 lines

tests/integration/iceberg/test_iceberg_reads.py

Lines changed: 69 additions & 23 deletions
@@ -241,30 +241,41 @@ class TestIcebergCountPushdown:
     """Test suite for Iceberg Count pushdown optimization."""

     @pytest.mark.integration()
-    def test_count_pushdown_basic(self, local_iceberg_catalog):
+    def test_count_pushdown_basic(self, local_iceberg_catalog, capsys):
         """Test basic count(*) pushdown functionality."""
         catalog_name, pyiceberg_catalog = local_iceberg_catalog
         tab = pyiceberg_catalog.load_table("default.test_all_types")

         # Test Daft count with pushdown
-        df = daft.read_table(f"{catalog_name}.default.test_all_types")
-        daft_count = df.count().collect().to_pydict()["count"][0]
+        df = daft.read_table(f"{catalog_name}.default.test_all_types").count()
+        _ = capsys.readouterr()
+        df.explain(True)
+        actual = capsys.readouterr()
+        assert "daft.io.iceberg.iceberg_scan:_iceberg_count_result_function" in actual.out
+
+        daft_count = df.collect().to_pydict()["count"][0]

         # Compare with PyIceberg count
         iceberg_count = len(tab.scan().to_arrow())

         assert daft_count == iceberg_count

     @pytest.mark.integration()
-    def test_count_pushdown_empty_table(self, local_iceberg_catalog):
+    def test_count_pushdown_empty_table(self, local_iceberg_catalog, capsys):
         """Test count pushdown on empty table."""
         catalog_name, pyiceberg_catalog = local_iceberg_catalog

         # Use a table that might be empty or create logic to test empty scenario
         try:
             tab = pyiceberg_catalog.load_table("default.test_new_column_with_no_data")
-            df = daft.read_table(f"{catalog_name}.default.test_new_column_with_no_data")
-            daft_count = df.count().collect().to_pydict()["count"][0]
+            df = daft.read_table(f"{catalog_name}.default.test_new_column_with_no_data").count()
+
+            _ = capsys.readouterr()
+            df.explain(True)
+            actual = capsys.readouterr()
+            assert "daft.io.iceberg.iceberg_scan:_iceberg_count_result_function" in actual.out
+
+            daft_count = df.collect().to_pydict()["count"][0]

             # Compare with PyIceberg count
             iceberg_count = len(tab.scan().to_arrow())
@@ -284,30 +295,41 @@ def test_count_pushdown_empty_table(self, local_iceberg_catalog):
             "test_partitioned_by_years",
         ],
     )
-    def test_count_pushdown_partitioned_tables(self, table_name, local_iceberg_catalog):
+    def test_count_pushdown_partitioned_tables(self, table_name, local_iceberg_catalog, capsys):
         """Test count pushdown on partitioned tables."""
         catalog_name, pyiceberg_catalog = local_iceberg_catalog
         tab = pyiceberg_catalog.load_table(f"default.{table_name}")

         # Test Daft count with pushdown on partitioned table
-        df = daft.read_table(f"{catalog_name}.default.{table_name}")
-        daft_count = df.count().collect().to_pydict()["count"][0]
+        df = daft.read_table(f"{catalog_name}.default.{table_name}").count()
+        _ = capsys.readouterr()
+        df.explain(True)
+        actual = capsys.readouterr()
+        assert "daft.io.iceberg.iceberg_scan:_iceberg_count_result_function" in actual.out
+
+        daft_count = df.collect().to_pydict()["count"][0]

         # Compare with PyIceberg count
         iceberg_count = len(tab.scan().to_arrow())

         assert daft_count == iceberg_count

     @pytest.mark.integration()
-    def test_count_pushdown_with_filter(self, local_iceberg_catalog):
+    def test_count_pushdown_with_filter(self, local_iceberg_catalog, capsys):
         """Test count with filter - should not use pushdown optimization."""
         catalog_name, pyiceberg_catalog = local_iceberg_catalog
         tab = pyiceberg_catalog.load_table("default.test_partitioned_by_identity")

         # Test Daft count with filter (should not use pushdown)
         df = daft.read_table(f"{catalog_name}.default.test_partitioned_by_identity")
-        filtered_df = df.where(df["number"] > 0)
-        daft_count = filtered_df.count().collect().to_pydict()["count"][0]
+        df = df.where(df["number"] > 0).count()
+
+        _ = capsys.readouterr()
+        df.explain(True)
+        actual = capsys.readouterr()
+        assert "daft.io.iceberg.iceberg_scan:_iceberg_count_result_function" not in actual.out
+
+        daft_count = df.collect().to_pydict()["count"][0]

         # Compare with PyIceberg filtered count
         iceberg_pandas = tab.scan().to_arrow().to_pandas()
@@ -316,37 +338,48 @@ def test_count_pushdown_with_filter(self, local_iceberg_catalog):
         assert daft_count == iceberg_count

     @pytest.mark.integration()
-    def test_count_pushdown_with_column_selection(self, local_iceberg_catalog):
+    def test_count_pushdown_with_column_selection(self, local_iceberg_catalog, capsys):
         """Test count pushdown works correctly with column selection."""
         catalog_name, pyiceberg_catalog = local_iceberg_catalog
         tab = pyiceberg_catalog.load_table("default.test_all_types")

         # Test count with column selection (should still use pushdown)
         df = daft.read_table(f"{catalog_name}.default.test_all_types")
-        selected_df = df.select("id") if "id" in df.column_names else df.select(df.column_names[0])
-        daft_count = selected_df.count().collect().to_pydict()["count"][0]
+        df = (df.select("id") if "id" in df.column_names else df.select(df.column_names[0])).count()
+
+        _ = capsys.readouterr()
+        df.explain(True)
+        actual = capsys.readouterr()
+        assert "daft.io.iceberg.iceberg_scan:_iceberg_count_result_function" not in actual.out
+
+        daft_count = df.collect().to_pydict()["count"][0]

         # Compare with full table count (should be the same)
         iceberg_count = len(tab.scan().to_arrow())

         assert daft_count == iceberg_count

     @pytest.mark.integration()
-    def test_count_pushdown_with_limit(self, local_iceberg_catalog):
+    def test_count_pushdown_with_limit(self, local_iceberg_catalog, capsys):
         """Test count behavior with limit operations."""
         catalog_name, _pyiceberg_catalog = local_iceberg_catalog

         # Test count after limit (should not use pushdown optimization)
         df = daft.read_table(f"{catalog_name}.default.test_all_types")
-        limited_df = df.limit(5)
-        daft_count = limited_df.count().collect().to_pydict()["count"][0]
+        df = df.limit(5).count()
+        daft_count = df.collect().to_pydict()["count"][0]
+
+        _ = capsys.readouterr()
+        df.explain(True)
+        actual = capsys.readouterr()
+        assert "daft.io.iceberg.iceberg_scan:_iceberg_count_result_function" not in actual.out

         # Count after limit should be at most the limit value
         assert daft_count <= 5
         assert daft_count >= 0

     @pytest.mark.integration()
-    def test_count_pushdown_snapshot_consistency(self, local_iceberg_catalog):
+    def test_count_pushdown_snapshot_consistency(self, local_iceberg_catalog, capsys):
         """Test count pushdown with different snapshots."""
         _catalog_name, pyiceberg_catalog = local_iceberg_catalog

@@ -358,7 +391,14 @@ def test_count_pushdown_snapshot_consistency(self, local_iceberg_catalog):
         # Test count on different snapshots
         for snapshot in snapshots[:2]:  # Test first 2 snapshots
             daft_pandas = daft.read_iceberg(tab, snapshot_id=snapshot.snapshot_id)
-            daft_count = daft_pandas.count().collect().to_pydict()["count"][0]
+
+            df = daft_pandas.count()
+            _ = capsys.readouterr()
+            df.explain(True)
+            actual = capsys.readouterr()
+            assert "daft.io.iceberg.iceberg_scan:_iceberg_count_result_function" not in actual.out
+
+            daft_count = df.collect().to_pydict()["count"][0]

             # Compare with PyIceberg snapshot count
             iceberg_count = len(tab.scan(snapshot_id=snapshot.snapshot_id).to_arrow())
@@ -370,14 +410,20 @@ def test_count_pushdown_snapshot_consistency(self, local_iceberg_catalog):

     @pytest.mark.integration()
     @pytest.mark.parametrize("table_name", ["test_positional_mor_deletes", "test_positional_mor_double_deletes"])
-    def test_count_pushdown_with_deletes(self, table_name, local_iceberg_catalog):
+    def test_count_pushdown_with_deletes(self, table_name, local_iceberg_catalog, capsys):
         """Test count pushdown on tables with MOR (Merge-On-Read) deletes."""
         catalog_name, pyiceberg_catalog = local_iceberg_catalog
         tab = pyiceberg_catalog.load_table(f"default.{table_name}")

         # Test Daft count on table with deletes
-        df = daft.read_table(f"{catalog_name}.default.{table_name}")
-        daft_count = df.count().collect().to_pydict()["count"][0]
+        df = daft.read_table(f"{catalog_name}.default.{table_name}").count()
+
+        _ = capsys.readouterr()
+        df.explain(True)
+        actual = capsys.readouterr()
+        assert "daft.io.iceberg.iceberg_scan:_iceberg_count_result_function" in actual.out
+
+        daft_count = df.collect().to_pydict()["count"][0]

         # Compare with PyIceberg count (should account for deletes)
         iceberg_count = len(tab.scan().to_arrow())
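
Taken together, the assertions expect the pushdown marker only for a bare count() over daft.read_table, including partitioned tables and tables with MOR deletes, and expect no marker once a filter, a limit, or an explicit snapshot read via daft.read_iceberg is involved. A quick illustration of the positive and negative cases, using placeholder table and column names:

    import daft

    df = daft.read_table("my_catalog.default.my_table")

    # Bare count(): the tests above expect the pushdown marker in the plan.
    df.count().explain(True)

    # Count after a filter: the tests above expect no pushdown marker.
    df.where(df["number"] > 0).count().explain(True)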
