Skip to content

Commit e461cf9

Browse files
zsolngoldbaum
authored andcommitted
Replace multiprocessing with ProcessPoolExecutor
Instead of relying on `multiprocessing.Pool`, this PR replaces the implementation of `parallel_exec_transform_with_prettyprint` with `concurrent.futures.ProcessPoolExecutor`
1 parent 11d6e36 commit e461cf9

File tree

2 files changed

+45
-27
lines changed

2 files changed

+45
-27
lines changed

libcst/codemod/_cli.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,17 @@
1414
import sys
1515
import time
1616
import traceback
17+
from concurrent.futures import as_completed, Executor, ProcessPoolExecutor
1718
from copy import deepcopy
1819
from dataclasses import dataclass, replace
19-
from multiprocessing import cpu_count, Pool
20+
from multiprocessing import cpu_count
2021
from pathlib import Path
2122
from typing import Any, AnyStr, cast, Dict, List, Optional, Sequence, Union
2223

2324
from libcst import parse_module, PartialParserConfig
2425
from libcst.codemod._codemod import Codemod
2526
from libcst.codemod._context import CodemodContext
26-
from libcst.codemod._dummy_pool import DummyPool
27+
from libcst.codemod._dummy_pool import DummyExecutor
2728
from libcst.codemod._runner import (
2829
SkipFile,
2930
SkipReason,
@@ -607,13 +608,14 @@ def parallel_exec_transform_with_prettyprint( # noqa: C901
607608
python_version=python_version,
608609
)
609610

611+
pool_impl: type[Executor]
610612
if total == 1 or jobs == 1:
611613
# Simple case, we should not pay for process overhead.
612-
# Let's just use a dummy synchronous pool.
614+
# Let's just use a dummy synchronous executor.
613615
jobs = 1
614-
pool_impl = DummyPool
616+
pool_impl = DummyExecutor
615617
else:
616-
pool_impl = Pool
618+
pool_impl = ProcessPoolExecutor
617619
# Warm the parser, pre-fork.
618620
parse_module(
619621
"",
@@ -629,7 +631,7 @@ def parallel_exec_transform_with_prettyprint( # noqa: C901
629631
warnings: int = 0
630632
skips: int = 0
631633

632-
with pool_impl(processes=jobs) as p: # type: ignore
634+
with pool_impl(max_workers=jobs) as executor: # type: ignore
633635
args = [
634636
{
635637
"transformer": transform,
@@ -640,9 +642,9 @@ def parallel_exec_transform_with_prettyprint( # noqa: C901
640642
for filename in files
641643
]
642644
try:
643-
for result in p.imap_unordered(
644-
_execute_transform_wrap, args, chunksize=chunksize
645-
):
645+
futures = [executor.submit(_execute_transform_wrap, arg) for arg in args]
646+
for future in as_completed(futures):
647+
result = future.result()
646648
# Print an execution result, keep track of failures
647649
_print_parallel_result(
648650
result,

libcst/codemod/_dummy_pool.py

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,37 +3,53 @@
33
# This source code is licensed under the MIT license found in the
44
# LICENSE file in the root directory of this source tree.
55

6+
import sys
7+
from concurrent.futures import Executor, Future
68
from types import TracebackType
7-
from typing import Callable, Generator, Iterable, Optional, Type, TypeVar
9+
from typing import Callable, Optional, Type, TypeVar
810

9-
RetT = TypeVar("RetT")
10-
ArgT = TypeVar("ArgT")
11+
if sys.version_info >= (3, 10):
12+
from typing import ParamSpec
13+
else:
14+
from typing_extensions import ParamSpec
1115

16+
Return = TypeVar("Return")
17+
ParamSpec = ParamSpec("ParamSpec")
1218

13-
class DummyPool:
19+
20+
class DummyExecutor(Executor):
1421
"""
15-
Synchronous dummy `multiprocessing.Pool` analogue.
22+
Synchronous dummy `concurrent.futures.Executor` analogue.
1623
"""
1724

18-
def __init__(self, processes: Optional[int] = None) -> None:
25+
def __init__(self, max_workers: Optional[int] = None) -> None:
1926
pass
2027

21-
def imap_unordered(
28+
def submit(
2229
self,
23-
func: Callable[[ArgT], RetT],
24-
iterable: Iterable[ArgT],
25-
chunksize: Optional[int] = None,
26-
) -> Generator[RetT, None, None]:
27-
for args in iterable:
28-
yield func(args)
29-
30-
def __enter__(self) -> "DummyPool":
30+
# pyre-ignore
31+
fn: Callable[ParamSpec, Return],
32+
# pyre-ignore
33+
*args: ParamSpec.args,
34+
# pyre-ignore
35+
**kwargs: ParamSpec.kwargs,
36+
# pyre-ignore
37+
) -> Future[Return]:
38+
future: Future[Return] = Future()
39+
try:
40+
result = fn(*args, **kwargs)
41+
future.set_result(result)
42+
except Exception as exc:
43+
future.set_exception(exc)
44+
return future
45+
46+
def __enter__(self) -> "DummyExecutor":
3147
return self
3248

3349
def __exit__(
3450
self,
35-
exc_type: Optional[Type[Exception]],
36-
exc: Optional[Exception],
37-
tb: Optional[TracebackType],
51+
exc_type: Optional[Type[BaseException]],
52+
exc_val: Optional[BaseException],
53+
exc_tb: Optional[TracebackType],
3854
) -> None:
3955
pass

0 commit comments

Comments
 (0)