awswrangler/s3/_read.py (9 changes: 8 additions & 1 deletion)
@@ -136,7 +136,14 @@ def _read_dfs_from_multiple_paths(
 ) -> List[pd.DataFrame]:
     cpus = ensure_cpu_count(use_threads)
     if cpus < 2:
-        return [read_func(path, version_id=version_ids.get(path) if version_ids else None, **kwargs) for path in paths]
+        return [
+            read_func(
+                path,
+                version_id=version_ids.get(path) if version_ids else None,
+                **kwargs,
+            )
+            for path in paths
+        ]
 
     with concurrent.futures.ThreadPoolExecutor(max_workers=ensure_cpu_count(use_threads)) as executor:
         kwargs["boto3_session"] = boto3_to_primitives(kwargs["boto3_session"])
awswrangler/s3/_read_parquet.py (24 changes: 21 additions & 3 deletions)
@@ -462,6 +462,7 @@ def _read_parquet_file(
     boto3_session: boto3.Session,
     s3_additional_kwargs: Optional[Dict[str, str]],
     use_threads: Union[bool, int],
+    validate_schema: Optional[bool],
     version_id: Optional[str] = None,
     pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
 ) -> pa.Table:
@@ -481,6 +482,15 @@
             read_dictionary=categories,
             coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"],
         )
+        if validate_schema and pq_file:
+            pq_file_table: pyarrow.lib.Table = pq_file.read()
+            if columns:
+                for column in columns:
+                    try:
+                        pq_file_table.column(column)
+                    except KeyError as ex:
+                        raise exceptions.InvalidArgument(f"column: {column} does not exist\n{ex}")
+
         if pq_file is None:
             raise exceptions.InvalidFile(f"Invalid Parquet file: {path}")
         return pq_file.read(columns=columns, use_threads=False, use_pandas_metadata=False)
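Note on the hunk above: when `validate_schema` is set and a `columns` projection is passed, each requested name is looked up in the materialized table, and a miss is surfaced as `exceptions.InvalidArgument` rather than a bare pyarrow `KeyError`. A minimal sketch of that lookup against an in-memory pyarrow table (made-up data, no S3 involved):

```python
# Minimal sketch of the column-existence check, using an in-memory pyarrow
# Table in place of a ParquetFile read from S3. The data is illustrative.
import pyarrow as pa

table = pa.table({"id": [1, 2], "name": ["a", "b"]})

for column in ["id", "missing"]:
    try:
        table.column(column)  # pyarrow raises KeyError for an unknown column name
        print(f"column {column}: ok")
    except KeyError as ex:
        print(f"column: {column} does not exist\n{ex}")
```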
@@ -521,6 +531,7 @@ def _read_parquet(
     map_types: bool,
     boto3_session: Union[boto3.Session, _utils.Boto3PrimitivesType],
     dataset: bool,
+    validate_schema: Optional[bool],
     path_root: Optional[str],
     s3_additional_kwargs: Optional[Dict[str, str]],
     use_threads: Union[bool, int],
@@ -537,6 +548,7 @@
             s3_additional_kwargs=s3_additional_kwargs,
             use_threads=use_threads,
             version_id=version_id,
+            validate_schema=validate_schema,
             pyarrow_additional_kwargs=pyarrow_args,
         ),
         categories=categories,
@@ -750,6 +762,7 @@ def read_parquet(
"boto3_session": session,
"dataset": dataset,
"path_root": path_root,
"validate_schema": validate_schema,
"s3_additional_kwargs": s3_additional_kwargs,
"use_threads": use_threads,
"pyarrow_additional_kwargs": pyarrow_additional_kwargs,
@@ -759,14 +772,15 @@
         return _read_parquet_chunked(
             paths=paths,
             chunked=chunked,
+            validate_schema=validate_schema,
             ignore_index=ignore_index,
             version_ids=versions,
             **args,
         )
     if len(paths) == 1:
         return _read_parquet(
-            path=paths[0], version_id=versions[paths[0]] if isinstance(versions, dict) else None, **args
+            path=paths[0],
+            version_id=versions[paths[0]] if isinstance(versions, dict) else None,
+            **args,
         )
     if validate_schema is True:
         _validate_schemas_from_files(
@@ -779,7 +793,11 @@
     )
     return _union(
         dfs=_read_dfs_from_multiple_paths(
-            read_func=_read_parquet, paths=paths, version_ids=versions, use_threads=use_threads, kwargs=args
+            read_func=_read_parquet,
+            paths=paths,
+            version_ids=versions,
+            use_threads=use_threads,
+            kwargs=args,
         ),
         ignore_index=ignore_index,
     )
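From the caller's side, the net effect of threading `validate_schema` through is that a bad `columns` projection fails fast. A usage sketch, assuming an existing Parquet dataset (the S3 path and column names below are hypothetical placeholders):

```python
import awswrangler as wr

# Requesting a column that is not in the Parquet schema now raises
# InvalidArgument up front instead of surfacing a pyarrow KeyError later.
# "s3://bucket/dataset/" and the column names are placeholders.
try:
    df = wr.s3.read_parquet(
        path="s3://bucket/dataset/",
        columns=["id", "missing"],
        validate_schema=True,
    )
except wr.exceptions.InvalidArgument as ex:
    print(ex)  # e.g. "column: missing does not exist"
```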