KAFKA-16665: Allow to initialize newly assigned partition's positions without allowing fetching while callback runs (#15856)

Fix to allow to initialize positions for newly assigned partitions, while the onPartitionsAssigned callback is running, even though the partitions remain non-fetchable until the callback completes.

Before this PR, we were not allowing initialization or fetching while the callback was running. The fix here only allows to initialize the newly assigned partition position, and keeps the existing logic for making sure that the partition remains non-fetchable until the callback completes.

The need for this fix came out in one of the connect system tests, that attempts to retrieve a newly assigned partition position with a call to consumer.position from within the onPartitionsAssigned callback (WorkerSinkTask). With this PR, we allow to make such calls (test added), which is the behaviour of the legacy consumer.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Lianet Magrans 2024-05-07 10:40:00 +02:00 committed by GitHub
parent 459eaec666
commit ea485a7061
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 34 additions and 49 deletions

View File

@ -76,6 +76,7 @@ import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
@ -1632,7 +1633,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
* Set the fetch position to the committed position (if there is one)
* or reset it using the offset reset policy the user has configured.
*
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws AuthenticationException If authentication fails. See the exception for more details
* @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
* defined
* @return true iff the operation completed without timing out
@ -1659,12 +1660,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
// If there are partitions still needing a position and a reset policy is defined,
// request reset using the default policy. If no reset strategy is defined and there
// are partitions with a missing position, then we will raise a NoOffsetForPartitionException exception.
//
// Note: this will *not* initialize the position for any partitions that are in the process
// of being assigned and awaiting ConsumerRebalanceListener callbacks. We don't want to reset
// positions until the partition has been fully assigned *and* we want to wait until
// initWithCommittedOffsetsIfNeeded has had a chance to look up the partition's committed
// offset, if applicable.
subscriptions.resetInitializingPositions();
// Reset positions using partition offsets retrieved from the leader, for any partitions

View File

@ -1100,13 +1100,14 @@ public class SubscriptionState {
}
/**
* Only partitions that are {@link FetchStates#INITIALIZING initializing} <em>and not</em>
* {@link #pendingOnAssignedCallback pending} the completion of the
* {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection) onPartitionsAssigned} callback
* should be considered as initialize-able.
* True if the partition is in {@link FetchStates#INITIALIZING} state. While in this
* state, a position for the partition can be retrieved (based on committed offsets or
* partitions offsets).
* Note that retrieving a position does not mean that we can start fetching from the
* partition (see {@link #isFetchable()})
*/
private boolean shouldInitialize() {
return fetchState.equals(FetchStates.INITIALIZING) && !pendingOnAssignedCallback;
return fetchState.equals(FetchStates.INITIALIZING);
}
private boolean isFetchable() {

View File

@ -318,7 +318,7 @@ public class SubscriptionStateTest {
state.assignFromSubscribedAwaitingCallback(Utils.mkSet(tp0, tp1), singleton(tp1));
assertTrue(state.isFetchable(tp0));
assertFalse(state.isFetchable(tp1));
assertEquals(0, state.initializingPartitions().size());
assertEquals(1, state.initializingPartitions().size());
// Callback completed. Added partition be initializing positions and become fetchable when it gets one.
state.enablePartitionsAwaitingCallback(singleton(tp1));
@ -334,7 +334,7 @@ public class SubscriptionStateTest {
assertEquals(singleton(topicPartition.topic()), state.subscription());
assertFalse(state.isFetchable(topicPartition));
assertEquals(0, state.initializingPartitions().size());
assertEquals(1, state.initializingPartitions().size());
assertFalse(state.isPaused(topicPartition));
}
@ -967,39 +967,4 @@ public class SubscriptionStateTest {
assertThrows(IllegalStateException.class, () -> state.isOffsetResetNeeded(unassignedPartition));
}
/**
* This test checks that we will not attempt to prematurely reset position of partitions that are pending.
*
* See KAFKA-16556.
*/
@Test
public void testPendingPartitionsDoNotResetPositions() {
Optional<ConsumerRebalanceListener> listener = Optional.of(new CounterConsumerRebalanceListener());
Set<String> topics = Collections.singleton(topic);
Collection<TopicPartition> assignedPartitions = Collections.singleton(tp0);
// User subscribes to a topic and the group coordinator assigns a partitions to our consumer.
state.subscribe(topics, listener);
state.assignFromSubscribedAwaitingCallback(assignedPartitions, assignedPartitions);
// The logic in initializingPartitions will filter out the pending partition and it will
// not be considered fetchable.
assertFalse(state.initializingPartitions().contains(tp0));
assertFalse(state.isFetchable(tp0));
assertFalse(state.hasAllFetchPositions());
// Let's pretend this code is being executed by the Consumer.poll() code on the application thread.
assertFalse(state.isOffsetResetNeeded(tp0));
state.resetInitializingPositions();
assertFalse(state.isOffsetResetNeeded(tp0));
// Shortly after, on the next loop of the poll() method, we complete the callback.
state.enablePartitionsAwaitingCallback(Collections.singleton(tp0));
// THEN, we can reset the partition (if needed).
assertFalse(state.isOffsetResetNeeded(tp0));
state.resetInitializingPositions();
assertTrue(state.isOffsetResetNeeded(tp0));
}
}

View File

@ -907,4 +907,28 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(100)))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback(quorum: String,
groupProtocol: String): Unit = {
val consumer = createConsumer()
val listener = new TestConsumerReassignmentListener {
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
super.onPartitionsRevoked(partitions)
}
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
super.onPartitionsAssigned(partitions)
partitions.forEach(tp => {
assertDoesNotThrow(() => consumer.position(tp))
})
}
}
consumer.subscribe(List(topic).asJava, listener)
awaitRebalance(consumer, listener)
assertEquals(1, listener.callsToAssigned)
assertEquals(0, listener.callsToRevoked)
}
}