Compare commits

...

5 Commits

Author SHA1 Message Date
Kirk True f660b15e52 Refactor regex subscription evaluation 2025-10-07 14:21:30 -07:00
Kirk True 5215030a4c Update AsyncKafkaConsumer.java 2025-10-07 08:57:08 -07:00
Kirk True 68f5bc19d3 Whitespace change 2025-10-07 08:54:06 -07:00
Kirk True 2e5a982c1d Revert change to AsyncKafkaConsumer 2025-10-07 08:52:34 -07:00
Kirk True 4985c7de17 Updates for MetadataErrorNotifiableEvent 2025-10-07 08:52:21 -07:00
9 changed files with 69 additions and 69 deletions

View File

@ -43,7 +43,6 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableList;
@ -488,16 +487,6 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
* active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
*/
public void onConsumerPoll() {
if (subscriptions.hasPatternSubscription()) {
final Set<String> topicsToSubscribe = metadata.fetch().topics().stream()
.filter(subscriptions::matchesSubscribedPattern)
.collect(Collectors.toSet());
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
metadata.requestUpdateForNewTopics();
}
subscriptionUpdated.compareAndSet(false, true);
}
if (subscriptionUpdated.compareAndSet(true, false) && state == MemberState.UNSUBSCRIBED) {
transitionToJoining();
}

View File

@ -1827,7 +1827,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
return fetch;
}
log.trace("Polling for fetches with timeout {} and {}", pollTimeout, timer.remainingMs());
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
if (pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
log.trace("Polling for fetches with timeout {}", pollTimeout);
Timer pollTimer = time.timer(pollTimeout);
wakeupTrigger.setFetchAction(fetchBuffer);

View File

@ -199,7 +199,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
// This call is meant to handle "immediately completed events" which may not enter the awaiting state,
// so metadata errors need to be checked and handled right away.
if (event instanceof MetadataErrorNotifiableEvent) {
if (maybeFailOnMetadataError((MetadataErrorNotifiableEvent) event))
if (maybeFailOnMetadataError(List.of(event)))
continue;
}
applicationEventProcessor.process(event);
@ -372,38 +372,27 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
/**
* If there is a metadata error, complete all uncompleted events that require subscription metadata.
*/
private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
List<MetadataErrorNotifiableEvent> notifiables = new ArrayList<>();
private boolean maybeFailOnMetadataError(List<?> events) {
List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
for (CompletableEvent<?> ce : events) {
if (ce instanceof MetadataErrorNotifiableEvent) {
notifiables.add((MetadataErrorNotifiableEvent) ce);
for (Object obj : events) {
if (obj instanceof MetadataErrorNotifiableEvent) {
filteredEvents.add((MetadataErrorNotifiableEvent) obj);
}
}
if (notifiables.isEmpty())
return;
Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
if (metadataErrorOpt.isEmpty())
return;
Exception metadataError = metadataErrorOpt.get();
notifiables.forEach(n -> n.completeExceptionallyWithMetadataError(metadataError));
}
/**
* If there is a metadata error, complete this event exceptionally.
*/
private boolean maybeFailOnMetadataError(MetadataErrorNotifiableEvent notifiable) {
Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
if (metadataErrorOpt.isEmpty())
// Don't get-and-clear the metadata error if there are no events that will be notified.
if (filteredEvents.isEmpty())
return false;
Exception metadataError = metadataErrorOpt.get();
notifiable.completeExceptionallyWithMetadataError(metadataError);
Optional<Exception> andClearMetadataError = networkClientDelegate.getAndClearMetadataError();
if (andClearMetadataError.isPresent()) {
Exception metadataError = andClearMetadataError.get();
filteredEvents.forEach(e -> e.onMetadataError(metadataError));
return true;
} else {
return false;
}
}
}

View File

@ -28,7 +28,7 @@ public abstract class AbstractTopicMetadataEvent extends CompletableApplicationE
}
@Override
public void completeExceptionallyWithMetadataError(Exception metadataException) {
future().completeExceptionally(metadataException);
public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataError);
}
}

View File

