diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index d34316e142c..2d5906320b6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -94,7 +94,6 @@ import java.util.ConcurrentModificationException; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; @@ -287,12 +286,26 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { AsyncKafkaConsumer(final ConsumerConfig config, final Deserializer keyDeserializer, final Deserializer valueDeserializer) { - this(config, keyDeserializer, valueDeserializer, new LinkedBlockingQueue<>()); + this( + config, + keyDeserializer, + valueDeserializer, + Time.SYSTEM, + ApplicationEventHandler::new, + FetchCollector::new, + ConsumerMetadata::new, + new LinkedBlockingQueue<>() + ); } + // Visible for testing AsyncKafkaConsumer(final ConsumerConfig config, final Deserializer keyDeserializer, final Deserializer valueDeserializer, + final Time time, + final ApplicationEventHandlerFactory applicationEventHandlerFactory, + final FetchCollectorFactory fetchCollectorFactory, + final ConsumerMetadataFactory metadataFactory, final LinkedBlockingQueue backgroundEventQueue) { try { GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( @@ -305,7 +318,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { log.debug("Initializing the Kafka consumer"); this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); - this.time = Time.SYSTEM; + this.time = time; List reporters = CommonClientConfigs.metricsReporters(clientId, config); this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config); this.clientTelemetryReporter.ifPresent(reporters::add); @@ -319,7 +332,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(), interceptorList, Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer)); - this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners); + this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners); final List addresses = ClientUtils.parseAndValidateAddresses(config); metadata.bootstrap(addresses); @@ -360,7 +373,8 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { metadata, applicationEventQueue, requestManagersSupplier); - this.applicationEventHandler = new ApplicationEventHandler(logContext, + this.applicationEventHandler = applicationEventHandlerFactory.build( + logContext, time, applicationEventQueue, applicationEventProcessorSupplier, @@ -391,7 +405,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig); // The FetchCollector is only used on the application thread. - this.fetchCollector = new FetchCollector<>(logContext, + this.fetchCollector = fetchCollectorFactory.build(logContext, metadata, subscriptions, fetchConfig, @@ -420,49 +434,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } // Visible for testing - AsyncKafkaConsumer(LogContext logContext, - String clientId, - Deserializers deserializers, - FetchBuffer fetchBuffer, - FetchCollector fetchCollector, - ConsumerInterceptors interceptors, - Time time, - ApplicationEventHandler applicationEventHandler, - BlockingQueue backgroundEventQueue, - ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, - Metrics metrics, - SubscriptionState subscriptions, - ConsumerMetadata metadata, - long retryBackoffMs, - int defaultApiTimeoutMs, - List assignors, - String groupId) { - this.log = logContext.logger(getClass()); - this.subscriptions = subscriptions; - this.clientId = clientId; - this.fetchBuffer = fetchBuffer; - this.fetchCollector = fetchCollector; - this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; - this.interceptors = Objects.requireNonNull(interceptors); - this.time = time; - this.backgroundEventProcessor = new BackgroundEventProcessor( - logContext, - backgroundEventQueue, - applicationEventHandler, - rebalanceListenerInvoker - ); - this.metrics = metrics; - this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty()); - this.metadata = metadata; - this.retryBackoffMs = retryBackoffMs; - this.defaultApiTimeoutMs = defaultApiTimeoutMs; - this.deserializers = deserializers; - this.applicationEventHandler = applicationEventHandler; - this.assignors = assignors; - this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); - this.clientTelemetryReporter = Optional.empty(); - } - AsyncKafkaConsumer(LogContext logContext, Time time, ConsumerConfig config, @@ -563,6 +534,47 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { ); } + // auxiliary interface for testing + interface ApplicationEventHandlerFactory { + + ApplicationEventHandler build( + final LogContext logContext, + final Time time, + final BlockingQueue applicationEventQueue, + final Supplier applicationEventProcessorSupplier, + final Supplier networkClientDelegateSupplier, + final Supplier requestManagersSupplier + ); + + } + + // auxiliary interface for testing + interface FetchCollectorFactory { + + FetchCollector build( + final LogContext logContext, + final ConsumerMetadata metadata, + final SubscriptionState subscriptions, + final FetchConfig fetchConfig, + final Deserializers deserializers, + final FetchMetricsManager metricsManager, + final Time time + ); + + } + + // auxiliary interface for testing + interface ConsumerMetadataFactory { + + ConsumerMetadata build( + final ConsumerConfig config, + final SubscriptionState subscriptions, + final LogContext logContext, + final ClusterResourceListeners clusterResourceListeners + ); + + } + private Optional initializeGroupMetadata(final ConsumerConfig config, final GroupRebalanceConfig groupRebalanceConfig) { final Optional groupMetadata = initializeGroupMetadata( @@ -1756,8 +1768,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } } - // Visible for testing - void maybeInvokeCommitCallbacks() { + private void maybeInvokeCommitCallbacks() { if (callbacks() > 0) { invoker.executeCallbacks(); } @@ -1768,6 +1779,11 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { return invoker.callbackQueue.size(); } + // Visible for testing + SubscriptionState subscriptions() { + return subscriptions; + } + /** * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is * achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index cad3bc58a41..19734698afb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.Metadata.LeaderAndEpoch; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -27,11 +27,11 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.RetriableCommitFailedException; -import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent; @@ -45,7 +45,6 @@ import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeAppl import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InvalidGroupIdException; @@ -53,22 +52,14 @@ import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.message.OffsetCommitResponseData; -import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.ListOffsetsRequest; -import org.apache.kafka.common.requests.OffsetCommitResponse; -import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; -import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; import org.junit.jupiter.params.ParameterizedTest; @@ -76,17 +67,12 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; -import org.mockito.MockedConstruction; import org.mockito.Mockito; -import org.mockito.stubbing.Answer; -import org.opentest4j.AssertionFailedError; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -94,11 +80,8 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.SortedSet; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -117,166 +100,180 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@SuppressWarnings("unchecked") public class AsyncKafkaConsumerTest { - private AsyncKafkaConsumer consumer; - private FetchCollector fetchCollector; - private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder; - private ApplicationEventHandler applicationEventHandler; - private SubscriptionState subscriptions; - private BlockingQueue backgroundEventQueue; + private AsyncKafkaConsumer consumer = null; - @BeforeEach - public void setup() { - // By default, the consumer is part of a group and autoCommit is enabled. - setup(ConsumerTestBuilder.createDefaultGroupInformation(), true); - } - - private void setup(Optional groupInfo, boolean enableAutoCommit) { - testBuilder = new ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder(groupInfo, enableAutoCommit, true); - applicationEventHandler = testBuilder.applicationEventHandler; - consumer = testBuilder.consumer; - fetchCollector = testBuilder.fetchCollector; - subscriptions = testBuilder.subscriptions; - backgroundEventQueue = testBuilder.backgroundEventQueue; - } + private final Time time = new MockTime(1); + private final FetchCollector fetchCollector = mock(FetchCollector.class); + private final ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class); + private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); + private final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); @AfterEach - public void cleanup() { - if (testBuilder != null) { - shutDown(); + public void resetAll() { + backgroundEventQueue.clear(); + if (consumer != null) { + consumer.close(); } + consumer = null; + Mockito.framework().clearInlineMocks(); } - private void shutDown() { - prepAutocommitOnClose(); - testBuilder.close(); + private AsyncKafkaConsumer newConsumer() { + final Properties props = requiredConsumerProperties(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id"); + final ConsumerConfig config = new ConsumerConfig(props); + return newConsumer(config); } - private void resetWithEmptyGroupId() { - // Create a consumer that is not configured as part of a group. - cleanup(); - setup(Optional.empty(), false); + private AsyncKafkaConsumer newConsumerWithoutGroupId() { + final Properties props = requiredConsumerProperties(); + final ConsumerConfig config = new ConsumerConfig(props); + return newConsumer(config); } - private void resetWithAutoCommitEnabled() { - cleanup(); - setup(ConsumerTestBuilder.createDefaultGroupInformation(), true); + @SuppressWarnings("UnusedReturnValue") + private AsyncKafkaConsumer newConsumerWithEmptyGroupId() { + final Properties props = requiredConsumerPropertiesAndGroupId(""); + final ConsumerConfig config = new ConsumerConfig(props); + return newConsumer(config); + } + + private AsyncKafkaConsumer newConsumer(ConsumerConfig config) { + return new AsyncKafkaConsumer<>( + config, + new StringDeserializer(), + new StringDeserializer(), + time, + (a, b, c, d, e, f) -> applicationEventHandler, + (a, b, c, d, e, f, g) -> fetchCollector, + (a, b, c, d) -> metadata, + backgroundEventQueue + ); } @Test public void testSuccessfulStartupShutdown() { + consumer = newConsumer(); assertDoesNotThrow(() -> consumer.close()); } - @Test - public void testSuccessfulStartupShutdownWithAutoCommit() { - resetWithAutoCommitEnabled(); - TopicPartition tp = new TopicPartition("topic", 0); - consumer.assign(singleton(tp)); - consumer.seek(tp, 100); - prepAutocommitOnClose(); - } - @Test public void testInvalidGroupId() { - // Create consumer without group id - resetWithEmptyGroupId(); - assertThrows(InvalidGroupIdException.class, () -> consumer.committed(new HashSet<>())); + KafkaException e = assertThrows(KafkaException.class, this::newConsumerWithEmptyGroupId); + assertInstanceOf(InvalidGroupIdException.class, e.getCause()); } @Test public void testFailOnClosedConsumer() { + consumer = newConsumer(); consumer.close(); final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment); assertEquals("This consumer has already been closed.", res.getMessage()); } @Test - public void testCommitAsync_NullCallback() throws InterruptedException { - CompletableFuture future = new CompletableFuture<>(); - Map offsets = new HashMap<>(); - offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(100L)); - offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L)); + public void testCommitAsyncWithNullCallback() { + consumer = newConsumer(); + final TopicPartition t0 = new TopicPartition("t0", 2); + final TopicPartition t1 = new TopicPartition("t0", 3); + HashMap offsets = new HashMap<>(); + offsets.put(t0, new OffsetAndMetadata(10L)); + offsets.put(t1, new OffsetAndMetadata(20L)); - doReturn(future).when(consumer).commit(offsets, false); consumer.commitAsync(offsets, null); - future.complete(null); - TestUtils.waitForCondition(future::isDone, - 2000, - "commit future should complete"); - assertFalse(future.isCompletedExceptionally()); + final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(CommitApplicationEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final CommitApplicationEvent commitEvent = commitEventCaptor.getValue(); + assertEquals(offsets, commitEvent.offsets()); + assertDoesNotThrow(() -> commitEvent.future().complete(null)); + assertDoesNotThrow(() -> consumer.commitAsync(offsets, null)); + } + + @Test + public void testCommitAsyncUserSuppliedCallbackNoException() { + consumer = newConsumer(); + + Map offsets = new HashMap<>(); + offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L)); + completeCommitApplicationEventExceptionally(); + + MockCommitCallback callback = new MockCommitCallback(); + assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback)); + forceCommitCallbackInvocation(); + + assertNull(callback.exception); } @ParameterizedTest @MethodSource("commitExceptionSupplier") - public void testCommitAsync_UserSuppliedCallback(Exception exception) { - CompletableFuture future = new CompletableFuture<>(); + public void testCommitAsyncUserSuppliedCallbackWithException(Exception exception) { + consumer = newConsumer(); Map offsets = new HashMap<>(); offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L)); + completeCommitApplicationEventExceptionally(exception); - doReturn(future).when(consumer).commit(offsets, false); MockCommitCallback callback = new MockCommitCallback(); assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback)); + forceCommitCallbackInvocation(); - if (exception == null) { - future.complete(null); - consumer.maybeInvokeCommitCallbacks(); - assertNull(callback.exception); - } else { - future.completeExceptionally(exception); - consumer.maybeInvokeCommitCallbacks(); - assertSame(exception.getClass(), callback.exception.getClass()); - } + assertSame(exception.getClass(), callback.exception.getClass()); } private static Stream commitExceptionSupplier() { return Stream.of( - null, // For the successful completion scenario new KafkaException("Test exception"), new GroupAuthorizationException("Group authorization exception")); } @Test - public void testFencedInstanceException() { - CompletableFuture future = new CompletableFuture<>(); - doReturn(future).when(consumer).commit(new HashMap<>(), false); - assertDoesNotThrow(() -> consumer.commitAsync()); - future.completeExceptionally(Errors.FENCED_INSTANCE_ID.exception()); + public void testCommitAsyncWithFencedException() { + consumer = newConsumer(); + final HashMap offsets = mockTopicPartitionOffset(); + MockCommitCallback callback = new MockCommitCallback(); + + assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback)); + + final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(CommitApplicationEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final CommitApplicationEvent commitEvent = commitEventCaptor.getValue(); + commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception()); + + assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> consumer.commitAsync()); } @Test public void testCommitted() { - Map offsets = mockTopicPartitionOffset(); - CompletableFuture> committedFuture = new CompletableFuture<>(); - committedFuture.complete(offsets); + consumer = newConsumer(); + Map topicPartitionOffsets = mockTopicPartitionOffset(); + completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets); - try (MockedConstruction ignored = offsetFetchEventMocker(committedFuture)) { - assertDoesNotThrow(() -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000))); - verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class)); - } + assertEquals(topicPartitionOffsets, consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000))); + verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), any()); } @Test public void testCommittedLeaderEpochUpdate() { + consumer = newConsumer(); final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); final TopicPartition t2 = new TopicPartition("t0", 4); @@ -284,38 +281,38 @@ public class AsyncKafkaConsumerTest { topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), "")); topicPartitionOffsets.put(t1, null); topicPartitionOffsets.put(t2, new OffsetAndMetadata(20L, Optional.of(3), "")); + completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets); - CompletableFuture> committedFuture = new CompletableFuture<>(); - committedFuture.complete(topicPartitionOffsets); + assertDoesNotThrow(() -> consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000))); - try (MockedConstruction ignored = offsetFetchEventMocker(committedFuture)) { - assertDoesNotThrow(() -> consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000))); - } - verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t0, 2); - verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t2, 3); - verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class)); + verify(metadata).updateLastSeenEpochIfNewer(t0, 2); + verify(metadata).updateLastSeenEpochIfNewer(t2, 3); + verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), any()); } @Test - public void testCommitted_ExceptionThrown() { + public void testCommittedExceptionThrown() { + consumer = newConsumer(); Map offsets = mockTopicPartitionOffset(); - CompletableFuture> committedFuture = new CompletableFuture<>(); - committedFuture.completeExceptionally(new KafkaException("Test exception")); + when(applicationEventHandler.addAndGet(any(), any())).thenAnswer(invocation -> { + CompletableApplicationEvent event = invocation.getArgument(0); + assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, event); + throw new KafkaException("Test exception"); + }); - try (MockedConstruction ignored = offsetFetchEventMocker(committedFuture)) { - assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000))); - verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class)); - } + assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000))); } @Test public void testWakeupBeforeCallingPoll() { + consumer = newConsumer(); final String topicName = "foo"; final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class)); + completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); consumer.assign(singleton(tp)); consumer.wakeup(); @@ -326,6 +323,7 @@ public class AsyncKafkaConsumerTest { @Test public void testWakeupAfterEmptyFetch() { + consumer = newConsumer(); final String topicName = "foo"; final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); @@ -334,7 +332,8 @@ public class AsyncKafkaConsumerTest { return Fetch.empty(); }).when(fetchCollector).collectFetch(any(FetchBuffer.class)); Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class)); + completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); consumer.assign(singleton(tp)); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1))); @@ -343,6 +342,7 @@ public class AsyncKafkaConsumerTest { @Test public void testWakeupAfterNonEmptyFetch() { + consumer = newConsumer(); final String topicName = "foo"; final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); @@ -355,7 +355,8 @@ public class AsyncKafkaConsumerTest { return Fetch.forPartition(tp, records, true); }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class)); Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class)); + completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); consumer.assign(singleton(tp)); // since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored @@ -366,6 +367,7 @@ public class AsyncKafkaConsumerTest { @Test public void testClearWakeupTriggerAfterPoll() { + consumer = newConsumer(); final String topicName = "foo"; final int partition = 3; final TopicPartition tp = new TopicPartition(topicName, partition); @@ -376,7 +378,8 @@ public class AsyncKafkaConsumerTest { doReturn(Fetch.forPartition(tp, records, true)) .when(fetchCollector).collectFetch(any(FetchBuffer.class)); Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1))); - doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class)); + completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); consumer.assign(singleton(tp)); consumer.poll(Duration.ZERO); @@ -386,37 +389,29 @@ public class AsyncKafkaConsumerTest { @Test public void testEnsureCallbackExecutedByApplicationThread() { + consumer = newConsumer(); final String currentThread = Thread.currentThread().getName(); - ExecutorService backgroundExecutor = Executors.newSingleThreadExecutor(); MockCommitCallback callback = new MockCommitCallback(); - CountDownLatch latch = new CountDownLatch(1); // Initialize the latch with a count of 1 - try { - CompletableFuture future = new CompletableFuture<>(); - doReturn(future).when(consumer).commit(new HashMap<>(), false); - assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); - // Simulating some background work - backgroundExecutor.submit(() -> { - future.complete(null); - latch.countDown(); - }); - latch.await(); - assertEquals(1, consumer.callbacks()); - consumer.maybeInvokeCommitCallbacks(); - assertEquals(currentThread, callback.completionThread); - } catch (Exception e) { - fail("Not expecting an exception"); - } finally { - backgroundExecutor.shutdown(); - } + completeCommitApplicationEventExceptionally(); + + assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); + assertEquals(1, consumer.callbacks()); + forceCommitCallbackInvocation(); + assertEquals(currentThread, callback.completionThread); } @Test public void testEnsureCommitSyncExecutedCommitAsyncCallbacks() { + consumer = newConsumer(); + final HashMap offsets = mockTopicPartitionOffset(); MockCommitCallback callback = new MockCommitCallback(); - CompletableFuture future = new CompletableFuture<>(); - doReturn(future).when(consumer).commit(new HashMap<>(), false); - assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); - future.completeExceptionally(new NetworkException("Test exception")); + assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback)); + + final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(CommitApplicationEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final CommitApplicationEvent commitEvent = commitEventCaptor.getValue(); + commitEvent.future().completeExceptionally(new NetworkException("Test exception")); + assertMockCommitCallbackInvoked(() -> consumer.commitSync(), callback, Errors.NETWORK_EXCEPTION); @@ -425,6 +420,7 @@ public class AsyncKafkaConsumerTest { @Test @SuppressWarnings("deprecation") public void testPollLongThrowsException() { + consumer = newConsumer(); Exception e = assertThrows(UnsupportedOperationException.class, () -> consumer.poll(0L)); assertEquals("Consumer.poll(long) is not supported when \"group.protocol\" is \"consumer\". " + "This method is deprecated and will be removed in the next major release.", e.getMessage()); @@ -432,27 +428,26 @@ public class AsyncKafkaConsumerTest { @Test public void testCommitSyncLeaderEpochUpdate() { + consumer = newConsumer(); final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); HashMap topicPartitionOffsets = new HashMap<>(); topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), "")); topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), "")); + completeCommitApplicationEventExceptionally(); consumer.assign(Arrays.asList(t0, t1)); - CompletableFuture commitFuture = new CompletableFuture<>(); - commitFuture.complete(null); + assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets)); - try (MockedConstruction ignored = commitEventMocker(commitFuture)) { - assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets)); - } - verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t0, 2); - verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t1, 1); + verify(metadata).updateLastSeenEpochIfNewer(t0, 2); + verify(metadata).updateLastSeenEpochIfNewer(t1, 1); verify(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); } @Test public void testCommitAsyncLeaderEpochUpdate() { + consumer = newConsumer(); MockCommitCallback callback = new MockCommitCallback(); final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); @@ -462,25 +457,23 @@ public class AsyncKafkaConsumerTest { consumer.assign(Arrays.asList(t0, t1)); - CompletableFuture commitFuture = new CompletableFuture<>(); - commitFuture.complete(null); + assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback)); - try (MockedConstruction ignored = commitEventMocker(commitFuture)) { - assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback)); - } - verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t0, 2); - verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t1, 1); + verify(metadata).updateLastSeenEpochIfNewer(t0, 2); + verify(metadata).updateLastSeenEpochIfNewer(t1, 1); verify(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); } @Test public void testEnsurePollExecutedCommitAsyncCallbacks() { + consumer = newConsumer(); MockCommitCallback callback = new MockCommitCallback(); - CompletableFuture future = new CompletableFuture<>(); + completeCommitApplicationEventExceptionally(); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); + consumer.assign(Collections.singleton(new TopicPartition("foo", 0))); - doReturn(future).when(consumer).commit(new HashMap<>(), false); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); - future.complete(null); assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback, null); @@ -488,11 +481,10 @@ public class AsyncKafkaConsumerTest { @Test public void testEnsureShutdownExecutedCommitAsyncCallbacks() { + consumer = newConsumer(); MockCommitCallback callback = new MockCommitCallback(); - CompletableFuture future = new CompletableFuture<>(); - doReturn(future).when(consumer).commit(new HashMap<>(), false); + completeCommitApplicationEventExceptionally(); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); - future.complete(null); assertMockCommitCallbackInvoked(() -> consumer.close(), callback, null); @@ -506,7 +498,7 @@ public class AsyncKafkaConsumerTest { if (errors == null) assertNull(callback.exception); else if (errors.exception() instanceof RetriableException) - assertTrue(callback.exception instanceof RetriableCommitFailedException); + assertInstanceOf(RetriableCommitFailedException.class, callback.exception); } private static class MockCommitCallback implements OffsetCommitCallback { @@ -521,57 +513,10 @@ public class AsyncKafkaConsumerTest { this.exception = exception; } } - /** - * This is a rather ugly bit of code. Not my choice :( - * - *

- * - * Inside the {@link org.apache.kafka.clients.consumer.Consumer#committed(Set, Duration)} call we create an - * instance of {@link FetchCommittedOffsetsApplicationEvent} that holds the partitions and internally holds a - * {@link CompletableFuture}. We want to test different behaviours of the {@link Future#get()}, such as - * returning normally, timing out, throwing an error, etc. By mocking the construction of the event object that - * is created, we can affect that behavior. - */ - private static MockedConstruction offsetFetchEventMocker(CompletableFuture> future) { - // This "answer" is where we pass the future to be invoked by the ConsumerUtils.getResult() method - Answer> getInvocationAnswer = invocation -> { - // This argument captures the actual argument value that was passed to the event's get() method, so we - // just "forward" that value to our mocked call - Timer timer = invocation.getArgument(0); - return ConsumerUtils.getResult(future, timer); - }; - - MockedConstruction.MockInitializer mockInitializer = (mock, ctx) -> { - // When the event's get() method is invoked, we call the "answer" method just above - when(mock.get(any())).thenAnswer(getInvocationAnswer); - - // When the event's type() method is invoked, we have to return the type as it will be null in the mock - when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSETS); - - // This is needed for the WakeupTrigger code that keeps track of the active task - when(mock.future()).thenReturn(future); - }; - - return mockConstruction(FetchCommittedOffsetsApplicationEvent.class, mockInitializer); - } - - private static MockedConstruction commitEventMocker(CompletableFuture future) { - Answer getInvocationAnswer = invocation -> { - Timer timer = invocation.getArgument(0); - return ConsumerUtils.getResult(future, timer); - }; - - MockedConstruction.MockInitializer mockInitializer = (mock, ctx) -> { - when(mock.get(any())).thenAnswer(getInvocationAnswer); - when(mock.type()).thenReturn(ApplicationEvent.Type.COMMIT); - when(mock.future()).thenReturn(future); - }; - - return mockConstruction(CommitApplicationEvent.class, mockInitializer); - } @Test public void testAssign() { + consumer = newConsumer(); final TopicPartition tp = new TopicPartition("foo", 3); consumer.assign(singleton(tp)); assertTrue(consumer.subscription().isEmpty()); @@ -582,11 +527,15 @@ public class AsyncKafkaConsumerTest { @Test public void testAssignOnNullTopicPartition() { + consumer = newConsumer(); assertThrows(IllegalArgumentException.class, () -> consumer.assign(null)); } @Test public void testAssignOnEmptyTopicPartition() { + consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); + consumer.assign(Collections.emptyList()); assertTrue(consumer.subscription().isEmpty()); assertTrue(consumer.assignment().isEmpty()); @@ -594,68 +543,77 @@ public class AsyncKafkaConsumerTest { @Test public void testAssignOnNullTopicInPartition() { + consumer = newConsumer(); assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(null, 0)))); } @Test public void testAssignOnEmptyTopicInPartition() { + consumer = newConsumer(); assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(" ", 0)))); } @Test public void testBeginningOffsetsFailsIfNullPartitions() { + consumer = newConsumer(); assertThrows(NullPointerException.class, () -> consumer.beginningOffsets(null, - Duration.ofMillis(1))); + Duration.ofMillis(1))); } @Test public void testBeginningOffsets() { + consumer = newConsumer(); Map expectedOffsetsAndTimestamp = - mockOffsetAndTimestamp(); + mockOffsetAndTimestamp(); Set partitions = expectedOffsetsAndTimestamp.keySet(); doReturn(expectedOffsetsAndTimestamp).when(applicationEventHandler).addAndGet(any(), any()); Map result = - assertDoesNotThrow(() -> consumer.beginningOffsets(partitions, - Duration.ofMillis(1))); + assertDoesNotThrow(() -> consumer.beginningOffsets(partitions, + Duration.ofMillis(1))); Map expectedOffsets = expectedOffsetsAndTimestamp.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); assertEquals(expectedOffsets, result); verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), - ArgumentMatchers.isA(Timer.class)); + ArgumentMatchers.isA(Timer.class)); } @Test public void testBeginningOffsetsThrowsKafkaExceptionForUnderlyingExecutionFailure() { + consumer = newConsumer(); Set partitions = mockTopicPartitionOffset().keySet(); Throwable eventProcessingFailure = new KafkaException("Unexpected failure " + - "processing List Offsets event"); + "processing List Offsets event"); doThrow(eventProcessingFailure).when(applicationEventHandler).addAndGet(any(), any()); Throwable consumerError = assertThrows(KafkaException.class, - () -> consumer.beginningOffsets(partitions, - Duration.ofMillis(1))); + () -> consumer.beginningOffsets(partitions, + Duration.ofMillis(1))); assertEquals(eventProcessingFailure, consumerError); - verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), + ArgumentMatchers.isA(Timer.class)); } @Test public void testBeginningOffsetsTimeoutOnEventProcessingTimeout() { + consumer = newConsumer(); doThrow(new TimeoutException()).when(applicationEventHandler).addAndGet(any(), any()); assertThrows(TimeoutException.class, - () -> consumer.beginningOffsets( - Collections.singletonList(new TopicPartition("t1", 0)), - Duration.ofMillis(1))); + () -> consumer.beginningOffsets( + Collections.singletonList(new TopicPartition("t1", 0)), + Duration.ofMillis(1))); verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), - ArgumentMatchers.isA(Timer.class)); + ArgumentMatchers.isA(Timer.class)); } @Test public void testOffsetsForTimesOnNullPartitions() { + consumer = newConsumer(); assertThrows(NullPointerException.class, () -> consumer.offsetsForTimes(null, - Duration.ofMillis(1))); + Duration.ofMillis(1))); } @Test public void testOffsetsForTimesFailsOnNegativeTargetTimes() { + consumer = newConsumer(); assertThrows(IllegalArgumentException.class, () -> consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition( "topic1", 1), ListOffsetsRequest.EARLIEST_TIMESTAMP), @@ -674,6 +632,7 @@ public class AsyncKafkaConsumerTest { @Test public void testOffsetsForTimes() { + consumer = newConsumer(); Map expectedResult = mockOffsetAndTimestamp(); Map timestampToSearch = mockTimestampToSearch(); @@ -690,6 +649,7 @@ public class AsyncKafkaConsumerTest { // OffsetAndTimestamp as value. @Test public void testOffsetsForTimesWithZeroTimeout() { + consumer = newConsumer(); TopicPartition tp = new TopicPartition("topic1", 0); Map expectedResult = Collections.singletonMap(tp, null); @@ -700,18 +660,32 @@ public class AsyncKafkaConsumerTest { Duration.ofMillis(0))); assertEquals(expectedResult, result); verify(applicationEventHandler, never()).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), - ArgumentMatchers.isA(Timer.class)); + ArgumentMatchers.isA(Timer.class)); } @Test - public void testWakeup_committed() { + public void testWakeupCommitted() { + consumer = newConsumer(); + final HashMap offsets = mockTopicPartitionOffset(); + doAnswer(invocation -> { + CompletableApplicationEvent event = invocation.getArgument(0); + Timer timer = invocation.getArgument(1); + assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, event); + assertTrue(event.future().isCompletedExceptionally()); + return ConsumerUtils.getResult(event.future(), timer); + }) + .when(applicationEventHandler) + .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class)); + consumer.wakeup(); - assertThrows(WakeupException.class, () -> consumer.committed(mockTopicPartitionOffset().keySet())); - assertNoPendingWakeup(consumer.wakeupTrigger()); + assertThrows(WakeupException.class, () -> consumer.committed(offsets.keySet())); + assertNull(consumer.wakeupTrigger().getPendingTask()); } + @Test public void testRefreshCommittedOffsetsSuccess() { + consumer = newConsumer(); TopicPartition partition = new TopicPartition("t1", 1); Set partitions = Collections.singleton(partition); Map committedOffsets = Collections.singletonMap(partition, new OffsetAndMetadata(10L)); @@ -720,6 +694,7 @@ public class AsyncKafkaConsumerTest { @Test public void testRefreshCommittedOffsetsSuccessButNoCommittedOffsetsFound() { + consumer = newConsumer(); TopicPartition partition = new TopicPartition("t1", 1); Set partitions = Collections.singleton(partition); Map committedOffsets = Collections.emptyMap(); @@ -728,18 +703,20 @@ public class AsyncKafkaConsumerTest { @Test public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() { + consumer = newConsumer(); testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(true); } @Test public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() { // Create consumer without group id so committed offsets are not used for updating positions - resetWithEmptyGroupId(); + consumer = newConsumerWithoutGroupId(); testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(false); } @Test public void testSubscribeGeneratesEvent() { + consumer = newConsumer(); String topic = "topic1"; consumer.subscribe(singletonList(topic)); assertEquals(singleton(topic), consumer.subscription()); @@ -749,20 +726,21 @@ public class AsyncKafkaConsumerTest { @Test public void testUnsubscribeGeneratesUnsubscribeEvent() { - consumer.unsubscribe(); + consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); - // Verify the unsubscribe event was generated and mock its completion. - final ArgumentCaptor captor = ArgumentCaptor.forClass(UnsubscribeApplicationEvent.class); - verify(applicationEventHandler).add(captor.capture()); - UnsubscribeApplicationEvent unsubscribeApplicationEvent = captor.getValue(); - unsubscribeApplicationEvent.future().complete(null); + consumer.unsubscribe(); assertTrue(consumer.subscription().isEmpty()); assertTrue(consumer.assignment().isEmpty()); + verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class)); } @Test public void testSubscribeToEmptyListActsAsUnsubscribe() { + consumer = newConsumer(); + completeUnsubscribeApplicationEventSuccessfully(); + consumer.subscribe(Collections.emptyList()); assertTrue(consumer.subscription().isEmpty()); assertTrue(consumer.assignment().isEmpty()); @@ -771,16 +749,19 @@ public class AsyncKafkaConsumerTest { @Test public void testSubscribeToNullTopicCollection() { + consumer = newConsumer(); assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((List) null)); } @Test public void testSubscriptionOnNullTopic() { + consumer = newConsumer(); assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(null))); } @Test public void testSubscriptionOnEmptyTopic() { + consumer = newConsumer(); String emptyTopic = " "; assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic))); } @@ -789,34 +770,30 @@ public class AsyncKafkaConsumerTest { public void testGroupMetadataAfterCreationWithGroupIdIsNull() { final Properties props = requiredConsumerProperties(); final ConsumerConfig config = new ConsumerConfig(props); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { + consumer = newConsumer(config); - assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); - assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); - final Throwable exception = assertThrows(InvalidGroupIdException.class, consumer::groupMetadata); - assertEquals( - "To use the group management or offset commit APIs, you must " + - "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.", - exception.getMessage() - ); - } + assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); + final Throwable exception = assertThrows(InvalidGroupIdException.class, consumer::groupMetadata); + assertEquals( + "To use the group management or offset commit APIs, you must " + + "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.", + exception.getMessage() + ); } @Test public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() { final String groupId = "consumerGroupA"; final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { + consumer = newConsumer(config); - final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); + final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); - assertEquals(groupId, groupMetadata.groupId()); - assertEquals(Optional.empty(), groupMetadata.groupInstanceId()); - assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId()); - assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId()); - } + assertEquals(groupId, groupMetadata.groupId()); + assertEquals(Optional.empty(), groupMetadata.groupInstanceId()); + assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId()); + assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId()); } @Test @@ -826,49 +803,48 @@ public class AsyncKafkaConsumerTest { final Properties props = requiredConsumerPropertiesAndGroupId(groupId); props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId); final ConsumerConfig config = new ConsumerConfig(props); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { + consumer = newConsumer(config); - final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); + final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata(); - assertEquals(groupId, groupMetadata.groupId()); - assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId()); - assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId()); - assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId()); - } + assertEquals(groupId, groupMetadata.groupId()); + assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId()); + assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId()); + assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId()); } @Test public void testGroupMetadataUpdateSingleCall() { final String groupId = "consumerGroupA"; final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)); - final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) { - final int generation = 1; - final String memberId = "newMemberId"; - final ConsumerGroupMetadata expectedGroupMetadata = new ConsumerGroupMetadata( - groupId, - generation, - memberId, - Optional.empty() - ); - final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new GroupMetadataUpdateEvent( - generation, - memberId - ); - backgroundEventQueue.add(groupMetadataUpdateEvent); - consumer.assign(singletonList(new TopicPartition("topic", 0))); - consumer.poll(Duration.ZERO); + consumer = newConsumer(config); - final ConsumerGroupMetadata actualGroupMetadata = consumer.groupMetadata(); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap()); - assertEquals(expectedGroupMetadata, actualGroupMetadata); + final int generation = 1; + final String memberId = "newMemberId"; + final ConsumerGroupMetadata expectedGroupMetadata = new ConsumerGroupMetadata( + groupId, + generation, + memberId, + Optional.empty() + ); + final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new GroupMetadataUpdateEvent( + generation, + memberId + ); + backgroundEventQueue.add(groupMetadataUpdateEvent); + consumer.assign(singletonList(new TopicPartition("topic", 0))); + consumer.poll(Duration.ZERO); - final ConsumerGroupMetadata secondActualGroupMetadataWithoutUpdate = consumer.groupMetadata(); + final ConsumerGroupMetadata actualGroupMetadata = consumer.groupMetadata(); - assertEquals(expectedGroupMetadata, secondActualGroupMetadataWithoutUpdate); - } + assertEquals(expectedGroupMetadata, actualGroupMetadata); + + final ConsumerGroupMetadata secondActualGroupMetadataWithoutUpdate = consumer.groupMetadata(); + + assertEquals(expectedGroupMetadata, secondActualGroupMetadataWithoutUpdate); } /** @@ -890,11 +866,13 @@ public class AsyncKafkaConsumerTest { int expectedRevokedCount, int expectedAssignedCount, int expectedLostCount) { + consumer = newConsumer(); CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener( revokedError, assignedError, lostError ); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); consumer.subscribe(Collections.singletonList("topic"), consumerRebalanceListener); SortedSet partitions = Collections.emptySortedSet(); @@ -947,40 +925,36 @@ public class AsyncKafkaConsumerTest { public void testBackgroundError() { final String groupId = "consumerGroupA"; final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)); - final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) { - final KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition"); - final ErrorBackgroundEvent errorBackgroundEvent = new ErrorBackgroundEvent(expectedException); - backgroundEventQueue.add(errorBackgroundEvent); - consumer.assign(singletonList(new TopicPartition("topic", 0))); + consumer = newConsumer(config); - final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); + final KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition"); + final ErrorBackgroundEvent errorBackgroundEvent = new ErrorBackgroundEvent(expectedException); + backgroundEventQueue.add(errorBackgroundEvent); + consumer.assign(singletonList(new TopicPartition("topic", 0))); - assertEquals(expectedException.getMessage(), exception.getMessage()); - } + final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); + + assertEquals(expectedException.getMessage(), exception.getMessage()); } @Test public void testMultipleBackgroundErrors() { final String groupId = "consumerGroupA"; final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)); - final LinkedBlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) { - final KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition"); - final ErrorBackgroundEvent errorBackgroundEvent1 = new ErrorBackgroundEvent(expectedException1); - backgroundEventQueue.add(errorBackgroundEvent1); - final KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam"); - final ErrorBackgroundEvent errorBackgroundEvent2 = new ErrorBackgroundEvent(expectedException2); - backgroundEventQueue.add(errorBackgroundEvent2); - consumer.assign(singletonList(new TopicPartition("topic", 0))); + consumer = newConsumer(config); - final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); + final KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition"); + final ErrorBackgroundEvent errorBackgroundEvent1 = new ErrorBackgroundEvent(expectedException1); + backgroundEventQueue.add(errorBackgroundEvent1); + final KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam"); + final ErrorBackgroundEvent errorBackgroundEvent2 = new ErrorBackgroundEvent(expectedException2); + backgroundEventQueue.add(errorBackgroundEvent2); + consumer.assign(singletonList(new TopicPartition("topic", 0))); - assertEquals(expectedException1.getMessage(), exception.getMessage()); - assertTrue(backgroundEventQueue.isEmpty()); - } + final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); + + assertEquals(expectedException1.getMessage(), exception.getMessage()); + assertTrue(backgroundEventQueue.isEmpty()); } @Test @@ -988,11 +962,9 @@ public class AsyncKafkaConsumerTest { final Properties props = requiredConsumerProperties(); props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor"); final ConsumerConfig config = new ConsumerConfig(props); + consumer = newConsumer(config); - try (AsyncKafkaConsumer ignored = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); - } + assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); } @Test @@ -1002,11 +974,9 @@ public class AsyncKafkaConsumerTest { props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT)); props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor"); final ConsumerConfig config = new ConsumerConfig(props); + consumer = newConsumer(config); - try (AsyncKafkaConsumer ignored = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); - } + assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); } @Test @@ -1016,44 +986,33 @@ public class AsyncKafkaConsumerTest { props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor"); final ConsumerConfig config = new ConsumerConfig(props); + consumer = newConsumer(config); - try (AsyncKafkaConsumer ignored = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); - } + assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); } - + @Test public void testGroupIdNull() { final Properties props = requiredConsumerProperties(); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); final ConsumerConfig config = new ConsumerConfig(props); + consumer = newConsumer(config); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); - assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); - } catch (final Exception exception) { - throw new AssertionFailedError("The following exception was not expected:", exception); - } + assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); } - @Disabled("Flaky test temporarily disabled - in review") @Test public void testGroupIdNotNullAndValid() { final Properties props = requiredConsumerPropertiesAndGroupId("consumerGroupA"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); final ConsumerConfig config = new ConsumerConfig(props); + consumer = newConsumer(config); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); - assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); - } catch (final Exception exception) { - throw new AssertionFailedError("The following exception was not expected:", exception); - } + assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); } @Test @@ -1072,7 +1031,7 @@ public class AsyncKafkaConsumerTest { final Exception exception = assertThrows( KafkaException.class, - () -> new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer()) + () -> consumer = newConsumer(config) ); assertEquals("Failed to construct kafka consumer", exception.getMessage()); @@ -1093,54 +1052,52 @@ public class AsyncKafkaConsumerTest { } private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) { - // Uncompleted future that will time out if used - CompletableFuture> committedFuture = new CompletableFuture<>(); + completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException()); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); consumer.assign(singleton(new TopicPartition("t1", 1))); - try (MockedConstruction ignored = offsetFetchEventMocker(committedFuture)) { - // Poll with 250ms timeout to give the background thread time to process the events without timing out - consumer.poll(Duration.ofMillis(250)); + consumer.poll(Duration.ZERO); + verify(applicationEventHandler, atLeast(1)) + .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + + if (committedOffsetsEnabled) { + // Verify there was an FetchCommittedOffsets event and no ResetPositions event verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - - if (committedOffsetsEnabled) { - // Verify there was an FetchCommittedOffsets event and no ResetPositions event - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - verify(applicationEventHandler, never()) - .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - } else { - // Verify there was not any FetchCommittedOffsets event but there should be a ResetPositions - verify(applicationEventHandler, never()) - .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - } - } - } - - private void testRefreshCommittedOffsetsSuccess(Set partitions, - Map committedOffsets) { - CompletableFuture> committedFuture = new CompletableFuture<>(); - committedFuture.complete(committedOffsets); - consumer.assign(partitions); - try (MockedConstruction ignored = offsetFetchEventMocker(committedFuture)) { - // Poll with 250ms timeout to give the background thread time to process the events without timing out - consumer.poll(Duration.ofMillis(250)); - - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - verify(applicationEventHandler, atLeast(1)) + .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + verify(applicationEventHandler, never()) + .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + } else { + // Verify there was not any FetchCommittedOffsets event but there should be a ResetPositions + verify(applicationEventHandler, never()) .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); verify(applicationEventHandler, atLeast(1)) .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); } } + private void testRefreshCommittedOffsetsSuccess(Set partitions, + Map committedOffsets) { + completeFetchedCommittedOffsetApplicationEventSuccessfully(committedOffsets); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + + consumer.assign(partitions); + + consumer.poll(Duration.ZERO); + + verify(applicationEventHandler, atLeast(1)) + .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + verify(applicationEventHandler, atLeast(1)) + .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + verify(applicationEventHandler, atLeast(1)) + .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + } + @Test public void testLongPollWaitIsLimited() { + consumer = newConsumer(); String topicName = "topic1"; consumer.subscribe(singletonList(topicName)); @@ -1157,7 +1114,7 @@ public class AsyncKafkaConsumerTest { // On the first iteration, return no data; on the second, return two records doAnswer(invocation -> { // Mock the subscription being assigned as the first fetch is collected - subscriptions.assignFromSubscribed(Collections.singleton(tp)); + consumer.subscriptions().assignFromSubscribed(Collections.singleton(tp)); return Fetch.empty(); }).doAnswer(invocation -> { return Fetch.forPartition(tp, records, true); @@ -1177,6 +1134,7 @@ public class AsyncKafkaConsumerTest { */ @Test public void testProcessBackgroundEventsWithInitialDelay() throws Exception { + consumer = newConsumer(); Time time = new MockTime(); Timer timer = time.timer(1000); CompletableFuture future = mock(CompletableFuture.class); @@ -1212,6 +1170,7 @@ public class AsyncKafkaConsumerTest { */ @Test public void testProcessBackgroundEventsWithoutDelay() { + consumer = newConsumer(); Time time = new MockTime(); Timer timer = time.timer(1000); @@ -1233,6 +1192,7 @@ public class AsyncKafkaConsumerTest { */ @Test public void testProcessBackgroundEventsTimesOut() throws Exception { + consumer = newConsumer(); Time time = new MockTime(); Timer timer = time.timer(1000); CompletableFuture future = mock(CompletableFuture.class); @@ -1251,10 +1211,6 @@ public class AsyncKafkaConsumerTest { } } - private void assertNoPendingWakeup(final WakeupTrigger wakeupTrigger) { - assertNull(wakeupTrigger.getPendingTask()); - } - private HashMap mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); @@ -1282,45 +1238,45 @@ public class AsyncKafkaConsumerTest { return timestampToSearch; } - private void prepAutocommitOnClose() { - Node node = testBuilder.metadata.fetch().nodes().get(0); - testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - if (!testBuilder.subscriptions.allConsumed().isEmpty()) { - List topicPartitions = new ArrayList<>(testBuilder.subscriptions.assignedPartitionsList()); - testBuilder.client.prepareResponse(mockAutocommitResponse( - topicPartitions, - (short) 1, - Errors.NONE).responseBody()); - } + private void completeCommitApplicationEventExceptionally(Exception ex) { + doAnswer(invocation -> { + CommitApplicationEvent event = invocation.getArgument(0); + event.future().completeExceptionally(ex); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); } - private ClientResponse mockAutocommitResponse(final List topicPartitions, - final short apiKeyVersion, - final Errors error) { - OffsetCommitResponseData responseData = new OffsetCommitResponseData(); - List responseTopics = new ArrayList<>(); - topicPartitions.forEach(tp -> { - responseTopics.add(new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setName(tp.topic()) - .setPartitions(Collections.singletonList( - new OffsetCommitResponseData.OffsetCommitResponsePartition() - .setErrorCode(error.code()) - .setPartitionIndex(tp.partition())))); - }); - responseData.setTopics(responseTopics); - OffsetCommitResponse response = mock(OffsetCommitResponse.class); - when(response.data()).thenReturn(responseData); - return new ClientResponse( - new RequestHeader(ApiKeys.OFFSET_COMMIT, apiKeyVersion, "", 1), - null, - "-1", - testBuilder.time.milliseconds(), - testBuilder.time.milliseconds(), - false, - null, - null, - new OffsetCommitResponse(responseData) - ); + private void completeCommitApplicationEventExceptionally() { + doAnswer(invocation -> { + CommitApplicationEvent event = invocation.getArgument(0); + event.future().complete(null); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); + } + + private void completeFetchedCommittedOffsetApplicationEventSuccessfully(final Map committedOffsets) { + doReturn(committedOffsets) + .when(applicationEventHandler) + .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class)); + } + + private void completeFetchedCommittedOffsetApplicationEventExceptionally(Exception ex) { + doThrow(ex) + .when(applicationEventHandler) + .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class)); + } + + private void completeUnsubscribeApplicationEventSuccessfully() { + doAnswer(invocation -> { + UnsubscribeApplicationEvent event = invocation.getArgument(0); + event.future().complete(null); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class)); + } + + private void forceCommitCallbackInvocation() { + // Invokes callback + consumer.commitAsync(); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index 716100613b5..f0d42a8ed3b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -21,9 +21,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; -import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; @@ -38,10 +36,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import java.io.Closeable; -import java.time.Duration; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Optional; import java.util.Properties; import java.util.concurrent.BlockingQueue; @@ -101,10 +96,6 @@ public class ConsumerTestBuilder implements Closeable { final MockClient client; final Optional groupInfo; - public ConsumerTestBuilder() { - this(Optional.empty()); - } - public ConsumerTestBuilder(Optional groupInfo) { this(groupInfo, true, true); } @@ -315,78 +306,6 @@ public class ConsumerTestBuilder implements Closeable { } } - public static class ApplicationEventHandlerTestBuilder extends ConsumerTestBuilder { - - public final ApplicationEventHandler applicationEventHandler; - - public ApplicationEventHandlerTestBuilder(Optional groupInfo, boolean enableAutoCommit, boolean enableAutoTick) { - super(groupInfo, enableAutoCommit, enableAutoTick); - this.applicationEventHandler = spy(new ApplicationEventHandler( - logContext, - time, - applicationEventQueue, - () -> applicationEventProcessor, - () -> networkClientDelegate, - () -> requestManagers)); - } - - @Override - public void close() { - closeQuietly(applicationEventHandler, ApplicationEventHandler.class.getSimpleName()); - } - } - - public static class AsyncKafkaConsumerTestBuilder extends ApplicationEventHandlerTestBuilder { - - final AsyncKafkaConsumer consumer; - - final FetchCollector fetchCollector; - - public AsyncKafkaConsumerTestBuilder(Optional groupInfo, boolean enableAutoCommit, boolean enableAutoTick) { - super(groupInfo, enableAutoCommit, enableAutoTick); - String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); - List assignors = ConsumerPartitionAssignor.getAssignorInstances( - config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), - config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) - ); - Deserializers deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer()); - this.fetchCollector = spy(new FetchCollector<>(logContext, - metadata, - subscriptions, - fetchConfig, - deserializers, - metricsManager, - time)); - this.consumer = spy(new AsyncKafkaConsumer<>( - logContext, - clientId, - deserializers, - new FetchBuffer(logContext), - fetchCollector, - new ConsumerInterceptors<>(Collections.emptyList()), - time, - applicationEventHandler, - backgroundEventQueue, - rebalanceListenerInvoker, - metrics, - subscriptions, - metadata, - retryBackoffMs, - 60000, - assignors, - groupInfo.map(groupInformation -> groupInformation.groupState.groupId).orElse(null))); - } - - @Override - public void close() { - consumer.close(); - } - - public void close(final Duration timeout) { - consumer.close(timeout); - } - } - public static class GroupInformation { final GroupState groupState;