KAFKA-19356: Prevent new consumer fetch assigned partitions not in explicit subscription (#19983)
CI / build (push) Waiting to run Details

Fix to ensure assigned partitions whose topics are not in the consumer
explicit subscription are considered not fetchable (so that no records
are returned on poll for them)

This scenario could happen in the new async consumer (using the Consumer
rebalance protocol) when the subscription changes, because the consumer
will keep its assignment until the coordinator sends a new one (broker
drives assignments).

This does not happen in the classic consumer because the assignment
logic lives on the client-side, so the consumer pro-actively updates
assignment as needed.

This PR validates assignment vs subscription on fetch for explicit
subscription  only. Regular expressions, shared subscription  remain
unchanged (regex case still under discussion, will be handled separately
if needed)

Reviewers: Andrew Schofield <aschofield@confluent.io>, TengYao Chi
 <frankvicky@apache.org>, Kirk True <ktrue@confluent.io>, Jhen-Yung Hsu
 <jhenyunghsu@gmail.com>
This commit is contained in:
Lianet Magrans 2025-06-18 18:31:46 -04:00 committed by GitHub
parent b8fc96272e
commit ce996b34e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 100 additions and 4 deletions

View File

@ -155,7 +155,10 @@ public class FetchCollector<K, V> {
log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
} else if (!subscriptions.isFetchable(tp)) {
// this can happen when a partition is paused before fetched records are returned to the consumer's
// poll call or if the offset is being reset
// poll call or if the offset is being reset.
// It can also happen under the Consumer rebalance protocol, when the consumer changes its subscription.
// Until the consumer receives an updated assignment from the coordinator, it can hold assigned partitions
// that are not in the subscription anymore, so we make them not fetchable.
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);
} else {
SubscriptionState.FetchPosition position = subscriptions.position(tp);

View File

@ -487,7 +487,7 @@ public class SubscriptionState {
List<TopicPartition> result = new ArrayList<>();
assignment.forEach((topicPartition, topicPartitionState) -> {
// Cheap check is first to avoid evaluating the predicate if possible
if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) || topicPartitionState.isFetchable())
if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) || isFetchableAndSubscribed(topicPartition, topicPartitionState))
&& isAvailable.test(topicPartition)) {
result.add(topicPartition);
}
@ -495,6 +495,19 @@ public class SubscriptionState {
return result;
}
/**
* Check if the partition is fetchable.
* If the consumer has explicitly subscribed to a list of topic names,
* this will also check that the partition is contained in the subscription.
*/
private synchronized boolean isFetchableAndSubscribed(TopicPartition topicPartition, TopicPartitionState topicPartitionState) {
if (subscriptionType.equals(SubscriptionType.AUTO_TOPICS) && !subscription.contains(topicPartition.topic())) {
log.trace("Assigned partition {} is not in the subscription {} so will be considered not fetchable.", topicPartition, subscription);
return false;
}
return topicPartitionState.isFetchable();
}
public synchronized boolean hasAutoAssignedPartitions() {
return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN
|| this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE || this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
@ -879,8 +892,8 @@ public class SubscriptionState {
}
synchronized boolean isFetchable(TopicPartition tp) {
TopicPartitionState assignedOrNull = assignedStateOrNull(tp);
return assignedOrNull != null && assignedOrNull.isFetchable();
TopicPartitionState tps = assignedStateOrNull(tp);
return tps != null && isFetchableAndSubscribed(tp, tps);
}
public synchronized boolean hasValidPosition(TopicPartition tp) {

View File

@ -36,9 +36,12 @@ import org.junit.jupiter.api.Test;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import static java.util.Collections.singleton;
@ -113,6 +116,54 @@ public class SubscriptionStateTest {
assertEquals(0, state.numAssignedPartitions());
}
@Test
public void testIsFetchableOnManualAssignment() {
state.assignFromUser(Set.of(tp0, tp1));
assertAssignedPartitionIsFetchable();
}
@Test
public void testIsFetchableOnAutoAssignment() {
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
state.assignFromSubscribed(Set.of(tp0, tp1));
assertAssignedPartitionIsFetchable();
}
private void assertAssignedPartitionIsFetchable() {
assertEquals(2, state.assignedPartitions().size());
assertTrue(state.assignedPartitions().contains(tp0));
assertTrue(state.assignedPartitions().contains(tp1));
assertFalse(state.isFetchable(tp0), "Should not be fetchable without a valid position");
assertFalse(state.isFetchable(tp1), "Should not be fetchable without a valid position");
state.seek(tp0, 1);
state.seek(tp1, 1);
assertTrue(state.isFetchable(tp0));
assertTrue(state.isFetchable(tp1));
}
@Test
public void testIsFetchableConsidersExplicitTopicSubscription() {
state.subscribe(Set.of(topic1), Optional.of(rebalanceListener));
state.assignFromSubscribed(Set.of(t1p0));
state.seek(t1p0, 1);
assertEquals(Set.of(t1p0), state.assignedPartitions());
assertTrue(state.isFetchable(t1p0));
// Change subscription. Assigned partitions should remain unchanged but not fetchable.
state.subscribe(Set.of(topic), Optional.of(rebalanceListener));
assertEquals(Set.of(t1p0), state.assignedPartitions());
assertFalse(state.isFetchable(t1p0), "Assigned partitions not in the subscription should not be fetchable");
// Unsubscribe. Assigned partitions should be cleared and not fetchable.
state.unsubscribe();
assertTrue(state.assignedPartitions().isEmpty());
assertFalse(state.isFetchable(t1p0));
}
@Test
public void testGroupSubscribe() {
state.subscribe(singleton(topic1), Optional.of(rebalanceListener));
@ -1071,4 +1122,33 @@ public class SubscriptionStateTest {
assertThrows(IllegalStateException.class, () -> state.isOffsetResetNeeded(unassignedPartition));
}
// This test ensures the "fetchablePartitions" does not run the custom predicate if the partition is not fetchable
// This func is used in the hot path for fetching, to find fetchable partitions that are not in the buffer,
// so it should avoid evaluating the predicate if not needed.
@Test
public void testFetchablePartitionsPerformsCheapChecksFirst() {
// Setup fetchable partition and pause it
state.assignFromUser(Set.of(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
state.pause(tp0);
// Retrieve fetchable partitions with custom predicate.
AtomicBoolean predicateEvaluated = new AtomicBoolean(false);
Predicate<TopicPartition> isBuffered = tp -> {
predicateEvaluated.set(true);
return true;
};
List<TopicPartition> fetchablePartitions = state.fetchablePartitions(isBuffered);
assertTrue(fetchablePartitions.isEmpty());
assertFalse(predicateEvaluated.get(), "Custom predicate should not be evaluated when partitions are not fetchable");
// Resume partition and retrieve fetchable again
state.resume(tp0);
predicateEvaluated.set(false);
fetchablePartitions = state.fetchablePartitions(isBuffered);
assertTrue(predicateEvaluated.get());
assertEquals(tp0, fetchablePartitions.get(0));
}
}