@kaxil kaxil commented May 13, 2025

closes #50097
closes #49887

Previously, each DagFileProcessorProcess created its own InProcessExecutionAPI client instance, leading to unnecessary thread creation and memory buildup.

This commit ensures that a single Client backed by InProcessExecutionAPI is created and owned by DagFileProcessorManager, and passed into all DAG file processor subprocesses.
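The ownership change can be illustrated with a minimal sketch. It does not use Airflow's real classes: `FakeClient`, `Processor`, and `Manager` are hypothetical stand-ins, and the instance counter only stands in for the threads and sockets a real client would hold.

```python
from __future__ import annotations


class FakeClient:
    """Hypothetical stand-in for a client that holds threads and sockets."""

    created = 0  # how many clients have been built so far

    def __init__(self) -> None:
        FakeClient.created += 1


class Processor:
    """Hypothetical stand-in for one DAG file processor."""

    def __init__(self, path: str, client: FakeClient | None = None) -> None:
        self.path = path
        # "Before" behaviour: build a private client when none is passed in.
        self.client = client if client is not None else FakeClient()


class Manager:
    """Hypothetical stand-in for the manager that owns the shared client."""

    def __init__(self) -> None:
        self.client = FakeClient()

    def start_processor(self, path: str) -> Processor:
        # "After" behaviour: every processor reuses the manager's client.
        return Processor(path, client=self.client)


def clients_created(share: bool, n_files: int) -> int:
    """Return how many clients get built while handling n_files files."""
    FakeClient.created = 0
    manager = Manager()
    for i in range(n_files):
        path = f"dag_{i}.py"
        if share:
            manager.start_processor(path)
        else:
            Processor(path)
    return FakeClient.created
```

With sharing enabled the count stays constant no matter how many files are parsed; without it, the count grows by one per parse, which is the growth pattern the reproduction below makes visible.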

Reproduction

Use the following DAG:

from airflow.sdk import Variable, dag, task

@dag
def xample_simplest_dag():

    Variable.get("my_variable", "d")

    @task
    def my_task():
        print("hellooo")

    my_task()


xample_simplest_dag()

Run the dag-processor with the following command for quicker reproduction:

AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL=2 \
AIRFLOW__CORE__LOAD_EXAMPLES=False \
AIRFLOW__DAG_PROCESSOR__DAG_FILE_PROCESSOR_TIMEOUT=2 \
AIRFLOW__DAG_PROCESSOR__BUNDLE_REFRESH_CHECK_INTERVAL=1 \
airflow dag-processor

Check that the number of threads grows as the DAG is parsed:
[image: thread-leak]

Add the following diff to log the thread count and thread names:

diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py
index 5f76e8360b..cbee5c2609 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -102,6 +102,11 @@ __all__ = [
 def get_json_error(response: httpx.Response):
     """Raise a ServerResponseError if we can extract error info from the error."""
     err = ServerResponseError.from_response(response)
+    import os;
+    import threading
+    log.exception(f"Total threads at runtime: {len(threading.enumerate())}")
+    for t in threading.enumerate():
+        log.exception(f"  - {t.name}")
     if err:
         log.warning("Server error", detail=err.detail)
         raise err
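The same check can be done without patching the client. The following is a minimal sketch using only the standard library; `thread_snapshot`, `leaked_threads`, and `demo` are hypothetical helper names, not part of Airflow or the Task SDK.

```python
import threading


def thread_snapshot() -> list[str]:
    """Return the names of all threads currently alive in this process."""
    return sorted(t.name for t in threading.enumerate())


def leaked_threads(before: list[str], after: list[str]) -> list[str]:
    """Return thread names present in `after` but not in `before`."""
    return sorted(set(after) - set(before))


def demo() -> list[str]:
    """Start one extra thread and report it as the difference between snapshots."""
    before = thread_snapshot()
    stop = threading.Event()
    worker = threading.Thread(target=stop.wait, name="leaky-worker", daemon=True)
    worker.start()
    after = thread_snapshot()
    # Clean up so the demo itself does not leave a thread behind.
    stop.set()
    worker.join()
    return leaked_threads(before, after)
```

Taking a snapshot before and after each parse and diffing the two shows which threads were left behind, rather than only how many exist.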

Another easy way to reproduce, thanks to @tirkarthi in #49887 (comment)

Command:

watch -n 2 "pgrep -f dag-processor | xargs -I{} ls -l /proc/{}/fd/ 2>/dev/null | grep -i socket | wc -l"
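A rough Python equivalent of the command above, for cases where `watch` or `pgrep` are not available. This is a sketch that assumes a Linux `/proc` filesystem; `count_socket_fds` is a hypothetical helper name, and the PID has to be looked up separately.

```python
import os


def count_socket_fds(pid: int) -> int:
    """Count file descriptors of `pid` that point at sockets (Linux /proc only)."""
    fd_dir = f"/proc/{pid}/fd"
    try:
        entries = os.listdir(fd_dir)
    except OSError:
        # No such process, no permission, or no /proc on this platform.
        return 0
    count = 0
    for name in entries:
        try:
            target = os.readlink(os.path.join(fd_dir, name))
        except OSError:
            continue  # the fd was closed between listdir and readlink
        # Socket descriptors resolve to a target such as "socket:[12345]".
        if target.startswith("socket:"):
            count += 1
    return count
```

Calling `count_socket_fds` on the dag-processor PID every couple of seconds should show the same trend as the `watch` command: a steadily rising count before the fix and a flat one after it.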


Before: [image]

After: [image]

In a follow-up, I will add some cleanup to InProcessExecutionAPI itself so that our tests have no side effects when used with dag.test.

The Triggerer is the only other component that uses InProcessExecutionAPI, but it is a single long-running process that, unlike the DAG file processor, does not spawn other processes, so it is unaffected.



@jedcunningham jedcunningham added backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch type:bug-fix Changelog: Bug Fixes labels May 13, 2025
@kaxil kaxil force-pushed the fix-inprocess-thread branch 2 times, most recently from e5a74f0 to fb7c183 Compare May 13, 2025 19:28
closes apache#50097
closes apache#49887

Previously, each `DagFileProcessorProcess` created its own `InProcessExecutionAPI`
client instance, leading to unnecessary thread creation and resource use.

This commit ensures that a single `Client` backed by `InProcessExecutionAPI` is created
and owned by `DagFileProcessorManager`, and passed into all DAG file processor subprocesses.
@kaxil kaxil force-pushed the fix-inprocess-thread branch from fb7c183 to 7802104 Compare May 13, 2025 19:37
@kaxil kaxil merged commit 3dc597f into apache:main May 13, 2025
48 of 50 checks passed
@kaxil kaxil deleted the fix-inprocess-thread branch May 13, 2025 20:10

kaxil commented May 13, 2025

Doc failures are unrelated

github-actions bot pushed a commit that referenced this pull request May 13, 2025
(cherry picked from commit 3dc597f)

Co-authored-by: Kaxil Naik <[email protected]>

Backport successfully created: v3-0-test

Status Branch Result
v3-0-test PR Link

kaxil added a commit that referenced this pull request May 13, 2025
(cherry picked from commit 3dc597f)

Co-authored-by: Kaxil Naik <[email protected]>
github-actions bot pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request May 13, 2025
(cherry picked from commit 3dc597f)

Co-authored-by: Kaxil Naik <[email protected]>
kaxil added a commit that referenced this pull request May 14, 2025
(cherry picked from commit 3dc597f)

Co-authored-by: Kaxil Naik <[email protected]>
kaxil added a commit that referenced this pull request Jun 3, 2025
(cherry picked from commit 3dc597f)

Co-authored-by: Kaxil Naik <[email protected]>
sanederchik pushed a commit to sanederchik/airflow that referenced this pull request Jun 7, 2025
Labels
area:DAG-processing backport-to-v3-0-test Mark PR with this label to backport to v3-0-test branch type:bug-fix Changelog: Bug Fixes
Successfully merging this pull request may close these issues.

Memory leak in dag-processor after upgrade to airflow 3.0
Airflow dag processor exits with too many open files after sometime