mirror of https://github.com/apache/kafka.git
KAFKA-15932: Wait for responses in consumer operations (#14912)
The Kafka consumer makes a variety of requests to brokers, such as fetching committed offsets and updating metadata. In the LegacyKafkaConsumer, the approach is typically to prepare RPC requests and then poll the network to wait for the responses. In the AsyncKafkaConsumer, the approach is to enqueue an ApplicationEvent for processing by one of the request managers on the background thread. However, it is still important to wait for the responses rather than spinning, enqueuing more events before the request managers have had a chance to respond. In general, this code does not change the observed behaviour.

The PlaintextConsumerTest.testSeek test was flaky because operations such as KafkaConsumer.position were not properly waiting for a response, which meant that subsequent operations were attempted in the wrong state. This test is no longer flaky.

Reviewers: Kirk True <ktrue@confluent.io>, Lianet Magrans <lianetmr@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This commit is contained in:

parent bdf6d46b41
commit 8ed53a15ee
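The core of the change is the wait-for-response pattern sketched below: the application thread enqueues an event for the background thread and then blocks on the event's future against a timer, instead of merely adding the event and moving on. This is a simplified, self-contained illustration of the idea, not the actual Kafka classes; CompletableEvent, EventHandlerSketch, and AddAndGetDemo are hypothetical stand-ins for ApplicationEvent, ApplicationEventHandler.addAndGet(event, timer), and the consumer's calling code.

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a completable application event.
class CompletableEvent<T> {
    private final CompletableFuture<T> future = new CompletableFuture<>();
    CompletableFuture<T> future() { return future; }
}

// Sketch of the enqueue-then-wait pattern: hand the event to the background
// thread's queue, then block on its future for at most the remaining time.
class EventHandlerSketch {
    final BlockingQueue<CompletableEvent<?>> queue = new LinkedBlockingQueue<>();

    <T> T addAndGet(CompletableEvent<T> event, Duration remaining) throws Exception {
        queue.add(event); // processed by a request manager on the background thread
        // Waiting here is the fix: without it, the caller could issue the next
        // operation before the broker response has been applied.
        return event.future().get(remaining.toMillis(), TimeUnit.MILLISECONDS);
    }
}

public class AddAndGetDemo {
    public static void main(String[] args) throws Exception {
        EventHandlerSketch handler = new EventHandlerSketch();
        CompletableEvent<String> event = new CompletableEvent<>();

        // Stand-in for the background thread: take the event and complete it.
        new Thread(() -> {
            try {
                @SuppressWarnings("unchecked")
                CompletableEvent<String> e = (CompletableEvent<String>) handler.queue.take();
                e.future().complete("committed offsets");
            } catch (InterruptedException ignored) { }
        }).start();

        // The application thread blocks until the response arrives (or times out).
        System.out.println(handler.addAndGet(event, Duration.ofSeconds(1)));
    }
}

In the commit itself this shows up as the switch from applicationEventHandler.add(event) to applicationEventHandler.addAndGet(event, timer) in AsyncKafkaConsumer, as the diff below shows.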
@@ -45,10 +45,10 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
-import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;

@@ -771,7 +771,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             return Collections.emptyMap();
         }

-        final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
+        final FetchCommittedOffsetsApplicationEvent event = new FetchCommittedOffsetsApplicationEvent(partitions);
         wakeupTrigger.setActiveTask(event.future());
         try {
             final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event,

@@ -1281,10 +1281,15 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
      * @return true iff the operation completed without timing out
      */
     private boolean updateFetchPositions(final Timer timer) {
+        try {
             cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
             if (cachedSubscriptionHasAllFetchPositions) return true;

             // Validate positions using the partition leader end offsets, to detect if any partition
             // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch
             // request, retrieve the partition end offsets, and validate the current position against it.
-            applicationEventHandler.add(new ValidatePositionsApplicationEvent());
+            // If the timer is not expired, wait for the validation, otherwise, just request it.
+            applicationEventHandler.addAndGet(new ValidatePositionsApplicationEvent(), timer);
+
+            cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions();
+            if (cachedSubscriptionHasAllFetchPositions) return true;

@@ -1307,8 +1312,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             // which are awaiting reset. This will trigger a ListOffset request, retrieve the
             // partition offsets according to the strategy (ex. earliest, latest), and update the
             // positions.
-            applicationEventHandler.add(new ResetPositionsApplicationEvent());
+            // If the timer is not expired, wait for the reset, otherwise, just request it.
+            applicationEventHandler.addAndGet(new ResetPositionsApplicationEvent(), timer);
             return true;
+        } catch (TimeoutException e) {
+            return false;
+        }
     }

     /**

@@ -1334,7 +1343,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {

         log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
         try {
-            final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(initializingPartitions);
+            final FetchCommittedOffsetsApplicationEvent event = new FetchCommittedOffsetsApplicationEvent(initializingPartitions);
             final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer);
             refreshCommittedOffsets(offsets, metadata, subscriptions);
             return true;

@@ -263,7 +263,7 @@ public class CommitRequestManager implements RequestManager {
     }

     /**
-     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent}. It creates an
      * {@link OffsetFetchRequestState} and enqueue it to send later.
      */
     public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(final Set<TopicPartition> partitions) {

@@ -193,22 +193,20 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
      * an error is received in the response, it will be saved to be thrown on the next call to
      * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
      */
-    public void resetPositionsIfNeeded() {
+    public CompletableFuture<Void> resetPositionsIfNeeded() {
         Map<TopicPartition, Long> offsetResetTimestamps;

         try {
             offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
         } catch (Exception e) {
             backgroundEventHandler.add(new ErrorBackgroundEvent(e));
-            return;
+            return CompletableFuture.completedFuture(null);
         }

         if (offsetResetTimestamps.isEmpty())
-            return;
+            return CompletableFuture.completedFuture(null);

-        List<NetworkClientDelegate.UnsentRequest> unsentRequests =
-                buildListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
-        requestsToSend.addAll(unsentRequests);
+        return sendListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
     }

     /**

@@ -223,15 +221,14 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
      * detected, a {@link LogTruncationException} will be saved in memory, to be thrown on the
      * next call to this function.
      */
-    public void validatePositionsIfNeeded() {
+    public CompletableFuture<Void> validatePositionsIfNeeded() {
         Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
                 offsetFetcherUtils.getPartitionsToValidate();
         if (partitionsToValidate.isEmpty()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
-        List<NetworkClientDelegate.UnsentRequest> unsentRequests =
-                buildOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
-        requestsToSend.addAll(unsentRequests);
+
+        return sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
     }

     /**

@@ -380,19 +377,20 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis

     /**
      * Make asynchronous ListOffsets request to fetch offsets by target times for the specified
-     * partitions.
-     * Use the retrieved offsets to reset positions in the subscription state.
+     * partitions. Use the retrieved offsets to reset positions in the subscription state.
+     * This also adds the request to the list of unsentRequests.
      *
      * @param timestampsToSearch the mapping between partitions and target time
-     * @return A list of
-     *         {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
-     *         that can be polled to obtain the corresponding timestamps and offsets.
+     * @return A {@link CompletableFuture} which completes when the requests are
+     *         complete.
      */
-    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequestsAndResetPositions(
+    private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(
             final Map<TopicPartition, Long> timestampsToSearch) {
         Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
                 groupListOffsetRequests(timestampsToSearch, Optional.empty());

+        final AtomicInteger expectedResponses = new AtomicInteger(0);
+        final CompletableFuture<Void> globalResult = new CompletableFuture<>();
         final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();

         timestampsToSearchByNode.forEach((node, resetTimestamps) -> {

@@ -419,9 +417,20 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
                     }
                     offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
                 }
+                if (expectedResponses.decrementAndGet() == 0) {
+                    globalResult.complete(null);
+                }
             });
         });
-        return unsentRequests;
+
+        if (unsentRequests.size() > 0) {
+            expectedResponses.set(unsentRequests.size());
+            requestsToSend.addAll(unsentRequests);
+        } else {
+            globalResult.complete(null);
+        }
+
+        return globalResult;
     }

     /**

@@ -429,14 +438,22 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
      * for the partition with the epoch less than or equal to the epoch the partition last saw.
      * <p/>
      * Requests are grouped by Node for efficiency.
+     * This also adds the request to the list of unsentRequests.
      *
      * @param partitionsToValidate a map of topic-partition positions to validate
+     * @return A {@link CompletableFuture} which completes when the requests are
+     *         complete.
+     *
      */
-    private List<NetworkClientDelegate.UnsentRequest> buildOffsetsForLeaderEpochRequestsAndValidatePositions(
+    private CompletableFuture<Void> sendOffsetsForLeaderEpochRequestsAndValidatePositions(
             Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {

         final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
                 regroupFetchPositionsByLeader(partitionsToValidate);

         long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;
+        final AtomicInteger expectedResponses = new AtomicInteger(0);
+        final CompletableFuture<Void> globalResult = new CompletableFuture<>();
         final List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
         regrouped.forEach((node, fetchPositions) -> {

@@ -480,11 +497,20 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis
                     }
                     offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
                 }
+                if (expectedResponses.decrementAndGet() == 0) {
+                    globalResult.complete(null);
+                }
             });
         });

-        return unsentRequests;
+        if (unsentRequests.size() > 0) {
+            expectedResponses.set(unsentRequests.size());
+            requestsToSend.addAll(unsentRequests);
+        } else {
+            globalResult.complete(null);
+        }
+
+        return globalResult;
     }

     /**

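The OffsetsRequestManager changes above fan several per-node requests into a single CompletableFuture: the counter is set to the number of unsent requests, each response handler decrements it, and the last one completes the global result (or it completes immediately when there is nothing to send). A standalone sketch of that pattern follows, with hypothetical names rather than the Kafka types:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class FanInDemo {

    // Completes the returned future once every per-request future has finished,
    // mirroring the expectedResponses/globalResult bookkeeping in the diff above.
    static CompletableFuture<Void> whenAllComplete(List<? extends CompletableFuture<?>> requests) {
        final CompletableFuture<Void> globalResult = new CompletableFuture<>();
        if (requests.isEmpty()) {
            globalResult.complete(null); // nothing to send, nothing to wait for
            return globalResult;
        }
        final AtomicInteger expectedResponses = new AtomicInteger(requests.size());
        for (CompletableFuture<?> request : requests) {
            // Success and failure both count as a response; errors are handled
            // per-partition, so the global future completes normally either way.
            request.whenComplete((result, error) -> {
                if (expectedResponses.decrementAndGet() == 0) {
                    globalResult.complete(null);
                }
            });
        }
        return globalResult;
    }

    public static void main(String[] args) {
        CompletableFuture<String> node1 = new CompletableFuture<>();
        CompletableFuture<String> node2 = new CompletableFuture<>();
        CompletableFuture<Void> all = whenAllComplete(Arrays.asList(node1, node2));
        node1.complete("offsets from node 1");
        node2.completeExceptionally(new RuntimeException("node 2 failed"));
        System.out.println("all responses in: " + all.isDone()); // prints: all responses in: true
    }
}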
@@ -24,7 +24,7 @@ import java.util.Objects;
 public abstract class ApplicationEvent {

     public enum Type {
-        COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
+        COMMIT, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
         LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE,
         UNSUBSCRIBE
     }

@@ -77,11 +77,11 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent>
                 process((PollApplicationEvent) event);
                 return;

-            case FETCH_COMMITTED_OFFSET:
-                process((OffsetFetchApplicationEvent) event);
+            case FETCH_COMMITTED_OFFSETS:
+                process((FetchCommittedOffsetsApplicationEvent) event);
                 return;

-            case METADATA_UPDATE:
+            case NEW_TOPICS_METADATA_UPDATE:
                 process((NewTopicsMetadataUpdateRequestEvent) event);
                 return;

@@ -98,19 +98,19 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent>
                 return;

             case RESET_POSITIONS:
-                processResetPositionsEvent();
+                process((ResetPositionsApplicationEvent) event);
                 return;

             case VALIDATE_POSITIONS:
-                processValidatePositionsEvent();
+                process((ValidatePositionsApplicationEvent) event);
                 return;

             case SUBSCRIPTION_CHANGE:
-                processSubscriptionChangeEvent();
+                process((SubscriptionChangeApplicationEvent) event);
                 return;

             case UNSUBSCRIBE:
-                processUnsubscribeEvent((UnsubscribeApplicationEvent) event);
+                process((UnsubscribeApplicationEvent) event);
                 return;

             default:

@@ -145,7 +145,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent>
         event.chain(manager.addOffsetCommitRequest(event.offsets()));
     }

-    private void process(final OffsetFetchApplicationEvent event) {
+    private void process(final FetchCommittedOffsetsApplicationEvent event) {
         if (!requestManagers.commitRequestManager.isPresent()) {
             event.future().completeExceptionally(new KafkaException("Unable to fetch committed " +
                 "offset because the CommittedRequestManager is not available. Check if group.id was set correctly"));

@@ -180,7 +180,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent>
      * consumer join the group if it is not part of it yet, or send the updated subscription if
      * it is already a member.
      */
-    private void processSubscriptionChangeEvent() {
+    private void process(final SubscriptionChangeApplicationEvent event) {
         if (!requestManagers.membershipManager.isPresent()) {
             throw new RuntimeException("Group membership manager not present when processing a " +
                 "subscribe event");

@@ -197,7 +197,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent>
      * execution for releasing the assignment completes, and the request to leave
      * the group is sent out.
      */
-    private void processUnsubscribeEvent(UnsubscribeApplicationEvent event) {
+    private void process(final UnsubscribeApplicationEvent event) {
         if (!requestManagers.membershipManager.isPresent()) {
             throw new RuntimeException("Group membership manager not present when processing an " +
                 "unsubscribe event");

@@ -207,12 +207,14 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent>
         event.chain(result);
     }

-    private void processResetPositionsEvent() {
-        requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+    private void process(final ResetPositionsApplicationEvent event) {
+        CompletableFuture<Void> result = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+        event.chain(result);
     }

-    private void processValidatePositionsEvent() {
-        requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+    private void process(final ValidatePositionsApplicationEvent event) {
+        CompletableFuture<Void> result = requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+        event.chain(result);
     }

     private void process(final TopicMetadataApplicationEvent event) {

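The event.chain(result) calls above connect the future returned by a request manager to the future the application thread is blocked on. Roughly, chaining just propagates completion from one future to the other; the following is a minimal sketch under that assumption (ChainDemo and this chain helper are illustrative, not the real CompletableApplicationEvent API):

import java.util.concurrent.CompletableFuture;

public class ChainDemo {

    // Forward the outcome of the manager's future to the event's future, so a
    // caller blocked on the event observes the same result or failure.
    static <T> void chain(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, error) -> {
            if (error != null) {
                target.completeExceptionally(error);
            } else {
                target.complete(value);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> managerResult = new CompletableFuture<>(); // e.g. from resetPositionsIfNeeded()
        CompletableFuture<Void> eventFuture = new CompletableFuture<>();   // what the application thread waits on
        chain(managerResult, eventFuture);
        managerResult.complete(null);
        System.out.println("event future done: " + eventFuture.isDone()); // prints: event future done: true
    }
}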
@@ -23,12 +23,12 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;

-public class OffsetFetchApplicationEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
+public class FetchCommittedOffsetsApplicationEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {

     private final Set<TopicPartition> partitions;

-    public OffsetFetchApplicationEvent(final Set<TopicPartition> partitions) {
-        super(Type.FETCH_COMMITTED_OFFSET);
+    public FetchCommittedOffsetsApplicationEvent(final Set<TopicPartition> partitions) {
+        super(Type.FETCH_COMMITTED_OFFSETS);
         this.partitions = Collections.unmodifiableSet(partitions);
     }

@@ -42,7 +42,7 @@ public class OffsetFetchApplicationEvent extends CompletableApplicationEvent<Map
         if (o == null || getClass() != o.getClass()) return false;
         if (!super.equals(o)) return false;

-        OffsetFetchApplicationEvent that = (OffsetFetchApplicationEvent) o;
+        FetchCommittedOffsetsApplicationEvent that = (FetchCommittedOffsetsApplicationEvent) o;

         return partitions.equals(that.partitions);
     }

@@ -19,7 +19,7 @@ package org.apache.kafka.clients.consumer.internals.events;
 public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {

     public NewTopicsMetadataUpdateRequestEvent() {
-        super(Type.METADATA_UPDATE);
+        super(Type.NEW_TOPICS_METADATA_UPDATE);
     }

     @Override

@@ -33,10 +33,10 @@ import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplic
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
-import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;

@@ -109,6 +109,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;

@@ -252,9 +253,9 @@ public class AsyncKafkaConsumerTest {
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
         committedFuture.complete(offsets);

-        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
             assertDoesNotThrow(() -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+            verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
         }
     }

@@ -271,12 +272,12 @@ public class AsyncKafkaConsumerTest {
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
         committedFuture.complete(topicPartitionOffsets);

-        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
             assertDoesNotThrow(() -> consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
         }
         verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t0, 2);
         verify(testBuilder.metadata).updateLastSeenEpochIfNewer(t2, 3);
-        verify(applicationEventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+        verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
     }

     @Test

@@ -285,9 +286,9 @@ public class AsyncKafkaConsumerTest {
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
         committedFuture.completeExceptionally(new KafkaException("Test exception"));

-        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
             assertThrows(KafkaException.class, () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000)));
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+            verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
         }
     }

@@ -298,7 +299,7 @@ public class AsyncKafkaConsumerTest {
         final TopicPartition tp = new TopicPartition(topicName, partition);
         doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
+        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
         consumer.assign(singleton(tp));

         consumer.wakeup();

@@ -317,7 +318,7 @@ public class AsyncKafkaConsumerTest {
             return Fetch.empty();
         }).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
+        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
         consumer.assign(singleton(tp));

         assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));

@@ -338,7 +339,7 @@ public class AsyncKafkaConsumerTest {
             return Fetch.forPartition(tp, records, true);
         }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
+        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
         consumer.assign(singleton(tp));

         // since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored

@@ -359,7 +360,7 @@ public class AsyncKafkaConsumerTest {
         doReturn(Fetch.forPartition(tp, records, true))
             .when(fetchCollector).collectFetch(any(FetchBuffer.class));
         Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
-        doReturn(offsets).when(applicationEventHandler).addAndGet(any(OffsetFetchApplicationEvent.class), any(Timer.class));
+        doReturn(offsets).when(applicationEventHandler).addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), any(Timer.class));
         consumer.assign(singleton(tp));

         consumer.poll(Duration.ZERO);

@@ -510,12 +511,12 @@ public class AsyncKafkaConsumerTest {
      * <p/>
      *
      * Inside the {@link org.apache.kafka.clients.consumer.Consumer#committed(Set, Duration)} call we create an
-     * instance of {@link OffsetFetchApplicationEvent} that holds the partitions and internally holds a
+     * instance of {@link FetchCommittedOffsetsApplicationEvent} that holds the partitions and internally holds a
      * {@link CompletableFuture}. We want to test different behaviours of the {@link Future#get()}, such as
      * returning normally, timing out, throwing an error, etc. By mocking the construction of the event object that
      * is created, we can affect that behavior.
      */
-    private static MockedConstruction<OffsetFetchApplicationEvent> offsetFetchEventMocker(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+    private static MockedConstruction<FetchCommittedOffsetsApplicationEvent> offsetFetchEventMocker(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
         // This "answer" is where we pass the future to be invoked by the ConsumerUtils.getResult() method
         Answer<Map<TopicPartition, OffsetAndMetadata>> getInvocationAnswer = invocation -> {
             // This argument captures the actual argument value that was passed to the event's get() method, so we

@@ -524,18 +525,18 @@ public class AsyncKafkaConsumerTest {
             return ConsumerUtils.getResult(future, timer);
         };

-        MockedConstruction.MockInitializer<OffsetFetchApplicationEvent> mockInitializer = (mock, ctx) -> {
+        MockedConstruction.MockInitializer<FetchCommittedOffsetsApplicationEvent> mockInitializer = (mock, ctx) -> {
             // When the event's get() method is invoked, we call the "answer" method just above
             when(mock.get(any())).thenAnswer(getInvocationAnswer);

             // When the event's type() method is invoked, we have to return the type as it will be null in the mock
-            when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSET);
+            when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSETS);

             // This is needed for the WakeupTrigger code that keeps track of the active task
             when(mock.future()).thenReturn(future);
         };

-        return mockConstruction(OffsetFetchApplicationEvent.class, mockInitializer);
+        return mockConstruction(FetchCommittedOffsetsApplicationEvent.class, mockInitializer);
     }

     private static MockedConstruction<CommitApplicationEvent> commitEventMocker(CompletableFuture<Void> future) {

@@ -1008,22 +1009,25 @@ public class AsyncKafkaConsumerTest {

         consumer.assign(singleton(new TopicPartition("t1", 1)));

-        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
-            // Poll with 0 timeout to run a single iteration of the poll loop
-            consumer.poll(Duration.ofMillis(0));
+        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+            // Poll with 250ms timeout to give the background thread time to process the events without timing out
+            consumer.poll(Duration.ofMillis(250));

-            verify(applicationEventHandler).add(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class));
+            verify(applicationEventHandler, atLeast(1))
+                .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));

             if (committedOffsetsEnabled) {
-                // Verify there was an OffsetFetch event and no ResetPositions event
-                verify(applicationEventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
-                verify(applicationEventHandler,
-                    never()).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+                // Verify there was an FetchCommittedOffsets event and no ResetPositions event
+                verify(applicationEventHandler, atLeast(1))
+                    .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
+                verify(applicationEventHandler, never())
+                    .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
             } else {
-                // Verify there was not any OffsetFetch event but there should be a ResetPositions
-                verify(applicationEventHandler,
-                    never()).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
-                verify(applicationEventHandler).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+                // Verify there was not any FetchCommittedOffsets event but there should be a ResetPositions
+                verify(applicationEventHandler, never())
+                    .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
+                verify(applicationEventHandler, atLeast(1))
+                    .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
             }
         }
     }

@@ -1033,13 +1037,16 @@ public class AsyncKafkaConsumerTest {
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> committedFuture = new CompletableFuture<>();
         committedFuture.complete(committedOffsets);
         consumer.assign(partitions);
-        try (MockedConstruction<OffsetFetchApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
-            // Poll with 0 timeout to run a single iteration of the poll loop
-            consumer.poll(Duration.ofMillis(0));
+        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored = offsetFetchEventMocker(committedFuture)) {
+            // Poll with 250ms timeout to give the background thread time to process the events without timing out
+            consumer.poll(Duration.ofMillis(250));

-            verify(applicationEventHandler).add(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class));
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
-            verify(applicationEventHandler).add(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class));
+            verify(applicationEventHandler, atLeast(1))
+                .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
+            verify(applicationEventHandler, atLeast(1))
+                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
+            verify(applicationEventHandler, atLeast(1))
+                .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
         }
     }

@@ -596,9 +596,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertThrows(classOf[InvalidTopicException], () => consumer.partitionsFor(";3# ads,{234"))
   }

-  // Temporarily do not run flaky test for consumer group protocol
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testSeek(quorum: String, groupProtocol: String): Unit = {
     val consumer = createConsumer()
     val totalRecords = 50L