diff --git a/airflow-core/src/airflow/api_fastapi/common/headers.py b/airflow-core/src/airflow/api_fastapi/common/headers.py index 7d1a0fa69613b..13567e32bdc7d 100644 --- a/airflow-core/src/airflow/api_fastapi/common/headers.py +++ b/airflow-core/src/airflow/api_fastapi/common/headers.py @@ -47,3 +47,30 @@ def header_accept_json_or_text_depends( HeaderAcceptJsonOrText = Annotated[Mimetype, Depends(header_accept_json_or_text_depends)] + + +def header_accept_json_or_ndjson_depends( + accept: Annotated[ + str, + Header( + json_schema_extra={ + "type": "string", + "enum": [Mimetype.JSON, Mimetype.NDJSON, Mimetype.ANY], + } + ), + ] = Mimetype.ANY, +) -> Mimetype: + if accept.startswith(Mimetype.ANY): + return Mimetype.ANY + if accept.startswith(Mimetype.JSON): + return Mimetype.JSON + if accept.startswith(Mimetype.NDJSON) or accept.startswith(Mimetype.ANY): + return Mimetype.NDJSON + + raise HTTPException( + status_code=status.HTTP_406_NOT_ACCEPTABLE, + detail="Only application/json or application/x-ndjson is supported", + ) + + +HeaderAcceptJsonOrNdjson = Annotated[Mimetype, Depends(header_accept_json_or_ndjson_depends)] diff --git a/airflow-core/src/airflow/api_fastapi/common/types.py b/airflow-core/src/airflow/api_fastapi/common/types.py index 0b431dfdef466..18e5dc7387d62 100644 --- a/airflow-core/src/airflow/api_fastapi/common/types.py +++ b/airflow-core/src/airflow/api_fastapi/common/types.py @@ -72,6 +72,7 @@ class Mimetype(str, Enum): TEXT = "text/plain" JSON = "application/json" + NDJSON = "application/x-ndjson" ANY = "*/*" diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml index 1cb0a8bfdbab1..399820b344cf4 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml @@ -6489,7 +6489,7 @@ paths: type: string enum: - application/json - - text/plain + - application/x-ndjson - '*/*' default: '*/*' title: Accept @@ -6500,10 +6500,12 @@ paths: application/json: schema: $ref: '#/components/schemas/TaskInstancesLogResponse' - text/plain: + application/x-ndjson: schema: type: string - example: 'content + example: '{"content": "content"} + + {"content": "content"} ' '401': diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py index 091e6f3cde53a..282e7b8f945a3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py @@ -28,7 +28,7 @@ from airflow.api_fastapi.common.dagbag import DagBagDep from airflow.api_fastapi.common.db.common import SessionDep -from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText +from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrNdjson from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.common.types import Mimetype from airflow.api_fastapi.core_api.datamodels.log import ExternalLogUrlResponse, TaskInstancesLogResponse @@ -43,13 +43,14 @@ tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances" ) -text_example_response_for_get_log = { - Mimetype.TEXT: { +ndjson_example_response_for_get_log = { + Mimetype.NDJSON: { "schema": { "type": "string", "example": textwrap.dedent( """\ - content + {"content": "content"} + {"content": "content"} """ ), } @@ -63,7 +64,7 @@ **create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), status.HTTP_200_OK: { "description": "Successful Response", - "content": text_example_response_for_get_log, + "content": ndjson_example_response_for_get_log, }, }, dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.TASK_LOGS))], @@ -75,7 +76,7 @@ def get_log( dag_run_id: str, task_id: str, try_number: PositiveInt, - accept: HeaderAcceptJsonOrText, + accept: HeaderAcceptJsonOrNdjson, request: Request, dag_bag: DagBagDep, session: SessionDep, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index c0e71a471280b..bafbac9fb23eb 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -1214,7 +1214,7 @@ export const UseTaskInstanceServiceGetLogKeyFn = ( token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 5ef4350e67b8f..b158f1fb0b985 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1665,7 +1665,7 @@ export const ensureUseTaskInstanceServiceGetLogData = ( token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 588de763a15c8..8319f1a197d72 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1665,7 +1665,7 @@ export const prefetchUseTaskInstanceServiceGetLog = ( token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 9fe536cd513d7..d2700b8266e02 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -1993,7 +1993,7 @@ export const useTaskInstanceServiceGetLog = < token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index bb62d3ba359f5..939f4293201de 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -1970,7 +1970,7 @@ export const useTaskInstanceServiceGetLogSuspense = < token, tryNumber, }: { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "*/*" | "application/x-ndjson"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 294f9f0b02d41..0730d23002046 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2461,7 +2461,7 @@ export type PatchTaskInstanceDryRunData = { export type PatchTaskInstanceDryRunResponse = TaskInstanceCollectionResponse; export type GetLogData = { - accept?: "application/json" | "text/plain" | "*/*"; + accept?: "application/json" | "application/x-ndjson" | "*/*"; dagId: string; dagRunId: string; fullContent?: boolean; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py index 9ee0abd14ad2e..63ed4f9231a21 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_log.py @@ -211,9 +211,7 @@ def test_should_respond_200_json(self, try_number): ), ], ) - def test_should_respond_200_text_plain( - self, request_url, expected_filename, extra_query_string, try_number - ): + def test_should_respond_200_ndjson(self, request_url, expected_filename, extra_query_string, try_number): expected_filename = expected_filename.replace("LOG_DIR", str(self.log_dir)) key = self.app.state.secret_key @@ -223,7 +221,7 @@ def test_should_respond_200_text_plain( response = self.client.get( request_url, params={"token": token, **extra_query_string}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 200 @@ -279,7 +277,7 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu response = self.client.get( request_url, params={"token": token, **extra_query_string}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 200 @@ -314,7 +312,7 @@ def test_get_logs_with_metadata_as_download_large_file(self, try_number): response = self.client.get( f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/" f"taskInstances/{self.TASK_ID}/logs/{try_number}?full_content=True", - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert "1st line" in response.content.decode("utf-8") @@ -382,7 +380,7 @@ def test_should_raise_404_when_missing_map_index_param_for_mapped_task(self): response = self.client.get( f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.MAPPED_TASK_ID}/logs/1", params={"token": token}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 404 assert response.json()["detail"] == "TaskInstance not found" @@ -395,7 +393,7 @@ def test_should_raise_404_when_filtering_on_map_index_for_unmapped_task(self): response = self.client.get( f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1", params={"token": token, "map_index": 0}, - headers={"Accept": "text/plain"}, + headers={"Accept": "application/x-ndjson"}, ) assert response.status_code == 404 assert response.json()["detail"] == "TaskInstance not found"