Commit f234d4f

Resolve OOM When Reading Large Logs in Webserver (#49470)
* Add note for new usage of LogMetadata
* Add _stream_parsed_lines_by_chunk
* Refactor _read_from_local/_read_from_logs_server to return streams
* Refactor _interleave_logs as a k-way merge (see the merge sketch after this list)
* Add _get_compatible_log_stream
* Refactor the _read method to return a stream with a compatible interface
  - Add compatible interface for executor and remote logs
  - Replace log_pos-based skipping with a per-log-source skip
* Refactor log_reader to adapt to the stream
* Fix "open closed file" error in _read_from_local
* Refactor LogReader to yield in batches
* Add ndjson header to the get_log OpenAPI schema
* Fix _add_log_from_parsed_log_streams_to_heap
  - Add comparator for StructuredLogMessage
  - Refactor parsed_log_streams from list to dict so empty logs can be removed
* Fix _interleave_logs dedupe logic: it should check the current logs with the default timestamp
* Refactor test_log_handlers
  - Fix events utils
  - Add convert_list_to_stream and mock_parsed_logs_factory utils
  - Fix tests broken by the FileTaskHandler refactor: test_file_task_handler_when_ti_value_is_invalid, test_file_task_handler, test_file_task_handler_running, test_file_task_handler_rotate_size_limit, test__read_when_local, test__read_served_logs_checked_when_done_and_no_local_or_remote_logs, test_interleave_interleaves, test_interleave_logs_correct_ordering, test_interleave_logs_correct_dedupe
  - Add new tests for the refactored FileTaskHandler: test__stream_lines_by_chunk, test__log_stream_to_parsed_log_stream, test__sort_key, test__is_sort_key_with_default_timestamp, test__is_logs_stream_like, test__add_log_from_parsed_log_streams_to_heap
* Move test_log_handlers utils to test_common
* Fix unit/celery/log_handlers test
* Fix mypy-providers static check
* Fix _get_compatible_log_stream: yield each log stream sequentially instead of yielding from all streams in parallel
* Fix amazon task handler test
* Fix wasb task handler test
* Fix elasticsearch task handler test
* Fix opensearch task handler test
* Fix TaskLogReader buffer: don't concatenate the buffer with an empty string; yield directly from the buffer
* Fix test_log_reader
* Fix CloudWatchRemoteLogIO.read mypy error
* Fix test_gcs_task_handler
* Fix core_api test_log
* Fix CloudWatchRemoteLogIO._event_to_str datetime format
* Fix TestCloudRemoteLogIO.test_log_message
* Fix convert_list_to_stream for the es/os task handlers
* Fix compat tests
* Refactor the es/os task handlers for 3.0 compat
* Fix compat for RedisTaskHandler
* Fix ruff format for test_cloudwatch_task_handler after rebase
* Fix 2.10-compat TestCloudwatchTaskHandler
* Fix 3.0-compat tests for celery and wasb (fix wasb test, spelling)
* Fix 3.0-compat test for gcs
* Fix 3.0-compat tests for cloudwatch and s3
* Set the get_log API default response format to JSON
* Remove the "first_time_read" key from log metadata
* Remove the "<source>_log_pos" key from log metadata
* Add LogStreamCounter for backward compatibility
* Replace "first_time_read" with the backward-compatible "log_pos" in tests: test_log_reader, test_log_handlers, test_cloudwatch_task_handler, test_s3_task_handler, celery test_log_handler, test_gcs_task_handler, test_wasb_task_handler; fix redis_task_handler; fix log_pos
* Fix RedisTaskHandler compatibility
* Fix chores from self-review
  - Fix typo in _read_from_logs_server
  - Remove unused parameters in _stream_lines_by_chunk and read_log_stream
  - Fix docstring by removing an outdated note
  - Only add the buffer for full_download
  - Add an ndjson-format test for the get_log API
* Fine-tune HEAP_DUMP_SIZE
* Replace get_compatible_output_log_stream with iter
* Remove the buffer in log_reader
* Fix log_id-not-found compat for es_task_handler
* Fix review comments
  - Rename LogStreamCounter to LogStreamAccumulator (see the accumulator sketch after this list)
  - Simplify for-yield to yield-from in log_reader
  - Add type annotations for LogStreamAccumulator
* Refactor the LogStreamAccumulator._capture method: use itertools.islice to take chunks
* Fix type hint and joinedload for ti.dag_run after merge
* Rename _sort_key to _create_sort_key
* Add _flush_logs_out_of_heap common util
* Fix review nits
  - _is_logs_stream_like: add type annotation; reduce to a single isinstance call
  - Construct log_streams inline in _get_compatible_log_stream
  - Use a TypedDict for LogMetadata
  - Remove the len(logs) emptiness check
  - Revert typo of self.log_handler.read in log_reader
  - LogStreamAccumulator: refactor flush logic; make total_lines and stream properties
* Fix mypy errors after merge
* Fix redis task handler test
* Refactor _capture logic in LogStreamAccumulator
* Add comments for the ignored LogMetadata TypedDict
* Add a comment for offset; fix the comment for LogMessages
* Refactor with from_iterable and islice
* Fix nits in tests
  - Refactor structured_logs fixtures in TestLogStreamAccumulator
  - Use f-strings in test_file_task_handler
  - Assert the actual value of _create_sort_key
  - Add detailed comments in test__add_log_from_parsed_log_streams_to_heap
* Refactor test_utils
* Add a comment for lazy initialization
* Fix error handling for _stream_lines_by_chunk
* Fix mypy error after merge
* Fix final review nits
* Fix mypy error

(cherry picked from commit ee54fe9)
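The heart of the OOM fix is the k-way merge in _interleave_logs: instead of materializing every log source as a list and sorting the union, each source stays a lazy stream and a heap holds only one head record per stream. Below is a minimal sketch of that idea, assuming each stream yields (timestamp, line) tuples already time-ordered within itself; the function name, tuple shape, and dedupe handling are illustrative, not Airflow's exact API.

```python
import heapq
from collections.abc import Iterator
from datetime import datetime


def interleave_logs(*streams: Iterator[tuple[datetime, str]]) -> Iterator[str]:
    """Merge time-ordered log streams lazily: the heap holds one head record
    per stream, so memory stays O(k) regardless of how large the logs are."""
    sources = list(streams)
    heap: list[tuple[datetime, int, str]] = []
    for i, source in enumerate(sources):
        head = next(source, None)
        if head is not None:
            # The stream index i breaks timestamp ties, so heapq never has to
            # compare the log payloads themselves.
            heapq.heappush(heap, (head[0], i, head[1]))
    previous = None
    while heap:
        _, i, line = heapq.heappop(heap)
        if line != previous:  # drop duplicated adjacent lines, like the commit's dedupe fix
            yield line
        previous = line
        replacement = next(sources[i], None)
        if replacement is not None:
            heapq.heappush(heap, (replacement[0], i, replacement[1]))


# Example: two sources that would each be too large to hold in memory at once.
t = datetime(2025, 1, 1)
local = iter([(t.replace(second=1), "a"), (t.replace(second=3), "c")])
served = iter([(t.replace(second=2), "b")])
print(list(interleave_logs(local, served)))  # ['a', 'b', 'c']
```

The commit's real implementation additionally batches output through a tuned HEAP_DUMP_SIZE threshold and a _create_sort_key/_flush_logs_out_of_heap pair; the sketch keeps only the core merge.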
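Backward compatibility is the other half: the old metadata carried a log_pos line count, which a fully lazy stream cannot report up front. The commit's LogStreamAccumulator wraps the stream and counts lines as they pass through, draining the source in itertools.islice chunks. A hedged sketch of that wrapper follows; the chunk size, attribute names, and usage pattern are illustrative assumptions, not the committed implementation.

```python
from collections.abc import Iterator
from itertools import islice


class LogStreamAccumulator:
    """Wrap a log stream and count lines as they are consumed, so legacy
    log_pos-style metadata can still be derived without buffering the
    whole stream in memory."""

    CHUNK_SIZE = 1024  # hypothetical; the real chunk size is an implementation detail

    def __init__(self, source: Iterator[str]) -> None:
        self._source = source
        self._total_lines = 0

    @property
    def total_lines(self) -> int:
        # Only final once the stream below has been fully consumed.
        return self._total_lines

    @property
    def stream(self) -> Iterator[str]:
        # Drain the source in islice() chunks, mirroring the commit's _capture
        # refactor: count each chunk, then yield it onward unchanged.
        while True:
            chunk = list(islice(self._source, self.CHUNK_SIZE))
            if not chunk:
                return
            self._total_lines += len(chunk)
            yield from chunk


# Usage: the consumer iterates acc.stream; afterwards total_lines is the count.
acc = LogStreamAccumulator(iter(f"line {i}" for i in range(3000)))
for _ in acc.stream:
    pass
print(acc.total_lines)  # 3000
```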
1 parent 9826d5a commit f234d4f

File tree

22 files changed: +1442 additions, -273 deletions


airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py

Lines changed: 33 additions & 24 deletions
@@ -20,7 +20,8 @@
 import contextlib
 import textwrap
 
-from fastapi import Depends, HTTPException, Request, Response, status
+from fastapi import Depends, HTTPException, Request, status
+from fastapi.responses import StreamingResponse
 from itsdangerous import BadSignature, URLSafeSerializer
 from pydantic import NonNegativeInt
 from sqlalchemy.orm import joinedload
@@ -119,12 +120,17 @@ def get_log(
     )
     ti = session.scalar(query)
     if ti is None:
-        query = select(TaskInstanceHistory).where(
-            TaskInstanceHistory.task_id == task_id,
-            TaskInstanceHistory.dag_id == dag_id,
-            TaskInstanceHistory.run_id == dag_run_id,
-            TaskInstanceHistory.map_index == map_index,
-            TaskInstanceHistory.try_number == try_number,
+        query = (
+            select(TaskInstanceHistory)
+            .where(
+                TaskInstanceHistory.task_id == task_id,
+                TaskInstanceHistory.dag_id == dag_id,
+                TaskInstanceHistory.run_id == dag_run_id,
+                TaskInstanceHistory.map_index == map_index,
+                TaskInstanceHistory.try_number == try_number,
+            )
+            .options(joinedload(TaskInstanceHistory.dag_run))
+            # we need to joinedload the dag_run, since FileTaskHandler._render_filename needs ti.dag_run
         )
         ti = session.scalar(query)
 
@@ -137,21 +143,24 @@ def get_log(
     with contextlib.suppress(TaskNotFound):
         ti.task = dag.get_task(ti.task_id)
 
-    if accept == Mimetype.JSON or accept == Mimetype.ANY:  # default
-        logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
-        encoded_token = None
+    if accept == Mimetype.NDJSON:  # only specified application/x-ndjson will return streaming response
+        # LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error
+        log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)  # type: ignore[arg-type]
+        headers = None
         if not metadata.get("end_of_log", False):
-            encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
-        return TaskInstancesLogResponse.model_construct(continuation_token=encoded_token, content=logs)
-    # text/plain, or something else we don't understand. Return raw log content
-
-    # We need to exhaust the iterator before we can generate the continuation token.
-    # We could improve this by making it a streaming/async response, and by then setting the header using
-    # HTTP Trailers
-    logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata))
-    headers = None
-    if not metadata.get("end_of_log", False):
-        headers = {
-            "Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
-        }
-    return Response(media_type="application/x-ndjson", content=logs, headers=headers)
+            headers = {
+                "Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
+            }
+        return StreamingResponse(media_type="application/x-ndjson", content=log_stream, headers=headers)
+
+    # application/json, or something else we don't understand.
+    # Return JSON format, which will be more easily for users to debug.
+
+    # LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error
+    structured_log_stream, out_metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)  # type: ignore[arg-type]
+    encoded_token = None
+    if not out_metadata.get("end_of_log", False):
+        encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(out_metadata)
+    return TaskInstancesLogResponse.model_construct(
+        continuation_token=encoded_token, content=list(structured_log_stream)
+    )
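With the route now returning a StreamingResponse, a client can consume logs incrementally instead of waiting for one large body. Here is a hedged sketch of client-side consumption: the URL shape and the "token" query-parameter name are assumptions (carried over from the pre-streaming API), while the Accept type and the Airflow-Continuation-Token header come from the diff above.

```python
import json

import requests

# Hypothetical endpoint URL; the real route shape comes from Airflow's public API.
url = "http://localhost:8080/api/v2/dags/my_dag/dagRuns/my_run/taskInstances/my_task/logs/1"

token = None
while True:
    params = {"token": token} if token else {}
    with requests.get(
        url, params=params, headers={"Accept": "application/x-ndjson"}, stream=True
    ) as resp:
        resp.raise_for_status()
        # Each non-empty line is one StructuredLogMessage serialized as JSON.
        for raw in resp.iter_lines():
            if raw:
                print(json.loads(raw))
        # The header is absent once end_of_log is reached, per the route logic above.
        token = resp.headers.get("Airflow-Continuation-Token")
    if not token:
        break
```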
