Compare commits

...

5 Commits

Author SHA1 Message Date
Kirk True f660b15e52 Refactor regex subscription evaluation 2025-10-07 14:21:30 -07:00
Kirk True 5215030a4c Update AsyncKafkaConsumer.java 2025-10-07 08:57:08 -07:00
Kirk True 68f5bc19d3 Whitespace change 2025-10-07 08:54:06 -07:00
Kirk True 2e5a982c1d Revert change to AsyncKafkaConsumer 2025-10-07 08:52:34 -07:00
Kirk True 4985c7de17 Updates for MetadataErrorNotifiableEvent 2025-10-07 08:52:21 -07:00
9 changed files with 69 additions and 69 deletions

View File

@ -43,7 +43,6 @@ import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableList;
@ -488,16 +487,6 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
* active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}" * active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
*/ */
public void onConsumerPoll() { public void onConsumerPoll() {
if (subscriptions.hasPatternSubscription()) {
final Set<String> topicsToSubscribe = metadata.fetch().topics().stream()
.filter(subscriptions::matchesSubscribedPattern)
.collect(Collectors.toSet());
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
metadata.requestUpdateForNewTopics();
}
subscriptionUpdated.compareAndSet(false, true);
}
if (subscriptionUpdated.compareAndSet(true, false) && state == MemberState.UNSUBSCRIBED) { if (subscriptionUpdated.compareAndSet(true, false) && state == MemberState.UNSUBSCRIBED) {
transitionToJoining(); transitionToJoining();
} }

View File

@ -1827,7 +1827,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
return fetch; return fetch;
} }
log.trace("Polling for fetches with timeout {} and {}", pollTimeout, timer.remainingMs()); // We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
if (pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
log.trace("Polling for fetches with timeout {}", pollTimeout);
Timer pollTimer = time.timer(pollTimeout); Timer pollTimer = time.timer(pollTimeout);
wakeupTrigger.setFetchAction(fetchBuffer); wakeupTrigger.setFetchAction(fetchBuffer);

View File

@ -199,7 +199,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
// This call is meant to handle "immediately completed events" which may not enter the awaiting state, // This call is meant to handle "immediately completed events" which may not enter the awaiting state,
// so metadata errors need to be checked and handled right away. // so metadata errors need to be checked and handled right away.
if (event instanceof MetadataErrorNotifiableEvent) { if (event instanceof MetadataErrorNotifiableEvent) {
if (maybeFailOnMetadataError((MetadataErrorNotifiableEvent) event)) if (maybeFailOnMetadataError(List.of(event)))
continue; continue;
} }
applicationEventProcessor.process(event); applicationEventProcessor.process(event);
@ -372,38 +372,27 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
/** /**
* If there is a metadata error, complete all uncompleted events that require subscription metadata. * If there is a metadata error, complete all uncompleted events that require subscription metadata.
*/ */
private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) { private boolean maybeFailOnMetadataError(List<?> events) {
List<MetadataErrorNotifiableEvent> notifiables = new ArrayList<>(); List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
for (CompletableEvent<?> ce : events) { for (Object obj : events) {
if (ce instanceof MetadataErrorNotifiableEvent) { if (obj instanceof MetadataErrorNotifiableEvent) {
notifiables.add((MetadataErrorNotifiableEvent) ce); filteredEvents.add((MetadataErrorNotifiableEvent) obj);
} }
} }
if (notifiables.isEmpty()) // Don't get-and-clear the metadata error if there are no events that will be notified.
return; if (filteredEvents.isEmpty())
Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
if (metadataErrorOpt.isEmpty())
return;
Exception metadataError = metadataErrorOpt.get();
notifiables.forEach(n -> n.completeExceptionallyWithMetadataError(metadataError));
}
/**
* If there is a metadata error, complete this event exceptionally.
*/
private boolean maybeFailOnMetadataError(MetadataErrorNotifiableEvent notifiable) {
Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
if (metadataErrorOpt.isEmpty())
return false; return false;
Exception metadataError = metadataErrorOpt.get(); Optional<Exception> andClearMetadataError = networkClientDelegate.getAndClearMetadataError();
notifiable.completeExceptionallyWithMetadataError(metadataError);
return true; if (andClearMetadataError.isPresent()) {
Exception metadataError = andClearMetadataError.get();
filteredEvents.forEach(e -> e.onMetadataError(metadataError));
return true;
} else {
return false;
}
} }
} }

View File

@ -28,7 +28,7 @@ public abstract class AbstractTopicMetadataEvent extends CompletableApplicationE
} }
@Override @Override
public void completeExceptionallyWithMetadataError(Exception metadataException) { public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataException); future().completeExceptionally(metadataError);
} }
} }

View File

@ -20,12 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.Acknowledgements; import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.CachedSupplier; import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager; import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.IsolationLevel;
@ -228,7 +231,9 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager -> requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true)); consumerMembershipManager.maybeReconcile(true));
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll(); ShareMembershipManager membershipManager = hrm.membershipManager();
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
membershipManager.onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs()); hrm.resetPollTimer(event.pollTimeMs());
}); });
} }
@ -337,7 +342,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
if (subscriptions.subscribe(event.topics(), event.listener())) { if (subscriptions.subscribe(event.topics(), event.listener())) {
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics(); this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
} }
requestManagers.streamsMembershipManager.get().onSubscriptionUpdated(); requestManagers.streamsGroupHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
event.future().complete(null); event.future().complete(null);
} catch (Exception e) { } catch (Exception e) {
event.future().completeExceptionally(e); event.future().completeExceptionally(e);
@ -360,7 +365,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
try { try {
subscriptions.subscribe(event.pattern(), event.listener()); subscriptions.subscribe(event.pattern(), event.listener());
metadata.requestUpdateForNewTopics(); metadata.requestUpdateForNewTopics();
updatePatternSubscription(metadata.fetch()); requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
ConsumerMembershipManager membershipManager = hrm.membershipManager();
updatePatternSubscription(membershipManager::onSubscriptionUpdated, metadata.fetch());
});
event.future().complete(null); event.future().complete(null);
} catch (Exception e) { } catch (Exception e) {
event.future().completeExceptionally(e); event.future().completeExceptionally(e);
@ -394,13 +402,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
* This will make the consumer send the updated subscription on the next poll. * This will make the consumer send the updated subscription on the next poll.
*/ */
private void process(final UpdatePatternSubscriptionEvent event) { private void process(final UpdatePatternSubscriptionEvent event) {
if (!subscriptions.hasPatternSubscription()) { requestManagers.consumerMembershipManager.ifPresent(mm -> maybeUpdatePatternSubscription(mm::onSubscriptionUpdated));
return;
}
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
updatePatternSubscription(metadata.fetch());
}
event.future().complete(null); event.future().complete(null);
} }
@ -724,11 +726,15 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs()); commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll(); ConsumerMembershipManager membershipManager = hrm.membershipManager();
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
membershipManager.onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs()); hrm.resetPollTimer(event.pollTimeMs());
}); });
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll(); StreamsMembershipManager membershipManager = hrm.membershipManager();
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
membershipManager.onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs()); hrm.resetPollTimer(event.pollTimeMs());
}); });
} }
@ -807,6 +813,16 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}; };
} }
private void maybeUpdatePatternSubscription(OnSubscriptionUpdatedCallback callback) {
if (!subscriptions.hasPatternSubscription()) {
return;
}
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
updatePatternSubscription(callback, metadata.fetch());
}
}
/** /**
* This function evaluates the regex that the consumer subscribed to * This function evaluates the regex that the consumer subscribed to
* against the list of topic names from metadata, and updates * against the list of topic names from metadata, and updates
@ -814,26 +830,26 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
* *
* @param cluster Cluster from which we get the topics * @param cluster Cluster from which we get the topics
*/ */
private void updatePatternSubscription(Cluster cluster) { private void updatePatternSubscription(OnSubscriptionUpdatedCallback callback, Cluster cluster) {
if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
log.warn("Group membership manager not present when processing a subscribe event");
return;
}
final Set<String> topicsToSubscribe = cluster.topics().stream() final Set<String> topicsToSubscribe = cluster.topics().stream()
.filter(subscriptions::matchesSubscribedPattern) .filter(subscriptions::matchesSubscribedPattern)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) { if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics(); this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
} }
// Join the group if not already part of it, or just send the updated subscription // Join the group if not already part of it, or just send the updated subscription
// to the broker on the next poll. Note that this is done even if no topics matched // to the broker on the next poll. Note that this is done even if no topics matched
// the regex, to ensure the member joins the group if needed (with empty subscription). // the regex, to ensure the member joins the group if needed (with empty subscription).
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated(); callback.onSubscriptionUpdated();
} }
// Visible for testing // Visible for testing
int metadataVersionSnapshot() { int metadataVersionSnapshot() {
return metadataVersionSnapshot; return metadataVersionSnapshot;
} }
private interface OnSubscriptionUpdatedCallback {
void onSubscriptionUpdated();
}
} }

View File

@ -85,8 +85,8 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
} }
@Override @Override
public void completeExceptionallyWithMetadataError(Exception metadataException) { public void onMetadataError(Exception metadataError) {
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataException)); completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError));
} }
@Override @Override

View File

@ -44,7 +44,7 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Bo
* or {@link Consumer#position(TopicPartition) position} process. * or {@link Consumer#position(TopicPartition) position} process.
*/ */
@Override @Override
public void completeExceptionallyWithMetadataError(Exception metadataException) { public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataException); future().completeExceptionally(metadataError);
} }
} }

View File

@ -65,8 +65,8 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti
} }
@Override @Override
public void completeExceptionallyWithMetadataError(Exception metadataException) { public void onMetadataError(Exception metadataError) {
future().completeExceptionally(metadataException); future().completeExceptionally(metadataError);
} }
@Override @Override

View File

@ -50,7 +50,7 @@ public interface MetadataErrorNotifiableEvent {
* </li> * </li>
* </ul> * </ul>
* *
* @param metadataException Error that originally came from {@link Metadata#maybeThrowAnyException()} * @param metadataError Error that originally came from {@link Metadata#maybeThrowAnyException()}
*/ */
void completeExceptionallyWithMetadataError(Exception metadataException); void onMetadataError(Exception metadataError);
} }