KAFKA-17439: Make polling for new records an explicit action/event in the new consumer (#17035)

Reviewers: Andrew Schofield <aschofield@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
Kirk True 2024-10-28 12:46:37 -07:00 committed by GitHub
parent 5f92f60bff
commit 9e424755d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 301 additions and 29 deletions

View File

@ -50,6 +50,7 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@ -708,6 +709,14 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
updateAssignmentMetadataIfNeeded(timer);
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
// NOTE: since the consumed position has already been updated, we must not allow
// wakeups or any other errors to be triggered prior to returning the fetched records.
sendPrefetches(timer);
if (fetch.records().isEmpty()) {
log.trace("Returning empty records from `poll()` "
+ "since the consumer's position has advanced for at least one topic partition");
@ -1519,6 +1528,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
return fetch;
}
// send any new fetches (won't resend pending fetches)
sendFetches(timer);
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
@ -1606,6 +1618,63 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
offsetAndMetadata.leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(topicPartition, epoch));
}
/**
 * Asks the background thread to {@link CreateFetchRequestsEvent create fetch requests} and waits for that
 * creation step to be acknowledged.
 *
 * <p>
 * To stay compatible with the {@link ClassicKafkaConsumer} method of the same name, this method behaves as
 * follows:
 *
 * <ul>
 *     <li>
 *         It blocks until the background thread confirms that the requests were created.
 *     </li>
 *     <li>
 *         Any exception hit while the requests are being created is thrown to the user <b>immediately</b>.
 *     </li>
 *     <li>
 *         A {@link TimeoutException} raised while waiting for the confirmation is suppressed. Such timeouts are
 *         a side effect of how this consumer's threads communicate; the {@link ClassicKafkaConsumer} never
 *         throws that exception type from its request creation step. Suppressing it also does not affect
 *         {@link #pollForFetches(Timer) blocking requests}, which can cope with requests that end up being
 *         created after the timeout.
 *     </li>
 * </ul>
 *
 * @param timer Timer that bounds how long the consumer waits for the requests to be created; in practice it
 *              exists so that the wait does not have to use {@link Long#MAX_VALUE} to mean "forever"
 */
private void sendFetches(Timer timer) {
    try {
        final long deadlineMs = calculateDeadlineMs(timer);
        final CreateFetchRequestsEvent createEvent = new CreateFetchRequestsEvent(deadlineMs);
        applicationEventHandler.addAndGet(createEvent);
    } catch (TimeoutException ignored) {
        // Intentionally dropped: see the method documentation for why a timeout here is harmless.
    }
}
/**
 * Asks the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the pre-fetch
 * case, i.e. just before {@link #poll(Duration)} returns. Unlike the regular case, the application thread
 * only enqueues the event here; it does not wait for the request creation to be confirmed.
 *
 * <p>
 * By the time this method runs, {@link KafkaConsumer#poll(Duration)} already has data to hand back to the
 * user, so the consumed position has already been advanced. Letting an exception escape at this point could
 * leave a gap in the records the user sees, so every error is logged and suppressed instead.
 *
 * @param timer Supplies the upper bound for the event and its {@link CompletableFuture future}
 */
private void sendPrefetches(Timer timer) {
    try {
        final long deadlineMs = calculateDeadlineMs(timer);
        final CreateFetchRequestsEvent prefetchEvent = new CreateFetchRequestsEvent(deadlineMs);
        applicationEventHandler.add(prefetchEvent);
    } catch (Throwable error) {
        // Never propagate: the fetched records must still be returned to the caller.
        log.warn("An unexpected error occurred while pre-fetching data in Consumer.poll(), but was suppressed", error);
    }
}
@Override
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
maybeThrowFencedInstanceException();

View File

