Skip to content

Commit e596377

Browse files
committed
Add scheduling parameters
1 parent f3ab41d commit e596377

File tree

4 files changed

+10
-4
lines changed

4 files changed

+10
-4
lines changed

awswrangler/distributed/ray/_register.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,16 @@ def register_ray() -> None:
2525
_select_query,
2626
_select_object_content,
2727
_wait_object_batch,
28+
]:
29+
# Schedule for maximum concurrency
30+
engine.register_func(func, ray_remote(scheduling_strategy="SPREAD")(func))
31+
32+
for pack_func in [
2833
_write_batch,
2934
_write_df,
3035
]:
31-
engine.register_func(func, ray_remote()(func))
36+
# Schedule for data locality
37+
engine.register_func(pack_func, ray_remote(scheduling_strategy="PACK")(pack_func))
3238

3339
if memory_format.get() == MemoryFormatEnum.MODIN:
3440
from awswrangler.distributed.ray.modin._data_types import pyarrow_types_from_pandas_distributed

awswrangler/distributed/ray/modin/_data_types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def pyarrow_types_from_pandas_distributed(
1313
df: pd.DataFrame, index: bool, ignore_cols: Optional[List[str]] = None, index_left: bool = False
1414
) -> Dict[str, pa.DataType]:
1515
"""Extract the related Pyarrow data types from a pandas DataFrame."""
16-
func = ray_remote()(pyarrow_types_from_pandas)
16+
func = ray_remote(scheduling_strategy="PACK")(pyarrow_types_from_pandas)
1717
first_block_object_ref = ray.data.from_modin(df).get_internal_block_refs()[0]
1818
return ray_get( # type: ignore
1919
func(

awswrangler/distributed/ray/modin/_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from awswrangler._arrow import _table_to_df
1515

1616

17-
@ray_remote()
17+
@ray_remote(scheduling_strategy="PACK")
1818
def _block_to_df(
1919
block: Any,
2020
to_pandas_kwargs: Dict[str, Any],

awswrangler/distributed/ray/modin/s3/_write_dataset.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ def _to_partitions_distributed( # pylint: disable=unused-argument
140140
_to_partitions_func = engine.dispatch_func(_to_partitions, PandasDataFrame)
141141
func = engine.dispatch_func(func, PandasDataFrame)
142142

143-
@ray_remote()
143+
@ray_remote(scheduling_strategy="PACK")
144144
def write_partitions(df: pd.DataFrame, block_index: int) -> Tuple[List[str], Dict[str, List[str]]]:
145145
paths, partitions_values = _to_partitions_func(
146146
# Passing a copy of the data frame because data in ray object store is immutable

0 commit comments

Comments (0)