|
19 | 19 | def _split_paths_by_bucket(paths: List[str]) -> Dict[str, List[str]]:
|
20 | 20 | buckets: Dict[str, List[str]] = {}
|
21 | 21 | bucket: str
|
22 |
| - key: str |
23 | 22 | for path in paths:
|
24 |
| - bucket, key = _utils.parse_path(path=path) |
| 23 | + bucket = _utils.parse_path(path=path)[0] |
25 | 24 | if bucket not in buckets:
|
26 | 25 | buckets[bucket] = []
|
27 |
| - buckets[bucket].append(key) |
| 26 | + buckets[bucket].append(path) |
28 | 27 | return buckets
|
29 | 28 |
|
30 | 29 |
|
31 | 30 | @ray_remote
|
32 | 31 | def _delete_objects(
|
33 | 32 | boto3_session: Optional[boto3.Session],
|
34 |
| - bucket: str, |
35 |
| - keys: List[str], |
| 33 | + paths: List[str], |
36 | 34 | s3_additional_kwargs: Optional[Dict[str, Any]],
|
37 | 35 | ) -> None:
|
38 | 36 | client_s3: boto3.client = _utils.client(
|
39 | 37 | service_name="s3",
|
40 | 38 | session=boto3_session,
|
41 | 39 | )
|
42 |
| - _logger.debug("len(keys): %s", len(keys)) |
43 |
| - batch: List[Dict[str, str]] = [{"Key": key} for key in keys] |
| 40 | + _logger.debug("len(paths): %s", len(paths)) |
44 | 41 | if s3_additional_kwargs:
|
45 | 42 | extra_kwargs: Dict[str, Any] = get_botocore_valid_kwargs(
|
46 | 43 | function_name="list_objects_v2", s3_additional_kwargs=s3_additional_kwargs
|
47 | 44 | )
|
48 | 45 | else:
|
49 | 46 | extra_kwargs = {}
|
| 47 | + bucket = _utils.parse_path(path=paths[0])[0] |
| 48 | + batch: List[Dict[str, str]] = [{"Key": _utils.parse_path(path)[1]} for path in paths] |
50 | 49 | res = client_s3.delete_objects(Bucket=bucket, Delete={"Objects": batch}, **extra_kwargs)
|
51 | 50 | deleted: List[Dict[str, Any]] = res.get("Deleted", [])
|
52 | 51 | for obj in deleted:
|
@@ -118,11 +117,16 @@ def delete_objects(
|
118 | 117 | last_modified_end=last_modified_end,
|
119 | 118 | s3_additional_kwargs=s3_additional_kwargs,
|
120 | 119 | )
|
121 |
| - |
122 |
| - buckets: Dict[str, List[str]] = _split_paths_by_bucket(paths=paths) |
123 |
| - for bucket, keys in buckets.items(): |
124 |
| - chunks: List[List[str]] = _utils.chunkify(lst=keys, max_length=1_000) |
125 |
| - executor = _get_executor(use_threads=use_threads) |
126 |
| - executor.map( |
127 |
| - _delete_objects, boto3_session, itertools.repeat(bucket), chunks, itertools.repeat(s3_additional_kwargs) |
128 |
| - ) |
| 120 | + paths_by_bucket: Dict[str, List[str]] = _split_paths_by_bucket(paths) |
| 121 | + |
| 122 | + chunks = [] |
| 123 | + for bucket in paths_by_bucket: |
| 124 | + chunks += _utils.chunkify(lst=paths_by_bucket[bucket], max_length=1_000) |
| 125 | + |
| 126 | + executor = _get_executor(use_threads=use_threads) |
| 127 | + executor.map( |
| 128 | + _delete_objects, |
| 129 | + boto3_session, |
| 130 | + chunks, |
| 131 | + itertools.repeat(s3_additional_kwargs), |
| 132 | + ) |
0 commit comments