jason810496
Member

related: #49470

What

Backport "Resolve OOM When Reading Large Logs in Webserver" (#49470) to the v3-0-test branch.

The only conflicts when cherry-picking:

Merge Changes
└── airflow-core/src/airflow
    ├── api_fastapi/core_api/routes/public
    │   └── log.py                   [9+, !]
    ├── utils/log
    │   ├── file_task_handler.py     [4, !]
    │   └── log_reader.py            [!]
└── providers
    ├── amazon/tests/unit/amazon/aws/log
    │   └── test_cloudwatch_task_handler.py [!]
    ├── elasticsearch/src/airflow/providers/elasticsearch/log
    │   └── es_task_handler.py       [!]
    └── opensearch/src/airflow/providers/opensearch/log
        └── os_task_handler.py      [!]

@jason810496 jason810496 self-assigned this Jul 11, 2025
@jason810496 jason810496 added the full tests needed label Jul 11, 2025
@jason810496 jason810496 marked this pull request as ready for review July 11, 2025 09:57
@jason810496 jason810496 requested a review from potiuk July 11, 2025 09:58
@jason810496 jason810496 reopened this Jul 11, 2025
pierrejeambrun previously approved these changes Jul 11, 2025
@pierrejeambrun pierrejeambrun dismissed their stale review July 11, 2025 21:35

Maybe we should wait for the 3.0.3 release before merging it into the test branch (just in case there is an RC6).

@gopidesupavan
Member

@jason810496 did something go wrong with the backport? I see many files changed.

@kaxil kaxil force-pushed the backport-ee54fe9-v3-0-test branch from 67f2044 to f7692e0 Compare July 11, 2025 21:48
@kaxil
Member

kaxil commented Jul 11, 2025

@jason810496 did something go wrong with the backport? I see many files changed.

Fixed it

@jason810496
Member Author

jason810496 commented Jul 12, 2025

@jason810496 did something go wrong with the backport? I see many files changed.

Fixed it

Thanks @kaxil!
I just checked that this backport PR modifies the same files as the original PR, using the following script.

REPO=apache/airflow
PR1=53167
PR2=49470

# Get the list of changed files from each PR
FILES_PR1=$(gh pr view "$PR1" --repo "$REPO" --json files --jq '.files[].path' | sort)
FILES_PR2=$(gh pr view "$PR2" --repo "$REPO" --json files --jq '.files[].path' | sort)

TMP1=$(mktemp)
TMP2=$(mktemp)

echo "$FILES_PR1" > "$TMP1"
echo "$FILES_PR2" > "$TMP2"

# Compare the two sorted lists
if cmp -s "$TMP1" "$TMP2"; then
    echo "✅ PR #$PR1 and PR #$PR2 modify the exact same set of files."
else
    echo "❌ PR #$PR1 and PR #$PR2 modify different sets of files."

    echo -e "\n📄 Files only in PR #$PR1:"
    comm -23 "$TMP1" "$TMP2"

    echo -e "\n📄 Files only in PR #$PR2:"
    comm -13 "$TMP1" "$TMP2"
fi

rm -f "$TMP1" "$TMP2"

I have also tested the behavior using breeze k8s with the following setups, and they all work well!

  1. kubernetes executor
  2. celery executor
  3. kubernetes executor + remote logging with Google Cloud Storage
  4. celery executor + remote logging with Google Cloud Storage

Update

However, the setups with remote logging to Google Cloud Storage resulted in errors.
I just found the root cause and will raise another PR to fix it. I am marking this PR as draft to avoid it being merged by mistake.

This happened because the providers and Helm chart were outdated when I ran k8s build-k8s-image and k8s deploy-airflow after switching to the v3-0-test branch.
As a result, I hit the GCSRemoteLogIO error that was already fixed in #50590, and extraVolumeMounts did not work properly.

@jason810496 jason810496 marked this pull request as draft July 12, 2025 14:00
@jason810496 jason810496 marked this pull request as ready for review July 12, 2025 16:47
@jason810496
Member Author

After configuring the remote logging setup correctly, fetching logs on the task instance logs page works well in every setup.
Ready to merge if this CI run is green.

* Add note for new usage of LogMetadata

* Add _stream_parsed_lines_by_chunk

* Refactor _read_from_local/logs_server to return a stream

* Refactor _interleave_logs with K-Way Merge

* Add _get_compatible_log_stream

* Refactor _read method to return stream with compatible interface

- Add compatible interface for executor, remote_logs
- Refactor skip log_pos with skip for each log source

* Refactor log_reader to adapt stream

* Fix _read_from_local open closed file error

* Refactor LogReader by yielding in batch

* Add ndjson header to get_log openapi schema

* Fix _add_log_from_parsed_log_streams_to_heap
- Add comparator for StructuredLogMessage
- Refactor parsed_log_streams from list to dict for removing empty logs

* Fix _interleave_logs dedupe logic
- should check the current logs with default timestamp

* Refactor test_log_handlers
- Fix events utils
- Add convert_list_to_stream, mock_parsed_logs_factory utils
- Fix the following test after refactoring FileTaskHandler
    - test_file_task_handler_when_ti_value_is_invalid
    - test_file_task_handler
    - test_file_task_handler_running
    - test_file_task_handler_rotate_size_limit
    - test__read_when_local
    - test__read_served_logs_checked_when_done_and_no_local_or_remote_logs
    - test_interleave_interleaves
    - test_interleave_logs_correct_ordering
    - test_interleave_logs_correct_dedupe
- Add new test for refactoring FileTaskHandler
    - test__stream_lines_by_chunk
    - test__log_stream_to_parsed_log_stream
    - test__sort_key
    - test__is_sort_key_with_default_timestamp
    - test__is_logs_stream_like
    - test__add_log_from_parsed_log_streams_to_heap

* Move test_log_handlers utils to test_common

* Fix unit/celery/log_handlers test

* Fix mypy-providers static check

* Fix _get_compatible_log_stream
- sequential yield instead of parallel yield for all log_stream

* Fix amazon task_handler test

* Fix wasb task handler test

* Fix elasticsearch task handler test

* Fix opensearch task handler test

* Fix TaskLogReader buffer
- don't concat buffer with empty str, yield directly from buffer

* Fix test_log_reader

* Fix CloudWatchRemoteLogIO.read mypy

* Fix test_gcs_task_handler

* Fix core_api test_log

* Fix CloudWatchRemoteLogIO._event_to_str dt format

* Fix TestCloudRemoteLogIO.test_log_message

* Fix es/os task_handler convert_list_to_stream

* Fix compat tests

* Refactor es,os task handler for 3.0 compat

* Fix compat for RedisTaskHandler

* Fix ruff format for test_cloudwatch_task_handler after rebase

* Fix 2.10 compat TestCloudwatchTaskHandler

* Fix 3.0 compat test for celery, wasb

Fix wasb test, spelling

* Fix 3.0 compat test for gcs

* Fix 3.0 compat test for cloudwatch, s3

* Set get_log API default response format to JSON

* Remove "first_time_read" key in log metadata

* Remove "<source>_log_pos" key in log metadata

* Add LogStreamCounter for backward compatibility

* Remove "first_time_read" with backward "log_pos" for tests

- test_log_reader
- test_log_handlers
- test_cloudwatch_task_handler
- test_s3_task_handler
- celery test_log_handler
- test_gcs_task_handler
- test_wasb_task_handler
- fix redis_task_handler
- fix log_pos

* Fix RedisTaskHandler compatibility

* Fix chores in self review

- Fix typo in _read_from_logs_server
- Remove unused parameters in _stream_lines_by_chunk
- read_log_stream
    - Fix doc string by removing outdated note
    - Only add buffer for full_download
- Add test ndjson format for get_log API

* Fine-tune HEAP_DUMP_SIZE

* Replace get_compatible_output_log_stream with iter

* Remove buffer in log_reader

* Fix log_id not found compat for es_task_handler

* Fix review comments
- rename LogStreamCounter as LogStreamAccumulator
- simplify for-yield with yield-from in log_reader
- add type annotation for LogStreamAccumulator

* Refactor LogStreamAccumulator._capture method
- use itertools.islice to get chunk

* Fix type hint, joinedload for ti.dag_run after merge

* Replace _sort_key as _create_sort_key

* Add _flush_logs_out_of_heap common util

* Fix review nits

- _is_logs_stream_like
    - add type annotation
    - reduce to 1 isinstance call
- construct log_streams in _get_compatible_log_stream inline
- use TypedDict for LogMetadata
- remove len(logs) to check empty
- revert typo of self.log_handler.read in log_reader
- log_stream_accumulator
    - refactor flush logic
    - make total_lines as property
    - make stream as property

* Fix mypy errors after merge

* Fix redis task handler test

* Refactor _capture logic in LogStreamAccumulator

* Add comments for ignore LogMetadata TypedDict

* Add comment for offset; Fix comment for LogMessages

* Refactor with from_iterable, islice

* Fix nits in test

- refactor structured_logs fixtures in TestLogStreamAccumulator
- use f-string in test_file_task_handler
- assert actual value of _create_sort_key
- add details comments in test__add_log_from_parsed_log_streams_to_heap

* Refactor test_utils

* Add comment for lazy initialization

* Fix error handling for _stream_lines_by_chunk

* Fix mypy error after merge

* Fix final review nits

* Fix mypy error

(cherry picked from commit ee54fe9)
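
For readers unfamiliar with the approach named in the commit list above (K-way merge, yielding in batches, islice), here is a minimal Python sketch of the general idea. It is not the code from this PR: LogLine, interleave, and in_batches are hypothetical names, and the standard library's heapq.merge stands in for the handler's own heap logic. It also assumes every source stream is already sorted by (timestamp, message), which the real log sources may not guarantee.

```python
import heapq
from itertools import islice
from typing import Iterable, Iterator, NamedTuple


class LogLine(NamedTuple):
    """Hypothetical stand-in for a parsed log record (not an Airflow class)."""

    timestamp: float
    message: str


def interleave(*streams: Iterable[LogLine]) -> Iterator[LogLine]:
    """Lazily merge sorted streams, dropping records identical to the previous one.

    heapq.merge keeps only one pending record per stream in memory, so the
    full logs are never loaded at once. Assumption: each stream is sorted by
    (timestamp, message), so identical records end up adjacent in the output.
    """
    previous = None
    for line in heapq.merge(*streams):
        if line != previous:
            yield line
        previous = line


def in_batches(stream: Iterable[LogLine], size: int) -> Iterator[list[LogLine]]:
    """Yield lists of at most `size` records, pulling from the stream on demand."""
    iterator = iter(stream)
    while batch := list(islice(iterator, size)):
        yield batch


# Two hypothetical sources that both contain the same first record.
local = [LogLine(1.0, "start"), LogLine(3.0, "done")]
remote = [LogLine(1.0, "start"), LogLine(2.0, "running")]

merged = list(interleave(iter(local), iter(remote)))
batches = list(in_batches(iter(merged), 2))
```

Because both functions are generators, a caller can stream batches to the client as they are produced instead of building the whole log in memory first, which is the memory concern this backport addresses.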
@jason810496 jason810496 force-pushed the backport-ee54fe9-v3-0-test branch from f7692e0 to f234d4f Compare July 14, 2025 02:18
@jason810496 jason810496 merged commit 571ea4f into apache:v3-0-test Jul 14, 2025
74 checks passed
@potiuk
Member

potiuk commented Jul 14, 2025

cool!

Labels
area:API, area:logging, area:providers, full tests needed, provider:amazon, provider:celery, provider:elasticsearch, provider:google, provider:microsoft-azure, provider:opensearch, provider:redis
5 participants