Skip to content

Commit 981b3b4

Browse files
TheForbiddenAisiri-varmacicoyle
authored
Add retry handler support (#1412)
* Add retry handler support Signed-off-by: Mason <[email protected]> * Wrap DurableTask objects Signed-off-by: Mason <[email protected]> * Rename method Signed-off-by: Mason <[email protected]> * Add isNonRetriable field to WorkflowTaskFailureDetails Signed-off-by: Mason <[email protected]> * Add unit test Signed-off-by: Mason <[email protected]> * Removed duplicate WorkflowFailureDetails class Signed-off-by: Mason <[email protected]> * Removed unneeded when statements in retry policy unit test Signed-off-by: Mason <[email protected]> * Add unit test to test both RetryPolicy and RetryHandler Signed-off-by: Mason <[email protected]> * Create toRetryPolicy method Signed-off-by: Mason <[email protected]> --------- Signed-off-by: Mason <[email protected]> Co-authored-by: Siri Varma Vegiraju <[email protected]> Co-authored-by: Cassie Coyle <[email protected]>
1 parent dcaca77 commit 981b3b4

File tree

8 files changed

+288
-9
lines changed

8 files changed

+288
-9
lines changed

sdk-workflows/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
<dependency>
4848
<groupId>io.dapr</groupId>
4949
<artifactId>durabletask-client</artifactId>
50-
<version>1.5.5</version>
50+
<version>1.5.6</version>
5151
</dependency>
5252
<!--
5353
manually declare durabletask-client's jackson dependencies

sdk-workflows/src/main/java/io/dapr/workflows/WorkflowTaskOptions.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,27 @@
1616
public class WorkflowTaskOptions {
1717

1818
private final WorkflowTaskRetryPolicy retryPolicy;
19+
private final WorkflowTaskRetryHandler retryHandler;
1920

20-
public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) {
21+
public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy, WorkflowTaskRetryHandler retryHandler) {
2122
this.retryPolicy = retryPolicy;
23+
this.retryHandler = retryHandler;
24+
}
25+
26+
public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) {
27+
this(retryPolicy, null);
28+
}
29+
30+
public WorkflowTaskOptions(WorkflowTaskRetryHandler retryHandler) {
31+
this(null, retryHandler);
2232
}
2333

2434
public WorkflowTaskRetryPolicy getRetryPolicy() {
2535
return retryPolicy;
2636
}
2737

