KAFKA-19091 Fix race condition in DelayedFutureTest (#19553)

The root case of flakiness is race condition between worker thread
(thread which is executing the test) and executor-testDelayedFuture
(thread which should execute callback).

It was fixed with TestUtils#waitForCondition to wait until callback will
be done

Test evidence:

Test was running 1000 times with repeated test.

Results: `~/p/kafka (bloku/kafka-19091) [1]> ./gradlew server:test
--tests DelayedFutureTest --fail-fast > res.txt` `~/p/kafka
(bloku/kafka-19091)> grep FAILED res.txt ` `~/p/kafka
(bloku/kafka-19091) [1]>`

res.txt: `> Task :server:test`

`Gradle Test Run :server:test > Gradle Test Executor 14 >
DelayedFutureTest > testDelayedFuture() > repetition 1 of 1000 PASSED`

...

`BUILD SUCCESSFUL in 37m`

Reviewers: Ken Huang <s7133700@gmail.com>, TaiJuWu <tjwu1217@gmail.com>,
 PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Uladzislau Blok 2025-05-14 17:12:40 +02:00 committed by GitHub
parent 4f2e3ecad4
commit 053b9e423c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 11 additions and 13 deletions

View File

@ -26,7 +26,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -49,13 +49,11 @@ public class DelayedFutureTest {
.map(Thread::getName)
.anyMatch(name -> name.contains("DelayedExecutor-" + purgatoryName));
Function<List<CompletableFuture<Integer>>, Void> updateResult = futures -> {
Consumer<List<CompletableFuture<Integer>>> updateResult = futures ->
result.set(futures.stream()
.filter(Predicate.not(CompletableFuture::isCompletedExceptionally))
.mapToInt(future -> assertDoesNotThrow(() -> future.get()))
.sum());
return null;
};
assertFalse(hasExecutorThread.get(), "Unnecessary thread created");
@ -64,7 +62,7 @@ public class DelayedFutureTest {
CompletableFuture.completedFuture(10),
CompletableFuture.completedFuture(11)
);
DelayedFuture<Integer> r1 = purgatory.tryCompleteElseWatch(100000L, futures1, () -> updateResult.apply(futures1));
DelayedFuture<Integer> r1 = purgatory.tryCompleteElseWatch(100000L, futures1, () -> updateResult.accept(futures1));
assertTrue(r1.isCompleted(), "r1 not completed");
assertEquals(21, result.get());
assertFalse(hasExecutorThread.get(), "Unnecessary thread created");
@ -72,7 +70,7 @@ public class DelayedFutureTest {
// Two delayed futures: callback should wait for both to complete
result.set(-1);
List<CompletableFuture<Integer>> futures2 = List.of(new CompletableFuture<>(), new CompletableFuture<>());
DelayedFuture<Integer> r2 = purgatory.tryCompleteElseWatch(100000L, futures2, () -> updateResult.apply(futures2));
DelayedFuture<Integer> r2 = purgatory.tryCompleteElseWatch(100000L, futures2, () -> updateResult.accept(futures2));
assertFalse(r2.isCompleted(), "r2 should be incomplete");
futures2.get(0).complete(20);
assertFalse(r2.isCompleted());
@ -88,7 +86,7 @@ public class DelayedFutureTest {
new CompletableFuture<>(),
CompletableFuture.completedFuture(31)
);
DelayedFuture<Integer> r3 = purgatory.tryCompleteElseWatch(100000L, futures3, () -> updateResult.apply(futures3));
DelayedFuture<Integer> r3 = purgatory.tryCompleteElseWatch(100000L, futures3, () -> updateResult.accept(futures3));
assertFalse(r3.isCompleted(), "r3 should be incomplete");
assertEquals(-1, result.get());
futures3.get(0).complete(30);
@ -100,7 +98,7 @@ public class DelayedFutureTest {
long start = Time.SYSTEM.hiResClockMs();
long expirationMs = 2000L;
List<CompletableFuture<Integer>> futures4 = List.of(new CompletableFuture<>(), new CompletableFuture<>());
DelayedFuture<Integer> r4 = purgatory.tryCompleteElseWatch(expirationMs, futures4, () -> updateResult.apply(futures4));
DelayedFuture<Integer> r4 = purgatory.tryCompleteElseWatch(expirationMs, futures4, () -> updateResult.accept(futures4));
futures4.get(0).complete(40);
TestUtils.waitForCondition(() -> futures4.get(1).isDone(), "r4 futures not expired");
assertTrue(r4.isCompleted(), "r4 not completed after timeout");
@ -109,7 +107,7 @@ public class DelayedFutureTest {
assertEquals(40, futures4.get(0).get());
Exception exception = assertThrows(ExecutionException.class, () -> futures4.get(1).get());
assertEquals(TimeoutException.class, exception.getCause().getClass());
assertEquals(40, result.get());
TestUtils.waitForCondition(() -> result.get() == 40, "callback not invoked");
} finally {
purgatory.shutdown();
}