5 changes: 0 additions & 5 deletions vllm/engine/protocol.py

@@ -29,11 +29,6 @@ class EngineClient(ABC):
     def is_running(self) -> bool:
         ...
 
-    @property
-    @abstractmethod
-    def is_stopped(self) -> bool:
-        ...
-
     @property
     @abstractmethod
     def errored(self) -> bool:

Review comment (Collaborator, Author), on the removed is_stopped property: note: this property is not used anywhere.
44 changes: 12 additions & 32 deletions vllm/entrypoints/openai/api_server.py

@@ -68,7 +68,7 @@
 from vllm.logger import init_logger
 from vllm.usage.usage_lib import UsageContext
 from vllm.utils import (FlexibleArgumentParser, get_open_zmq_ipc_path,
-                        is_valid_ipv6_address, kill_process_tree, set_ulimit)
+                        is_valid_ipv6_address, set_ulimit)
 from vllm.version import __version__ as VLLM_VERSION
 
 TIMEOUT_KEEP_ALIVE = 5  # seconds

@@ -133,32 +133,21 @@ async def build_async_engine_client_from_engine_args(
     Returns the Client or None if the creation failed.
     """
 
-    # Fall back
-    # TODO: fill out feature matrix.
+    # AsyncLLMEngine.
     if (MQLLMEngineClient.is_unsupported_config(engine_args)
             or envs.VLLM_USE_V1 or disable_frontend_multiprocessing):
-        engine_config = engine_args.create_engine_config(
-            UsageContext.OPENAI_API_SERVER)
-        uses_ray = getattr(AsyncLLMEngine._get_executor_cls(engine_config),
-                           "uses_ray", False)
-
-        build_engine = partial(AsyncLLMEngine.from_engine_args,
-                               engine_args=engine_args,
-                               engine_config=engine_config,
-                               usage_context=UsageContext.OPENAI_API_SERVER)
-        if uses_ray:
-            # Must run in main thread with ray for its signal handlers to work
-            engine_client = build_engine()
-        else:
-            engine_client = await asyncio.get_running_loop().run_in_executor(
-                None, build_engine)
-
-        yield engine_client
-        if hasattr(engine_client, "shutdown"):
-            engine_client.shutdown()
-        return
+        engine_client: Optional[EngineClient] = None
+        try:
+            engine_client = AsyncLLMEngine.from_engine_args(
+                engine_args=engine_args,
+                usage_context=UsageContext.OPENAI_API_SERVER)
+            yield engine_client
+        finally:
+            if engine_client and hasattr(engine_client, "shutdown"):
+                engine_client.shutdown()
 
-    # Otherwise, use the multiprocessing AsyncLLMEngine.
+    # MQLLMEngine.
    else:
        if "PROMETHEUS_MULTIPROC_DIR" not in os.environ:
            # Make TemporaryDirectory for prometheus multiprocessing
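
The rewritten branch is the standard async-context-manager idiom: the client is yielded inside try and released in finally, so shutdown() runs whether the async with body exits normally, raises, or is cancelled. A minimal, self-contained sketch of the idiom (the Client class and build_client name are illustrative stand-ins, not vLLM's API):

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional

class Client:
    """Illustrative stand-in for an engine client."""

    def shutdown(self) -> None:
        print("shutdown ran")

@asynccontextmanager
async def build_client() -> AsyncIterator[Client]:
    client: Optional[Client] = None
    try:
        client = Client()
        yield client  # the caller's `async with` body runs here
    finally:
        # Runs on normal exit, on exceptions raised in the body,
        # and on task cancellation.
        if client is not None and hasattr(client, "shutdown"):
            client.shutdown()

async def main() -> None:
    try:
        async with build_client():
            raise RuntimeError("body failed")
    except RuntimeError:
        pass  # shutdown() already ran before the error propagated

asyncio.run(main())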
@@ -737,15 +726,6 @@ def signal_handler(*_) -> None:
 
     signal.signal(signal.SIGTERM, signal_handler)
 
-    # The child processes will send SIGQUIT to this process when
-    # any error happens. This process then clean up the whole tree.
-    # TODO(rob): move this into AsyncLLM.__init__ once we remove
-    # the context manager below.
-    def sigquit_handler(signum, frame):
-        kill_process_tree(os.getpid())
-
-    signal.signal(signal.SIGQUIT, sigquit_handler)
-
     async with build_async_engine_client(args) as engine_client:
         app = build_app(args)
30 changes: 24 additions & 6 deletions vllm/v1/engine/async_llm.py

@@ -1,4 +1,6 @@
 import asyncio
+import os
+import signal
 from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union
 
 from vllm.config import ModelConfig, VllmConfig

@@ -16,6 +18,7 @@
 from vllm.transformers_utils.tokenizer import AnyTokenizer
 from vllm.transformers_utils.tokenizer_group import init_tokenizer_from_configs
 from vllm.usage.usage_lib import UsageContext
+from vllm.utils import kill_process_tree
 from vllm.v1.engine.core_client import EngineCoreClient
 from vllm.v1.engine.detokenizer import Detokenizer
 from vllm.v1.engine.processor import Processor

@@ -24,6 +27,10 @@
 logger = init_logger(__name__)
 
 
+class EngineDeadError(RuntimeError):
+    pass
+
+
 class AsyncLLM(EngineClient):
 
     def __init__(

@@ -38,6 +45,21 @@ def __init__(
         log_requests: bool = True,
         start_engine_loop: bool = True,
     ) -> None:
+
+        # Flag for API server to know if we should shutdown.
+        self._errored = False
+
+        # The child processes will send SIGQUIT when unrecoverable
+        # errors happen. We kill the process tree here so that the
+        # stack trace is very evident.
+        def sigquit_handler(signum, frame):
+            logger.fatal(
+                "AsyncLLM got SIGQUIT from worker processes, shutting "
+                "down. See stack trace above for root cause issue.")
+            kill_process_tree(os.getpid())
+
+        signal.signal(signal.SIGQUIT, sigquit_handler)
+
         assert start_engine_loop
 
         self.log_requests = log_requests
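
The handler above delegates to kill_process_tree from vllm.utils. A rough sketch of what such a helper does (assuming the psutil package; vLLM's actual implementation may differ in detail). A child process would trigger the handler with something like os.kill(os.getppid(), signal.SIGQUIT):

import os
import signal

import psutil  # assumed dependency for this sketch

def kill_process_tree_sketch(pid: int) -> None:
    """Forcefully terminate every descendant of `pid`, then `pid` itself."""
    try:
        parent = psutil.Process(pid)
    except psutil.NoSuchProcess:
        return
    # Terminate all descendants before the root so none are orphaned.
    for child in parent.children(recursive=True):
        try:
            child.kill()  # SIGKILL on POSIX
        except psutil.NoSuchProcess:
            pass  # the child already exited
    if pid == os.getpid():
        # Killing ourselves: do it last, after the children are gone.
        signal.raise_signal(signal.SIGKILL)
    else:
        parent.kill()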
@@ -241,7 +263,7 @@ async def generate(
         # If the request is disconnected by the client, the
         # generate() task will be canceled. So, we abort the
         # request if we end up here.
-        except asyncio.CancelledError:
+        except asyncio.exceptions.CancelledError:
            await self.abort(request_id)
            raise
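
Note that asyncio.CancelledError and asyncio.exceptions.CancelledError name the same class on Python 3.8+, so this hunk is behavior-preserving. The pattern it sits in, aborting the engine-side request when a client disconnect cancels the generate() task, looks roughly like this (engine, generate, and abort here are illustrative stand-ins):

import asyncio

async def stream_outputs(engine, request_id: str):
    """Illustrative consumer showing the abort-on-cancel pattern."""
    try:
        async for output in engine.generate(request_id):
            yield output
    except asyncio.CancelledError:
        # The client went away and this task was cancelled; release the
        # engine-side state for the request, then propagate the cancel.
        await engine.abort(request_id)
        raise  # never swallow CancelledError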

@@ -340,14 +362,10 @@ async def stop_profile(self) -> None:
     def is_running(self) -> bool:
         return True
 
-    @property
-    def is_stopped(self) -> bool:
-        return False
-
     @property
     def errored(self) -> bool:
         return False
 
     @property
     def dead_error(self) -> BaseException:
-        return Exception()  # TODO: implement
+        return Exception()
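
The new EngineDeadError class and _errored flag are plumbing for the frontend to detect a dead engine; in this diff, errored still returns False and dead_error a bare Exception(). A hypothetical sketch of how a frontend health probe could consume the two properties once they are wired up (the health function is illustrative, not vLLM's endpoint):

async def health(engine_client) -> str:
    """Illustrative health probe over EngineClient.errored / dead_error."""
    if engine_client.errored:
        # Surface the stored failure instead of hanging on a dead engine.
        raise engine_client.dead_error
    return "ok"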