diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index bf27fbaa11f..9d8aa35f8fe 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -219,7 +219,7 @@ public class KafkaProducerTest { } @AfterEach - public void detectLeaks() { + public void detectLeaks() throws InterruptedException { // Assert no thread leakage of Kafka producer. TestUtils.assertNoLeakedThreadsWithNameAndDaemonStatus(NETWORK_THREAD_PREFIX, Boolean.TRUE); } diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 7748bbe15f0..078d006e37a 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -74,7 +74,6 @@ import java.util.function.Consumer; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import static java.util.Arrays.asList; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -161,7 +160,9 @@ public class TestUtils { * Asserts that there are no leaked threads with a specified name prefix and daemon status. * This method checks all threads in the JVM, filters them by the provided thread name prefix * and daemon status, and verifies that no matching threads are alive. - * If any matching threads are found, the test will fail. + * Use the {@link #waitForCondition(TestCondition, String) waitForCondition} to retry the check at a regular interval + * until either no matching threads are found or the timeout is exceeded. + * If any matching, alive threads are found after the timeout has elapsed, the assertion will fail. * * @param threadName The prefix of the thread names to check. Only threads whose names * start with this prefix will be considered. @@ -169,14 +170,11 @@ public class TestUtils { * daemon status (either true for daemon threads or false for non-daemon threads) * will be considered. * - * @throws AssertionError If any thread with the specified name prefix and daemon status is found and is alive. + * @throws AssertionError If any thread with the specified name prefix and daemon status are found after the timeout. */ - public static void assertNoLeakedThreadsWithNameAndDaemonStatus(String threadName, boolean isDaemon) { - List threads = Thread.getAllStackTraces().keySet().stream() - .filter(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName)) - .collect(Collectors.toList()); - int threadCount = threads.size(); - assertEquals(0, threadCount); + public static void assertNoLeakedThreadsWithNameAndDaemonStatus(String threadName, boolean isDaemon) throws InterruptedException { + waitForCondition(() -> Thread.getAllStackTraces().keySet().stream() + .noneMatch(t -> t.isDaemon() == isDaemon && t.isAlive() && t.getName().startsWith(threadName)), String.format("Thread leak detected: %s", threadName)); } /** diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java index 42002b69da6..25c8bce5c3a 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/RemoteIndexCacheTest.java @@ -127,7 +127,7 @@ public class RemoteIndexCacheTest { } @AfterEach - public void cleanup() { + public void cleanup() throws InterruptedException { reset(rsm); // the files created for the test will be deleted automatically on thread exit since we use temp dir Utils.closeQuietly(cache, "RemoteIndexCache created for unit test"); @@ -344,13 +344,13 @@ public class RemoteIndexCacheTest { }, "Failed to delete index file"); // verify no index files on disk - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent(), + TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty(), "Offset index file should not be present on disk at " + tpDir.toPath()); - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent(), + TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty(), "Txn index file should not be present on disk at " + tpDir.toPath()); - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(), + TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty(), "Time index file should not be present on disk at " + tpDir.toPath()); - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent(), + TestUtils.waitForCondition(() -> getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty(), "Index file marked for deletion should not be present on disk at " + tpDir.toPath()); }