Skip to content

Conversation

amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Jun 16, 2025

closes: #50423

Why?

Airflow 2 had support for user impersonation: https://airflow.apache.org/docs/apache-airflow/stable/security/workload.html. Quoting from docs:

Airflow has the ability to impersonate a unix user while running task instances based on the task’s run_as_user parameter, which takes a user’s name.

The intention here is to de-elevate the user running the task to reduce priviliges from the process / worker launching the process which runs as root. We can configure the task to impersonate as an user with lesser priviliges and control the behaviour of the tasks running for a more secure task run.

Quoting one of the use case from one of the airflow users too:

I suspect there might be something wrong with user impersonation (run_as_user) in 3.0.0+ with docker (tested on 3.0.1rc1). My setup uses an overlay filesystem so I can fake-interact with a shared network drive. When I connect to the docker cli, I can do sudo -u data_user -i and create/delete files without problem. However, the celery worker encounters a permission error when running a task that writes to the network drive as that very same data_user. Does anyone have any idea what might be the reason? Anything I can try to pinpoint the issue? In 2.10.5 my setup worked fine. The difference in configurations comes down to changes to the "official" docker-compose from 2.x to 3.x (on top of which my own additions, which didn't change, are included).

https://apache-airflow.slack.com/archives/CCQB40SQJ/p1746728794387939

Approach

We implement user impersonation by re-executing the task process with sudo as the specified user (run_as_user). The key is preserving the communication channel between the task and supervisor across this re-execution.

To do this we use the os.execvp which basically swaps out the current process with a new one. We run the task runner process indeed again but with the specified user.

Process Flow

  1. Initial process starts as Airflow user as usual
  2. If run_as_user is set, we:
    • Store startup details in environment
    • Make socket inheritable so the communication between supervisor and this rexec'd process can go on.
    • Re-execute with sudo as target user which is specified
  3. Re-executed process:
    • Detects it's re-executed via environment variable
    • Retrieves startup details from environment
    • Continues task execution as target user

Things to note

Startup Details Storage

  1. Store the startup details in the env variable so that the rexec process can pick it up without waiting for StartupDetails from the supervisor -- which wont come anyway. This also prevents sending duplicate /run call to the API server which makes the operation idempotent.
os.environ["_AIRFLOW__REEXECUTED_PROCESS"] = "1"
os.environ["_AIRFLOW__STARTUP_MSG"] = msg.model_dump_json()

This ensures the re-executed process has all necessary context without needing to re-parse the DAG or re-establish connections.

Socket Inheritance

os.set_inheritable(SUPERVISOR_COMMS.request_socket.fileno(), True)

This is imp because:

  • The socket is created by the supervisor
  • It needs to be accessible to the re-executed process
  • File descriptors are preserved across exec

Why os.execvp?

We use os.execvp instead of subprocess or os.fork because:

  1. It replaces the current process with the new one
  2. Preserves file descriptors by default which we created in the supervisor and do not want to rework / disturb those connections.
  3. Maintains the same process id, which is important for supervisor tracking
  4. Allows passing environment variables with -E flag and -H too.

Communication Channel

Introduced a helper to access the SUPERVISOR_COMMS via an utility function. This is because when we re-exec, the other modules were failing to be able to access the SUPERVISOR_COMMS defined at module level in task_runner (i am not so sure of the exact reason, but seemed like init sequence or its a python thing), but this approach does no harm. The new way to access things via SUPERVISOR_COMMS is to use the get_supervisor_comms helper which returns existing instance if available or returns it by initing.

Made a change to The communication channel is initialized lazily via get_supervisor_comms():

def get_supervisor_comms() -> CommsDecoder[ToTask, ToSupervisor]:
    global SUPERVISOR_COMMS
    if SUPERVISOR_COMMS is None:
        SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](input=sys.stdin)
    return SUPERVISOR_COMMS

This ensures:

  • Single instance of communication channel
  • Proper initialization in both initial and re-executed processes

Testing

