mirror of https://github.com/apache/kafka.git
Compare commits
5 Commits
e7b53865fd ... f660b15e52

Author | SHA1 | Date |
---|---|---|
 | f660b15e52 | |
 | 5215030a4c | |
 | 68f5bc19d3 | |
 | 2e5a982c1d | |
 | 4985c7de17 | |
@@ -43,7 +43,6 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;

 import static java.util.Collections.unmodifiableList;

@@ -488,16 +487,6 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
      * active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
      */
     public void onConsumerPoll() {
-        if (subscriptions.hasPatternSubscription()) {
-            final Set<String> topicsToSubscribe = metadata.fetch().topics().stream()
-                    .filter(subscriptions::matchesSubscribedPattern)
-                    .collect(Collectors.toSet());
-            if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
-                metadata.requestUpdateForNewTopics();
-            }
-            subscriptionUpdated.compareAndSet(false, true);
-        }
-
         if (subscriptionUpdated.compareAndSet(true, false) && state == MemberState.UNSUBSCRIBED) {
             transitionToJoining();
         }

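Note: the hunk above drops the pattern-subscription evaluation from AbstractMembershipManager.onConsumerPoll(); the equivalent logic reappears in ApplicationEventProcessor further down in this compare. As a rough, self-contained illustration of what that evaluation does (hypothetical names, not the Kafka classes), the matching step boils down to filtering the topic names known from metadata through the subscribed regex:

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PatternSubscriptionSketch {
    public static void main(String[] args) {
        // Hypothetical stand-ins: the real code uses SubscriptionState and ConsumerMetadata.
        Pattern subscribedPattern = Pattern.compile("orders-.*");
        Set<String> topicsFromMetadata = Set.of("orders-eu", "orders-us", "payments");

        // Same shape as the removed block: keep only topics matching the subscribed regex.
        Set<String> topicsToSubscribe = topicsFromMetadata.stream()
                .filter(t -> subscribedPattern.matcher(t).matches())
                .collect(Collectors.toSet());

        // Prints the matching topics; the real code would then call
        // subscribeFromPattern(topicsToSubscribe) and request a metadata update for new topics.
        System.out.println(topicsToSubscribe);
    }
}
```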
@@ -1827,7 +1827,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             return fetch;
         }

-        log.trace("Polling for fetches with timeout {} and {}", pollTimeout, timer.remainingMs());
+        // We do not want to be stuck blocking in poll if we are missing some positions
+        // since the offset lookup may be backing off after a failure
+        if (pollTimeout > retryBackoffMs) {
+            pollTimeout = retryBackoffMs;
+        }
+
+        log.trace("Polling for fetches with timeout {}", pollTimeout);

         Timer pollTimer = time.timer(pollTimeout);
         wakeupTrigger.setFetchAction(fetchBuffer);

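Note: the re-added block caps how long the fetch wait may block. If some positions are still unknown (for example because an offset lookup is backing off after a failure), the consumer only waits up to retryBackoffMs before looping again rather than sleeping for the full poll timeout. A minimal sketch of that clamp, with illustrative values rather than the consumer's actual fields:

```java
public class PollTimeoutClampSketch {
    public static void main(String[] args) {
        long pollTimeout = 5_000L;   // caller-supplied remaining time (hypothetical value)
        long retryBackoffMs = 100L;  // retry.backoff.ms (hypothetical value)

        // Same idea as the added lines: never block the full poll timeout while positions
        // may still be missing; wake up after at most one backoff interval.
        if (pollTimeout > retryBackoffMs) {
            pollTimeout = retryBackoffMs;
        }

        System.out.println("Effective fetch wait: " + pollTimeout + " ms"); // 100 ms
    }
}
```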
@@ -199,7 +199,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
             // This call is meant to handle "immediately completed events" which may not enter the awaiting state,
             // so metadata errors need to be checked and handled right away.
             if (event instanceof MetadataErrorNotifiableEvent) {
-                if (maybeFailOnMetadataError((MetadataErrorNotifiableEvent) event))
+                if (maybeFailOnMetadataError(List.of(event)))
                     continue;
             }
             applicationEventProcessor.process(event);

@@ -372,38 +372,27 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
     /**
      * If there is a metadata error, complete all uncompleted events that require subscription metadata.
      */
-    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
-        List<MetadataErrorNotifiableEvent> notifiables = new ArrayList<>();
-
-        for (CompletableEvent<?> ce : events) {
-            if (ce instanceof MetadataErrorNotifiableEvent) {
-                notifiables.add((MetadataErrorNotifiableEvent) ce);
-            }
-        }
-
-        if (notifiables.isEmpty())
-            return;
-
-        Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
-
-        if (metadataErrorOpt.isEmpty())
-            return;
-
-        Exception metadataError = metadataErrorOpt.get();
-        notifiables.forEach(n -> n.completeExceptionallyWithMetadataError(metadataError));
-    }
-
-    /**
-     * If there is a metadata error, complete this event exceptionally.
-     */
-    private boolean maybeFailOnMetadataError(MetadataErrorNotifiableEvent notifiable) {
-        Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
-
-        if (metadataErrorOpt.isEmpty())
-            return false;
-
-        Exception metadataError = metadataErrorOpt.get();
-        notifiable.completeExceptionallyWithMetadataError(metadataError);
-        return true;
-    }
+    private boolean maybeFailOnMetadataError(List<?> events) {
+        List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
+
+        for (Object obj : events) {
+            if (obj instanceof MetadataErrorNotifiableEvent) {
+                filteredEvents.add((MetadataErrorNotifiableEvent) obj);
+            }
+        }
+
+        // Don't get-and-clear the metadata error if there are no events that will be notified.
+        if (filteredEvents.isEmpty())
+            return false;
+
+        Optional<Exception> andClearMetadataError = networkClientDelegate.getAndClearMetadataError();
+
+        if (andClearMetadataError.isPresent()) {
+            Exception metadataError = andClearMetadataError.get();
+            filteredEvents.forEach(e -> e.onMetadataError(metadataError));
+            return true;
+        } else {
+            return false;
+        }
+    }
 }

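Note: with the two overloads collapsed into one method taking List<?>, the same code path serves both the per-event check in the processing loop (maybeFailOnMetadataError(List.of(event))) and the batch check over uncompleted events, and the pending error is only consumed when at least one event will observe it. A small self-contained sketch of that filtering pattern, using hypothetical event types rather than the Kafka classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class MetadataErrorFanOutSketch {
    // Hypothetical stand-in for MetadataErrorNotifiableEvent.
    interface Notifiable {
        void onMetadataError(Exception error);
    }

    static class LookupEvent implements Notifiable {
        @Override
        public void onMetadataError(Exception error) {
            System.out.println("failing lookup event with: " + error.getMessage());
        }
    }

    static class PlainEvent { } // not notifiable; silently skipped

    // Same shape as the consolidated method: accept any list, keep only notifiable events,
    // and fan the pending error out to them; report whether anything was failed.
    static boolean maybeFailOnMetadataError(List<?> events, Optional<Exception> pendingError) {
        List<Notifiable> filtered = new ArrayList<>();
        for (Object obj : events) {
            if (obj instanceof Notifiable) {
                filtered.add((Notifiable) obj);
            }
        }
        if (filtered.isEmpty() || pendingError.isEmpty()) {
            return false;
        }
        filtered.forEach(n -> n.onMetadataError(pendingError.get()));
        return true;
    }

    public static void main(String[] args) {
        Optional<Exception> error = Optional.of(new Exception("stale metadata"));
        // Works for a single event wrapped with List.of(...) and for a mixed batch.
        System.out.println(maybeFailOnMetadataError(List.of(new LookupEvent()), error)); // true
        System.out.println(maybeFailOnMetadataError(List.of(new PlainEvent()), error));  // false
    }
}
```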
@@ -28,7 +28,7 @@ public abstract class AbstractTopicMetadataEvent extends CompletableApplicationE
     }

     @Override
-    public void completeExceptionallyWithMetadataError(Exception metadataException) {
-        future().completeExceptionally(metadataException);
+    public void onMetadataError(Exception metadataError) {
+        future().completeExceptionally(metadataError);
     }
 }

@@ -20,12 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.Acknowledgements;
 import org.apache.kafka.clients.consumer.internals.CachedSupplier;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
+import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
 import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
 import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
+import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
+import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;

@@ -228,7 +231,9 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
         requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
             consumerMembershipManager.maybeReconcile(true));
         requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
-            hrm.membershipManager().onConsumerPoll();
+            ShareMembershipManager membershipManager = hrm.membershipManager();
+            maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
+            membershipManager.onConsumerPoll();
             hrm.resetPollTimer(event.pollTimeMs());
         });
     }

@@ -337,7 +342,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
             if (subscriptions.subscribe(event.topics(), event.listener())) {
                 this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
             }
-            requestManagers.streamsMembershipManager.get().onSubscriptionUpdated();
+            requestManagers.streamsGroupHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
             event.future().complete(null);
         } catch (Exception e) {
             event.future().completeExceptionally(e);

@@ -360,7 +365,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
         try {
             subscriptions.subscribe(event.pattern(), event.listener());
             metadata.requestUpdateForNewTopics();
-            updatePatternSubscription(metadata.fetch());
+            requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+                ConsumerMembershipManager membershipManager = hrm.membershipManager();
+                updatePatternSubscription(membershipManager::onSubscriptionUpdated, metadata.fetch());
+            });
             event.future().complete(null);
         } catch (Exception e) {
             event.future().completeExceptionally(e);

@@ -394,13 +402,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
      * This will make the consumer send the updated subscription on the next poll.
      */
     private void process(final UpdatePatternSubscriptionEvent event) {
-        if (!subscriptions.hasPatternSubscription()) {
-            return;
-        }
-        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
-            this.metadataVersionSnapshot = metadata.updateVersion();
-            updatePatternSubscription(metadata.fetch());
-        }
+        requestManagers.consumerMembershipManager.ifPresent(mm -> maybeUpdatePatternSubscription(mm::onSubscriptionUpdated));
         event.future().complete(null);
     }

@@ -724,11 +726,15 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
         commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());

         requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
-            hrm.membershipManager().onConsumerPoll();
+            ConsumerMembershipManager membershipManager = hrm.membershipManager();
+            maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
+            membershipManager.onConsumerPoll();
             hrm.resetPollTimer(event.pollTimeMs());
         });
         requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
-            hrm.membershipManager().onConsumerPoll();
+            StreamsMembershipManager membershipManager = hrm.membershipManager();
+            maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
+            membershipManager.onConsumerPoll();
             hrm.resetPollTimer(event.pollTimeMs());
         });
     }

@@ -807,6 +813,16 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
         };
     }

+    private void maybeUpdatePatternSubscription(OnSubscriptionUpdatedCallback callback) {
+        if (!subscriptions.hasPatternSubscription()) {
+            return;
+        }
+        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+            this.metadataVersionSnapshot = metadata.updateVersion();
+            updatePatternSubscription(callback, metadata.fetch());
+        }
+    }
+
     /**
      * This function evaluates the regex that the consumer subscribed to
      * against the list of topic names from metadata, and updates

@@ -814,26 +830,26 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
      *
      * @param cluster Cluster from which we get the topics
      */
-    private void updatePatternSubscription(Cluster cluster) {
-        if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
-            log.warn("Group membership manager not present when processing a subscribe event");
-            return;
-        }
+    private void updatePatternSubscription(OnSubscriptionUpdatedCallback callback, Cluster cluster) {
         final Set<String> topicsToSubscribe = cluster.topics().stream()
                 .filter(subscriptions::matchesSubscribedPattern)
                 .collect(Collectors.toSet());
         if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
             this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
         }
         // Join the group if not already part of it, or just send the updated subscription
         // to the broker on the next poll. Note that this is done even if no topics matched
         // the regex, to ensure the member joins the group if needed (with empty subscription).
-        requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+        callback.onSubscriptionUpdated();
     }

     // Visible for testing
     int metadataVersionSnapshot() {
         return metadataVersionSnapshot;
     }

+    private interface OnSubscriptionUpdatedCallback {
+
+        void onSubscriptionUpdated();
+    }
 }

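Note: the updatePatternSubscription helpers now take the membership manager's notification as a callback instead of reaching for requestManagers.consumerHeartbeatRequestManager directly, which lets the consumer, share, and streams paths reuse the same pattern-matching code. A minimal self-contained sketch of that indirection (hypothetical manager types, not the Kafka ones):

```java
public class SubscriptionCallbackSketch {
    // Mirrors the shape of the private OnSubscriptionUpdatedCallback added above.
    interface OnSubscriptionUpdatedCallback {
        void onSubscriptionUpdated();
    }

    // Hypothetical managers standing in for ConsumerMembershipManager / StreamsMembershipManager.
    static class ConsumerManager {
        void onSubscriptionUpdated() {
            System.out.println("consumer member will send updated subscription");
        }
    }

    static class StreamsManager {
        void onSubscriptionUpdated() {
            System.out.println("streams member will send updated subscription");
        }
    }

    // The shared helper no longer cares which concrete manager is behind the callback.
    static void updatePatternSubscription(OnSubscriptionUpdatedCallback callback) {
        // ... evaluate the subscribed regex against metadata topics here ...
        callback.onSubscriptionUpdated();
    }

    public static void main(String[] args) {
        updatePatternSubscription(new ConsumerManager()::onSubscriptionUpdated);
        updatePatternSubscription(new StreamsManager()::onSubscriptionUpdated);
    }
}
```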
@@ -85,8 +85,8 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
     }

     @Override
-    public void completeExceptionallyWithMetadataError(Exception metadataException) {
-        completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataException));
+    public void onMetadataError(Exception metadataError) {
+        completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError));
     }

     @Override

@@ -44,7 +44,7 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Bo
      * or {@link Consumer#position(TopicPartition) position} process.
      */
     @Override
-    public void completeExceptionallyWithMetadataError(Exception metadataException) {
-        future().completeExceptionally(metadataException);
+    public void onMetadataError(Exception metadataError) {
+        future().completeExceptionally(metadataError);
     }
 }

@@ -65,8 +65,8 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti
     }

     @Override
-    public void completeExceptionallyWithMetadataError(Exception metadataException) {
-        future().completeExceptionally(metadataException);
+    public void onMetadataError(Exception metadataError) {
+        future().completeExceptionally(metadataError);
     }

     @Override

@@ -50,7 +50,7 @@ public interface MetadataErrorNotifiableEvent {
      * </li>
      * </ul>
      *
-     * @param metadataException Error that originally came from {@link Metadata#maybeThrowAnyException()}
+     * @param metadataError Error that originally came from {@link Metadata#maybeThrowAnyException()}
      */
-    void completeExceptionallyWithMetadataError(Exception metadataException);
+    void onMetadataError(Exception metadataError);
 }

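Note: taken together, the rename standardizes the contract: when the network thread finds a pending metadata error, it calls onMetadataError on any event that needs subscription metadata, and the event fails its future so the application thread blocked on it sees the exception. A rough end-to-end sketch with a hypothetical event type (the real events extend CompletableApplicationEvent):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class MetadataErrorPropagationSketch {
    // Hypothetical stand-in for MetadataErrorNotifiableEvent.
    interface Notifiable {
        void onMetadataError(Exception error);
    }

    // Hypothetical event: fails its future, like AbstractTopicMetadataEvent does above.
    static class TopicLookupEvent implements Notifiable {
        final CompletableFuture<Void> future = new CompletableFuture<>();

        @Override
        public void onMetadataError(Exception error) {
            future.completeExceptionally(error);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TopicLookupEvent event = new TopicLookupEvent();

        // Background thread plays the ConsumerNetworkThread role: it spots a metadata error
        // and notifies the event instead of letting the caller block until timeout.
        new Thread(() -> event.onMetadataError(new Exception("invalid topic in metadata"))).start();

        try {
            event.future.get(); // application thread blocked on the event's result
        } catch (ExecutionException e) {
            System.out.println("caller sees: " + e.getCause().getMessage());
        }
    }
}
```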