Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airflow-core/docs/troubleshooting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,13 @@ Here are some examples that could cause such an event:
- A DAG run timeout, specified by ``dagrun_timeout`` in the DAG's definition.
- An Airflow worker running out of memory
- Usually, Airflow workers that run out of memory receive a SIGKILL, and the scheduler will fail the corresponding task instance for not having a heartbeat. However, in some scenarios, Airflow kills the task before that happens.

Lingering task supervisor processes
-----------------------------------

Under very high concurrency the socket handlers inside the task supervisor may
miss the final EOF events from the task process. When this occurs the supervisor
believes sockets are still open and will not exit. The
:ref:`workers.socket_cleanup_timeout <config:workers__socket_cleanup_timeout>` option controls how long the supervisor
waits after the task finishes before force-closing any remaining sockets. If you
observe leftover ``supervisor`` processes, consider increasing this delay.
9 changes: 9 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,15 @@ workers:
type: float
example: ~
default: "90.0"
socket_cleanup_timeout:
description: |
Number of seconds to wait after a task process exits before forcibly closing any
remaining communication sockets. This helps prevent the task supervisor from hanging
indefinitely due to missed EOF signals.
version_added: 3.0.2
type: float
example: ~
default: "60.0"
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
Expand Down
53 changes: 53 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
MIN_HEARTBEAT_INTERVAL: int = conf.getint("workers", "min_heartbeat_interval")
MAX_FAILED_HEARTBEATS: int = conf.getint("workers", "max_failed_heartbeats")

SOCKET_CLEANUP_TIMEOUT: float = conf.getfloat("workers", "socket_cleanup_timeout")

SERVER_TERMINATED = "SERVER_TERMINATED"

Expand Down Expand Up @@ -357,6 +358,13 @@ def exit(n: int) -> NoReturn:
sys.stderr.flush()
with suppress(ValueError, OSError):
last_chance_stderr.flush()

# Explicitly close the child-end of our supervisor sockets so
# the parent sees EOF on both "requests" and "logs" channels.
with suppress(OSError):
os.close(log_fd)
with suppress(OSError):
os.close(child_stdin.fileno())
os._exit(n)

if hasattr(atexit, "_clear"):
Expand Down Expand Up @@ -429,6 +437,8 @@ class WatchedSubprocess:

_num_open_sockets: int = 4
_exit_code: int | None = attrs.field(default=None, init=False)
_process_exit_monotonic: float | None = attrs.field(default=None, init=False)
_fd_to_socket_type: dict[int, str] = attrs.field(factory=dict, init=False)

selector: selectors.BaseSelector = attrs.field(factory=selectors.DefaultSelector, repr=False)

Expand Down Expand Up @@ -513,6 +523,14 @@ def _register_pipe_readers(self, stdout: socket, stderr: socket, requests: socke
# alternatives are used automatically) -- this is a way of having "event-based" code, but without
# needing full async, to read and process output from each socket as it is received.

# Track socket types for debugging
self._fd_to_socket_type = {
stdout.fileno(): "stdout",
stderr.fileno(): "stderr",
requests.fileno(): "requests",
logs.fileno(): "logs",
}

target_loggers: tuple[FilteringBoundLogger, ...] = (self.process_log,)
if self.subprocess_logs_to_stdout:
target_loggers += (log,)
Expand Down Expand Up @@ -599,6 +617,28 @@ def _close_unused_sockets(*sockets):
sock._sock.close()
sock.close()

def _cleanup_open_sockets(self):
"""Force-close any sockets that never reported EOF."""
# In extremely busy environments the selector can fail to deliver a
# final read event before the subprocess exits. Without closing these
# sockets the supervisor would wait forever thinking they are still
# active. This cleanup ensures we always release resources and exit.
stuck_sockets = []
for key in list(self.selector.get_map().values()):
socket_type = self._fd_to_socket_type.get(key.fd, f"unknown-{key.fd}")
stuck_sockets.append(f"{socket_type}({key.fd})")
with suppress(Exception):
self.selector.unregister(key.fileobj)
with suppress(Exception):
key.fileobj.close() # type: ignore[union-attr]

if stuck_sockets:
log.warning("Force-closed stuck sockets", pid=self.pid, sockets=stuck_sockets)

self.selector.close()
self._close_unused_sockets(self.stdin)
self._num_open_sockets = 0

def kill(
self,
signal_to_send: signal.Signals = signal.SIGINT,
Expand Down Expand Up @@ -732,6 +772,7 @@ def _check_subprocess_exit(
if raise_on_timeout:
raise
else:
self._process_exit_monotonic = time.monotonic()
self._close_unused_sockets(self.stdin)
# Put a message in the viewable task logs

Expand Down Expand Up @@ -905,6 +946,18 @@ def _monitor_subprocess(self):
# This listens for activity (e.g., subprocess output) on registered file objects
alive = self._service_subprocess(max_wait_time=max_wait_time) is None

if self._exit_code is not None and self._num_open_sockets > 0:
if (
self._process_exit_monotonic
and time.monotonic() - self._process_exit_monotonic > SOCKET_CLEANUP_TIMEOUT
):
log.debug(
"Forcefully closing remaining sockets",
open_sockets=self._num_open_sockets,
pid=self.pid,
)
self._cleanup_open_sockets()

if alive:
# We don't need to heartbeat if the process has shutdown, as we are just finishing of reading the
# logs
Expand Down
7 changes: 7 additions & 0 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import sys
import time
from collections.abc import Callable, Iterable, Iterator, Mapping
from contextlib import suppress
from datetime import datetime, timezone
from io import FileIO
from itertools import product
Expand Down Expand Up @@ -1301,6 +1302,12 @@ def main():
log = structlog.get_logger(logger_name="task")
log.exception("Top level error")
exit(1)
finally:
# Ensure the request socket is closed on the child side in all circumstances
# before the process fully terminates.
if SUPERVISOR_COMMS and SUPERVISOR_COMMS.request_socket:
with suppress(Exception):
SUPERVISOR_COMMS.request_socket.close()


if __name__ == "__main__":
Expand Down
38 changes: 38 additions & 0 deletions task-sdk/tests/task_sdk/execution_time/test_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,44 @@ def subprocess_main():
} in cap_structlog
assert rc == -signal_to_raise

@pytest.mark.execution_timeout(3)
def test_cleanup_sockets_after_delay(self, monkeypatch, mocker, time_machine):
"""Supervisor should close sockets if EOF events are missed."""

monkeypatch.setattr("airflow.sdk.execution_time.supervisor.SOCKET_CLEANUP_TIMEOUT", 1.0)

mock_process = mocker.Mock(pid=12345)

time_machine.move_to(time.monotonic(), tick=False)

proc = ActivitySubprocess(
process_log=mocker.MagicMock(),
id=TI_ID,
pid=mock_process.pid,
stdin=mocker.MagicMock(),
client=mocker.MagicMock(),
process=mock_process,
requests_fd=-1,
)

proc.selector = mocker.MagicMock()
proc.selector.select.return_value = []

proc._exit_code = 0
proc._num_open_sockets = 1
proc._process_exit_monotonic = time.monotonic()

mocker.patch.object(
ActivitySubprocess,
"_cleanup_open_sockets",
side_effect=lambda: setattr(proc, "_num_open_sockets", 0),
)

time_machine.shift(2)

proc._monitor_subprocess()
assert proc._num_open_sockets == 0


class TestWatchedSubprocessKill:
@pytest.fixture
Expand Down