Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
af3ab20
first draft of delete_objects (distributed)
malachi-constant Jul 22, 2022
da502a3
removing concurrent function, potentially not needed..
malachi-constant Jul 24, 2022
ca197fb
flake8
malachi-constant Jul 24, 2022
717e321
Merge branch 'release-3.0.0' of github.com:awslabs/aws-data-wrangler …
malachi-constant Aug 1, 2022
885a15c
Fixing fixed iterable arg
malachi-constant Aug 2, 2022
8ef6858
restoring test script
malachi-constant Aug 2, 2022
fd9ad26
Fixing typing
malachi-constant Aug 3, 2022
d71b671
remove retry logic, redundant with botocore retry
malachi-constant Aug 3, 2022
87a92e2
Module name
malachi-constant Aug 4, 2022
babf835
Refactoring _delete_objects
malachi-constant Aug 4, 2022
2e39c0c
ray get added
malachi-constant Aug 4, 2022
40da5fb
updating load tests with configuration and s3 delete test
malachi-constant Aug 4, 2022
4fc3dd7
reverting isort bad update
malachi-constant Aug 4, 2022
0ce008b
reverting isort bad update
malachi-constant Aug 4, 2022
abb22e6
changing chunk size
malachi-constant Aug 4, 2022
13308c5
typing
malachi-constant Aug 4, 2022
9fd3f4c
pylint and test count
malachi-constant Aug 4, 2022
9b85b44
adding region to conftest
malachi-constant Aug 4, 2022
0affa4f
changing chunk size
malachi-constant Aug 5, 2022
9548826
updating load test
malachi-constant Aug 5, 2022
b1995de
flake8
malachi-constant Aug 5, 2022
ccdaf0e
adding ExecutionTime context manager for benchmarking load tests
malachi-constant Aug 9, 2022
71a6534
updating benchmark for s3 delete
malachi-constant Aug 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 31 additions & 71 deletions awswrangler/s3/_delete.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
"""Amazon S3 CopDeletey Module (PRIVATE)."""
"""Amazon S3 Delete Module (PRIVATE)."""

import concurrent.futures
import datetime
import itertools
import logging
import time
from typing import Any, Dict, List, Optional, Union
from urllib.parse import unquote_plus as _unquote_plus

import boto3

from awswrangler import _utils, exceptions
from awswrangler import _utils
from awswrangler._threading import _get_executor
from awswrangler.distributed import ray_get, ray_remote
from awswrangler.s3._fs import get_botocore_valid_kwargs
from awswrangler.s3._list import _path2list

Expand All @@ -20,65 +19,37 @@
def _split_paths_by_bucket(paths: List[str]) -> Dict[str, List[str]]:
buckets: Dict[str, List[str]] = {}
bucket: str
key: str
for path in paths:
bucket, key = _utils.parse_path(path=path)
bucket = _utils.parse_path(path=path)[0]
if bucket not in buckets:
buckets[bucket] = []
buckets[bucket].append(key)
buckets[bucket].append(path)
return buckets


@ray_remote
def _delete_objects(
bucket: str,
keys: List[str],
boto3_session: boto3.Session,
boto3_session: Optional[boto3.Session],
paths: List[str],
s3_additional_kwargs: Optional[Dict[str, Any]],
attempt: int = 1,
) -> None:
client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session)
_logger.debug("len(keys): %s", len(keys))
batch: List[Dict[str, str]] = [{"Key": key} for key in keys]
client_s3: boto3.client = _utils.client(
service_name="s3",
session=boto3_session,
)
_logger.debug("len(paths): %s", len(paths))
if s3_additional_kwargs:
extra_kwargs: Dict[str, Any] = get_botocore_valid_kwargs(
function_name="list_objects_v2", s3_additional_kwargs=s3_additional_kwargs
)
else:
extra_kwargs = {}
bucket = _utils.parse_path(path=paths[0])[0]
batch: List[Dict[str, str]] = [{"Key": _utils.parse_path(path)[1]} for path in paths]
res = client_s3.delete_objects(Bucket=bucket, Delete={"Objects": batch}, **extra_kwargs)
deleted: List[Dict[str, Any]] = res.get("Deleted", [])
for obj in deleted:
_logger.debug("s3://%s/%s has been deleted.", bucket, obj.get("Key"))
errors: List[Dict[str, Any]] = res.get("Errors", [])
internal_errors: List[str] = []
for error in errors:
_logger.debug("error: %s", error)
if "Code" not in error or error["Code"] != "InternalError":
raise exceptions.ServiceApiError(errors)
internal_errors.append(_unquote_plus(error["Key"]))
if len(internal_errors) > 0:
if attempt > 5: # Maximum of 5 attempts (Total of 15 seconds)
raise exceptions.ServiceApiError(errors)
time.sleep(attempt) # Incremental delay (linear)
_delete_objects(
bucket=bucket,
keys=internal_errors,
boto3_session=boto3_session,
s3_additional_kwargs=s3_additional_kwargs,
attempt=(attempt + 1),
)


