Kirk True 2025-10-07 21:21:36 +00:00 committed by GitHub
commit 4305cfd51f
30 changed files with 833 additions and 376 deletions

View File

@ -122,7 +122,7 @@ public class ConsumerIntegrationTest {
}
});
TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1,
TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).count() == 1,
5000,
"failed to poll data");
}
@ -266,10 +266,11 @@ public class ConsumerIntegrationTest {
consumer1.subscribe(List.of(topic));
consumer2.subscribe(List.of(topic));
Duration pollTimeout = Duration.ofMillis(100);
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
consumer0.poll(pollTimeout);
consumer1.poll(pollTimeout);
consumer2.poll(pollTimeout);
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
consumer1.assignment().isEmpty() &&
consumer2.assignment().isEmpty();
@ -284,9 +285,9 @@ public class ConsumerIntegrationTest {
);
clusterInstance.waitTopicCreation(topic, 3);
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
consumer0.poll(pollTimeout);
consumer1.poll(pollTimeout);
consumer2.poll(pollTimeout);
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
consumer2.assignment().isEmpty();
@ -301,9 +302,9 @@ public class ConsumerIntegrationTest {
);
clusterInstance.waitTopicCreation(topic, 6);
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
consumer0.poll(pollTimeout);
consumer1.poll(pollTimeout);
consumer2.poll(pollTimeout);
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
consumer2.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4), new TopicPartition(topic, 5)));
@ -325,9 +326,9 @@ public class ConsumerIntegrationTest {
new TopicPartition(topic, 5), Optional.of(new NewPartitionReassignment(List.of(0)))
)).all().get();
TestUtils.waitForCondition(() -> {
consumer0.poll(Duration.ofMillis(1000));
consumer1.poll(Duration.ofMillis(1000));
consumer2.poll(Duration.ofMillis(1000));
consumer0.poll(pollTimeout);
consumer1.poll(pollTimeout);
consumer2.poll(pollTimeout);
return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 5))) &&
consumer1.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4))) &&
consumer2.assignment().equals(Set.of(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));
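The hunks above replace one-second per-call polls with a shared 100 ms pollTimeout inside TestUtils.waitForCondition, so each retry is short and the overall wait is bounded by the condition's timeout rather than by individual poll durations. A minimal sketch of that pattern, with a hypothetical topic name and a single consumer, might look like:
// Illustrative only: repeatedly poll briefly inside the wait condition so group
// membership and fetching make progress on every retry.
Duration pollTimeout = Duration.ofMillis(100);
TestUtils.waitForCondition(() -> {
    consumer.poll(pollTimeout);
    return consumer.assignment().equals(Set.of(new TopicPartition("example-topic", 0)));
}, "consumer was not assigned the expected partition");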

View File

@ -283,11 +283,13 @@ public class PlaintextConsumerCommitTest {
// In both CLASSIC and CONSUMER protocols, interceptors are executed in poll and close.
// However, in the CONSUMER protocol, the assignment may be changed outside a poll, so
// we need to poll once to ensure the interceptor is called.
if (groupProtocol == GroupProtocol.CONSUMER) {
consumer.poll(Duration.ZERO);
}
assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance);
TestUtils.waitForCondition(
() -> {
consumer.poll(Duration.ZERO);
return MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance;
},
"Consumer.poll() did not invoke onCommit() before timeout elapse"
);
// verify commits are intercepted on close
var commitCountBeforeClose = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();

View File

@ -66,7 +66,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT;
@ -810,7 +809,7 @@ public class PlaintextConsumerTest {
// Create a consumer and consume some messages.
var listener = new TestConsumerReassignmentListener();
consumer.subscribe(List.of(TOPIC, topic2), listener);
var records = awaitNonEmptyRecords(consumer, TP);
var records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(1, listener.callsToAssigned, "should be assigned once");
// Verify the metric exists.
@ -877,7 +876,7 @@ public class PlaintextConsumerTest {
// Create a consumer and consume some messages.
var listener = new TestConsumerReassignmentListener();
consumer.subscribe(List.of(TOPIC, topic2), listener);
var records = awaitNonEmptyRecords(consumer, TP);
var records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(1, listener.callsToAssigned, "should be assigned once");
// Verify the metric exists.
@ -944,7 +943,7 @@ public class PlaintextConsumerTest {
sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
consumer.assign(List.of(TP));
var records = awaitNonEmptyRecords(consumer, TP);
var records = ConsumerPollTestUtils.waitForRecords(consumer);
// Verify the metric exists.
Map<String, String> tags = Map.of(
@ -958,7 +957,7 @@ public class PlaintextConsumerTest {
assertEquals((double) records.count(), fetchLead.metricValue(), "The lead should be " + records.count());
consumer.assign(List.of(tp2));
awaitNonEmptyRecords(consumer, tp2);
ConsumerPollTestUtils.waitForRecords(consumer);
assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)));
}
}
@ -999,7 +998,7 @@ public class PlaintextConsumerTest {
sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
consumer.assign(List.of(TP));
var records = awaitNonEmptyRecords(consumer, TP);
var records = ConsumerPollTestUtils.waitForRecords(consumer);
// Verify the metric exists.
Map<String, String> tags = Map.of(
@ -1014,7 +1013,7 @@ public class PlaintextConsumerTest {
var expectedLag = numMessages - records.count();
assertEquals(expectedLag, (double) fetchLag.metricValue(), EPSILON, "The lag should be " + expectedLag);
consumer.assign(List.of(tp2));
awaitNonEmptyRecords(consumer, tp2);
ConsumerPollTestUtils.waitForRecords(consumer);
assertNull(consumer.metrics().get(new MetricName(TP + ".records-lag", "consumer-fetch-manager-metrics", "", tags)));
assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)));
}
@ -1058,7 +1057,7 @@ public class PlaintextConsumerTest {
sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
consumer.assign(List.of(TP));
awaitNonEmptyRecords(consumer, TP);
ConsumerPollTestUtils.waitForRecords(consumer);
// Verify the metric exists.
Map<String, String> tags = Map.of(
@ -1203,12 +1202,21 @@ public class PlaintextConsumerTest {
consumer3.assign(List.of(TP));
consumer3.seek(TP, 1);
var numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count();
TestUtils.waitForCondition(
() -> consumer1.poll(Duration.ofMillis(5000)).count() == 3,
"consumer1 did not consume from earliest offset"
);
assertThrows(InvalidGroupIdException.class, consumer1::commitSync);
assertThrows(InvalidGroupIdException.class, () -> consumer2.committed(Set.of(TP)));
var numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count();
var numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count();
TestUtils.waitForCondition(
() -> consumer2.poll(Duration.ofMillis(5000)).count() == 0,
"Expected consumer2 to consume from latest offset"
);
TestUtils.waitForCondition(
() -> consumer3.poll(Duration.ofMillis(5000)).count() == 2,
"Expected consumer3 to consume from offset 1"
);
consumer1.unsubscribe();
consumer2.unsubscribe();
@ -1217,14 +1225,6 @@ public class PlaintextConsumerTest {
assertTrue(consumer1.assignment().isEmpty());
assertTrue(consumer2.assignment().isEmpty());
assertTrue(consumer3.assignment().isEmpty());
consumer1.close();
consumer2.close();
consumer3.close();
assertEquals(3, numRecords1, "Expected consumer1 to consume from earliest offset");
assertEquals(0, numRecords2, "Expected consumer2 to consume from latest offset");
assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1");
}
}
@ -1654,7 +1654,7 @@ public class PlaintextConsumerTest {
consumer.subscribe(List.of(testTopic));
// This is here to allow the consumer time to settle the group membership/assignment.
awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
ConsumerPollTestUtils.waitForRecords(consumer);
// Keep track of the last time the poll is invoked to ensure the deltas between invocations don't
// exceed the delay threshold defined above.
@ -1674,24 +1674,6 @@ public class PlaintextConsumerTest {
}
}
private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
Consumer<byte[], byte[]> consumer,
TopicPartition tp
) throws Exception {
AtomicReference<ConsumerRecords<byte[], byte[]>> result = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
var polledRecords = consumer.poll(Duration.ofSeconds(10));
boolean hasRecords = !polledRecords.isEmpty();
if (hasRecords) {
result.set(polledRecords);
}
return hasRecords;
}, "Timed out waiting for non-empty records from topic " + tp.topic() + " partition " + tp.partition());
return result.get();
}
public static class SerializerImpl implements Serializer<byte[]> {
private final ByteArraySerializer serializer = new ByteArraySerializer();

View File

@ -41,6 +41,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
@ -59,7 +60,6 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsE
import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
@ -325,8 +325,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
// Init value is needed to avoid NPE in case of exception raised in the constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
// to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
private boolean cachedSubscriptionHasAllFetchPositions;
private AsyncPollEvent inflightPoll;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
@ -464,7 +463,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
metadata,
subscriptions,
requestManagersSupplier);
requestManagersSupplier
);
this.applicationEventHandler = applicationEventHandlerFactory.build(
logContext,
time,
@ -623,7 +623,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
new RebalanceCallbackMetricsManager(metrics)
);
ApiVersions apiVersions = new ApiVersions();
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(
time,
config,
logContext,
@ -833,22 +833,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
}
do {
PollEvent event = new PollEvent(timer.currentTimeMs());
// Make sure to let the background thread know that we are still polling.
// This will trigger async auto-commits of consumed positions when hitting
// the interval time or reconciling new assignments
applicationEventHandler.add(event);
// Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
// retrieve the positions to commit before proceeding with fetching new records
ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis());
// We must not allow wake-ups between polling for fetches and returning the records.
// If the polled fetches are not empty the consumed position has already been updated in the polling
// of the fetches. A wakeup between returned fetches and returning records would lead to never
// returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.
wakeupTrigger.maybeTriggerWakeup();
updateAssignmentMetadataIfNeeded(timer);
checkInflightPoll(timer);
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
@ -876,6 +867,71 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
}
}
/**
* {@code checkInflightPoll()} manages the lifecycle of {@link AsyncPollEvent} processing. If it is
* called when no event is in flight, it submits a new event to be processed asynchronously. On each
* invocation it also checks whether the <em>inflight</em> event has completed; if it has, the result is
* processed accordingly.
*/
public void checkInflightPoll(Timer timer) {
boolean newlySubmittedEvent = false;
if (inflightPoll == null) {
inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer), time.milliseconds());
newlySubmittedEvent = true;
if (log.isTraceEnabled()) {
log.trace(
"Submitting new inflight event {} with {} remaining on timer",
inflightPoll,
timer.remainingMs()
);
}
applicationEventHandler.add(inflightPoll);
}
try {
// Note: this is calling user-supplied code, so make sure that any errors thrown here are caught and
// the inflight event is cleared.
offsetCommitCallbackInvoker.executeCallbacks();
processBackgroundEvents();
if (inflightPoll.isComplete()) {
Optional<KafkaException> errorOpt = inflightPoll.error();
// The async poll event has completed, either successfully or not. In either case, clear out the
// inflight request.
log.trace("Inflight event {} completed, clearing", inflightPoll);
inflightPoll = null;
if (errorOpt.isPresent()) {
throw errorOpt.get();
}
} else if (!newlySubmittedEvent) {
if (timer.isExpired()) {
// The inflight event is expired...
log.trace("Inflight event {} expired without completing, clearing", inflightPoll);
inflightPoll = null;
} else {
if (log.isTraceEnabled()) {
log.trace(
"Inflight event {} is incomplete with {} remaining on timer",
inflightPoll,
timer.remainingMs()
);
}
}
}
} catch (Throwable t) {
// If an exception is hit, bubble it up to the user but make sure to clear out the inflight request
// because the error effectively renders it complete.
log.debug("Inflight event {} failed due to {}, clearing", inflightPoll, String.valueOf(t));
inflightPoll = null;
throw ConsumerUtils.maybeWrapAsKafkaException(t);
}
}
/**
* Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
* partitions.
@ -1771,15 +1827,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
return fetch;
}
// send any new fetches (won't resend pending fetches)
sendFetches(timer);
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
// NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
if (pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
@ -1809,19 +1859,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
* of the {@link #fetchBuffer}, converting it to a well-formed {@link CompletedFetch}, validating that it and
* the internal {@link SubscriptionState state} are correct, and then converting it all into a {@link Fetch}
* for returning.
*
* <p/>
*
* This method will {@link ConsumerNetworkThread#wakeup() wake up the network thread} before returning. This is
* done as an optimization so that the <em>next round of data can be pre-fetched</em>.
*/
private Fetch<K, V> collectFetch() {
final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
// Notify the network thread to wake up and start the next round of fetching.
applicationEventHandler.wakeupNetworkThread();
return fetch;
return fetchCollector.collectFetch(fetchBuffer);
}
/**
@ -1834,11 +1874,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
* defined
*/
private boolean updateFetchPositions(final Timer timer) {
cachedSubscriptionHasAllFetchPositions = false;
try {
CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
} catch (TimeoutException e) {
return false;
} finally {
@ -1856,41 +1895,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
return groupMetadata.get().isPresent();
}
/**
* This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}.
*
* <p/>
*
* This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer} method
* of the same name:
*
* <ul>
* <li>
* The method will wait for confirmation of the request creation before continuing.
* </li>
* <li>
* The method will throw exceptions encountered during request creation to the user <b>immediately</b>.
* </li>
* <li>
* The method will suppress {@link TimeoutException}s that occur while waiting for the confirmation.
* Timeouts during request creation are a byproduct of this consumer's thread communication mechanisms.
* That exception type isn't thrown in the request creation step of the {@link ClassicKafkaConsumer}.
* Additionally, timeouts will not impact the logic of {@link #pollForFetches(Timer) blocking requests}
* as it can handle requests that are created after the timeout.
* </li>
* </ul>
*
* @param timer Timer used to bound how long the consumer waits for the requests to be created, which in practice
* is used to avoid using {@link Long#MAX_VALUE} to wait "forever"
*/
private void sendFetches(Timer timer) {
try {
applicationEventHandler.addAndGet(new CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
} catch (TimeoutException swallow) {
// Can be ignored, per above comments.
}
}
/**
* This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the
* pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the pre-fetch case, the application thread

View File

@ -20,9 +20,9 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.MetadataErrorNotifiableEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.requests.AbstractRequest;
@ -40,6 +40,7 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
@ -193,10 +194,13 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
try {
if (event instanceof CompletableEvent) {
applicationEventReaper.add((CompletableEvent<?>) event);
// Check if there are any metadata errors and fail the CompletableEvent if an error is present.
// This call is meant to handle "immediately completed events" which may not enter the awaiting state,
// so metadata errors need to be checked and handled right away.
maybeFailOnMetadataError(List.of((CompletableEvent<?>) event));
}
// Check if there are any metadata errors and fail the CompletableEvent if an error is present.
// This call is meant to handle "immediately completed events" which may not enter the awaiting state,
// so metadata errors need to be checked and handled right away.
if (event instanceof MetadataErrorNotifiableEvent) {
if (maybeFailOnMetadataError(List.of(event)))
continue;
}
applicationEventProcessor.process(event);
} catch (Throwable t) {
@ -368,18 +372,27 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
/**
* If there is a metadata error, complete all uncompleted events that require subscription metadata.
*/
private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new ArrayList<>();
private boolean maybeFailOnMetadataError(List<?> events) {
List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
for (CompletableEvent<?> ce : events) {
if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata())
subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce);
for (Object obj : events) {
if (obj instanceof MetadataErrorNotifiableEvent) {
filteredEvents.add((MetadataErrorNotifiableEvent) obj);
}
}
if (subscriptionMetadataEvent.isEmpty())
return;
networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError))
);
// Don't get-and-clear the metadata error if there are no events that will be notified.
if (filteredEvents.isEmpty())
return false;
Optional<Exception> andClearMetadataError = networkClientDelegate.getAndClearMetadataError();
if (andClearMetadataError.isPresent()) {
Exception metadataError = andClearMetadataError.get();
filteredEvents.forEach(e -> e.onMetadataError(metadataError));
return true;
} else {
return false;
}
}
}

View File

@ -210,6 +210,30 @@ public class NetworkClientDelegate implements AutoCloseable {
}
ClientRequest request = makeClientRequest(r, node, currentTimeMs);
if (!client.ready(node, currentTimeMs)) {
AuthenticationException authenticationException = client.authenticationException(node);
// The client may not be ready because it hit an unrecoverable authentication error. In that case, there's
// no benefit from retrying, so propagate the error here.
if (authenticationException != null) {
request.callback().onComplete(
new ClientResponse(
request.makeHeader(
request.requestBuilder().latestAllowedVersion()
),
request.callback(),
request.destination(),
request.createdTimeMs(),
currentTimeMs,
true,
null,
authenticationException,
null
)
);
return false;
}
// enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration
// of the event loop
log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r);
@ -471,4 +495,33 @@ public class NetworkClientDelegate implements AutoCloseable {
}
};
}
/**
* Creates a {@link Supplier} that defers construction of the {@link NetworkClientDelegate} until it is
* invoked by the {@link ConsumerNetworkThread}.
*/
public static Supplier<NetworkClientDelegate> supplier(final Time time,
final ConsumerConfig config,
final LogContext logContext,
final KafkaClient client,
final Metadata metadata,
final BackgroundEventHandler backgroundEventHandler,
final boolean notifyMetadataErrorsViaErrorQueue,
final AsyncConsumerMetrics asyncConsumerMetrics) {
return new CachedSupplier<>() {
@Override
protected NetworkClientDelegate create() {
return new NetworkClientDelegate(
time,
config,
logContext,
client,
metadata,
backgroundEventHandler,
notifyMetadataErrorsViaErrorQueue,
asyncConsumerMetrics
);
}
};
}
}
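NetworkClientDelegate.supplier() above wraps construction in a CachedSupplier so the delegate is built lazily on first use by the network thread and then reused. As a rough sketch of that memoization idea (a hypothetical class, not Kafka's actual CachedSupplier), a caching supplier could look like:
// Hypothetical sketch: a Supplier that builds its value once and caches it thereafter.
public abstract class OneTimeCachedSupplier<T> implements java.util.function.Supplier<T> {

    private T value;

    // Subclasses perform the potentially expensive construction here.
    protected abstract T create();

    @Override
    public synchronized T get() {
        if (value == null)
            value = create();
        return value;
    }
}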

View File

@ -50,6 +50,16 @@ public class OffsetCommitCallbackInvoker {
}
}
/**
* Returns the current size of the queue. Used by the background thread to determine if it needs to <i>pause</i>
* itself to return to the application thread for processing.
*
* @return Current size of queue
*/
public int size() {
return callbackQueue.size();
}
public void enqueueUserCallbackInvocation(final OffsetCommitCallback callback,
final Map<TopicPartition, OffsetAndMetadata> offsets,
final Exception exception) {
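The new size() accessor above lets the background thread detect when user-supplied commit callbacks are queuing up. A hypothetical check (the surrounding method and field names are assumed for illustration) could look like:
// Illustrative only: yield back to the application thread if it has callbacks
// or background events waiting to be processed.
private boolean shouldYieldToApplicationThread() {
    return offsetCommitCallbackInvoker.size() > 0 || backgroundEventHandler.size() > 0;
}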

View File

@ -38,13 +38,13 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@ -384,7 +384,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
backgroundEventQueue, time, asyncConsumerMetrics);
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
() -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
NetworkClientDelegate.supplier(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
config,
@ -583,7 +583,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
do {
// Make sure the network thread can tell the application is actively polling
applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
applicationEventHandler.add(new SharePollEvent(timer.currentTimeMs()));
processBackgroundEvents();

View File

@ -21,14 +21,14 @@ import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> implements MetadataErrorNotifiableEvent {
protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) {
super(type, deadlineMs);
}
@Override
public boolean requireSubscriptionMetadata() {
return true;
public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataError);
}
}

View File

@ -28,14 +28,14 @@ import java.util.Objects;
public abstract class ApplicationEvent {
public enum Type {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
COMMIT_ASYNC, COMMIT_SYNC, ASYNC_POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA,
TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, STOP_FIND_COORDINATOR_ON_CLOSE,
PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_POLL, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE,
SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION,

View File

@ -20,11 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
@ -45,6 +49,7 @@ import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -53,6 +58,7 @@ import java.util.stream.Collectors;
* An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread}
* which processes {@link ApplicationEvent application events} generated by the application thread.
*/
@SuppressWarnings({"ClassFanOutComplexity"})
public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
private final Logger log;
@ -76,6 +82,14 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
@Override
public void process(ApplicationEvent event) {
switch (event.type()) {
case ASYNC_POLL:
process((AsyncPollEvent) event);
return;
case SHARE_POLL:
process((SharePollEvent) event);
return;
case COMMIT_ASYNC:
process((AsyncCommitEvent) event);
return;
@ -84,10 +98,6 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
process((SyncCommitEvent) event);
return;
case POLL:
process((PollEvent) event);
return;
case FETCH_COMMITTED_OFFSETS:
process((FetchCommittedOffsetsEvent) event);
return;
@ -217,35 +227,15 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
}
private void process(final PollEvent event) {
// Trigger a reconciliation that can safely commit offsets if needed to rebalance,
// as we're processing before any new fetching starts in the app thread
private void process(final SharePollEvent event) {
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
if (requestManagers.commitRequestManager.isPresent()) {
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
// all commit request generation points have been passed,
// so it's safe to notify the app thread could proceed and start fetching
event.markReconcileAndAutoCommitComplete();
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
} else {
// safe to unblock - no auto-commit risk here:
// 1. commitRequestManager is not present
// 2. shareConsumer has no auto-commit mechanism
event.markReconcileAndAutoCommitComplete();
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
}
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
ShareMembershipManager membershipManager = hrm.membershipManager();
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
membershipManager.onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
}
private void process(final CreateFetchRequestsEvent event) {
@ -352,7 +342,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
if (subscriptions.subscribe(event.topics(), event.listener())) {
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
}
requestManagers.streamsMembershipManager.get().onSubscriptionUpdated();
requestManagers.streamsGroupHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
event.future().complete(null);
} catch (Exception e) {
event.future().completeExceptionally(e);
@ -375,7 +365,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
try {
subscriptions.subscribe(event.pattern(), event.listener());
metadata.requestUpdateForNewTopics();
updatePatternSubscription(metadata.fetch());
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
ConsumerMembershipManager membershipManager = hrm.membershipManager();
updatePatternSubscription(membershipManager::onSubscriptionUpdated, metadata.fetch());
});
event.future().complete(null);
} catch (Exception e) {
event.future().completeExceptionally(e);
@ -409,13 +402,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
* This will make the consumer send the updated subscription on the next poll.
*/
private void process(final UpdatePatternSubscriptionEvent event) {
if (!subscriptions.hasPatternSubscription()) {
return;
}
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
updatePatternSubscription(metadata.fetch());
}
requestManagers.consumerMembershipManager.ifPresent(mm -> maybeUpdatePatternSubscription(mm::onSubscriptionUpdated));
event.future().complete(null);
}
@ -726,6 +713,75 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
}
private void process(final AsyncPollEvent event) {
log.trace("Processing poll logic for {}", event);
// Trigger a reconciliation that can safely commit offsets if needed to rebalance,
// as we're processing before any new fetching starts
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
if (requestManagers.commitRequestManager.isPresent()) {
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
ConsumerMembershipManager membershipManager = hrm.membershipManager();
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
membershipManager.onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
StreamsMembershipManager membershipManager = hrm.membershipManager();
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
membershipManager.onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
}
log.trace("Processing check and update positions logic for {}", event);
CompletableFuture<Boolean> updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
if (maybeCompleteAsyncPollEventExceptionally(event, updatePositionsError))
return;
log.trace("Processing create fetch requests logic for {}", event);
// Create a fetch request if there's no data in the FetchBuffer.
requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> {
if (maybeCompleteAsyncPollEventExceptionally(event, fetchError))
return;
event.completeSuccessfully();
log.trace("Completed event processing for {}", event);
});
});
}
/**
* If there's an error to report to the user, the given event is completed exceptionally and this method
* returns {@code true}. Otherwise, it returns {@code false}.
*/
private boolean maybeCompleteAsyncPollEventExceptionally(AsyncPollEvent event, Throwable t) {
if (t == null)
return false;
if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) {
log.trace("Ignoring timeout for {}: {}", event, t.getMessage());
return false;
}
if (t instanceof CompletionException) {
t = t.getCause();
}
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
event.completeExceptionally(e);
log.trace("Failing event processing for {}", event, e);
return true;
}
private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) {
return (value, exception) -> {
if (exception != null)
@ -757,6 +813,16 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
};
}
private void maybeUpdatePatternSubscription(OnSubscriptionUpdatedCallback callback) {
if (!subscriptions.hasPatternSubscription()) {
return;
}
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
updatePatternSubscription(callback, metadata.fetch());
}
}
/**
* This function evaluates the regex that the consumer subscribed to
* against the list of topic names from metadata, and updates
@ -764,26 +830,26 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
*
* @param cluster Cluster from which we get the topics
*/
private void updatePatternSubscription(Cluster cluster) {
if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
log.warn("Group membership manager not present when processing a subscribe event");
return;
}
private void updatePatternSubscription(OnSubscriptionUpdatedCallback callback, Cluster cluster) {
final Set<String> topicsToSubscribe = cluster.topics().stream()
.filter(subscriptions::matchesSubscribedPattern)
.collect(Collectors.toSet());
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
}
// Join the group if not already part of it, or just send the updated subscription
// to the broker on the next poll. Note that this is done even if no topics matched
// the regex, to ensure the member joins the group if needed (with empty subscription).
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
callback.onSubscriptionUpdated();
}
// Visible for testing
int metadataVersionSnapshot() {
return metadataVersionSnapshot;
}
private interface OnSubscriptionUpdatedCallback {
void onSubscriptionUpdated();
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.common.KafkaException;
import java.time.Duration;
import java.util.Optional;
/**
* This class represents the non-blocking event that executes logic functionally equivalent to the following:
*
* <ul>
* <li>Polling</li>
* <li>{@link CheckAndUpdatePositionsEvent}</li>
* <li>{@link CreateFetchRequestsEvent}</li>
* </ul>
*
* {@link AsyncKafkaConsumer#poll(Duration)} is implemented using a non-blocking design to ensure performance is
* at the same level as {@link ClassicKafkaConsumer#poll(Duration)}. The event is submitted in {@code poll()}, but
* there are no blocking waits for the "result" of the event. Checks are made for the result at certain points, but
* they do not block. The logic for the previously-mentioned events is executed sequentially on the background thread.
*/
public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNotifiableEvent {
private final long deadlineMs;
private final long pollTimeMs;
private volatile KafkaException error;
private volatile boolean isComplete;
/**
* Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic.
*
* @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the
* {@link Duration} passed to {@link Consumer#poll(Duration)}
* @param pollTimeMs Time, in milliseconds, at which point the event was created
*/
public AsyncPollEvent(long deadlineMs, long pollTimeMs) {
super(Type.ASYNC_POLL);
this.deadlineMs = deadlineMs;
this.pollTimeMs = pollTimeMs;
}
public long deadlineMs() {
return deadlineMs;
}
public long pollTimeMs() {
return pollTimeMs;
}
public Optional<KafkaException> error() {
return Optional.ofNullable(error);
}
public boolean isComplete() {
return isComplete;
}
public void completeSuccessfully() {
isComplete = true;
}
public void completeExceptionally(KafkaException e) {
isComplete = true;
this.error = e;
}
@Override
public void onMetadataError(Exception metadataError) {
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError));
}
@Override
protected String toStringBase() {
return super.toStringBase() +
", deadlineMs=" + deadlineMs +
", pollTimeMs=" + pollTimeMs +
", error=" + error +
", isComplete=" + isComplete;
}
}
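A simplified illustration of the submit-then-check lifecycle this event enables, mirroring the checkInflightPoll() logic shown earlier (the handler, timer, and time fields are assumed to exist as in AsyncKafkaConsumer):
// Sketch only: submit the event, then check its result on later passes without blocking.
AsyncPollEvent event = new AsyncPollEvent(calculateDeadlineMs(timer), time.milliseconds());
applicationEventHandler.add(event);              // hand off to the background thread

// Later, on each pass of poll():
if (event.isComplete()) {
    Optional<KafkaException> error = event.error();
    if (error.isPresent())
        throw error.get();                       // surface background failures to the caller
    // otherwise the background stages finished; keep collecting fetched records
}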

View File

@ -45,6 +45,16 @@ public class BackgroundEventHandler {
this.asyncConsumerMetrics = asyncConsumerMetrics;
}
/**
* Returns the current size of the queue. Used by the background thread to determine if it needs to <i>pause</i>
* itself to return to the application thread for processing.
*
* @return Current size of queue
*/
public int size() {
return backgroundEventQueue.size();
}
/**
* Add a {@link BackgroundEvent} to the handler.
*

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
@ -30,7 +31,7 @@ import java.time.Duration;
* The event completes with a boolean indicating if all assigned partitions have valid fetch positions
* (based on {@link SubscriptionState#hasAllFetchPositions()}).
*/
public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> {
public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> implements MetadataErrorNotifiableEvent {
public CheckAndUpdatePositionsEvent(long deadlineMs) {
super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
@ -39,11 +40,11 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Bo
/**
* Invoked when a metadata error is detected, completing this event exceptionally. This is used to
* ensure that metadata errors are
* handled correctly during the {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#poll(Duration) poll}
* or {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#position(TopicPartition) position} process.
* handled correctly during the {@link Consumer#poll(Duration) poll}
* or {@link Consumer#position(TopicPartition) position} process.
*/
@Override
public boolean requireSubscriptionMetadata() {
return true;
public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataError);
}
}

View File

@ -52,8 +52,4 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent im
protected String toStringBase() {
return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs;
}
public boolean requireSubscriptionMetadata() {
return false;
}
}

View File

@ -32,7 +32,7 @@ import java.util.Map;
* {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than
* or equals to the target timestamp)
*/
public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> {
public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> implements MetadataErrorNotifiableEvent {
private final Map<TopicPartition, Long> timestampsToSearch;
private final boolean requireTimestamps;
@ -65,8 +65,8 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti
}
@Override
public boolean requireSubscriptionMetadata() {
return true;
public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataError);
}
@Override

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
/**
* This interface is implemented by events that need to be notified when
* {@link NetworkClientDelegate#getAndClearMetadataError()} reports an error.
*/
public interface MetadataErrorNotifiableEvent {
/**
* The background thread detects metadata errors on every call to {@link NetworkClientDelegate#poll(long, long)}.
* {@link NetworkClientDelegate} calls {@link Metadata#maybeThrowAnyException()} and stores the result.
* The presence of a metadata error is checked in the {@link ConsumerNetworkThread}'s loop by calling
* {@link NetworkClientDelegate#getAndClearMetadataError()}. There are two places in the loop in which the
* metadata error is checked:
*
* <ul>
* <li>
* At the very top of the {@link ConsumerNetworkThread}'s loop, the {@link ApplicationEventHandler}'s
* queue is drained. Before processing each event via
* {@link ApplicationEventProcessor#process(ApplicationEvent)}, if a metadata error occurred, this method
* will be invoked on the event if it implements this interface.
* <p/>
* <em>Note</em>: for an event on which this method is invoked, it will <em>not</em> be passed to the
* {@link ApplicationEventProcessor#process(ApplicationEvent)} method.
* </li>
* <li>
* At the very bottom of the {@link ConsumerNetworkThread}'s loop, the {@link CompletableEventReaper}
* is executed and any outstanding events are returned. If a metadata error occurred, this method
* will be invoked on every unexpired event that implements this interface.
* </li>
* </ul>
*
* @param metadataError Error that originally came from {@link Metadata#maybeThrowAnyException()}
*/
void onMetadataError(Exception metadataError);
}
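The completable events changed earlier in this commit (AbstractTopicMetadataEvent, CheckAndUpdatePositionsEvent, ListOffsetsEvent) implement this callback by failing their pending future. A minimal hypothetical event following the same pattern might look like:
// Hypothetical example of a completable event that opts into metadata-error notification.
public class ExampleMetadataSensitiveEvent extends CompletableApplicationEvent<Void>
        implements MetadataErrorNotifiableEvent {

    public ExampleMetadataSensitiveEvent(final long deadlineMs) {
        super(Type.TOPIC_METADATA, deadlineMs);  // event type chosen purely for illustration
    }

    @Override
    public void onMetadataError(Exception metadataError) {
        // Fail the pending future so the waiting application thread sees the metadata error.
        future().completeExceptionally(metadataError);
    }
}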

View File

@ -16,28 +16,12 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import java.util.concurrent.CompletableFuture;
public class PollEvent extends ApplicationEvent {
public class SharePollEvent extends ApplicationEvent {
private final long pollTimeMs;
/**
* A future that represents the completion of reconciliation and auto-commit
* processing.
* This future is completed when all commit request generation points have
* been passed, including:
* <ul>
* <li>auto-commit on rebalance</li>
* <li>auto-commit on the interval</li>
* </ul>
* Once completed, it signals that it's safe for the consumer to proceed with
* fetching new records.
*/
private final CompletableFuture<Void> reconcileAndAutoCommit = new CompletableFuture<>();
public PollEvent(final long pollTimeMs) {
super(Type.POLL);
public SharePollEvent(final long pollTimeMs) {
super(Type.SHARE_POLL);
this.pollTimeMs = pollTimeMs;
}
@ -45,14 +29,6 @@ public class PollEvent extends ApplicationEvent {
return pollTimeMs;
}
public CompletableFuture<Void> reconcileAndAutoCommit() {
return reconcileAndAutoCommit;
}
public void markReconcileAndAutoCommitComplete() {
reconcileAndAutoCommit.complete(null);
}
@Override
public String toStringBase() {
return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.TestUtils;
import java.time.Duration;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
/**
* This class provides utilities for tests to wait for a call to {@link Consumer#poll(Duration)} to produce a
* result (error, records, specific condition, etc.). This is mostly due to the subtle difference in behavior
* of the non-blocking {@link AsyncKafkaConsumer}. A single pass of {@link AsyncKafkaConsumer#poll(Duration)}
* may not be sufficient to provide an immediate result.
*/
public class ConsumerPollTestUtils {
/**
* Wait up to {@link TestUtils#DEFAULT_MAX_WAIT_MS} to return records from the given {@link Consumer}.
*/
public static <T> ConsumerRecords<T, T> waitForRecords(Consumer<?, ?> consumer) {
Timer timer = Time.SYSTEM.timer(DEFAULT_MAX_WAIT_MS);
while (timer.notExpired()) {
@SuppressWarnings("unchecked")
ConsumerRecords<T, T> records = (ConsumerRecords<T, T>) consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty())
return records;
timer.update();
}
throw new TimeoutException("no records to return");
}
/**
* Wait up to {@link TestUtils#DEFAULT_MAX_WAIT_MS} for the {@link Consumer} to produce the side effect
* that causes {@link Supplier condition} to evaluate to {@code true}.
*/
public static void waitForCondition(Consumer<?, ?> consumer,
Supplier<Boolean> testCondition,
String conditionDetails) {
try {
TestUtils.waitForCondition(
() -> {
consumer.poll(Duration.ZERO);
return testCondition.get();
},
conditionDetails
);
} catch (InterruptedException e) {
throw new InterruptException(e);
}
}
/**
* Wait up to {@link TestUtils#DEFAULT_MAX_WAIT_MS} for the {@link Consumer} to throw an exception that,
* when tested against the {@link Function condition}, will evaluate to {@code true}.
*/
public static void waitForException(Consumer<?, ?> consumer,
Function<Throwable, Boolean> testCondition,
String conditionDetails) {
try {
TestUtils.waitForCondition(
() -> {
try {
consumer.poll(Duration.ZERO);
return false;
} catch (Throwable t) {
return testCondition.apply(t);
}
},
conditionDetails
);
} catch (InterruptedException e) {
throw new InterruptException(e);
}
}
}
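Elsewhere in this commit these helpers replace single poll() calls in tests; a short illustrative example of how a test might use them (topic name and assertions are made up for the example):
// Illustrative test usage of the new helpers.
ConsumerRecords<byte[], byte[]> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertFalse(records.isEmpty());

ConsumerPollTestUtils.waitForCondition(
    consumer,
    () -> consumer.assignment().equals(Set.of(new TopicPartition("example-topic", 0))),
    "consumer was never assigned the expected partition"
);

ConsumerPollTestUtils.waitForException(
    consumer,
    AuthenticationException.class::isInstance,
    "consumer did not surface the authentication error"
);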

View File

@ -935,7 +935,6 @@ public class KafkaConsumerTest {
@ParameterizedTest
@EnumSource(GroupProtocol.class)
@SuppressWarnings("unchecked")
public void verifyNoCoordinatorLookupForManualAssignmentWithSeek(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@ -951,7 +950,7 @@ public class KafkaConsumerTest {
client.prepareResponse(listOffsetsResponse(Map.of(tp0, 50L)));
client.prepareResponse(fetchResponse(tp0, 50L, 5));
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size());
@ -1045,8 +1044,7 @@ public class KafkaConsumerTest {
}, fetchResponse(tp0, 50L, 5));
@SuppressWarnings("unchecked")
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(5, records.count());
assertEquals(Set.of(tp0), records.partitions());
assertEquals(1, records.nextOffsets().size());
@ -1065,7 +1063,7 @@ public class KafkaConsumerTest {
@ParameterizedTest
@EnumSource(value = GroupProtocol.class)
public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) throws InterruptedException {
public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {
SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@ -1081,15 +1079,14 @@ public class KafkaConsumerTest {
true, groupId, groupInstanceId, false);
consumer.assign(List.of(tp0));
if (groupProtocol == GroupProtocol.CONSUMER) {
// New consumer poll(ZERO) needs to wait for the offset fetch event added by a call to poll, to be processed
// by the background thread, so it can realize there are no committed offsets and then
// throw the NoOffsetForPartitionException
assertPollEventuallyThrows(consumer, NoOffsetForPartitionException.class,
"Consumer was not able to update fetch positions on continuous calls with 0 timeout");
} else {
assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));
}
// Consumer.poll(0) needs to wait for the offset fetch event added by a call to poll, to be processed
// by the background thread, so it can realize there are no committed offsets and then
// throw the NoOffsetForPartitionException.
ConsumerPollTestUtils.waitForException(
consumer,
NoOffsetForPartitionException.class::isInstance,
"Consumer was not able to update fetch positions on continuous calls with 0 timeout"
);
}
@ParameterizedTest
@ -1731,7 +1728,6 @@ public class KafkaConsumerTest {
@ParameterizedTest
@EnumSource(GroupProtocol.class)
@SuppressWarnings("unchecked")
public void testManualAssignmentChangeWithAutoCommitEnabled(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@ -1766,7 +1762,7 @@ public class KafkaConsumerTest {
client.prepareResponse(listOffsetsResponse(Map.of(tp0, 10L)));
client.prepareResponse(fetchResponse(tp0, 10L, 1));
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(100));
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0));
@ -1825,8 +1821,7 @@ public class KafkaConsumerTest {
client.prepareResponse(listOffsetsResponse(Map.of(tp0, 10L)));
client.prepareResponse(fetchResponse(tp0, 10L, 1));
@SuppressWarnings("unchecked")
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size());
@ -2121,7 +2116,7 @@ public class KafkaConsumerTest {
time.sleep(heartbeatIntervalMs);
Thread.sleep(heartbeatIntervalMs);
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ZERO);
final ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertFalse(records.isEmpty());
assertFalse(records.nextOffsets().isEmpty());
}
@ -2271,19 +2266,18 @@ public class KafkaConsumerTest {
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws InterruptedException {
public void testPollAuthenticationFailure(GroupProtocol groupProtocol) {
final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
consumer.subscribe(Set.of(topic));
if (groupProtocol == GroupProtocol.CONSUMER) {
// New consumer poll(ZERO) needs to wait for the event added by a call to poll, to be processed
// by the background thread, so it can realize there is authentication fail and then
// throw the AuthenticationException
assertPollEventuallyThrows(consumer, AuthenticationException.class,
"this consumer was not able to discover metadata errors during continuous polling.");
} else {
assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
}
// Consumer.poll(0) needs to wait for the event added by a call to poll to be processed
// by the background thread, so it can detect the authentication failure and then
// throw the AuthenticationException.
ConsumerPollTestUtils.waitForException(
consumer,
AuthenticationException.class::isInstance,
"this consumer was not able to discover metadata errors during continuous polling."
);
}
// TODO: this test triggers a bug with the CONSUMER group protocol implementation.
@ -2655,7 +2649,6 @@ public class KafkaConsumerTest {
@ParameterizedTest
@EnumSource(GroupProtocol.class)
@SuppressWarnings("unchecked")
public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedException {
final ConsumerMetadata metadata = createMetadata(subscription);
final MockClient client = new MockClient(time, metadata);
@ -2670,9 +2663,11 @@ public class KafkaConsumerTest {
consumer.assign(Set.of(tp0));
// poll once to update with the current metadata
consumer.poll(Duration.ofMillis(0));
TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
"No metadata requests sent");
ConsumerPollTestUtils.waitForCondition(
consumer,
() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
"No metadata requests sent"
);
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));
// no error for no current position
@ -2685,13 +2680,12 @@ public class KafkaConsumerTest {
}
// poll once again, which should send the list-offset request
consumer.seek(tp0, 50L);
consumer.poll(Duration.ofMillis(0));
// requests: list-offset, fetch
TestUtils.waitForCondition(() -> {
boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
return hasListOffsetRequest && hasFetchRequest;
}, "No list-offset & fetch request sent");
// requests: list-offset
ConsumerPollTestUtils.waitForCondition(
consumer,
() -> requestGenerated(client, ApiKeys.LIST_OFFSETS),
"No list-offset sent"
);
// no error for no end offset (so unknown lag)
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
@ -2700,7 +2694,12 @@ public class KafkaConsumerTest {
// and hence next call would return correct lag result
ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS);
client.respondToRequest(listOffsetRequest, listOffsetsResponse(Map.of(tp0, 90L)));
consumer.poll(Duration.ofMillis(0));
// requests: fetch
ConsumerPollTestUtils.waitForCondition(
consumer,
() -> requestGenerated(client, ApiKeys.FETCH),
"No fetch sent"
);
// For AsyncKafkaConsumer, subscription state is updated in background, so the result will eventually be updated.
TestUtils.waitForCondition(() -> {
@ -2715,7 +2714,7 @@ public class KafkaConsumerTest {
final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
client.respondToRequest(fetchRequest, fetchResponse(Map.of(tp0, fetchInfo)));
final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
final ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size());
@ -3194,27 +3193,14 @@ public class KafkaConsumerTest {
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(Set.of(invalidTopicName), getConsumerRebalanceListener(consumer));
if (groupProtocol == GroupProtocol.CONSUMER) {
// New consumer poll(ZERO) needs to wait for the event added by a call to poll, to be processed
// by the background thread, so it can realize there is invalid topics and then
// throw the InvalidTopicException
assertPollEventuallyThrows(consumer, InvalidTopicException.class,
"Consumer was not able to update fetch positions on continuous calls with 0 timeout");
} else {
assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ZERO));
}
}
private static <T extends Throwable> void assertPollEventuallyThrows(KafkaConsumer<?, ?> consumer,
Class<T> expectedException, String errMsg) throws InterruptedException {
TestUtils.waitForCondition(() -> {
try {
consumer.poll(Duration.ZERO);
return false;
} catch (Throwable exception) {
return expectedException.isInstance(exception);
}
}, errMsg);
// Consumer.poll(0) needs to wait for the event added by a call to poll to be processed
// by the background thread, so it can detect the invalid topic and then
// throw the InvalidTopicException.
ConsumerPollTestUtils.waitForException(
consumer,
InvalidTopicException.class::isInstance,
"Consumer was not able to update fetch positions on continuous calls with 0 timeout"
);
}
@ParameterizedTest
@ -3654,7 +3640,11 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) {
service.execute(() -> consumer.poll(Duration.ofSeconds(5)));
try {
TimeUnit.SECONDS.sleep(1);
assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ZERO));
ConsumerPollTestUtils.waitForException(
consumer,
ConcurrentModificationException.class::isInstance,
"Consumer did not throw a ConcurrentModificationException within the timeout"
);
client.wakeup();
consumer.wakeup();
} finally {

View File

@ -19,8 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
@ -61,7 +61,7 @@ public class ApplicationEventHandlerTest {
asyncConsumerMetrics
)) {
// add event
applicationEventHandler.add(new PollEvent(time.milliseconds()));
applicationEventHandler.add(new AsyncPollEvent(time.milliseconds() + 10, time.milliseconds()));
verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
}
}
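Judging from the call sites in this commit, the two AsyncPollEvent constructor arguments appear to be a deadline timestamp followed by the time at which poll() was invoked; a hedged construction sketch (variable names are illustrative, `time` is the MockTime used in these tests):

long pollTimeMs = time.milliseconds();               // when the application thread entered poll()
long deadlineMs = pollTimeMs + 10;                   // latest time by which the event should complete
AsyncPollEvent event = new AsyncPollEvent(deadlineMs, pollTimeMs);
event.setEnqueuedMs(time.milliseconds());            // optional: stamp the enqueue time, as the network-thread test does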

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPollTestUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
@ -43,13 +45,11 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableApplication
import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
@ -112,6 +112,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@ -154,6 +155,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@ -424,7 +426,7 @@ public class AsyncKafkaConsumerTest {
consumer.wakeup();
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@ -444,7 +446,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@ -468,7 +470,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
// since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
// the previously ignored wake-up should not be ignored in the next call
@ -505,9 +507,12 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList(topicName), listener);
markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
assertTrue(callbackExecuted.get());
completeAsyncPollEventSuccessfully();
ConsumerPollTestUtils.waitForCondition(
consumer,
callbackExecuted::get,
"Consumer.poll() did not execute callback within timeout"
);
}
@Test
@ -527,7 +532,7 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ZERO);
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@ -673,8 +678,12 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
markReconcileAndAutoCommitCompleteForPollEvent();
assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback);
completeAsyncPollEventSuccessfully();
ConsumerPollTestUtils.waitForCondition(
consumer,
() -> callback.invoked == 1 && callback.exception == null,
"Consumer.poll() did not execute the callback once (without error) in allottec timeout"
);
}
@Test
@ -1455,7 +1464,7 @@ public class AsyncKafkaConsumerTest {
int expectedRevokedCount,
int expectedAssignedCount,
int expectedLostCount,
Optional<RuntimeException> expectedException
Optional<RuntimeException> expectedExceptionOpt
) {
consumer = newConsumer();
CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener(
@ -1473,13 +1482,18 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(e);
}
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
// This will trigger the background event queue to process our background event message.
// If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll.
if (expectedException.isPresent()) {
Exception exception = assertThrows(expectedException.get().getClass(), () -> consumer.poll(Duration.ZERO));
assertEquals(expectedException.get().getMessage(), exception.getMessage());
assertEquals(expectedException.get().getCause(), exception.getCause());
if (expectedExceptionOpt.isPresent()) {
Exception expectedException = expectedExceptionOpt.get();
ConsumerPollTestUtils.waitForException(
consumer,
t -> Objects.equals(t.getClass(), expectedException.getClass()) &&
Objects.equals(t.getMessage(), expectedException.getMessage()) &&
Objects.equals(t.getCause(), expectedException.getCause()),
"Consumer.poll() did not throw the expected exception " + expectedException
);
} else {
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@ -1543,10 +1557,12 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(errorEvent);
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
assertEquals(expectedException.getMessage(), exception.getMessage());
completeAsyncPollEventSuccessfully();
ConsumerPollTestUtils.waitForException(
consumer,
t -> t.getMessage().equals(expectedException.getMessage()),
"Consumer.poll() did not fail with expected exception " + expectedException + " within timeout"
);
}
@Test
@ -1562,10 +1578,12 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue.add(errorEvent2);
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
assertEquals(expectedException1.getMessage(), exception.getMessage());
completeAsyncPollEventSuccessfully();
ConsumerPollTestUtils.waitForException(
consumer,
t -> t.getMessage().equals(expectedException1.getMessage()),
"Consumer.poll() did not fail with expected exception " + expectedException1 + " within timeout"
);
assertTrue(backgroundEventQueue.isEmpty());
}
@ -1645,10 +1663,9 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList("topic1"));
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class));
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
verify(applicationEventHandler, atLeastOnce()).add(any(AsyncPollEvent.class));
}
private Properties requiredConsumerConfigAndGroupId(final String groupId) {
@ -1664,11 +1681,8 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(new TopicPartition("t1", 1)));
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, atLeast(1))
.addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class));
}
@Test
@ -1701,7 +1715,7 @@ public class AsyncKafkaConsumerTest {
).when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
// And then poll for up to 10000ms, which should return 2 records without timing out
ConsumerRecords<?, ?> returnedRecords = consumer.poll(Duration.ofMillis(10000));
assertEquals(2, returnedRecords.count());
@ -1805,7 +1819,7 @@ public class AsyncKafkaConsumerTest {
// interrupt the thread and call poll
try {
Thread.currentThread().interrupt();
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
} finally {
// clear interrupted state again since this thread may be reused by JUnit
@ -1837,8 +1851,13 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList("topic"));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
completeAsyncPollEventSuccessfully();
ConsumerPollTestUtils.waitForCondition(
consumer,
() -> backgroundEventReaper.size() == 0,
"Consumer.poll() did not reap background events within timeout"
);
verify(backgroundEventReaper).reap(time.milliseconds());
}
@ -1900,7 +1919,7 @@ public class AsyncKafkaConsumerTest {
completeUnsubscribeApplicationEventSuccessfully();
consumer.assign(singleton(new TopicPartition("topic1", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
@ -1908,7 +1927,6 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(Pattern.compile("t*"));
consumer.poll(Duration.ZERO);
verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class));
}
@Test
@ -2275,11 +2293,11 @@ public class AsyncKafkaConsumerTest {
}
}
private void markReconcileAndAutoCommitCompleteForPollEvent() {
private void completeAsyncPollEventSuccessfully() {
doAnswer(invocation -> {
PollEvent event = invocation.getArgument(0);
event.markReconcileAndAutoCommitComplete();
AsyncPollEvent event = invocation.getArgument(0);
event.completeSuccessfully();
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
}
}
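The stub above recurs throughout this class in combination with the poll-based wait helpers; a condensed sketch of the pattern (the callbackExecuted flag is illustrative and stands in for whatever side effect a given test observes):

AtomicBoolean callbackExecuted = new AtomicBoolean(false);   // illustrative: set by a rebalance listener callback
completeAsyncPollEventSuccessfully();                        // every AsyncPollEvent handed to the mocked handler completes immediately
ConsumerPollTestUtils.waitForCondition(
    consumer,
    callbackExecuted::get,
    "Consumer.poll() did not execute the callback within the timeout"
);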

View File

@ -18,8 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Metrics;
@ -258,7 +258,7 @@ public class ConsumerNetworkThreadTest {
)) {
consumerNetworkThread.initializeResources();
PollEvent event = new PollEvent(0);
AsyncPollEvent event = new AsyncPollEvent(10, 0);
event.setEnqueuedMs(time.milliseconds());
applicationEventQueue.add(event);
asyncConsumerMetrics.recordApplicationEventQueueSize(1);

View File

@ -24,11 +24,11 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@ -680,7 +680,7 @@ public class ShareConsumerImplTest {
consumer.subscribe(subscriptionTopic);
consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class));
verify(applicationEventHandler).add(any(SharePollEvent.class));
verify(applicationEventHandler).addAndGet(any(ShareSubscriptionChangeEvent.class));
completeShareAcknowledgeOnCloseApplicationEventSuccessfully();

View File

@ -171,7 +171,7 @@ public class ApplicationEventProcessorTest {
private static Stream<Arguments> applicationEvents() {
return Stream.of(
Arguments.of(new PollEvent(100)),
Arguments.of(new AsyncPollEvent(calculateDeadlineMs(12345, 100), 100)),
Arguments.of(new CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
Arguments.of(new CheckAndUpdatePositionsEvent(500)),
Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
@ -265,12 +265,12 @@ public class ApplicationEventProcessorTest {
@Test
public void testPollEvent() {
PollEvent event = new PollEvent(12345);
AsyncPollEvent event = new AsyncPollEvent(12346, 12345);
setupProcessor(true);
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(new CompletableFuture<>());
processor.process(event);
assertTrue(event.reconcileAndAutoCommit().isDone());
verify(commitRequestManager).updateTimerAndMaybeCommit(12345);
verify(membershipManager).onConsumerPoll();
verify(heartbeatRequestManager).resetPollTimer(12345);

View File

@ -1358,7 +1358,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
val consumer = createConsumer()
consumer.assign(java.util.List.of(tp))
assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer))
assertThrows(classOf[AuthorizationException], () => consumeRecords(consumer))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)

View File

@ -35,7 +35,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRecords, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfigs, TopicConfig}
@ -568,8 +568,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
try {
consumer.assign(util.Set.of(tp))
consumer.seekToBeginning(util.Set.of(tp))
val records = consumer.poll(time.Duration.ofSeconds(3))
assertEquals(expectedNumber, records.count())
def verifyRecordCount(records: ConsumerRecords[Array[Byte], Array[Byte]]): Boolean = {
expectedNumber == records.count()
}
TestUtils.pollRecordsUntilTrue(
consumer,
verifyRecordCount,
s"Consumer.poll() did not return the expected number of records ($expectedNumber) within the timeout",
pollTimeoutMs = 3000
)
} finally consumer.close()
}
@ -4585,7 +4592,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
prepareRecords(testTopicName)
// Producer sends messages
for (i <- 1 to 20) {
val numRecords = 20
for (i <- 1 to numRecords) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
@ -4594,18 +4603,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}, "Fail to produce record to topic")
}
val consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val streams = createStreamsGroup(
configOverrides = consumerConfig,
inputTopic = testTopicName,
streamsGroupId = streamsGroupId,
)
try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
}, "Consumer not assigned to partitions")
var counter = 0
def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
counter += records.count()
counter >= numRecords
}
TestUtils.pollRecordsUntilTrue(
streams,
verifyRecordCount,
s"Consumer not assigned to partitions"
)
streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
TestUtils.waitUntilTrue(() => {
@ -4645,7 +4664,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)
// Producer sends messages
for (i <- 1 to 20) {
val numRecords = 20
for (i <- 1 to numRecords) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
@ -4654,18 +4675,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}, "Fail to produce record to topic")
}
val consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val streams = createStreamsGroup(
configOverrides = consumerConfig,
inputTopic = testTopicName,
streamsGroupId = streamsGroupId,
)
try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
}, "Consumer not assigned to partitions")
var counter = 0
def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
counter += records.count()
counter >= numRecords
}
TestUtils.pollRecordsUntilTrue(
streams,
verifyRecordCount,
s"Consumer not assigned to partitions"
)
streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
// List streams group offsets
@ -4722,7 +4753,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
prepareRecords(testTopicName)
// Producer sends messages
for (i <- 1 to 20) {
val numRecords = 20
for (i <- 1 to numRecords) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
@ -4731,18 +4764,28 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}, "Fail to produce record to topic")
}
val consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val streams = createStreamsGroup(
configOverrides = consumerConfig,
inputTopic = testTopicName,
streamsGroupId = streamsGroupId,
)
try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
}, "Consumer not assigned to partitions")
var counter = 0
def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
counter += records.count()
counter >= numRecords
}
TestUtils.pollRecordsUntilTrue(
streams,
verifyRecordCount,
s"Consumer not assigned to partitions"
)
streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
// List streams group offsets

View File

@ -145,13 +145,27 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
}
private def verifyConsumerWithAuthenticationFailure(consumer: Consumer[Array[Byte], Array[Byte]]): Unit = {
verifyAuthenticationException(consumer.poll(Duration.ofMillis(1000)))
val startMs = System.currentTimeMillis
TestUtils.pollUntilException(
consumer,
_ => true,
s"Consumer.poll() did not throw an exception within the timeout",
pollTimeoutMs = 1000
)
val elapsedMs = System.currentTimeMillis - startMs
assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
verifyAuthenticationException(consumer.partitionsFor(topic))
createClientCredential()
val producer = createProducer()
verifyWithRetry(sendOneRecord(producer))()
verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
TestUtils.waitUntilTrue(() => {
try {
consumer.poll(Duration.ofMillis(1000)).count() == 1
} catch {
case _: Throwable => false
}
}, msg = s"Consumer.poll() did not read the expected number of records within the timeout")
}
@Test

View File

@ -19,7 +19,6 @@
package kafka.server
import java.net.InetSocketAddress
import java.time.Duration
import java.util.Properties
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import javax.security.auth.login.LoginContext
@ -185,7 +184,12 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
consumer.assign(java.util.List.of(tp))
val startMs = System.currentTimeMillis()
assertThrows(classOf[SaslAuthenticationException], () => consumer.poll(Duration.ofMillis(50)))
TestUtils.pollUntilException(
consumer,
t => t.isInstanceOf[SaslAuthenticationException],
"Consumer.poll() did not trigger a SaslAuthenticationException within timeout",
pollTimeoutMs = 50
)
val endMs = System.currentTimeMillis()
require(endMs - startMs < failedAuthenticationDelayMs, "Failed authentication must not be delayed on the client")
consumer.close()

View File

@ -690,6 +690,21 @@ object TestUtils extends Logging {
}, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
}
def pollUntilException(consumer: Consumer[_, _],
action: Throwable => Boolean,
msg: => String,
waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS,
pollTimeoutMs: Long = 100): Unit = {
waitUntilTrue(() => {
try {
consumer.poll(Duration.ofMillis(pollTimeoutMs))
false
} catch {
case t: Throwable => action(t)
}
}, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
}
def pollRecordsUntilTrue[K, V](consumer: Consumer[K, V],
action: ConsumerRecords[K, V] => Boolean,
msg: => String,