Merged
awswrangler/s3/_read.py (9 changes: 8 additions & 1 deletion)
@@ -135,7 +135,14 @@ def _read_dfs_from_multiple_paths(
) -> List[pd.DataFrame]:
cpus = ensure_cpu_count(use_threads)
if cpus < 2:
return [read_func(path, version_id=version_ids.get(path) if version_ids else None, **kwargs) for path in paths]
return [
read_func(
path,
version_id=version_ids.get(path) if version_ids else None,
**kwargs,
)
for path in paths
]

with concurrent.futures.ThreadPoolExecutor(max_workers=ensure_cpu_count(use_threads)) as executor:
kwargs["boto3_session"] = boto3_to_primitives(kwargs["boto3_session"])
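For context, the reformatted single-threaded branch above is behaviorally unchanged: it still reads one DataFrame per path, resolving a per-object S3 version when a version_ids mapping is supplied. A minimal sketch of the equivalent loop form, with the helper name _read_serial and its signature invented here for illustration only (not code from this PR):

from typing import Any, Callable, Dict, List, Optional

import pandas as pd


def _read_serial(
    read_func: Callable[..., pd.DataFrame],
    paths: List[str],
    version_ids: Optional[Dict[str, str]],
    **kwargs: Any,
) -> List[pd.DataFrame]:
    # Loop equivalent of the list comprehension in the serial branch:
    # one read per path, with an optional per-object S3 version id.
    dfs: List[pd.DataFrame] = []
    for path in paths:
        version_id = version_ids.get(path) if version_ids else None
        dfs.append(read_func(path, version_id=version_id, **kwargs))
    return dfs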
awswrangler/s3/_read_parquet.py (21 changes: 18 additions & 3 deletions)
@@ -462,6 +462,7 @@ def _read_parquet_file(
boto3_session: boto3.Session,
s3_additional_kwargs: Optional[Dict[str, str]],
use_threads: Union[bool, int],
validate_schema: Optional[bool],
version_id: Optional[str] = None,
pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
) -> pa.Table:
@@ -481,6 +482,12 @@ def _read_parquet_file(
read_dictionary=categories,
coerce_int96_timestamp_unit=pyarrow_args["coerce_int96_timestamp_unit"],
)
if validate_schema and pq_file and columns:
        pq_file_columns: List[str] = pq_file.schema.names
for column in columns:
if column not in pq_file_columns:
raise exceptions.InvalidArgument(f"column: {column} does not exist")

if pq_file is None:
raise exceptions.InvalidFile(f"Invalid Parquet file: {path}")
return pq_file.read(columns=columns, use_threads=False, use_pandas_metadata=False)
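For illustration, the check added above surfaces a requested column that is absent from a file's schema as an InvalidArgument at read time. A hedged usage sketch, assuming a Parquet object whose schema holds only "id" and "value" (the bucket, key, and column names below are hypothetical):

import awswrangler as wr

# Hypothetical object: s3://my-bucket/data/part-0.parquet with columns "id" and "value".
df = wr.s3.read_parquet(
    path="s3://my-bucket/data/part-0.parquet",
    columns=["id", "missing_col"],
    validate_schema=True,
)
# With this change the read is expected to raise:
# awswrangler.exceptions.InvalidArgument: column: missing_col does not exist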
@@ -521,6 +528,7 @@ def _read_parquet(
map_types: bool,
boto3_session: Union[boto3.Session, _utils.Boto3PrimitivesType],
dataset: bool,
validate_schema: Optional[bool],
path_root: Optional[str],
s3_additional_kwargs: Optional[Dict[str, str]],
use_threads: Union[bool, int],
@@ -537,6 +545,7 @@ def _read_parquet(
s3_additional_kwargs=s3_additional_kwargs,
use_threads=use_threads,
version_id=version_id,
validate_schema=validate_schema,
pyarrow_additional_kwargs=pyarrow_args,
),
categories=categories,
@@ -750,6 +759,7 @@ def read_parquet(
"boto3_session": session,
"dataset": dataset,
"path_root": path_root,
"validate_schema": validate_schema,
"s3_additional_kwargs": s3_additional_kwargs,
"use_threads": use_threads,
"pyarrow_additional_kwargs": pyarrow_additional_kwargs,
@@ -759,14 +769,15 @@ def read_parquet(
return _read_parquet_chunked(
paths=paths,
chunked=chunked,
validate_schema=validate_schema,
ignore_index=ignore_index,
version_ids=versions,
**args,
)
if len(paths) == 1:
return _read_parquet(
path=paths[0], version_id=versions[paths[0]] if isinstance(versions, dict) else None, **args
path=paths[0],
version_id=versions[paths[0]] if isinstance(versions, dict) else None,
**args,
)
if validate_schema is True:
_validate_schemas_from_files(
@@ -779,7 +790,11 @@
)
return _union(
dfs=_read_dfs_from_multiple_paths(
read_func=_read_parquet, paths=paths, version_ids=versions, use_threads=use_threads, kwargs=args
read_func=_read_parquet,
paths=paths,
version_ids=versions,
use_threads=use_threads,
kwargs=args,
),
ignore_index=ignore_index,
)
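Because "validate_schema" is now part of the shared args dict, the multi-path branch forwards it through _read_dfs_from_multiple_paths to every _read_parquet call, so each object appears to be checked against the requested columns before the union. A hedged sketch of such a read (paths and column names are hypothetical):

import awswrangler as wr

# Hypothetical multi-object read; every file is expected to contain both requested
# columns, otherwise the per-file check added above raises InvalidArgument.
df = wr.s3.read_parquet(
    path=[
        "s3://my-bucket/data/part-0.parquet",
        "s3://my-bucket/data/part-1.parquet",
    ],
    columns=["id", "value"],
    validate_schema=True,
    use_threads=True,
)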