diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 01ac41f7a326a..5f69082c758aa 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -54,13 +54,47 @@ from airflow.sdk.definitions.context import Context from airflow.typing_compat import Self + +class DagFileParseRequest(BaseModel): + """ + Request for DAG File Parsing. + + This is the request that the manager will send to the DAG parser with the dag file and + any other necessary metadata. + """ + + file: str + + bundle_path: Path + """Passing bundle path around lets us figure out relative file path.""" + + requests_fd: int + callback_requests: list[CallbackRequest] = Field(default_factory=list) + type: Literal["DagFileParseRequest"] = "DagFileParseRequest" + + +class DagFileParsingResult(BaseModel): + """ + Result of DAG File Parsing. + + This is the result of a successful DAG parse, in this class, we gather all serialized DAGs, + import errors and warnings to send back to the scheduler to store in the DB. + """ + + fileloc: str + serialized_dags: list[LazyDeserializedDAG] + warnings: list | None = None + import_errors: dict[str, str] | None = None + type: Literal["DagFileParsingResult"] = "DagFileParsingResult" + + ToManager = Annotated[ - Union["DagFileParsingResult", GetConnection, GetVariable, PutVariable, DeleteVariable], + Union[DagFileParsingResult, GetConnection, GetVariable, PutVariable, DeleteVariable], Field(discriminator="type"), ] ToDagProcessor = Annotated[ - Union["DagFileParseRequest", ConnectionResult, VariableResult, ErrorResponse, OKResponse], + Union[DagFileParseRequest, ConnectionResult, VariableResult, ErrorResponse, OKResponse], Field(discriminator="type"), ] @@ -182,39 +216,6 @@ def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: Fil Stats.incr("dag.callback_exceptions", tags={"dag_id": request.dag_id}) -class DagFileParseRequest(BaseModel): - """ - Request for DAG File Parsing. - - This is the request that the manager will send to the DAG parser with the dag file and - any other necessary metadata. - """ - - file: str - - bundle_path: Path - """Passing bundle path around lets us figure out relative file path.""" - - requests_fd: int - callback_requests: list[CallbackRequest] = Field(default_factory=list) - type: Literal["DagFileParseRequest"] = "DagFileParseRequest" - - -class DagFileParsingResult(BaseModel): - """ - Result of DAG File Parsing. - - This is the result of a successful DAG parse, in this class, we gather all serialized DAGs, - import errors and warnings to send back to the scheduler to store in the DB. - """ - - fileloc: str - serialized_dags: list[LazyDeserializedDAG] - warnings: list | None = None - import_errors: dict[str, str] | None = None - type: Literal["DagFileParsingResult"] = "DagFileParsingResult" - - def in_process_api_server() -> InProcessExecutionAPI: from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI