KAFKA-17554 Flaky testFutureCompletionOutsidePoll in ConsumerNetworkClientTest (#18298)

Jira: https://issues.apache.org/jira/browse/KAFKA-17554

In the previous workflow, the test passes under two conditions:

1. The `t1` thread is waiting for the main thread's `client.wakeup()`.
If successful, `t1` will wake up `t2`, allowing `t2` to complete the
future.
2. If `t1` fails to receive the `client.wakeup()` from the main thread,
`t2` will be woken up by the main thread.

In the previous implementation, we used a `CountDownLatch` to control
the execution of three threads, but it often led to race conditions.
Currently, we have modified it to use two threads to test this scenario.

I run `I=0;  while ./gradlew :clients:test --tests
ConsumerNetworkClientTest.testFutureCompletionOutsidePoll --rerun
--fail-fast; do  (( I=$I+1 )); echo "Completed run: $I"; sleep 1; done`
and pass 3000+ times.

![image](https://github.com/user-attachments/assets/3b8d804e-fbe0-4030-8686-4960fc717d07)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-09-24 18:42:25 +08:00 committed by GitHub
parent 1f7631c8c6
commit 8036e49a6e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 12 additions and 24 deletions

View File

@ -41,7 +41,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.time.Duration; import java.time.Duration;
@ -266,45 +265,34 @@ public class ConsumerNetworkClientTest {
assertEquals(metadataException, exc); assertEquals(metadataException, exc);
} }
@Disabled("KAFKA-17554")
@Test @Test
public void testFutureCompletionOutsidePoll() throws Exception { public void testFutureCompletionOutsidePoll() throws Exception {
// Tests the scenario in which the request that is being awaited in one thread // Tests the scenario in which the request that is being awaited in one thread
// is received and completed in another thread. // is received and completed in another thread.
final CountDownLatch t1TheardCountDownLatch = new CountDownLatch(1);
final CountDownLatch t2ThreadCountDownLatch = new CountDownLatch(2);
final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat()); final RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
consumerClient.pollNoWakeup(); // dequeue and send the request consumerClient.pollNoWakeup(); // dequeue and send the request
CountDownLatch bothThreadsReady = new CountDownLatch(2);
client.enableBlockingUntilWakeup(2); client.enableBlockingUntilWakeup(2);
Thread t1 = new Thread(() -> {
t1TheardCountDownLatch.countDown(); Thread t1 = new Thread(() -> {
bothThreadsReady.countDown();
consumerClient.pollNoWakeup(); consumerClient.pollNoWakeup();
t2ThreadCountDownLatch.countDown(); });
Thread t2 = new Thread(() -> {
bothThreadsReady.countDown();
consumerClient.poll(future);
}); });
t1.start(); t1.start();
Thread t2 = new Thread(() -> {
try {
t2ThreadCountDownLatch.await();
consumerClient.poll(future);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t2.start(); t2.start();
// Simulate a network response and return from the poll in t1 // Wait until both threads are blocked in poll
bothThreadsReady.await();
client.respond(heartbeatResponse(Errors.NONE)); client.respond(heartbeatResponse(Errors.NONE));
// Wait for t1 to block in poll
t1TheardCountDownLatch.await();
client.wakeup(); client.wakeup();
// while t1 is blocked in poll, t2 should be able to complete the future
t2ThreadCountDownLatch.countDown();
// Both threads should complete since t1 should wakeup t2 // Both threads should complete since t1 should wakeup t2
t1.join(); t1.join();