23
23
from vllm .logger import init_logger
24
24
from vllm .lora .request import LoRARequest
25
25
from vllm .tasks import SupportedTask
26
- from vllm .utils import (cancel_task_threadsafe , get_open_port ,
27
- get_open_zmq_inproc_path , make_zmq_socket )
26
+ from vllm .utils import (close_sockets , get_open_port , get_open_zmq_inproc_path ,
27
+ in_loop , make_zmq_socket )
28
28
from vllm .v1 .engine import (EngineCoreOutputs , EngineCoreRequest ,
29
29
EngineCoreRequestType ,
30
30
ReconfigureDistributedRequest , ReconfigureRankType ,
@@ -317,7 +317,7 @@ class BackgroundResources:
317
317
"""Used as a finalizer for clean shutdown, avoiding
318
318
circular reference back to the client object."""
319
319
320
- ctx : Union [ zmq .Context ]
320
+ ctx : zmq .Context
321
321
# If CoreEngineProcManager, it manages local engines;
322
322
# if CoreEngineActorManager, it manages all engines.
323
323
engine_manager : Optional [Union [CoreEngineProcManager ,
@@ -326,6 +326,8 @@ class BackgroundResources:
326
326
output_socket : Optional [Union [zmq .Socket , zmq .asyncio .Socket ]] = None
327
327
input_socket : Optional [Union [zmq .Socket , zmq .asyncio .Socket ]] = None
328
328
first_req_send_socket : Optional [zmq .asyncio .Socket ] = None
329
+ first_req_rcv_socket : Optional [zmq .asyncio .Socket ] = None
330
+ stats_update_socket : Optional [zmq .asyncio .Socket ] = None
329
331
output_queue_task : Optional [asyncio .Task ] = None
330
332
stats_update_task : Optional [asyncio .Task ] = None
331
333
shutdown_path : Optional [str ] = None
@@ -343,23 +345,47 @@ def __call__(self):
343
345
if self .coordinator is not None :
344
346
self .coordinator .close ()
345
347
346
- cancel_task_threadsafe (self .output_queue_task )
347
- cancel_task_threadsafe (self .stats_update_task )
348
+ if isinstance (self .output_socket , zmq .asyncio .Socket ):
349
+ # Async case.
350
+ loop = self .output_socket ._get_loop ()
351
+ asyncio .get_running_loop ()
352
+ sockets = (self .output_socket , self .input_socket ,
353
+ self .first_req_send_socket , self .first_req_rcv_socket ,
354
+ self .stats_update_socket )
355
+
356
+ tasks = (self .output_queue_task , self .stats_update_task )
357
+
358
+ def close_sockets_and_tasks ():
359
+ close_sockets (sockets )
360
+ for task in tasks :
361
+ if task is not None and not task .done ():
362
+ task .cancel ()
363
+
364
+ if in_loop (loop ):
365
+ close_sockets_and_tasks ()
366
+ elif not loop .is_closed ():
367
+ loop .call_soon_threadsafe (close_sockets_and_tasks )
368
+ else :
369
+ # Loop has been closed, try to clean up directly.
370
+ del tasks
371
+ del close_sockets_and_tasks
372
+ close_sockets (sockets )
373
+ del self .output_queue_task
374
+ del self .stats_update_task
375
+ else :
376
+ # Sync case.
348
377
349
- # ZMQ context termination can hang if the sockets
350
- # aren't explicitly closed first.
351
- for socket in (self .output_socket , self .input_socket ,
352
- self .first_req_send_socket ):
353
- if socket is not None :
354
- socket .close (linger = 0 )
378
+ # ZMQ context termination can hang if the sockets
379
+ # aren't explicitly closed first.
380
+ close_sockets ((self .output_socket , self .input_socket ))
355
381
356
- if self .shutdown_path is not None :
357
- # We must ensure that the sync output socket is
358
- # closed cleanly in its own thread.
359
- with self .ctx .socket (zmq .PAIR ) as shutdown_sender :
360
- shutdown_sender .connect (self .shutdown_path )
361
- # Send shutdown signal.
362
- shutdown_sender .send (b'' )
382
+ if self .shutdown_path is not None :
383
+ # We must ensure that the sync output socket is
384
+ # closed cleanly in its own thread.
385
+ with self .ctx .socket (zmq .PAIR ) as shutdown_sender :
386
+ shutdown_sender .connect (self .shutdown_path )
387
+ # Send shutdown signal.
388
+ shutdown_sender .send (b'' )
363
389
364
390
def validate_alive (self , frames : Sequence [zmq .Frame ]):
365
391
if len (frames ) == 1 and (frames [0 ].buffer
@@ -969,14 +995,19 @@ def _ensure_stats_update_task(self):
969
995
self .engine_ranks_managed [- 1 ] + 1 )
970
996
971
997
async def run_engine_stats_update_task ():
972
- with make_zmq_socket (self .ctx , self .stats_update_address ,
973
- zmq .XSUB ) as socket , make_zmq_socket (
974
- self .ctx ,
975
- self .first_req_sock_addr ,
976
- zmq .PAIR ,
977
- bind = False ) as first_req_rcv_socket :
998
+ with (make_zmq_socket (self .ctx ,
999
+ self .stats_update_address ,
1000
+ zmq .XSUB ,
1001
+ linger = 0 ) as socket ,
1002
+ make_zmq_socket (self .ctx ,
1003
+ self .first_req_sock_addr ,
1004
+ zmq .PAIR ,
1005
+ bind = False ,
1006
+ linger = 0 ) as first_req_rcv_socket ):
978
1007
assert isinstance (socket , zmq .asyncio .Socket )
979
1008
assert isinstance (first_req_rcv_socket , zmq .asyncio .Socket )
1009
+ self .resources .stats_update_socket = socket
1010
+ self .resources .first_req_rcv_socket = first_req_rcv_socket
980
1011
# Send subscription message.
981
1012
await socket .send (b'\x01 ' )
982
1013
0 commit comments