Skip to content

Commit a0291a8

Browse files
authored
MINOR: Fix flaky state updater test (#18253)
The tests are flaky because the tests end before the verified calls are executed. This happens because the state updater thread executes the verified calls, but the thread that executes the tests with the verifications is a different thread. This commit fixes the flaky tests by enusring that the calls were performed by the state updater by either shutting down the state updater or waiting for the condition. Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax <[email protected]>
1 parent a9eb06b commit a0291a8

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ private KafkaFutureImpl<Uuid> restoreConsumerInstanceId(final Duration timeout)
320320

321321

322322
private void handleRuntimeException(final RuntimeException runtimeException) {
323-
log.error("An unexpected error occurred within the state updater thread: " + runtimeException);
323+
log.error("An unexpected error occurred within the state updater thread: {}", String.valueOf(runtimeException));
324324
addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);
325325
isRunning.set(false);
326326
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,11 @@ private void shouldThrowIfAddingTasksWithSameId(final Task task1, final Task tas
290290
stateUpdater.add(task2);
291291

292292
verifyFailedTasks(IllegalStateException.class, task1);
293-
assertFalse(stateUpdater.isRunning());
293+
waitForCondition(
294+
() -> !stateUpdater.isRunning(),
295+
VERIFICATION_TIMEOUT,
296+
"Did not switch to non-running within the given timeout!"
297+
);
294298
}
295299

296300
@Test
@@ -1015,6 +1019,8 @@ public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
10151019
verifyRestoredActiveTasks();
10161020
verifyUpdatingTasks(task2);
10171021
verifyExceptionsAndFailedTasks();
1022+
// shutdown ensures that the test does not end before changelog reader methods verified below are called
1023+
stateUpdater.shutdown(Duration.ofMinutes(1));
10181024
verify(changelogReader, times(1)).enforceRestoreActive();
10191025
verify(changelogReader, times(1)).transitToUpdateStandby();
10201026
}
@@ -1152,6 +1158,8 @@ public void shouldIdleWhenAllTasksPaused() throws Exception {
11521158
public void shouldResumeStandbyTask() throws Exception {
11531159
final StandbyTask task = standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
11541160
shouldResumeStatefulTask(task);
1161+
// shutdown ensures that the test does not end before changelog reader methods verified below are called
1162+
stateUpdater.shutdown(Duration.ofMinutes(1));
11551163
verify(changelogReader, times(2)).transitToUpdateStandby();
11561164
}
11571165

0 commit comments

Comments
 (0)