Skip to content

Commit 4fd0914

Browse files
authored
Distribute describe_objects S3 method (#1744)
1 parent f583dd1 commit 4fd0914

File tree

3 files changed

+20
-74
lines changed

3 files changed

+20
-74
lines changed

awswrangler/distributed/ray/_register.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from awswrangler.distributed.ray import ray_remote
77
from awswrangler.lakeformation._read import _get_work_unit_results
88
from awswrangler.s3._delete import _delete_objects
9+
from awswrangler.s3._describe import _describe_object
910
from awswrangler.s3._read_parquet import _read_parquet, _read_parquet_metadata_file
1011
from awswrangler.s3._read_text import _read_text
1112
from awswrangler.s3._select import _select_object_content, _select_query
@@ -20,6 +21,7 @@ def register_ray() -> None:
2021
"""Register dispatched Ray and Modin (on Ray) methods."""
2122
for func in [
2223
_get_work_unit_results,
24+
_describe_object,
2325
_delete_objects,
2426
_read_parquet_metadata_file,
2527
_select_query,

awswrangler/s3/_describe.py

Lines changed: 18 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
"""Amazon S3 Describe Module (INTERNAL)."""
22

3-
import concurrent.futures
43
import datetime
54
import itertools
65
import logging
@@ -9,15 +8,19 @@
98
import boto3
109

1110
from awswrangler import _utils
11+
from awswrangler._distributed import engine
12+
from awswrangler._threading import _get_executor
13+
from awswrangler.distributed.ray import ray_get
1214
from awswrangler.s3 import _fs
1315
from awswrangler.s3._list import _path2list
1416

1517
_logger: logging.Logger = logging.getLogger(__name__)
1618

1719

20+
@engine.dispatch_on_engine
1821
def _describe_object(
19-
path: str,
2022
boto3_session: boto3.Session,
23+
path: str,
2124
s3_additional_kwargs: Optional[Dict[str, Any]],
2225
version_id: Optional[str] = None,
2326
) -> Tuple[str, Dict[str, Any]]:
@@ -40,18 +43,6 @@ def _describe_object(
4043
return path, desc
4144

4245

43-
def _describe_object_concurrent(
44-
path: str,
45-
boto3_primitives: _utils.Boto3PrimitivesType,
46-
s3_additional_kwargs: Optional[Dict[str, Any]],
47-
version_id: Optional[str] = None,
48-
) -> Tuple[str, Dict[str, Any]]:
49-
boto3_session = _utils.boto3_from_primitives(primitives=boto3_primitives)
50-
return _describe_object(
51-
path=path, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs, version_id=version_id
52-
)
53-
54-
5546
def describe_objects(
5647
path: Union[str, List[str]],
5748
version_id: Optional[Union[str, Dict[str, str]]] = None,
@@ -127,41 +118,22 @@ def describe_objects(
127118
last_modified_end=last_modified_end,
128119
s3_additional_kwargs=s3_additional_kwargs,
129120
)
121+
130122
if len(paths) < 1:
131123
return {}
132124
resp_list: List[Tuple[str, Dict[str, Any]]]
133-
if len(paths) == 1:
134-
resp_list = [
135-
_describe_object(
136-
path=paths[0],
137-
version_id=version_id.get(paths[0]) if isinstance(version_id, dict) else version_id,
138-
boto3_session=boto3_session,
139-
s3_additional_kwargs=s3_additional_kwargs,
140-
)
141-
]
142-
elif use_threads is False:
143-
resp_list = [
144-
_describe_object(
145-
path=p,
146-
version_id=version_id.get(p) if isinstance(version_id, dict) else version_id,
147-
boto3_session=boto3_session,
148-
s3_additional_kwargs=s3_additional_kwargs,
149-
)
150-
for p in paths
151-
]
152-
else:
153-
cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
154-
versions = [version_id.get(p) if isinstance(version_id, dict) else version_id for p in paths]
155-
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
156-
resp_list = list(
157-
executor.map(
158-
_describe_object_concurrent,
159-
paths,
160-
versions,
161-
itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),
162-
itertools.repeat(s3_additional_kwargs),
163-
)
164-
)
125+
126+
executor = _get_executor(use_threads=use_threads)
127+
resp_list = ray_get(
128+
executor.map(
129+
_describe_object,
130+
boto3_session,
131+
paths,
132+
itertools.repeat(s3_additional_kwargs),
133+
[version_id.get(p) if isinstance(version_id, dict) else version_id for p in paths],
134+
)
135+
)
136+
165137
desc_dict: Dict[str, Dict[str, Any]] = dict(resp_list)
166138
return desc_dict
167139

awswrangler/s3/_read.py

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
"""Amazon S3 Read Module (PRIVATE)."""
22

3-
import concurrent.futures
43
import logging
5-
from functools import partial
64
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, cast
75

86
import numpy as np
@@ -11,7 +9,6 @@
119

1210
from awswrangler import exceptions
1311
from awswrangler._arrow import _extract_partitions_from_path
14-
from awswrangler._utils import boto3_to_primitives, ensure_cpu_count
1512
from awswrangler.s3._list import _prefix_cleanup
1613

1714
_logger: logging.Logger = logging.getLogger(__name__)
@@ -110,28 +107,3 @@ def _union(dfs: List[pd.DataFrame], ignore_index: Optional[bool]) -> pd.DataFram
110107
for df in dfs:
111108
df[col] = pd.Categorical(df[col].values, categories=cat.categories)
112109
return pd.concat(objs=dfs, sort=False, copy=False, ignore_index=ignore_index)
113-
114-
115-
def _read_dfs_from_multiple_paths(
116-
read_func: Callable[..., pd.DataFrame],
117-
paths: List[str],
118-
version_ids: Optional[Dict[str, str]],
119-
use_threads: Union[bool, int],
120-
kwargs: Dict[str, Any],
121-
) -> List[pd.DataFrame]:
122-
cpus = ensure_cpu_count(use_threads)
123-
if cpus < 2:
124-
return [
125-
read_func(
126-
path,
127-
version_id=version_ids.get(path) if version_ids else None,
128-
**kwargs,
129-
)
130-
for path in paths
131-
]
132-
133-
with concurrent.futures.ThreadPoolExecutor(max_workers=ensure_cpu_count(use_threads)) as executor:
134-
kwargs["boto3_session"] = boto3_to_primitives(kwargs["boto3_session"])
135-
partial_read_func = partial(read_func, **kwargs)
136-
versions = [version_ids.get(p) if isinstance(version_ids, dict) else None for p in paths]
137-
return list(df for df in executor.map(partial_read_func, paths, versions))

0 commit comments

Comments
 (0)