
Commit 748215c

Distributed s3 delete objects (#1474)
* first draft of delete_objects (distributed)
* removing concurrent function, potentially not needed
* flake8
* Fixing fixed iterable arg
* restoring test script
* Fixing typing
* remove retry logic, redundant with botocore retry
* Module name
* Refactoring _delete_objects
* ray get added
* updating load tests with configuration and s3 delete test
* reverting isort bad update
* reverting isort bad update
* changing chunk size
* typing
* pylint and test count
* adding region to conftest
* changing chunk size
* updating load test
* flake8
* adding ExecutionTimer context manager for benchmarking load tests
* updating benchmark for s3 delete
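
In effect (a hedged usage sketch, not code from this commit): callers keep the same entry point, wr.s3.delete_objects, but a list of paths spanning several buckets is now regrouped per bucket, split into batches of at most 1,000 keys (the S3 DeleteObjects request limit), and fanned out to a thread pool or to Ray workers. The bucket names below are placeholders.

    import awswrangler as wr

    # 3,000 objects across two buckets -> four delete batches dispatched in parallel
    paths = [f"s3://bucket-a/delete-test{i}.json" for i in range(1_500)]
    paths += [f"s3://bucket-b/delete-test{i}.json" for i in range(1_500)]
    wr.s3.delete_objects(path=paths)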
1 parent b7ad7f6 commit 748215c

File tree

5 files changed: +158 −95 lines changed

awswrangler/s3/_delete.py

Lines changed: 31 additions & 71 deletions
@@ -1,16 +1,15 @@
-"""Amazon S3 CopDeletey Module (PRIVATE)."""
+"""Amazon S3 Delete Module (PRIVATE)."""

-import concurrent.futures
 import datetime
 import itertools
 import logging
-import time
 from typing import Any, Dict, List, Optional, Union
-from urllib.parse import unquote_plus as _unquote_plus

 import boto3

-from awswrangler import _utils, exceptions
+from awswrangler import _utils
+from awswrangler._threading import _get_executor
+from awswrangler.distributed import ray_get, ray_remote
 from awswrangler.s3._fs import get_botocore_valid_kwargs
 from awswrangler.s3._list import _path2list

@@ -20,65 +19,37 @@
 def _split_paths_by_bucket(paths: List[str]) -> Dict[str, List[str]]:
     buckets: Dict[str, List[str]] = {}
     bucket: str
-    key: str
     for path in paths:
-        bucket, key = _utils.parse_path(path=path)
+        bucket = _utils.parse_path(path=path)[0]
         if bucket not in buckets:
             buckets[bucket] = []
-        buckets[bucket].append(key)
+        buckets[bucket].append(path)
     return buckets


+@ray_remote
 def _delete_objects(
-    bucket: str,
-    keys: List[str],
-    boto3_session: boto3.Session,
+    boto3_session: Optional[boto3.Session],
+    paths: List[str],
     s3_additional_kwargs: Optional[Dict[str, Any]],
-    attempt: int = 1,
 ) -> None:
-    client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session)
-    _logger.debug("len(keys): %s", len(keys))
-    batch: List[Dict[str, str]] = [{"Key": key} for key in keys]
+    client_s3: boto3.client = _utils.client(
+        service_name="s3",
+        session=boto3_session,
+    )
+    _logger.debug("len(paths): %s", len(paths))
     if s3_additional_kwargs:
         extra_kwargs: Dict[str, Any] = get_botocore_valid_kwargs(
             function_name="list_objects_v2", s3_additional_kwargs=s3_additional_kwargs
         )
     else:
         extra_kwargs = {}
+    bucket = _utils.parse_path(path=paths[0])[0]
+    batch: List[Dict[str, str]] = [{"Key": _utils.parse_path(path)[1]} for path in paths]
     res = client_s3.delete_objects(Bucket=bucket, Delete={"Objects": batch}, **extra_kwargs)
     deleted: List[Dict[str, Any]] = res.get("Deleted", [])
     for obj in deleted:
         _logger.debug("s3://%s/%s has been deleted.", bucket, obj.get("Key"))
-    errors: List[Dict[str, Any]] = res.get("Errors", [])
-    internal_errors: List[str] = []
-    for error in errors:
-        _logger.debug("error: %s", error)
-        if "Code" not in error or error["Code"] != "InternalError":
-            raise exceptions.ServiceApiError(errors)
-        internal_errors.append(_unquote_plus(error["Key"]))
-    if len(internal_errors) > 0:
-        if attempt > 5:  # Maximum of 5 attempts (Total of 15 seconds)
-            raise exceptions.ServiceApiError(errors)
-        time.sleep(attempt)  # Incremental delay (linear)
-        _delete_objects(
-            bucket=bucket,
-            keys=internal_errors,
-            boto3_session=boto3_session,
-            s3_additional_kwargs=s3_additional_kwargs,
-            attempt=(attempt + 1),
-        )
-
-
-def _delete_objects_concurrent(
-    bucket: str,
-    keys: List[str],
-    s3_additional_kwargs: Optional[Dict[str, Any]],
-    boto3_primitives: _utils.Boto3PrimitivesType,
-) -> None:
-    boto3_session = _utils.boto3_from_primitives(primitives=boto3_primitives)
-    return _delete_objects(
-        bucket=bucket, keys=keys, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
-    )


 def delete_objects(
@@ -146,29 +117,18 @@ def delete_objects(
         last_modified_end=last_modified_end,
         s3_additional_kwargs=s3_additional_kwargs,
     )
-    if len(paths) < 1:
-        return
-    buckets: Dict[str, List[str]] = _split_paths_by_bucket(paths=paths)
-    for bucket, keys in buckets.items():
-        chunks: List[List[str]] = _utils.chunkify(lst=keys, max_length=1_000)
-        if len(chunks) == 1:
-            _delete_objects(
-                bucket=bucket, keys=chunks[0], boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
-            )
-        elif use_threads is False:
-            for chunk in chunks:
-                _delete_objects(
-                    bucket=bucket, keys=chunk, boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs
-                )
-        else:
-            cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
-            with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
-                list(
-                    executor.map(
-                        _delete_objects_concurrent,
-                        itertools.repeat(bucket),
-                        chunks,
-                        itertools.repeat(s3_additional_kwargs),
-                        itertools.repeat(_utils.boto3_to_primitives(boto3_session=boto3_session)),
-                    )
-                )
+    paths_by_bucket: Dict[str, List[str]] = _split_paths_by_bucket(paths)
+
+    chunks = []
+    for _, paths in paths_by_bucket.items():
+        chunks += _utils.chunkify(lst=paths, max_length=1_000)
+
+    executor = _get_executor(use_threads=use_threads)
+    ray_get(
+        executor.map(
+            _delete_objects,
+            boto3_session,
+            chunks,
+            itertools.repeat(s3_additional_kwargs),
+        )
+    )
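
The core of the change is the chunk-then-map pattern at the bottom of delete_objects. Below is a minimal self-contained sketch of that pattern, using concurrent.futures as a stand-in for _get_executor/ray_get and a print in place of the real DeleteObjects call; all names in the sketch are illustrative, not the library's code.

    import itertools
    from concurrent.futures import ThreadPoolExecutor
    from typing import Dict, List

    def chunkify(lst: List[str], max_length: int) -> List[List[str]]:
        # Same idea as _utils.chunkify: slice the list into pieces of <= max_length.
        return [lst[i : i + max_length] for i in range(0, len(lst), max_length)]

    def delete_chunk(paths: List[str], extra_kwargs: dict) -> None:
        # Stand-in for _delete_objects: one DeleteObjects request per chunk.
        print(f"would delete {len(paths)} objects from bucket {paths[0].split('/')[2]}")

    paths_by_bucket: Dict[str, List[str]] = {
        "bucket-a": [f"s3://bucket-a/key{i}" for i in range(2_500)],
        "bucket-b": [f"s3://bucket-b/key{i}" for i in range(10)],
    }
    chunks: List[List[str]] = []
    for bucket_paths in paths_by_bucket.values():
        chunks += chunkify(bucket_paths, max_length=1_000)  # 3 chunks + 1 chunk here

    with ThreadPoolExecutor() as executor:
        list(executor.map(delete_chunk, chunks, itertools.repeat({})))

Chunking per bucket before concatenating keeps every batch single-bucket, which matters because each DeleteObjects request targets exactly one Bucket.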

load_tests/_utils.py

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+import random
+from datetime import datetime
+from timeit import default_timer as timer
+from typing import Iterator
+
+import boto3
+import botocore.exceptions
+
+import awswrangler as wr
+from awswrangler._utils import try_it
+
+CFN_VALID_STATUS = ["CREATE_COMPLETE", "ROLLBACK_COMPLETE", "UPDATE_COMPLETE", "UPDATE_ROLLBACK_COMPLETE"]
+
+
+class ExecutionTimer:
+    def __init__(self, msg="elapsed time"):
+        self.msg = msg
+
+    def __enter__(self):
+        self.before = timer()
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.elapsed_time = round((timer() - self.before), 3)
+        print(f"{self.msg}: {self.elapsed_time:.3f} sec")
+        return None
+
+
+def extract_cloudformation_outputs():
+    outputs = {}
+    client = boto3.client("cloudformation")
+    response = try_it(client.describe_stacks, botocore.exceptions.ClientError, max_num_tries=5)
+    for stack in response.get("Stacks"):
+        if (
+            stack["StackName"]
+            in ["aws-data-wrangler-base", "aws-data-wrangler-databases", "aws-data-wrangler-opensearch"]
+        ) and (stack["StackStatus"] in CFN_VALID_STATUS):
+            for output in stack.get("Outputs"):
+                outputs[output.get("OutputKey")] = output.get("OutputValue")
+    return outputs
+
+
+def get_time_str_with_random_suffix() -> str:
+    time_str = datetime.utcnow().strftime("%Y%m%d%H%M%S%f")
+    return f"{time_str}_{random.randrange(16**6):06x}"
+
+
+def path_generator(bucket: str) -> Iterator[str]:
+    s3_path = f"s3://{bucket}/{get_time_str_with_random_suffix()}/"
+    print(f"S3 Path: {s3_path}")
+    objs = wr.s3.list_objects(s3_path)
+    wr.s3.delete_objects(path=objs)
+    yield s3_path
+    objs = wr.s3.list_objects(s3_path)
+    wr.s3.delete_objects(path=objs)
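
A usage sketch for the ExecutionTimer context manager added above, mirroring how the load tests below consume it. The import path assumes this PR's load_tests package layout, and the workload is a stand-in.

    from load_tests._utils import ExecutionTimer

    with ExecutionTimer("elapsed time of a stand-in workload") as t:
        sum(range(10_000_000))
    print(t.elapsed_time)  # seconds, rounded to 3 decimal places by __exit__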

load_tests/conftest.py

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
+import pytest  # type: ignore
+
+from ._utils import extract_cloudformation_outputs, path_generator
+
+
+@pytest.fixture(scope="session")
+def cloudformation_outputs():
+    return extract_cloudformation_outputs()
+
+
+@pytest.fixture(scope="session")
+def region(cloudformation_outputs):
+    return cloudformation_outputs["Region"]
+
+
+@pytest.fixture(scope="session")
+def bucket(cloudformation_outputs):
+    return cloudformation_outputs["BucketName"]
+
+
+@pytest.fixture(scope="function")
+def path(bucket):
+    yield from path_generator(bucket)
+
+
+@pytest.fixture(scope="function")
+def path2(bucket):
+    yield from path_generator(bucket)
+
+
+@pytest.fixture(scope="function")
+def path3(bucket):
+    yield from path_generator(bucket)
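
How a load test consumes these fixtures (a hypothetical test, not part of this commit): pytest injects path by name, and path_generator guarantees the prefix is emptied both before the test body runs and again after it finishes.

    import pandas as pd

    import awswrangler as wr

    def test_roundtrip(path):  # path resolves to the function-scoped fixture above
        wr.s3.to_json(pd.DataFrame({"id": [1]}), f"{path}one.json")
        assert len(wr.s3.list_objects(path)) == 1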

load_tests/test_s3.py

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+import pandas as pd
+import pytest
+
+import awswrangler as wr
+
+from ._utils import ExecutionTimer
+
+
+@pytest.mark.repeat(1)
+@pytest.mark.parametrize("benchmark_time", [150])
+def test_s3_select(benchmark_time):
+
+    path = "s3://ursa-labs-taxi-data/2018/1*.parquet"
+    with ExecutionTimer("elapsed time of wr.s3.select_query()") as timer:
+        wr.s3.select_query(
+            sql="SELECT * FROM s3object",
+            path=path,
+            input_serialization="Parquet",
+            input_serialization_params={},
+            scan_range_chunk_size=16 * 1024 * 1024,
+        )
+
+    assert timer.elapsed_time < benchmark_time
+
+
+@pytest.mark.parametrize("benchmark_time", [5])
+def test_s3_delete_objects(path, path2, benchmark_time):
+    df = pd.DataFrame({"id": [1, 2, 3]})
+    objects_per_bucket = 505
+    paths1 = [f"{path}delete-test{i}.json" for i in range(objects_per_bucket)]
+    paths2 = [f"{path2}delete-test{i}.json" for i in range(objects_per_bucket)]
+    paths = paths1 + paths2
+    for p in paths:  # a distinct loop variable, so `path` still names the fixture below
+        wr.s3.to_json(df, p)
+    with ExecutionTimer("elapsed time of wr.s3.delete_objects()") as timer:
+        wr.s3.delete_objects(path=paths)
+    assert timer.elapsed_time < benchmark_time
+    assert len(wr.s3.list_objects(f"{path}delete-test*")) == 0
+    assert len(wr.s3.list_objects(f"{path2}delete-test*")) == 0

load_tests/test_s3_select.py

Lines changed: 0 additions & 24 deletions
This file was deleted.
