From 0cb7d747fbd94ae2387cdd1da5729b4bbbec6f2a Mon Sep 17 00:00:00 2001
From: Lucas Brutschy
Date: Mon, 18 Dec 2023 22:41:57 +0100
Subject: [PATCH] KAFKA-15913: Migrate async consumer tests to mocks (#14930)

Use mocks to test the AsyncKafkaConsumer:

- Eliminate the use of ConsumerTestBuilder.
- Mock all resources that were previously retrieved by leaking the
  background thread and wrapping them in Mockito spies.
- Use the default constructor of AsyncKafkaConsumer wherever possible
  and inject the mocks via factory interfaces.
- Simulate timeouts by throwing timeout exceptions directly instead of
  waiting for futures to time out.

The auto-commit mocking code was not ported: it mostly tested the
integration of the foreground and background threads (or kept the spies
working, which broke during auto-commit on close), and it is currently
being reimplemented anyway.

The new test suite runs 10x faster.
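For illustration, the shape of the factory-injection and timeout-stubbing
techniques, condensed from the change below (a sketch of what the patch
adds, not a compilable excerpt on its own):

    // One of the auxiliary factory interfaces added to AsyncKafkaConsumer:
    interface ConsumerMetadataFactory {
        ConsumerMetadata build(final ConsumerConfig config,
                               final SubscriptionState subscriptions,
                               final LogContext logContext,
                               final ClusterResourceListeners clusterResourceListeners);
    }

    // The production constructor forwards the real implementation,
    // ConsumerMetadata::new, while the test injects its mock through the
    // same constructor parameter:
    //     (a, b, c, d) -> metadata
    // Timeouts are then stubbed directly on the mocked handler instead of
    // waiting for an uncompleted future to expire:
    //     doThrow(new TimeoutException()).when(applicationEventHandler).addAndGet(any(), any());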
Reviewers: Bruno Cadonna
---
 .../internals/AsyncKafkaConsumer.java         | 118 +--
 .../internals/AsyncKafkaConsumerTest.java     | 796 +++++++++---------
 .../internals/ConsumerTestBuilder.java        |  81 --
 3 files changed, 443 insertions(+), 552 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index d34316e142c..2d5906320b6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -94,7 +94,6 @@ import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
@@ -287,12 +286,26 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     AsyncKafkaConsumer(final ConsumerConfig config,
                        final Deserializer<K> keyDeserializer,
                        final Deserializer<V> valueDeserializer) {
-        this(config, keyDeserializer, valueDeserializer, new LinkedBlockingQueue<>());
+        this(
+            config,
+            keyDeserializer,
+            valueDeserializer,
+            Time.SYSTEM,
+            ApplicationEventHandler::new,
+            FetchCollector::new,
+            ConsumerMetadata::new,
+            new LinkedBlockingQueue<>()
+        );
     }

+    // Visible for testing
     AsyncKafkaConsumer(final ConsumerConfig config,
                        final Deserializer<K> keyDeserializer,
                        final Deserializer<V> valueDeserializer,
+                       final Time time,
+                       final ApplicationEventHandlerFactory applicationEventHandlerFactory,
+                       final FetchCollectorFactory<K, V> fetchCollectorFactory,
+                       final ConsumerMetadataFactory metadataFactory,
                        final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
         try {
             GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
@@ -305,7 +318,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {

             log.debug("Initializing the Kafka consumer");
             this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-            this.time = Time.SYSTEM;
+            this.time = time;
             List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
             this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
             this.clientTelemetryReporter.ifPresent(reporters::add);
@@ -319,7 +332,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(),
                     interceptorList,
                     Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
-            this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
+            this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners);
             final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
             metadata.bootstrap(addresses);

@@ -360,7 +373,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                     metadata,
                     applicationEventQueue,
                     requestManagersSupplier);
-            this.applicationEventHandler = new ApplicationEventHandler(logContext,
+            this.applicationEventHandler = applicationEventHandlerFactory.build(
+                    logContext,
                     time,
                     applicationEventQueue,
                     applicationEventProcessorSupplier,
@@ -391,7 +405,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             this.groupMetadata = initializeGroupMetadata(config, groupRebalanceConfig);

             // The FetchCollector is only used on the application thread.
-            this.fetchCollector = new FetchCollector<>(logContext,
+            this.fetchCollector = fetchCollectorFactory.build(logContext,
                     metadata,
                     subscriptions,
                     fetchConfig,
@@ -420,49 +434,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     }

     // Visible for testing
-    AsyncKafkaConsumer(LogContext logContext,
-                       String clientId,
-                       Deserializers<K, V> deserializers,
-                       FetchBuffer fetchBuffer,
-                       FetchCollector<K, V> fetchCollector,
-                       ConsumerInterceptors<K, V> interceptors,
-                       Time time,
-                       ApplicationEventHandler applicationEventHandler,
-                       BlockingQueue<BackgroundEvent> backgroundEventQueue,
-                       ConsumerRebalanceListenerInvoker rebalanceListenerInvoker,
-                       Metrics metrics,
-                       SubscriptionState subscriptions,
-                       ConsumerMetadata metadata,
-                       long retryBackoffMs,
-                       int defaultApiTimeoutMs,
-                       List<ConsumerPartitionAssignor> assignors,
-                       String groupId) {
-        this.log = logContext.logger(getClass());
-        this.subscriptions = subscriptions;
-        this.clientId = clientId;
-        this.fetchBuffer = fetchBuffer;
-        this.fetchCollector = fetchCollector;
-        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
-        this.interceptors = Objects.requireNonNull(interceptors);
-        this.time = time;
-        this.backgroundEventProcessor = new BackgroundEventProcessor(
-            logContext,
-            backgroundEventQueue,
-            applicationEventHandler,
-            rebalanceListenerInvoker
-        );
-        this.metrics = metrics;
-        this.groupMetadata = initializeGroupMetadata(groupId, Optional.empty());
-        this.metadata = metadata;
-        this.retryBackoffMs = retryBackoffMs;
-        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
-        this.deserializers = deserializers;
-        this.applicationEventHandler = applicationEventHandler;
-        this.assignors = assignors;
-        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
-        this.clientTelemetryReporter = Optional.empty();
-    }
-
     AsyncKafkaConsumer(LogContext logContext,
                        Time time,
                        ConsumerConfig config,
@@ -563,6 +534,47 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         );
     }

+    // auxiliary interface for testing
+    interface ApplicationEventHandlerFactory {
+
+        ApplicationEventHandler build(
+            final LogContext logContext,
+            final Time time,
+            final BlockingQueue<ApplicationEvent> applicationEventQueue,
+            final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
+            final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
+            final Supplier<RequestManagers> requestManagersSupplier
+        );
+
+    }
+
+    // auxiliary interface for testing
+    interface FetchCollectorFactory<K, V> {
+
+        FetchCollector<K, V> build(
+            final LogContext logContext,
+            final ConsumerMetadata metadata,
+            final SubscriptionState subscriptions,
+            final FetchConfig fetchConfig,
+            final Deserializers<K, V> deserializers,
+            final FetchMetricsManager metricsManager,
+            final Time time
+        );
+
+    }
+
+    // auxiliary interface for testing
+    interface ConsumerMetadataFactory {
+
+        ConsumerMetadata build(
+            final ConsumerConfig config,
+            final SubscriptionState subscriptions,
+            final LogContext logContext,
+            final ClusterResourceListeners clusterResourceListeners
+        );
+
+    }
+
     private Optional<ConsumerGroupMetadata> initializeGroupMetadata(final ConsumerConfig config,
                                                                     final GroupRebalanceConfig groupRebalanceConfig) {
         final Optional<ConsumerGroupMetadata> groupMetadata = initializeGroupMetadata(
@@ -1756,8 +1768,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
     }

-    // Visible for testing
-    void maybeInvokeCommitCallbacks() {
+    private void maybeInvokeCommitCallbacks() {
         if (callbacks() > 0) {
             invoker.executeCallbacks();
         }
@@ -1768,6 +1779,11 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         return invoker.callbackQueue.size();
     }

+    // Visible for testing
+    SubscriptionState subscriptions() {
+        return subscriptions;
+    }
+
     /**
      * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is
      * achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index cad3bc58a41..19734698afb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;

-import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -27,11 +27,11 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
-import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
@@ -45,7 +45,6 @@ import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
@@ -53,22 +52,14 @@ import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.message.OffsetCommitResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -76,17 +67,12 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
-import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-import org.opentest4j.AssertionFailedError;

 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -94,11 +80,8 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.SortedSet;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -117,166 +100,180 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;

+@SuppressWarnings("unchecked")
 public class AsyncKafkaConsumerTest {

-    private AsyncKafkaConsumer<String, String> consumer;
-    private FetchCollector<String, String> fetchCollector;
-    private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder;
-    private ApplicationEventHandler applicationEventHandler;
-    private SubscriptionState subscriptions;
-    private BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private AsyncKafkaConsumer<String, String> consumer = null;

-    @BeforeEach
-    public void setup() {
-        // By default, the consumer is part of a group and autoCommit is enabled.
-        setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
-    }
-
-    private void setup(Optional<ConsumerTestBuilder.GroupInformation> groupInfo, boolean enableAutoCommit) {
-        testBuilder = new ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder(groupInfo, enableAutoCommit, true);
-        applicationEventHandler = testBuilder.applicationEventHandler;
-        consumer = testBuilder.consumer;
-        fetchCollector = testBuilder.fetchCollector;
-        subscriptions = testBuilder.subscriptions;
-        backgroundEventQueue = testBuilder.backgroundEventQueue;
-    }
+    private final Time time = new MockTime(1);
+    private final FetchCollector<String, String> fetchCollector = mock(FetchCollector.class);
+    private final ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class);
+    private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+    private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();

     @AfterEach
-    public void cleanup() {
-        if (testBuilder != null) {
-            shutDown();
+    public void resetAll() {
+        backgroundEventQueue.clear();
+        if (consumer != null) {
+            consumer.close();
         }
+        consumer = null;
+        Mockito.framework().clearInlineMocks();
     }

-    private void shutDown() {
-        prepAutocommitOnClose();
-        testBuilder.close();
+    private AsyncKafkaConsumer<String, String> newConsumer() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return newConsumer(config);
     }

-    private void resetWithEmptyGroupId() {
-        // Create a consumer that is not configured as part of a group.
-        cleanup();
-        setup(Optional.empty(), false);
+    private AsyncKafkaConsumer<String, String> newConsumerWithoutGroupId() {
+        final Properties props = requiredConsumerProperties();
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return newConsumer(config);
     }

-    private void resetWithAutoCommitEnabled() {
-        cleanup();
-        setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
+    @SuppressWarnings("UnusedReturnValue")
+    private AsyncKafkaConsumer<String, String> newConsumerWithEmptyGroupId() {
+        final Properties props = requiredConsumerPropertiesAndGroupId("");
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return newConsumer(config);
+    }
+
+    private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig config) {
+        return new AsyncKafkaConsumer<>(
+            config,
+            new StringDeserializer(),
+            new StringDeserializer(),
+            time,
+            (a, b, c, d, e, f) -> applicationEventHandler,
+            (a, b, c, d, e, f, g) -> fetchCollector,
+            (a, b, c, d) -> metadata,
+            backgroundEventQueue
+        );
     }

     @Test
     public void testSuccessfulStartupShutdown() {
+        consumer = newConsumer();
         assertDoesNotThrow(() -> consumer.close());
     }

-    @Test
-    public void testSuccessfulStartupShutdownWithAutoCommit() {
-        resetWithAutoCommitEnabled();
-        TopicPartition tp = new TopicPartition("topic", 0);
-        consumer.assign(singleton(tp));
-        consumer.seek(tp, 100);
-        prepAutocommitOnClose();
-    }
-
     @Test
     public void testInvalidGroupId() {
-        // Create consumer without group id
-        resetWithEmptyGroupId();
-        assertThrows(InvalidGroupIdException.class, () -> consumer.committed(new HashSet<>()));
+        KafkaException e = assertThrows(KafkaException.class, this::newConsumerWithEmptyGroupId);
+        assertInstanceOf(InvalidGroupIdException.class, e.getCause());
     }

     @Test
     public void testFailOnClosedConsumer() {
+        consumer = newConsumer();
         consumer.close();
         final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment);
         assertEquals("This consumer has already been closed.", res.getMessage());
     }

     @Test
-    public void testCommitAsync_NullCallback() throws InterruptedException {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(100L));
-        offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
+    public void testCommitAsyncWithNullCallback() {
+        consumer = newConsumer();
+        final TopicPartition t0 = new TopicPartition("t0", 2);
+        final TopicPartition t1 = new TopicPartition("t0", 3);
+        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(t0, new OffsetAndMetadata(10L));
+        offsets.put(t1, new OffsetAndMetadata(20L));

-        doReturn(future).when(consumer).commit(offsets, false);
         consumer.commitAsync(offsets, null);
-        future.complete(null);
-        TestUtils.waitForCondition(future::isDone,
-                2000,
-                "commit future should complete");

-        assertFalse(future.isCompletedExceptionally());
+        final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor = ArgumentCaptor.forClass(CommitApplicationEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final CommitApplicationEvent commitEvent = commitEventCaptor.getValue();
+        assertEquals(offsets, commitEvent.offsets());
+        assertDoesNotThrow(() -> commitEvent.future().complete(null));
+        assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
+    }
+
+    @Test
+    public void testCommitAsyncUserSuppliedCallbackNoException() {
+        consumer = newConsumer();
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
+        completeCommitApplicationEventExceptionally();
+
+        MockCommitCallback callback = new MockCommitCallback();
+        assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
+        forceCommitCallbackInvocation();
+
+        assertNull(callback.exception);
     }

     @ParameterizedTest
     @MethodSource("commitExceptionSupplier")
-    public void testCommitAsync_UserSuppliedCallback(Exception exception) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+    public void testCommitAsyncUserSuppliedCallbackWithException(Exception exception) {
+        consumer = newConsumer();

         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
+        completeCommitApplicationEventExceptionally(exception);

-        doReturn(future).when(consumer).commit(offsets, false);
         MockCommitCallback callback = new MockCommitCallback();
         assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
+        forceCommitCallbackInvocation();

-        if (exception == null) {
-            future.complete(null);
-            consumer.maybeInvokeCommitCallbacks();
-            assertNull(callback.exception);
-        } else {
-            future.completeExceptionally(exception);
-            consumer.maybeInvokeCommitCallbacks();
-            assertSame(exception.getClass(), callback.exception.getClass());
-        }
+        assertSame(exception.getClass(), callback.exception.getClass());
     }

     private static Stream<Exception> commitExceptionSupplier() {
         return Stream.of(
-                null,  // For the successful completion scenario
                 new KafkaException("Test exception"),
                 new GroupAuthorizationException("Group authorization exception"));
     }

     @Test
-    public void testFencedInstanceException() {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        doReturn(future).when(consumer).commit(new HashMap<>(), false);
-        assertDoesNotThrow(() -> consumer.commitAsync());
-        future.completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
+    public void testCommitAsyncWithFencedException() {
+        consumer = newConsumer();
+        final HashMap<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
+        MockCommitCallback callback = new MockCommitCallback();
+
+        assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
+
+        final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor = ArgumentCaptor.forClass(CommitApplicationEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final CommitApplicationEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
+
+        assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> consumer.commitAsync());
     }

     @Test
     public void testCommitted() {
-        Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
-        committedFuture.complete(offsets);
+        consumer = newConsumer();
+        Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = mockTopicPartitionOffset();
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);

-        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
-            assertDoesNotThrow(() -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
-        }
+        assertEquals(topicPartitionOffsets, consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), any());
     }

     @Test
     public void testCommittedLeaderEpochUpdate() {
+        consumer = newConsumer();
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
         final TopicPartition t2 = new TopicPartition("t0", 4);
@@ -284,38 +281,38 @@
         topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), ""));
         topicPartitionOffsets.put(t1, null);
         topicPartitionOffsets.put(t2, new OffsetAndMetadata(20L, Optional.of(3), ""));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);

-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
-        committedFuture.complete(topicPartitionOffsets);
+        assertDoesNotThrow(() -> consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));

-        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
-            assertDoesNotThrow(() -> consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
-        }
-        verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t0, 2);
-        verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t2, 3);
-        verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
+        verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
+        verify(metadata).updateLastSeenEpochIfNewer(t2, 3);
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), any());
     }

     @Test
-    public void testCommitted_ExceptionThrown() {
+    public void testCommittedExceptionThrown() {
+        consumer = newConsumer();
         Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
-        committedFuture.completeExceptionally(new KafkaException("Test exception"));
+        when(applicationEventHandler.addAndGet(any(), any())).thenAnswer(invocation -> {
+            CompletableApplicationEvent<?> event = invocation.getArgument(0);
+            assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, event);
+            throw new KafkaException("Test exception");
+        });

-        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
-            assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
-        }
+        assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
     }

     @Test
     public void testWakeupBeforeCallingPoll() {
+        consumer = newConsumer();
         final String topicName = "foo";
         final int partition = 3;
         final TopicPartition tp = new TopicPartition(topicName, partition);
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());

         consumer.assign(singleton(tp));
         consumer.wakeup();
@@ -326,6 +323,7 @@

     @Test
     public void testWakeupAfterEmptyFetch() {
+        consumer = newConsumer();
         final String topicName = "foo";
         final int partition = 3;
         final TopicPartition tp = new TopicPartition(topicName, partition);
@@ -334,7 +332,8 @@
             return Fetch.empty();
         }).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());

         consumer.assign(singleton(tp));
         assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
@@ -343,6 +342,7 @@

     @Test
     public void testWakeupAfterNonEmptyFetch() {
+        consumer = newConsumer();
         final String topicName = "foo";
         final int partition = 3;
         final TopicPartition tp = new TopicPartition(topicName, partition);
@@ -355,7 +355,8 @@
             return Fetch.forPartition(tp, records, true);
         }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());

         consumer.assign(singleton(tp));
         // since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
@@ -366,6 +367,7 @@

     @Test
     public void testClearWakeupTriggerAfterPoll() {
+        consumer = newConsumer();
         final String topicName = "foo";
         final int partition = 3;
         final TopicPartition tp = new TopicPartition(topicName, partition);
@@ -376,7 +378,8 @@
         doReturn(Fetch.forPartition(tp, records, true))
             .when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());

         consumer.assign(singleton(tp));
         consumer.poll(Duration.ZERO);
@@ -386,37 +389,29 @@

     @Test
     public void testEnsureCallbackExecutedByApplicationThread() {
+        consumer = newConsumer();
         final String currentThread = Thread.currentThread().getName();
-        ExecutorService backgroundExecutor = Executors.newSingleThreadExecutor();
         MockCommitCallback callback = new MockCommitCallback();
-        CountDownLatch latch = new CountDownLatch(1);  // Initialize the latch with a count of 1
-        try {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            doReturn(future).when(consumer).commit(new HashMap<>(), false);
-            assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
-            // Simulating some background work
-            backgroundExecutor.submit(() -> {
-                future.complete(null);
-                latch.countDown();
-            });
-            latch.await();
-            assertEquals(1, consumer.callbacks());
-            consumer.maybeInvokeCommitCallbacks();
-            assertEquals(currentThread, callback.completionThread);
-        } catch (Exception e) {
-            fail("Not expecting an exception");
-        } finally {
-            backgroundExecutor.shutdown();
-        }
+        completeCommitApplicationEventExceptionally();
+
+        assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
+        assertEquals(1, consumer.callbacks());
+        forceCommitCallbackInvocation();
+        assertEquals(currentThread, callback.completionThread);
     }

     @Test
     public void testEnsureCommitSyncExecutedCommitAsyncCallbacks() {
+        consumer = newConsumer();
+        final HashMap<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
         MockCommitCallback callback = new MockCommitCallback();
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        doReturn(future).when(consumer).commit(new HashMap<>(), false);
-        assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
-        future.completeExceptionally(new NetworkException("Test exception"));
+
+        assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
+
+        final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor = ArgumentCaptor.forClass(CommitApplicationEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final CommitApplicationEvent commitEvent = commitEventCaptor.getValue();
+        commitEvent.future().completeExceptionally(new NetworkException("Test exception"));
+
         assertMockCommitCallbackInvoked(() -> consumer.commitSync(),
             callback,
             Errors.NETWORK_EXCEPTION);
@@ -425,6 +420,7 @@
     @Test
     @SuppressWarnings("deprecation")
     public void testPollLongThrowsException() {
+        consumer = newConsumer();
         Exception e = assertThrows(UnsupportedOperationException.class, () -> consumer.poll(0L));
         assertEquals("Consumer.poll(long) is not supported when \"group.protocol\" is \"consumer\". " +
             "This method is deprecated and will be removed in the next major release.", e.getMessage());
@@ -432,27 +428,26 @@

     @Test
     public void testCommitSyncLeaderEpochUpdate() {
+        consumer = newConsumer();
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
         HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
         topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), ""));
         topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, Optional.of(1), ""));
+        completeCommitApplicationEventExceptionally();

         consumer.assign(Arrays.asList(t0, t1));

-        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
-        commitFuture.complete(null);
+        assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets));

-        try (MockedConstruction<CommitApplicationEvent> ignored = commitEventMocker(commitFuture)) {
-            assertDoesNotThrow(() -> consumer.commitSync(topicPartitionOffsets));
-        }
-        verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t0, 2);
-        verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t1, 1);
+        verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
+        verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
         verify(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class));
     }

     @Test
     public void testCommitAsyncLeaderEpochUpdate() {
+        consumer = newConsumer();
         MockCommitCallback callback = new MockCommitCallback();
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
@@ -462,25 +457,23 @@

         consumer.assign(Arrays.asList(t0, t1));

-        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
-        commitFuture.complete(null);
+        assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback));

-        try (MockedConstruction<CommitApplicationEvent> ignored = commitEventMocker(commitFuture)) {
-            assertDoesNotThrow(() -> consumer.commitAsync(topicPartitionOffsets, callback));
-        }
-        verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t0, 2);
-        verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t1, 1);
+        verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
+        verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
         verify(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class));
     }

     @Test
     public void testEnsurePollExecutedCommitAsyncCallbacks() {
+        consumer = newConsumer();
         MockCommitCallback callback = new MockCommitCallback();
-        CompletableFuture<Void> future = new CompletableFuture<>();
+        completeCommitApplicationEventExceptionally();
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap());

         consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
-        doReturn(future).when(consumer).commit(new HashMap<>(), false);
         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
-        future.complete(null);
         assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO),
             callback,
             null);
@@ -488,11 +481,10 @@

     @Test
     public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
+        consumer = newConsumer();
         MockCommitCallback callback = new MockCommitCallback();
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        completeCommitApplicationEventExceptionally();
         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
-        future.complete(null);
         assertMockCommitCallbackInvoked(() -> consumer.close(),
             callback,
             null);
@@ -506,7 +498,7 @@
         if (errors == null)
             assertNull(callback.exception);
         else if (errors.exception() instanceof RetriableException)
-            assertTrue(callback.exception instanceof RetriableCommitFailedException);
+            assertInstanceOf(RetriableCommitFailedException.class, callback.exception);
     }

     private static class MockCommitCallback implements OffsetCommitCallback {
@@ -521,57 +513,10 @@
             this.exception = exception;
         }
     }
-    /**
-     * This is a rather ugly bit of code. Not my choice :(
-     *
-     *
-     *
-     * Inside the {@link org.apache.kafka.clients.consumer.Consumer#committed(Set, Duration)} call we create an
-     * instance of {@link FetchCommittedOffsetsApplicationEvent} that holds the partitions and internally holds a
-     * {@link CompletableFuture}. We want to test different behaviours of the {@link Future#get()}, such as
-     * returning normally, timing out, throwing an error, etc. By mocking the construction of the event object that
-     * is created, we can affect that behavior.
-     */
-    private static MockedConstruction<FetchCommittedOffsetsApplicationEvent> offsetFetchEventMocker(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
-        // This "answer" is where we pass the future to be invoked by the ConsumerUtils.getResult() method
-        Answer<Map<TopicPartition, OffsetAndMetadata>> getInvocationAnswer = invocation -> {
-            // This argument captures the actual argument value that was passed to the event's get() method, so we
-            // just "forward" that value to our mocked call
-            Timer timer = invocation.getArgument(0);
-            return ConsumerUtils.getResult(future, timer);
-        };
-
-        MockedConstruction.MockInitializer<FetchCommittedOffsetsApplicationEvent> mockInitializer = (mock, ctx) -> {
-            // When the event's get() method is invoked, we call the "answer" method just above
-            when(mock.get(any())).thenAnswer(getInvocationAnswer);
-
-            // When the event's type() method is invoked, we have to return the type as it will be null in the mock
-            when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSETS);
-
-            // This is needed for the WakeupTrigger code that keeps track of the active task
-            when(mock.future()).thenReturn(future);
-        };
-
-        return mockConstruction(FetchCommittedOffsetsApplicationEvent.class, mockInitializer);
-    }
-
-    private static MockedConstruction<CommitApplicationEvent> commitEventMocker(CompletableFuture<Void> future) {
-        Answer<Void> getInvocationAnswer = invocation -> {
-            Timer timer = invocation.getArgument(0);
-            return ConsumerUtils.getResult(future, timer);
-        };
-
-        MockedConstruction.MockInitializer<CommitApplicationEvent> mockInitializer = (mock, ctx) -> {
-            when(mock.get(any())).thenAnswer(getInvocationAnswer);
-            when(mock.type()).thenReturn(ApplicationEvent.Type.COMMIT);
-            when(mock.future()).thenReturn(future);
-        };
-
-        return mockConstruction(CommitApplicationEvent.class, mockInitializer);
-    }

     @Test
     public void testAssign() {
+        consumer = newConsumer();
         final TopicPartition tp = new TopicPartition("foo", 3);
         consumer.assign(singleton(tp));
         assertTrue(consumer.subscription().isEmpty());
@@ -582,11 +527,15 @@

     @Test
     public void testAssignOnNullTopicPartition() {
+        consumer = newConsumer();
         assertThrows(IllegalArgumentException.class, () -> consumer.assign(null));
     }

     @Test
     public void testAssignOnEmptyTopicPartition() {
+        consumer = newConsumer();
+        completeUnsubscribeApplicationEventSuccessfully();
+
         consumer.assign(Collections.emptyList());
         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().isEmpty());
@@ -594,68 +543,77 @@

     @Test
     public void testAssignOnNullTopicInPartition() {
+        consumer = newConsumer();
         assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition(null, 0))));
     }

     @Test
     public void testAssignOnEmptyTopicInPartition() {
+        consumer = newConsumer();
        assertThrows(IllegalArgumentException.class, () -> consumer.assign(singleton(new TopicPartition("  ", 0))));
     }

     @Test
     public void testBeginningOffsetsFailsIfNullPartitions() {
+        consumer = newConsumer();
         assertThrows(NullPointerException.class, () -> consumer.beginningOffsets(null,
-                Duration.ofMillis(1)));
+            Duration.ofMillis(1)));
     }

     @Test
     public void testBeginningOffsets() {
+        consumer = newConsumer();
         Map<TopicPartition, OffsetAndTimestamp> expectedOffsetsAndTimestamp =
-                mockOffsetAndTimestamp();
+            mockOffsetAndTimestamp();
         Set<TopicPartition> partitions = expectedOffsetsAndTimestamp.keySet();
         doReturn(expectedOffsetsAndTimestamp).when(applicationEventHandler).addAndGet(any(), any());

         Map<TopicPartition, Long> result =
-                assertDoesNotThrow(() -> consumer.beginningOffsets(partitions,
-                        Duration.ofMillis(1)));
+            assertDoesNotThrow(() -> consumer.beginningOffsets(partitions,
+                Duration.ofMillis(1)));
         Map<TopicPartition, Long> expectedOffsets = expectedOffsetsAndTimestamp.entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
         assertEquals(expectedOffsets, result);
         verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
-                ArgumentMatchers.isA(Timer.class));
+            ArgumentMatchers.isA(Timer.class));
     }

     @Test
     public void testBeginningOffsetsThrowsKafkaExceptionForUnderlyingExecutionFailure() {
+        consumer = newConsumer();
         Set<TopicPartition> partitions = mockTopicPartitionOffset().keySet();
         Throwable eventProcessingFailure = new KafkaException("Unexpected failure " +
-                "processing List Offsets event");
+            "processing List Offsets event");
         doThrow(eventProcessingFailure).when(applicationEventHandler).addAndGet(any(), any());
         Throwable consumerError = assertThrows(KafkaException.class,
-                () -> consumer.beginningOffsets(partitions,
-                        Duration.ofMillis(1)));
+            () -> consumer.beginningOffsets(partitions,
+                Duration.ofMillis(1)));
         assertEquals(eventProcessingFailure, consumerError);
-        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
+        verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+            ArgumentMatchers.isA(Timer.class));
     }

     @Test
     public void testBeginningOffsetsTimeoutOnEventProcessingTimeout() {
+        consumer = newConsumer();
         doThrow(new TimeoutException()).when(applicationEventHandler).addAndGet(any(), any());
         assertThrows(TimeoutException.class,
-                () -> consumer.beginningOffsets(
-                        Collections.singletonList(new TopicPartition("t1", 0)),
-                        Duration.ofMillis(1)));
+            () -> consumer.beginningOffsets(
+                Collections.singletonList(new TopicPartition("t1", 0)),
+                Duration.ofMillis(1)));
         verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
-                ArgumentMatchers.isA(Timer.class));
+            ArgumentMatchers.isA(Timer.class));
     }

     @Test
     public void testOffsetsForTimesOnNullPartitions() {
+        consumer = newConsumer();
         assertThrows(NullPointerException.class, () -> consumer.offsetsForTimes(null,
-                Duration.ofMillis(1)));
+            Duration.ofMillis(1)));
     }

     @Test
     public void testOffsetsForTimesFailsOnNegativeTargetTimes() {
+        consumer = newConsumer();
         assertThrows(IllegalArgumentException.class,
                 () -> consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(
                         "topic1", 1), ListOffsetsRequest.EARLIEST_TIMESTAMP),
                         Duration.ofMillis(1)));
@@ -674,6 +632,7 @@

     @Test
     public void testOffsetsForTimes() {
+        consumer = newConsumer();
         Map<TopicPartition, OffsetAndTimestamp> expectedResult = mockOffsetAndTimestamp();
         Map<TopicPartition, Long> timestampToSearch = mockTimestampToSearch();

@@ -690,6 +649,7 @@
     // OffsetAndTimestamp as value.
     @Test
     public void testOffsetsForTimesWithZeroTimeout() {
+        consumer = newConsumer();
         TopicPartition tp = new TopicPartition("topic1", 0);
         Map<TopicPartition, OffsetAndTimestamp> expectedResult = Collections.singletonMap(tp, null);

@@ -700,18 +660,32 @@
                 Duration.ofMillis(0)));
         assertEquals(expectedResult, result);
         verify(applicationEventHandler, never()).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
-                ArgumentMatchers.isA(Timer.class));
+            ArgumentMatchers.isA(Timer.class));
     }

     @Test
-    public void testWakeup_committed() {
+    public void testWakeupCommitted() {
+        consumer = newConsumer();
+        final HashMap<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
+        doAnswer(invocation -> {
+            CompletableApplicationEvent<?> event = invocation.getArgument(0);
+            Timer timer = invocation.getArgument(1);
+            assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, event);
+            assertTrue(event.future().isCompletedExceptionally());
+            return ConsumerUtils.getResult(event.future(), timer);
+        })
+            .when(applicationEventHandler)
+            .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
+
         consumer.wakeup();
-        assertThrows(WakeupException.class, () -> consumer.committed(mockTopicPartitionOffset().keySet()));
-        assertNoPendingWakeup(consumer.wakeupTrigger());
+        assertThrows(WakeupException.class, () -> consumer.committed(offsets.keySet()));
+        assertNull(consumer.wakeupTrigger().getPendingTask());
     }

+
     @Test
     public void testRefreshCommittedOffsetsSuccess() {
+        consumer = newConsumer();
         TopicPartition partition = new TopicPartition("t1", 1);
         Set<TopicPartition> partitions = Collections.singleton(partition);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = Collections.singletonMap(partition, new OffsetAndMetadata(10L));
@@ -720,6 +694,7 @@

     @Test
     public void testRefreshCommittedOffsetsSuccessButNoCommittedOffsetsFound() {
+        consumer = newConsumer();
         TopicPartition partition = new TopicPartition("t1", 1);
         Set<TopicPartition> partitions = Collections.singleton(partition);
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = Collections.emptyMap();
@@ -728,18 +703,20 @@

     @Test
     public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() {
+        consumer = newConsumer();
         testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(true);
     }

     @Test
     public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
         // Create consumer without group id so committed offsets are not used for updating positions
-        resetWithEmptyGroupId();
+        consumer = newConsumerWithoutGroupId();
         testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(false);
     }

     @Test
     public void testSubscribeGeneratesEvent() {
+        consumer = newConsumer();
         String topic = "topic1";
         consumer.subscribe(singletonList(topic));
         assertEquals(singleton(topic), consumer.subscription());
@@ -749,20 +726,21 @@

     @Test
     public void testUnsubscribeGeneratesUnsubscribeEvent() {
-        consumer.unsubscribe();
+        consumer = newConsumer();
+        completeUnsubscribeApplicationEventSuccessfully();

-        // Verify the unsubscribe event was generated and mock its completion.
-        final ArgumentCaptor<UnsubscribeApplicationEvent> captor = ArgumentCaptor.forClass(UnsubscribeApplicationEvent.class);
-        verify(applicationEventHandler).add(captor.capture());
-        UnsubscribeApplicationEvent unsubscribeApplicationEvent = captor.getValue();
-        unsubscribeApplicationEvent.future().complete(null);
+        consumer.unsubscribe();

         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().isEmpty());
+        verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class));
     }

     @Test
     public void testSubscribeToEmptyListActsAsUnsubscribe() {
+        consumer = newConsumer();
+        completeUnsubscribeApplicationEventSuccessfully();
+
         consumer.subscribe(Collections.emptyList());
         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().isEmpty());
@@ -771,16 +749,19 @@

     @Test
     public void testSubscribeToNullTopicCollection() {
+        consumer = newConsumer();
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((List<String>) null));
     }

     @Test
     public void testSubscriptionOnNullTopic() {
+        consumer = newConsumer();
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(null)));
     }

     @Test
     public void testSubscriptionOnEmptyTopic() {
+        consumer = newConsumer();
         String emptyTopic = "  ";
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
@@ -789,34 +770,30 @@
     public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
         final Properties props = requiredConsumerProperties();
         final ConsumerConfig config = new ConsumerConfig(props);
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+        consumer = newConsumer(config);

-            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
-            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
-            final Throwable exception = assertThrows(InvalidGroupIdException.class, consumer::groupMetadata);
-            assertEquals(
-                "To use the group management or offset commit APIs, you must " +
-                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.",
-                exception.getMessage()
-            );
-        }
+        assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+        assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        final Throwable exception = assertThrows(InvalidGroupIdException.class, consumer::groupMetadata);
+        assertEquals(
+            "To use the group management or offset commit APIs, you must " +
+                "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.",
+            exception.getMessage()
+        );
     }

     @Test
     public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
         final String groupId = "consumerGroupA";
         final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+        consumer = newConsumer(config);

-            final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+        final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();

-            assertEquals(groupId, groupMetadata.groupId());
-            assertEquals(Optional.empty(), groupMetadata.groupInstanceId());
-            assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
-            assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
-        }
+        assertEquals(groupId, groupMetadata.groupId());
+        assertEquals(Optional.empty(), groupMetadata.groupInstanceId());
+        assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
+        assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
     }

     @Test
@@ -826,49 +803,48 @@
         final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
         props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
         final ConsumerConfig config = new ConsumerConfig(props);
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+        consumer = newConsumer(config);

-            final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
+        final ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();

-            assertEquals(groupId, groupMetadata.groupId());
-            assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
-            assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
-            assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
-        }
+        assertEquals(groupId, groupMetadata.groupId());
+        assertEquals(Optional.of(groupInstanceId), groupMetadata.groupInstanceId());
+        assertEquals(JoinGroupRequest.UNKNOWN_GENERATION_ID, groupMetadata.generationId());
+        assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, groupMetadata.memberId());
     }

     @Test
     public void testGroupMetadataUpdateSingleCall() {
         final String groupId = "consumerGroupA";
         final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
-        final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) {
-            final int generation = 1;
-            final String memberId = "newMemberId";
-            final ConsumerGroupMetadata expectedGroupMetadata = new ConsumerGroupMetadata(
-                groupId,
-                generation,
-                memberId,
-                Optional.empty()
-            );
-            final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
-                generation,
-                memberId
-            );
-            backgroundEventQueue.add(groupMetadataUpdateEvent);
-            consumer.assign(singletonList(new TopicPartition("topic", 0)));
-            consumer.poll(Duration.ZERO);
+        consumer = newConsumer(config);

-            final ConsumerGroupMetadata actualGroupMetadata = consumer.groupMetadata();
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap());

-            assertEquals(expectedGroupMetadata, actualGroupMetadata);
+        final int generation = 1;
+        final String memberId = "newMemberId";
+        final ConsumerGroupMetadata expectedGroupMetadata = new ConsumerGroupMetadata(
+            groupId,
+            generation,
+            memberId,
+            Optional.empty()
+        );
+        final GroupMetadataUpdateEvent groupMetadataUpdateEvent = new GroupMetadataUpdateEvent(
+            generation,
+            memberId
+        );
+        backgroundEventQueue.add(groupMetadataUpdateEvent);
+        consumer.assign(singletonList(new TopicPartition("topic", 0)));
+        consumer.poll(Duration.ZERO);

-            final ConsumerGroupMetadata secondActualGroupMetadataWithoutUpdate = consumer.groupMetadata();
+        final ConsumerGroupMetadata actualGroupMetadata = consumer.groupMetadata();

-            assertEquals(expectedGroupMetadata, secondActualGroupMetadataWithoutUpdate);
-        }
+        assertEquals(expectedGroupMetadata, actualGroupMetadata);
+
+        final ConsumerGroupMetadata secondActualGroupMetadataWithoutUpdate = consumer.groupMetadata();
+
+        assertEquals(expectedGroupMetadata, secondActualGroupMetadataWithoutUpdate);
     }

     /**
@@ -890,11 +866,13 @@
                                               int expectedRevokedCount,
                                               int expectedAssignedCount,
                                               int expectedLostCount) {
+        consumer = newConsumer();
         CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener(
             revokedError,
             assignedError,
             lostError
         );
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         consumer.subscribe(Collections.singletonList("topic"), consumerRebalanceListener);
         SortedSet<TopicPartition> partitions = Collections.emptySortedSet();

@@ -947,40 +925,36 @@
     public void testBackgroundError() {
         final String groupId = "consumerGroupA";
         final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
-        final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) {
-            final KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition");
-            final ErrorBackgroundEvent errorBackgroundEvent = new ErrorBackgroundEvent(expectedException);
-            backgroundEventQueue.add(errorBackgroundEvent);
-            consumer.assign(singletonList(new TopicPartition("topic", 0)));
+        consumer = newConsumer(config);

-            final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
+        final KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition");
+        final ErrorBackgroundEvent errorBackgroundEvent = new ErrorBackgroundEvent(expectedException);
+        backgroundEventQueue.add(errorBackgroundEvent);
+        consumer.assign(singletonList(new TopicPartition("topic", 0)));

-            assertEquals(expectedException.getMessage(), exception.getMessage());
-        }
+        final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
+
+        assertEquals(expectedException.getMessage(), exception.getMessage());
     }

     @Test
     public void testMultipleBackgroundErrors() {
         final String groupId = "consumerGroupA";
         final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
-        final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer(), backgroundEventQueue)) {
-            final KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition");
-            final ErrorBackgroundEvent errorBackgroundEvent1 = new ErrorBackgroundEvent(expectedException1);
-            backgroundEventQueue.add(errorBackgroundEvent1);
-            final KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam");
-            final ErrorBackgroundEvent errorBackgroundEvent2 = new ErrorBackgroundEvent(expectedException2);
-            backgroundEventQueue.add(errorBackgroundEvent2);
-            consumer.assign(singletonList(new TopicPartition("topic", 0)));
+        consumer = newConsumer(config);

-            final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
+        final KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition");
+        final ErrorBackgroundEvent errorBackgroundEvent1 = new ErrorBackgroundEvent(expectedException1);
+        backgroundEventQueue.add(errorBackgroundEvent1);
+        final KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam");
+        final ErrorBackgroundEvent errorBackgroundEvent2 = new ErrorBackgroundEvent(expectedException2);
+        backgroundEventQueue.add(errorBackgroundEvent2);
+        consumer.assign(singletonList(new TopicPartition("topic", 0)));

-            assertEquals(expectedException1.getMessage(), exception.getMessage());
-            assertTrue(backgroundEventQueue.isEmpty());
-        }
+        final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
+
+        assertEquals(expectedException1.getMessage(), exception.getMessage());
+        assertTrue(backgroundEventQueue.isEmpty());
     }

     @Test
@@ -988,11 +962,9 @@
         final Properties props = requiredConsumerProperties();
         props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
         final ConsumerConfig config = new ConsumerConfig(props);
+        consumer = newConsumer(config);

-        try (AsyncKafkaConsumer<String, String> ignored =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-            assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
-        }
+        assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
     }

     @Test
@@ -1002,11 +974,9 @@
         props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT));
         props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
         final ConsumerConfig config = new ConsumerConfig(props);
+        consumer = newConsumer(config);

-        try (AsyncKafkaConsumer<String, String> ignored =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-            assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
-        }
+        assertTrue(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
     }

     @Test
@@ -1016,44 +986,33 @@
         props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
         props.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "someAssignor");
         final ConsumerConfig config = new ConsumerConfig(props);
+        consumer = newConsumer(config);

-        try (AsyncKafkaConsumer<String, String> ignored =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-            assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
-        }
+        assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
     }
-
+
     @Test
     public void testGroupIdNull() {
         final Properties props = requiredConsumerProperties();
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
         props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
         final ConsumerConfig config = new ConsumerConfig(props);
+        consumer = newConsumer(config);

-        try (final AsyncKafkaConsumer<String, String> consumer =
-                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
-            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
-            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
-        } catch (final Exception exception) {
-            throw new AssertionFailedError("The following exception was not expected:", exception);
-        }
+        assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+        assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
     }

-    @Disabled("Flaky test temporarily disabled - in review")
     @Test
public void testGroupIdNotNullAndValid() { final Properties props = requiredConsumerPropertiesAndGroupId("consumerGroupA"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000); props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true); final ConsumerConfig config = new ConsumerConfig(props); + consumer = newConsumer(config); - try (final AsyncKafkaConsumer consumer = - new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) { - assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); - assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); - } catch (final Exception exception) { - throw new AssertionFailedError("The following exception was not expected:", exception); - } + assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); + assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED)); } @Test @@ -1072,7 +1031,7 @@ public class AsyncKafkaConsumerTest { final Exception exception = assertThrows( KafkaException.class, - () -> new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer()) + () -> consumer = newConsumer(config) ); assertEquals("Failed to construct kafka consumer", exception.getMessage()); @@ -1093,54 +1052,52 @@ public class AsyncKafkaConsumerTest { } private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) { - // Uncompleted future that will time out if used - CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>(); + completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException()); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); consumer.assign(singleton(new TopicPartition("t1", 1))); - try (MockedConstruction ignored = offsetFetchEventMocker(committedFuture)) { - // Poll with 250ms timeout to give the background thread time to process the events without timing out - consumer.poll(Duration.ofMillis(250)); + consumer.poll(Duration.ZERO); + verify(applicationEventHandler, atLeast(1)) + .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + + if (committedOffsetsEnabled) { + // Verify there was a FetchCommittedOffsets event and no ResetPositions event verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - - if (committedOffsetsEnabled) { - // Verify there was an FetchCommittedOffsets event and no ResetPositions event - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - verify(applicationEventHandler, never()) - .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - } else { - // Verify there was not any FetchCommittedOffsets event but there should be a ResetPositions - verify(applicationEventHandler, never()) - .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - } - } - } - - private void testRefreshCommittedOffsetsSuccess(Set partitions, - Map committedOffsets) { - CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>(); -
committedFuture.complete(committedOffsets); - consumer.assign(partitions); - try (MockedConstruction ignored = offsetFetchEventMocker(committedFuture)) { - // Poll with 250ms timeout to give the background thread time to process the events without timing out - consumer.poll(Duration.ofMillis(250)); - - verify(applicationEventHandler, atLeast(1)) - .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); - verify(applicationEventHandler, atLeast(1)) + .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + verify(applicationEventHandler, never()) + .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + } else { + // Verify there was no FetchCommittedOffsets event, but there should be a ResetPositions event + verify(applicationEventHandler, never()) .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); verify(applicationEventHandler, atLeast(1)) .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); } } + private void testRefreshCommittedOffsetsSuccess(Set partitions, + Map committedOffsets) { + completeFetchedCommittedOffsetApplicationEventSuccessfully(committedOffsets); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + + consumer.assign(partitions); + + consumer.poll(Duration.ZERO); + + verify(applicationEventHandler, atLeast(1)) + .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + verify(applicationEventHandler, atLeast(1)) + .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + verify(applicationEventHandler, atLeast(1)) + .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class)); + } + @Test public void testLongPollWaitIsLimited() { + consumer = newConsumer(); String topicName = "topic1"; consumer.subscribe(singletonList(topicName)); @@ -1157,7 +1114,7 @@ public class AsyncKafkaConsumerTest { // On the first iteration, return no data; on the second, return two records doAnswer(invocation -> { // Mock the subscription being assigned as the first fetch is collected - subscriptions.assignFromSubscribed(Collections.singleton(tp)); + consumer.subscriptions().assignFromSubscribed(Collections.singleton(tp)); return Fetch.empty(); }).doAnswer(invocation -> { return Fetch.forPartition(tp, records, true); }) @@ -1177,6 +1134,7 @@ */ @Test public void testProcessBackgroundEventsWithInitialDelay() throws Exception { + consumer = newConsumer(); Time time = new MockTime(); Timer timer = time.timer(1000); CompletableFuture future = mock(CompletableFuture.class); @@ -1212,6 +1170,7 @@ */ @Test public void testProcessBackgroundEventsWithoutDelay() { + consumer = newConsumer(); Time time = new MockTime(); Timer timer = time.timer(1000); @@ -1233,6 +1192,7 @@ */ @Test public void testProcessBackgroundEventsTimesOut() throws Exception { + consumer = newConsumer(); Time time = new MockTime(); Timer timer = time.timer(1000); CompletableFuture future = mock(CompletableFuture.class); @@ -1251,10 +1211,6 @@ }
} - private void assertNoPendingWakeup(final WakeupTrigger wakeupTrigger) { - assertNull(wakeupTrigger.getPendingTask()); - } - private HashMap mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); @@ -1282,45 +1238,45 @@ return timestampToSearch; } - private void prepAutocommitOnClose() { - Node node = testBuilder.metadata.fetch().nodes().get(0); - testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node)); - if (!testBuilder.subscriptions.allConsumed().isEmpty()) { - List topicPartitions = new ArrayList<>(testBuilder.subscriptions.assignedPartitionsList()); - testBuilder.client.prepareResponse(mockAutocommitResponse( - topicPartitions, - (short) 1, - Errors.NONE).responseBody()); - } + private void completeCommitApplicationEventExceptionally(Exception ex) { + doAnswer(invocation -> { + CommitApplicationEvent event = invocation.getArgument(0); + event.future().completeExceptionally(ex); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); } - private ClientResponse mockAutocommitResponse(final List topicPartitions, - final short apiKeyVersion, - final Errors error) { - OffsetCommitResponseData responseData = new OffsetCommitResponseData(); - List responseTopics = new ArrayList<>(); - topicPartitions.forEach(tp -> { - responseTopics.add(new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setName(tp.topic()) - .setPartitions(Collections.singletonList( - new OffsetCommitResponseData.OffsetCommitResponsePartition() - .setErrorCode(error.code()) - .setPartitionIndex(tp.partition())))); - }); - responseData.setTopics(responseTopics); - OffsetCommitResponse response = mock(OffsetCommitResponse.class); - when(response.data()).thenReturn(responseData); - return new ClientResponse( - new RequestHeader(ApiKeys.OFFSET_COMMIT, apiKeyVersion, "", 1), - null, - "-1", - testBuilder.time.milliseconds(), - testBuilder.time.milliseconds(), - false, - null, - null, - new OffsetCommitResponse(responseData) - ); + private void completeCommitApplicationEventSuccessfully() { + doAnswer(invocation -> { + CommitApplicationEvent event = invocation.getArgument(0); + event.future().complete(null); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class)); + } + + private void completeFetchedCommittedOffsetApplicationEventSuccessfully(final Map committedOffsets) { + doReturn(committedOffsets) + .when(applicationEventHandler) + .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class)); + } + + private void completeFetchedCommittedOffsetApplicationEventExceptionally(Exception ex) { + doThrow(ex) + .when(applicationEventHandler) + .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class)); + } + + private void completeUnsubscribeApplicationEventSuccessfully() { + doAnswer(invocation -> { + UnsubscribeApplicationEvent event = invocation.getArgument(0); + event.future().complete(null); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class)); + } + + private void forceCommitCallbackInvocation() { + // Invokes any pending commit callbacks by issuing another async commit + consumer.commitAsync(); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java index
716100613b5..f0d42a8ed3b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java @@ -21,9 +21,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.GroupRebalanceConfig; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; -import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; @@ -38,10 +36,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import java.io.Closeable; -import java.time.Duration; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Optional; import java.util.Properties; import java.util.concurrent.BlockingQueue; @@ -101,10 +96,6 @@ public class ConsumerTestBuilder implements Closeable { final MockClient client; final Optional groupInfo; - public ConsumerTestBuilder() { - this(Optional.empty()); - } - public ConsumerTestBuilder(Optional groupInfo) { this(groupInfo, true, true); } @@ -315,78 +306,6 @@ public class ConsumerTestBuilder implements Closeable { } } - public static class ApplicationEventHandlerTestBuilder extends ConsumerTestBuilder { - - public final ApplicationEventHandler applicationEventHandler; - - public ApplicationEventHandlerTestBuilder(Optional groupInfo, boolean enableAutoCommit, boolean enableAutoTick) { - super(groupInfo, enableAutoCommit, enableAutoTick); - this.applicationEventHandler = spy(new ApplicationEventHandler( - logContext, - time, - applicationEventQueue, - () -> applicationEventProcessor, - () -> networkClientDelegate, - () -> requestManagers)); - } - - @Override - public void close() { - closeQuietly(applicationEventHandler, ApplicationEventHandler.class.getSimpleName()); - } - } - - public static class AsyncKafkaConsumerTestBuilder extends ApplicationEventHandlerTestBuilder { - - final AsyncKafkaConsumer consumer; - - final FetchCollector fetchCollector; - - public AsyncKafkaConsumerTestBuilder(Optional groupInfo, boolean enableAutoCommit, boolean enableAutoTick) { - super(groupInfo, enableAutoCommit, enableAutoTick); - String clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); - List assignors = ConsumerPartitionAssignor.getAssignorInstances( - config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), - config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) - ); - Deserializers deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer()); - this.fetchCollector = spy(new FetchCollector<>(logContext, - metadata, - subscriptions, - fetchConfig, - deserializers, - metricsManager, - time)); - this.consumer = spy(new AsyncKafkaConsumer<>( - logContext, - clientId, - deserializers, - new FetchBuffer(logContext), - fetchCollector, - new ConsumerInterceptors<>(Collections.emptyList()), - time, - applicationEventHandler, - backgroundEventQueue, - rebalanceListenerInvoker, - metrics, - subscriptions, - metadata, - retryBackoffMs, - 60000, - assignors, 
- groupInfo.map(groupInformation -> groupInformation.groupState.groupId).orElse(null))); - } - - @Override - public void close() { - consumer.close(); - } - - public void close(final Duration timeout) { - consumer.close(timeout); - } - } - public static class GroupInformation { final GroupState groupState;