diff --git a/awswrangler/s3/_delete.py b/awswrangler/s3/_delete.py
index 5c9214eb6..02d352b3f 100644
--- a/awswrangler/s3/_delete.py
+++ b/awswrangler/s3/_delete.py
@@ -1,16 +1,15 @@
-"""Amazon S3 CopDeletey Module (PRIVATE)."""
+"""Amazon S3 Delete Module (PRIVATE)."""
 
-import concurrent.futures
 import datetime
 import itertools
 import logging
-import time
 from typing import Any, Dict, List, Optional, Union
-from urllib.parse import unquote_plus as _unquote_plus
 
 import boto3
 
-from awswrangler import _utils, exceptions
+from awswrangler import _utils
+from awswrangler._threading import _get_executor
+from awswrangler.distributed import ray_get, ray_remote
 from awswrangler.s3._fs import get_botocore_valid_kwargs
 from awswrangler.s3._list import _path2list
 
@@ -20,65 +19,37 @@ def _split_paths_by_bucket(paths: List[str]) -> Dict[str, List[str]]:
     buckets: Dict[str, List[str]] = {}
     bucket: str
-    key: str
     for path in paths:
-        bucket, key = _utils.parse_path(path=path)
+        bucket = _utils.parse_path(path=path)[0]
         if bucket not in buckets:
             buckets[bucket] = []
-        buckets[bucket].append(key)
+        buckets[bucket].append(path)
     return buckets
 
 
+@ray_remote
 def _delete_objects(
-    bucket: str,
-    keys: List[str],
-    boto3_session: boto3.Session,
+    boto3_session: Optional[boto3.Session],
+    paths: List[str],
     s3_additional_kwargs: Optional[Dict[str, Any]],
-    attempt: int = 1,
 ) -> None:
-    client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session)
-    _logger.debug("len(keys): %s", len(keys))
-    batch: List[Dict[str, str]] = [{"Key": key} for key in keys]
+    client_s3: boto3.client = _utils.client(
+        service_name="s3",
+        session=boto3_session,
+    )
+    _logger.debug("len(paths): %s", len(paths))
     if s3_additional_kwargs:
         extra_kwargs: Dict[str, Any] = get_botocore_valid_kwargs(
             function_name="list_objects_v2", s3_additional_kwargs=s3_additional_kwargs
         )
     else:
         extra_kwargs = {}
+    bucket = _utils.parse_path(path=paths[0])[0]
+    batch: List[Dict[str, str]] = [{"Key": _utils.parse_path(path)[1]} for path in paths]
     res = client_s3.delete_objects(Bucket=bucket, Delete={"Objects": batch}, **extra_kwargs)
     deleted: List[Dict[str, Any]] = res.get("Deleted", [])
     for obj in deleted:
         _logger.debug("s3://%s/%s has been deleted.", bucket, obj.get("Key"))
-    errors: List[Dict[str, Any]] = res.get("Errors", [])
-    internal_errors: List[str] = []
-    for error in errors:
-        _logger.debug("error: %s", error)
-        if "Code" not in error or error["Code"] != "InternalError":
-            raise exceptions.ServiceApiError(errors)
-        internal_errors.append(_unquote_plus(error["Key"]))
-    if len(internal_errors) > 0:
-        if attempt > 5:  # Maximum of 5 attempts (Total of 15 seconds)
-            raise exceptions.ServiceApiError(errors)
-        time.sleep(attempt)  # Incremental delay (linear)
-        _delete_objects(
-            bucket=bucket,
-            keys=internal_errors,
-            boto3_session=boto3_session,
-            s3_additional_kwargs=s3_additional_kwargs,
-            attempt=(attempt + 1),
-        )
-
-
-def _delete_objects_concurrent(
-    bucket: str,
-    keys: List[str],
-    s3_additional_kwargs: Optional[Dict[str, Any]],
-    boto3_primitives: _utils.Boto3PrimitivesType,
-) -> None:
-    boto3_session = _utils.boto3_from_primitives(primitives=boto3_primitives)
-    return _delete_objects(
-        bucket=bucket, keys=keys, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
-    )
 
 
 def delete_objects(
@@ -146,29 +117,18 @@ def delete_objects(
         last_modified_end=last_modified_end,
         s3_additional_kwargs=s3_additional_kwargs,
     )
-    if len(paths) < 1:
-        return
-    buckets: Dict[str, List[str]] = _split_paths_by_bucket(paths=paths)
-    for bucket, keys in buckets.items():
-        chunks: List[List[str]] = _utils.chunkify(lst=keys, max_length=1_000)
-        if len(chunks) == 1:
-            _delete_objects(
-                bucket=bucket, keys=chunks[0], boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
-            )
-        elif use_threads is False:
-            for chunk in chunks:
-                _delete_objects(
-                    bucket=bucket, keys=chunk, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
-                )
-        else:
-            cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
-            with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
-                list(
-                    executor.map(
-                        _delete_objects_concurrent,
-                        itertools.repeat(bucket),
-                        chunks,
-                        itertools.repeat(s3_additional_kwargs),
-                        itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),
-                    )
-                )
+    paths_by_bucket: Dict[str, List[str]] = _split_paths_by_bucket(paths)
+
+    chunks = []
+    for _, paths in paths_by_bucket.items():
+        chunks += _utils.chunkify(lst=paths, max_length=1_000)
+
+    executor = _get_executor(use_threads=use_threads)
+    ray_get(
+        executor.map(
+            _delete_objects,
+            boto3_session,
+            chunks,
+            itertools.repeat(s3_additional_kwargs),
+        )
+    )
diff --git a/load_tests/_utils.py b/load_tests/_utils.py
new file mode 100644
index 000000000..77d846bb7
--- /dev/null
+++ b/load_tests/_utils.py
@@ -0,0 +1,55 @@
+import random
+from datetime import datetime
+from timeit import default_timer as timer
+from typing import Iterator
+
+import boto3
+import botocore.exceptions
+
+import awswrangler as wr
+from awswrangler._utils import try_it
+
+CFN_VALID_STATUS = ["CREATE_COMPLETE", "ROLLBACK_COMPLETE", "UPDATE_COMPLETE", "UPDATE_ROLLBACK_COMPLETE"]
+
+
+class ExecutionTimer:
+    def __init__(self, msg="elapsed time"):
+        self.msg = msg
+
+    def __enter__(self):
+        self.before = timer()
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.elapsed_time = round((timer() - self.before), 3)
+        print(f"{self.msg}: {self.elapsed_time:.3f} sec")
+        return None
+
+
+def extract_cloudformation_outputs():
+    outputs = {}
+    client = boto3.client("cloudformation")
+    response = try_it(client.describe_stacks, botocore.exceptions.ClientError, max_num_tries=5)
+    for stack in response.get("Stacks"):
+        if (
+            stack["StackName"]
+            in ["aws-data-wrangler-base", "aws-data-wrangler-databases", "aws-data-wrangler-opensearch"]
+        ) and (stack["StackStatus"] in CFN_VALID_STATUS):
+            for output in stack.get("Outputs"):
+                outputs[output.get("OutputKey")] = output.get("OutputValue")
+    return outputs
+
+
+def get_time_str_with_random_suffix() -> str:
+    time_str = datetime.utcnow().strftime("%Y%m%d%H%M%S%f")
+    return f"{time_str}_{random.randrange(16**6):06x}"
+
+
+def path_generator(bucket: str) -> Iterator[str]:
+    s3_path = f"s3://{bucket}/{get_time_str_with_random_suffix()}/"
+    print(f"S3 Path: {s3_path}")
+    objs = wr.s3.list_objects(s3_path)
+    wr.s3.delete_objects(path=objs)
+    yield s3_path
+    objs = wr.s3.list_objects(s3_path)
+    wr.s3.delete_objects(path=objs)
diff --git a/load_tests/conftest.py b/load_tests/conftest.py
new file mode 100644
index 000000000..f75becde6
--- /dev/null
+++ b/load_tests/conftest.py
@@ -0,0 +1,33 @@
+import pytest  # type: ignore
+
+from ._utils import extract_cloudformation_outputs, path_generator
+
+
+@pytest.fixture(scope="session")
+def cloudformation_outputs():
+    return extract_cloudformation_outputs()
+
+
+@pytest.fixture(scope="session")
+def region(cloudformation_outputs):
+    return cloudformation_outputs["Region"]
+
+
+@pytest.fixture(scope="session")
+def bucket(cloudformation_outputs):
+    return cloudformation_outputs["BucketName"]
+
+
+@pytest.fixture(scope="function")
+def path(bucket):
+    yield from path_generator(bucket)
+
+
+@pytest.fixture(scope="function")
+def path2(bucket):
+    yield from path_generator(bucket)
+
+
+@pytest.fixture(scope="function")
+def path3(bucket):
+    yield from path_generator(bucket)
diff --git a/load_tests/test_s3.py b/load_tests/test_s3.py
new file mode 100644
index 000000000..922828784
--- /dev/null
+++ b/load_tests/test_s3.py
@@ -0,0 +1,39 @@
+import pandas as pd
+import pytest
+
+import awswrangler as wr
+
+from ._utils import ExecutionTimer
+
+
+@pytest.mark.repeat(1)
+@pytest.mark.parametrize("benchmark_time", [150])
+def test_s3_select(benchmark_time):
+
+    path = "s3://ursa-labs-taxi-data/2018/1*.parquet"
+    with ExecutionTimer("elapsed time of wr.s3.select_query()") as timer:
+        wr.s3.select_query(
+            sql="SELECT * FROM s3object",
+            path=path,
+            input_serialization="Parquet",
+            input_serialization_params={},
+            scan_range_chunk_size=16 * 1024 * 1024,
+        )
+
+    assert timer.elapsed_time < benchmark_time
+
+
+@pytest.mark.parametrize("benchmark_time", [5])
+def test_s3_delete_objects(path, path2, benchmark_time):
+    df = pd.DataFrame({"id": [1, 2, 3]})
+    objects_per_bucket = 505
+    paths1 = [f"{path}delete-test{i}.json" for i in range(objects_per_bucket)]
+    paths2 = [f"{path2}delete-test{i}.json" for i in range(objects_per_bucket)]
+    paths = paths1 + paths2
+    for obj_path in paths:  # avoid shadowing the "path" fixture used in the asserts below
+        wr.s3.to_json(df, obj_path)
+    with ExecutionTimer("elapsed time of wr.s3.delete_objects()") as timer:
+        wr.s3.delete_objects(path=paths)
+    assert timer.elapsed_time < benchmark_time
+    assert len(wr.s3.list_objects(f"{path}delete-test*")) == 0
+    assert len(wr.s3.list_objects(f"{path2}delete-test*")) == 0
diff --git a/load_tests/test_s3_select.py b/load_tests/test_s3_select.py
deleted file mode 100644
index e1931609b..000000000
--- a/load_tests/test_s3_select.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import time
-
-import pytest
-
-import awswrangler as wr
-
-
-@pytest.mark.repeat(1)
-@pytest.mark.parametrize("benchmark_time", [150])
-def test_s3_select(benchmark_time):
-    start = time.time()
-
-    path = "s3://ursa-labs-taxi-data/2018/1*.parquet"
-    wr.s3.select_query(
-        sql="SELECT * FROM s3object",
-        path=path,
-        input_serialization="Parquet",
-        input_serialization_params={},
-        scan_range_chunk_size=16 * 1024 * 1024,
-    )
-    end = time.time()
-
-    elapsed_time = end - start
-    assert elapsed_time < benchmark_time
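
For reference, a minimal usage sketch of the delete path this diff refactors, mirroring what the load test above exercises. It only uses the public wr.s3 calls shown in the diff; the bucket name is a placeholder and the snippet assumes valid AWS credentials and a writable test bucket.

# Minimal sketch: delete_objects() splits the given paths by bucket, chunks them
# into batches of up to 1,000 keys, and fans the DeleteObjects requests out via
# the executor returned by _get_executor(). "my-test-bucket" is a placeholder.
import pandas as pd

import awswrangler as wr

df = pd.DataFrame({"id": [1, 2, 3]})
paths = [f"s3://my-test-bucket/delete-test{i}.json" for i in range(10)]

for p in paths:
    wr.s3.to_json(df, p)

wr.s3.delete_objects(path=paths, use_threads=True)
assert wr.s3.list_objects("s3://my-test-bucket/delete-test*") == []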