mirror of https://github.com/apache/kafka.git
MINOR: Fix flaky state updater test (#18253)
The tests are flaky because the tests end before the verified calls are executed. This happens because the state updater thread executes the verified calls, but the thread that executes the tests with the verifications is a different thread. This commit fixes the flaky tests by enusring that the calls were performed by the state updater by either shutting down the state updater or waiting for the condition. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
a9eb06bbdf
commit
a0291a8d50
|
|
@ -320,7 +320,7 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
|
||||
|
||||
private void handleRuntimeException(final RuntimeException runtimeException) {
|
||||
log.error("An unexpected error occurred within the state updater thread: " + runtimeException);
|
||||
log.error("An unexpected error occurred within the state updater thread: {}", String.valueOf(runtimeException));
|
||||
addToExceptionsAndFailedTasksThenClearUpdatingAndPausedTasks(runtimeException);
|
||||
isRunning.set(false);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -290,7 +290,11 @@ class DefaultStateUpdaterTest {
|
|||
stateUpdater.add(task2);
|
||||
|
||||
verifyFailedTasks(IllegalStateException.class, task1);
|
||||
assertFalse(stateUpdater.isRunning());
|
||||
waitForCondition(
|
||||
() -> !stateUpdater.isRunning(),
|
||||
VERIFICATION_TIMEOUT,
|
||||
"Did not switch to non-running within the given timeout!"
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -1015,6 +1019,8 @@ class DefaultStateUpdaterTest {
|
|||
verifyRestoredActiveTasks();
|
||||
verifyUpdatingTasks(task2);
|
||||
verifyExceptionsAndFailedTasks();
|
||||
// shutdown ensures that the test does not end before changelog reader methods verified below are called
|
||||
stateUpdater.shutdown(Duration.ofMinutes(1));
|
||||
verify(changelogReader, times(1)).enforceRestoreActive();
|
||||
verify(changelogReader, times(1)).transitToUpdateStandby();
|
||||
}
|
||||
|
|
@ -1152,6 +1158,8 @@ class DefaultStateUpdaterTest {
|
|||
public void shouldResumeStandbyTask() throws Exception {
|
||||
final StandbyTask task = standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_A_0)).inState(State.RUNNING).build();
|
||||
shouldResumeStatefulTask(task);
|
||||
// shutdown ensures that the test does not end before changelog reader methods verified below are called
|
||||
stateUpdater.shutdown(Duration.ofMinutes(1));
|
||||
verify(changelogReader, times(2)).transitToUpdateStandby();
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue