Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ title: Asynchronous Workers
sidebar_order: 70
---

<Alert level="warning" title="Deprecated">
Celery workers are deprecated, and will be removed as of the 2025.10 self-hosted
release.
</Alert>

Sentry comes with a built-in queue to process tasks in a more asynchronous fashion. For example when an event comes in instead of writing it to the database immediately, it sends a job to the queue so that the request can be returned right away, and the background workers handle actually saving that data.

Sentry relies on the [Celery](https://docs.celeryproject.org/) library for managing workers.
Expand Down Expand Up @@ -75,7 +80,7 @@ There are a few important points:
because this is what registers the task by name. Thus every module
containing a task must be added to the `CELERY_IMPORTS` and `TASKWORKER_IMPORTS` settings in
`src/sentry/conf/server.py`.

We have a separate setting for the Taskbroker workers until we fully deprecate the Celery workers.

## Running a Worker
Expand Down
272 changes: 272 additions & 0 deletions develop-docs/backend/application-domains/tasks/index.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
---
title: Asynchronous Tasks
sidebar_order: 70
---

Sentry includes a framework for scheduling and executing tasks that are executed
asynchronously in a background worker process. The task framework within Sentry
is inspired by [Celery](https://docs.celeryproject.org) as it was originally
built with celery.

# Defining Tasks

Sentry tasks are configured with the `instrumented_task` decorator that
includes features like automatic tracing and metric collection, and multi-region
silo enforcement.

```python
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import issues_tasks
from sentry.taskworker.retry import Retry

@instrumented_task(
name="sentry.widgets.tasks.do_work",
namespace=issues_tasks,
retry=Retry(times=3, on=(ConnectionError,))
processing_deadline_duration=60
)
def do_work(organization_id: int, issue_id: int, **kwargs) -> None:
...
```

When defining tasks there are some constraints:

- All tasks _must_ have names. The name of task is serialized into an
'activation' message that is persisted in Kafka. Task names must be stable
between deployments to avoid lost tasks.

- Task parameters _must_ be JSON serializable. You cannot pass arbitrary python
objects through task parameters.

- Tasks _must_ define a 'processing deadline'. After a task's processing
deadline has elapsed, it will be killed by the worker runtime. Tasks that are
killed for execution duration are not automatically retried.

- All tasks must be assigned to a 'namespace'. A namespace is a group of related
tasks that are operated together and share a backlog.

- The return value of a task is not stored and ignored by workers.

- The module containing a task _must_ be added to `TASKWORKER_IMPORTS` in
`src/sentry/conf/server.py`

## Scheduling Tasks

With our task defined we can schedule a task (also called an "activation"):

```python
from sentry.widgets.tasks import do_work

# Can call the task synchronously like a normal function
do_work(organization_id=org.id, issue_id=issue.id)

# Use .delay() to schedule a task to run in the future as soon as possible
do_work.delay(organization_id=org.id, issue_id=issue.id)

# Use .apply_async() when you need to define headers, countdown, or expires
# for your task. Here we schedule a task to run in 5 minutes (300 seconds)
do_work.apply_async(
kwargs={"organization_id": org.id, "issue_id": issue.id},
countdown=300
)
```

When tasks are executed, the parameter payload is deserialized, and the task
function is called. Tasks are successfully completed if they don't raise an
error. If an error is raised from a task, or the task's deadline expires, the
task is considered a failure and needs to be retried, put into a dead-letter
queue or dropped depending on the task and failure.

## Retries

When defining tasks, you can define a retry policy with the `retry` parameter.
When a worker executes an activation with a retry policy, any non-successful
outcome will result in the retry policy being evaluated. If the task has retries
remaining, and the captured error is a retriable error, the worker sends
a status of retry to the worker's broke. The taskbroker will take care of marking
the current activation as complete and producing a new activation to be
processed later.

If a task does not define a retry policy the retry policy of the task namespace
is inherited.

```python
@instrumented_task(
name="sentry.issues.tasks.deliver_issue_webhook",
namespace=issues_tasks,
retry=Retry(times=3, times_exceeded=LastAction.Deadletter),
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
...
```

### Conditional Retries

Retries can be conditional based on the exception type:

```python
retry=Retry(on=(IntegrationError,), times=3, times_exceeded=LastAction.Discard)
```

### Retry delays

By default retries will be executed as soon as they are consumed. If a task
needs to stagger retries, it can use a delayed retry.

```python
@instrumented_task(
name="sentry.integrations.fetch_commits",
namespace=issues_tasks,
retry=Retry(times=3, on=(IntegrationError, ), delay=30)
)
def fetch_commits(repository_id: int) -> None:
...
```

With the above configuration, each retry will be processed at least 30 seconds
after the previous attempt. The delay between retries could be longer than 30
seconds, but won’t be shorter.

## Processing Deadlines

Every task has a ‘processing deadline’ which is the maximum expected runtime for a task. The default duration is **10 seconds**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This raises a question for self-hosted: How do we increase the processing deadline?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Processing deadlines are defined in code. In general processing deadlines are fairly generous as we don't want tasks timing out regularly. They are mostly intended as a backstop to changes that dramatically change the runtime of a task where it would consume vastly more worker resources.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I think it's better if you put your answer into the docs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to the docs.


```python
@instrumented_task(
name="sentry.integrations.fetch_commits",
namespace=issues_tasks,
# Extended from the default 10
processing_deadline_duration=60
)
def fetch_commits(repository_id: int) -> None:
...
```

After this a task has run for the length of its processing deadline, it will be interrupted by `SIGALRM` which raises a `ProcessingDeadlineExceeded` error which will interrupt your task’s logic.

### Resolving deadline issues

In most scenarios the simplest solution is to extend the deadline for a task. This is the recommended solution until you get above 20min of duration. After this duration the chances of your task being terminated by a deploy increase quickly. Instead of extending the deadline further, you should rethink your logic and partition the workload into smaller batches, or individual jobs that can be processed independently. Instead of mapping all projects in a single task, spawn multiple tasks.

## Expiration deadlines

A task's expiration time defines a point in time after which a task is
considered expired and should not be executed. This mechanism allows tasks to be
skipped if they are stale and their results are no longer relevant.

```python
@task_app.register(
name="sentry.issues.tasks.deliver_issue_webhook",
expires=timedelta("5 minutes"),
)
def deliver_issue_webhook(organization_id: int, group_id: int):
...
```

Expiration times can be expressed as `timedelta` objects or a number of seconds.
Tasks that are past their expiration will not be sent to workers. Instead they
will be discarded or dead-lettered depending on task configuration.

## Future schedules

Tasks can be scheduled to be run up to an hour in the future with the
`countdown` parameter.

```jsx
deliver_issue_webhook.apply_async(countdown=timedelta(minutes=10))
```

Countdown tasks will be processed and retained by taskbroker until their
countdown has elapsed. Once the countdown delay has elapsed the task will be
made available for workers.

## Idempotency (at_most_once)

Tasks are processed with at-least-once guarantees. A task may be attempted
multiple times if processing deadlines are exceeded. To prevent multiple
executions, tasks can enable `at_most_once` which enables at-most-once
execution.

```python
@task_app.register(
name="sentry.issues.tasks.deliver_issue_webhook",
at_most_once=True,
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
...

```

If an idempotent task exceeds a processing deadline, it will *not* be retried.

# Testing Tasks

Tasks can be tested with a few different approaches. The first is with the
`self.tasks()` or `TaskRunner` context manager. When these context managers are
entered, tasks will be executed *synchronously* which allows you to validate the
side-effects of your tasks and validate that parameters to your task are JSON
compatible:

```python
def test_action_with_tasks(self):
with self.tasks():
self.client.get("/organizations/slug/do-thing/")
# can make assertions on side-effects of tasks spawned by the endpoint.
```

Tasks can also be tested with `mock.patch` :

```python
@patch("sentry.hybridcloud.tasks.deliver_webhooks.drain_mailbox")
def test_schedule_task(self, mock_deliver: MagicMock) -> None:
# Do work to trigger the task
# Assert that the task was scheduled
mock_deliver.delay.assert_called_with(webhook_one.id)
```

<Alert type="warning">
Mocking tasks will not validate that parameters are JSON compatible, nor will it catch TypeErrors from signature mismatches.
</Alert>

# Task namespaces

Task namespaces are created as code, and configuration are linked to the
namespace when it is declared.

```python
# in sentry.taskworker.namespaces
from sentry.taskworker.config import taskregistry
from sentry.taskworker.retry import LastAction, Retry

task_app = taskregistry.create_namespace(
"issues",
retry=Retry(times=3, times_exceeded=LastAction.Discard)
)

# register tasks within a namespace
@instrumented_task(name="tasks.do_work", namespace=task_app)
def do_work(**kwargs):
...
```

Namespaces can define default behaviour for `retry` , `processing_deadline` and
`expires` for the tasks they contain. Without explicit routing, any namespace
will be run in our `default` worker pools. If your task namespace will be
high-throughput (more than 1500 tasks per second) consider provisioning
a dedicated pool for your tasks.

# Periodically Scheduled Tasks

Task can also be set to spawn on a periodic schedule. To accomplish this, simply
update the `TASKWORKER_SCHEDULE` configuration found in
`src/sentry/conf/server.py` with the appropriate namespace, task, and schedule.
Taskworker supports `timedelta` and `crontab` schedule types:

```python
TASKWORKER_REGION_SCHEDULES: ScheduleConfigMap = {
"send-beacon": {
"task": "selfhosted:sentry.tasks.send_beacon",
"schedule": task_crontab("0", "*/1", "*", "*", "*"),
},
}
```
Loading
Loading