From a157071a0d4c01ede279ccbaca913e22fb0fbc31 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Mon, 29 Sep 2025 13:04:57 -0700 Subject: [PATCH] Refactor and cleanup --- .../kafka/clients/consumer/ConsumerPollTestUtils.java | 6 +++--- .../kafka/clients/consumer/KafkaConsumerTest.java | 6 +++++- .../consumer/internals/AsyncKafkaConsumerTest.java | 10 +++++----- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPollTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPollTestUtils.java index d4a6b095764..116c47c4143 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPollTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPollTestUtils.java @@ -59,7 +59,7 @@ public class ConsumerPollTestUtils { } public static void waitForException(Consumer consumer, - Function testCondition, + Function testCondition, String conditionDetails) { try { TestUtils.waitForCondition( @@ -67,8 +67,8 @@ public class ConsumerPollTestUtils { try { consumer.poll(Duration.ZERO); return false; - } catch (KafkaException e) { - return testCondition.apply(e); + } catch (Throwable t) { + return testCondition.apply(t); } }, conditionDetails diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 3d864a9300a..91796885f2f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -3654,7 +3654,11 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) { service.execute(() -> consumer.poll(Duration.ofSeconds(5))); try { TimeUnit.SECONDS.sleep(1); - assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ofSeconds(5))); + ConsumerPollTestUtils.waitForException( + consumer, + t -> t instanceof ConcurrentModificationException, + "Consumer did not throw ConcurrentModificationException within timeout" + ); client.wakeup(); consumer.wakeup(); } finally { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 98a7c34cc63..f0f799af995 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -1490,9 +1490,9 @@ public class AsyncKafkaConsumerTest { Exception expectedException = expectedExceptionOpt.get(); ConsumerPollTestUtils.waitForException( consumer, - e -> Objects.equals(e.getClass(), expectedException.getClass()) && - Objects.equals(e.getMessage(), expectedException.getMessage()) && - Objects.equals(e.getCause(), expectedException.getCause()), + t -> Objects.equals(t.getClass(), expectedException.getClass()) && + Objects.equals(t.getMessage(), expectedException.getMessage()) && + Objects.equals(t.getCause(), expectedException.getCause()), "Consumer.poll() did not throw the expected exception " + expectedException ); } else { @@ -1561,7 +1561,7 @@ public class AsyncKafkaConsumerTest { markReconcileAndAutoCommitCompleteForPollEvent(); ConsumerPollTestUtils.waitForException( consumer, - e -> e.getMessage().equals(expectedException.getMessage()), + t -> t.getMessage().equals(expectedException.getMessage()), "Consumer.poll() did not fail with expected exception " + expectedException + " within timeout" ); } @@ -1582,7 +1582,7 @@ public class AsyncKafkaConsumerTest { markReconcileAndAutoCommitCompleteForPollEvent(); ConsumerPollTestUtils.waitForException( consumer, - e -> e.getMessage().equals(expectedException1.getMessage()), + t -> t.getMessage().equals(expectedException1.getMessage()), "Consumer.poll() did not fail with expected exception " + expectedException1 + " within timeout" ); assertTrue(backgroundEventQueue.isEmpty());