@kaxil kaxil commented Jul 28, 2025

Resolves an intermittent DetachedInstanceError when the scheduler processes task instances that have timed out during heartbeat detection. The error occurred when Pydantic validation of TIRunContext attempted to access the consumed_asset_events relationship on DagRun objects that had been detached from the SQLAlchemy session.

The Problem:

  1. Scheduler main loop loads TaskInstances with selectinload(TI.dag_run), which does not load consumed_asset_events
  2. session.expunge_all() is called, detaching all objects from the session
  3. Detached objects remain in memory and are passed to subsequent operations (the exact path has not been identified)
  4. Timer callback operations receive these detached objects through that unidentified sharing mechanism
  5. TIRunContext creation triggers Pydantic validation that accesses dag_run.consumed_asset_events
  6. SQLAlchemy attempts lazy loading on the detached object and throws DetachedInstanceError

Key Evidence:

  • Testing confirmed that accessing consumed_asset_events on detached DagRun objects reliably reproduces the error
  • The timer callback creates its own fresh session, yet it receives references to objects detached in previous sessions
  • This suggests object sharing through scheduler instance variables, callback queues, executor state, or other global mechanisms

Why It's Intermittent:

  • Depends on precise timing between session.expunge_all() and subsequent object access
  • Only occurs when detached objects are passed to operations that trigger lazy loading
  • Is exposed by race conditions in the scheduler's concurrent environment (executor events, heartbeats, timer callbacks)

Solution

Add minimal eager loading with selectinload(DagRun.consumed_asset_events) to the heartbeat timeout query. This ensures the relationship is loaded before objects can be detached, eliminating the need for lazy loading.

Why This Fix Works:

  • Avoids lazy loading entirely by pre-loading the required relationship
  • Eliminates dependency on consistent session state in concurrent scheduler operations
  • Minimal performance impact - only loads the specific relationship needed for TIRunContext

Verification Steps for Reviewers

To verify the root cause and validate the fix, run these tests in an IPython shell:

Test 1: Verify DetachedInstanceError on expunged objects

from airflow.utils.session import create_session
from airflow.models.taskinstance import TaskInstance as TI
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.inspection import inspect

# Load object and expunge it (simulating scheduler behavior)
with create_session() as session:
    ti = session.scalars(
        select(TI)
        .options(selectinload(TI.dag_run))  # Pre-fix: missing consumed_asset_events
        .limit(1)
    ).first()

    if ti:
        dagrun_ref = ti.dag_run
        print(f"DagRun bound to session: {inspect(dagrun_ref).session is not None}")

        # Simulate scheduler's expunge_all
        session.expunge_all()
        print(f"After expunge - DagRun bound: {inspect(dagrun_ref).session is not None}")

        # Test accessing consumed_asset_events on detached object
        try:
            events = dagrun_ref.consumed_asset_events
            print(f"SUCCESS: {events}")
        except Exception as e:
            print(f"ERROR: {type(e).__name__}: {e}")
            print("^ This is the DetachedInstanceError that causes the scheduler failure!")

Expected Result: Should show DetachedInstanceError when accessing consumed_asset_events on the detached object.

Test 2: Verify the fix prevents the error

from airflow.utils.session import create_session
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.dagrun import DagRun as DR
from sqlalchemy import select
from sqlalchemy.orm import selectinload

# Load with the fix (eager loading consumed_asset_events)
with create_session() as session:
    ti = session.scalars(
        select(TI)
        .options(selectinload(TI.dag_run).selectinload(DR.consumed_asset_events))  # With fix
        .limit(1)
    ).first()

    if ti:
        dagrun_ref = ti.dag_run

        # Simulate scheduler's expunge_all
        session.expunge_all()

        # Test accessing consumed_asset_events on detached object
        try:
            events = dagrun_ref.consumed_asset_events
            print(f"SUCCESS with fix: {events}")
        except Exception as e:
            print(f"ERROR with fix: {type(e).__name__}: {e}")

Expected Result: Should show SUCCESS because the relationship was eagerly loaded before detachment.

Test 3: Verify scoped session reuse (explains contamination mechanism)

from airflow.utils.session import create_session

# Create two sessions to verify they reuse the same object (thread-local scoping)
with create_session() as session1:
    session1_id = id(session1)

with create_session() as session2:
    session2_id = id(session2)

print(f"Session 1 ID: {session1_id}")
print(f"Session 2 ID: {session2_id}")
print(f"Same session object reused: {session1_id == session2_id}")

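Expected Result: Should print True, showing that create_session() returns the same thread-local session object each time. This is consistent with objects from an earlier session leaking into later operations, although it does not by itself prove that mechanism.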
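
The same reuse can be seen with SQLAlchemy's `scoped_session` directly. This sketch assumes that `create_session()` is backed by a `scoped_session` registry; the `ScopedSession` name and the SQLite engine here are illustrative only.

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("sqlite://")
ScopedSession = scoped_session(sessionmaker(bind=engine))

first = ScopedSession()
first.close()  # close() releases resources but keeps the session in the registry

second = ScopedSession()
same_before_remove = first is second

ScopedSession.remove()  # discards the thread-local session from the registry
third = ScopedSession()
same_after_remove = first is third

print(same_before_remove, same_after_remove)
```

Comparing with `is` while both references are alive avoids relying on `id()` values, which Python may reuse once an object has been garbage collected.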

Testing Strategy

Why no new automated test added:

  • Existing tests validate the fix: test_scheduler_passes_context_from_server_on_heartbeat_timeout
  • Intermittent nature makes testing non-trivial: Bug depends on complex timing between session.expunge_all() and object access across concurrent scheduler operations
  • Would require extensive mocking: Test would need to simulate the exact object sharing mechanism causing detached object references, providing limited real-world validation
  • Manual verification steps above provide sufficient validation for reviewers

Future Considerations

Long-term architectural improvement: Migrate to back_populates with lazy="selectin" to eliminate this class of issues entirely:

# AssetEvent
created_dagruns = relationship("DagRun", back_populates="consumed_asset_events", lazy="selectin")

# DagRun
consumed_asset_events = relationship("AssetEvent", back_populates="created_dagruns", lazy="selectin")

This would prevent similar DetachedInstanceError issues across the codebase by making the relationship always eagerly loaded.

Additional Context

This affects all DAG types (not just asset-triggered) since consumed_asset_events is initialized as an empty list on all DagRun objects during creation in _create_orm_dagrun().

The fix uses selectinload (vs joinedload) because the heartbeat query can return multiple TaskInstances, making selectinload more efficient for bulk operations.

Resolves `DetachedInstanceError` when the scheduler processes task instances that have
timed out during heartbeat detection. The error occurred when Pydantic validation
of `TIRunContext` attempted to access the `consumed_asset_events` relationship on
`DagRun` objects that had been detached from the SQLAlchemy session.

Root cause: The main scheduler loop calls `session.expunge_all()` which detaches
all objects from the session. Later, when processing heartbeat timeouts, the
scheduler creates `TIRunContext` objects that trigger Pydantic validation of
`dag_run.consumed_asset_events`, causing `DetachedInstanceError` on the lazy-loaded
relationship.

Solution: Add `selectinload(DagRun.consumed_asset_events)` to the heartbeat timeout
query to eagerly load the relationship before objects are detached. This minimal
fix loads only the required relationship without over-eager loading of nested
fields that aren't accessed during heartbeat processing.

The fix affects all DAG types since `consumed_asset_events` is initialized as an
empty list on all `DagRun` objects, not just those of asset-triggered DAGs.

Longer term, using `back_populates` (with `lazy="selectin"`) might be better, so we don't need to remember to add this eager load to each query:
https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html
https://docs.sqlalchemy.org/en/20/orm/relationship_api.html#sqlalchemy.orm.relationship.params.back_populates
@kaxil kaxil requested review from ashb and XD-DENG as code owners July 28, 2025 23:21
@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Jul 28, 2025
@kaxil kaxil merged commit 9458053 into apache:main Jul 29, 2025
101 of 103 checks passed
@kaxil kaxil deleted the fix-detached-bug branch July 29, 2025 06:48
github-actions bot pushed a commit that referenced this pull request Jul 29, 2025
…nstanceError`` (#53838)

(cherry picked from commit 9458053)

Co-authored-by: Kaxil Naik <[email protected]>

Backport successfully created: v3-0-test

Status Branch Result
v3-0-test PR Link

RoyLee1224 pushed a commit to RoyLee1224/airflow that referenced this pull request Jul 31, 2025
jedcunningham pushed a commit that referenced this pull request Aug 1, 2025
…nstanceError`` (#53838) (#53858)

(cherry picked from commit 9458053)

Co-authored-by: Kaxil Naik <[email protected]>
ferruzzi pushed a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Aug 7, 2025
kaxil added a commit to astronomer/airflow that referenced this pull request Aug 11, 2025
Similar to apache#53838 but prevents it for all queries needing `consumed_asset_events`.

Instead of adding `.selectinload(DR.consumed_asset_events)` wherever needed, I am eagerly loading them now.

Changes:
- Add lazy='selectin' to `DagRun.consumed_asset_events` relationship for always-eager loading
- Changed `backref` to `back_populates` in `AssetEvent.created_dagruns` to enable explicit control

Why This Fix Works:
- Eliminates lazy loading entirely by pre-loading the relationship at the model level
- Prevents dependency on consistent session state in concurrent scheduler operations

Closes apache#54306
kaxil added a commit to astronomer/airflow that referenced this pull request Aug 11, 2025
Similar to apache#53838 and alternative for apache#54331

This is a more localized change and only eagerly loads for this specific instance.

Closes apache#54306
fweilun pushed a commit to fweilun/airflow that referenced this pull request Aug 11, 2025