
Commit ee54fe9

Resolve OOM When Reading Large Logs in Webserver (apache#49470)
* Add note for new usage of LogMetadata
* Add _stream_parsed_lines_by_chunk
* Refactor _read_from_local/logs_server to return streams
* Refactor _interleave_logs with K-Way Merge (sketched below)
* Add _get_compatible_log_stream
* Refactor _read method to return stream with compatible interface
  - Add compatible interface for executor, remote_logs
  - Refactor skip log_pos with skip for each log source
* Refactor log_reader to adapt stream
* Fix _read_from_local open-closed-file error
* Refactor LogReader by yielding in batches
* Add ndjson header to get_log openapi schema
* Fix _add_log_from_parsed_log_streams_to_heap
  - Add comparator for StructuredLogMessage
  - Refactor parsed_log_streams from list to dict for removing empty logs
* Fix _interleave_logs dedupe logic
  - should check the current logs with default timestamp
* Refactor test_log_handlers
  - Fix events utils
  - Add convert_list_to_stream, mock_parsed_logs_factory utils
  - Fix the following tests after refactoring FileTaskHandler:
    - test_file_task_handler_when_ti_value_is_invalid
    - test_file_task_handler
    - test_file_task_handler_running
    - test_file_task_handler_rotate_size_limit
    - test__read_when_local
    - test__read_served_logs_checked_when_done_and_no_local_or_remote_logs
    - test_interleave_interleaves
    - test_interleave_logs_correct_ordering
    - test_interleave_logs_correct_dedupe
  - Add new tests for the refactored FileTaskHandler:
    - test__stream_lines_by_chunk
    - test__log_stream_to_parsed_log_stream
    - test__sort_key
    - test__is_sort_key_with_default_timestamp
    - test__is_logs_stream_like
    - test__add_log_from_parsed_log_streams_to_heap
* Move test_log_handlers utils to test_common
* Fix unit/celery/log_handlers test
* Fix mypy-providers static check
* Fix _get_compatible_log_stream
  - sequential yield instead of parallel yield for all log streams
* Fix amazon task_handler test
* Fix wasb task handler test
* Fix elasticsearch task handler test
* Fix opensearch task handler test
* Fix TaskLogReader buffer
  - don't concat buffer with empty str; yield directly from buffer
* Fix test_log_reader
* Fix CloudWatchRemoteLogIO.read mypy
* Fix test_gcs_task_handler
* Fix core_api test_log
* Fix CloudWatchRemoteLogIO._event_to_str dt format
* Fix TestCloudRemoteLogIO.test_log_message
* Fix es/os task_handler convert_list_to_stream
* Fix compat tests
* Refactor es/os task handlers for 3.0 compat
* Fix compat for RedisTaskHandler
* Fix ruff format for test_cloudwatch_task_handler after rebase
* Fix 2.10 compat TestCloudwatchTaskHandler
* Fix 3.0 compat tests for celery, wasb
  - Fix wasb test, spelling
* Fix 3.0 compat test for gcs
* Fix 3.0 compat tests for cloudwatch, s3
* Set get_log API default response format to JSON
* Remove "first_time_read" key in log metadata
* Remove "<source>_log_pos" key in log metadata
* Add LogStreamCounter for backward compatibility
* Remove "first_time_read" in favor of backward-compatible "log_pos" in tests
  - test_log_reader
  - test_log_handlers
  - test_cloudwatch_task_handler
  - test_s3_task_handler
  - celery test_log_handler
  - test_gcs_task_handler
  - test_wasb_task_handler
  - fix redis_task_handler
  - fix log_pos
* Fix RedisTaskHandler compatibility
* Fix chores from self-review
  - Fix typo in _read_from_logs_server
  - Remove unused parameters in _stream_lines_by_chunk, read_log_stream
  - Fix docstring by removing outdated note
  - Only add buffer for full_download
  - Add test for ndjson format in get_log API
* Fine-tune HEAP_DUMP_SIZE
* Replace get_compatible_output_log_stream with iter
* Remove buffer in log_reader
* Fix log_id-not-found compat for es_task_handler
* Fix review comments
  - rename LogStreamCounter to LogStreamAccumulator
  - simplify for-yield with yield-from in log_reader
  - add type annotation for LogStreamAccumulator
* Refactor LogStreamAccumulator._capture method
  - use itertools.islice to get chunk
* Fix type hint, joinedload for ti.dag_run after merge
* Rename _sort_key to _create_sort_key
* Add _flush_logs_out_of_heap common util
* Fix review nits
  - _is_logs_stream_like
    - add type annotation
    - reduce to one isinstance call
  - construct log_streams in _get_compatible_log_stream inline
  - use TypedDict for LogMetadata
  - remove len(logs) check for emptiness
  - revert typo of self.log_handler.read in log_reader
  - log_stream_accumulator
    - refactor flush logic
    - make total_lines a property
    - make stream a property
* Fix mypy errors after merge
* Fix redis task handler test
* Refactor _capture logic in LogStreamAccumulator
* Add comments for the LogMetadata TypedDict type-ignores
* Add comment for offset; fix comment for LogMessages
* Refactor with from_iterable, islice
* Fix nits in tests
  - refactor structured_logs fixtures in TestLogStreamAccumulator
  - use f-string in test_file_task_handler
  - assert actual value of _create_sort_key
  - add detailed comments in test__add_log_from_parsed_log_streams_to_heap
* Refactor test_utils
* Add comment for lazy initialization
* Fix error handling for _stream_lines_by_chunk
* Fix mypy error after merge
* Fix final review nits
* Fix mypy error
1 parent e142ab9 commit ee54fe9
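
The heart of the fix is replacing whole-file buffering with a K-way merge over per-source, already-sorted log streams, flushed in bounded chunks. A minimal sketch of that idea follows; the LogRecord tuple, the function names, and the chunk size are illustrative assumptions, not the actual FileTaskHandler/LogStreamAccumulator implementation:

    import heapq
    from itertools import islice
    from typing import Iterator

    # Hypothetical parsed record: (timestamp, line_no, text). Tuples compare
    # element-wise, so the timestamp doubles as the merge sort key.
    LogRecord = tuple[str, int, str]


    def interleave_streams(*streams: Iterator[LogRecord]) -> Iterator[LogRecord]:
        # K-way merge: heapq.merge keeps only one pending record per source,
        # so no log file is ever materialized in full, however large it is.
        yield from heapq.merge(*streams)


    def stream_in_chunks(records: Iterator[LogRecord], size: int = 1024) -> Iterator[list[LogRecord]]:
        # Flush merged records in fixed-size batches (the role the commit's
        # LogStreamAccumulator plays), bounding peak memory per request.
        while chunk := list(islice(records, size)):
            yield chunk

Because heapq.merge holds at most one buffered record per source, peak memory is proportional to the number of streams plus one chunk, rather than to total log size, which is what resolves the OOM.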

File tree

22 files changed: +1439 -273 lines


airflow-core/src/airflow/api_fastapi/core_api/routes/public/log.py

Lines changed: 33 additions & 24 deletions
@@ -20,7 +20,8 @@
 import contextlib
 import textwrap
 
-from fastapi import Depends, HTTPException, Request, Response, status
+from fastapi import Depends, HTTPException, Request, status
+from fastapi.responses import StreamingResponse
 from itsdangerous import BadSignature, URLSafeSerializer
 from pydantic import NonNegativeInt, PositiveInt
 from sqlalchemy.orm import joinedload
@@ -120,12 +121,17 @@ def get_log(
     )
     ti = session.scalar(query)
     if ti is None:
-        query = select(TaskInstanceHistory).where(
-            TaskInstanceHistory.task_id == task_id,
-            TaskInstanceHistory.dag_id == dag_id,
-            TaskInstanceHistory.run_id == dag_run_id,
-            TaskInstanceHistory.map_index == map_index,
-            TaskInstanceHistory.try_number == try_number,
+        query = (
+            select(TaskInstanceHistory)
+            .where(
+                TaskInstanceHistory.task_id == task_id,
+                TaskInstanceHistory.dag_id == dag_id,
+                TaskInstanceHistory.run_id == dag_run_id,
+                TaskInstanceHistory.map_index == map_index,
+                TaskInstanceHistory.try_number == try_number,
+            )
+            .options(joinedload(TaskInstanceHistory.dag_run))
+            # we need to joinedload the dag_run, since FileTaskHandler._render_filename needs ti.dag_run
         )
         ti = session.scalar(query)
 
@@ -138,24 +144,27 @@ def get_log(
         with contextlib.suppress(TaskNotFound):
             ti.task = dag.get_task(ti.task_id)
 
-    if accept == Mimetype.JSON or accept == Mimetype.ANY:  # default
-        logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
-        encoded_token = None
+    if accept == Mimetype.NDJSON:  # only specified application/x-ndjson will return streaming response
+        # LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error
+        log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)  # type: ignore[arg-type]
+        headers = None
         if not metadata.get("end_of_log", False):
-            encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
-        return TaskInstancesLogResponse.model_construct(continuation_token=encoded_token, content=logs)
-    # text/plain, or something else we don't understand. Return raw log content
-
-    # We need to exhaust the iterator before we can generate the continuation token.
-    # We could improve this by making it a streaming/async response, and by then setting the header using
-    # HTTP Trailers
-    logs = "".join(task_log_reader.read_log_stream(ti, try_number, metadata))
-    headers = None
-    if not metadata.get("end_of_log", False):
-        headers = {
-            "Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
-        }
-    return Response(media_type="application/x-ndjson", content=logs, headers=headers)
+            headers = {
+                "Airflow-Continuation-Token": URLSafeSerializer(request.app.state.secret_key).dumps(metadata)
+            }
+        return StreamingResponse(media_type="application/x-ndjson", content=log_stream, headers=headers)
+
+    # application/json, or something else we don't understand.
+    # Return JSON format, which will be more easily for users to debug.
+
+    # LogMetadata(TypedDict) is used as type annotation for log_reader; added ignore to suppress mypy error
+    structured_log_stream, out_metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)  # type: ignore[arg-type]
+    encoded_token = None
+    if not out_metadata.get("end_of_log", False):
+        encoded_token = URLSafeSerializer(request.app.state.secret_key).dumps(out_metadata)
+    return TaskInstancesLogResponse.model_construct(
+        continuation_token=encoded_token, content=list(structured_log_stream)
+    )
 
 
 @task_instances_log_router.get(
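
On the client side, the new contract shown in the diff is: request application/x-ndjson, consume the body incrementally, and keep following the Airflow-Continuation-Token header until it disappears. A hedged sketch using requests; the URL shape and the token query parameter are assumptions for illustration, not a documented client API:

    import requests

    # Hypothetical endpoint and task coordinates, for illustration only.
    url = "http://localhost:8080/api/v2/dags/example/dagRuns/run_1/taskInstances/task_1/logs/1"

    token = None
    while True:
        params = {"token": token} if token else {}
        with requests.get(
            url, headers={"Accept": "application/x-ndjson"}, params=params, stream=True
        ) as resp:
            resp.raise_for_status()
            # iter_lines drains the StreamingResponse incrementally, mirroring
            # the server-side fix: neither side buffers the full log in memory.
            for line in resp.iter_lines():
                print(line.decode())
            token = resp.headers.get("Airflow-Continuation-Token")
        if not token:  # no continuation token means end_of_log was reached
            break

Plain application/json (now the default, per the commit message) still returns a TaskInstancesLogResponse with an inline continuation_token, as the last hunk above shows.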
