diff --git a/airflow-core/docs/troubleshooting.rst b/airflow-core/docs/troubleshooting.rst
index f636b87a42c47..f354ea1a2ff6e 100644
--- a/airflow-core/docs/troubleshooting.rst
+++ b/airflow-core/docs/troubleshooting.rst
@@ -46,3 +46,13 @@ Here are some examples that could cause such an event:
 - A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
 - An Airflow worker running out of memory - Usually, Airflow workers that run out of memory receive a SIGKILL, and the scheduler
   will fail the corresponding task instance for not having a heartbeat. However, in some scenarios, Airflow kills the task before that happens.
+
+Lingering task supervisor processes
+-----------------------------------
+
+Under very high concurrency, the socket handlers inside the task supervisor may
+miss the final EOF events from the task process. When this occurs, the supervisor
+believes the sockets are still open and never exits. The
+:ref:`workers.socket_cleanup_timeout <config:workers__socket_cleanup_timeout>` option
+controls how long the supervisor waits after the task finishes before force-closing
+any remaining sockets. If you observe leftover ``supervisor`` processes, consider decreasing this timeout.
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 437b04e771e22..6190f7f306e50 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1525,6 +1525,15 @@ workers:
       type: float
       example: ~
       default: "90.0"
+    socket_cleanup_timeout:
+      description: |
+        Number of seconds to wait after a task process exits before forcibly closing any
+        remaining communication sockets. This helps prevent the task supervisor from hanging
+        indefinitely due to missed EOF signals.
+      version_added: 3.0.2
+      type: float
+      example: ~
+      default: "60.0"
 api_auth:
   description: Settings relating to authentication on the Airflow APIs
   options:
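Reviewer note, not part of the patch: since the new option goes through the standard config plumbing, it can be consumed and overridden like the neighbouring ``min_heartbeat_interval`` / ``max_failed_heartbeats`` settings. A minimal sketch, assuming the usual ``airflow.configuration.conf`` accessor and Airflow's ``AIRFLOW__SECTION__KEY`` environment-variable convention:

```python
# Sketch only: mirrors how supervisor.py reads the option further down in
# this patch. Operators can override the 60s default without editing
# airflow.cfg via the standard env-var scheme:
#
#   export AIRFLOW__WORKERS__SOCKET_CLEANUP_TIMEOUT=120.0
#
from airflow.configuration import conf

# Falls back to the "60.0" default declared in config.yml above.
socket_cleanup_timeout: float = conf.getfloat("workers", "socket_cleanup_timeout")
```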
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 4c3ed654ec18b..dc63daeef7bb7 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -130,6 +130,7 @@
 
 MIN_HEARTBEAT_INTERVAL: int = conf.getint("workers", "min_heartbeat_interval")
 MAX_FAILED_HEARTBEATS: int = conf.getint("workers", "max_failed_heartbeats")
+SOCKET_CLEANUP_TIMEOUT: float = conf.getfloat("workers", "socket_cleanup_timeout")
 
 SERVER_TERMINATED = "SERVER_TERMINATED"
 
@@ -357,6 +358,13 @@ def exit(n: int) -> NoReturn:
         sys.stderr.flush()
         with suppress(ValueError, OSError):
             last_chance_stderr.flush()
+
+        # Explicitly close the child-end of our supervisor sockets so
+        # the parent sees EOF on both "requests" and "logs" channels.
+        with suppress(OSError):
+            os.close(log_fd)
+        with suppress(OSError):
+            os.close(child_stdin.fileno())
         os._exit(n)
 
     if hasattr(atexit, "_clear"):
@@ -429,6 +437,8 @@ class WatchedSubprocess:
 
     _num_open_sockets: int = 4
    _exit_code: int | None = attrs.field(default=None, init=False)
+    _process_exit_monotonic: float | None = attrs.field(default=None, init=False)
+    _fd_to_socket_type: dict[int, str] = attrs.field(factory=dict, init=False)
 
     selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector, repr=False)
 
@@ -513,6 +523,14 @@ def _register_pipe_readers(self, stdout: socket, stderr: socket, requests: socket, logs: socket):
         # alternatives are used automatically) -- this is a way of having "event-based" code, but without
         # needing full async, to read and process output from each socket as it is received.
 
+        # Track socket types for debugging
+        self._fd_to_socket_type = {
+            stdout.fileno(): "stdout",
+            stderr.fileno(): "stderr",
+            requests.fileno(): "requests",
+            logs.fileno(): "logs",
+        }
+
         target_loggers: tuple[FilteringBoundLogger, ...] = (self.process_log,)
         if self.subprocess_logs_to_stdout:
             target_loggers += (log,)
@@ -599,6 +617,28 @@ def _close_unused_sockets(*sockets):
             sock._sock.close()
             sock.close()
 
+    def _cleanup_open_sockets(self):
+        """Force-close any sockets that never reported EOF."""
+        # In extremely busy environments the selector can fail to deliver a
+        # final read event before the subprocess exits. Without closing these
+        # sockets the supervisor would wait forever thinking they are still
+        # active. This cleanup ensures we always release resources and exit.
+        stuck_sockets = []
+        for key in list(self.selector.get_map().values()):
+            socket_type = self._fd_to_socket_type.get(key.fd, f"unknown-{key.fd}")
+            stuck_sockets.append(f"{socket_type}({key.fd})")
+            with suppress(Exception):
+                self.selector.unregister(key.fileobj)
+            with suppress(Exception):
+                key.fileobj.close()  # type: ignore[union-attr]
+
+        if stuck_sockets:
+            log.warning("Force-closed stuck sockets", pid=self.pid, sockets=stuck_sockets)
+
+        self.selector.close()
+        self._close_unused_sockets(self.stdin)
+        self._num_open_sockets = 0
+
     def kill(
         self,
         signal_to_send: signal.Signals = signal.SIGINT,
@@ -732,6 +772,7 @@ def _check_subprocess_exit(
             if raise_on_timeout:
                 raise
         else:
+            self._process_exit_monotonic = time.monotonic()
            self._close_unused_sockets(self.stdin)
 
             # Put a message in the viewable task logs
@@ -905,6 +946,18 @@ def _monitor_subprocess(self):
             # This listens for activity (e.g., subprocess output) on registered file objects
             alive = self._service_subprocess(max_wait_time=max_wait_time) is None
 
+            if self._exit_code is not None and self._num_open_sockets > 0:
+                if (
+                    self._process_exit_monotonic
+                    and time.monotonic() - self._process_exit_monotonic > SOCKET_CLEANUP_TIMEOUT
+                ):
+                    log.debug(
+                        "Forcefully closing remaining sockets",
+                        open_sockets=self._num_open_sockets,
+                        pid=self.pid,
+                    )
+                    self._cleanup_open_sockets()
+
             if alive:
                 # We don't need to heartbeat if the process has shutdown, as we are just finishing of reading the
                 # logs
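Reviewer note, not part of the patch: for readers unfamiliar with the supervisor's I/O model, this is the failure mode in miniature. The supervisor multiplexes the task's pipes through a ``selectors`` loop and normally finishes once every registered file object reports EOF; the change above adds a monotonic deadline so a missed EOF can no longer block exit forever. A self-contained sketch of that pattern follows (illustrative names, not Airflow APIs):

```python
import os
import selectors
import time

CLEANUP_TIMEOUT = 60.0  # stand-in for the new [workers] socket_cleanup_timeout


def drain(selector: selectors.DefaultSelector, child_exited_at: float | None) -> None:
    """Read until every fd reports EOF, or force-close them after a deadline."""
    while selector.get_map():
        for key, _events in selector.select(timeout=1.0):
            if not os.read(key.fd, 4096):  # empty read == EOF from the child
                selector.unregister(key.fileobj)
                os.close(key.fd)
        # Fallback path analogous to _cleanup_open_sockets(): the child is
        # gone but some fd never delivered EOF, so stop waiting and release
        # everything instead of looping forever.
        if child_exited_at is not None and time.monotonic() - child_exited_at > CLEANUP_TIMEOUT:
            for key in list(selector.get_map().values()):
                selector.unregister(key.fileobj)
                os.close(key.fd)
```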
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 9d75a7250e4b0..33583ffe9c7b8 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -26,6 +26,7 @@
 import sys
 import time
 from collections.abc import Callable, Iterable, Iterator, Mapping
+from contextlib import suppress
 from datetime import datetime, timezone
 from io import FileIO
 from itertools import product
@@ -1301,6 +1302,12 @@ def main():
         log = structlog.get_logger(logger_name="task")
         log.exception("Top level error")
         exit(1)
+    finally:
+        # Ensure the request socket is closed on the child side in all circumstances
+        # before the process fully terminates.
+        if SUPERVISOR_COMMS and SUPERVISOR_COMMS.request_socket:
+            with suppress(Exception):
+                SUPERVISOR_COMMS.request_socket.close()
 
 
 if __name__ == "__main__":
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 4f6341c89da6a..4f5e4cfc7ac9d 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -774,6 +774,44 @@ def subprocess_main():
             } in cap_structlog
         assert rc == -signal_to_raise
 
+    @pytest.mark.execution_timeout(3)
+    def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine):
+        """Supervisor should close sockets if EOF events are missed."""
+
+        monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT", 1.0)
+
+        mock_process = mocker.Mock(pid=12345)
+
+        time_machine.move_to(time.monotonic(), tick=False)
+
+        proc = ActivitySubprocess(
+            process_log=mocker.MagicMock(),
+            id=TI_ID,
+            pid=mock_process.pid,
+            stdin=mocker.MagicMock(),
+            client=mocker.MagicMock(),
+            process=mock_process,
+            requests_fd=-1,
+        )
+
+        proc.selector = mocker.MagicMock()
+        proc.selector.select.return_value = []
+
+        proc._exit_code = 0
+        proc._num_open_sockets = 1
+        proc._process_exit_monotonic = time.monotonic()
+
+        mocker.patch.object(
+            ActivitySubprocess,
+            "_cleanup_open_sockets",
+            side_effect=lambda: setattr(proc, "_num_open_sockets", 0),
+        )
+
+        time_machine.shift(2)
+
+        proc._monitor_subprocess()
+        assert proc._num_open_sockets == 0
+
 
 class TestWatchedSubprocessKill:
     @pytest.fixture
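Reviewer note, not part of the patch (the diff excerpt ends above, mid test file): to confirm the symptom described in the troubleshooting doc before and after upgrading, one can look for supervisor processes that have outlived their parent. A rough diagnostic sketch, assuming ``psutil`` is installed and using a cmdline substring match as a heuristic:

```python
import psutil

# Orphaned task supervisors are typically re-parented to PID 1.
for p in psutil.process_iter(["pid", "ppid", "cmdline"]):
    cmdline = " ".join(p.info["cmdline"] or [])
    if "supervisor" in cmdline and p.info["ppid"] == 1:
        print(p.info["pid"], cmdline)
```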