@ -19,8 +19,10 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.LogContext;
@ -29,6 +31,7 @@ import org.apache.kafka.common.utils.Time;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -41,6 +44,7 @@ import java.util.stream.Collectors;
public class FetchRequestManager extends AbstractFetch implements RequestManager {
private final NetworkClientDelegate networkClientDelegate;
private CompletableFuture<Void> pendingFetchRequestFuture;
FetchRequestManager(final LogContext logContext,
final Time time,
@ -65,13 +69,40 @@ public class FetchRequestManager extends AbstractFetch implements RequestManager
networkClientDelegate.maybeThrowAuthFailure(node);
}
/**
 * Records that the {@link Consumer} wants fetch requests to be created for the broker nodes so that the
 * next batch of records can be fetched.
 *
 * <p>
 * If no request is currently pending, the returned future becomes the pending one. If a request is already
 * pending, the returned future is a new future that is completed (normally or exceptionally) with the
 * outcome of the pending one.
 *
 * @see CreateFetchRequestsEvent
 * @return Future on which the caller can wait to ensure that the requests have been created
 */
public CompletableFuture<Void> createFetchRequests() {
    final CompletableFuture<Void> result = new CompletableFuture<>();

    if (pendingFetchRequestFuture == null) {
        // Nothing outstanding: this future is the one the next poll will complete.
        pendingFetchRequestFuture = result;
        return result;
    }

    // A request is already outstanding, so mirror its eventual outcome onto the new future.
    pendingFetchRequestFuture.whenComplete((outcome, failure) -> {
        if (failure == null) {
            result.complete(outcome);
        } else {
            result.completeExceptionally(failure);
        }
    });

    return result;
}
/**
* {@inheritDoc}
*/
@Override
public PollResult poll(long currentTimeMs) {
return pollInternal(
prepareFetchRequests(),
this::prepareFetchRequests,
this::handleFetchSuccess,
this::handleFetchFailure
);
@ -82,9 +113,12 @@ public class FetchRequestManager extends AbstractFetch implements RequestManager
*/
@Override
public PollResult pollOnClose(long currentTimeMs) {
// There needs to be a pending fetch request for pollInternal to create the requests.
createFetchRequests();
// TODO: move the logic to poll to handle signal close
return pollInternal(
prepareCloseFetchSessionRequests(),
this::prepareCloseFetchSessionRequests,
this::handleCloseFetchSessionSuccess,
this::handleCloseFetchSessionFailure
);
@ -94,14 +128,23 @@ public class FetchRequestManager extends AbstractFetch implements RequestManager
* Creates the {@link PollResult poll result} that contains a list of zero or more
* {@link FetchRequest.Builder fetch requests}.
*
* @param fetchRequests {@link Map} of {@link Node nodes} to their {@link FetchSessionHandler.FetchRequestData}
* @param fetchRequestPreparer {@link FetchRequestPreparer} to generate a {@link Map} of {@link Node nodes}
* to their {@link FetchSessionHandler.FetchRequestData}
* @param successHandler {@link ResponseHandler Handler for successful responses}
* @param errorHandler {@link ResponseHandler Handler for failure responses}
* @return {@link PollResult}
*/
private PollResult pollInternal(Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests,
private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
ResponseHandler<ClientResponse> successHandler,
ResponseHandler<Throwable> errorHandler) {
if (pendingFetchRequestFuture == null) {
// If no explicit request for creating fetch requests was issued, just short-circuit.
return PollResult.EMPTY;
}
try {
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = fetchRequestPreparer.prepare();
List<UnsentRequest> requests = fetchRequests.entrySet().stream().map(entry -> {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
@ -116,6 +159,25 @@ public class FetchRequestManager extends AbstractFetch implements RequestManager
return new UnsentRequest(request, Optional.of(fetchTarget)).whenComplete(responseHandler);
}).collect(Collectors.toList());
pendingFetchRequestFuture.complete(null);
return new PollResult(requests);
} catch (Throwable t) {
// A "dummy" poll result is returned here rather than rethrowing the error because any error
// that is thrown from any RequestManager.poll() method interrupts the polling of the other
// request managers.
pendingFetchRequestFuture.completeExceptionally(t);
return PollResult.EMPTY;
} finally {
pendingFetchRequestFuture = null;
}
}
/**
* Simple functional interface to allow passing in a method reference for improved readability.
*/
@FunctionalInterface
protected interface FetchRequestPreparer {
/**
* Builds the fetch request data to send.
*
* @return {@link Map} of {@link Node nodes} to their {@link FetchSessionHandler.FetchRequestData}
*/
Map<Node, FetchSessionHandler.FetchRequestData> prepare();
}
}

View File

@ -32,7 +32,7 @@ public abstract class ApplicationEvent {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE,
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE,

View File

@ -124,6 +124,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
process((CommitOnCloseEvent) event);
return;
case CREATE_FETCH_REQUESTS:
process((CreateFetchRequestsEvent) event);
return;
case SHARE_FETCH:
process((ShareFetchEvent) event);
return;
@ -176,6 +180,11 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
}
/**
 * Handles a {@link CreateFetchRequestsEvent} by asking the fetch request manager to create fetch requests
 * and tying the event's future to the future returned by that call.
 */
private void process(final CreateFetchRequestsEvent event) {
    requestManagers.fetchRequestManager
        .createFetchRequests()
        .whenComplete(complete(event.future()));
}
private void process(final AsyncCommitEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
return;

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
/**
* {@code CreateFetchRequestsEvent} signals that the {@link Consumer} wants to issue fetch requests to the nodes
* for the partitions to which the consumer is currently subscribed. The event is completed when the
* {@link FetchRequestManager} has finished <em>creating</em> (i.e. not enqueuing, sending, or receiving)
* fetch requests (if any) to send to the broker nodes.
*/
public class CreateFetchRequestsEvent extends CompletableApplicationEvent<Void> {
/**
* Creates an event of type {@code CREATE_FETCH_REQUESTS}.
*
* @param deadlineMs Deadline handed to the {@link CompletableApplicationEvent} superclass; presumably the
* absolute time, in milliseconds, by which the event should complete (confirm against the
* superclass)
*/
public CreateFetchRequestsEvent(final long deadlineMs) {
super(Type.CREATE_FETCH_REQUESTS, deadlineMs);
}
}

View File

@ -2566,11 +2566,6 @@ public class KafkaConsumerTest {
consumer.assign(singleton(tp0));
consumer.seek(tp0, 50L);
// For AsyncKafkaConsumer, FetchRequestManager sends FetchRequest in background thread.
// Wait for the first fetch request to avoid ListOffsetResponse mismatch.
TestUtils.waitForCondition(() -> groupProtocol == GroupProtocol.CLASSIC || requestGenerated(client, ApiKeys.FETCH),
"No fetch request sent");
client.prepareResponse(request -> request instanceof ListOffsetsRequest, listOffsetsResponse(singletonMap(tp0, 90L)));
assertEquals(singletonMap(tp0, 90L), consumer.endOffsets(Collections.singleton(tp0)));
// correct lag result should be returned as well

View File

@ -40,6 +40,7 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableApplication
import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
@ -1711,6 +1712,7 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(singletonList("topic1"));
consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class));
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
}
private Properties requiredConsumerConfigAndGroupId(final String groupId) {

View File

@ -40,6 +40,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
@ -120,6 +121,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@ -134,8 +136,10 @@ import static java.util.Collections.singletonMap;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.apache.kafka.test.TestUtils.assertOptional;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -237,8 +241,12 @@ public class FetchRequestManagerTest {
}
// Convenience overload: delegates to sendFetches(boolean) with requestFetch set to true.
private int sendFetches() {
return sendFetches(true);
}
private int sendFetches(boolean requestFetch) {
offsetFetcher.validatePositionsOnMetadataChange();
return fetcher.sendFetches();
return fetcher.sendFetches(requestFetch);
}
@Test
@ -3386,6 +3394,71 @@ public class FetchRequestManagerTest {
assertTrue(subscriptions.isFetchable(tp1));
}
/**
* Verifies that polling produces no fetch requests when {@code createFetchRequests()} was not called first.
*/
@Test
public void testPollWithoutCreateFetchRequests() {
buildFetcher();
assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);
// sendFetches(false) polls without first requesting fetch creation, so nothing should be generated.
assertEquals(0, sendFetches(false));
}
/**
* Verifies that a single {@code createFetchRequests()} call results in exactly one poll that generates a
* request, and that the returned future is completed by that poll.
*/
@Test
public void testPollWithCreateFetchRequests() {
buildFetcher();
assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);
CompletableFuture<Void> future = fetcher.createFetchRequests();
assertNotNull(future);
// The future stays incomplete until a poll actually creates the requests.
assertFalse(future.isDone());
assertEquals(1, sendFetches(false));
assertTrue(future.isDone());
// A second poll without a new createFetchRequests() call generates nothing.
assertEquals(0, sendFetches(false));
}
/**
* Verifies that an error raised while creating fetch requests is not thrown from the poll itself but is
* instead reported through the future returned by {@code createFetchRequests()}.
*/
@Test
public void testPollWithCreateFetchRequestsError() {
buildFetcher();
assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);
// Arm the test fetcher with an authentication failure.
fetcher.setAuthenticationException(new AuthenticationException("Intentional error"));
CompletableFuture<Void> future = fetcher.createFetchRequests();
assertNotNull(future);
assertFalse(future.isDone());
// The poll must swallow the error...
assertDoesNotThrow(() -> sendFetches(false));
// ...and surface it via the future instead.
assertFutureThrows(future, AuthenticationException.class);
}
/**
 * Verifies that several {@code createFetchRequests()} calls issued before a poll are all satisfied by that
 * one poll, which generates a single request.
 */