Intention is to run airflow as "root" and switch to a lesser privileged user: "airflowuser". We will try and use a user that cannot list some files like /root/airflow/airflow.cfg intentionally.

Setup for testing

  1. Run airflow with celery executor
  2. Create a "airflowuser": sudo useradd -m -s /bin/bash airflowuser
  3. Switch to "airflowuser" and ensure that the privilieges are lesser, for eg:
root@1b92a329d570:/opt/airflow# sudo -u airflowuser -i
airflowuser@1b92a329d570:~$ namei -l /root/airflow/airflow.cfg
f: /root/airflow/airflow.cfg
drwxr-xr-x root root /
drwx------ root root root
                     airflow - Permission denied
  1. Run celery worker with root now:
root@1b92a329d570:/opt/airflow# airflow celery worker
2025-05-29 07:41:56.804540 [info     ] starting stale bundle cleanup process [airflow.providers.celery.cli.celery_command]
[2025-05-29 07:41:56 +0000] [418] [INFO] Starting gunicorn 23.0.0
[2025-05-29 07:41:56 +0000] [418] [INFO] Listening at: http://[::]:8793 (418)
[2025-05-29 07:41:56 +0000] [418] [INFO] Using worker: sync
[2025-05-29 07:41:56 +0000] [420] [INFO] Booting worker with pid: 420
[2025-05-29 07:41:56 +0000] [421] [INFO] Booting worker with pid: 421
2025-05-29 07:41:58.159935 [warning  ] You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0
 [py.warnings] category=SecurityWarning filename=/usr/local/lib/python3.9/site-packages/celery/platforms.py lineno=84

General Testing:

Test 1: Running a simple task that runs with airflowuser and pushes an xcom:

DAG:

from airflow import DAG
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:

    # check_access = BashOperator(
    #     task_id="ls_root_cfg",
    #     bash_command="ls -l /root/airflow/airflow.cfg",
    #     run_as_user="airflowuser"
    # )

    check_access = BashOperator(
        task_id="say_hi",
        bash_command="echo $HOME",
        run_as_user="airflowuser"
    )

image

Test 2: Running a simple task that tries to get a connection and set a variable with runasuser

DAG:

from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow import DAG
from airflow.sdk.definitions.connection import Connection


class CustomOperator(BaseOperator):
    def execute(self, context):
        gc = Connection.get("athena_default")
        print("The conn is", gc)
        # print("Conn uri is", gc.get_uri())


with DAG("example_get_connection_from_defn", schedule=None, catchup=False) as dag:
    CustomOperator(task_id="print_conn", run_as_user="airflowuser")

image

Testing run_as_user functionality

Test 1: run_as_user trying to access /root/airflow/airflow.cfg

DAG Used:

from airflow import DAG
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:

    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
    )

Errors out, logs:
image

Test 2: Do not provide run_as_user but override with the conf instead: "airflowuser" itself

Set env in worker:

export AIRFLOW__CORE__DEFAULT_IMPERSONATION="airflowuser"

DAG Used:

from airflow import DAG
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:

    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
    )

Same error as before:

image

Test 3: Provide run_as_user and in conf, to check which one is picked up

In worker, create new user: randomuser and set env to "airflowuser"

sudo useradd -m -s /bin/bash randomuser
export AIRFLOW__CORE__DEFAULT_IMPERSONATION="airflowuser"

DAG used:

from airflow import DAG
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:

    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
        run_as_user="randomuser"
    )

Random user picked up:
image

Test 4: User not present

DAG:

from airflow import DAG
from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:

    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
        run_as_user="not_a_user"
    )

image

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

ashb and others added 5 commits June 13, 2025 17:30
The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change
@amoghrajesh amoghrajesh force-pushed the user-impersonation-reworked branch from b09744b to 595b550 Compare June 16, 2025 12:26
ashb and others added 6 commits June 16, 2025 14:25
The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change
The existing JSON Lines based approach had two major drawbacks

