Skip to content

Commit b9f9aa5

Browse files
committed
[v3-0-test] Fix Pydantic ForwardRef error by reordering discriminated union definitions (#50688)
In some rare cases, the DAG parse subprocess (`DagFileProcessorProcess`) could raise a `PydanticUserError` related to unresolved ForwardRefs in the discriminated union ToDagProcessor. This was caused by `TypeAdapter[ToDagProcessor]` being instantiated before `DagFileParseRequest` was defined. While this is not always reproducible, it can happen in forked subprocesses depending on import order. This change moves the union definitions (`ToDagProcessor`, `ToManager`) after the relevant Pydantic models are declared, ensuring all references are fully resolved at definition time. closes #50530 (cherry picked from commit 672ec99) Co-authored-by: Kaxil Naik <[email protected]>
1 parent f103f89 commit b9f9aa5

File tree

1 file changed

+36
-35
lines changed

1 file changed

+36
-35
lines changed

airflow-core/src/airflow/dag_processing/processor.py

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,47 @@
5454
from airflow.sdk.definitions.context import Context
5555
from airflow.typing_compat import Self
5656

57+
58+
class DagFileParseRequest(BaseModel):
59+
"""
60+
Request for DAG File Parsing.
61+
62+
This is the request that the manager will send to the DAG parser with the dag file and
63+
any other necessary metadata.
64+
"""
65+
66+
file: str
67+
68+
bundle_path: Path
69+
"""Passing bundle path around lets us figure out relative file path."""
70+
71+
requests_fd: int
72+
callback_requests: list[CallbackRequest] = Field(default_factory=list)
73+
type: Literal["DagFileParseRequest"] = "DagFileParseRequest"
74+
75+
76+
class DagFileParsingResult(BaseModel):
77+
"""
78+
Result of DAG File Parsing.
79+
80+
This is the result of a successful DAG parse, in this class, we gather all serialized DAGs,
81+
import errors and warnings to send back to the scheduler to store in the DB.
82+
"""
83+
84+
fileloc: str
85+
serialized_dags: list[LazyDeserializedDAG]
86+
warnings: list | None = None
87+
import_errors: dict[str, str] | None = None
88+
type: Literal["DagFileParsingResult"] = "DagFileParsingResult"
89+
90+
5791
ToManager = Annotated[
58-
Union["DagFileParsingResult", GetConnection, GetVariable, PutVariable, DeleteVariable],
92+
Union[DagFileParsingResult, GetConnection, GetVariable, PutVariable, DeleteVariable],
5993
Field(discriminator="type"),
6094
]
6195

6296
ToDagProcessor = Annotated[
63-
Union["DagFileParseRequest", ConnectionResult, VariableResult, ErrorResponse, OKResponse],
97+
Union[DagFileParseRequest, ConnectionResult, VariableResult, ErrorResponse, OKResponse],
6498
Field(discriminator="type"),
6599
]
66100

@@ -182,39 +216,6 @@ def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: Fil
182216
Stats.incr("dag.callback_exceptions", tags={"dag_id": request.dag_id})
183217

184218

185-
class DagFileParseRequest(BaseModel):
186-
"""
187-
Request for DAG File Parsing.
188-
189-
This is the request that the manager will send to the DAG parser with the dag file and
190-
any other necessary metadata.
191-
"""
192-
193-
file: str
194-
195-
bundle_path: Path
196-
"""Passing bundle path around lets us figure out relative file path."""
197-
198-
requests_fd: int
199-
callback_requests: list[CallbackRequest] = Field(default_factory=list)
200-
type: Literal["DagFileParseRequest"] = "DagFileParseRequest"
201-
202-
203-
class DagFileParsingResult(BaseModel):
204-
"""
205-
Result of DAG File Parsing.
206-
207-
This is the result of a successful DAG parse, in this class, we gather all serialized DAGs,
208-
import errors and warnings to send back to the scheduler to store in the DB.
209-
"""
210-
211-
fileloc: str
212-
serialized_dags: list[LazyDeserializedDAG]
213-
warnings: list | None = None
214-
import_errors: dict[str, str] | None = None
215-
type: Literal["DagFileParsingResult"] = "DagFileParsingResult"
216-
217-
218219
def in_process_api_server() -> InProcessExecutionAPI:
219220
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
220221

0 commit comments

Comments
 (0)