@ -20,12 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
@ -228,7 +231,9 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
ShareMembershipManager membershipManager = hrm.membershipManager();
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
membershipManager.onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
}
@ -337,7 +342,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
if (subscriptions.subscribe(event.topics(), event.listener())) {
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
}
requestManagers.streamsMembershipManager.get().onSubscriptionUpdated();
requestManagers.streamsGroupHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
event.future().complete(null);
} catch (Exception e) {
event.future().completeExceptionally(e);
@ -360,7 +365,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
try {
subscriptions.subscribe(event.pattern(), event.listener());
metadata.requestUpdateForNewTopics();
updatePatternSubscription(metadata.fetch());
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
ConsumerMembershipManager membershipManager = hrm.membershipManager();
updatePatternSubscription(membershipManager::onSubscriptionUpdated, metadata.fetch());
});
event.future().complete(null);
} catch (Exception e) {
event.future().completeExceptionally(e);
@ -394,13 +402,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
* This will make the consumer send the updated subscription on the next poll.
*/
private void process(final UpdatePatternSubscriptionEvent event) {
if (!subscriptions.hasPatternSubscription()) {
return;
}
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
updatePatternSubscription(metadata.fetch());
}
requestManagers.consumerMembershipManager.ifPresent(mm -> maybeUpdatePatternSubscription(mm::onSubscriptionUpdated));
event.future().complete(null);
}
@ -724,11 +726,15 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
ConsumerMembershipManager membershipManager = hrm.membershipManager();
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
membershipManager.onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
StreamsMembershipManager membershipManager = hrm.membershipManager();
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
membershipManager.onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
}
@ -807,6 +813,16 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
};
}
private void maybeUpdatePatternSubscription(OnSubscriptionUpdatedCallback callback) {
if (!subscriptions.hasPatternSubscription()) {
return;
}
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
updatePatternSubscription(callback, metadata.fetch());
}
}
/**
* This function evaluates the regex that the consumer subscribed to
* against the list of topic names from metadata, and updates
@ -814,26 +830,26 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
*
* @param cluster Cluster from which we get the topics
*/
private void updatePatternSubscription(Cluster cluster) {
if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
log.warn("Group membership manager not present when processing a subscribe event");
return;
}
private void updatePatternSubscription(OnSubscriptionUpdatedCallback callback, Cluster cluster) {
final Set<String> topicsToSubscribe = cluster.topics().stream()
.filter(subscriptions::matchesSubscribedPattern)
.collect(Collectors.toSet());
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
}
// Join the group if not already part of it, or just send the updated subscription
// to the broker on the next poll. Note that this is done even if no topics matched
// the regex, to ensure the member joins the group if needed (with empty subscription).
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
callback.onSubscriptionUpdated();
}
// Visible for testing
int metadataVersionSnapshot() {
return metadataVersionSnapshot;
}
private interface OnSubscriptionUpdatedCallback {
void onSubscriptionUpdated();
}
}

View File

@ -85,8 +85,8 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
}
@Override
public void completeExceptionallyWithMetadataError(Exception metadataException) {
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataException));
public void onMetadataError(Exception metadataError) {
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError));
}
@Override

View File

@ -44,7 +44,7 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Bo
* or {@link Consumer#position(TopicPartition) position} process.
*/
@Override
public void completeExceptionallyWithMetadataError(Exception metadataException) {
future().completeExceptionally(metadataException);
public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataError);
}
}

View File

@ -65,8 +65,8 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti
}
@Override
public void completeExceptionallyWithMetadataError(Exception metadataException) {
future().completeExceptionally(metadataException);
public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataError);
}
@Override

View File

@ -50,7 +50,7 @@ public interface MetadataErrorNotifiableEvent {
* </li>
* </ul>
*
* @param metadataException Error that originally came from {@link Metadata#maybeThrowAnyException()}
* @param metadataError Error that originally came from {@link Metadata#maybeThrowAnyException()}
*/
void completeExceptionallyWithMetadataError(Exception metadataException);
void onMetadataError(Exception metadataError);
}