mirror of https://github.com/apache/kafka.git
KAFKA-16107 [2/N]: Fix inverted args & more tests (#15285)
Small fix for argument order in onPartitionsAssigned callback, and more tests for disabling/enabling partitions while callback runs. Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
7bc4afee11
commit
8da6508966
|
@ -1130,8 +1130,8 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
|
||||||
|
|
||||||
// Invoke user call back.
|
// Invoke user call back.
|
||||||
CompletableFuture<Void> result = invokeOnPartitionsAssignedCallback(addedPartitions);
|
CompletableFuture<Void> result = invokeOnPartitionsAssignedCallback(addedPartitions);
|
||||||
result.whenComplete((error, callbackResult) -> {
|
result.whenComplete((__, exception) -> {
|
||||||
if (error == null) {
|
if (exception == null) {
|
||||||
// Enable newly added partitions to start fetching and updating positions for them.
|
// Enable newly added partitions to start fetching and updating positions for them.
|
||||||
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
|
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1141,7 +1141,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
|
||||||
if (!addedPartitions.isEmpty()) {
|
if (!addedPartitions.isEmpty()) {
|
||||||
log.warn("Leaving newly assigned partitions {} marked as non-fetchable and not " +
|
log.warn("Leaving newly assigned partitions {} marked as non-fetchable and not " +
|
||||||
"requiring initializing positions after onPartitionsAssigned callback failed.",
|
"requiring initializing positions after onPartitionsAssigned callback failed.",
|
||||||
addedPartitions, error);
|
addedPartitions, exception);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -1383,6 +1383,61 @@ public class MembershipManagerImplTest {
|
||||||
assertEquals(0, listener.lostCount());
|
assertEquals(0, listener.lostCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddedPartitionsTemporarilyDisabledAwaitingOnPartitionsAssignedCallback() {
|
||||||
|
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
|
||||||
|
String topicName = "topic1";
|
||||||
|
ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
|
||||||
|
int partitionOwned = 0;
|
||||||
|
int partitionAdded = 1;
|
||||||
|
SortedSet<TopicPartition> assignedPartitions = topicPartitions(topicName, partitionOwned,
|
||||||
|
partitionAdded);
|
||||||
|
SortedSet<TopicPartition> addedPartitions = topicPartitions(topicName, partitionAdded);
|
||||||
|
mockPartitionOwnedAndNewPartitionAdded(topicName, partitionOwned, partitionAdded,
|
||||||
|
new CounterConsumerRebalanceListener(), membershipManager);
|
||||||
|
|
||||||
|
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions, addedPartitions);
|
||||||
|
|
||||||
|
performCallback(
|
||||||
|
membershipManager,
|
||||||
|
invoker,
|
||||||
|
ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
|
||||||
|
addedPartitions,
|
||||||
|
true
|
||||||
|
);
|
||||||
|
|
||||||
|
verify(subscriptionState).enablePartitionsAwaitingCallback(addedPartitions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddedPartitionsNotEnabledAfterFailedOnPartitionsAssignedCallback() {
|
||||||
|
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
|
||||||
|
String topicName = "topic1";
|
||||||
|
ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
|
||||||
|
int partitionOwned = 0;
|
||||||
|
int partitionAdded = 1;
|
||||||
|
SortedSet<TopicPartition> assignedPartitions = topicPartitions(topicName, partitionOwned,
|
||||||
|
partitionAdded);
|
||||||
|
SortedSet<TopicPartition> addedPartitions = topicPartitions(topicName, partitionAdded);
|
||||||
|
CounterConsumerRebalanceListener listener =
|
||||||
|
new CounterConsumerRebalanceListener(Optional.empty(),
|
||||||
|
Optional.of(new RuntimeException("onPartitionsAssigned failed!")),
|
||||||
|
Optional.empty());
|
||||||
|
mockPartitionOwnedAndNewPartitionAdded(topicName, partitionOwned, partitionAdded,
|
||||||
|
listener, membershipManager);
|
||||||
|
|
||||||
|
verify(subscriptionState).assignFromSubscribedAwaitingCallback(assignedPartitions, addedPartitions);
|
||||||
|
|
||||||
|
performCallback(
|
||||||
|
membershipManager,
|
||||||
|
invoker,
|
||||||
|
ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
|
||||||
|
addedPartitions,
|
||||||
|
true
|
||||||
|
);
|
||||||
|
verify(subscriptionState, never()).enablePartitionsAwaitingCallback(any());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOnPartitionsLostNoError() {
|
public void testOnPartitionsLostNoError() {
|
||||||
MembershipManagerImpl membershipManager = createMemberInStableState();
|
MembershipManagerImpl membershipManager = createMemberInStableState();
|
||||||
|
@ -1401,6 +1456,23 @@ public class MembershipManagerImplTest {
|
||||||
testOnPartitionsLost(Optional.of(new KafkaException("Intentional error for test")));
|
testOnPartitionsLost(Optional.of(new KafkaException("Intentional error for test")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void mockPartitionOwnedAndNewPartitionAdded(String topicName,
|
||||||
|
int partitionOwned,
|
||||||
|
int partitionAdded,
|
||||||
|
CounterConsumerRebalanceListener listener,
|
||||||
|
MembershipManagerImpl membershipManager) {
|
||||||
|
Uuid topicId = Uuid.randomUuid();
|
||||||
|
TopicPartition owned = new TopicPartition(topicName, partitionOwned);
|
||||||
|
when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(owned));
|
||||||
|
membershipManager.updateCurrentAssignment(Collections.singleton(new TopicIdPartition(topicId, owned)));
|
||||||
|
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
|
||||||
|
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
|
||||||
|
when(subscriptionState.rebalanceListener()).thenReturn(Optional.ofNullable(listener));
|
||||||
|
|
||||||
|
// Receive assignment adding a new partition
|
||||||
|
receiveAssignment(topicId, Arrays.asList(partitionOwned, partitionAdded), membershipManager);
|
||||||
|
}
|
||||||
|
|
||||||
private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
|
private void testOnPartitionsLost(Optional<RuntimeException> lostError) {
|
||||||
// Step 1: set up mocks
|
// Step 1: set up mocks
|
||||||
MembershipManagerImpl membershipManager = createMemberInStableState();
|
MembershipManagerImpl membershipManager = createMemberInStableState();
|
||||||
|
|
Loading…
Reference in New Issue