Re-enabling tests in AsyncKafkaConsumer

This commit is contained in:
Kirk True 2025-09-10 21:16:46 -07:00
parent b5d7d01dbc
commit d4802c78e3
2 changed files with 28 additions and 26 deletions

View File

@ -892,7 +892,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
CompositePollEvent event = new CompositePollEvent(deadlineMs, pollTimeMs, nextStep); CompositePollEvent event = new CompositePollEvent(deadlineMs, pollTimeMs, nextStep);
applicationEventHandler.add(event); applicationEventHandler.add(event);
CompositePollResult result = ConsumerUtils.getResult(event.future()); CompositePollResult result = ConsumerUtils.getResult(event.future(), defaultApiTimeoutMs.toMillis());
if (result == CompositePollResult.NEEDS_OFFSET_COMMIT_CALLBACKS) { if (result == CompositePollResult.NEEDS_OFFSET_COMMIT_CALLBACKS) {
offsetCommitCallbackInvoker.executeCallbacks(); offsetCommitCallbackInvoker.executeCallbacks();

View File

@ -42,8 +42,9 @@ import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.CompositePollEvent;
import org.apache.kafka.clients.consumer.internals.events.CompositePollResult;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@ -92,7 +93,6 @@ import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable; import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
@ -411,7 +411,6 @@ public class AsyncKafkaConsumerTest {
assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000))); assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
} }
@Disabled
@Test @Test
public void testWakeupBeforeCallingPoll() { public void testWakeupBeforeCallingPoll() {
consumer = newConsumer(); consumer = newConsumer();
@ -427,11 +426,11 @@ public class AsyncKafkaConsumerTest {
consumer.wakeup(); consumer.wakeup();
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.COMPLETE);
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO)); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
} }
@Disabled
@Test @Test
public void testWakeupAfterEmptyFetch() { public void testWakeupAfterEmptyFetch() {
consumer = newConsumer(); consumer = newConsumer();
@ -448,11 +447,11 @@ public class AsyncKafkaConsumerTest {
consumer.assign(singleton(tp)); consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.COMPLETE);
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1))); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
} }
@Disabled
@Test @Test
public void testWakeupAfterNonEmptyFetch() { public void testWakeupAfterNonEmptyFetch() {
consumer = newConsumer(); consumer = newConsumer();
@ -473,13 +472,13 @@ public class AsyncKafkaConsumerTest {
consumer.assign(singleton(tp)); consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.COMPLETE);
// since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored // since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1))); assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
// the previously ignored wake-up should not be ignored in the next call // the previously ignored wake-up should not be ignored in the next call
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO)); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
} }
@Disabled
@Test @Test
public void testCommitInRebalanceCallback() { public void testCommitInRebalanceCallback() {
consumer = newConsumer(); consumer = newConsumer();
@ -511,11 +510,11 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully(); completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList(topicName), listener); consumer.subscribe(Collections.singletonList(topicName), listener);
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.NEEDS_BACKGROUND_EVENT_PROCESSING);
consumer.poll(Duration.ZERO); consumer.poll(Duration.ZERO);
assertTrue(callbackExecuted.get()); assertTrue(callbackExecuted.get());
} }
@Disabled
@Test @Test
public void testClearWakeupTriggerAfterPoll() { public void testClearWakeupTriggerAfterPoll() {
consumer = newConsumer(); consumer = newConsumer();
@ -534,6 +533,7 @@ public class AsyncKafkaConsumerTest {
consumer.assign(singleton(tp)); consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.COMPLETE);
consumer.poll(Duration.ZERO); consumer.poll(Duration.ZERO);
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@ -668,7 +668,6 @@ public class AsyncKafkaConsumerTest {
return allValues.get(allValues.size() - 1); return allValues.get(allValues.size() - 1);
} }
@Disabled
@Test @Test
public void testEnsurePollExecutedCommitAsyncCallbacks() { public void testEnsurePollExecutedCommitAsyncCallbacks() {
consumer = newConsumer(); consumer = newConsumer();
@ -681,6 +680,7 @@ public class AsyncKafkaConsumerTest {
consumer.assign(Collections.singleton(new TopicPartition("foo", 0))); consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.NEEDS_OFFSET_COMMIT_CALLBACKS);
assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback); assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback);
} }
@ -1202,14 +1202,12 @@ public class AsyncKafkaConsumerTest {
assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
} }
@Disabled
@Test @Test
public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() { public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() {
consumer = newConsumer(); consumer = newConsumer();
testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(); testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
} }
@Disabled
@Test @Test
public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() { public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
// Create consumer without group id so committed offsets are not used for updating positions // Create consumer without group id so committed offsets are not used for updating positions
@ -1455,7 +1453,6 @@ public class AsyncKafkaConsumerTest {
* callback execution does <em>not</em> immediately errors. Instead, those errors are forwarded to the * callback execution does <em>not</em> immediately errors. Instead, those errors are forwarded to the
* application event thread for the {@link ConsumerMembershipManager} to handle. * application event thread for the {@link ConsumerMembershipManager} to handle.
*/ */
@Disabled
@ParameterizedTest @ParameterizedTest
@MethodSource("listenerCallbacksInvokeSource") @MethodSource("listenerCallbacksInvokeSource")
public void testListenerCallbacksInvoke(List<ConsumerRebalanceListenerMethodName> methodNames, public void testListenerCallbacksInvoke(List<ConsumerRebalanceListenerMethodName> methodNames,
@ -1484,6 +1481,8 @@ public class AsyncKafkaConsumerTest {
} }
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.NEEDS_BACKGROUND_EVENT_PROCESSING);
// This will trigger the background event queue to process our background event message. // This will trigger the background event queue to process our background event message.
// If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll. // If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll.
if (expectedException.isPresent()) { if (expectedException.isPresent()) {
@ -1543,7 +1542,6 @@ public class AsyncKafkaConsumerTest {
); );
} }
@Disabled
@Test @Test
public void testBackgroundError() { public void testBackgroundError() {
final String groupId = "consumerGroupA"; final String groupId = "consumerGroupA";
@ -1555,12 +1553,12 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully(); completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0))); consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.NEEDS_BACKGROUND_EVENT_PROCESSING);
final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
assertEquals(expectedException.getMessage(), exception.getMessage()); assertEquals(expectedException.getMessage(), exception.getMessage());
} }
@Disabled
@Test @Test
public void testMultipleBackgroundErrors() { public void testMultipleBackgroundErrors() {
final String groupId = "consumerGroupA"; final String groupId = "consumerGroupA";
@ -1575,6 +1573,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully(); completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0))); consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.NEEDS_BACKGROUND_EVENT_PROCESSING);
final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
assertEquals(expectedException1.getMessage(), exception.getMessage()); assertEquals(expectedException1.getMessage(), exception.getMessage());
@ -1639,7 +1638,6 @@ public class AsyncKafkaConsumerTest {
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
} }
@Disabled
@Test @Test
public void testEnsurePollEventSentOnConsumerPoll() { public void testEnsurePollEventSentOnConsumerPoll() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
@ -1659,9 +1657,9 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully(); completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList("topic1")); consumer.subscribe(singletonList("topic1"));
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.COMPLETE);
consumer.poll(Duration.ofMillis(100)); consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class)); verify(applicationEventHandler).add(any(CompositePollEvent.class));
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
} }
private Properties requiredConsumerConfigAndGroupId(final String groupId) { private Properties requiredConsumerConfigAndGroupId(final String groupId) {
@ -1678,13 +1676,10 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully(); completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(new TopicPartition("t1", 1))); consumer.assign(singleton(new TopicPartition("t1", 1)));
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.COMPLETE);
consumer.poll(Duration.ZERO); consumer.poll(Duration.ZERO);
verify(applicationEventHandler, atLeast(1))
.addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class));
} }
@Disabled
@Test @Test
public void testLongPollWaitIsLimited() { public void testLongPollWaitIsLimited() {
consumer = newConsumer(); consumer = newConsumer();
@ -1716,6 +1711,7 @@ public class AsyncKafkaConsumerTest {
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.COMPLETE);
// And then poll for up to 10000ms, which should return 2 records without timing out // And then poll for up to 10000ms, which should return 2 records without timing out
ConsumerRecords<?, ?> returnedRecords = consumer.poll(Duration.ofMillis(10000)); ConsumerRecords<?, ?> returnedRecords = consumer.poll(Duration.ofMillis(10000));
assertEquals(2, returnedRecords.count()); assertEquals(2, returnedRecords.count());
@ -1804,7 +1800,6 @@ public class AsyncKafkaConsumerTest {
* Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)} * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)}
* causes {@link InterruptException} to be thrown. * causes {@link InterruptException} to be thrown.
*/ */
@Disabled
@Test @Test
public void testPollThrowsInterruptExceptionIfInterrupted() { public void testPollThrowsInterruptExceptionIfInterrupted() {
consumer = newConsumer(); consumer = newConsumer();
@ -1821,6 +1816,7 @@ public class AsyncKafkaConsumerTest {
try { try {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.COMPLETE);
assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)); assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
} finally { } finally {
// clear interrupted state again since this thread may be reused by JUnit // clear interrupted state again since this thread may be reused by JUnit
@ -1845,7 +1841,6 @@ public class AsyncKafkaConsumerTest {
verify(backgroundEventReaper).reap(time.milliseconds()); verify(backgroundEventReaper).reap(time.milliseconds());
} }
@Disabled
@Test @Test
void testReaperInvokedInPoll() { void testReaperInvokedInPoll() {
consumer = newConsumer(); consumer = newConsumer();
@ -1854,6 +1849,7 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(Collections.singletonList("topic")); consumer.subscribe(Collections.singletonList("topic"));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.COMPLETE);
consumer.poll(Duration.ZERO); consumer.poll(Duration.ZERO);
verify(backgroundEventReaper).reap(time.milliseconds()); verify(backgroundEventReaper).reap(time.milliseconds());
} }
@ -1905,7 +1901,6 @@ public class AsyncKafkaConsumerTest {
assertEquals(AutoOffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy()); assertEquals(AutoOffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy());
} }
@Disabled
@Test @Test
public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() { public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() {
consumer = newConsumer(); consumer = newConsumer();
@ -1918,6 +1913,7 @@ public class AsyncKafkaConsumerTest {
consumer.assign(singleton(new TopicPartition("topic1", 0))); consumer.assign(singleton(new TopicPartition("topic1", 0)));
markReconcileAndAutoCommitCompleteForPollEvent(); markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollResult.COMPLETE);
consumer.poll(Duration.ZERO); consumer.poll(Duration.ZERO);
verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class)); verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
@ -1925,7 +1921,6 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(Pattern.compile("t*")); consumer.subscribe(Pattern.compile("t*"));
consumer.poll(Duration.ZERO); consumer.poll(Duration.ZERO);
verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class));
} }
@Test @Test
@ -1967,7 +1962,6 @@ public class AsyncKafkaConsumerTest {
// SubscriptionPattern is supported as of ConsumerGroupHeartbeatRequest v1. Clients using subscribe // SubscriptionPattern is supported as of ConsumerGroupHeartbeatRequest v1. Clients using subscribe
// (SubscribePattern) against older broker versions should get UnsupportedVersionException on poll after subscribe // (SubscribePattern) against older broker versions should get UnsupportedVersionException on poll after subscribe
@Disabled
@Test @Test
public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws InterruptedException { public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws InterruptedException {
final Properties props = requiredConsumerConfig(); final Properties props = requiredConsumerConfig();
@ -2235,4 +2229,12 @@ public class AsyncKafkaConsumerTest {
return null; return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class)); }).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
} }
private void markResultForCompositePollEvent(CompositePollResult result) {
doAnswer(invocation -> {
CompositePollEvent event = invocation.getArgument(0);
event.future().complete(result);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class));
}
} }