Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions airflow-core/src/airflow/api_fastapi/common/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,30 @@ def header_accept_json_or_text_depends(


HeaderAcceptJsonOrText = Annotated[Mimetype, Depends(header_accept_json_or_text_depends)]


def header_accept_json_or_ndjson_depends(
accept: Annotated[
str,
Header(
json_schema_extra={
"type": "string",
"enum": [Mimetype.JSON, Mimetype.NDJSON, Mimetype.ANY],
}
),
] = Mimetype.ANY,
) -> Mimetype:
if accept.startswith(Mimetype.ANY):
return Mimetype.ANY
if accept.startswith(Mimetype.JSON):
return Mimetype.JSON
if accept.startswith(Mimetype.NDJSON) or accept.startswith(Mimetype.ANY):
return Mimetype.NDJSON

raise HTTPException(
status_code=status.HTTP_406_NOT_ACCEPTABLE,
detail="Only application/json or application/x-ndjson is supported",
)


HeaderAcceptJsonOrNdjson = Annotated[Mimetype, Depends(header_accept_json_or_ndjson_depends)]
1 change: 1 addition & 0 deletions airflow-core/src/airflow/api_fastapi/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class Mimetype(str, Enum):

TEXT = "text/plain"
JSON = "application/json"
NDJSON = "application/x-ndjson"
ANY = "*/*"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6489,7 +6489,7 @@ paths:
type: string
enum:
- application/json
- text/plain
- application/x-ndjson
- '*/*'
default: '*/*'
title: Accept
Expand All @@ -6500,10 +6500,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/TaskInstancesLogResponse'
text/plain:
application/x-ndjson:
schema:
type: string
example: 'content
example: '{"content": "content"}

{"content": "content"}

'
'401':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from airflow.api_fastapi.common.dagbag import DagBagDep
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrText
from airflow.api_fastapi.common.headers import HeaderAcceptJsonOrNdjson
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.types import Mimetype
from airflow.api_fastapi.core_api.datamodels.log import ExternalLogUrlResponse, TaskInstancesLogResponse
Expand All @@ -43,13 +43,14 @@
tags=["Task Instance"], prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances"
)

text_example_response_for_get_log = {
Mimetype.TEXT: {
ndjson_example_response_for_get_log = {
Mimetype.NDJSON: {
"schema": {
"type": "string",
"example": textwrap.dedent(
"""\
content
{"content": "content"}
{"content": "content"}
"""
),
}
Expand All @@ -63,7 +64,7 @@
**create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
status.HTTP_200_OK: {
"description": "Successful Response",
"content": text_example_response_for_get_log,
"content": ndjson_example_response_for_get_log,
},
},
dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.TASK_LOGS))],
Expand All @@ -75,7 +76,7 @@ def get_log(
dag_run_id: str,
task_id: str,
try_number: PositiveInt,
accept: HeaderAcceptJsonOrText,
accept: HeaderAcceptJsonOrNdjson,
request: Request,
dag_bag: DagBagDep,
session: SessionDep,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,7 @@ export const UseTaskInstanceServiceGetLogKeyFn = (
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "*/*" | "application/x-ndjson";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1665,7 +1665,7 @@ export const ensureUseTaskInstanceServiceGetLogData = (
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "*/*" | "application/x-ndjson";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1665,7 +1665,7 @@ export const prefetchUseTaskInstanceServiceGetLog = (
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "*/*" | "application/x-ndjson";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1993,7 +1993,7 @@ export const useTaskInstanceServiceGetLog = <
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "*/*" | "application/x-ndjson";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1970,7 +1970,7 @@ export const useTaskInstanceServiceGetLogSuspense = <
token,
tryNumber,
}: {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "*/*" | "application/x-ndjson";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2461,7 +2461,7 @@ export type PatchTaskInstanceDryRunData = {
export type PatchTaskInstanceDryRunResponse = TaskInstanceCollectionResponse;

export type GetLogData = {
accept?: "application/json" | "text/plain" | "*/*";
accept?: "application/json" | "application/x-ndjson" | "*/*";
dagId: string;
dagRunId: string;
fullContent?: boolean;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,7 @@ def test_should_respond_200_json(self, try_number):
),
],
)
def test_should_respond_200_text_plain(
self, request_url, expected_filename, extra_query_string, try_number
):
def test_should_respond_200_ndjson(self, request_url, expected_filename, extra_query_string, try_number):
expected_filename = expected_filename.replace("LOG_DIR", str(self.log_dir))

key = self.app.state.secret_key
Expand All @@ -223,7 +221,7 @@ def test_should_respond_200_text_plain(
response = self.client.get(
request_url,
params={"token": token, **extra_query_string},
headers={"Accept": "text/plain"},
headers={"Accept": "application/x-ndjson"},
)
assert response.status_code == 200

Expand Down Expand Up @@ -279,7 +277,7 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu
response = self.client.get(
request_url,
params={"token": token, **extra_query_string},
headers={"Accept": "text/plain"},
headers={"Accept": "application/x-ndjson"},
)

assert response.status_code == 200
Expand Down Expand Up @@ -314,7 +312,7 @@ def test_get_logs_with_metadata_as_download_large_file(self, try_number):
response = self.client.get(
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/"
f"taskInstances/{self.TASK_ID}/logs/{try_number}?full_content=True",
headers={"Accept": "text/plain"},
headers={"Accept": "application/x-ndjson"},
)

assert "1st line" in response.content.decode("utf-8")
Expand Down Expand Up @@ -382,7 +380,7 @@ def test_should_raise_404_when_missing_map_index_param_for_mapped_task(self):
response = self.client.get(
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.MAPPED_TASK_ID}/logs/1",
params={"token": token},
headers={"Accept": "text/plain"},
headers={"Accept": "application/x-ndjson"},
)
assert response.status_code == 404
assert response.json()["detail"] == "TaskInstance not found"
Expand All @@ -395,7 +393,7 @@ def test_should_raise_404_when_filtering_on_map_index_for_unmapped_task(self):
response = self.client.get(
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/logs/1",
params={"token": token, "map_index": 0},
headers={"Accept": "text/plain"},
headers={"Accept": "application/x-ndjson"},
)
assert response.status_code == 404
assert response.json()["detail"] == "TaskInstance not found"
Expand Down