def _delete_objects_concurrent(
bucket: str,
keys: List[str],
s3_additional_kwargs: Optional[Dict[str, Any]],
boto3_primitives: _utils.Boto3PrimitivesType,
) -> None:
boto3_session = _utils.boto3_from_primitives(primitives=boto3_primitives)
return _delete_objects(
bucket=bucket, keys=keys, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
)


def delete_objects(
Expand Down Expand Up @@ -146,29 +117,18 @@ def delete_objects(
last_modified_end=last_modified_end,
s3_additional_kwargs=s3_additional_kwargs,
)
if len(paths) < 1:
return
buckets: Dict[str, List[str]] = _split_paths_by_bucket(paths=paths)
for bucket, keys in buckets.items():
chunks: List[List[str]] = _utils.chunkify(lst=keys, max_length=1_000)
if len(chunks) == 1:
_delete_objects(
bucket=bucket, keys=chunks[0], boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
)
elif use_threads is False:
for chunk in chunks:
_delete_objects(
bucket=bucket, keys=chunk, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
)
else:
cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
list(
executor.map(
_delete_objects_concurrent,
itertools.repeat(bucket),
chunks,
itertools.repeat(s3_additional_kwargs),
itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),
)
)
paths_by_bucket: Dict[str, List[str]] = _split_paths_by_bucket(paths)

chunks = []
for _, paths in paths_by_bucket.items():
chunks += _utils.chunkify(lst=paths, max_length=1_000)

executor = _get_executor(use_threads=use_threads)
ray_get(
executor.map(
_delete_objects,
boto3_session,
chunks,
itertools.repeat(s3_additional_kwargs),
)
)
55 changes: 55 additions & 0 deletions load_tests/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import random
from datetime import datetime
from timeit import default_timer as timer
from typing import Iterator

import boto3
import botocore.exceptions

import awswrangler as wr
from awswrangler._utils import try_it

CFN_VALID_STATUS = ["CREATE_COMPLETE", "ROLLBACK_COMPLETE", "UPDATE_COMPLETE", "UPDATE_ROLLBACK_COMPLETE"]


class ExecutionTimer:
    """Context manager that measures and prints the wall-clock time of its block.

    After the ``with`` block exits, the duration (in seconds, rounded to three
    decimals) is available on the ``elapsed_time`` attribute.
    """

    def __init__(self, msg="elapsed time"):
        # Label printed alongside the measured duration.
        self.msg = msg

    def __enter__(self):
        self.before = timer()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Renamed first parameter from `type` to avoid shadowing the builtin.
        # Duration is recorded even if the block raised; returning None lets
        # any exception propagate.
        self.elapsed_time = round((timer() - self.before), 3)
        print(f"{self.msg}: {self.elapsed_time:.3f} sec")
        return None


def extract_cloudformation_outputs():
    """Collect output key/value pairs from the wrangler test CloudFormation stacks.

    Only stacks in a healthy status (``CFN_VALID_STATUS``) are considered.
    """
    wanted_stacks = ["aws-data-wrangler-base", "aws-data-wrangler-databases", "aws-data-wrangler-opensearch"]
    client = boto3.client("cloudformation")
    # describe_stacks can be throttled; retry up to 5 times on ClientError.
    response = try_it(client.describe_stacks, botocore.exceptions.ClientError, max_num_tries=5)
    results = {}
    for stack in response.get("Stacks"):
        if stack["StackName"] not in wanted_stacks:
            continue
        if stack["StackStatus"] not in CFN_VALID_STATUS:
            continue
        for output in stack.get("Outputs"):
            results[output.get("OutputKey")] = output.get("OutputValue")
    return results


