Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
16aeb66
Render structured logs in the new UI rather than showing raw JSON
ashb Feb 17, 2025
9da1231
Fix typescript useLogs
bbovenzi Feb 17, 2025
2303765
style: group metadata pop
Lee-W Feb 19, 2025
b79f726
style: reduce if-else and directly use bool for assigning metadata["d…
Lee-W Feb 19, 2025
0e65827
style: improve type annotation
Lee-W Feb 19, 2025
67976c5
test(test_log_reader): fix existing unit tests
Lee-W Feb 19, 2025
c511c12
test(api_fastapi): fix existing test_log unit tests
Lee-W Feb 20, 2025
eb11c6c
feat(api_connexion/log): update v1 api to the latest log format
Lee-W Feb 20, 2025
c3d29fa
test(providers/elasticsearch): fix part of the existing unit test
Lee-W Feb 20, 2025
3583276
test(providers/amazon): fix TestCloudwatchTaskHandler::test_read
Lee-W Feb 21, 2025
43018cc
feat(providers/amazon): add airflow 3 compat logic
Lee-W Feb 21, 2025
666564f
feat(providers/google): add airflow 3 task handler log handling logic
Lee-W Feb 21, 2025
4d1bd54
feat(providers/elasticsearch): add airflow 3 task handler log handlin…
Lee-W Feb 21, 2025
b6bec70
feat(providers/microsoft): add airflow 3 task handler log handling logic
Lee-W Feb 21, 2025
79159a6
feat(providers/redis): add airflow 3 task handler log handling logic
Lee-W Feb 21, 2025
1335623
feat(providers/opensearch): add airflow 3 task handler log handling l…
Lee-W Feb 21, 2025
d490b9a
test: ignore unneeded tests
Lee-W Feb 21, 2025
9a5aafa
test(log_handlers): fix pendulum.tz version incompat
Lee-W Feb 21, 2025
d3e2dff
feat: force StructuredLogMessage check when initializing
Lee-W Feb 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions airflow/api_connexion/endpoints/log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ def get_log(
if metadata.get("download_logs") and metadata["download_logs"]:
full_content = True

if full_content:
metadata["download_logs"] = True
else:
metadata["download_logs"] = False
metadata["download_logs"] = full_content

task_log_reader = TaskLogReader()

Expand Down Expand Up @@ -116,11 +113,18 @@ def get_log(
logs: Any
if return_type == "application/json" or return_type is None: # default
logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
logs = logs[0] if task_try_number is not None else logs
# we must have token here, so we can safely ignore it
token = URLSafeSerializer(key).dumps(metadata) # type: ignore[assignment]
return logs_schema.dump(LogResponseObject(continuation_token=token, content=logs))
# text/plain. Stream
logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)

return Response(logs, headers={"Content-Type": return_type})
encoded_token = None
if not metadata.get("end_of_log", False):
encoded_token = URLSafeSerializer(key).dumps(metadata)
return logs_schema.dump(LogResponseObject(continuation_token=encoded_token, content=logs))

# text/plain, or something else we don't understand. Return raw log content

# We need to exhaust the iterator before we can generate the continuation token.
# We could improve this by making it a streaming/async response, and by then setting the header using
# HTTP Trailers
logs = "".join(task_log_reader.read_log_stream(ti, task_try_number, metadata))
headers = None
if not metadata.get("end_of_log", False):
headers = {"Airflow-Continuation-Token": URLSafeSerializer(key).dumps(metadata)}
return Response(mimetype="application/x-ndjson", response=logs, headers=headers)
5 changes: 4 additions & 1 deletion airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2145,8 +2145,11 @@ paths:
properties:
continuation_token:
type: string
nullable: true
content:
type: string
type: array
items:
type: string
text/plain:
schema:
type: string
Expand Down
6 changes: 3 additions & 3 deletions airflow/api_connexion/schemas/log_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
class LogsSchema(Schema):
"""Schema for logs."""

content = fields.Str(dump_only=True)
continuation_token = fields.Str(dump_only=True)
content = fields.List(fields.Str(dump_only=True))
continuation_token = fields.Str(dump_only=True, allow_none=True)


class LogResponseObject(NamedTuple):
"""Log Response Object."""

content: str
content: list[str]
continuation_token: str | None


Expand Down
22 changes: 20 additions & 2 deletions airflow/api_fastapi/core_api/datamodels/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,29 @@
# under the License.
from __future__ import annotations

from pydantic import BaseModel
from datetime import datetime
from typing import Annotated

from pydantic import BaseModel, ConfigDict, WithJsonSchema


class StructuredLogMessage(BaseModel):
"""An individual log message."""

# NOTE: the docstring above is repeated verbatim as the `description` of this
# model in the generated OpenAPI schema, so rewording it changes the spec.

# Not every message has a timestamp, so the field defaults to None.
timestamp: Annotated[
datetime | None,
# At the JSON-schema level, advertise this as a plain date-time string with
# no null variant: whenever the field is present it is always a datetime.
WithJsonSchema({"type": "string", "format": "date-time"}),
] = None
# The only field without a default, hence the only required one.
event: str

# Allow keys beyond the two declared fields; the generated schema reflects
# this as `additionalProperties: true`.
model_config = ConfigDict(extra="allow")


class TaskInstancesLogResponse(BaseModel):
"""Log serializer for responses."""

content: str
content: list[StructuredLogMessage] | list[str]
"""Either a list of parsed events, or a list of lines on parse error"""
continuation_token: str | None
23 changes: 22 additions & 1 deletion airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10106,6 +10106,21 @@ components:
- arrange
title: StructureDataResponse
description: Structure Data serializer for responses.
StructuredLogMessage:
properties:
timestamp:
type: string
format: date-time
title: Timestamp
event:
type: string
title: Event
additionalProperties: true
type: object
required:
- event
title: StructuredLogMessage
description: An individual log message.
TaskCollectionResponse:
properties:
tasks:
Expand Down Expand Up @@ -10697,7 +10712,13 @@ components:
TaskInstancesLogResponse:
properties:
content:
type: string
anyOf:
- items:
$ref: '#/components/schemas/StructuredLogMessage'
type: array
- items:
type: string
type: array
title: Content
continuation_token:
anyOf:
Expand Down
31 changes: 19 additions & 12 deletions airflow/api_fastapi/core_api/routes/public/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from __future__ import annotations

import textwrap
from typing import Any

from fastapi import HTTPException, Request, Response, status
from itsdangerous import BadSignature, URLSafeSerializer
Expand Down Expand Up @@ -65,6 +64,7 @@
},
},
response_model=TaskInstancesLogResponse,
response_model_exclude_unset=True,
)
def get_log(
dag_id: str,
Expand Down Expand Up @@ -92,10 +92,7 @@ def get_log(
if metadata.get("download_logs") and metadata["download_logs"]:
full_content = True

if full_content:
metadata["download_logs"] = True
else:
metadata["download_logs"] = False
metadata["download_logs"] = full_content

task_log_reader = TaskLogReader()

Expand Down Expand Up @@ -135,12 +132,22 @@ def get_log(
except TaskNotFound:
pass

logs: Any
if accept == Mimetype.JSON or accept == Mimetype.ANY: # default
logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
# we must have token here, so we can safely ignore it
token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment]
return TaskInstancesLogResponse(continuation_token=token, content=str(logs[0])).model_dump()
# text/plain. Stream
logs = task_log_reader.read_log_stream(ti, try_number, metadata)
return Response(media_type=accept, content="".join(list(logs)))
encoded_token = None
if not metadata.get("end_of_log", False):
encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
return TaskInstancesLogResponse.model_construct(continuation_token=encoded_token, content=logs)
else:
# text/plain, or something else we don't understand. Return raw log content

# We need to exhaust the iterator before we can generate the continuation token.
# We could improve this by making it a streaming/async response, and by then setting the header using
# HTTP Trailers
logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata))
headers = None
if not metadata.get("end_of_log", False):
headers = {
"Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
}
return Response(media_type="application/x-ndjson", content=logs, headers=headers)
34 changes: 33 additions & 1 deletion airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4645,6 +4645,25 @@ export const $StructureDataResponse = {
description: "Structure Data serializer for responses.",
} as const;