@Test
public void testPollWithRedundantCreateFetchRequests() {
    buildFetcher();
    assignFromUser(singleton(tp0));
    subscriptions.seek(tp0, 0);

    // Issue a batch of back-to-back requests before any poll happens.
    final List<CompletableFuture<Void>> pending = new ArrayList<>();
    int issued = 0;
    while (issued < 10) {
        final CompletableFuture<Void> created = fetcher.createFetchRequests();
        assertNotNull(created);
        pending.add(created);
        issued++;
    }

    // None of them may be complete until the poll runs.
    final long doneBeforePoll = pending.stream().filter(CompletableFuture::isDone).count();
    assertEquals(0, doneBeforePoll);

    assertEquals(1, sendFetches(false));

    // The single poll completes every outstanding future.
    final long doneAfterPoll = pending.stream().filter(CompletableFuture::isDone).count();
    assertEquals(pending.size(), doneAfterPoll);
}
private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(
TopicPartition topicPartition,
Errors error,
@ -3639,6 +3712,7 @@ public class FetchRequestManagerTest {
private class TestableFetchRequestManager<K, V> extends FetchRequestManager {
private final FetchCollector<K, V> fetchCollector;
private AuthenticationException authenticationException;
public TestableFetchRequestManager(LogContext logContext,
Time time,
@ -3654,11 +3728,37 @@ public class FetchRequestManagerTest {
this.fetchCollector = fetchCollector;
}
// Stores the exception that maybeThrowAuthFailure(Node) will throw (and clear) on its next call; while it
// is set, isUnavailable(Node) reports true.
public void setAuthenticationException(AuthenticationException authenticationException) {
this.authenticationException = authenticationException;
}
// Reports the node as unavailable while a test authentication exception is armed; otherwise defers to
// the superclass.
@Override
protected boolean isUnavailable(Node node) {
    return authenticationException != null || super.isUnavailable(node);
}
// Throws the armed test authentication exception exactly once (clearing it first); when none is armed,
// defers to the superclass.
@Override
protected void maybeThrowAuthFailure(Node node) {
    if (authenticationException == null) {
        super.maybeThrowAuthFailure(node);
        return;
    }

    final AuthenticationException armed = authenticationException;
    authenticationException = null;
    throw armed;
}
// Delegates to the fetch collector, passing it this manager's fetch buffer.
private Fetch<K, V> collectFetch() {
return fetchCollector.collectFetch(fetchBuffer);
}
private int sendFetches() {
private int sendFetches(boolean requestFetch) {
if (requestFetch)
createFetchRequests();
NetworkClientDelegate.PollResult pollResult = poll(time.milliseconds());
networkClientDelegate.addAll(pollResult.unsentRequests);
return pollResult.unsentRequests.size();

View File

@ -128,6 +128,8 @@ public class ApplicationEventProcessorTest {
private static Stream<Arguments> applicationEvents() {
return Stream.of(
Arguments.of(new PollEvent(100)),
Arguments.of(new CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
Arguments.of(new AsyncCommitEvent(new HashMap<>())),
Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
Arguments.of(new CheckAndUpdatePositionsEvent(500)),