def get_time_str_with_random_suffix() -> str:
    """Return a UTC microsecond timestamp with a random 6-hex-digit suffix."""
    suffix = f"{random.randrange(16**6):06x}"
    stamp = datetime.utcnow().strftime("%Y%m%d%H%M%S%f")
    return "_".join([stamp, suffix])


def path_generator(bucket: str) -> Iterator[str]:
    """Yield a unique S3 prefix in *bucket*, emptying it both before and after use."""
    s3_path = f"s3://{bucket}/{get_time_str_with_random_suffix()}/"
    print(f"S3 Path: {s3_path}")
    # Clean up any leftovers from a previous run before handing out the path.
    wr.s3.delete_objects(path=wr.s3.list_objects(s3_path))
    yield s3_path
    # Teardown: remove whatever the test wrote under the prefix.
    wr.s3.delete_objects(path=wr.s3.list_objects(s3_path))
33 changes: 33 additions & 0 deletions load_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import pytest # type: ignore

from ._utils import extract_cloudformation_outputs, path_generator


@pytest.fixture(scope="session")
def cloudformation_outputs():
    """Session-wide mapping of output keys to values from the test CloudFormation stacks."""
    return extract_cloudformation_outputs()


@pytest.fixture(scope="session")
def region(cloudformation_outputs):
    """AWS region name taken from the CloudFormation stack outputs."""
    return cloudformation_outputs["Region"]


@pytest.fixture(scope="session")
def bucket(cloudformation_outputs):
    """Name of the load-test S3 bucket taken from the CloudFormation stack outputs."""
    return cloudformation_outputs["BucketName"]


@pytest.fixture(scope="function")
def path(bucket):
    """Per-test unique S3 prefix; objects under it are deleted before and after the test."""
    yield from path_generator(bucket)


@pytest.fixture(scope="function")
def path2(bucket):
    """Second per-test unique S3 prefix, independent of ``path``."""
    yield from path_generator(bucket)


@pytest.fixture(scope="function")
def path3(bucket):
    """Third per-test unique S3 prefix, independent of ``path`` and ``path2``."""
    yield from path_generator(bucket)
39 changes: 39 additions & 0 deletions load_tests/test_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pandas as pd
import pytest

import awswrangler as wr

from ._utils import ExecutionTimer


@pytest.mark.repeat(1)
@pytest.mark.parametrize("benchmark_time", [150])
def test_s3_select(benchmark_time):
    """Benchmark ``wr.s3.select_query`` against the public taxi-data parquet set."""
    query_path = "s3://ursa-labs-taxi-data/2018/1*.parquet"
    with ExecutionTimer("elapsed time of wr.s3.select_query()") as run_timer:
        wr.s3.select_query(
            sql="SELECT * FROM s3object",
            path=query_path,
            input_serialization="Parquet",
            input_serialization_params={},
            scan_range_chunk_size=16 * 1024 * 1024,
        )
    assert run_timer.elapsed_time < benchmark_time


@pytest.mark.parametrize("benchmark_time", [5])
def test_s3_delete_objects(path, path2, benchmark_time):
    """Benchmark ``wr.s3.delete_objects`` across two prefixes and verify cleanup."""
    df = pd.DataFrame({"id": [1, 2, 3]})
    objects_per_bucket = 505
    paths1 = [f"{path}delete-test{i}.json" for i in range(objects_per_bucket)]
    paths2 = [f"{path2}delete-test{i}.json" for i in range(objects_per_bucket)]
    paths = paths1 + paths2
    # Use a distinct loop variable: the original `for path in paths` shadowed
    # the `path` fixture, so the `{path}delete-test*` assertion below checked
    # the last written object's path (under path2) instead of the fixture prefix.
    for object_path in paths:
        wr.s3.to_json(df, object_path)
    with ExecutionTimer("elapsed time of wr.s3.delete_objects()") as timer:
        wr.s3.delete_objects(path=paths)
    assert timer.elapsed_time < benchmark_time
    assert len(wr.s3.list_objects(f"{path}delete-test*")) == 0
    assert len(wr.s3.list_objects(f"{path2}delete-test*")) == 0
24 changes: 0 additions & 24 deletions load_tests/test_s3_select.py

This file was deleted.