1. In the case of really large lines (in the region of 10 or 20MB) the python
   line buffering could _sometimes_ result in a partial read
2. The JSON based approach didn't have the ability to add any metadata (such
   as errors).
3. Not every message type/call-site waited for a response, which meant those
   client functions could never get told about an error

One of the ways this line-based approach fell down was if you suddenly tried
to run 100s of triggers at the same time you would get an error like this:

```
Traceback (most recent call last):
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 568, in readline
    line = await self.readuntil(sep)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ash/.local/share/uv/python/cpython-3.12.7-macos-aarch64-none/lib/python3.12/asyncio/streams.py", line 663, in readuntil
    raise exceptions.LimitOverrunError(
asyncio.exceptions.LimitOverrunError: Separator is found, but chunk is longer than limit
```

The other way this caused problems was if you parse a large dag (as in one
with 20k tasks or more) the DagFileProcessor could end up getting a partial
read which would be invalid JSON.

This changes the communications protocol in in a couple of ways.

First off at the python level the separate send and receive methods in the
client/task side have been removed and replaced with a single `send()` that
sends the request, reads the response and raises an error if one is returned.
(But note, right now almost nothing in the supervisor side sets the error,
that will be a future PR.)

Secondly the JSON Lines approach has been changed from a line-based protocol
to a binary "frame" one. The protocol (which is the same for whichever side is
sending) is length-prefixed, i.e. we first send the length of the data as a
4byte big-endian integer, followed by the data itself. This should remove the
possibility of JSON parse errors due to reading incomplete lines

Finally the last change made in this PR is to remove the "extra" requests
socket/channel. Upon closer examination with this comms path I realised that
this socket is unnecessary: Since we are in 100% control of the client side we
can make use of the bi-directional nature of `socketpair` and save file
handles. This also happens to help the `run_as_user` feature which is
currently broken, as without extra config to `sudoers` file, `sudo` will close
all filehandles other than stdin, stdout, and stderr -- so by introducing this
change we make it easier to re-add run_as_user support.

In order to support this in the DagFileProcessor (as the fact that the proc
manager uses a single selector for multiple processes) means I have moved the
`on_close` callback to be part of the object we store in the `selector` object
in the supervisors, previoulsy it was the "on_read" callback, now we store a
tuple of `(on_read, on_close)` and on_close is called once universally.

This also changes the way comms are handled from the (async) TriggerRunner
process. Previously we had a sync+async lock, but that made it possible to end
up deadlocking things. The change now is to have `send` on
`TriggerCommsDecoder` "go back" to the async even loop via `async_to_sync`, so
that only async code deals with the socket, and we can use an async lock
(rather than the hybrid sync and async lock we tried before). This seems to
help the deadlock issue, but I'm not 100% sure it will remove it entirely, but
it makes it much much harder to hit - I've not been able to reprouce it with
this change
This compat issue is only in tests, as nothing in the runtime of airflow-core
imports/calls methods directly on SUPERVISOR_COMMS, we are only importing it
in tests to mkae assertions about the behavour/to stub the return values.
@amoghrajesh amoghrajesh force-pushed the user-impersonation-reworked branch from 952661c to a1c0dd2 Compare June 17, 2025 07:15
@amoghrajesh amoghrajesh self-assigned this Jun 17, 2025
@amoghrajesh amoghrajesh marked this pull request as ready for review June 17, 2025 15:11
Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amoghrajesh amoghrajesh merged commit c8343d9 into apache:main Jun 18, 2025
72 checks passed
@amoghrajesh amoghrajesh deleted the user-impersonation-reworked branch June 18, 2025 17:04
RoyLee1224 pushed a commit to RoyLee1224/airflow that referenced this pull request Jun 21, 2025
@kaxil kaxil modified the milestones: Airflow 3.1.0, Airflow 3.0.3 Jul 2, 2025
kaxil pushed a commit that referenced this pull request Jul 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Handle run_as_user / impersonation in the Supervisor
3 participants