mirror of https://github.com/apache/kafka.git
Refactor and cleanup
This commit is contained in:
parent
8235ed2256
commit
a157071a0d
|
@ -59,7 +59,7 @@ public class ConsumerPollTestUtils {
|
|||
}
|
||||
|
||||
public static void waitForException(Consumer<?, ?> consumer,
|
||||
Function<KafkaException, Boolean> testCondition,
|
||||
Function<Throwable, Boolean> testCondition,
|
||||
String conditionDetails) {
|
||||
try {
|
||||
TestUtils.waitForCondition(
|
||||
|
@ -67,8 +67,8 @@ public class ConsumerPollTestUtils {
|
|||
try {
|
||||
consumer.poll(Duration.ZERO);
|
||||
return false;
|
||||
} catch (KafkaException e) {
|
||||
return testCondition.apply(e);
|
||||
} catch (Throwable t) {
|
||||
return testCondition.apply(t);
|
||||
}
|
||||
},
|
||||
conditionDetails
|
||||
|
|
|
@ -3654,7 +3654,11 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) {
|
|||
service.execute(() -> consumer.poll(Duration.ofSeconds(5)));
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ofSeconds(5)));
|
||||
ConsumerPollTestUtils.waitForException(
|
||||
consumer,
|
||||
t -> t instanceof ConcurrentModificationException,
|
||||
"Consumer did not throw ConcurrentModificationException within timeout"
|
||||
);
|
||||
client.wakeup();
|
||||
consumer.wakeup();
|
||||
} finally {
|
||||
|
|
|
@ -1490,9 +1490,9 @@ public class AsyncKafkaConsumerTest {
|
|||
Exception expectedException = expectedExceptionOpt.get();
|
||||
ConsumerPollTestUtils.waitForException(
|
||||
consumer,
|
||||
e -> Objects.equals(e.getClass(), expectedException.getClass()) &&
|
||||
Objects.equals(e.getMessage(), expectedException.getMessage()) &&
|
||||
Objects.equals(e.getCause(), expectedException.getCause()),
|
||||
t -> Objects.equals(t.getClass(), expectedException.getClass()) &&
|
||||
Objects.equals(t.getMessage(), expectedException.getMessage()) &&
|
||||
Objects.equals(t.getCause(), expectedException.getCause()),
|
||||
"Consumer.poll() did not throw the expected exception " + expectedException
|
||||
);
|
||||
} else {
|
||||
|
@ -1561,7 +1561,7 @@ public class AsyncKafkaConsumerTest {
|
|||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
ConsumerPollTestUtils.waitForException(
|
||||
consumer,
|
||||
e -> e.getMessage().equals(expectedException.getMessage()),
|
||||
t -> t.getMessage().equals(expectedException.getMessage()),
|
||||
"Consumer.poll() did not fail with expected exception " + expectedException + " within timeout"
|
||||
);
|
||||
}
|
||||
|
@ -1582,7 +1582,7 @@ public class AsyncKafkaConsumerTest {
|
|||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
ConsumerPollTestUtils.waitForException(
|
||||
consumer,
|
||||
e -> e.getMessage().equals(expectedException1.getMessage()),
|
||||
t -> t.getMessage().equals(expectedException1.getMessage()),
|
||||
"Consumer.poll() did not fail with expected exception " + expectedException1 + " within timeout"
|
||||
);
|
||||
assertTrue(backgroundEventQueue.isEmpty());
|
||||
|
|
Loading…
Reference in New Issue