
Commit eca70d0

kaxil authored and ashb committed
Fix lingering task supervisors when EOF is missed (#51180)
Closes #50500

Adds a new safeguard for cases where the task subprocess closes before all pipe sockets send EOF. The supervisor now records the process exit time and forcibly closes any sockets still open after `workers.socket_cleanup_timeout`. This stops the supervisor loop from hanging indefinitely and allows the process to exit cleanly.

(cherry picked from commit a2651f1)
1 parent 0daf753 commit eca70d0
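In outline, the safeguard is a timed fallback in the supervisor's monitoring loop. The sketch below illustrates that idea only; the function name and signature here are placeholders, and the real implementation lives in `ActivitySubprocess` in the diff further down.

import time

SOCKET_CLEANUP_TIMEOUT = 60.0  # seconds; [workers] socket_cleanup_timeout


def should_force_cleanup(exit_code, exit_monotonic, open_sockets) -> bool:
    """Return True once a finished task has left sockets open past the timeout."""
    if exit_code is None or open_sockets == 0:
        return False  # task still running, or everything already closed
    if exit_monotonic is None:
        return False  # exit time was never recorded
    return time.monotonic() - exit_monotonic > SOCKET_CLEANUP_TIMEOUT

The monitoring loop evaluates the equivalent of this check on every iteration and, once it fires, force-closes whatever sockets remain so the loop can end.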

File tree

5 files changed: +117 −0 lines changed

- airflow-core/docs/troubleshooting.rst
- airflow-core/src/airflow/config_templates/config.yml
- task-sdk/src/airflow/sdk/execution_time/supervisor.py
- task-sdk/src/airflow/sdk/execution_time/task_runner.py
- task-sdk/tests/task_sdk/execution_time/test_supervisor.py

airflow-core/docs/troubleshooting.rst

Lines changed: 10 additions & 0 deletions
@@ -46,3 +46,13 @@ Here are some examples that could cause such an event:
 - A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
 - An Airflow worker running out of memory
   - Usually, Airflow workers that run out of memory receive a SIGKILL, and the scheduler will fail the corresponding task instance for not having a heartbeat. However, in some scenarios, Airflow kills the task before that happens.
+
+Lingering task supervisor processes
+-----------------------------------
+
+Under very high concurrency the socket handlers inside the task supervisor may
+miss the final EOF events from the task process. When this occurs the supervisor
+believes sockets are still open and will not exit. The
+:ref:`workers.socket_cleanup_timeout <config:workers__socket_cleanup_timeout>` option controls how long the supervisor
+waits after the task finishes before force-closing any remaining sockets. If you
+observe leftover ``supervisor`` processes, consider lowering this timeout so they are cleaned up sooner.
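For reference, the option's effective value can be inspected the same way the supervisor reads it. A minimal sketch, assuming an installed airflow-core where `airflow.configuration.conf` is available:

from airflow.configuration import conf

# Defaults to 60.0 seconds unless overridden in the [workers] section.
timeout = conf.getfloat("workers", "socket_cleanup_timeout")
print(f"Supervisor will force-close leftover sockets after {timeout:.1f}s")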

airflow-core/src/airflow/config_templates/config.yml

Lines changed: 9 additions & 0 deletions
@@ -1525,6 +1525,15 @@ workers:
       type: float
       example: ~
       default: "90.0"
+    socket_cleanup_timeout:
+      description: |
+        Number of seconds to wait after a task process exits before forcibly closing any
+        remaining communication sockets. This helps prevent the task supervisor from hanging
+        indefinitely due to missed EOF signals.
+      version_added: 3.0.2
+      type: float
+      example: ~
+      default: "60.0"
 api_auth:
   description: Settings relating to authentication on the Airflow APIs
   options:
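Being an ordinary `config.yml` option, the `60.0` second default can be overridden per deployment in the usual ways; the value `30.0` below is purely illustrative:

# Either in airflow.cfg:
#
#   [workers]
#   socket_cleanup_timeout = 30.0
#
# or via the standard AIRFLOW__{SECTION}__{KEY} environment variable, set
# before the worker process starts:
import os

os.environ["AIRFLOW__WORKERS__SOCKET_CLEANUP_TIMEOUT"] = "30.0"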

task-sdk/src/airflow/sdk/execution_time/supervisor.py

Lines changed: 53 additions & 0 deletions
@@ -130,6 +130,7 @@
 MIN_HEARTBEAT_INTERVAL: int = conf.getint("workers", "min_heartbeat_interval")
 MAX_FAILED_HEARTBEATS: int = conf.getint("workers", "max_failed_heartbeats")

+SOCKET_CLEANUP_TIMEOUT: float = conf.getfloat("workers", "socket_cleanup_timeout")

 SERVER_TERMINATED = "SERVER_TERMINATED"


@@ -357,6 +358,13 @@ def exit(n: int) -> NoReturn:
         sys.stderr.flush()
         with suppress(ValueError, OSError):
             last_chance_stderr.flush()
+
+        # Explicitly close the child-end of our supervisor sockets so
+        # the parent sees EOF on both "requests" and "logs" channels.
+        with suppress(OSError):
+            os.close(log_fd)
+        with suppress(OSError):
+            os.close(child_stdin.fileno())
         os._exit(n)

     if hasattr(atexit, "_clear"):

@@ -429,6 +437,8 @@ class WatchedSubprocess:

     _num_open_sockets: int = 4
     _exit_code: int | None = attrs.field(default=None, init=False)
+    _process_exit_monotonic: float | None = attrs.field(default=None, init=False)
+    _fd_to_socket_type: dict[int, str] = attrs.field(factory=dict, init=False)

     selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector, repr=False)


@@ -513,6 +523,14 @@ def _register_pipe_readers(self, stdout: socket, stderr: socket, requests: socke
         # alternatives are used automatically) -- this is a way of having "event-based" code, but without
         # needing full async, to read and process output from each socket as it is received.

+        # Track socket types for debugging
+        self._fd_to_socket_type = {
+            stdout.fileno(): "stdout",
+            stderr.fileno(): "stderr",
+            requests.fileno(): "requests",
+            logs.fileno(): "logs",
+        }
+
         target_loggers: tuple[FilteringBoundLogger, ...] = (self.process_log,)
         if self.subprocess_logs_to_stdout:
             target_loggers += (log,)

@@ -599,6 +617,28 @@ def _close_unused_sockets(*sockets):
                 sock._sock.close()
             sock.close()

+    def _cleanup_open_sockets(self):
+        """Force-close any sockets that never reported EOF."""
+        # In extremely busy environments the selector can fail to deliver a
+        # final read event before the subprocess exits. Without closing these
+        # sockets the supervisor would wait forever thinking they are still
+        # active. This cleanup ensures we always release resources and exit.
+        stuck_sockets = []
+        for key in list(self.selector.get_map().values()):
+            socket_type = self._fd_to_socket_type.get(key.fd, f"unknown-{key.fd}")
+            stuck_sockets.append(f"{socket_type}({key.fd})")
+            with suppress(Exception):
+                self.selector.unregister(key.fileobj)
+            with suppress(Exception):
+                key.fileobj.close()  # type: ignore[union-attr]
+
+        if stuck_sockets:
+            log.warning("Force-closed stuck sockets", pid=self.pid, sockets=stuck_sockets)
+
+        self.selector.close()
+        self._close_unused_sockets(self.stdin)
+        self._num_open_sockets = 0
+
     def kill(
         self,
         signal_to_send: signal.Signals = signal.SIGINT,

@@ -732,6 +772,7 @@ def _check_subprocess_exit(
             if raise_on_timeout:
                 raise
         else:
+            self._process_exit_monotonic = time.monotonic()
             self._close_unused_sockets(self.stdin)
             # Put a message in the viewable task logs


@@ -905,6 +946,18 @@ def _monitor_subprocess(self):
             # This listens for activity (e.g., subprocess output) on registered file objects
             alive = self._service_subprocess(max_wait_time=max_wait_time) is None

+            if self._exit_code is not None and self._num_open_sockets > 0:
+                if (
+                    self._process_exit_monotonic
+                    and time.monotonic() - self._process_exit_monotonic > SOCKET_CLEANUP_TIMEOUT
+                ):
+                    log.debug(
+                        "Forcefully closing remaining sockets",
+                        open_sockets=self._num_open_sockets,
+                        pid=self.pid,
+                    )
+                    self._cleanup_open_sockets()
+
             if alive:
                 # We don't need to heartbeat if the process has shutdown, as we are just finishing of reading the
                 # logs
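The core of `_cleanup_open_sockets` is a generic `selectors` pattern: walk whatever is still registered, unregister it, and close it. A self-contained sketch of that pattern outside the supervisor (the function name here is illustrative, not part of the Airflow code):

import selectors
from contextlib import suppress


def force_close_registered(selector: selectors.BaseSelector) -> list[int]:
    """Unregister and close every file object still registered; return their fds."""
    stuck_fds = []
    # get_map() reflects the selector's live state, so copy it before mutating.
    for key in list(selector.get_map().values()):
        stuck_fds.append(key.fd)
        with suppress(Exception):
            selector.unregister(key.fileobj)
        with suppress(Exception):
            key.fileobj.close()
    return stuck_fds

Wrapping each step in `suppress(Exception)` mirrors the defensive style of the diff: at this point the process is shutting down anyway, so a cleanup failure should not raise.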

task-sdk/src/airflow/sdk/execution_time/task_runner.py

Lines changed: 7 additions & 0 deletions
@@ -26,6 +26,7 @@
 import sys
 import time
 from collections.abc import Callable, Iterable, Iterator, Mapping
+from contextlib import suppress
 from datetime import datetime, timezone
 from io import FileIO
 from itertools import product

@@ -1301,6 +1302,12 @@ def main():
         log = structlog.get_logger(logger_name="task")
         log.exception("Top level error")
         exit(1)
+    finally:
+        # Ensure the request socket is closed on the child side in all circumstances
+        # before the process fully terminates.
+        if SUPERVISOR_COMMS and SUPERVISOR_COMMS.request_socket:
+            with suppress(Exception):
+                SUPERVISOR_COMMS.request_socket.close()


 if __name__ == "__main__":
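The `finally` block exists so the supervisor's side of the requests channel sees EOF even when the task errors out. The EOF mechanics themselves are plain socket behaviour, roughly as in this standalone sketch (the variable names are illustrative):

import socket

# A connected pair, analogous to the supervisor <-> task "requests" channel.
parent_end, child_end = socket.socketpair()

# Once the child side is closed, the parent's next read returns b"",
# which is how the selector-driven supervisor recognises EOF.
child_end.close()
assert parent_end.recv(1) == b""

parent_end.close()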

task-sdk/tests/task_sdk/execution_time/test_supervisor.py

Lines changed: 38 additions & 0 deletions
@@ -774,6 +774,44 @@ def subprocess_main():
         } in cap_structlog
         assert rc == -signal_to_raise

+    @pytest.mark.execution_timeout(3)
+    def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine):
+        """Supervisor should close sockets if EOF events are missed."""
+
+        monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT", 1.0)
+
+        mock_process = mocker.Mock(pid=12345)
+
+        time_machine.move_to(time.monotonic(), tick=False)
+
+        proc = ActivitySubprocess(
+            process_log=mocker.MagicMock(),
+            id=TI_ID,
+            pid=mock_process.pid,
+            stdin=mocker.MagicMock(),
+            client=mocker.MagicMock(),
+            process=mock_process,
+            requests_fd=-1,
+        )
+
+        proc.selector = mocker.MagicMock()
+        proc.selector.select.return_value = []
+
+        proc._exit_code = 0
+        proc._num_open_sockets = 1
+        proc._process_exit_monotonic = time.monotonic()
+
+        mocker.patch.object(
+            ActivitySubprocess,
+            "_cleanup_open_sockets",
+            side_effect=lambda: setattr(proc, "_num_open_sockets", 0),
+        )
+
+        time_machine.shift(2)
+
+        proc._monitor_subprocess()
+        assert proc._num_open_sockets == 0
+

 class TestWatchedSubprocessKill:
     @pytest.fixture