// JSON-schema description of a single structured log message: `event` is the
// only required property, `timestamp` is an optional date-time string, and
// additional properties are permitted.
export const $StructuredLogMessage = {
properties: {
timestamp: {
type: "string",
format: "date-time",
title: "Timestamp",
},
event: {
type: "string",
title: "Event",
},
},
additionalProperties: true,
type: "object",
required: ["event"],
title: "StructuredLogMessage",
description: "An individual log message.",
} as const;

export const $TaskCollectionResponse = {
properties: {
tasks: {
Expand Down Expand Up @@ -5628,7 +5647,20 @@ export const $TaskInstancesBatchBody = {
export const $TaskInstancesLogResponse = {
properties: {
content: {
type: "string",
anyOf: [
{
items: {
$ref: "#/components/schemas/StructuredLogMessage",
},
type: "array",
},
{
items: {
type: "string",
},
type: "array",
},
],
title: "Content",
},
continuation_token: {
Expand Down
11 changes: 10 additions & 1 deletion airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,15 @@ export type StructureDataResponse = {

export type arrange = "BT" | "LR" | "RL" | "TB";

/**
* An individual log message.
*/
export type StructuredLogMessage = {
// Optional: the schema declares it as a date-time string but does not require it.
timestamp?: string;
// The only required property.
event: string;
// Any other keys are allowed; their values are not typed more precisely than `unknown`.
[key: string]: unknown | string;
};

/**
* Task collection serializer for responses.
*/
Expand Down Expand Up @@ -1393,7 +1402,7 @@ export type TaskInstancesBatchBody = {
* Log serializer for responses.
*/
export type TaskInstancesLogResponse = {
content: string;
content: Array<StructuredLogMessage> | Array<string>;
continuation_token: string | null;
};

Expand Down
59 changes: 46 additions & 13 deletions airflow/ui/src/queries/useLogs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
import dayjs from "dayjs";

import { useTaskInstanceServiceGetLog } from "openapi/queries";
import type { TaskInstanceResponse } from "openapi/requests/types.gen";
import type {
StructuredLogMessage,
TaskInstanceResponse,
TaskInstancesLogResponse,
} from "openapi/requests/types.gen";
import { isStatePending, useAutoRefresh } from "src/utils";

type Props = {
Expand All @@ -29,29 +33,58 @@ type Props = {
};

type ParseLogsProps = {
data: string | undefined;
data: TaskInstancesLogResponse["content"];
};

// TODO: add support for log groups, colors, formats, filters
const parseLogs = ({ data }: ParseLogsProps) => {
if (data === undefined) {
return {};
const renderStructuredLog = (logMessage: string | StructuredLogMessage, index: number) => {
if (typeof logMessage === "string") {
return <p key={index}>{logMessage}</p>;
}

const { event, level = undefined, timestamp, ...structured } = logMessage;
const elements = [];

if (Boolean(timestamp)) {
elements.push("[", <time dateTime={timestamp}>{timestamp}</time>, "] ");
}

if (typeof level === "string") {
elements.push(<span className={`log-level ${level}`}>{level.toUpperCase()}</span>, " - ");
}

elements.push(<span className="event">{event}</span>);

for (const key in structured) {
if (Object.hasOwn(structured, key)) {
elements.push(
" ",
<span className={`log-key ${key}`}>
{key}={JSON.stringify(structured[key])}
</span>,
);
}
}
let lines;

return <p key={index}>{elements}</p>;
};

// TODO: add support for log groups, colors, formats, filters
const parseLogs = ({ data }: ParseLogsProps) => {
let warning;
let parsedLines;

try {
lines = data.split("\\n");
} catch {
parsedLines = data.map((datum, index) => renderStructuredLog(datum, index));
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "An error occurred.";

// eslint-disable-next-line no-console
console.warn(`Error parsing logs: ${errorMessage}`);
warning = "Unable to show logs. There was an error parsing logs.";

return { data, warning };
}

// eslint-disable-next-line react/no-array-index-key
const parsedLines = lines.map((line, index) => <p key={index}>{line}</p>);

return {
fileSources: [],
parsedLogs: parsedLines,
Expand Down Expand Up @@ -82,7 +115,7 @@ export const useLogs = ({ dagId, taskInstance, tryNumber = 1 }: Props) => {
);

const parsedData = parseLogs({
data: data?.content,
data: data?.content ?? [],
});

return { data: parsedData, ...rest };
Expand Down
Loading