5 changes: 5 additions & 0 deletions awswrangler/__init__.py
@@ -31,6 +31,11 @@
)
from awswrangler.__metadata__ import __description__, __license__, __title__, __version__ # noqa
from awswrangler._config import config # noqa
from awswrangler._distributed import initialize_ray

if config.distributed:
initialize_ray()


__all__ = [
"athena",
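
With this change, merely importing awswrangler starts (or connects to) a local Ray cluster whenever both modin and ray are importable. A minimal opt-out sketch, assuming the WR_-prefixed environment-variable convention that _load_config uses for the other _CONFIG_ARGS keys also covers the new distributed key:

import os

# Assumed opt-out: seed the config from the environment before importing
# awswrangler, following the WR_<KEY> convention for _CONFIG_ARGS entries.
os.environ["WR_DISTRIBUTED"] = "False"

import awswrangler as wr  # noqa: E402  -- initialize_ray() is skipped here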
85 changes: 84 additions & 1 deletion awswrangler/_config.py
@@ -1,5 +1,6 @@
"""Configuration file for AWS Data Wrangler."""

import importlib.util
import inspect
import logging
import os
@@ -49,6 +50,15 @@ class _ConfigArg(NamedTuple):
# Botocore config
"botocore_config": _ConfigArg(dtype=botocore.config.Config, nullable=True),
"verify": _ConfigArg(dtype=str, nullable=True),
# Distributed
"distributed": _ConfigArg(dtype=bool, nullable=True),
"address": _ConfigArg(dtype=str, nullable=True),
"redis_password": _ConfigArg(dtype=str, nullable=True),
"ignore_reinit_error": _ConfigArg(dtype=bool, nullable=True),
"include_dashboard": _ConfigArg(dtype=bool, nullable=True),
"object_store_memory": _ConfigArg(dtype=int, nullable=True),
"cpu_count": _ConfigArg(dtype=int, nullable=True),
"gpu_count": _ConfigArg(dtype=int, nullable=True),
}


@@ -70,6 +80,7 @@ def __init__(self) -> None:
self.secretsmanager_endpoint_url = None
self.botocore_config = None
self.verify = None
self.distributed = all(importlib.util.find_spec(pkg) for pkg in ("modin", "ray"))
for name in _CONFIG_ARGS:
self._load_config(name=name)

@@ -155,7 +166,7 @@ def __getitem__(self, item: str) -> _ConfigValueType:

def _reset_item(self, item: str) -> None:
if item in self._loaded_values:
if item.endswith("_endpoint_url") or item == "verify":
if item.endswith("_endpoint_url") or item in ["verify", "distributed"]:
self._loaded_values[item] = None
else:
del self._loaded_values[item]
@@ -405,6 +416,78 @@ def verify(self) -> Optional[str]:
def verify(self, value: Optional[str]) -> None:
self._set_config_value(key="verify", value=value)

@property
def distributed(self) -> Optional[bool]:
"""Property distributed."""
return cast(Optional[bool], self["distributed"])

@distributed.setter
def distributed(self, value: Optional[bool]) -> None:
self._set_config_value(key="distributed", value=value)

@property
def ignore_reinit_error(self) -> Optional[bool]:
"""Property ignore_reinit_error."""
return cast(Optional[bool], self["ignore_reinit_error"])

@ignore_reinit_error.setter
def ignore_reinit_error(self, value: Optional[bool]) -> None:
self._set_config_value(key="ignore_reinit_error", value=value)

@property
def include_dashboard(self) -> Optional[bool]:
"""Property include_dashboard."""
return cast(Optional[bool], self["include_dashboard"])

@include_dashboard.setter
def include_dashboard(self, value: Optional[bool]) -> None:
self._set_config_value(key="include_dashboard", value=value)

@property
def address(self) -> Optional[str]:
"""Property address."""
return cast(Optional[str], self["address"])

@address.setter
def address(self, value: Optional[str]) -> None:
self._set_config_value(key="address", value=value)

@property
def redis_password(self) -> Optional[str]:
"""Property redis_password."""
return cast(Optional[str], self["redis_password"])

@redis_password.setter
def redis_password(self, value: Optional[str]) -> None:
self._set_config_value(key="redis_password", value=value)

@property
def object_store_memory(self) -> int:
"""Property object_store_memory."""
return cast(int, self["object_store_memory"])

@object_store_memory.setter
def object_store_memory(self, value: int) -> None:
self._set_config_value(key="object_store_memory", value=value)

@property
def cpu_count(self) -> int:
"""Property cpu_count."""
return cast(int, self["cpu_count"])

@cpu_count.setter
def cpu_count(self, value: int) -> None:
self._set_config_value(key="cpu_count", value=value)

@property
def gpu_count(self) -> int:
"""Property gpu_count."""
return cast(int, self["gpu_count"])

@gpu_count.setter
def gpu_count(self, value: int) -> None:
self._set_config_value(key="gpu_count", value=value)


def _insert_str(text: str, token: str, insert: str) -> str:
"""Insert string into other."""
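
Since every new key is registered in _CONFIG_ARGS and exposed as a property, the distributed settings are tuned like any other wr.config value; and because initialize_ray (below) is decorated with @apply_configs, a subsequent call picks them up as keyword defaults. A sketch with illustrative values:

import awswrangler as wr

wr.config.cpu_count = 8                    # CPUs per raylet
wr.config.object_store_memory = 4 * 2**30  # 4 GiB object store
wr.config.include_dashboard = True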
126 changes: 126 additions & 0 deletions awswrangler/_distributed.py
@@ -0,0 +1,126 @@
"""Distributed Module (PRIVATE)."""

import multiprocessing
import os
import sys
import warnings
from typing import TYPE_CHECKING, Any, Callable, Optional

import psutil

from awswrangler._config import apply_configs, config

# Import ray only when distributed mode is enabled (TYPE_CHECKING keeps static type checkers happy).
if config.distributed or TYPE_CHECKING:
import ray


def ray_remote(function: Callable[..., Any]) -> Callable[..., Any]:
"""
Wrap a callable in ray.remote when distributed mode is enabled; return it unchanged otherwise.

Parameters
----------
function : Callable[..., Any]
Callable as input to ray.remote

Returns
-------
Callable[..., Any]
    The ray.remote-wrapped callable in distributed mode, the original callable otherwise.
"""
if config.distributed:

def wrapper(*args: Any, **kwargs: Any) -> Any:
return ray.remote(function).remote(*args, **kwargs)

return wrapper
return function
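
In local mode the decorated callable executes eagerly; in distributed mode a call submits a Ray task and returns an ObjectRef to be resolved with ray.get(). A hypothetical illustration (_double is not part of this PR):

from awswrangler._distributed import ray_remote

@ray_remote
def _double(x: int) -> int:  # hypothetical example function
    return 2 * x

ref = _double(21)
# config.distributed False: ref == 42 (plain call).
# config.distributed True:  ref is a ray.ObjectRef; ray.get(ref) == 42.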


@apply_configs
def initialize_ray(
address: Optional[str] = None,
redis_password: Optional[str] = None,
ignore_reinit_error: Optional[bool] = True,
include_dashboard: Optional[bool] = False,
object_store_memory: Optional[int] = None,
cpu_count: Optional[int] = None,
gpu_count: Optional[int] = 0,
) -> None:
"""
Connect to an existing Ray cluster or start one and connect to it.

Parameters
----------
address : Optional[str]
Address of the Ray cluster to connect to, by default None
redis_password : Optional[str]
Password to the Redis cluster, by default None
ignore_reinit_error : Optional[bool]
If True, Ray suppresses errors from calling ray.init() twice, by default True
include_dashboard : Optional[bool]
Boolean flag indicating whether or not to start the Ray dashboard, by default False
object_store_memory : Optional[int]
The amount of memory (in bytes) to start the object store with, by default None
cpu_count : Optional[int]
Number of CPUs to assign to each raylet, by default None
gpu_count : Optional[int]
Number of GPUs to assign to each raylet, by default 0
"""
if address:
ray.init(
address=address,
include_dashboard=include_dashboard,
ignore_reinit_error=ignore_reinit_error,
)
else:
if not object_store_memory:
object_store_memory = _get_ray_object_store_memory()

mac_size_limit = getattr(ray.ray_constants, "MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT", None)
if sys.platform == "darwin" and mac_size_limit is not None and object_store_memory > mac_size_limit:
warnings.warn(
"On Macs, Ray's performance is known to degrade with "
+ "object store size greater than "
+ f"{mac_size_limit / 2 ** 30:.4} GiB. Ray by default does "
+ "not allow setting an object store size greater than "
+ "that. This default is overridden to avoid "
+ "spilling to disk more often. To override this "
+ "behavior, you can initialize Ray yourself."
)
os.environ["RAY_ENABLE_MAC_LARGE_OBJECT_STORE"] = "1"

ray_init_kwargs = {
"num_cpus": cpu_count or multiprocessing.cpu_count(),
"num_gpus": gpu_count,
"include_dashboard": include_dashboard,
"ignore_reinit_error": ignore_reinit_error,
"object_store_memory": object_store_memory,
"_redis_password": redis_password,
"_memory": object_store_memory,
}
ray.init(**ray_init_kwargs)
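
A sketch of both call paths; the first assumes a Ray cluster is already discoverable on the machine (address="auto" is standard ray.init usage):

from awswrangler._distributed import initialize_ray

# Join an existing cluster.
initialize_ray(address="auto")

# Or start a local one, overriding the configured defaults; calling again
# is safe because ignore_reinit_error defaults to True.
initialize_ray(cpu_count=4, object_store_memory=2 * 10**9)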


def _get_ray_object_store_memory() -> Optional[int]:
virtual_memory = psutil.virtual_memory().total
if sys.platform.startswith("linux"):
shm_fd = os.open("/dev/shm", os.O_RDONLY)
try:
shm_stats = os.fstatvfs(shm_fd)
system_memory = shm_stats.f_bsize * shm_stats.f_bavail
if system_memory / (virtual_memory / 2) < 0.99:
warnings.warn(
f"The size of /dev/shm is too small ({system_memory} bytes). The required size "
+ f"is at least half of RAM ({virtual_memory // 2} bytes). Please, delete files "
+ "in /dev/shm or increase the size with --shm-size in Docker. Alternatively, set the "
+ "memory size for each Ray worker in bytes with the RAY_OBJECT_STORE_MEMORY env var."
)
finally:
os.close(shm_fd)
else:
system_memory = virtual_memory
object_store_memory: Optional[int] = int(0.6 * system_memory // 1e9 * 1e9) # type: ignore
# If 60% of system memory rounds down to 0 GB, fall back to Ray's default object store size.
if object_store_memory == 0:
object_store_memory = None
return object_store_memory
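
A worked example of the sizing rule with illustrative numbers:

# 16 GiB machine: 60% of RAM, rounded down to a whole number of GB.
system_memory = 16 * 2**30  # 17_179_869_184 bytes
assert int(0.6 * system_memory // 1e9 * 1e9) == 10_000_000_000

# 1 GiB machine: 0.6 * 2**30 // 1e9 == 0, so the function returns None
# and Ray falls back to its own default object store size.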