38+
public WorkflowTaskRetryHandler getRetryHandler() {
39+
return retryHandler;
40+
}
41+
2842
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.workflows;
15+
16+
import io.dapr.workflows.client.WorkflowFailureDetails;
17+
import io.dapr.workflows.runtime.DefaultWorkflowContext;
18+
19+
import java.time.Duration;
20+
21+
public class WorkflowTaskRetryContext {
22+
23+
private final DefaultWorkflowContext workflowContext;
24+
private final int lastAttemptNumber;
25+
private final WorkflowFailureDetails lastFailure;
26+
private final Duration totalRetryTime;
27+
28+
/**
29+
* Constructor for WorkflowTaskRetryContext.
30+
*
31+
* @param workflowContext The workflow context
32+
* @param lastAttemptNumber The number of the previous attempt
33+
* @param lastFailure The failure details from the most recent failure
34+
* @param totalRetryTime The amount of time spent retrying
35+
*/
36+
public WorkflowTaskRetryContext(
37+
DefaultWorkflowContext workflowContext,
38+
int lastAttemptNumber,
39+
WorkflowFailureDetails lastFailure,
40+
Duration totalRetryTime) {
41+
this.workflowContext = workflowContext;
42+
this.lastAttemptNumber = lastAttemptNumber;
43+
this.lastFailure = lastFailure;
44+
this.totalRetryTime = totalRetryTime;
45+
}
46+
47+
/**
48+
* Gets the context of the current workflow.
49+
*
50+
* <p>The workflow context can be used in retry handlers to schedule timers (via the
51+
* {@link DefaultWorkflowContext#createTimer} methods) for implementing delays between retries. It can also be
52+
* used to implement time-based retry logic by using the {@link DefaultWorkflowContext#getCurrentInstant} method.
53+
*
54+
* @return the context of the parent workflow
55+
*/
56+
public DefaultWorkflowContext getWorkflowContext() {
57+
return this.workflowContext;
58+
}
59+
60+
/**
61+
* Gets the details of the previous task failure, including the exception type, message, and callstack.
62+
*
63+
* @return the details of the previous task failure
64+
*/
65+
public WorkflowFailureDetails getLastFailure() {
66+
return this.lastFailure;
67+
}
68+
69+
/**
70+
* Gets the previous retry attempt number. This number starts at 1 and increments each time the retry handler
71+
* is invoked for a particular task failure.
72+
*
73+
* @return the previous retry attempt number
74+
*/
75+
public int getLastAttemptNumber() {
76+
return this.lastAttemptNumber;
77+
}
78+
79+
/**
80+
* Gets the total amount of time spent in a retry loop for the current task.
81+
*
82+
* @return the total amount of time spent in a retry loop for the current task
83+
*/
84+
public Duration getTotalRetryTime() {
85+
return this.totalRetryTime;
86+
}
87+
88+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.workflows;
15+
16+
public interface WorkflowTaskRetryHandler {
17+
18+
/**
19+
* Invokes retry handler logic. Return value indicates whether to continue retrying.
20+
*
21+
* @param retryContext The context of the retry
22+
* @return {@code true} to continue retrying or {@code false} to stop retrying.
23+
*/
24+
boolean handle(WorkflowTaskRetryContext retryContext);
25+
26+
}

sdk-workflows/src/main/java/io/dapr/workflows/client/WorkflowFailureDetails.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,14 @@ public interface WorkflowFailureDetails {
3939
*/
4040
String getStackTrace();
4141

42+
/**
43+
* Checks whether the failure was caused by the provided exception class.
44+
*
45+
* @param exceptionClass the exception class to check
46+
* @return {@code true} if the failure was caused by the provided exception class
47+
*/
48+
default boolean isCausedBy(Class<? extends Exception> exceptionClass) {
49+
throw new UnsupportedOperationException("This method is not implemented");
50+
}
51+
4252
}

sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414
package io.dapr.workflows.runtime;
1515

1616
import io.dapr.durabletask.CompositeTaskFailedException;
17+
import io.dapr.durabletask.RetryHandler;
1718
import io.dapr.durabletask.RetryPolicy;
1819
import io.dapr.durabletask.Task;
1920
import io.dapr.durabletask.TaskCanceledException;
2021
import io.dapr.durabletask.TaskOptions;
2122
import io.dapr.durabletask.TaskOrchestrationContext;
2223
import io.dapr.workflows.WorkflowContext;
2324
import io.dapr.workflows.WorkflowTaskOptions;
25+
import io.dapr.workflows.WorkflowTaskRetryContext;
26+
import io.dapr.workflows.WorkflowTaskRetryHandler;
2427
import io.dapr.workflows.WorkflowTaskRetryPolicy;
2528
import org.slf4j.Logger;
2629
import org.slf4j.LoggerFactory;
@@ -228,22 +231,61 @@ public UUID newUuid() {
228231
return this.innerContext.newUUID();
229232
}
230233

231-
private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
234+
private TaskOptions toTaskOptions(WorkflowTaskOptions options) {
232235
if (options == null) {
233236
return null;
234237
}
235238

236-
WorkflowTaskRetryPolicy workflowTaskRetryPolicy = options.getRetryPolicy();
239+
RetryPolicy retryPolicy = toRetryPolicy(options.getRetryPolicy());
240+
RetryHandler retryHandler = toRetryHandler(options.getRetryHandler());
241+
242+
return new TaskOptions(retryPolicy, retryHandler);
243+
}
244+
245+
/**
246+
* Converts a {@link WorkflowTaskRetryPolicy} to a {@link RetryPolicy}.
247+
*
248+
* @param workflowTaskRetryPolicy The {@link WorkflowTaskRetryPolicy} being converted
249+
* @return A {@link RetryPolicy}
250+
*/
251+
private RetryPolicy toRetryPolicy(WorkflowTaskRetryPolicy workflowTaskRetryPolicy) {
252+
if (workflowTaskRetryPolicy == null) {
253+
return null;
254+
}
255+
237256
RetryPolicy retryPolicy = new RetryPolicy(
238-
workflowTaskRetryPolicy.getMaxNumberOfAttempts(),
239-
workflowTaskRetryPolicy.getFirstRetryInterval()
257+
workflowTaskRetryPolicy.getMaxNumberOfAttempts(),
258+
workflowTaskRetryPolicy.getFirstRetryInterval()
240259
);
241260

242261
retryPolicy.setBackoffCoefficient(workflowTaskRetryPolicy.getBackoffCoefficient());
243262
if (workflowTaskRetryPolicy.getRetryTimeout() != null) {
244263
retryPolicy.setRetryTimeout(workflowTaskRetryPolicy.getRetryTimeout());
245264
}
246265

247-
return new TaskOptions(retryPolicy);
266+
return retryPolicy;
267+
}
268+
269+
/**
270+
* Converts a {@link WorkflowTaskRetryHandler} to a {@link RetryHandler}.
271+
*
272+
* @param workflowTaskRetryHandler The {@link WorkflowTaskRetryHandler} being converted
273+
* @return A {@link RetryHandler}
274+
*/
275+
private RetryHandler toRetryHandler(WorkflowTaskRetryHandler workflowTaskRetryHandler) {
276+
if (workflowTaskRetryHandler == null) {
277+
return null;
278+
}
279+
280+
return retryContext -> {
281+
WorkflowTaskRetryContext workflowRetryContext = new WorkflowTaskRetryContext(
282+
this,
283+
retryContext.getLastAttemptNumber(),
284+
new DefaultWorkflowFailureDetails(retryContext.getLastFailure()),
285+
retryContext.getTotalRetryTime()
286+
);
287+
288+
return workflowTaskRetryHandler.handle(workflowRetryContext);
289+
};
248290
}
249291
}

sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,17 @@ public String getStackTrace() {
6262
return workflowFailureDetails.getStackTrace();
6363
}
6464

65+
/**
66+
* Checks whether the failure was caused by the provided exception class.
67+
*
68+
* @param exceptionClass the exception class to check
69+
* @return {@code true} if the failure was caused by the provided exception class
70+
*/
71+
@Override
72+
public boolean isCausedBy(Class<? extends Exception> exceptionClass) {
73+
return workflowFailureDetails.isCausedBy(exceptionClass);
74+
}
75+
6576
@Override
6677
public String toString() {
6778
return workflowFailureDetails.toString();

sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
package io.dapr.workflows;
1515

1616
import io.dapr.durabletask.CompositeTaskFailedException;
17+
import io.dapr.durabletask.FailureDetails;
18+
import io.dapr.durabletask.RetryContext;
19+
import io.dapr.durabletask.RetryHandler;
1720
import io.dapr.durabletask.Task;
1821
import io.dapr.durabletask.TaskCanceledException;
1922
import io.dapr.durabletask.TaskOptions;
@@ -35,10 +38,11 @@
3538
import java.util.List;
3639

3740
import static org.junit.jupiter.api.Assertions.assertEquals;
38-
import static org.junit.jupiter.api.Assertions.assertNotNull;
41+
import static org.junit.jupiter.api.Assertions.assertNull;
3942
import static org.mockito.ArgumentMatchers.any;
4043
import static org.mockito.ArgumentMatchers.eq;
4144
import static org.mockito.Mockito.mock;
45+
import static org.mockito.Mockito.spy;
4246
import static org.mockito.Mockito.times;
4347
import static org.mockito.Mockito.verify;
4448
import static org.mockito.Mockito.when;
@@ -278,7 +282,7 @@ public void callChildWorkflowWithName() {
278282
}
279283

280284
@Test
281-
public void callChildWorkflowWithOptions() {
285+
public void callChildWorkflowWithRetryPolicy() {
282286
String expectedName = "TestActivity";
283287
String expectedInput = "TestInput";
284288
String expectedInstanceId = "TestInstanceId";
@@ -305,6 +309,90 @@ public void callChildWorkflowWithOptions() {
305309
assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts());
306310
assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval());
307311
assertEquals(Duration.ZERO, taskOptions.getRetryPolicy().getRetryTimeout());
312+
assertNull(taskOptions.getRetryHandler());
313+
}
314+
315+
@Test
316+
public void callChildWorkflowWithRetryHandler() {
317+
String expectedName = "TestActivity";
318+
String expectedInput = "TestInput";
319+
String expectedInstanceId = "TestInstanceId";
320+
321+
WorkflowTaskRetryHandler retryHandler = spy(new WorkflowTaskRetryHandler() {
322+
@Override
323+
public boolean handle(WorkflowTaskRetryContext retryContext) {
324+
return true;
325+
}
326+
});
327+
328+
WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryHandler);
329+
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);
330+
331+
context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class);
332+
333+
verify(mockInnerContext, times(1))
334+
.callSubOrchestrator(
335+
eq(expectedName),
336+
eq(expectedInput),
337+
eq(expectedInstanceId),
338+
captor.capture(),
339+
eq(String.class)
340+
);
341+
342+
TaskOptions taskOptions = captor.getValue();
343+
344+
RetryHandler durableRetryHandler = taskOptions.getRetryHandler();
345+
RetryContext retryContext = mock(RetryContext.class, invocationOnMock -> null);
346+
347+
durableRetryHandler.handle(retryContext);
348+
349+
verify(retryHandler, times(1)).handle(any());
350+
assertNull(taskOptions.getRetryPolicy());
351+
}
352+
353+
@Test
354+
public void callChildWorkflowWithRetryPolicyAndHandler() {
355+
String expectedName = "TestActivity";
356+
String expectedInput = "TestInput";
357+
String expectedInstanceId = "TestInstanceId";
358+
359+
WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder()
360+
.setMaxNumberOfAttempts(1)
361+
.setFirstRetryInterval(Duration.ofSeconds(10))
362+
.build();
363+
364+
WorkflowTaskRetryHandler retryHandler = spy(new WorkflowTaskRetryHandler() {
365+
@Override
366+
public boolean handle(WorkflowTaskRetryContext retryContext) {
367+
return true;
368+
}
369+
});
370+
371+
WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryPolicy, retryHandler);
372+
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);
373+
374+
context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class);
375+
376+
verify(mockInnerContext, times(1))
377+
.callSubOrchestrator(
378+
eq(expectedName),
379+
eq(expectedInput),
380+
eq(expectedInstanceId),
381+
captor.capture(),
382+
eq(String.class)
383+
);
384+
385+
TaskOptions taskOptions = captor.getValue();
386+
387+
RetryHandler durableRetryHandler = taskOptions.getRetryHandler();
388+
RetryContext retryContext = mock(RetryContext.class, invocationOnMock -> null);
389+
390+
durableRetryHandler.handle(retryContext);
391+
392+
verify(retryHandler, times(1)).handle(any());
393+
assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts());
394+
assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval());
395+
assertEquals(Duration.ZERO, taskOptions.getRetryPolicy().getRetryTimeout());
308396
}
309397

310398
@Test

0 commit comments

Comments
 (0)