Skip to content

Commit 938e83c

Browse files
Fix: S3 read text with version ID was not working (#1587)
1 parent 0f954ba commit 938e83c

File tree

2 files changed

+25
-6
lines changed

2 files changed

+25
-6
lines changed

awswrangler/s3/_read_text.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@
3333
_logger: logging.Logger = logging.getLogger(__name__)
3434

3535

36+
def _get_version_id_for(version_id: Optional[Union[str, Dict[str, str]]], path: str) -> Optional[str]:
37+
if isinstance(version_id, dict):
38+
return version_id.get(path, None)
39+
40+
return version_id
41+
42+
3643
def _get_read_details(path: str, pandas_kwargs: Dict[str, Any]) -> Tuple[str, Optional[str], Optional[str]]:
3744
if pandas_kwargs.get("compression", "infer") == "infer":
3845
pandas_kwargs["compression"] = infer_compression(path, compression="infer")
@@ -52,7 +59,7 @@ def _read_text_chunked(
5259
s3_additional_kwargs: Optional[Dict[str, str]],
5360
dataset: bool,
5461
use_threads: Union[bool, int],
55-
version_ids: Optional[Dict[str, str]] = None,
62+
version_ids: Optional[Dict[str, Optional[str]]] = None,
5663
) -> Iterator[pd.DataFrame]:
5764
for path in paths:
5865
_logger.debug("path: %s", path)
@@ -157,19 +164,21 @@ def _read_text(
157164
}
158165
_logger.debug("args:\n%s", pprint.pformat(args))
159166

160-
if chunksize is not None:
161-
return _read_text_chunked(
162-
paths=paths, version_ids=version_id if isinstance(version_id, dict) else None, chunksize=chunksize, **args
167+
if len(paths) > 1 and version_id is not None and not isinstance(version_id, dict):
168+
raise exceptions.InvalidArgumentCombination(
169+
"If multiple paths are provided along with a file version ID, the version ID parameter must be a dict."
163170
)
171+
version_id_dict = {path: _get_version_id_for(version_id, path) for path in paths}
164172

165-
version_id = version_id if isinstance(version_id, dict) else None
173+
if chunksize is not None:
174+
return _read_text_chunked(paths=paths, version_ids=version_id_dict, chunksize=chunksize, **args)
166175

167176
executor = _get_executor(use_threads=use_threads)
168177
tables = executor.map(
169178
_read_text_file,
170179
session,
171180
paths,
172-
itertools.repeat(version_id),
181+
[version_id_dict[path] for path in paths],
173182
itertools.repeat(parser_func),
174183
itertools.repeat(path_root),
175184
itertools.repeat(pandas_kwargs),

tests/unit/test_s3_text.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,9 +325,14 @@ def test_read_json_versioned(path) -> None:
325325
pd.DataFrame({"id": [4, 5, 6], "value": ["foo", "boo", "bar"]}),
326326
pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]}),
327327
]
328+
version_ids = []
329+
328330
for df in dfs:
329331
wr.s3.to_json(df=df, path=path_file)
330332
version_id = wr.s3.describe_objects(path=path_file)[path_file]["VersionId"]
333+
version_ids.append(version_id)
334+
335+
for df, version_id in zip(dfs, version_ids):
331336
df_temp = wr.s3.read_json(path_file, version_id=version_id)
332337
assert df_temp.equals(df)
333338
assert version_id == wr.s3.describe_objects(path=path_file, version_id=version_id)[path_file]["VersionId"]
@@ -339,9 +344,14 @@ def test_read_csv_versioned(path) -> None:
339344
pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]}),
340345
pd.DataFrame({"c0": [3, 4, 5], "c1": [6, 7, 8]}),
341346
]
347+
version_ids = []
348+
342349
for df in dfs:
343350
wr.s3.to_csv(df=df, path=path_file, index=False)
344351
version_id = wr.s3.describe_objects(path=path_file)[path_file]["VersionId"]
352+
version_ids.append(version_id)
353+
354+
for df, version_id in zip(dfs, version_ids):
345355
df_temp = wr.s3.read_csv(path_file, version_id=version_id)
346356
assert df_temp.equals(df)
347357
assert version_id == wr.s3.describe_objects(path=path_file, version_id=version_id)[path_file]["VersionId"]

0 commit comments

Comments
 (0)