KAFKA-19299: Fix race condition in RemoteIndexCacheTest (#19927)

This MR should fix a couple of race conditions in RemoteIndexCacheTest.

1. There was a race condition between the cache-cleanup thread and the test
thread, which checks that the cache is gone. This was fixed with
TestUtils#waitForCondition.
2. After each test we check that there is no thread leak. This check
wasn't working properly, because a thread's alive status is managed at the
JVM level: we can only set the interrupted status (using the private native
void interrupt0() method under the hood), but we don't really know when the
JVM will change the alive status of the thread. To fix this I've refactored the
TestUtils#assertNoLeakedThreadsWithNameAndDaemonStatus method to use
TestUtils#waitForCondition. This fix should also affect a few other tests,
which were flaky because of this check. See the Gradle runs on

[develocity](https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.storage.internals.log.RemoteIndexCacheTest&tests.sortField=FLAKY)

After the fix, the test was run 10000 times with the repeated-test annotation:

`./gradlew clean storage:test --tests

org.apache.kafka.storage.internals.log.RemoteIndexCacheTest.testCacheEntryIsDeletedOnRemoval`
...  `Gradle Test Run :storage:test > Gradle Test Executor 20 >
RemoteIndexCacheTest > testCacheEntryIsDeletedOnRemoval() > repetition
9998 of 10000 PASSED`  `Gradle Test Run :storage:test > Gradle Test
Executor 20 > RemoteIndexCacheTest > testCacheEntryIsDeletedOnRemoval()
> repetition 9999 of 10000 PASSED`  `Gradle Test Run :storage:test >
Gradle Test Executor 20 > RemoteIndexCacheTest >
testCacheEntryIsDeletedOnRemoval() > repetition 10000 of 10000 PASSED`
`BUILD SUCCESSFUL in 20m 9s`  `148 actionable tasks: 148 executed`

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Uladzislau Blok 2025-09-22 17:20:14 +02:00 committed by GitHub
parent da6a562f6d
commit f16d1f3c9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 13 additions and 15 deletions

View File

@ -219,7 +219,7 @@ public class KafkaProducerTest {
} }
@AfterEach @AfterEach
public void detectLeaks() { public void detectLeaks() throws InterruptedException {
// Assert no thread leakage of Kafka producer. // Assert no thread leakage of Kafka producer.
TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(NETWORK_THREAD_PREFIX, Boolean.TRUE); TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(NETWORK_THREAD_PREFIX, Boolean.TRUE);
} }

View File

@ -74,7 +74,6 @@ import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -161,7 +160,9 @@ public class TestUtils {
* Asserts that there are no leaked threads with a specified name prefix and daemon status. * Asserts that there are no leaked threads with a specified name prefix and daemon status.
* This method checks all threads in the JVM, filters them by the provided thread name prefix * This method checks all threads in the JVM, filters them by the provided thread name prefix
* and daemon status, and verifies that no matching threads are alive. * and daemon status, and verifies that no matching threads are alive.
* If any matching threads are found, the test will fail. * Use the {@link #waitForCondition(TestCondition, String) waitForCondition} to retry the check at a regular interval
* until either no matching threads are found or the timeout is exceeded.
* If any matching, alive threads are found after the timeout has elapsed, the assertion will fail.
* *
* @param threadName The prefix of the thread names to check. Only threads whose names * @param threadName The prefix of the thread names to check. Only threads whose names
* start with this prefix will be considered. * start with this prefix will be considered.
@ -169,14 +170,11 @@ public class TestUtils {
* daemon status (either true for daemon threads or false for non-daemon threads) * daemon status (either true for daemon threads or false for non-daemon threads)
* will be considered. * will be considered.
* *
* @throws AssertionError If any thread with the specified name prefix and daemon status is found and is alive. * @throws AssertionError If any thread with the specified name prefix and daemon status are found after the timeout.
*/ */
public static void assertNoLeakedThreadsWithNameAndDaemonStatus(String threadName, boolean isDaemon) { public static void assertNoLeakedThreadsWithNameAndDaemonStatus(String threadName, boolean isDaemon) throws InterruptedException {
List<Thread> threads = Thread.getAllStackTraces().keySet().stream() waitForCondition(() -> Thread.getAllStackTraces().keySet().stream()
.filter(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName)) .noneMatch(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName)), String.format("Thread leak detected: %s", threadName));
.collect(Collectors.toList());
int threadCount = threads.size();
assertEquals(0, threadCount);
} }
/** /**

View File

@ -127,7 +127,7 @@ public class RemoteIndexCacheTest {
} }
@AfterEach @AfterEach
public void cleanup() { public void cleanup() throws InterruptedException {
reset(rsm); reset(rsm);
// the files created for the test will be deleted automatically on thread exit since we use temp dir // the files created for the test will be deleted automatically on thread exit since we use temp dir
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test"); Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
@ -344,13 +344,13 @@ public class RemoteIndexCacheTest {
}, "Failed to delete index file"); }, "Failed to delete index file");
// verify no index files on disk // verify no index files on disk
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent(), TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty(),
"Offset index file should not be present on disk at " + tpDir.toPath()); "Offset index file should not be present on disk at " + tpDir.toPath());
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(), TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty(),
"Txn index file should not be present on disk at " + tpDir.toPath()); "Txn index file should not be present on disk at " + tpDir.toPath());
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(), TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty(),
"Time index file should not be present on disk at " + tpDir.toPath()); "Time index file should not be present on disk at " + tpDir.toPath());
assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent(), TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(),
"Index file marked for deletion should not be present on disk at " + tpDir.toPath()); "Index file marked for deletion should not be present on disk at " + tpDir.toPath());
} }