diff --git a/airflow/api_connexion/endpoints/log_endpoint.py b/airflow/api_connexion/endpoints/log_endpoint.py
index 37138dc12be8e..29e96a20228ca 100644
--- a/airflow/api_connexion/endpoints/log_endpoint.py
+++ b/airflow/api_connexion/endpoints/log_endpoint.py
@@ -67,10 +67,7 @@ def get_log(
     if metadata.get("download_logs") and metadata["download_logs"]:
         full_content = True
 
-    if full_content:
-        metadata["download_logs"] = True
-    else:
-        metadata["download_logs"] = False
+    metadata["download_logs"] = full_content
 
     task_log_reader = TaskLogReader()
 
@@ -116,11 +113,18 @@ def get_log(
     logs: Any
     if return_type == "application/json" or return_type is None:  # default
         logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
-        logs = logs[0] if task_try_number is not None else logs
-        # we must have token here, so we can safely ignore it
-        token = URLSafeSerializer(key).dumps(metadata)  # type: ignore[assignment]
-        return logs_schema.dump(LogResponseObject(continuation_token=token, content=logs))
-    # text/plain. Stream
-    logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
-
-    return Response(logs, headers={"Content-Type": return_type})
+        encoded_token = None
+        if not metadata.get("end_of_log", False):
+            encoded_token = URLSafeSerializer(key).dumps(metadata)
+        return logs_schema.dump(LogResponseObject(continuation_token=encoded_token, content=logs))
+
+    # text/plain, or something else we don't understand. Return raw log content
+
+    # We need to exhaust the iterator before we can generate the continuation token.
+    # We could improve this by making it a streaming/async response, and by then setting the header using
+    # HTTP Trailers
+    logs = "".join(task_log_reader.read_log_stream(ti, task_try_number, metadata))
+    headers = None
+    if not metadata.get("end_of_log", False):
+        headers = {"Airflow-Continuation-Token": URLSafeSerializer(key).dumps(metadata)}
+    return Response(mimetype="application/x-ndjson", response=logs, headers=headers)
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index c63fb72f2a9a0..8b4d780d68a37 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -2145,8 +2145,11 @@ paths:
                     properties:
                       continuation_token:
                         type: string
+                        nullable: true
                       content:
-                        type: string
+                        type: array
+                        items:
+                          type: string
             text/plain:
               schema:
                 type: string
diff --git a/airflow/api_connexion/schemas/log_schema.py b/airflow/api_connexion/schemas/log_schema.py
index 6651057218e68..8416b03f145bc 100644
--- a/airflow/api_connexion/schemas/log_schema.py
+++ b/airflow/api_connexion/schemas/log_schema.py
@@ -24,14 +24,14 @@
 class LogsSchema(Schema):
     """Schema for logs."""
 
-    content = fields.Str(dump_only=True)
-    continuation_token = fields.Str(dump_only=True)
+    content = fields.List(fields.Str(dump_only=True))
+    continuation_token = fields.Str(dump_only=True, allow_none=True)
 
 
 class LogResponseObject(NamedTuple):
     """Log Response Object."""
 
-    content: str
+    content: list[str]
     continuation_token: str | None
diff --git a/airflow/api_fastapi/core_api/datamodels/log.py b/airflow/api_fastapi/core_api/datamodels/log.py
index 79b341d6e3808..e67264ae3c315 100644
--- a/airflow/api_fastapi/core_api/datamodels/log.py
+++ b/airflow/api_fastapi/core_api/datamodels/log.py
@@ -16,11 +16,29 @@
 # under the License.
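Reviewer aside on the connexion endpoint and schema hunks above: a minimal sketch of how a client might consume the reworked `/logs` endpoint. The base URL, credentials, and dag/run/task identifiers are placeholders, and the `token` query parameter and URL layout are assumed from the existing stable REST API rather than introduced by this diff.

```python
import requests

BASE = "http://localhost:8080/api/v1"  # assumed local deployment
AUTH = ("admin", "admin")  # assumed basic-auth credentials
URL = f"{BASE}/dags/example_dag/dagRuns/example_run/taskInstances/example_task/logs/1"

# JSON mode: `content` is now a list of log lines and `continuation_token`
# is null once the log is complete, so a client can loop until it disappears.
token = None
while True:
    params = {"token": token} if token else {}
    body = requests.get(URL, params=params, auth=AUTH).json()
    for line in body["content"]:
        print(line)
    token = body.get("continuation_token")
    if not token:
        break

# Plain-text mode now returns NDJSON; for an unfinished log the next token is
# advertised in the Airflow-Continuation-Token header instead of the body.
resp = requests.get(URL, headers={"Accept": "text/plain"}, auth=AUTH)
print(resp.headers.get("Airflow-Continuation-Token"))
```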
from __future__ import annotations -from pydantic import BaseModel +from datetime import datetime +from typing import Annotated + +from pydantic import BaseModel, ConfigDict, WithJsonSchema + + +class StructuredLogMessage(BaseModel): + """An individual log message.""" + + # Not every message has a timestamp. + timestamp: Annotated[ + datetime | None, + # Schema level, say this is always a datetime if it exists + WithJsonSchema({"type": "string", "format": "date-time"}), + ] = None + event: str + + model_config = ConfigDict(extra="allow") class TaskInstancesLogResponse(BaseModel): """Log serializer for responses.""" - content: str + content: list[StructuredLogMessage] | list[str] + """Either a list of parsed events, or a list of lines on parse error""" continuation_token: str | None diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index a71113cb13bd4..d9d38cd39d2c2 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -10106,6 +10106,21 @@ components: - arrange title: StructureDataResponse description: Structure Data serializer for responses. + StructuredLogMessage: + properties: + timestamp: + type: string + format: date-time + title: Timestamp + event: + type: string + title: Event + additionalProperties: true + type: object + required: + - event + title: StructuredLogMessage + description: An individual log message. TaskCollectionResponse: properties: tasks: @@ -10697,7 +10712,13 @@ components: TaskInstancesLogResponse: properties: content: - type: string + anyOf: + - items: + $ref: '#/components/schemas/StructuredLogMessage' + type: array + - items: + type: string + type: array title: Content continuation_token: anyOf: diff --git a/airflow/api_fastapi/core_api/routes/public/log.py b/airflow/api_fastapi/core_api/routes/public/log.py index a6bc24f638067..1cb6bd9d66ef7 100644 --- a/airflow/api_fastapi/core_api/routes/public/log.py +++ b/airflow/api_fastapi/core_api/routes/public/log.py @@ -18,7 +18,6 @@ from __future__ import annotations import textwrap -from typing import Any from fastapi import HTTPException, Request, Response, status from itsdangerous import BadSignature, URLSafeSerializer @@ -65,6 +64,7 @@ }, }, response_model=TaskInstancesLogResponse, + response_model_exclude_unset=True, ) def get_log( dag_id: str, @@ -92,10 +92,7 @@ def get_log( if metadata.get("download_logs") and metadata["download_logs"]: full_content = True - if full_content: - metadata["download_logs"] = True - else: - metadata["download_logs"] = False + metadata["download_logs"] = full_content task_log_reader = TaskLogReader() @@ -135,12 +132,22 @@ def get_log( except TaskNotFound: pass - logs: Any if accept == Mimetype.JSON or accept == Mimetype.ANY: # default logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata) - # we must have token here, so we can safely ignore it - token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) # type: ignore[assignment] - return TaskInstancesLogResponse(continuation_token=token, content=str(logs[0])).model_dump() - # text/plain. 
Stream - logs = task_log_reader.read_log_stream(ti, try_number, metadata) - return Response(media_type=accept, content="".join(list(logs))) + encoded_token = None + if not metadata.get("end_of_log", False): + encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata) + return TaskInstancesLogResponse.model_construct(continuation_token=encoded_token, content=logs) + else: + # text/plain, or something else we don't understand. Return raw log content + + # We need to exhaust the iterator before we can generate the continuation token. + # We could improve this by making it a streaming/async response, and by then setting the header using + # HTTP Trailers + logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata)) + headers = None + if not metadata.get("end_of_log", False): + headers = { + "Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata) + } + return Response(media_type="application/x-ndjson", content=logs, headers=headers) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 1c0a537d38120..89bd00998f730 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4645,6 +4645,25 @@ export const $StructureDataResponse = { description: "Structure Data serializer for responses.", } as const; +export const $StructuredLogMessage = { + properties: { + timestamp: { + type: "string", + format: "date-time", + title: "Timestamp", + }, + event: { + type: "string", + title: "Event", + }, + }, + additionalProperties: true, + type: "object", + required: ["event"], + title: "StructuredLogMessage", + description: "An individual log message.", +} as const; + export const $TaskCollectionResponse = { properties: { tasks: { @@ -5628,7 +5647,20 @@ export const $TaskInstancesBatchBody = { export const $TaskInstancesLogResponse = { properties: { content: { - type: "string", + anyOf: [ + { + items: { + $ref: "#/components/schemas/StructuredLogMessage", + }, + type: "array", + }, + { + items: { + type: "string", + }, + type: "array", + }, + ], title: "Content", }, continuation_token: { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 32982e08b0195..2ca718644c5fd 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1216,6 +1216,15 @@ export type StructureDataResponse = { export type arrange = "BT" | "LR" | "RL" | "TB"; +/** + * An individual log message. + */ +export type StructuredLogMessage = { + timestamp?: string; + event: string; + [key: string]: unknown | string; +}; + /** * Task collection serializer for responses. */ @@ -1393,7 +1402,7 @@ export type TaskInstancesBatchBody = { * Log serializer for responses. 
*/ export type TaskInstancesLogResponse = { - content: string; + content: Array | Array; continuation_token: string | null; }; diff --git a/airflow/ui/src/queries/useLogs.tsx b/airflow/ui/src/queries/useLogs.tsx index c165e13c0b893..799a01fe4a7e7 100644 --- a/airflow/ui/src/queries/useLogs.tsx +++ b/airflow/ui/src/queries/useLogs.tsx @@ -19,7 +19,11 @@ import dayjs from "dayjs"; import { useTaskInstanceServiceGetLog } from "openapi/queries"; -import type { TaskInstanceResponse } from "openapi/requests/types.gen"; +import type { + StructuredLogMessage, + TaskInstanceResponse, + TaskInstancesLogResponse, +} from "openapi/requests/types.gen"; import { isStatePending, useAutoRefresh } from "src/utils"; type Props = { @@ -29,29 +33,58 @@ type Props = { }; type ParseLogsProps = { - data: string | undefined; + data: TaskInstancesLogResponse["content"]; }; -// TODO: add support for log groups, colors, formats, filters -const parseLogs = ({ data }: ParseLogsProps) => { - if (data === undefined) { - return {}; +const renderStructuredLog = (logMessage: string | StructuredLogMessage, index: number) => { + if (typeof logMessage === "string") { + return

{logMessage}

; + } + + const { event, level = undefined, timestamp, ...structured } = logMessage; + const elements = []; + + if (Boolean(timestamp)) { + elements.push("[", , "] "); + } + + if (typeof level === "string") { + elements.push({level.toUpperCase()}, " - "); + } + + elements.push({event}); + + for (const key in structured) { + if (Object.hasOwn(structured, key)) { + elements.push( + " ", + + {key}={JSON.stringify(structured[key])} + , + ); + } } - let lines; + return

{elements}

; +}; + +// TODO: add support for log groups, colors, formats, filters +const parseLogs = ({ data }: ParseLogsProps) => { let warning; + let parsedLines; try { - lines = data.split("\\n"); - } catch { + parsedLines = data.map((datum, index) => renderStructuredLog(datum, index)); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : "An error occurred."; + + // eslint-disable-next-line no-console + console.warn(`Error parsing logs: ${errorMessage}`); warning = "Unable to show logs. There was an error parsing logs."; return { data, warning }; } - // eslint-disable-next-line react/no-array-index-key - const parsedLines = lines.map((line, index) =>

{line}

;
 
   return {
     fileSources: [],
     parsedLogs: parsedLines,
@@ -82,7 +115,7 @@ export const useLogs = ({ dagId, taskInstance, tryNumber = 1 }: Props) => {
   );
 
   const parsedData = parseLogs({
-    data: data?.content,
+    data: data?.content ?? [],
   });
 
   return { data: parsedData, ...rest };
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6f4086843db8f..c5798c1e49599 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -19,16 +19,19 @@
 
 from __future__ import annotations
 
+import itertools
 import logging
 import os
 from collections.abc import Iterable
 from contextlib import suppress
+from datetime import datetime
 from enum import Enum
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable
 from urllib.parse import urljoin
 
 import pendulum
+from pydantic import BaseModel, ConfigDict, ValidationError
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
@@ -47,6 +50,18 @@
 logger = logging.getLogger(__name__)
 
 
+class StructuredLogMessage(BaseModel):
+    """An individual log message."""
+
+    timestamp: datetime | None = None
+    event: str
+
+    # We don't need to cache strings when parsing into this, as almost every line will have different
+    # values; `extra="allow"` means we'll create extra properties as needed. Only timestamp and event are
+    # required; everything else is up to whatever is producing the logs.
+    model_config = ConfigDict(cache_strings=False, extra="allow")
+
+
 class LogType(str, Enum):
     """
     Type of service from which we retrieve logs.
@@ -107,30 +122,36 @@ def _parse_timestamp(line: str):
         return pendulum.parse(timestamp_str.strip("[]"))
 
 
-def _parse_timestamps_in_log_file(lines: Iterable[str]):
+def _parse_log_lines(lines: Iterable[str]) -> Iterable[tuple[datetime | None, int, StructuredLogMessage]]:
+    from airflow.utils.timezone import coerce_datetime
+
     timestamp = None
     next_timestamp = None
     for idx, line in enumerate(lines):
         if line:
-            with suppress(Exception):
-                # next_timestamp unchanged if line can't be parsed
-                next_timestamp = _parse_timestamp(line)
-            if next_timestamp:
-                timestamp = next_timestamp
-        yield timestamp, idx, line
-
-
-def _interleave_logs(*logs):
-    records = []
-    for log in logs:
-        records.extend(_parse_timestamps_in_log_file(log.splitlines()))
+            try:
+                # Try to parse it as json first
+                log = StructuredLogMessage.model_validate_json(line)
+            except ValidationError:
+                with suppress(Exception):
+                    # If we can't parse the timestamp, don't attach one to the row
+                    next_timestamp = _parse_timestamp(line)
+                log = StructuredLogMessage(event=line, timestamp=next_timestamp)
+            if log.timestamp:
+                log.timestamp = coerce_datetime(log.timestamp)
+            timestamp = log.timestamp
+            yield timestamp, idx, log
+
+
+def _interleave_logs(*logs: str) -> Iterable[StructuredLogMessage]:
+    min_date = pendulum.datetime(2000, 1, 1)
+
+    records = itertools.chain.from_iterable(_parse_log_lines(log.splitlines()) for log in logs)
     last = None
-    for timestamp, _, line in sorted(
-        records, key=lambda x: (x[0], x[1]) if x[0] else (pendulum.datetime(2000, 1, 1), x[1])
-    ):
-        if line != last or not timestamp:  # dedupe
-            yield line
-            last = line
+    for timestamp, _, msg in sorted(records, key=lambda x: (x[0] or min_date, x[1])):
+        if msg != last or not timestamp:  # dedupe
+            yield msg
+            last = msg
 
 
 def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance:
@@ -297,9 +318,6 @@ def _render_filename(self, ti: TaskInstance, try_number: int, session=NEW_SESSIO
         else:
             raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen")
 
-    def _read_grouped_logs(self):
-        return False
-
     def _get_executor_get_task_log(
         self, ti: TaskInstance
     ) -> Callable[[TaskInstance, int], tuple[list[str], list[str]]]:
@@ -351,40 +369,40 @@ def _read(
         # initializing the handler. Thus explicitly getting log location
         # is needed to get correct log path.
         worker_log_rel_path = self._render_filename(ti, try_number)
-        messages_list: list[str] = []
+        source_list: list[str] = []
         remote_logs: list[str] = []
         local_logs: list[str] = []
-        executor_messages: list[str] = []
+        sources: list[str] = []
         executor_logs: list[str] = []
         served_logs: list[str] = []
         with suppress(NotImplementedError):
-            remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
-            messages_list.extend(remote_messages)
+            sources, remote_logs = self._read_remote_logs(ti, try_number, metadata)
+            source_list.extend(sources)
         has_k8s_exec_pod = False
         if ti.state == TaskInstanceState.RUNNING:
             executor_get_task_log = self._get_executor_get_task_log(ti)
             response = executor_get_task_log(ti, try_number)
             if response:
-                executor_messages, executor_logs = response
-                if executor_messages:
-                    messages_list.extend(executor_messages)
+                sources, executor_logs = response
+                if sources:
+                    source_list.extend(sources)
                     has_k8s_exec_pod = True
         if not (remote_logs and ti.state not in State.unfinished):
             # when finished, if we have remote logs, no need to check local
             worker_log_full_path = Path(self.local_base, worker_log_rel_path)
-            local_messages, local_logs = self._read_from_local(worker_log_full_path)
-            messages_list.extend(local_messages)
+            sources, local_logs = self._read_from_local(worker_log_full_path)
+            source_list.extend(sources)
         if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not has_k8s_exec_pod:
-            served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
-            messages_list.extend(served_messages)
+            sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
+            source_list.extend(sources)
         elif ti.state not in State.unfinished and not (local_logs or remote_logs):
             # ordinarily we don't check served logs, with the assumption that users set up
             # remote logging or shared drive for logs for persistence, but that's not always true
             # so even if task is done, if no local logs or remote logs are found, we'll check the worker
-            served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
-            messages_list.extend(served_messages)
+            sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
+            source_list.extend(sources)
 
-        logs = "\n".join(
+        logs = list(
             _interleave_logs(
                 *local_logs,
                 *remote_logs,
@@ -395,18 +413,20 @@ def _read(
         log_pos = len(logs)
         # Log message source details are grouped: they are not relevant for most users and can
         # distract them from finding the root cause of their errors
-        messages = " INFO - ::group::Log message source details\n"
-        messages += "".join([f"*** {x}\n" for x in messages_list])
-        messages += " INFO - ::endgroup::\n"
+        header = [
+            StructuredLogMessage(event="::group::Log message source details", sources=source_list),  # type: ignore[call-arg]
+            StructuredLogMessage(event="::endgroup::"),
+        ]
         end_of_log = ti.try_number != try_number or ti.state not in (
             TaskInstanceState.RUNNING,
             TaskInstanceState.DEFERRED,
         )
         if metadata and "log_pos" in metadata:
-            previous_chars = metadata["log_pos"]
-            logs = logs[previous_chars:]  # Cut off previously passed log test as new tail
-        out_message = logs if "log_pos" in (metadata or {}) else messages + logs
-        return out_message, {"end_of_log": end_of_log, "log_pos": log_pos}
+            previous_line = metadata["log_pos"]
+            logs = logs[previous_line:]  # Cut off previously passed log text as new tail
+        else:
+            logs = header + logs
+        return logs, {"end_of_log": end_of_log, "log_pos": log_pos}
 
     @staticmethod
     def _get_pod_namespace(ti: TaskInstance):
@@ -439,43 +459,32 @@ def _get_log_retrieval_url(
             log_relative_path,
         )
 
-    def read(self, task_instance, try_number=None, metadata=None):
+    def read(
+        self,
+        task_instance: TaskInstance,
+        try_number: int | None = None,
+        metadata: dict[str, Any] | None = None,
+    ) -> tuple[list[StructuredLogMessage] | str, dict[str, Any]]:
         """
         Read logs of given task instance from local machine.
 
         :param task_instance: task instance object
         :param try_number: task instance try_number to read logs from. If None
-            it returns all logs separated by try_number
+            it returns the log of task_instance.try_number
         :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
         :return: a list of listed tuples which order log string by host
         """
-        # Task instance increments its try number when it starts to run.
-        # So the log for a particular task try will only show up when
-        # try number gets incremented in DB, i.e logs produced the time
-        # after cli run and before try_number + 1 in DB will not be displayed.
         if try_number is None:
-            next_try = task_instance.try_number + 1
-            try_numbers = list(range(1, next_try))
-        elif try_number < 1:
+            try_number = task_instance.try_number
+        if try_number is None or try_number < 1:
             logs = [
-                [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
+                StructuredLogMessage(  # type: ignore[call-arg]
+                    level="error", event=f"Error fetching the logs. Try number {try_number} is invalid."
+                )
             ]
-            return logs, [{"end_of_log": True}]
-        else:
-            try_numbers = [try_number]
-
-        logs = [""] * len(try_numbers)
-        metadata_array = [{}] * len(try_numbers)
+            return logs, {"end_of_log": True}
 
-        # subclasses implement _read and may not have log_type, which was added recently
-        for i, try_number_element in enumerate(try_numbers):
-            log, out_metadata = self._read(task_instance, try_number_element, metadata)
-            # es_task_handler return logs grouped by host. wrap other handler returning log string
-            # with default/ empty host so that UI can render the response in the same way
-            logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
-            metadata_array[i] = out_metadata
-
-        return logs, metadata_array
+        return self._read(task_instance, try_number, metadata)
 
     @staticmethod
     def _prepare_log_folder(directory: Path, new_folder_permissions: int):
@@ -542,23 +551,20 @@ def _init_file(self, ti, *, identifier: str | None = None):
 
     @staticmethod
     def _read_from_local(worker_log_path: Path) -> tuple[list[str], list[str]]:
-        messages = []
         paths = sorted(worker_log_path.parent.glob(worker_log_path.name + "*"))
-        if paths:
-            messages.append("Found local files:")
-            messages.extend(f"  * {x}" for x in paths)
+        sources = [os.fspath(x) for x in paths]
         logs = [file.read_text() for file in paths]
-        return messages, logs
+        return sources, logs
 
     def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], list[str]]:
-        messages = []
+        sources = []
         logs = []
         try:
             log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER
             url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
             response = _fetch_logs_from_service(url, rel_path)
             if response.status_code == 403:
-                messages.append(
+                sources.append(
                     "!!!! Please make sure that all your Airflow components (e.g. "
                     "schedulers, webservers, workers and triggerer) have "
                     "the same 'secret_key' configured in 'webserver' section and "
@@ -566,20 +572,21 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li
                     "See more at https://airflow.apache.org/docs/apache-airflow/"
                     "stable/configurations-ref.html#secret-key"
                 )
-            # Check if the resource was properly fetched
-            response.raise_for_status()
-            if response.text:
-                messages.append(f"Found logs served from host {url}")
-                logs.append(response.text)
+            else:
+                # Check if the resource was properly fetched
+                response.raise_for_status()
+                if response.text:
+                    sources.append(url)
+                    logs.append(response.text)
         except Exception as e:
             from requests.exceptions import InvalidSchema
 
             if isinstance(e, InvalidSchema) and ti.task.inherits_from_empty_operator is True:
-                messages.append(self.inherits_from_empty_operator_log_message)
+                sources.append(self.inherits_from_empty_operator_log_message)
             else:
-                messages.append(f"Could not read served logs: {e}")
+                sources.append(f"Could not read served logs: {e}")
                 logger.exception("Could not read served logs")
-        return messages, logs
+        return sources, logs
 
     def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:
         """
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index cc60500532fb1..d50b8fc126021 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -20,10 +20,11 @@
 import time
 from collections.abc import Iterator
 from functools import cached_property
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any, Union
 
 from airflow.configuration import conf
 from airflow.utils.helpers import render_log_filename
+from airflow.utils.log.file_task_handler import StructuredLogMessage
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import TaskInstanceState
@@ -32,6 +33,10 @@
     from sqlalchemy.orm.session import Session
 
     from airflow.models.taskinstance import TaskInstance
+    from airflow.typing_compat import TypeAlias
+
+LogMessages: TypeAlias = Union[list[StructuredLogMessage], str]
+LogMetadata: TypeAlias = dict[str, Any]
 
 
 class TaskLogReader:
@@ -42,13 +47,12 @@ class TaskLogReader:
     def read_log_chunks(
         self, ti: TaskInstance, try_number: int | None, metadata
-    ) -> tuple[list[tuple[tuple[str, str]]], dict[str, str]]:
+    ) -> tuple[LogMessages, LogMetadata]:
         """
         Read chunks of Task Instance logs.
 
         :param ti: The taskInstance
-        :param try_number: If provided, logs for the given try will be returned.
-            Otherwise, logs from all attempts are returned.
+        :param try_number:
         :param metadata: A dictionary containing information about how to read the task log
 
         The following is an example of how to use this method to read log:
@@ -62,9 +66,7 @@ def read_log_chunks(
         contain information about the task log which can enable you read logs to the end.
         """
-        logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
-        metadata = metadatas[0]
-        return logs, metadata
+        return self.log_handler.read(ti, try_number, metadata=metadata)
 
     def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: dict) -> Iterator[str]:
         """
@@ -75,29 +77,33 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
         :param metadata: A dictionary containing information about how to read the task log
         """
         if try_number is None:
-            next_try = ti.try_number + 1
-            try_numbers = list(range(1, next_try))
-        else:
-            try_numbers = [try_number]
-        for current_try_number in try_numbers:
-            metadata.pop("end_of_log", None)
-            metadata.pop("max_offset", None)
-            metadata.pop("offset", None)
-            metadata.pop("log_pos", None)
-            while True:
-                logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
-                for host, log in logs[0]:
-                    yield "\n".join([host or "", log]) + "\n"
-                if "end_of_log" not in metadata or (
-                    not metadata["end_of_log"]
-                    and ti.state not in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
-                ):
-                    if not logs[0]:
-                        # we did not receive any logs in this loop
-                        # sleeping to conserve resources / limit requests on external services
-                        time.sleep(self.STREAM_LOOP_SLEEP_SECONDS)
-                    else:
-                        break
+            try_number = ti.try_number
+
+        for key in ("end_of_log", "max_offset", "offset", "log_pos"):
+            metadata.pop(key, None)
+
+        while True:
+            logs, out_metadata = self.read_log_chunks(ti, try_number, metadata)
+            # Update the metadata dict in place so caller can get new values/end-of-log etc.
+
+            for log in logs:
+                # It's a bit wasteful here to parse the JSON then dump it back again.
+                # Optimize this so in stream mode we can just pass logs right through, or even better add
+                # support to 307 redirect to a signed URL etc.
+ yield (log if isinstance(log, str) else log.model_dump_json()) + "\n" + + if not out_metadata.get("end_of_log", False) and ti.state not in ( + TaskInstanceState.RUNNING, + TaskInstanceState.DEFERRED, + ): + if not logs[0]: + # we did not receive any logs in this loop + # sleeping to conserve resources / limit requests on external services + time.sleep(self.STREAM_LOOP_SLEEP_SECONDS) + else: + metadata.clear() + metadata.update(out_metadata) + break @cached_property def log_handler(self): diff --git a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py index a1ba2d57ef8ae..121e865b30b69 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -26,6 +26,7 @@ from airflow.configuration import conf from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.providers.amazon.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin @@ -58,7 +59,8 @@ def __init__(self, base_log_folder: str, s3_log_folder: str, **kwargs): def hook(self): """Returns S3Hook.""" return S3Hook( - aws_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID"), transfer_config_args={"use_threads": False} + aws_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID"), + transfer_config_args={"use_threads": False}, ) def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: @@ -116,12 +118,16 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l keys = self.hook.list_keys(bucket_name=bucket, prefix=prefix) if keys: keys = sorted(f"s3://{bucket}/{key}" for key in keys) - messages.append("Found logs in s3:") - messages.extend(f" * {key}" for key in keys) + if AIRFLOW_V_3_0_PLUS: + messages = keys + else: + messages.append("Found logs in s3:") + messages.extend(f" * {key}" for key in keys) for key in keys: logs.append(self.s3_read(key, return_error=True)) else: - messages.append(f"No logs found on s3 for ti={ti}") + if not AIRFLOW_V_3_0_PLUS: + messages.append(f"No logs found on s3 for ti={ti}") return messages, logs def s3_log_exists(self, remote_log_location: str) -> bool: @@ -152,7 +158,13 @@ def s3_read(self, remote_log_location: str, return_error: bool = False) -> str: return msg return "" - def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1) -> bool: + def s3_write( + self, + log: str, + remote_log_location: str, + append: bool = True, + max_retry: int = 1, + ) -> bool: """ Write the log to the remote_log_location; return `True` or fails silently and return `False`. 
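Aside on the S3 handler hunks above (the GCS handler below follows the same pattern): under Airflow 3 the first element returned by `_read_remote_logs` is a bare list of source URIs rather than pre-formatted `*** ...` message lines, and `FileTaskHandler._read` folds those URIs into the collapsible "Log message source details" group. A hypothetical third-party handler following the same contract might look like the sketch below; `MyRemoteTaskHandler`, the `mystore://` URI layout, and `_download` are invented for illustration and are not part of this PR.

```python
from __future__ import annotations

from airflow.utils.log.file_task_handler import FileTaskHandler


class MyRemoteTaskHandler(FileTaskHandler):  # hypothetical example class
    def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:
        sources: list[str] = []  # bare URIs on Airflow 3 (human-readable messages on Airflow 2)
        logs: list[str] = []
        uri = f"mystore://my-bucket/{self._render_filename(ti, try_number)}"  # assumed URI layout
        content = self._download(uri)  # assumed helper returning the blob text, or "" if missing
        if content:
            sources.append(uri)
            logs.append(content)
        return sources, logs
```

On Airflow 2 the same method would keep returning human-readable message strings, which is exactly the version gate the S3 and GCS hunks add with `AIRFLOW_V_3_0_PLUS`.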
@@ -185,7 +197,10 @@ def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_ break except Exception: if try_num < max_retry: - self.log.warning("Failed attempt to write logs to %s, will retry", remote_log_location) + self.log.warning( + "Failed attempt to write logs to %s, will retry", + remote_log_location, + ) else: self.log.exception("Could not write logs to %s", remote_log_location) return False diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py index 839b644f998a2..48ea1a3cb48fe 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py @@ -30,7 +30,9 @@ from airflow.models import DAG, DagRun, TaskInstance from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook -from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudwatchTaskHandler +from airflow.providers.amazon.aws.log.cloudwatch_task_handler import ( + CloudwatchTaskHandler, +) from airflow.providers.amazon.aws.utils import datetime_to_epoch_utc_ms from airflow.providers.standard.operators.empty import EmptyOperator from airflow.utils.state import State @@ -74,9 +76,19 @@ def setup_tests(self, create_log_template, tmp_path_factory, session): self.dag = DAG(dag_id=dag_id, schedule=None, start_date=date) task = EmptyOperator(task_id=task_id, dag=self.dag) if AIRFLOW_V_3_0_PLUS: - dag_run = DagRun(dag_id=self.dag.dag_id, logical_date=date, run_id="test", run_type="scheduled") + dag_run = DagRun( + dag_id=self.dag.dag_id, + logical_date=date, + run_id="test", + run_type="scheduled", + ) else: - dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") + dag_run = DagRun( + dag_id=self.dag.dag_id, + execution_date=date, + run_id="test", + run_type="scheduled", + ) session.add(dag_run) session.commit() session.refresh(dag_run) @@ -124,8 +136,8 @@ def test_event_to_str(self): ] assert [handler._event_to_str(event) for event in events] == ( [ - f"[{get_time_str(current_time-2000)}] First", - f"[{get_time_str(current_time-1000)}] Second", + f"[{get_time_str(current_time - 2000)}] First", + f"[{get_time_str(current_time - 1000)}] Second", f"[{get_time_str(current_time)}] Third", ] ) @@ -150,21 +162,37 @@ def test_read(self): msg_template = "*** Reading remote log from Cloudwatch log_group: {} log_stream: {}.\n{}\n" events = "\n".join( [ - f"[{get_time_str(current_time-2000)}] First", - f"[{get_time_str(current_time-1000)}] Second", + f"[{get_time_str(current_time - 2000)}] First", + f"[{get_time_str(current_time - 1000)}] Second", f"[{get_time_str(current_time)}] Third", ] ) - assert self.cloudwatch_task_handler.read(self.ti) == ( - [[("", msg_template.format(self.remote_log_group, self.remote_log_stream, events))]], - [{"end_of_log": True}], - ) + if AIRFLOW_V_3_0_PLUS: + assert self.cloudwatch_task_handler.read(self.ti) == ( + msg_template.format(self.remote_log_group, self.remote_log_stream, events), + {"end_of_log": True}, + ) + else: + assert self.cloudwatch_task_handler.read(self.ti) == ( + [ + [ + ( + "", + msg_template.format(self.remote_log_group, self.remote_log_stream, events), + ) + ] + ], + [{"end_of_log": True}], + ) @pytest.mark.parametrize( "end_date, expected_end_time", [ (None, None), - (datetime(2020, 1, 2), datetime_to_epoch_utc_ms(datetime(2020, 1, 2) + timedelta(seconds=30))), + ( + datetime(2020, 1, 2), + 
datetime_to_epoch_utc_ms(datetime(2020, 1, 2) + timedelta(seconds=30)), + ), ], ) @mock.patch.object(AwsLogsHook, "get_log_events") @@ -191,7 +219,9 @@ def test_get_cloudwatch_logs(self, mock_get_log_events, end_date, expected_end_t id="json-serialize", ), pytest.param( - None, '{"datetime": "2023-01-01T00:00:00+00:00", "customObject": null}', id="not-set" + None, + '{"datetime": "2023-01-01T00:00:00+00:00", "customObject": null}', + id="not-set", ), ], ) @@ -219,7 +249,10 @@ def __repr__(self): "customObject": ToSerialize(), }, ) - with mock.patch("watchtower.threading.Thread"), mock.patch("watchtower.queue.Queue") as mq: + with ( + mock.patch("watchtower.threading.Thread"), + mock.patch("watchtower.queue.Queue") as mq, + ): mock_queue = Mock() mq.return_value = mock_queue handler.handle(message) diff --git a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py index 7bf4110e90dfd..e44991ffe3058 100644 --- a/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py +++ b/providers/amazon/tests/unit/amazon/aws/log/test_s3_task_handler.py @@ -62,9 +62,19 @@ def setup_tests(self, create_log_template, tmp_path_factory, session): self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date) task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag) if AIRFLOW_V_3_0_PLUS: - dag_run = DagRun(dag_id=self.dag.dag_id, logical_date=date, run_id="test", run_type="manual") + dag_run = DagRun( + dag_id=self.dag.dag_id, + logical_date=date, + run_id="test", + run_type="manual", + ) else: - dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="manual") + dag_run = DagRun( + dag_id=self.dag.dag_id, + execution_date=date, + run_id="test", + run_type="manual", + ) session.add(dag_run) session.commit() session.refresh(dag_run) @@ -131,22 +141,36 @@ def test_read(self): ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS log, metadata = self.s3_task_handler.read(ti) - actual = log[0][0][-1] - assert "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\n" in actual - assert actual.endswith("Log line") - assert metadata == [{"end_of_log": True, "log_pos": 8}] + + expected_s3_uri = f"s3://bucket/{self.remote_log_key}" + + if AIRFLOW_V_3_0_PLUS: + assert log[0].event == "::group::Log message source details" + assert expected_s3_uri in log[0].sources + assert log[1].event == "::endgroup::" + assert log[2].event == "Log line" + assert metadata == {"end_of_log": True, "log_pos": 1} + else: + actual = log[0][0][-1] + assert f"*** Found logs in s3:\n*** * {expected_s3_uri}\n" in actual + assert actual.endswith("Log line") + assert metadata == [{"end_of_log": True, "log_pos": 8}] def test_read_when_s3_log_missing(self): ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS self.s3_task_handler._read_from_logs_server = mock.Mock(return_value=([], [])) log, metadata = self.s3_task_handler.read(ti) - assert len(log) == 1 - assert len(log) == len(metadata) - actual = log[0][0][-1] - expected = "*** No logs found on s3 for ti=\n" - assert expected in actual - assert metadata[0] == {"end_of_log": True, "log_pos": 0} + if AIRFLOW_V_3_0_PLUS: + assert len(log) == 2 + assert metadata == {"end_of_log": True, "log_pos": 0} + else: + assert len(log) == 1 + assert len(log) == len(metadata) + actual = log[0][0][-1] + expected = "*** No logs found on s3 for ti=\n" + assert expected in actual + assert metadata[0] == {"end_of_log": True, 
"log_pos": 0} def test_s3_read_when_log_missing(self): handler = self.s3_task_handler diff --git a/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py b/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py index be37e13b14bd9..456f680de41fa 100644 --- a/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py +++ b/providers/celery/tests/unit/celery/log_handlers/test_log_handlers.py @@ -37,6 +37,7 @@ from airflow.utils.types import DagRunType from tests_common.test_utils.config import conf_vars +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test @@ -78,8 +79,14 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc fth._read_from_logs_server = mock.Mock() fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"] - actual = fth._read(ti=ti, try_number=1) + logs, metadata = fth._read(ti=ti, try_number=1) fth._read_from_logs_server.assert_called_once() - assert "*** this message\n" in actual[0] - assert actual[0].endswith("this\nlog\ncontent") - assert actual[1] == {"end_of_log": False, "log_pos": 16} + + if AIRFLOW_V_3_0_PLUS: + assert metadata == {"end_of_log": False, "log_pos": 3} + assert logs[0].sources == ["this message"] + assert [x.event for x in logs[-3:]] == ["this", "log", "content"] + else: + assert "*** this message\n" in logs + assert logs.endswith("this\nlog\ncontent") + assert metadata == {"end_of_log": False, "log_pos": 16} diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index 15904e7ebf3b4..5bf3d2308c294 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -40,7 +40,9 @@ from airflow.configuration import conf from airflow.exceptions import AirflowException from airflow.models.dagrun import DagRun -from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter +from airflow.providers.elasticsearch.log.es_json_formatter import ( + ElasticsearchJSONFormatter, +) from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils import timezone @@ -54,10 +56,18 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceKey +if AIRFLOW_V_3_0_PLUS: + from typing import Union + + from airflow.utils.log.file_task_handler import StructuredLogMessage + + EsLogMsgType = Union[list[StructuredLogMessage], str] +else: + EsLogMsgType = list[tuple[str, str]] # type: ignore[misc] + LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""} # Elasticsearch hosted log type -EsLogMsgType = list[tuple[str, str]] # Compatibility: Airflow 2.3.3 and up uses this method, which accesses the # LogTemplate model to record the log ID template used. If this function does @@ -344,7 +354,10 @@ def _read( "If your task started recently, please wait a moment and reload this page. " "Otherwise, the logs for this task instance may have been removed." 
) - return [("", missing_log_message)], metadata + if AIRFLOW_V_3_0_PLUS: + return missing_log_message, metadata + else: + return [("", missing_log_message)], metadata # type: ignore[list-item] if ( # Assume end of log after not receiving new log for N min, cur_ts.diff(last_log_ts).in_minutes() >= 5 @@ -358,12 +371,30 @@ def _read( # If we hit the end of the log, remove the actual end_of_log message # to prevent it from showing in the UI. - def concat_logs(hits: list[Hit]): + def concat_logs(hits: list[Hit]) -> str: log_range = (len(hits) - 1) if hits[-1].message == self.end_of_log_mark else len(hits) return "\n".join(self._format_msg(hits[i]) for i in range(log_range)) if logs_by_host: - message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] + if AIRFLOW_V_3_0_PLUS: + from airflow.utils.log.file_task_handler import StructuredLogMessage + + header = [ + StructuredLogMessage( + event="::group::Log message source details", + sources=[host for host in logs_by_host.keys()], + ), # type: ignore[call-arg] + StructuredLogMessage(event="::endgroup::"), + ] # type: ignore[misc] + + message = header + [ + StructuredLogMessage(event=concat_logs(hits)) for hits in logs_by_host.values() + ] # type: ignore[misc] + else: + message = [ + (host, concat_logs(hits)) # type: ignore[misc] + for host, hits in logs_by_host.items() + ] else: message = [] return message, metadata diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index dc9602ee03650..43f63f86d888a 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -208,13 +208,24 @@ def test_read(self, ti): ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert len(logs[0]) == 1 - assert self.test_message == logs[0][0][-1] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "1" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "1" + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_with_patterns(self, ti): ts = pendulum.now() @@ -223,13 +234,24 @@ def test_read_with_patterns(self, ti): ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert len(logs[0]) == 1 - assert self.test_message == logs[0][0][-1] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "1" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert 
self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "1" + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_with_patterns_no_match(self, ti): ts = pendulum.now() @@ -238,20 +260,30 @@ def test_read_with_patterns_no_match(self, ti): ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "0" + if AIRFLOW_V_3_0_PLUS: + assert logs == [] + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert metadata["offset"] == "0" + assert not metadata["end_of_log"] # last_log_timestamp won't change if no log lines read. - assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + assert timezone.parse(metadata["last_log_timestamp"]) == ts def test_read_with_missing_index(self, ti): ts = pendulum.now() with mock.patch.object(self.es_task_handler, "index_patterns", new="nonexistent,test_*"): with pytest.raises(elasticsearch.exceptions.NotFoundError, match=r"IndexMissingException.*"): self.es_task_handler.read( - ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} + ti, + 1, + {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}, ) @pytest.mark.parametrize("seconds", [3, 6]) @@ -268,23 +300,35 @@ def test_read_missing_logs(self, seconds, create_task_instance): ) ts = pendulum.now().add(seconds=-seconds) logs, metadatas = self.es_task_handler.read(ti, 1, {"offset": 0, "last_log_timestamp": str(ts)}) - - assert len(logs) == 1 - if seconds > 5: - # we expect a log not found message when checking began more than 5 seconds ago - assert len(logs[0]) == 1 - actual_message = logs[0][0][1] - expected_pattern = r"^\*\*\* Log .* not found in Elasticsearch.*" - assert re.match(expected_pattern, actual_message) is not None - assert metadatas[0]["end_of_log"] is True + if AIRFLOW_V_3_0_PLUS: + if seconds > 5: + # we expect a log not found message when checking began more than 5 seconds ago + expected_pattern = r"^\*\*\* Log .* not found in Elasticsearch.*" + assert re.match(expected_pattern, logs) is not None + assert metadatas["end_of_log"] is True + else: + # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message + assert logs == [] + assert metadatas["end_of_log"] is False + assert metadatas["offset"] == "0" + assert timezone.parse(metadatas["last_log_timestamp"]) == ts else: - # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message - assert len(logs[0]) == 0 - assert logs == [[]] - assert metadatas[0]["end_of_log"] is False - assert len(logs) == len(metadatas) - assert metadatas[0]["offset"] == "0" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + assert len(logs) == 1 + if seconds > 5: + # we expect a log not found message when checking began more than 5 seconds ago + assert len(logs[0]) == 1 + actual_message = logs[0][0][1] + expected_pattern = r"^\*\*\* Log .* not found in Elasticsearch.*" + assert re.match(expected_pattern, actual_message) is not None + assert metadatas[0]["end_of_log"] is True + else: + # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message + assert len(logs[0]) == 0 + assert logs == [[]] + assert metadatas[0]["end_of_log"] is False + 
assert len(logs) == len(metadatas) + assert metadatas[0]["offset"] == "0" + assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts def test_read_with_match_phrase_query(self, ti): similar_log_id = ( @@ -293,30 +337,63 @@ def test_read_with_match_phrase_query(self, ti): ) another_test_message = "another message" - another_body = {"message": another_test_message, "log_id": similar_log_id, "offset": 1} + another_body = { + "message": another_test_message, + "log_id": similar_log_id, + "offset": 1, + } self.es.index(index=self.index_name, doc_type=self.doc_type, body=another_body, id=1) ts = pendulum.now() logs, metadatas = self.es_task_handler.read( - ti, 1, {"offset": "0", "last_log_timestamp": str(ts), "end_of_log": False, "max_offset": 2} + ti, + 1, + { + "offset": "0", + "last_log_timestamp": str(ts), + "end_of_log": False, + "max_offset": 2, + }, ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert self.test_message == logs[0][0][-1] - assert another_test_message != logs[0] + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "1" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + assert not metadata["end_of_log"] + assert metadata["offset"] == "1" + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_with_none_metadata(self, ti): logs, metadatas = self.es_task_handler.read(ti, 1) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert self.test_message == logs[0][0][-1] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "1" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) < pendulum.now() + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "1" + assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now() def test_read_nonexistent_log(self, ti): ts = pendulum.now() @@ -327,39 +404,67 @@ def test_read_nonexistent_log(self, ti): logs, metadatas = self.es_task_handler.read( ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "0" + if AIRFLOW_V_3_0_PLUS: + assert logs == [] + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert metadata["offset"] == "0" + assert not metadata["end_of_log"] # last_log_timestamp won't change if no log lines read. 
- assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + assert timezone.parse(metadata["last_log_timestamp"]) == ts def test_read_with_empty_metadata(self, ti): ts = pendulum.now() logs, metadatas = self.es_task_handler.read(ti, 1, {}) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert self.test_message == logs[0][0][-1] - assert not metadatas[0]["end_of_log"] + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] # offset should be initialized to 0 if not provided. - assert metadatas[0]["offset"] == "1" + assert metadata["offset"] == "1" # last_log_timestamp will be initialized using log reading time # if not last_log_timestamp is provided. - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + assert timezone.parse(metadata["last_log_timestamp"]) > ts # case where offset is missing but metadata not empty. self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1) logs, metadatas = self.es_task_handler.read(ti, 1, {"end_of_log": False}) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert not metadatas[0]["end_of_log"] + if AIRFLOW_V_3_0_PLUS: + assert logs == [] + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] # offset should be initialized to 0 if not provided. - assert metadatas[0]["offset"] == "0" + assert metadata["offset"] == "0" # last_log_timestamp will be initialized using log reading time # if not last_log_timestamp is provided. 
- assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_timeout(self, ti): ts = pendulum.now().subtract(minutes=5) @@ -378,28 +483,51 @@ def test_read_timeout(self, ti): "end_of_log": False, }, ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert metadatas[0]["end_of_log"] - assert str(offset) == metadatas[0]["offset"] - assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + if AIRFLOW_V_3_0_PLUS: + assert logs == [] + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert metadata["end_of_log"] + assert str(offset) == metadata["offset"] + assert timezone.parse(metadata["last_log_timestamp"]) == ts def test_read_as_download_logs(self, ti): ts = pendulum.now() logs, metadatas = self.es_task_handler.read( ti, 1, - {"offset": 0, "last_log_timestamp": str(ts), "download_logs": True, "end_of_log": False}, + { + "offset": 0, + "last_log_timestamp": str(ts), + "download_logs": True, + "end_of_log": False, + }, ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert len(logs[0]) == 1 - assert self.test_message == logs[0][0][-1] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["download_logs"] - assert metadatas[0]["offset"] == "1" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "some random stuff" + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert self.test_message == logs[0][0][-1] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "1" + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_raises(self, ti): with mock.patch.object(self.es_task_handler.log, "exception") as mock_exception: @@ -409,11 +537,20 @@ def test_read_raises(self, ti): assert mock_exception.call_count == 1 args, kwargs = mock_exception.call_args assert "Could not read log with log_id:" in args[0] - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "0" + + if AIRFLOW_V_3_0_PLUS: + assert logs == [] + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert metadata["offset"] == "0" + assert not metadata["end_of_log"] def test_set_context(self, ti): self.es_task_handler.set_context(ti) @@ -449,7 +586,11 @@ def test_read_with_json_format(self, ti): logs, _ = self.es_task_handler.read( ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert logs[0][0][1] == "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " + expected_message = "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " + if AIRFLOW_V_3_0_PLUS: + assert logs[2].event == expected_message + else: + assert logs[0][0][1] == expected_message def test_read_with_json_format_with_custom_offset_and_host_fields(self, ti): ts = pendulum.now() @@ -477,7 +618,11 @@ def test_read_with_json_format_with_custom_offset_and_host_fields(self, ti): logs, _ = self.es_task_handler.read( ti, 1, 
{"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert logs[0][0][1] == "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " + expected_message = "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " + if AIRFLOW_V_3_0_PLUS: + assert logs[2].event == expected_message + else: + assert logs[0][0][1] == expected_message def test_read_with_custom_offset_and_host_fields(self, ti): ts = pendulum.now() @@ -498,7 +643,10 @@ def test_read_with_custom_offset_and_host_fields(self, ti): logs, _ = self.es_task_handler.read( ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert self.test_message == logs[0][0][1] + if AIRFLOW_V_3_0_PLUS: + pass + else: + assert self.test_message == logs[0][0][1] def test_close(self, ti): formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") @@ -578,14 +726,34 @@ def test_clean_date(self): "json_format, es_frontend, expected_url", [ # Common cases - (True, "localhost:5601/{log_id}", "https://localhost:5601/" + quote(JSON_LOG_ID)), - (False, "localhost:5601/{log_id}", "https://localhost:5601/" + quote(LOG_ID)), + ( + True, + "localhost:5601/{log_id}", + "https://localhost:5601/" + quote(JSON_LOG_ID), + ), + ( + False, + "localhost:5601/{log_id}", + "https://localhost:5601/" + quote(LOG_ID), + ), # Ignore template if "{log_id}"" is missing in the URL (False, "localhost:5601", "https://localhost:5601"), # scheme handling - (False, "https://localhost:5601/path/{log_id}", "https://localhost:5601/path/" + quote(LOG_ID)), - (False, "http://localhost:5601/path/{log_id}", "http://localhost:5601/path/" + quote(LOG_ID)), - (False, "other://localhost:5601/path/{log_id}", "other://localhost:5601/path/" + quote(LOG_ID)), + ( + False, + "https://localhost:5601/path/{log_id}", + "https://localhost:5601/path/" + quote(LOG_ID), + ), + ( + False, + "http://localhost:5601/path/{log_id}", + "http://localhost:5601/path/" + quote(LOG_ID), + ), + ( + False, + "other://localhost:5601/path/{log_id}", + "other://localhost:5601/path/" + quote(LOG_ID), + ), ], ) def test_get_external_log_url(self, ti, json_format, es_frontend, expected_url): diff --git a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py index ba2cf4db27d97..45e45b3942b6f 100644 --- a/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/providers/google/src/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -31,9 +31,12 @@ from airflow.configuration import conf from airflow.exceptions import AirflowNotFoundException from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url -from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id +from airflow.providers.google.cloud.utils.credentials_provider import ( + get_credentials_and_project_id, +) from airflow.providers.google.common.consts import CLIENT_INFO from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID +from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin @@ -188,9 +191,13 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l if blobs: uris = [f"gs://{bucket}/{b.name}" for b in blobs] - messages.extend(["Found remote logs:", *[f" * {x}" for x in sorted(uris)]]) + 
if AIRFLOW_V_3_0_PLUS: + messages = uris + else: + messages.extend(["Found remote logs:", *[f" * {x}" for x in sorted(uris)]]) else: - messages.append(f"No logs found in GCS; ti=%s {ti}") + if not AIRFLOW_V_3_0_PLUS: + messages.append(f"No logs found in GCS; ti=%s {ti}") try: for key in sorted(uris): blob = storage.Blob.from_string(key, self.client) @@ -198,7 +205,8 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l if remote_log: logs.append(remote_log) except Exception as e: - messages.append(f"Unable to read remote log {e}") + if not AIRFLOW_V_3_0_PLUS: + messages.append(f"Unable to read remote log {e}") return messages, logs def gcs_write(self, log, remote_log_location) -> bool: diff --git a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py index 3d179e7de1b3f..b86a6fa2c2745 100644 --- a/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py +++ b/providers/google/tests/unit/google/cloud/log/test_gcs_task_handler.py @@ -30,6 +30,7 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS @pytest.mark.db_test @@ -67,11 +68,15 @@ def gcs_task_handler(self, create_log_template, local_log_location): @mock.patch("google.cloud.storage.Client") @mock.patch("airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id") @pytest.mark.parametrize( - "conn_id", [pytest.param("", id="no-conn"), pytest.param("my_gcs_conn", id="with-conn")] + "conn_id", + [pytest.param("", id="no-conn"), pytest.param("my_gcs_conn", id="with-conn")], ) def test_client_conn_id_behavior(self, mock_get_cred, mock_client, mock_hook, conn_id): """When remote log conn id configured, hook will be used""" - mock_hook.return_value.get_credentials_and_project_id.return_value = ("test_cred", "test_proj") + mock_hook.return_value.get_credentials_and_project_id.return_value = ( + "test_cred", + "test_proj", + ) mock_get_cred.return_value = ("test_cred", "test_proj") with conf_vars({("logging", "remote_log_conn_id"): conn_id}): return_value = self.gcs_task_handler.client @@ -104,12 +109,20 @@ def test_should_read_logs_from_remote(self, mock_blob, mock_client, mock_creds, session.add(ti) session.commit() logs, metadata = self.gcs_task_handler._read(ti, self.ti.try_number) - mock_blob.from_string.assert_called_once_with( - "gs://bucket/remote/log/location/1.log", mock_client.return_value - ) - assert "*** Found remote logs:\n*** * gs://bucket/remote/log/location/1.log\n" in logs - assert logs.endswith("CONTENT") - assert metadata == {"end_of_log": True, "log_pos": 7} + expected_gs_uri = f"gs://bucket/{mock_obj.name}" + + mock_blob.from_string.assert_called_once_with(expected_gs_uri, mock_client.return_value) + + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == [expected_gs_uri] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "CONTENT" + assert metadata == {"end_of_log": True, "log_pos": 1} + else: + assert f"*** Found remote logs:\n*** * {expected_gs_uri}\n" in logs + assert logs.endswith("CONTENT") + assert metadata == {"end_of_log": True, "log_pos": 7} @mock.patch( "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id", @@ -127,18 +140,26 @@ def test_should_read_from_local_on_logs_read_error(self, mock_blob, mock_client, ti 
= copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS log, metadata = self.gcs_task_handler._read(ti, self.ti.try_number) + expected_gs_uri = f"gs://bucket/{mock_obj.name}" - assert ( - "*** Found remote logs:\n" - "*** * gs://bucket/remote/log/location/1.log\n" - "*** Unable to read remote log Failed to connect\n" - "*** Found local files:\n" - f"*** * {self.gcs_task_handler.local_base}/1.log\n" - ) in log - assert metadata == {"end_of_log": True, "log_pos": 0} - mock_blob.from_string.assert_called_once_with( - "gs://bucket/remote/log/location/1.log", mock_client.return_value - ) + if AIRFLOW_V_3_0_PLUS: + assert log[0].event == "::group::Log message source details" + assert log[0].sources == [ + expected_gs_uri, + f"{self.gcs_task_handler.local_base}/1.log", + ] + assert log[1].event == "::endgroup::" + assert metadata == {"end_of_log": True, "log_pos": 0} + else: + assert ( + "*** Found remote logs:\n" + "*** * gs://bucket/remote/log/location/1.log\n" + "*** Unable to read remote log Failed to connect\n" + "*** Found local files:\n" + f"*** * {self.gcs_task_handler.local_base}/1.log\n" + ) in log + assert metadata == {"end_of_log": True, "log_pos": 0} + mock_blob.from_string.assert_called_once_with(expected_gs_uri, mock_client.return_value) @mock.patch( "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id", diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py index d32a3463d74cd..8dc5e0ac174d6 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -26,6 +26,7 @@ from azure.core.exceptions import HttpResponseError from airflow.configuration import conf +from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin @@ -137,9 +138,13 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l if blob_names: uris = [f"https://{self.wasb_container}.blob.core.windows.net/{b}" for b in blob_names] - messages.extend(["Found remote logs:", *[f" * {x}" for x in sorted(uris)]]) + if AIRFLOW_V_3_0_PLUS: + messages = uris + else: + messages.extend(["Found remote logs:", *[f" * {x}" for x in sorted(uris)]]) else: - messages.append(f"No logs found in WASB; ti=%s {ti}") + if not AIRFLOW_V_3_0_PLUS: + messages.append(f"No logs found in WASB; ti=%s {ti}") for name in sorted(blob_names): remote_log = "" diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py new file mode 100644 index 0000000000000..21e7170194e36 --- /dev/null +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py index 73084f2a9a7ba..8f48c96002870 100644 --- a/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py +++ b/providers/microsoft/azure/tests/unit/microsoft/azure/log/test_wasb_task_handler.py @@ -32,6 +32,7 @@ from tests_common.test_utils.config import conf_vars from tests_common.test_utils.db import clear_db_dags, clear_db_runs +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS pytestmark = pytest.mark.db_test @@ -112,13 +113,29 @@ def test_wasb_read(self, mock_hook_cls, ti): assert self.wasb_task_handler.wasb_read(self.remote_log_location) == "Log line" ti = copy.copy(ti) ti.state = TaskInstanceState.SUCCESS - assert self.wasb_task_handler.read(ti)[0][0][0][0] == "localhost" - assert ( - "*** Found remote logs:\n*** * https://wasb-container.blob.core.windows.net/abc/hello.log\n" - in self.wasb_task_handler.read(ti)[0][0][0][1] - ) - assert "Log line" in self.wasb_task_handler.read(ti)[0][0][0][1] - assert self.wasb_task_handler.read(ti)[1][0] == {"end_of_log": True, "log_pos": 8} + + logs, metadata = self.wasb_task_handler.read(ti) + + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["https://wasb-container.blob.core.windows.net/abc/hello.log"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == "Log line" + assert metadata == { + "end_of_log": True, + "log_pos": 1, + } + else: + assert logs[0][0][0] == "localhost" + assert ( + "*** Found remote logs:\n*** * https://wasb-container.blob.core.windows.net/abc/hello.log\n" + in logs[0][0][1] + ) + assert "Log line" in logs[0][0][1] + assert metadata[0] == { + "end_of_log": True, + "log_pos": 8, + } @mock.patch( "airflow.providers.microsoft.azure.hooks.wasb.WasbHook", @@ -152,7 +169,10 @@ def test_write_on_existing_log(self, mock_log_exists, mock_wasb_read, mock_hook) mock_wasb_read.return_value = "old log" self.wasb_task_handler.wasb_write("text", self.remote_log_location) mock_hook.return_value.load_string.assert_called_once_with( - "old log\ntext", self.container_name, self.remote_log_location, overwrite=True + 
"old log\ntext", + self.container_name, + self.remote_log_location, + overwrite=True, ) @mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py index e1a0a083e7291..58561acfb0628 100644 --- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py +++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py @@ -44,8 +44,19 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance, TaskInstanceKey + + +if AIRFLOW_V_3_0_PLUS: + from typing import Union + + from airflow.utils.log.file_task_handler import StructuredLogMessage + + OsLogMsgType = Union[list[StructuredLogMessage], str] +else: + OsLogMsgType = list[tuple[str, str]] # type: ignore[misc] + + USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template") -OsLogMsgType = list[tuple[str, str]] LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""} @@ -375,7 +386,7 @@ def _read( "If your task started recently, please wait a moment and reload this page. " "Otherwise, the logs for this task instance may have been removed." ) - return [("", missing_log_message)], metadata + return [("", missing_log_message)], metadata # type: ignore[list-item] if ( # Assume end of log after not receiving new log for N min, cur_ts.diff(last_log_ts).in_minutes() >= 5 @@ -394,7 +405,22 @@ def concat_logs(hits: list[Hit]): return "\n".join(self._format_msg(hits[i]) for i in range(log_range)) if logs_by_host: - message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] + if AIRFLOW_V_3_0_PLUS: + from airflow.utils.log.file_task_handler import StructuredLogMessage + + header = [ + StructuredLogMessage( + event="::group::Log message source details", + sources=[host for host in logs_by_host.keys()], + ), # type: ignore[call-arg] + StructuredLogMessage(event="::endgroup::"), + ] + + message = header + [ + StructuredLogMessage(event=concat_logs(hits)) for hits in logs_by_host.values() + ] + else: + message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] # type: ignore[misc] else: message = [] return message, metadata diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py index 475a325a242e3..6e7db1527189a 100644 --- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py +++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py @@ -80,7 +80,10 @@ def ti(self, create_task_instance, create_log_template): if AIRFLOW_V_3_0_PLUS: create_log_template(self.FILENAME_TEMPLATE, "{dag_id}-{task_id}-{logical_date}-{try_number}") else: - create_log_template(self.FILENAME_TEMPLATE, "{dag_id}-{task_id}-{execution_date}-{try_number}") + create_log_template( + self.FILENAME_TEMPLATE, + "{dag_id}-{task_id}-{execution_date}-{try_number}", + ) yield get_ti( dag_id=self.DAG_ID, task_id=self.TASK_ID, @@ -195,17 +198,28 @@ def test_read(self, ti): ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert len(logs[0]) == 1 - assert ( - logs[0][0][-1] == "Dependencies all met for dep_context=non-requeueable" + expected_msg = ( + "Dependencies all met for dep_context=non-requeueable" " deps ti=\n" "Starting attempt 1 of 1\nExecuting " "on 2023-07-09 07:47:32+00:00" ) - assert not 
metadatas[0]["end_of_log"] - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == expected_msg + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert logs[0][0][-1] == expected_msg + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_with_patterns(self, ti): ts = pendulum.now() @@ -214,17 +228,28 @@ def test_read_with_patterns(self, ti): ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} ) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert len(logs[0]) == 1 - assert ( - logs[0][0][-1] == "Dependencies all met for dep_context=non-requeueable" + expected_msg = ( + "Dependencies all met for dep_context=non-requeueable" " deps ti=\n" "Starting attempt 1 of 1\nExecuting " "on 2023-07-09 07:47:32+00:00" ) - assert not metadatas[0]["end_of_log"] - assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts + if AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == expected_msg + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert logs[0][0][-1] == expected_msg + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert timezone.parse(metadata["last_log_timestamp"]) > ts def test_read_with_patterns_no_match(self, ti): ts = pendulum.now() @@ -240,26 +265,43 @@ def test_read_with_patterns_no_match(self, ti): }, ): logs, metadatas = self.os_task_handler.read( - ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} + ti, + 1, + {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}, ) + if AIRFLOW_V_3_0_PLUS: + assert logs == [] - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert logs == [[]] - assert not metadatas[0]["end_of_log"] - assert metadatas[0]["offset"] == "0" + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert logs == [[]] + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert metadata["offset"] == "0" # last_log_timestamp won't change if no log lines read. 
- assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + assert timezone.parse(metadata["last_log_timestamp"]) == ts def test_read_with_missing_index(self, ti): ts = pendulum.now() with mock.patch.object(self.os_task_handler, "index_patterns", new="nonexistent,test_*"): with mock.patch.object( - self.os_task_handler.client, "count", side_effect=NotFoundError(404, "IndexNotFoundError") + self.os_task_handler.client, + "count", + side_effect=NotFoundError(404, "IndexNotFoundError"), ): with pytest.raises(NotFoundError, match=r"IndexNotFoundError"): self.os_task_handler.read( - ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False} + ti, + 1, + { + "offset": 0, + "last_log_timestamp": str(ts), + "end_of_log": False, + }, ) @pytest.mark.parametrize("seconds", [3, 6]) @@ -286,36 +328,64 @@ def test_read_missing_logs(self, seconds, create_task_instance): }, ): logs, metadatas = self.os_task_handler.read(ti, 1, {"offset": 0, "last_log_timestamp": str(ts)}) - - assert len(logs) == 1 - if seconds > 5: - # we expect a log not found message when checking began more than 5 seconds ago - assert len(logs[0]) == 1 - actual_message = logs[0][0][1] - expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*" - assert re.match(expected_pattern, actual_message) is not None - assert metadatas[0]["end_of_log"] is True + if AIRFLOW_V_3_0_PLUS: + if seconds > 5: + # we expect a log not found message when checking began more than 5 seconds ago + assert len(logs[0]) == 2 + actual_message = logs[0][1] + expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*" + assert re.match(expected_pattern, actual_message) is not None + assert metadatas["end_of_log"] is True + else: + # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message + assert logs == [] + assert metadatas["end_of_log"] is False + assert metadatas["offset"] == "0" + assert timezone.parse(metadatas["last_log_timestamp"]) == ts else: - # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message - assert len(logs[0]) == 0 - assert logs == [[]] - assert metadatas[0]["end_of_log"] is False - assert len(logs) == len(metadatas) - assert metadatas[0]["offset"] == "0" - assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts + assert len(logs) == 1 + if seconds > 5: + # we expect a log not found message when checking began more than 5 seconds ago + assert len(logs[0]) == 1 + actual_message = logs[0][0][1] + expected_pattern = r"^\*\*\* Log .* not found in Opensearch.*" + assert re.match(expected_pattern, actual_message) is not None + assert metadatas[0]["end_of_log"] is True + else: + # we've "waited" less than 5 seconds so it should not be "end of log" and should be no log message + assert len(logs[0]) == 0 + assert logs == [[]] + assert metadatas[0]["end_of_log"] is False + assert len(logs) == len(metadatas) + assert metadatas[0]["offset"] == "0" + assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts def test_read_with_none_metadata(self, ti): logs, metadatas = self.os_task_handler.read(ti, 1) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert ( - logs[0][0][-1] == "Dependencies all met for dep_context=non-requeueable" + + expected_message = ( + "Dependencies all met for dep_context=non-requeueable" " deps ti=\n" "Starting attempt 1 of 1\nExecuting " "on 2023-07-09 07:47:32+00:00" ) - assert not metadatas[0]["end_of_log"] - assert timezone.parse(metadatas[0]["last_log_timestamp"]) < pendulum.now() + if 
AIRFLOW_V_3_0_PLUS: + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == ["default_host"] + assert logs[1].event == "::endgroup::" + assert logs[2].event == expected_message + + metadata = metadatas + else: + assert len(logs) == 1 + assert len(logs) == len(metadatas) + assert len(logs[0]) == 1 + assert logs[0][0][-1] == expected_message + + metadata = metadatas[0] + + assert not metadata["end_of_log"] + assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now() def test_set_context(self, ti): self.os_task_handler.set_context(ti) diff --git a/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py b/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py index 7107b2ab3a421..d1fd1cc8de0d6 100644 --- a/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py +++ b/providers/redis/src/airflow/providers/redis/log/redis_task_handler.py @@ -23,6 +23,7 @@ from airflow.configuration import conf from airflow.providers.redis.hooks.redis import RedisHook +from airflow.providers.redis.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin @@ -79,6 +80,8 @@ def _read( log_str = b"\n".join( self.conn.lrange(self._render_filename(ti, try_number), start=0, end=-1) ).decode() + if AIRFLOW_V_3_0_PLUS: + log_str = [log_str] # type: ignore[assignment] return log_str, {"end_of_log": True} def set_context(self, ti: TaskInstance, **kwargs) -> None: diff --git a/providers/redis/src/airflow/providers/redis/version_compat.py b/providers/redis/src/airflow/providers/redis/version_compat.py new file mode 100644 index 0000000000000..21e7170194e36 --- /dev/null +++ b/providers/redis/src/airflow/providers/redis/version_compat.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS COPIED MANUALLY IN OTHER PROVIDERS DELIBERATELY TO AVOID ADDING UNNECESSARY +# DEPENDENCIES BETWEEN PROVIDERS. 
IF YOU WANT TO ADD CONDITIONAL CODE IN YOUR PROVIDER THAT DEPENDS +# ON AIRFLOW VERSION, PLEASE COPY THIS FILE TO THE ROOT PACKAGE OF YOUR PROVIDER AND IMPORT +# THOSE CONSTANTS FROM IT RATHER THAN IMPORTING THEM FROM ANOTHER PROVIDER OR TEST CODE +# +from __future__ import annotations + + +def get_base_airflow_version_tuple() -> tuple[int, int, int]: + from packaging.version import Version + + from airflow import __version__ + + airflow_version = Version(__version__) + return airflow_version.major, airflow_version.minor, airflow_version.micro + + +AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0) +AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0) diff --git a/providers/redis/tests/unit/redis/log/test_redis_task_handler.py b/providers/redis/tests/unit/redis/log/test_redis_task_handler.py index 8d3081b028057..ac3c3a4078ea9 100644 --- a/providers/redis/tests/unit/redis/log/test_redis_task_handler.py +++ b/providers/redis/tests/unit/redis/log/test_redis_task_handler.py @@ -51,7 +51,12 @@ def ti(self): run_type="scheduled", ) else: - dag_run = DagRun(dag_id=dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") + dag_run = DagRun( + dag_id=dag.dag_id, + execution_date=date, + run_id="test", + run_type="scheduled", + ) dag_run.set_state(State.RUNNING) with create_session() as session: @@ -105,5 +110,8 @@ def test_read(self, ti): lrange.return_value = [b"Line 1", b"Line 2"] logs = handler.read(ti) - assert logs == ([[("", "Line 1\nLine 2")]], [{"end_of_log": True}]) + if AIRFLOW_V_3_0_PLUS: + assert logs == (["Line 1\nLine 2"], {"end_of_log": True}) + else: + assert logs == ([[("", "Line 1\nLine 2")]], [{"end_of_log": True}]) lrange.assert_called_once_with(key, start=0, end=-1) diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 2aeb00f83491d..548679757104d 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -183,9 +183,11 @@ def test_providers_modules_should_have_tests(self): "providers/google/tests/unit/google/test_version_compat.py", "providers/http/tests/unit/http/test_exceptions.py", "providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_adls.py", + "providers/microsoft/azure/tests/unit/microsoft/azure/test_version_compat.py", "providers/openlineage/tests/unit/openlineage/test_version_compat.py", "providers/opensearch/tests/unit/opensearch/test_version_compat.py", "providers/presto/tests/unit/presto/test_version_compat.py", + "providers/redis/tests/unit/redis/test_version_compat.py", "providers/snowflake/tests/unit/snowflake/triggers/test_snowflake_trigger.py", "providers/standard/tests/unit/standard/operators/test_empty.py", "providers/standard/tests/unit/standard/operators/test_latest_only.py", @@ -558,8 +560,7 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest "GoogleCampaignManagerReportSensor", "airflow.providers.google.marketing_platform.sensors.display_video." "GoogleDisplayVideo360GetSDFDownloadOperationSensor", - "airflow.providers.google.marketing_platform.sensors.display_video." 
- "GoogleDisplayVideo360ReportSensor", + "airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360ReportSensor", } @pytest.mark.xfail(reason="We did not reach full coverage yet") diff --git a/tests/api_fastapi/core_api/routes/public/test_log.py b/tests/api_fastapi/core_api/routes/public/test_log.py index 2792dd776cf97..b9c43639539d6 100644 --- a/tests/api_fastapi/core_api/routes/public/test_log.py +++ b/tests/api_fastapi/core_api/routes/public/test_log.py @@ -165,12 +165,11 @@ def test_should_respond_200_json(self, try_number): ) expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log" log_content = "Log for testing." if try_number == 1 else "Log for testing 2." - assert "[('localhost'," in response.json()["content"] - assert f"*** Found local files:\\n*** * {expected_filename}\\n" in response.json()["content"] - assert f"{log_content}')]" in response.json()["content"] + resp_contnt = response.json()["content"] + assert expected_filename in resp_contnt[0]["sources"] + assert log_content in resp_contnt[2]["event"] - info = serializer.loads(response.json()["continuation_token"]) - assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1 else 18} + assert response.json()["continuation_token"] is None assert response.status_code == 200 @pytest.mark.parametrize( @@ -220,9 +219,10 @@ def test_should_respond_200_text_plain( assert response.status_code == 200 log_content = "Log for testing." if try_number == 1 else "Log for testing 2." - assert "localhost\n" in response.content.decode("utf-8") - assert f"*** Found local files:\n*** * {expected_filename}\n" in response.content.decode("utf-8") - assert f"{log_content}\n" in response.content.decode("utf-8") + resp_content = response.content.decode("utf-8") + + assert expected_filename in resp_content + assert log_content in resp_content @pytest.mark.parametrize( "request_url, expected_filename, extra_query_string, try_number", @@ -275,9 +275,9 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu assert response.status_code == 200 log_content = "Log for testing." if try_number == 1 else "Log for testing 2." 
- assert "localhost\n" in response.content.decode("utf-8") - assert f"*** Found local files:\n*** * {expected_filename}\n" in response.content.decode("utf-8") - assert f"{log_content}\n" in response.content.decode("utf-8") + resp_content = response.content.decode("utf-8") + assert expected_filename in resp_content + assert log_content in resp_content @pytest.mark.parametrize("try_number", [1, 2]) def test_get_logs_response_with_ti_equal_to_none(self, try_number): @@ -295,10 +295,10 @@ def test_get_logs_response_with_ti_equal_to_none(self, try_number): @pytest.mark.parametrize("try_number", [1, 2]) def test_get_logs_with_metadata_as_download_large_file(self, try_number): with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") as read_mock: - first_return = ([[("", "1st line")]], [{}]) - second_return = ([[("", "2nd line")]], [{"end_of_log": False}]) - third_return = ([[("", "3rd line")]], [{"end_of_log": True}]) - fourth_return = ([[("", "should never be read")]], [{"end_of_log": True}]) + first_return = (["", "1st line"], {}) + second_return = (["", "2nd line"], {"end_of_log": False}) + third_return = (["", "3rd line"], {"end_of_log": True}) + fourth_return = (["", "should never be read"], {"end_of_log": True}) read_mock.side_effect = [first_return, second_return, third_return, fourth_return] response = self.client.get( diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index fe0f13297c86f..3f9e3c67e5552 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -125,103 +125,95 @@ def test_test_read_log_chunks_should_read_one_try(self): task_log_reader = TaskLogReader() ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS - logs, metadatas = task_log_reader.read_log_chunks(ti=ti, try_number=1, metadata={}) - assert logs[0] == [ - ( - "localhost", - " INFO - ::group::Log message source details\n" - "*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n" - " INFO - ::endgroup::\n" - "try_number=1.", - ) + logs, metadata = task_log_reader.read_log_chunks(ti=ti, try_number=1, metadata={}) + + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == [ + f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log" ] - assert metadatas == {"end_of_log": True, "log_pos": 13} + assert logs[1].event == "::endgroup::" + assert logs[2].event == "try_number=1." + assert metadata == {"end_of_log": True, "log_pos": 1} - def test_test_read_log_chunks_should_read_all_files(self): + def test_test_read_log_chunks_should_read_latest_files(self): task_log_reader = TaskLogReader() ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS - logs, metadatas = task_log_reader.read_log_chunks(ti=ti, try_number=None, metadata={}) + logs, metadata = task_log_reader.read_log_chunks(ti=ti, try_number=None, metadata={}) - for i in range(0, 3): - assert logs[i][0][0] == "localhost" - assert ( - "*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/{i + 1}.log\n" - ) in logs[i][0][1] - assert f"try_number={i + 1}." 
in logs[i][0][1] - assert metadatas == {"end_of_log": True, "log_pos": 13} + assert logs[0].event == "::group::Log message source details" + assert logs[0].sources == [ + f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log" + ] + assert logs[1].event == "::endgroup::" + assert logs[2].event == f"try_number={ti.try_number}." + assert metadata == {"end_of_log": True, "log_pos": 1} def test_test_test_read_log_stream_should_read_one_try(self): task_log_reader = TaskLogReader() ti = copy.copy(self.ti) ti.state = TaskInstanceState.SUCCESS stream = task_log_reader.read_log_stream(ti=ti, try_number=1, metadata={}) + assert list(stream) == [ - "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n" - " INFO - ::endgroup::\ntry_number=1.\n" + '{"timestamp":null,' + '"event":"::group::Log message source details",' + f'"sources":["{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log"]' + "}\n", + '{"timestamp":null,"event":"::endgroup::"}\n', + '{"timestamp":null,"event":"try_number=1."}\n', ] - def test_test_test_read_log_stream_should_read_all_logs(self): + def test_test_test_read_log_stream_should_read_latest_logs(self): task_log_reader = TaskLogReader() self.ti.state = TaskInstanceState.SUCCESS # Ensure mocked instance is completed to return stream stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={}) + assert list(stream) == [ - "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n" - " INFO - ::endgroup::\ntry_number=1." - "\n", - "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n" - " INFO - ::endgroup::\ntry_number=2." - "\n", - "localhost\n INFO - ::group::Log message source details\n*** Found local files:\n" - f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n" - " INFO - ::endgroup::\ntry_number=3." 
- "\n", + '{"timestamp":null,' + '"event":"::group::Log message source details",' + f'"sources":["{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log"]' + "}\n", + '{"timestamp":null,"event":"::endgroup::"}\n', + '{"timestamp":null,"event":"try_number=3."}\n', ] @mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") def test_read_log_stream_should_support_multiple_chunks(self, mock_read): - first_return = ([[("", "1st line")]], [{}]) - second_return = ([[("", "2nd line")]], [{"end_of_log": False}]) - third_return = ([[("", "3rd line")]], [{"end_of_log": True}]) - fourth_return = ([[("", "should never be read")]], [{"end_of_log": True}]) + first_return = (["1st line"], {}) + second_return = (["2nd line"], {"end_of_log": False}) + third_return = (["3rd line"], {"end_of_log": True}) + fourth_return = (["should never be read"], {"end_of_log": True}) mock_read.side_effect = [first_return, second_return, third_return, fourth_return] task_log_reader = TaskLogReader() self.ti.state = TaskInstanceState.SUCCESS log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={}) - assert list(log_stream) == ["\n1st line\n", "\n2nd line\n", "\n3rd line\n"] + assert list(log_stream) == ["1st line\n", "2nd line\n", "3rd line\n"] + # as the metadata is now updated in place, when the latest run update metadata. + # the metadata stored in the mock_read will also be updated + # https://github.com/python/cpython/issues/77848 mock_read.assert_has_calls( [ - mock.call(self.ti, 1, metadata={}), - mock.call(self.ti, 1, metadata={}), - mock.call(self.ti, 1, metadata={"end_of_log": False}), + mock.call(self.ti, 1, metadata={"end_of_log": True}), + mock.call(self.ti, 1, metadata={"end_of_log": True}), + mock.call(self.ti, 1, metadata={"end_of_log": True}), ], any_order=False, ) @mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") def test_read_log_stream_should_read_each_try_in_turn(self, mock_read): - first_return = ([[("", "try_number=1.")]], [{"end_of_log": True}]) - second_return = ([[("", "try_number=2.")]], [{"end_of_log": True}]) - third_return = ([[("", "try_number=3.")]], [{"end_of_log": True}]) - fourth_return = ([[("", "should never be read")]], [{"end_of_log": True}]) - mock_read.side_effect = [first_return, second_return, third_return, fourth_return] + mock_read.side_effect = [(["try_number=3."], {"end_of_log": True})] task_log_reader = TaskLogReader() log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={}) - assert list(log_stream) == ["\ntry_number=1.\n", "\ntry_number=2.\n", "\ntry_number=3.\n"] + assert list(log_stream) == ["try_number=3.\n"] mock_read.assert_has_calls( [ - mock.call(self.ti, 1, metadata={}), - mock.call(self.ti, 2, metadata={}), - mock.call(self.ti, 3, metadata={}), + mock.call(self.ti, 3, metadata={"end_of_log": True}), ], any_order=False, ) diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index f7d1b092f13c4..f1245b863636b 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -17,10 +17,12 @@ # under the License. 
from __future__ import annotations +import itertools import logging import logging.config import os import re +from collections.abc import Iterable from http import HTTPStatus from importlib import reload from pathlib import Path @@ -28,7 +30,9 @@ from unittest.mock import patch import pendulum +import pendulum.tz import pytest +from pydantic import TypeAdapter from pydantic.v1.utils import deep_update from requests.adapters import Response @@ -43,9 +47,10 @@ from airflow.utils.log.file_task_handler import ( FileTaskHandler, LogType, + StructuredLogMessage, _fetch_logs_from_service, _interleave_logs, - _parse_timestamps_in_log_file, + _parse_log_lines, ) from airflow.utils.log.logging_mixin import set_context from airflow.utils.net import get_hostname @@ -63,6 +68,19 @@ FILE_TASK_HANDLER = "task" +def events(logs: Iterable[StructuredLogMessage], skip_source_info=True) -> list[str]: + """Helper function to return just the event (a.k.a message) from a list of StructuredLogMessage""" + logs = iter(logs) + if skip_source_info: + + def is_source_group(log: StructuredLogMessage): + return not hasattr(log, "timestamp") or log.event == "::endgroup" + + logs = itertools.dropwhile(is_source_group, logs) + + return [s.event for s in logs] + + class TestFileTaskLogHandler: def clean_up(self): with create_session() as session: @@ -111,6 +129,7 @@ def task_callable(ti): assert file_handler.handler is not None # We expect set_context generates a file locally. log_filename = file_handler.handler.baseFilename + assert os.path.isfile(log_filename) assert log_filename.endswith("0.log"), log_filename @@ -122,14 +141,9 @@ def task_callable(ti): assert hasattr(file_handler, "read") # Return value of read must be a tuple of list and list. # passing invalid `try_number` to read function - logs, metadatas = file_handler.read(ti, 0) - assert isinstance(logs, list) - assert isinstance(metadatas, list) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert isinstance(metadatas[0], dict) - assert logs[0][0][0] == "default_host" - assert logs[0][0][1] == "Error fetching the logs. Try number 0 is invalid." + log, metadata = file_handler.read(ti, 0) + assert isinstance(metadata, dict) + assert log[0].event == "Error fetching the logs. Try number 0 is invalid." # Remove the generated tmp log file. os.remove(log_filename) @@ -146,9 +160,8 @@ def task_callable(ti): dagrun = dag_maker.create_dagrun() - (ti,) = dagrun.get_task_instances() + (ti,) = dagrun.get_task_instances(session=session) ti.try_number += 1 - session.merge(ti) session.flush() logger = ti.log ti.log.disabled = False @@ -171,18 +184,14 @@ def task_callable(ti): file_handler.close() assert hasattr(file_handler, "read") - # Return value of read must be a tuple of list and list. - logs, metadatas = file_handler.read(ti) - assert isinstance(logs, list) - assert isinstance(metadatas, list) - assert len(logs) == 1 - assert len(logs) == len(metadatas) - assert isinstance(metadatas[0], dict) - target_re = r"\n\[[^\]]+\] {test_log_handlers.py:\d+} INFO - test\n" + log, metadata = file_handler.read(ti, 1) + assert isinstance(metadata, dict) + target_re = re.compile(r"\A\[[^\]]+\] {test_log_handlers.py:\d+} INFO - test\Z") # We should expect our log line from the callable above to appear in # the logs we read back - assert re.search(target_re, logs[0][0][-1]), "Logs were " + str(logs) + + assert any(re.search(target_re, e) for e in events(log)), "Logs were " + str(log) # Remove the generated tmp log file. 
os.remove(log_filename) @@ -309,14 +318,10 @@ def task_callable(ti): logger.info("Test") # Return value of read must be a tuple of list and list. - logs, metadatas = file_handler.read(ti) + logs, metadata = file_handler.read(ti) assert isinstance(logs, list) # Logs for running tasks should show up too. - assert isinstance(logs, list) - assert isinstance(metadatas, list) - assert len(logs) == 2 - assert len(logs) == len(metadatas) - assert isinstance(metadatas[0], dict) + assert isinstance(metadata, dict) # Remove the generated tmp log file. os.remove(log_filename) @@ -377,7 +382,7 @@ def task_callable(ti): assert current_file_size < max_bytes_size # Return value of read must be a tuple of list and list. - logs, metadatas = file_handler.read(ti) + logs, metadata = file_handler.read(ti) # the log content should have the filename of both current log file and rotate log file. find_current_log = False @@ -390,12 +395,8 @@ def task_callable(ti): assert find_current_log is True assert find_rotate_log_1 is True - assert isinstance(logs, list) # Logs for running tasks should show up too. assert isinstance(logs, list) - assert isinstance(metadatas, list) - assert len(logs) == len(metadatas) - assert isinstance(metadatas[0], dict) # Remove the two generated tmp log files. os.remove(log_filename) @@ -418,11 +419,12 @@ def test__read_when_local(self, mock_read_local, create_task_instance): logical_date=DEFAULT_DATE, ) fth = FileTaskHandler("") - actual = fth._read(ti=local_log_file_read, try_number=1) + logs, metadata = fth._read(ti=local_log_file_read, try_number=1) mock_read_local.assert_called_with(path) - assert "*** the messages\n" in actual[0] - assert actual[0].endswith("the log") - assert actual[1] == {"end_of_log": True, "log_pos": 7} + as_text = events(logs) + assert logs[0].sources == ["the messages"] + assert as_text[-1] == "the log" + assert metadata == {"end_of_log": True, "log_pos": 1} def test__read_from_local(self, tmp_path): """Tests the behavior of method _read_from_local""" @@ -433,11 +435,7 @@ def test__read_from_local(self, tmp_path): path2.write_text("file2 content") fth = FileTaskHandler("") assert fth._read_from_local(path1) == ( - [ - "Found local files:", - f" * {path1}", - f" * {path2}", - ], + [str(path1), str(path2)], ["file1 content", "file2 content"], ) @@ -480,16 +478,21 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs( fth._read_from_local.return_value = ["found local logs"], ["local\nlog\ncontent"] fth._read_from_logs_server = mock.Mock() fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"] - actual = fth._read(ti=ti, try_number=1) + logs, metadata = fth._read(ti=ti, try_number=1) if served_logs_checked: fth._read_from_logs_server.assert_called_once() - assert "*** this message\n" in actual[0] - assert actual[0].endswith("this\nlog\ncontent") - assert actual[1] == {"end_of_log": True, "log_pos": 16} + assert events(logs) == [ + "::group::Log message source details", + "::endgroup::", + "this", + "log", + "content", + ] + assert metadata == {"end_of_log": True, "log_pos": 3} else: fth._read_from_logs_server.assert_not_called() - assert actual[0] - assert actual[1] + assert logs + assert metadata def test_add_triggerer_suffix(self): sample = "any/path/to/thing.txt" @@ -617,7 +620,7 @@ def test_log_retrieval_valid_trigger(self, create_task_instance): def test_parse_timestamps(): actual = [] - for timestamp, _, _ in _parse_timestamps_in_log_file(log_sample.splitlines()): + for timestamp, _, _ in 
_parse_log_lines(log_sample.splitlines()): actual.append(timestamp) assert actual == [ pendulum.parse("2022-11-16T00:05:54.278000-08:00"), @@ -671,85 +674,88 @@ def test_interleave_interleaves(): "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554", ] ) - expected = "\n".join( - [ - "[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1", - "[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing on 2022-11-16 08:05:52.324532+00:00", - "[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task", - "[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', 'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', '33648', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']", - "[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait", - "[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running on host daniels-mbp-2.lan", - "[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow", - "AIRFLOW_CTX_DAG_ID=simple_async_timedelta", - "AIRFLOW_CTX_TASK_ID=wait", - "AIRFLOW_CTX_LOGICAL_DATE=2022-11-16T08:05:52.324532+00:00", - "AIRFLOW_CTX_TRY_NUMBER=1", - "AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00", - "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554", - ] + + # -08:00 + tz = pendulum.tz.fixed_timezone(-28800) + DateTime = pendulum.DateTime + expected = [ + { + "event": "[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 278000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - " + "Executing on 2022-11-16 " + "08:05:52.324532+00:00", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 295000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - " + "Started process 52536 to run task", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 300000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - " + "Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', " + "'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', " + "'33648', '--raw', '--subdir', " + "'/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', " + "'--cfg-path', " + "'/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 306000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - " + "Job 33648: Subtask wait", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 309000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - " + "Running on host " + "daniels-mbp-2.lan", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 457000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - " + "Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER=airflow", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "AIRFLOW_CTX_DAG_ID=simple_async_timedelta", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "AIRFLOW_CTX_TASK_ID=wait", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "AIRFLOW_CTX_LOGICAL_DATE=2022-11-16T08:05:52.324532+00:00", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "AIRFLOW_CTX_TRY_NUMBER=1", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 592000, tzinfo=tz), + }, + { + "event": "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - " + "Pausing task as DEFERRED. dag_id=simple_async_timedelta, " + "task_id=wait, execution_date=20221116T080552, " + "start_date=20221116T080554", + "timestamp": DateTime(2022, 11, 16, 0, 5, 54, 604000, tzinfo=tz), + }, + ] + # Use a type adapter to turn it into dicts -- makes it easier to compare/test than a bunch of objects + results = TypeAdapter(list[StructuredLogMessage]).dump_python( + _interleave_logs(log_sample2, log_sample1, log_sample3) ) - assert "\n".join(_interleave_logs(log_sample2, log_sample1, log_sample3)) == expected - - -long_sample = """ -*** yoyoyoyo -[2023-01-15T22:36:46.474-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti= -[2023-01-15T22:36:46.482-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti= -[2023-01-15T22:36:46.483-0800] {taskinstance.py:1332} INFO - Starting attempt 1 of 1 -[2023-01-15T22:36:46.516-0800] {taskinstance.py:1351} INFO - Executing on 2023-01-16 06:36:43.044492+00:00 -[2023-01-15T22:36:46.522-0800] {standard_task_runner.py:56} INFO - Started process 38807 to run task -[2023-01-15T22:36:46.530-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '487', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmpiwyl54bn', '--no-shut-down-logging'] -[2023-01-15T22:36:46.536-0800] {standard_task_runner.py:84} INFO - Job 487: Subtask wait -[2023-01-15T22:36:46.624-0800] {task_command.py:417} INFO - Running on host daniels-mbp-2.lan -[2023-01-15T22:36:46.918-0800] {taskinstance.py:1558} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_time_delta_sensor_async' AIRFLOW_CTX_TASK_ID='wait' AIRFLOW_CTX_LOGICAL_DATE='2023-01-16T06:36:43.044492+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-01-16T06:36:43.044492+00:00' -[2023-01-15T22:36:46.929-0800] {taskinstance.py:1433} INFO - Pausing task as DEFERRED. 
dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646 -[2023-01-15T22:36:46.981-0800] {local_task_job.py:218} INFO - Task exited with return code 100 (task deferral) - -[2023-01-15T22:36:46.474-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti= -[2023-01-15T22:36:46.482-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti= -[2023-01-15T22:36:46.483-0800] {taskinstance.py:1332} INFO - Starting attempt 1 of 1 -[2023-01-15T22:36:46.516-0800] {taskinstance.py:1351} INFO - Executing on 2023-01-16 06:36:43.044492+00:00 -[2023-01-15T22:36:46.522-0800] {standard_task_runner.py:56} INFO - Started process 38807 to run task -[2023-01-15T22:36:46.530-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '487', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmpiwyl54bn', '--no-shut-down-logging'] -[2023-01-15T22:36:46.536-0800] {standard_task_runner.py:84} INFO - Job 487: Subtask wait -[2023-01-15T22:36:46.624-0800] {task_command.py:417} INFO - Running on host daniels-mbp-2.lan -[2023-01-15T22:36:46.918-0800] {taskinstance.py:1558} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_time_delta_sensor_async' AIRFLOW_CTX_TASK_ID='wait' AIRFLOW_CTX_LOGICAL_DATE='2023-01-16T06:36:43.044492+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-01-16T06:36:43.044492+00:00' -[2023-01-15T22:36:46.929-0800] {taskinstance.py:1433} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646 -[2023-01-15T22:36:46.981-0800] {local_task_job.py:218} INFO - Task exited with return code 100 (task deferral) -[2023-01-15T22:37:17.673-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti= -[2023-01-15T22:37:17.681-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti= -[2023-01-15T22:37:17.682-0800] {taskinstance.py:1330} INFO - resuming after deferral -[2023-01-15T22:37:17.693-0800] {taskinstance.py:1351} INFO - Executing on 2023-01-16 06:36:43.044492+00:00 -[2023-01-15T22:37:17.697-0800] {standard_task_runner.py:56} INFO - Started process 39090 to run task -[2023-01-15T22:37:17.703-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '488', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp_sa9sau4', '--no-shut-down-logging'] -[2023-01-15T22:37:17.707-0800] {standard_task_runner.py:84} INFO - Job 488: Subtask wait -[2023-01-15T22:37:17.771-0800] {task_command.py:417} INFO - Running on host daniels-mbp-2.lan -[2023-01-15T22:37:18.043-0800] {taskinstance.py:1369} INFO - Marking task as SUCCESS. 
dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646, end_date=20230116T063718 -[2023-01-15T22:37:18.117-0800] {local_task_job.py:220} INFO - Task exited with return code 0 -[2023-01-15T22:37:18.147-0800] {taskinstance.py:2648} INFO - 0 downstream tasks scheduled from follow-on schedule check -[2023-01-15T22:37:18.173-0800] {:0} Level None - end_of_log - -*** hihihi! -[2023-01-15T22:36:48.348-0800] {temporal.py:62} INFO - trigger starting -[2023-01-15T22:36:48.348-0800] {temporal.py:66} INFO - 24 seconds remaining; sleeping 10 seconds -[2023-01-15T22:36:58.349-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:36:59.349-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:00.349-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:01.350-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:02.350-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:03.351-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:04.351-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:05.353-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:06.354-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:07.355-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:08.356-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:09.357-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:10.358-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:11.359-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:12.359-0800] {temporal.py:71} INFO - sleeping 1 second... -[2023-01-15T22:37:13.360-0800] {temporal.py:74} INFO - yielding event with payload DateTime(2023, 1, 16, 6, 37, 13, 44492, tzinfo=Timezone('UTC')) -[2023-01-15T22:37:13.361-0800] {triggerer_job.py:540} INFO - Trigger (ID 106) fired: TriggerEvent -""" + # TypeAdapter gives us a generator out when its input is a generator. Nice, but not useful for testing + results = list(results) + assert results == expected def test_interleave_logs_correct_ordering(): @@ -765,7 +771,8 @@ def test_interleave_logs_correct_ordering(): [2023-01-17T12:47:11.883-0800] {triggerer_job.py:540} INFO - Trigger (ID 1) fired: TriggerEvent """ - assert sample_with_dupe == "\n".join(_interleave_logs(sample_with_dupe, "", sample_with_dupe)) + logs = events(_interleave_logs(sample_with_dupe, "", sample_with_dupe)) + assert sample_with_dupe == "\n".join(logs) def test_interleave_logs_correct_dedupe(): @@ -780,7 +787,8 @@ def test_interleave_logs_correct_dedupe(): test, test""" - assert sample_without_dupe == "\n".join(_interleave_logs(",\n ".join(["test"] * 10))) + logs = events(_interleave_logs(",\n ".join(["test"] * 10))) + assert sample_without_dupe == "\n".join(logs) def test_permissions_for_new_directories(tmp_path):