mirror of https://github.com/apache/kafka.git
Compare commits
5 Commits
e7b53865fd
...
f660b15e52
| Author | SHA1 | Date |
|---|---|---|
|
|
f660b15e52 | |
|
|
5215030a4c | |
|
|
68f5bc19d3 | |
|
|
2e5a982c1d | |
|
|
4985c7de17 |
|
|
@ -43,7 +43,6 @@ import java.util.SortedSet;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static java.util.Collections.unmodifiableList;
|
import static java.util.Collections.unmodifiableList;
|
||||||
|
|
||||||
|
|
@ -488,16 +487,6 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
|
||||||
* active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
|
* active call to {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
|
||||||
*/
|
*/
|
||||||
public void onConsumerPoll() {
|
public void onConsumerPoll() {
|
||||||
if (subscriptions.hasPatternSubscription()) {
|
|
||||||
final Set<String> topicsToSubscribe = metadata.fetch().topics().stream()
|
|
||||||
.filter(subscriptions::matchesSubscribedPattern)
|
|
||||||
.collect(Collectors.toSet());
|
|
||||||
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
|
|
||||||
metadata.requestUpdateForNewTopics();
|
|
||||||
}
|
|
||||||
subscriptionUpdated.compareAndSet(false, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (subscriptionUpdated.compareAndSet(true, false) && state == MemberState.UNSUBSCRIBED) {
|
if (subscriptionUpdated.compareAndSet(true, false) && state == MemberState.UNSUBSCRIBED) {
|
||||||
transitionToJoining();
|
transitionToJoining();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1827,7 +1827,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
return fetch;
|
return fetch;
|
||||||
}
|
}
|
||||||
|
|
||||||
log.trace("Polling for fetches with timeout {} and {}", pollTimeout, timer.remainingMs());
|
// We do not want to be stuck blocking in poll if we are missing some positions
|
||||||
|
// since the offset lookup may be backing off after a failure
|
||||||
|
if (pollTimeout > retryBackoffMs) {
|
||||||
|
pollTimeout = retryBackoffMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.trace("Polling for fetches with timeout {}", pollTimeout);
|
||||||
|
|
||||||
Timer pollTimer = time.timer(pollTimeout);
|
Timer pollTimer = time.timer(pollTimeout);
|
||||||
wakeupTrigger.setFetchAction(fetchBuffer);
|
wakeupTrigger.setFetchAction(fetchBuffer);
|
||||||
|
|
|
||||||
|
|
@ -199,7 +199,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
|
||||||
// This call is meant to handle "immediately completed events" which may not enter the awaiting state,
|
// This call is meant to handle "immediately completed events" which may not enter the awaiting state,
|
||||||
// so metadata errors need to be checked and handled right away.
|
// so metadata errors need to be checked and handled right away.
|
||||||
if (event instanceof MetadataErrorNotifiableEvent) {
|
if (event instanceof MetadataErrorNotifiableEvent) {
|
||||||
if (maybeFailOnMetadataError((MetadataErrorNotifiableEvent) event))
|
if (maybeFailOnMetadataError(List.of(event)))
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
applicationEventProcessor.process(event);
|
applicationEventProcessor.process(event);
|
||||||
|
|
@ -372,38 +372,27 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
|
||||||
/**
|
/**
|
||||||
* If there is a metadata error, complete all uncompleted events that require subscription metadata.
|
* If there is a metadata error, complete all uncompleted events that require subscription metadata.
|
||||||
*/
|
*/
|
||||||
private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
|
private boolean maybeFailOnMetadataError(List<?> events) {
|
||||||
List<MetadataErrorNotifiableEvent> notifiables = new ArrayList<>();
|
List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
|
||||||
|
|
||||||
for (CompletableEvent<?> ce : events) {
|
for (Object obj : events) {
|
||||||
if (ce instanceof MetadataErrorNotifiableEvent) {
|
if (obj instanceof MetadataErrorNotifiableEvent) {
|
||||||
notifiables.add((MetadataErrorNotifiableEvent) ce);
|
filteredEvents.add((MetadataErrorNotifiableEvent) obj);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (notifiables.isEmpty())
|
// Don't get-and-clear the metadata error if there are no events that will be notified.
|
||||||
return;
|
if (filteredEvents.isEmpty())
|
||||||
|
|
||||||
Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
|
|
||||||
|
|
||||||
if (metadataErrorOpt.isEmpty())
|
|
||||||
return;
|
|
||||||
|
|
||||||
Exception metadataError = metadataErrorOpt.get();
|
|
||||||
notifiables.forEach(n -> n.completeExceptionallyWithMetadataError(metadataError));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If there is a metadata error, complete this event exceptionally.
|
|
||||||
*/
|
|
||||||
private boolean maybeFailOnMetadataError(MetadataErrorNotifiableEvent notifiable) {
|
|
||||||
Optional<Exception> metadataErrorOpt = networkClientDelegate.getAndClearMetadataError();
|
|
||||||
|
|
||||||
if (metadataErrorOpt.isEmpty())
|
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
Exception metadataError = metadataErrorOpt.get();
|
Optional<Exception> andClearMetadataError = networkClientDelegate.getAndClearMetadataError();
|
||||||
notifiable.completeExceptionallyWithMetadataError(metadataError);
|
|
||||||
return true;
|
if (andClearMetadataError.isPresent()) {
|
||||||
|
Exception metadataError = andClearMetadataError.get();
|
||||||
|
filteredEvents.forEach(e -> e.onMetadataError(metadataError));
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ public abstract class AbstractTopicMetadataEvent extends CompletableApplicationE
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completeExceptionallyWithMetadataError(Exception metadataException) {
|
public void onMetadataError(Exception metadataError) {
|
||||||
future().completeExceptionally(metadataException);
|
future().completeExceptionally(metadataError);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,12 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||||
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
|
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
|
||||||
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
|
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
|
||||||
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
|
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
|
||||||
|
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
|
||||||
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
|
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
|
||||||
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
|
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
|
||||||
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
|
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
|
||||||
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
|
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
|
||||||
import org.apache.kafka.clients.consumer.internals.RequestManagers;
|
import org.apache.kafka.clients.consumer.internals.RequestManagers;
|
||||||
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
|
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
|
||||||
|
import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
|
||||||
|
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
|
||||||
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
|
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
|
||||||
import org.apache.kafka.common.Cluster;
|
import org.apache.kafka.common.Cluster;
|
||||||
import org.apache.kafka.common.IsolationLevel;
|
import org.apache.kafka.common.IsolationLevel;
|
||||||
|
|
@ -228,7 +231,9 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
||||||
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
|
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
|
||||||
consumerMembershipManager.maybeReconcile(true));
|
consumerMembershipManager.maybeReconcile(true));
|
||||||
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
|
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
|
||||||
hrm.membershipManager().onConsumerPoll();
|
ShareMembershipManager membershipManager = hrm.membershipManager();
|
||||||
|
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
|
||||||
|
membershipManager.onConsumerPoll();
|
||||||
hrm.resetPollTimer(event.pollTimeMs());
|
hrm.resetPollTimer(event.pollTimeMs());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -337,7 +342,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
||||||
if (subscriptions.subscribe(event.topics(), event.listener())) {
|
if (subscriptions.subscribe(event.topics(), event.listener())) {
|
||||||
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
|
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
|
||||||
}
|
}
|
||||||
requestManagers.streamsMembershipManager.get().onSubscriptionUpdated();
|
requestManagers.streamsGroupHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
|
||||||
event.future().complete(null);
|
event.future().complete(null);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
event.future().completeExceptionally(e);
|
event.future().completeExceptionally(e);
|
||||||
|
|
@ -360,7 +365,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
||||||
try {
|
try {
|
||||||
subscriptions.subscribe(event.pattern(), event.listener());
|
subscriptions.subscribe(event.pattern(), event.listener());
|
||||||
metadata.requestUpdateForNewTopics();
|
metadata.requestUpdateForNewTopics();
|
||||||
updatePatternSubscription(metadata.fetch());
|
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
|
||||||
|
ConsumerMembershipManager membershipManager = hrm.membershipManager();
|
||||||
|
updatePatternSubscription(membershipManager::onSubscriptionUpdated, metadata.fetch());
|
||||||
|
});
|
||||||
event.future().complete(null);
|
event.future().complete(null);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
event.future().completeExceptionally(e);
|
event.future().completeExceptionally(e);
|
||||||
|
|
@ -394,13 +402,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
||||||
* This will make the consumer send the updated subscription on the next poll.
|
* This will make the consumer send the updated subscription on the next poll.
|
||||||
*/
|
*/
|
||||||
private void process(final UpdatePatternSubscriptionEvent event) {
|
private void process(final UpdatePatternSubscriptionEvent event) {
|
||||||
if (!subscriptions.hasPatternSubscription()) {
|
requestManagers.consumerMembershipManager.ifPresent(mm -> maybeUpdatePatternSubscription(mm::onSubscriptionUpdated));
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
|
|
||||||
this.metadataVersionSnapshot = metadata.updateVersion();
|
|
||||||
updatePatternSubscription(metadata.fetch());
|
|
||||||
}
|
|
||||||
event.future().complete(null);
|
event.future().complete(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -724,11 +726,15 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
||||||
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
|
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
|
||||||
|
|
||||||
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
|
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
|
||||||
hrm.membershipManager().onConsumerPoll();
|
ConsumerMembershipManager membershipManager = hrm.membershipManager();
|
||||||
|
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
|
||||||
|
membershipManager.onConsumerPoll();
|
||||||
hrm.resetPollTimer(event.pollTimeMs());
|
hrm.resetPollTimer(event.pollTimeMs());
|
||||||
});
|
});
|
||||||
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
|
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
|
||||||
hrm.membershipManager().onConsumerPoll();
|
StreamsMembershipManager membershipManager = hrm.membershipManager();
|
||||||
|
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
|
||||||
|
membershipManager.onConsumerPoll();
|
||||||
hrm.resetPollTimer(event.pollTimeMs());
|
hrm.resetPollTimer(event.pollTimeMs());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -807,6 +813,16 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void maybeUpdatePatternSubscription(OnSubscriptionUpdatedCallback callback) {
|
||||||
|
if (!subscriptions.hasPatternSubscription()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
|
||||||
|
this.metadataVersionSnapshot = metadata.updateVersion();
|
||||||
|
updatePatternSubscription(callback, metadata.fetch());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This function evaluates the regex that the consumer subscribed to
|
* This function evaluates the regex that the consumer subscribed to
|
||||||
* against the list of topic names from metadata, and updates
|
* against the list of topic names from metadata, and updates
|
||||||
|
|
@ -814,26 +830,26 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
||||||
*
|
*
|
||||||
* @param cluster Cluster from which we get the topics
|
* @param cluster Cluster from which we get the topics
|
||||||
*/
|
*/
|
||||||
private void updatePatternSubscription(Cluster cluster) {
|
private void updatePatternSubscription(OnSubscriptionUpdatedCallback callback, Cluster cluster) {
|
||||||
if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
|
|
||||||
log.warn("Group membership manager not present when processing a subscribe event");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
final Set<String> topicsToSubscribe = cluster.topics().stream()
|
final Set<String> topicsToSubscribe = cluster.topics().stream()
|
||||||
.filter(subscriptions::matchesSubscribedPattern)
|
.filter(subscriptions::matchesSubscribedPattern)
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
|
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
|
||||||
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
|
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
|
||||||
|
|
||||||
}
|
}
|
||||||
// Join the group if not already part of it, or just send the updated subscription
|
// Join the group if not already part of it, or just send the updated subscription
|
||||||
// to the broker on the next poll. Note that this is done even if no topics matched
|
// to the broker on the next poll. Note that this is done even if no topics matched
|
||||||
// the regex, to ensure the member joins the group if needed (with empty subscription).
|
// the regex, to ensure the member joins the group if needed (with empty subscription).
|
||||||
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
|
callback.onSubscriptionUpdated();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Visible for testing
|
// Visible for testing
|
||||||
int metadataVersionSnapshot() {
|
int metadataVersionSnapshot() {
|
||||||
return metadataVersionSnapshot;
|
return metadataVersionSnapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private interface OnSubscriptionUpdatedCallback {
|
||||||
|
|
||||||
|
void onSubscriptionUpdated();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -85,8 +85,8 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completeExceptionallyWithMetadataError(Exception metadataException) {
|
public void onMetadataError(Exception metadataError) {
|
||||||
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataException));
|
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Bo
|
||||||
* or {@link Consumer#position(TopicPartition) position} process.
|
* or {@link Consumer#position(TopicPartition) position} process.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void completeExceptionallyWithMetadataError(Exception metadataException) {
|
public void onMetadataError(Exception metadataError) {
|
||||||
future().completeExceptionally(metadataException);
|
future().completeExceptionally(metadataError);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -65,8 +65,8 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completeExceptionallyWithMetadataError(Exception metadataException) {
|
public void onMetadataError(Exception metadataError) {
|
||||||
future().completeExceptionally(metadataException);
|
future().completeExceptionally(metadataError);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ public interface MetadataErrorNotifiableEvent {
|
||||||
* </li>
|
* </li>
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* @param metadataException Error that originally came from {@link Metadata#maybeThrowAnyException()}
|
* @param metadataError Error that originally came from {@link Metadata#maybeThrowAnyException()}
|
||||||
*/
|
*/
|
||||||
void completeExceptionallyWithMetadataError(Exception metadataException);
|
void onMetadataError(Exception metadataError);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue