Refactoring

Kirk True 2025-10-03 16:35:46 -07:00
parent 461ffdd9b0
commit 9fb9ee9e6e
9 changed files with 113 additions and 72 deletions

View File

@@ -20,9 +20,9 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.MetadataErrorNotifiable;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -40,6 +40,7 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
@@ -191,13 +192,17 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
for (ApplicationEvent event : events) {
asyncConsumerMetrics.recordApplicationEventQueueTime(time.milliseconds() - event.enqueuedMs());
try {
if (event instanceof CompletableEvent) {
if (event instanceof CompletableEvent)
applicationEventReaper.add((CompletableEvent<?>) event);
// Check if there are any metadata errors and fail the CompletableEvent if an error is present.
// This call is meant to handle "immediately completed events" which may not enter the awaiting state,
// so metadata errors need to be checked and handled right away.
maybeFailOnMetadataError(List.of((CompletableEvent<?>) event));
// Check if there are any metadata errors and fail the CompletableEvent if an error is present.
// This call is meant to handle "immediately completed events" which may not enter the awaiting state,
// so metadata errors need to be checked and handled right away.
if (event instanceof MetadataErrorNotifiable) {
if (maybeFailOnMetadataError((MetadataErrorNotifiable) event))
continue;
}
applicationEventProcessor.process(event);
} catch (Throwable t) {
log.warn("Error processing event {}", t.getMessage(), t);
@@ -369,17 +374,37 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
* If there is a metadata error, notify all of the given events that implement {@link MetadataErrorNotifiable}.
*/
private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new ArrayList<>();
List<MetadataErrorNotifiable> notifiables = new ArrayList<>();
for (CompletableEvent<?> ce : events) {
if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata())
subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce);
if (ce instanceof MetadataErrorNotifiable) {
notifiables.add((MetadataErrorNotifiable) ce);
}
}
if (subscriptionMetadataEvent.isEmpty())
if (notifiables.isEmpty())
return;
networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError))
);
Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
if (metadataErrorOpt.isEmpty())
return;
Exception metadataError = metadataErrorOpt.get();
notifiables.forEach(n -> n.metadataError(metadataError));
}
/**
* If there is a metadata error, notify the given event and return {@code true}; otherwise return {@code false}.
*/
private boolean maybeFailOnMetadataError(MetadataErrorNotifiable notifiable) {
Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
if (metadataErrorOpt.isEmpty())
return false;
Exception metadataError = metadataErrorOpt.get();
notifiable.metadataError(metadataError);
return true;
}
}

View File

@@ -21,14 +21,14 @@ import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;
public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> implements MetadataErrorNotifiable {
protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) {
super(type, deadlineMs);
}
@Override
public boolean requireSubscriptionMetadata() {
return true;
public void metadataError(Exception metadataException) {
future().completeExceptionally(metadataException);
}
}

View File

@@ -732,7 +732,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
AsyncPollEventProcessorContext context = asyncPollContext.orElseThrow(IllegalArgumentException::new);
ApplicationEvent.Type nextEventType = event.startingEventType();
if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType))
if (context.maybeCompleteWithCallbackRequired(event, nextEventType))
return;
if (nextEventType == ApplicationEvent.Type.ASYNC_POLL) {
@@ -759,7 +759,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS;
if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType))
if (context.maybeCompleteWithCallbackRequired(event, nextEventType))
return;
}

View File

@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.common.KafkaException;
import java.time.Duration;
@@ -58,7 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
* application thread. The background thread is able to detect when it needs to complete processing so that the
* application thread can execute the awaiting callbacks.
*/
public class AsyncPollEvent extends ApplicationEvent {
public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNotifiable {
public enum State {
@@ -201,6 +202,11 @@ public class AsyncPollEvent extends ApplicationEvent {
result.compareAndSet(Result.STARTED, r);
}
@Override
public void metadataError(Exception metadataException) {
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataException));
}
@Override
protected String toStringBase() {
return super.toStringBase() +

View File

@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
@@ -30,7 +31,7 @@ import java.time.Duration;
* The event completes with a boolean indicating if all assigned partitions have valid fetch positions
* (based on {@link SubscriptionState#hasAllFetchPositions()}).
*/
public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> {
public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> implements MetadataErrorNotifiable {
public CheckAndUpdatePositionsEvent(long deadlineMs) {
super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
@@ -39,11 +40,11 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> {
/**
* Completes this event exceptionally when a metadata error occurs, since this event
* requires subscription metadata. This is used to ensure that metadata errors are
* handled correctly during the {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#poll(Duration) poll}
* or {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#position(TopicPartition) position} process.
* handled correctly during the {@link Consumer#poll(Duration) poll}
* or {@link Consumer#position(TopicPartition) position} process.
*/
@Override
public boolean requireSubscriptionMetadata() {
return true;
public void metadataError(Exception metadataException) {
future().completeExceptionally(metadataException);
}
}

View File

@@ -52,8 +52,4 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent implements CompletableEvent<T> {
protected String toStringBase() {
return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs;
}
public boolean requireSubscriptionMetadata() {
return false;
}
}

View File

@@ -32,7 +32,7 @@ import java.util.Map;
* {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than
* or equal to the target timestamp)
*/
public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> {
public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> implements MetadataErrorNotifiable {
private final Map<TopicPartition, Long> timestampsToSearch;
private final boolean requireTimestamps;
@@ -65,8 +65,8 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> {
}
@Override
public boolean requireSubscriptionMetadata() {
return true;
public void metadataError(Exception metadataException) {
future().completeExceptionally(metadataException);
}
@Override

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
/**
* This interface is used for events that need to be notified when
* {@link NetworkClientDelegate#getAndClearMetadataError()} returns an error.
*/
public interface MetadataErrorNotifiable {
void metadataError(Exception metadataException);
}

View File

@@ -1063,7 +1063,7 @@ public class KafkaConsumerTest {
@ParameterizedTest
@EnumSource(value = GroupProtocol.class)
public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) throws InterruptedException {
public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {
SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
@@ -1079,15 +1079,14 @@
true, groupId, groupInstanceId, false);
consumer.assign(List.of(tp0));
if (groupProtocol == GroupProtocol.CONSUMER) {
// New consumer poll(ZERO) needs to wait for the offset fetch event added by a call to poll, to be processed
// by the background thread, so it can realize there are no committed offsets and then
// throw the NoOffsetForPartitionException
assertPollEventuallyThrows(consumer, NoOffsetForPartitionException.class,
"Consumer was not able to update fetch positions on continuous calls with 0 timeout");
} else {
assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));
}
// Consumer.poll(0) needs to wait for the offset fetch event (added by a call to poll) to be processed
// by the background thread so that it can realize there are no committed offsets and then
// throw the NoOffsetForPartitionException.
ConsumerPollTestUtils.waitForException(
consumer,
NoOffsetForPartitionException.class::isInstance,
"Consumer was not able to update fetch positions on continuous calls with 0 timeout"
);
}
@ParameterizedTest
@@ -2267,19 +2266,18 @@
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws InterruptedException {
public void testPollAuthenticationFailure(GroupProtocol groupProtocol) {
final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
consumer.subscribe(Set.of(topic));
if (groupProtocol == GroupProtocol.CONSUMER) {
// New consumer poll(ZERO) needs to wait for the event added by a call to poll, to be processed
// by the background thread, so it can realize there is authentication fail and then
// throw the AuthenticationException
assertPollEventuallyThrows(consumer, AuthenticationException.class,
"this consumer was not able to discover metadata errors during continuous polling.");
} else {
assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
}
// Consumer.poll(0) needs to wait for the event (added by a call to poll) to be processed
// by the background thread so that it can detect the authentication failure and then
// throw the AuthenticationException.
ConsumerPollTestUtils.waitForException(
consumer,
AuthenticationException.class::isInstance,
"this consumer was not able to discover metadata errors during continuous polling."
);
}
// TODO: this test triggers a bug with the CONSUMER group protocol implementation.
@@ -3194,27 +3192,14 @@
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(Set.of(invalidTopicName), getConsumerRebalanceListener(consumer));
if (groupProtocol == GroupProtocol.CONSUMER) {
// New consumer poll(ZERO) needs to wait for the event added by a call to poll, to be processed
// by the background thread, so it can realize there is invalid topics and then
// throw the InvalidTopicException
assertPollEventuallyThrows(consumer, InvalidTopicException.class,
"Consumer was not able to update fetch positions on continuous calls with 0 timeout");
} else {
assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ZERO));
}
}
private static <T extends Throwable> void assertPollEventuallyThrows(KafkaConsumer<?, ?> consumer,
Class<T> expectedException, String errMsg) throws InterruptedException {
TestUtils.waitForCondition(() -> {
try {
consumer.poll(Duration.ZERO);
return false;
} catch (Throwable exception) {
return expectedException.isInstance(exception);
}
}, errMsg);
// Consumer.poll(0) needs to wait for the event (added by a call to poll) to be processed
// by the background thread so that it can realize the topic name is invalid and then
// throw the InvalidTopicException.
ConsumerPollTestUtils.waitForException(
consumer,
InvalidTopicException.class::isInstance,
"Consumer was not able to update fetch positions on continuous calls with 0 timeout"
);
}
@ParameterizedTest