From d4802c78e3b91f2d702ad88a76ca4764b7e8777d Mon Sep 17 00:00:00 2001 From: Kirk True Date: Wed, 10 Sep 2025 21:16:46 -0700 Subject: [PATCH] Re-enabling tests in AsyncKafkaConsumer --- .../internals/AsyncKafkaConsumer.java | 2 +- .../internals/AsyncKafkaConsumerTest.java | 52 ++++++++++--------- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index b672eea2927..3a1af95bd9b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -892,7 +892,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { CompositePollEvent event = new CompositePollEvent(deadlineMs, pollTimeMs, nextStep); applicationEventHandler.add(event); - CompositePollResult result = ConsumerUtils.getResult(event.future()); + CompositePollResult result = ConsumerUtils.getResult(event.future(), defaultApiTimeoutMs.toMillis()); if (result == CompositePollResult.NEEDS_OFFSET_COMMIT_CALLBACKS) { offsetCommitCallbackInvoker.executeCallbacks(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 4875f6e00d3..b4a06b1f0f9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -42,8 +42,9 @@ import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; +import org.apache.kafka.clients.consumer.internals.events.CompositePollEvent; +import org.apache.kafka.clients.consumer.internals.events.CompositePollResult; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; -import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; @@ -92,7 +93,6 @@ import org.apache.kafka.test.MockConsumerInterceptor; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import org.junit.jupiter.params.ParameterizedTest; @@ -411,7 +411,6 @@ public class AsyncKafkaConsumerTest { assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000))); } - @Disabled @Test public void testWakeupBeforeCallingPoll() { consumer = newConsumer(); @@ -427,11 +426,11 @@ public class AsyncKafkaConsumerTest { consumer.wakeup(); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.COMPLETE); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO)); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } - @Disabled @Test public void testWakeupAfterEmptyFetch() { consumer = newConsumer(); @@ -448,11 +447,11 @@ public class AsyncKafkaConsumerTest { consumer.assign(singleton(tp)); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.COMPLETE); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1))); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } - @Disabled @Test public void testWakeupAfterNonEmptyFetch() { consumer = newConsumer(); @@ -473,13 +472,13 @@ public class AsyncKafkaConsumerTest { consumer.assign(singleton(tp)); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.COMPLETE); // since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1))); // the previously ignored wake-up should not be ignored in the next call assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO)); } - @Disabled @Test public void testCommitInRebalanceCallback() { consumer = newConsumer(); @@ -511,11 +510,11 @@ public class AsyncKafkaConsumerTest { completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(Collections.singletonList(topicName), listener); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.NEEDS_BACKGROUND_EVENT_PROCESSING); consumer.poll(Duration.ZERO); assertTrue(callbackExecuted.get()); } - @Disabled @Test public void testClearWakeupTriggerAfterPoll() { consumer = newConsumer(); @@ -534,6 +533,7 @@ public class AsyncKafkaConsumerTest { consumer.assign(singleton(tp)); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.COMPLETE); consumer.poll(Duration.ZERO); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); @@ -668,7 +668,6 @@ public class AsyncKafkaConsumerTest { return allValues.get(allValues.size() - 1); } - @Disabled @Test public void testEnsurePollExecutedCommitAsyncCallbacks() { consumer = newConsumer(); @@ -681,6 +680,7 @@ public class AsyncKafkaConsumerTest { consumer.assign(Collections.singleton(new TopicPartition("foo", 0))); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.NEEDS_OFFSET_COMMIT_CALLBACKS); assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback); } @@ -1202,14 +1202,12 @@ public class AsyncKafkaConsumerTest { assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get()); } - @Disabled @Test public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() { consumer = newConsumer(); testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(); } - @Disabled @Test public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() { // Create consumer without group id so committed offsets are not used for updating positions @@ -1455,7 +1453,6 @@ public class AsyncKafkaConsumerTest { * callback execution does not immediately errors. Instead, those errors are forwarded to the * application event thread for the {@link ConsumerMembershipManager} to handle. */ - @Disabled @ParameterizedTest @MethodSource("listenerCallbacksInvokeSource") public void testListenerCallbacksInvoke(List methodNames, @@ -1484,6 +1481,8 @@ public class AsyncKafkaConsumerTest { } markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.NEEDS_BACKGROUND_EVENT_PROCESSING); + // This will trigger the background event queue to process our background event message. // If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll. if (expectedException.isPresent()) { @@ -1543,7 +1542,6 @@ public class AsyncKafkaConsumerTest { ); } - @Disabled @Test public void testBackgroundError() { final String groupId = "consumerGroupA"; @@ -1555,12 +1553,12 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singletonList(new TopicPartition("topic", 0))); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.NEEDS_BACKGROUND_EVENT_PROCESSING); final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); assertEquals(expectedException.getMessage(), exception.getMessage()); } - @Disabled @Test public void testMultipleBackgroundErrors() { final String groupId = "consumerGroupA"; @@ -1575,6 +1573,7 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singletonList(new TopicPartition("topic", 0))); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.NEEDS_BACKGROUND_EVENT_PROCESSING); final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); assertEquals(expectedException1.getMessage(), exception.getMessage()); @@ -1639,7 +1638,6 @@ public class AsyncKafkaConsumerTest { assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); } - @Disabled @Test public void testEnsurePollEventSentOnConsumerPoll() { SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); @@ -1659,9 +1657,9 @@ public class AsyncKafkaConsumerTest { completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(singletonList("topic1")); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.COMPLETE); consumer.poll(Duration.ofMillis(100)); - verify(applicationEventHandler).add(any(PollEvent.class)); - verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class)); + verify(applicationEventHandler).add(any(CompositePollEvent.class)); } private Properties requiredConsumerConfigAndGroupId(final String groupId) { @@ -1678,13 +1676,10 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(new TopicPartition("t1", 1))); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.COMPLETE); consumer.poll(Duration.ZERO); - - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class)); } - @Disabled @Test public void testLongPollWaitIsLimited() { consumer = newConsumer(); @@ -1716,6 +1711,7 @@ public class AsyncKafkaConsumerTest { when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.COMPLETE); // And then poll for up to 10000ms, which should return 2 records without timing out ConsumerRecords returnedRecords = consumer.poll(Duration.ofMillis(10000)); assertEquals(2, returnedRecords.count()); @@ -1804,7 +1800,6 @@ public class AsyncKafkaConsumerTest { * Tests that calling {@link Thread#interrupt()} before {@link KafkaConsumer#poll(Duration)} * causes {@link InterruptException} to be thrown. */ - @Disabled @Test public void testPollThrowsInterruptExceptionIfInterrupted() { consumer = newConsumer(); @@ -1821,6 +1816,7 @@ public class AsyncKafkaConsumerTest { try { Thread.currentThread().interrupt(); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.COMPLETE); assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)); } finally { // clear interrupted state again since this thread may be reused by JUnit @@ -1845,7 +1841,6 @@ public class AsyncKafkaConsumerTest { verify(backgroundEventReaper).reap(time.milliseconds()); } - @Disabled @Test void testReaperInvokedInPoll() { consumer = newConsumer(); @@ -1854,6 +1849,7 @@ public class AsyncKafkaConsumerTest { consumer.subscribe(Collections.singletonList("topic")); when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.COMPLETE); consumer.poll(Duration.ZERO); verify(backgroundEventReaper).reap(time.milliseconds()); } @@ -1905,7 +1901,6 @@ public class AsyncKafkaConsumerTest { assertEquals(AutoOffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy()); } - @Disabled @Test public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() { consumer = newConsumer(); @@ -1918,6 +1913,7 @@ public class AsyncKafkaConsumerTest { consumer.assign(singleton(new TopicPartition("topic1", 0))); markReconcileAndAutoCommitCompleteForPollEvent(); + markResultForCompositePollEvent(CompositePollResult.COMPLETE); consumer.poll(Duration.ZERO); verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class)); @@ -1925,7 +1921,6 @@ public class AsyncKafkaConsumerTest { consumer.subscribe(Pattern.compile("t*")); consumer.poll(Duration.ZERO); - verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class)); } @Test @@ -1967,7 +1962,6 @@ public class AsyncKafkaConsumerTest { // SubscriptionPattern is supported as of ConsumerGroupHeartbeatRequest v1. Clients using subscribe // (SubscribePattern) against older broker versions should get UnsupportedVersionException on poll after subscribe - @Disabled @Test public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws InterruptedException { final Properties props = requiredConsumerConfig(); @@ -2235,4 +2229,12 @@ public class AsyncKafkaConsumerTest { return null; }).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class)); } + + private void markResultForCompositePollEvent(CompositePollResult result) { + doAnswer(invocation -> { + CompositePollEvent event = invocation.getArgument(0); + event.future().complete(result); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class)); + } }