From 8ed53a15ee6c9be416717d1740e8e37252cc4991 Mon Sep 17 00:00:00 2001
From: Andrew Schofield
Date: Wed, 6 Dec 2023 17:47:26 +0000
Subject: [PATCH] KAFKA-15932: Wait for responses in consumer operations (#14912)

The Kafka consumer makes a variety of requests to brokers, such as fetching
committed offsets and updating metadata. In the LegacyKafkaConsumer, the
approach is typically to prepare RPC requests and then poll the network to
wait for responses. In the AsyncKafkaConsumer, the approach is to enqueue an
ApplicationEvent for processing by one of the request managers on the
background thread. However, it is still important to wait for responses,
rather than spinning and enqueuing more events before the request managers
have had a chance to respond. In general, the behaviour is not changed by
this code.

The PlaintextConsumerTest.testSeek test was flaky because operations such as
KafkaConsumer.position were not properly waiting for a response, which meant
that subsequent operations were attempted in the wrong state. This test is
no longer flaky.

Reviewers: Kirk True, Lianet Magrans, Bruno Cadonna
---
 .../internals/AsyncKafkaConsumer.java         | 65 +++++++++-------
 .../internals/CommitRequestManager.java       |  2 +-
 .../internals/OffsetsRequestManager.java      | 68 +++++++++++------
 .../internals/events/ApplicationEvent.java    |  2 +-
 .../events/ApplicationEventProcessor.java     | 30 ++++----
 ...etchCommittedOffsetsApplicationEvent.java} |  8 +-
 .../NewTopicsMetadataUpdateRequestEvent.java  |  2 +-
 .../internals/AsyncKafkaConsumerTest.java     | 75 ++++++++++---------
 .../kafka/api/PlaintextConsumerTest.scala     |  3 +-
 9 files changed, 149 insertions(+), 106 deletions(-)
 rename clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/{OffsetFetchApplicationEvent.java => FetchCommittedOffsetsApplicationEvent.java} (83%)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 6a202e89402..0e93b5f6e21 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -45,10 +45,10 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
-import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
@@ -771,7 +771,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             return Collections.emptyMap();
         }
 
-        final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
+        final FetchCommittedOffsetsApplicationEvent event = new FetchCommittedOffsetsApplicationEvent(partitions);
         wakeupTrigger.setActiveTask(event.future());
         try {
             final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event,
@@ -1281,34 +1281,43 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
      * @return true iff the operation completed without timing out
      */
     private boolean updateFetchPositions(final Timer timer) {
-        // Validate positions using the partition leader end offsets, to detect if any partition
-        // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
-        // request, retrieve the partition end offsets, and validate the current position against it.
-        applicationEventHandler.add(new ValidatePositionsApplicationEvent());
+        try {
+            cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
+            if (cachedSubscriptionHasAllFetchPositions) return true;
 
-        cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
-        if (cachedSubscriptionHasAllFetchPositions) return true;
+            // Validate positions using the partition leader end offsets, to detect if any partition
+            // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
+            // request, retrieve the partition end offsets, and validate the current position against it.
+            // If the timer is not expired, wait for the validation; otherwise, just request it.
+            applicationEventHandler.addAndGet(new ValidatePositionsApplicationEvent(), timer);
 
-        // Reset positions using committed offsets retrieved from the group coordinator, for any
-        // partitions which do not have a valid position and are not awaiting reset. This will
-        // trigger an OffsetFetch request and update positions with the offsets retrieved. This
-        // will only do a coordinator lookup if there are partitions which have missing
-        // positions, so a consumer with manually assigned partitions can avoid a coordinator
-        // dependence by always ensuring that assigned partitions have an initial position.
-        if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
+            cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
+            if (cachedSubscriptionHasAllFetchPositions) return true;
+
+            // Reset positions using committed offsets retrieved from the group coordinator, for any
+            // partitions which do not have a valid position and are not awaiting reset. This will
+            // trigger an OffsetFetch request and update positions with the offsets retrieved. This
+            // will only do a coordinator lookup if there are partitions which have missing
+            // positions, so a consumer with manually assigned partitions can avoid a coordinator
+            // dependence by always ensuring that assigned partitions have an initial position.
+            if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer))
+                return false;
+
+            // If there are partitions still needing a position and a reset policy is defined,
+            // request reset using the default policy. If no reset strategy is defined and there
+            // are partitions with a missing position, then we will raise a NoOffsetForPartitionException.
+            subscriptions.resetInitializingPositions();
+
+            // Reset positions using partition offsets retrieved from the leader, for any partitions
+            // which are awaiting reset. This will trigger a ListOffset request, retrieve the
+            // partition offsets according to the strategy (e.g. earliest, latest), and update the
+            // positions.
+            // If the timer is not expired, wait for the reset; otherwise, just request it.
+            applicationEventHandler.addAndGet(new ResetPositionsApplicationEvent(), timer);
+            return true;
+        } catch (TimeoutException e) {
             return false;
-
-        // If there are partitions still needing a position and a reset policy is defined,
-        // request reset using the default policy. If no reset strategy is defined and there
-        // are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
-        subscriptions.resetInitializingPositions();
-
-        // Reset positions using partition offsets retrieved from the leader, for any partitions
-        // which are awaiting reset. This will trigger a ListOffset request, retrieve the
-        // partition offsets according to the strategy (ex. earliest, latest), and update the
-        // positions.
-        applicationEventHandler.add(new ResetPositionsApplicationEvent());
-        return true;
+        }
     }
 
     /**
@@ -1334,7 +1343,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
 
         log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
         try {
-            final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions);
+            final FetchCommittedOffsetsApplicationEvent event = new FetchCommittedOffsetsApplicationEvent(initializingPartitions);
            final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer);
             refreshCommittedOffsets(offsets, metadata, subscriptions);
             return true;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 5caee5e5037..580373c4396 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -263,7 +263,7 @@ public class CommitRequestManager implements RequestManager {
     }
 
     /**
-     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent}. It creates an
      * {@link OffsetFetchRequestState} and enqueues it to send later.
      */
     public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(final Set<TopicPartition> partitions) {
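The distinction above is the heart of the fix: applicationEventHandler.add(...) is fire-and-forget, while addAndGet(...) enqueues the event and then blocks the application thread on the event's future until the response arrives or the caller's timer expires. A minimal sketch of the difference, using simplified stand-in names (Event, EventQueueSketch) rather than the real ApplicationEventHandler API:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    final class EventQueueSketch {
        // Stand-in for a CompletableApplicationEvent: the background thread completes the future.
        static final class Event<T> {
            final CompletableFuture<T> future = new CompletableFuture<>();
        }

        private final BlockingQueue<Event<?>> queue = new LinkedBlockingQueue<>();

        // Fire-and-forget: the old behaviour for reset/validate positions.
        <T> void add(Event<T> event) {
            queue.add(event);
        }

        // Enqueue, then block until the background thread completes the future or the
        // deadline passes (a TimeoutException then surfaces to the caller).
        <T> T addAndGet(Event<T> event, long timeoutMs) throws Exception {
            queue.add(event);
            return event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

In updateFetchPositions above, that TimeoutException is caught and converted into a return of false, so callers such as position() retry on their next iteration instead of proceeding in the wrong state.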
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 78f6bdb1aa1..34f4b30c44d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -193,22 +193,20 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceListener {
      * an error is received in the response, it will be saved to be thrown on the next call to
      * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
      */
-    public void resetPositionsIfNeeded() {
+    public CompletableFuture<Void> resetPositionsIfNeeded() {
         Map<TopicPartition, Long> offsetResetTimestamps;
 
         try {
             offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
         } catch (Exception e) {
             backgroundEventHandler.add(new ErrorBackgroundEvent(e));
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         if (offsetResetTimestamps.isEmpty())
-            return;
+            return CompletableFuture.completedFuture(null);
 
-        List<NetworkClientDelegate.UnsentRequest> unsentRequests =
-            buildListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
-        requestsToSend.addAll(unsentRequests);
+        return sendListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
     }
 
     /**
@@ -223,15 +221,14 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceListener {
      * detected, a {@link LogTruncationException} will be saved in memory, to be thrown on the
      * next call to this function.
      */
-    public void validatePositionsIfNeeded() {
+    public CompletableFuture<Void> validatePositionsIfNeeded() {
         Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
             offsetFetcherUtils.getPartitionsToValidate();
         if (partitionsToValidate.isEmpty()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-        List<NetworkClientDelegate.UnsentRequest> unsentRequests =
-            buildOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
-        requestsToSend.addAll(unsentRequests);
+
+        return sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
     }
 
     /**
@@ -380,19 +377,20 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceListener {
 
     /**
      * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
-     * partitions.
-     * Use the retrieved offsets to reset positions in the subscription state.
+     * partitions. Use the retrieved offsets to reset positions in the subscription state.
+     * This also adds the request to the list of unsentRequests.
      *
      * @param timestampsToSearch the mapping between partitions and target time
-     * @return A list of
-     *         {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
-     *         that can be polled to obtain the corresponding timestamps and offsets.
+     * @return A {@link CompletableFuture} which completes when the requests are
+     *         complete.
      */
-    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+    private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(
             final Map<TopicPartition, Long> timestampsToSearch) {
         Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
                 groupListOffsetRequests(timestampsToSearch, Optional.empty());
 
+        final AtomicInteger expectedResponses = new AtomicInteger(0);
+        final CompletableFuture<Void> globalResult = new CompletableFuture<>();
         final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
 
         timestampsToSearchByNode.forEach((node, resetTimestamps) -> {
@@ -419,9 +417,20 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceListener {
                     }
                     offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
                 }
+                if (expectedResponses.decrementAndGet() == 0) {
+                    globalResult.complete(null);
+                }
             });
         });
 
-        return unsentRequests;
+        if (unsentRequests.size() > 0) {
+            expectedResponses.set(unsentRequests.size());
+            requestsToSend.addAll(unsentRequests);
+        } else {
+            globalResult.complete(null);
+        }
+
+        return globalResult;
     }
 
     /**
@@ -429,14 +438,22 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceListener {
      * for the partition with the epoch less than or equal to the epoch the partition last saw.
      * <p/>
      * Requests are grouped by Node for efficiency.
+     * This also adds the request to the list of unsentRequests.
+     *
+     * @param partitionsToValidate a map of topic-partition positions to validate
+     * @return A {@link CompletableFuture} which completes when the requests are
+     *         complete.
      */
-    private List<NetworkClientDelegate.UnsentRequest> buildOffsetsForLeaderEpochRequestsAndValidatePositions(
+    private CompletableFuture<Void> sendOffsetsForLeaderEpochRequestsAndValidatePositions(
             Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
 
         final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
             regroupFetchPositionsByLeader(partitionsToValidate);
 
         long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+        final AtomicInteger expectedResponses = new AtomicInteger(0);
+        final CompletableFuture<Void> globalResult = new CompletableFuture<>();
         final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
 
         regrouped.forEach((node, fetchPositions) -> {
@@ -480,11 +497,20 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceListener {
                     }
                     offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
                 }
+                if (expectedResponses.decrementAndGet() == 0) {
+                    globalResult.complete(null);
+                }
             });
         });
 
-        return unsentRequests;
+        if (unsentRequests.size() > 0) {
+            expectedResponses.set(unsentRequests.size());
+            requestsToSend.addAll(unsentRequests);
+        } else {
+            globalResult.complete(null);
+        }
+
+        return globalResult;
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 552c283b43f..e6ff3b2fe5e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -24,7 +24,7 @@ import java.util.Objects;
 public abstract class ApplicationEvent {
 
     public enum Type {
-        COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
+        COMMIT, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
         LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE,
         UNSUBSCRIBE
     }
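The two send* methods above return a single CompletableFuture that completes only once every per-node request has received a response (success or failure), using an AtomicInteger countdown wired into each request's completion handler. A standalone sketch of that fan-in pattern, with hypothetical names (whenAllComplete, perRequest) standing in for the real UnsentRequest plumbing:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    final class FanInSketch {
        static CompletableFuture<Void> whenAllComplete(List<CompletableFuture<?>> perRequest) {
            final CompletableFuture<Void> globalResult = new CompletableFuture<>();
            if (perRequest.isEmpty()) {
                globalResult.complete(null); // nothing in flight: complete immediately
                return globalResult;
            }
            final AtomicInteger expectedResponses = new AtomicInteger(perRequest.size());
            for (CompletableFuture<?> f : perRequest) {
                // Success and failure both count as a response; the last one in
                // completes the global future.
                f.whenComplete((result, error) -> {
                    if (expectedResponses.decrementAndGet() == 0)
                        globalResult.complete(null);
                });
            }
            return globalResult;
        }
    }

CompletableFuture.allOf would also fan in, but it completes exceptionally if any input fails; the manual countdown always completes normally, matching the code above, which reports per-partition errors through the offsetFetcherUtils callbacks instead.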
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index e076c46cd36..33233561a34 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -77,11 +77,11 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> {
                 process((PollApplicationEvent) event);
                 return;
 
-            case FETCH_COMMITTED_OFFSET:
-                process((OffsetFetchApplicationEvent) event);
+            case FETCH_COMMITTED_OFFSETS:
+                process((FetchCommittedOffsetsApplicationEvent) event);
                 return;
 
-            case METADATA_UPDATE:
+            case NEW_TOPICS_METADATA_UPDATE:
                 process((NewTopicsMetadataUpdateRequestEvent) event);
                 return;
 
@@ -98,19 +98,19 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> {
                 return;
 
             case RESET_POSITIONS:
-                processResetPositionsEvent();
+                process((ResetPositionsApplicationEvent) event);
                 return;
 
             case VALIDATE_POSITIONS:
-                processValidatePositionsEvent();
+                process((ValidatePositionsApplicationEvent) event);
                 return;
 
             case SUBSCRIPTION_CHANGE:
-                processSubscriptionChangeEvent();
+                process((SubscriptionChangeApplicationEvent) event);
                 return;
 
             case UNSUBSCRIBE:
-                processUnsubscribeEvent((UnsubscribeApplicationEvent) event);
+                process((UnsubscribeApplicationEvent) event);
                 return;
 
             default:
@@ -145,7 +145,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> {
         event.chain(manager.addOffsetCommitRequest(event.offsets()));
     }
 
-    private void process(final OffsetFetchApplicationEvent event) {
+    private void process(final FetchCommittedOffsetsApplicationEvent event) {
         if (!requestManagers.commitRequestManager.isPresent()) {
             event.future().completeExceptionally(new KafkaException("Unable to fetch committed " +
                 "offset because the CommittedRequestManager is not available. Check if group.id was set correctly"));
@@ -180,7 +180,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> {
      * consumer join the group if it is not part of it yet, or send the updated subscription if
      * it is already a member.
      */
-    private void processSubscriptionChangeEvent() {
+    private void process(final SubscriptionChangeApplicationEvent event) {
         if (!requestManagers.membershipManager.isPresent()) {
             throw new RuntimeException("Group membership manager not present when processing a " +
                 "subscribe event");
@@ -197,7 +197,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> {
      * execution for releasing the assignment completes, and the request to leave
      * the group is sent out.
      */
-    private void processUnsubscribeEvent(UnsubscribeApplicationEvent event) {
+    private void process(final UnsubscribeApplicationEvent event) {
         if (!requestManagers.membershipManager.isPresent()) {
             throw new RuntimeException("Group membership manager not present when processing an " +
                 "unsubscribe event");
@@ -207,12 +207,14 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> {
         event.chain(result);
     }
 
-    private void processResetPositionsEvent() {
-        requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+    private void process(final ResetPositionsApplicationEvent event) {
+        CompletableFuture<Void> result = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+        event.chain(result);
     }
 
-    private void processValidatePositionsEvent() {
-        requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+    private void process(final ValidatePositionsApplicationEvent event) {
+        CompletableFuture<Void> result = requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+        event.chain(result);
     }
 
     private void process(final TopicMetadataApplicationEvent event) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsApplicationEvent.java
similarity index 83%
rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsApplicationEvent.java
index 8b1a5492656..6765a408b60 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsApplicationEvent.java
@@ -23,12 +23,12 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
-public class OffsetFetchApplicationEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
+public class FetchCommittedOffsetsApplicationEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
 
     private final Set<TopicPartition> partitions;
 
-    public OffsetFetchApplicationEvent(final Set<TopicPartition> partitions) {
-        super(Type.FETCH_COMMITTED_OFFSET);
+    public FetchCommittedOffsetsApplicationEvent(final Set<TopicPartition> partitions) {
+        super(Type.FETCH_COMMITTED_OFFSETS);
         this.partitions = Collections.unmodifiableSet(partitions);
     }
 
@@ -42,7 +42,7 @@ public class FetchCommittedOffsetsApplicationEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
 
     @Override
     public String toString() {
-        return "OffsetFetchApplicationEvent{" +
+        return "FetchCommittedOffsetsApplicationEvent{" +
                 toStringBase() +
                 ", partitions=" + partitions +
                 '}';
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
@@ ... @@ public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {
 
     public NewTopicsMetadataUpdateRequestEvent() {
-        super(Type.METADATA_UPDATE);
+        super(Type.NEW_TOPICS_METADATA_UPDATE);
     }
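Each process(...) overload above ends with event.chain(result), tying the manager's future to the future the application thread is blocked on. A sketch of what such chaining plausibly does, assuming a simple forwarding contract (the real CompletableApplicationEvent implementation may differ in detail):

    import java.util.concurrent.CompletableFuture;

    final class ChainSketch<T> {
        private final CompletableFuture<T> future = new CompletableFuture<>();

        CompletableFuture<T> future() {
            return future;
        }

        // Forward the manager's outcome, value or exception, to this event's future,
        // waking up the application thread blocked in addAndGet().
        void chain(CompletableFuture<T> managerResult) {
            managerResult.whenComplete((value, error) -> {
                if (error != null)
                    future.completeExceptionally(error);
                else
                    future.complete(value);
            });
        }
    }

This is why returning CompletableFuture<Void> from resetPositionsIfNeeded and validatePositionsIfNeeded matters: without a future to chain, the processor had nothing to complete and the application thread had nothing to wait on.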
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ ... @@ public class AsyncKafkaConsumerTest {
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
         committedFuture.complete(offsets);
 
-        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
             assertDoesNotThrow(() -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+            verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
         }
     }
 
@@ -271,12 +272,12 @@ public class AsyncKafkaConsumerTest {
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
         committedFuture.complete(topicPartitionOffsets);
 
-        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
             assertDoesNotThrow(() -> consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
         }
         verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t0, 2);
         verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t2, 3);
-        verify(applicationEventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
     }
 
     @Test
@@ -285,9 +286,9 @@ public class AsyncKafkaConsumerTest {
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
         committedFuture.completeExceptionally(new KafkaException("Test exception"));
 
-        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
             assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+            verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
         }
     }
 
@@ -298,7 +299,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition(topicName, partition);
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
+        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
         consumer.assign(singleton(tp));
         consumer.wakeup();
 
@@ -317,7 +318,7 @@ public class AsyncKafkaConsumerTest {
             return Fetch.empty();
         }).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
+        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
         consumer.assign(singleton(tp));
 
         assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
@@ -338,7 +339,7 @@ public class AsyncKafkaConsumerTest {
             return Fetch.forPartition(tp, records, true);
         }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
+        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
         consumer.assign(singleton(tp));
 
         // since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
@@ -359,7 +360,7 @@ public class AsyncKafkaConsumerTest {
         doReturn(Fetch.forPartition(tp, records, true))
             .when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
+        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
         consumer.assign(singleton(tp));
 
         consumer.poll(Duration.ZERO);
@@ -510,12 +511,12 @@ public class AsyncKafkaConsumerTest {
      * <p/>
      *
      * Inside the {@link org.apache.kafka.clients.consumer.Consumer#committed(Set, Duration)} call we create an
-     * instance of {@link OffsetFetchApplicationEvent} that holds the partitions and internally holds a
+     * instance of {@link FetchCommittedOffsetsApplicationEvent} that holds the partitions and internally holds a
      * {@link CompletableFuture}. We want to test different behaviours of the {@link Future#get()}, such as
      * returning normally, timing out, throwing an error, etc. By mocking the construction of the event object that
      * is created, we can affect that behavior.
      */
-    private static MockedConstruction<OffsetFetchApplicationEvent> offsetFetchEventMocker(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+    private static MockedConstruction<FetchCommittedOffsetsApplicationEvent> offsetFetchEventMocker(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
         // This "answer" is where we pass the future to be invoked by the ConsumerUtils.getResult() method
         Answer<Map<TopicPartition, OffsetAndMetadata>> getInvocationAnswer = invocation -> {
             // This argument captures the actual argument value that was passed to the event's get() method, so we
             // know what the caller's timeout was.
             Timer timer = invocation.getArgument(0);
             return ConsumerUtils.getResult(future, timer);
         };
 
-        MockedConstruction.MockInitializer<OffsetFetchApplicationEvent> mockInitializer = (mock, ctx) -> {
+        MockedConstruction.MockInitializer<FetchCommittedOffsetsApplicationEvent> mockInitializer = (mock, ctx) -> {
             // When the event's get() method is invoked, we call the "answer" method just above
             when(mock.get(any())).thenAnswer(getInvocationAnswer);
 
             // When the event's type() method is invoked, we have to return the type as it will be null in the mock
-            when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSET);
+            when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSETS);
 
             // This is needed for the WakeupTrigger code that keeps track of the active task
             when(mock.future()).thenReturn(future);
         };
 
-        return mockConstruction(OffsetFetchApplicationEvent.class, mockInitializer);
+        return mockConstruction(FetchCommittedOffsetsApplicationEvent.class, mockInitializer);
     }
 
     private static MockedConstruction<CommitApplicationEvent> commitEventMocker(CompletableFuture<Void> future) {
@@ -1008,22 +1009,25 @@ public class AsyncKafkaConsumerTest {
 
         consumer.assign(singleton(new TopicPartition("t1", 1)));
 
-        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
-            // Poll with 0 timeout to run a single iteration of the poll loop
-            consumer.poll(Duration.ofMillis(0));
+        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+            // Poll with 250ms timeout to give the background thread time to process the events without timing out
+            consumer.poll(Duration.ofMillis(250));
 
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class));
+            verify(applicationEventHandler, atLeast(1))
+                .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
 
             if (committedOffsetsEnabled) {
-                // Verify there was an OffsetFetch event and no ResetPositions event
-                verify(applicationEventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
-                verify(applicationEventHandler,
-                    never()).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+                // Verify there was a FetchCommittedOffsets event and no ResetPositions event
+                verify(applicationEventHandler, atLeast(1))
+                    .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
+                verify(applicationEventHandler, never())
+                    .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
             } else {
-                // Verify there was not any OffsetFetch event but there should be a ResetPositions
-                verify(applicationEventHandler,
-                    never()).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
-                verify(applicationEventHandler).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+                // Verify there was no FetchCommittedOffsets event but there should be a ResetPositions
+                verify(applicationEventHandler, never())
+                    .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
+                verify(applicationEventHandler, atLeast(1))
+                    .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
             }
         }
     }
 
@@ -1033,13 +1037,16 @@ public class AsyncKafkaConsumerTest {
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
         committedFuture.complete(committedOffsets);
         consumer.assign(partitions);
-        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
-            // Poll with 0 timeout to run a single iteration of the poll loop
-            consumer.poll(Duration.ofMillis(0));
+        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+            // Poll with 250ms timeout to give the background thread time to process the events without timing out
+            consumer.poll(Duration.ofMillis(250));
 
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class));
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+            verify(applicationEventHandler, atLeast(1))
+                .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
+            verify(applicationEventHandler, atLeast(1))
+                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
+            verify(applicationEventHandler, atLeast(1))
+                .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
         }
     }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 338b64a0971..2306e181483 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -596,9 +596,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertThrows(classOf[InvalidTopicException], () => consumer.partitionsFor(";3# ads,{234"))
   }
 
-  // Temporarily do not run flaky test for consumer group protocol
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testSeek(quorum: String, groupProtocol: String): Unit = {
     val consumer = createConsumer()
     val totalRecords = 50L
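The offsetFetchEventMocker javadoc above describes the test technique: Mockito's mockConstruction intercepts "new FetchCommittedOffsetsApplicationEvent(...)" inside the consumer, so the test dictates the future the consumer blocks on. A self-contained sketch of the same technique with a hypothetical Widget class (requires a Mockito version with the inline mock maker, e.g. mockito-inline or mockito-core 5+):

    import static org.mockito.Mockito.mockConstruction;
    import static org.mockito.Mockito.when;

    import java.util.concurrent.CompletableFuture;
    import org.mockito.MockedConstruction;

    final class ConstructionMockingSketch {
        static class Widget {
            CompletableFuture<String> future() {
                return new CompletableFuture<>(); // real behaviour: never completes on its own
            }
        }

        // Code under test: constructs a Widget internally, so a plain mock can't reach it.
        static String fetchValue() {
            Widget w = new Widget(); // intercepted while the MockedConstruction is open
            return w.future().join();
        }

        public static void main(String[] args) {
            CompletableFuture<String> controlled = CompletableFuture.completedFuture("done");
            try (MockedConstruction<Widget> ignored =
                     mockConstruction(Widget.class, (mock, ctx) -> when(mock.future()).thenReturn(controlled))) {
                System.out.println(fetchValue()); // prints "done": the test chose the outcome
            }
        }
    }

The same trick lets these tests exercise the timeout and error paths of committed() and poll() without a real background thread completing the futures.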