---
title: Asynchronous Tasks
sidebar_order: 70
---

Sentry includes a framework for scheduling and executing tasks asynchronously
in a background worker process. The task framework within Sentry is inspired by
[Celery](https://docs.celeryproject.org), as Sentry was originally built with
Celery.

# Defining Tasks

Sentry tasks are defined with the `instrumented_task` decorator, which provides
automatic tracing, metric collection, and multi-region silo enforcement.

```python
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import issues_tasks
from sentry.taskworker.retry import Retry

@instrumented_task(
    name="sentry.widgets.tasks.do_work",
    namespace=issues_tasks,
    retry=Retry(times=3, on=(ConnectionError,)),
    processing_deadline_duration=60,
)
def do_work(organization_id: int, issue_id: int, **kwargs) -> None:
    ...
```

When defining tasks there are some constraints:

- All tasks _must_ have names. The name of the task is serialized into an
  'activation' message that is persisted in Kafka. Task names must be stable
  between deployments to avoid losing tasks.

- Task parameters _must_ be JSON serializable. You cannot pass arbitrary Python
  objects through task parameters.

- Tasks _must_ define a 'processing deadline'. After a task's processing
  deadline has elapsed, it will be killed by the worker runtime. Tasks that are
  killed for exceeding their processing deadline are not automatically retried.

- All tasks _must_ be assigned to a 'namespace'. A namespace is a group of
  related tasks that are operated together and share a backlog.

- The return value of a task is not stored and is ignored by workers.

- The module containing a task _must_ be added to `TASKWORKER_IMPORTS` in
  `src/sentry/conf/server.py`.

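Because parameters must survive a JSON round trip, pass primitive identifiers
(like `issue.id`) rather than model instances, and re-fetch records inside the
task. A minimal sketch of the constraint (the helper name is illustrative, not
part of Sentry's API):

```python
import json

def assert_json_safe(**kwargs):
    """Hypothetical check: task parameters must survive a JSON round trip."""
    return json.loads(json.dumps(kwargs))

# Primitive ids are JSON serializable.
assert_json_safe(organization_id=1, issue_id=42)

# An arbitrary Python object (e.g. an ORM instance) would raise
# TypeError inside json.dumps and cannot be a task parameter.
```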
## Scheduling Tasks

With our task defined, we can schedule a task (also called an "activation"):

```python
from sentry.widgets.tasks import do_work

# Call the task synchronously like a normal function
do_work(organization_id=org.id, issue_id=issue.id)

# Use .delay() to schedule the task to run as soon as possible
do_work.delay(organization_id=org.id, issue_id=issue.id)

# Use .apply_async() when you need to define headers, countdown, or expires
# for your task. Here we schedule a task to run in 5 minutes (300 seconds)
do_work.apply_async(
    kwargs={"organization_id": org.id, "issue_id": issue.id},
    countdown=300,
)
```

When a task is executed, the parameter payload is deserialized and the task
function is called. A task completes successfully if it doesn't raise an error.
If an error is raised, or the task's deadline expires, the task is considered
a failure and is retried, put into a dead-letter queue, or dropped, depending
on the task's configuration and the kind of failure.

## Retries

When defining tasks, you can define a retry policy with the `retry` parameter.
When a worker executes an activation with a retry policy, any non-successful
outcome results in the retry policy being evaluated. If the task has retries
remaining and the captured error is a retriable error, the worker sends
a 'retry' status to its broker. The taskbroker takes care of marking the
current activation as complete and producing a new activation to be processed
later.

If a task does not define a retry policy, it inherits the retry policy of its
task namespace.

```python
@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    retry=Retry(times=3, times_exceeded=LastAction.Deadletter),
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
```

### Conditional Retries

Retries can be conditional based on the exception type:

```python
retry=Retry(on=(IntegrationError,), times=3, times_exceeded=LastAction.Discard)
```

### Retry delays

By default, retries are executed as soon as they are consumed. If a task needs
to stagger retries, it can use a delayed retry.

```python
@instrumented_task(
    name="sentry.integrations.fetch_commits",
    namespace=issues_tasks,
    retry=Retry(times=3, on=(IntegrationError,), delay=30),
)
def fetch_commits(repository_id: int) -> None:
    ...
```

With the above configuration, each retry will be processed at least 30 seconds
after the previous attempt. The delay between retries could be longer than 30
seconds, but won't be shorter.

## Processing Deadlines

Every task has a 'processing deadline', which is the maximum expected runtime
for a task. If a task does not define a processing deadline, it inherits the
deadline defined on the task's namespace, or the default of **10 seconds**.
Task deadlines are intended to be generous; they exist to prevent workers from
being saturated by tasks running for unbounded amounts of time.

```python
@instrumented_task(
    name="sentry.integrations.fetch_commits",
    namespace=issues_tasks,
    # Extended from the default of 10 seconds
    processing_deadline_duration=60,
)
def fetch_commits(repository_id: int) -> None:
    ...
```

Once a task has run for the length of its processing deadline, it is
interrupted by `SIGALRM`, which raises a `ProcessingDeadlineExceeded` error
that interrupts your task's logic.

### Resolving deadline issues

In most scenarios the simplest solution is to extend the deadline for a task.
This is the recommended solution until your task runs for more than 20 minutes.
Beyond that duration, the chances of your task being terminated by a deploy
increase quickly. Instead of extending the deadline further, rethink your logic
and partition the workload into smaller batches, or individual jobs that can be
processed independently. For example, instead of mapping over all projects in
a single task, spawn multiple tasks.

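The fan-out pattern can be sketched generically. Here `spawn` stands in for
a task's `.delay()` method, and the helper names are illustrative rather than
Sentry APIs:

```python
def chunked(items, size):
    """Split a list of ids into fixed-size batches."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def fan_out(project_ids, spawn, batch_size=100):
    """Spawn one short activation per batch instead of processing
    every project inside a single long-running task."""
    for batch in chunked(project_ids, batch_size):
        spawn(project_ids=batch)
```

Each batch then runs well within its processing deadline and can be retried
independently of the others.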
## Expiration deadlines

A task's expiration time defines a point in time after which a task is
considered expired and should not be executed. This mechanism allows tasks to be
skipped if they are stale and their results are no longer relevant.

```python
@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    expires=timedelta(minutes=5),
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
```

Expiration times can be expressed as `timedelta` objects or a number of seconds.
Tasks that are past their expiration will not be sent to workers. Instead, they
will be discarded or dead-lettered depending on task configuration.

## Future schedules

Tasks can be scheduled to run up to an hour in the future with the
`countdown` parameter.

```python
deliver_issue_webhook.apply_async(countdown=timedelta(minutes=10))
```

Countdown tasks are retained by taskbroker until their countdown has elapsed.
Once the countdown delay has elapsed, the task is made available to workers.

## Idempotency (at_most_once)

Tasks are processed with at-least-once guarantees. A task may be attempted
multiple times if processing deadlines are exceeded. To prevent multiple
executions, tasks can enable `at_most_once` for at-most-once execution.

```python
@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    at_most_once=True,
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
```

If an idempotent task exceeds a processing deadline, it will *not* be retried.

# Testing Tasks

Tasks can be tested with a few different approaches. The first is the
`self.tasks()` or `TaskRunner` context manager. While these context managers
are active, tasks are executed *synchronously*, which lets you validate the
side-effects of your tasks and check that your task parameters are JSON
compatible:

```python
def test_action_with_tasks(self):
    with self.tasks():
        self.client.get("/organizations/slug/do-thing/")
    # can make assertions on side-effects of tasks spawned by the endpoint.
```

Tasks can also be tested with `mock.patch`:

```python
@patch("sentry.hybridcloud.tasks.deliver_webhooks.drain_mailbox")
def test_schedule_task(self, mock_deliver: MagicMock) -> None:
    # Do work to trigger the task
    # Assert that the task was scheduled
    mock_deliver.delay.assert_called_with(webhook_one.id)
```

<Alert type="warning">
Mocking tasks will not validate that parameters are JSON compatible, nor will it catch `TypeError`s from signature mismatches.
</Alert>

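This gap can be demonstrated with a plain function and the standard
`unittest.mock` module (a generic illustration, not Sentry test code):

```python
from unittest import mock

def do_work(organization_id: int, issue_id: int) -> None:
    ...

# A plain MagicMock accepts any call signature, so a typo'd kwarg that
# would raise TypeError against the real function passes silently.
plain = mock.MagicMock()
plain(organization=None)

# create_autospec builds a mock that re-checks the real signature,
# so the same bad call raises TypeError.
strict = mock.create_autospec(do_work)
```

Using `mock.patch(..., autospec=True)` restores signature checking, though
JSON compatibility of the parameters still goes unvalidated.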
# Task namespaces

Task namespaces are created in code, and configuration is linked to the
namespace when it is declared.

```python
# in sentry.taskworker.namespaces
from sentry.taskworker.config import taskregistry
from sentry.taskworker.retry import LastAction, Retry

issues_tasks = taskregistry.create_namespace(
    "issues",
    retry=Retry(times=3, times_exceeded=LastAction.Discard),
)

# register tasks within a namespace
@instrumented_task(name="tasks.do_work", namespace=issues_tasks)
def do_work(**kwargs):
    ...
```

Namespaces can define default behaviour for `retry`, `processing_deadline`, and
`expires` for the tasks they contain. Without explicit routing, any namespace
will be run in our `default` worker pools. If your task namespace will be
high-throughput (more than 1500 tasks per second), consider provisioning
a dedicated pool for your tasks.

# Periodically Scheduled Tasks

Tasks can also be set to spawn on a periodic schedule. To accomplish this,
update the `TASKWORKER_SCHEDULE` configuration found in
`src/sentry/conf/server.py` with the appropriate namespace, task, and schedule.
Taskworker supports `timedelta` and `crontab` schedule types:

```python
TASKWORKER_REGION_SCHEDULES: ScheduleConfigMap = {
    "send-beacon": {
        "task": "selfhosted:sentry.tasks.send_beacon",
        "schedule": task_crontab("0", "*/1", "*", "*", "*"),
    },
}
```