From 91ae3a14ace4bed2f78218f5337eeea16ee0f050 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 16 May 2025 16:24:10 +0530 Subject: [PATCH] Fix Pydantic ``ForwardRef`` error by reordering discriminated union definitions In some rare cases, the DAG parse subprocess (`DagFileProcessorProcess`) could raise a `PydanticUserError` related to unresolved ForwardRefs in the discriminated union `ToDagProcessor`. This was caused by `TypeAdapter[ToDagProcessor]` being instantiated before `DagFileParseRequest` was defined. While this is not always reproducible, it can happen in forked subprocesses depending on import order. This change moves the relevant Pydantic models (`DagFileParseRequest`, `DagFileParsingResult`) above the union definitions (`ToDagProcessor`, `ToManager`) and references them directly instead of through string forward references, ensuring all references are fully resolved at definition time. closes https://github.com/apache/airflow/issues/50530 --- .../src/airflow/dag_processing/processor.py | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 01ac41f7a326a..5f69082c758aa 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -54,13 +54,47 @@ from airflow.sdk.definitions.context import Context from airflow.typing_compat import Self + +class DagFileParseRequest(BaseModel): + """ + Request for DAG File Parsing. + + This is the request that the manager will send to the DAG parser with the dag file and + any other necessary metadata. + """ + + file: str + + bundle_path: Path + """Passing bundle path around lets us figure out relative file path.""" + + requests_fd: int + callback_requests: list[CallbackRequest] = Field(default_factory=list) + type: Literal["DagFileParseRequest"] = "DagFileParseRequest" + + +class DagFileParsingResult(BaseModel): + """ + Result of DAG File Parsing. 
+ + This is the result of a successful DAG parse, in this class, we gather all serialized DAGs, + import errors and warnings to send back to the scheduler to store in the DB. + """ + + fileloc: str + serialized_dags: list[LazyDeserializedDAG] + warnings: list | None = None + import_errors: dict[str, str] | None = None + type: Literal["DagFileParsingResult"] = "DagFileParsingResult" + + ToManager = Annotated[ - Union["DagFileParsingResult", GetConnection, GetVariable, PutVariable, DeleteVariable], + Union[DagFileParsingResult, GetConnection, GetVariable, PutVariable, DeleteVariable], Field(discriminator="type"), ] ToDagProcessor = Annotated[ - Union["DagFileParseRequest", ConnectionResult, VariableResult, ErrorResponse, OKResponse], + Union[DagFileParseRequest, ConnectionResult, VariableResult, ErrorResponse, OKResponse], Field(discriminator="type"), ] @@ -182,39 +216,6 @@ def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: Fil Stats.incr("dag.callback_exceptions", tags={"dag_id": request.dag_id}) -class DagFileParseRequest(BaseModel): - """ - Request for DAG File Parsing. - - This is the request that the manager will send to the DAG parser with the dag file and - any other necessary metadata. - """ - - file: str - - bundle_path: Path - """Passing bundle path around lets us figure out relative file path.""" - - requests_fd: int - callback_requests: list[CallbackRequest] = Field(default_factory=list) - type: Literal["DagFileParseRequest"] = "DagFileParseRequest" - - -class DagFileParsingResult(BaseModel): - """ - Result of DAG File Parsing. - - This is the result of a successful DAG parse, in this class, we gather all serialized DAGs, - import errors and warnings to send back to the scheduler to store in the DB. 
- """ - - fileloc: str - serialized_dags: list[LazyDeserializedDAG] - warnings: list | None = None - import_errors: dict[str, str] | None = None - type: Literal["DagFileParsingResult"] = "DagFileParsingResult" - - def in_process_api_server() -> InProcessExecutionAPI: from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI