mirror of https://github.com/apache/kafka.git
KAFKA-17317; Validate and maybe trigger downgrade after static member replacement (#17306)
This implementation doesn't change the existing downgrade path. In `classicGroupJoinToConsumerGroup`, if the group should be downgraded, it will be converted to a classic group at the end of the method. The returned records will be the records from GroupJoin plus the records from conversion. No rebalance will be triggered in the newly converted group. Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
18a584c90e
commit
366aa1014c
|
@ -1043,12 +1043,12 @@ public class GroupMetadataManager {
|
|||
/**
|
||||
* Validates the online downgrade if a consumer member is fenced from the consumer group.
|
||||
*
|
||||
* @param consumerGroup The ConsumerGroup.
|
||||
* @param memberId The fenced member id.
|
||||
* @param consumerGroup The ConsumerGroup.
|
||||
* @param fencedMemberId The fenced member id.
|
||||
* @return A boolean indicating whether it's valid to online downgrade the consumer group.
|
||||
*/
|
||||
private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
|
||||
if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
|
||||
private boolean validateOnlineDowngradeWithFencedMember(ConsumerGroup consumerGroup, String fencedMemberId) {
|
||||
if (!consumerGroup.allMembersUseClassicProtocolExcept(fencedMemberId)) {
|
||||
return false;
|
||||
} else if (consumerGroup.numMembers() <= 1) {
|
||||
log.debug("Skip downgrading the consumer group {} to classic group because it's empty.",
|
||||
|
@ -1066,27 +1066,59 @@ public class GroupMetadataManager {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates whether the group id is eligible for an online downgrade if an existing
|
||||
* static member is replaced by another new one uses the classic protocol.
|
||||
*
|
||||
* @param consumerGroup The group to downgrade.
|
||||
* @param replacedMemberId The replaced member id.
|
||||
*
|
||||
* @return A boolean indicating whether it's valid to online downgrade the consumer group.
|
||||
*/
|
||||
private boolean validateOnlineDowngradeWithReplacedMemberId(
|
||||
ConsumerGroup consumerGroup,
|
||||
String replacedMemberId
|
||||
) {
|
||||
if (!consumerGroup.allMembersUseClassicProtocolExcept(replacedMemberId)) {
|
||||
return false;
|
||||
} else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
|
||||
log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
|
||||
consumerGroup.groupId());
|
||||
return false;
|
||||
} else if (consumerGroup.numMembers() > classicGroupMaxSize) {
|
||||
log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
|
||||
consumerGroup.groupId());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a ClassicGroup corresponding to the given ConsumerGroup.
|
||||
*
|
||||
* @param consumerGroup The converted ConsumerGroup.
|
||||
* @param leavingMemberId The leaving member that triggers the downgrade validation.
|
||||
* @param response The response of the returned CoordinatorResult.
|
||||
* @return A CoordinatorResult.
|
||||
* @param joiningMember The newly joined member if the downgrade is triggered by static member replacement.
|
||||
* @param records The record list to which the conversion records are added.
|
||||
*/
|
||||
private <T> CoordinatorResult<T, CoordinatorRecord> convertToClassicGroup(
|
||||
private void convertToClassicGroup(
|
||||
ConsumerGroup consumerGroup,
|
||||
String leavingMemberId,
|
||||
T response
|
||||
ConsumerGroupMember joiningMember,
|
||||
List<CoordinatorRecord> records
|
||||
) {
|
||||
List<CoordinatorRecord> records = new ArrayList<>();
|
||||
consumerGroup.createGroupTombstoneRecords(records);
|
||||
if (joiningMember == null) {
|
||||
consumerGroup.createGroupTombstoneRecords(records);
|
||||
} else {
|
||||
consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records, leavingMemberId, joiningMember.memberId());
|
||||
}
|
||||
|
||||
ClassicGroup classicGroup;
|
||||
try {
|
||||
classicGroup = ClassicGroup.fromConsumerGroup(
|
||||
consumerGroup,
|
||||
leavingMemberId,
|
||||
joiningMember,
|
||||
logContext,
|
||||
time,
|
||||
metadataImage
|
||||
|
@ -1102,14 +1134,15 @@ public class GroupMetadataManager {
|
|||
|
||||
// Directly update the states instead of replaying the records because
|
||||
// the classicGroup reference is needed for triggering the rebalance.
|
||||
// Set the appendFuture to prevent the records from being replayed.
|
||||
removeGroup(consumerGroup.groupId());
|
||||
groups.put(consumerGroup.groupId(), classicGroup);
|
||||
|
||||
classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
|
||||
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId()));
|
||||
|
||||
return new CoordinatorResult<>(records, response, null, false);
|
||||
// If the downgrade is triggered by a member leaving the group, a rebalance should be triggered.
|
||||
if (joiningMember == null) {
|
||||
prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2028,6 +2061,20 @@ public class GroupMetadataManager {
|
|||
records
|
||||
);
|
||||
|
||||
// 4. Maybe downgrade the consumer group if the last static member using the
|
||||
// consumer protocol is replaced by the joining static member.
|
||||
String existingStaticMemberIdOrNull = group.staticMemberId(request.groupInstanceId());
|
||||
boolean downgrade = existingStaticMemberIdOrNull != null &&
|
||||
validateOnlineDowngradeWithReplacedMemberId(group, existingStaticMemberIdOrNull);
|
||||
if (downgrade) {
|
||||
convertToClassicGroup(
|
||||
group,
|
||||
existingStaticMemberIdOrNull,
|
||||
updatedMember,
|
||||
records
|
||||
);
|
||||
}
|
||||
|
||||
final JoinGroupResponseData response = new JoinGroupResponseData()
|
||||
.setMemberId(updatedMember.memberId())
|
||||
.setGenerationId(updatedMember.memberEpoch())
|
||||
|
@ -2038,15 +2085,22 @@ public class GroupMetadataManager {
|
|||
appendFuture.whenComplete((__, t) -> {
|
||||
if (t == null) {
|
||||
cancelConsumerGroupJoinTimeout(groupId, response.memberId());
|
||||
scheduleConsumerGroupSessionTimeout(groupId, response.memberId(), sessionTimeoutMs);
|
||||
// The sync timeout ensures that the member send sync request within the rebalance timeout.
|
||||
scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs());
|
||||
|
||||
if (!downgrade) {
|
||||
// If the group is still a consumer group, schedule the session
|
||||
// timeout for the joining member and the sync timeout to ensure
|
||||
// that the member send sync request within the rebalance timeout.
|
||||
scheduleConsumerGroupSessionTimeout(groupId, response.memberId(), sessionTimeoutMs);
|
||||
scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs());
|
||||
}
|
||||
responseFuture.complete(response);
|
||||
}
|
||||
});
|
||||
|
||||
return new CoordinatorResult<>(records, null, appendFuture, true);
|
||||
// If the joining member triggers a valid downgrade, the soft states will be directly
|
||||
// updated in the conversion method, so the records don't need to be replayed.
|
||||
// If the joining member doesn't trigger a valid downgrade, the group is still a
|
||||
// consumer group. We still rely on replaying records to update the soft states.
|
||||
return new CoordinatorResult<>(records, null, appendFuture, !downgrade);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2728,10 +2782,11 @@ public class GroupMetadataManager {
|
|||
ConsumerGroupMember member,
|
||||
T response
|
||||
) {
|
||||
if (validateOnlineDowngrade(group, member.memberId())) {
|
||||
return convertToClassicGroup(group, member.memberId(), response);
|
||||
List<CoordinatorRecord> records = new ArrayList<>();
|
||||
if (validateOnlineDowngradeWithFencedMember(group, member.memberId())) {
|
||||
convertToClassicGroup(group, member.memberId(), null, records);
|
||||
return new CoordinatorResult<>(records, response, null, false);
|
||||
} else {
|
||||
List<CoordinatorRecord> records = new ArrayList<>();
|
||||
removeMember(records, group.groupId(), member.memberId());
|
||||
|
||||
// We update the subscription metadata without the leaving member.
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
|
|||
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
|
||||
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
|
||||
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
|
||||
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
|
||||
import org.apache.kafka.image.MetadataImage;
|
||||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
|
||||
|
@ -1357,6 +1358,7 @@ public class ClassicGroup implements Group {
|
|||
*
|
||||
* @param consumerGroup The converted ConsumerGroup.
|
||||
* @param leavingMemberId The member that will not be converted in the ClassicGroup.
|
||||
* @param joiningMember The member that needs to be converted and added to the ClassicGroup.
|
||||
* @param logContext The logContext to create the ClassicGroup.
|
||||
* @param time The time to create the ClassicGroup.
|
||||
* @param metadataImage The MetadataImage.
|
||||
|
@ -1365,6 +1367,7 @@ public class ClassicGroup implements Group {
|
|||
public static ClassicGroup fromConsumerGroup(
|
||||
ConsumerGroup consumerGroup,
|
||||
String leavingMemberId,
|
||||
ConsumerGroupMember joiningMember,
|
||||
LogContext logContext,
|
||||
Time time,
|
||||
MetadataImage metadataImage
|
||||
|
@ -1399,15 +1402,38 @@ public class ClassicGroup implements Group {
|
|||
}
|
||||
});
|
||||
|
||||
if (joiningMember != null) {
|
||||
classicGroup.add(
|
||||
new ClassicGroupMember(
|
||||
joiningMember.memberId(),
|
||||
Optional.ofNullable(joiningMember.instanceId()),
|
||||
joiningMember.clientId(),
|
||||
joiningMember.clientHost(),
|
||||
joiningMember.rebalanceTimeoutMs(),
|
||||
joiningMember.classicProtocolSessionTimeout().get(),
|
||||
ConsumerProtocol.PROTOCOL_TYPE,
|
||||
joiningMember.supportedJoinGroupRequestProtocols(),
|
||||
null
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
classicGroup.setProtocolName(Optional.of(classicGroup.selectProtocol()));
|
||||
classicGroup.setSubscribedTopics(classicGroup.computeSubscribedTopics());
|
||||
|
||||
classicGroup.allMembers().forEach(classicGroupMember -> {
|
||||
// Set the assignment with serializing the ConsumerGroup's targetAssignment.
|
||||
// The serializing version should align with that of the member's JoinGroupRequestProtocol.
|
||||
String memberId = classicGroupMember.memberId();
|
||||
if (joiningMember != null && memberId.equals(joiningMember.memberId())) {
|
||||
// If the downgraded is triggered by the joining static member replacing
|
||||
// the leaving static member, the joining member should take the assignment
|
||||
// of the leaving one.
|
||||
memberId = leavingMemberId;
|
||||
}
|
||||
byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(
|
||||
toConsumerProtocolAssignment(
|
||||
consumerGroup.targetAssignment().get(classicGroupMember.memberId()).partitions(),
|
||||
consumerGroup.targetAssignment().get(memberId).partitions(),
|
||||
metadataImage.topics()
|
||||
),
|
||||
ConsumerProtocol.deserializeVersion(
|
||||
|
@ -1452,6 +1478,13 @@ public class ClassicGroup implements Group {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For testing only.
|
||||
*/
|
||||
public void setLeaderId(Optional<String> leaderId) {
|
||||
this.leaderId = leaderId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ClassicGroupMetadata(" +
|
||||
|
|
|
@ -528,6 +528,40 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
|
|||
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Populates the list of records with tombstone(s) for deleting the group.
|
||||
* If the removed member is the leaving member, create its tombstone with
|
||||
* the joining member id.
|
||||
*
|
||||
* @param records The list of records.
|
||||
* @param leavingMemberId The leaving member id.
|
||||
* @param joiningMemberId The joining member id.
|
||||
*/
|
||||
public void createGroupTombstoneRecordsWithReplacedMember(
|
||||
List<CoordinatorRecord> records,
|
||||
String leavingMemberId,
|
||||
String joiningMemberId
|
||||
) {
|
||||
members().forEach((memberId, __) -> {
|
||||
String removedMemberId = memberId.equals(leavingMemberId) ? joiningMemberId : memberId;
|
||||
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId(), removedMemberId));
|
||||
});
|
||||
|
||||
members().forEach((memberId, __) -> {
|
||||
String removedMemberId = memberId.equals(leavingMemberId) ? joiningMemberId : memberId;
|
||||
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId(), removedMemberId));
|
||||
});
|
||||
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId()));
|
||||
|
||||
members().forEach((memberId, __) -> {
|
||||
String removedMemberId = memberId.equals(leavingMemberId) ? joiningMemberId : memberId;
|
||||
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId(), removedMemberId));
|
||||
});
|
||||
|
||||
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId()));
|
||||
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return state() == ConsumerGroupState.EMPTY;
|
||||
|
|
|
@ -93,6 +93,7 @@ import org.apache.kafka.server.common.MetadataVersion;
|
|||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.opentest4j.AssertionFailedError;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -104,6 +105,7 @@ import java.util.Optional;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
@ -10945,6 +10947,12 @@ public class GroupMetadataManagerTest {
|
|||
// A new rebalance is triggered.
|
||||
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
|
||||
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
|
||||
|
||||
// Simulate a failed write to the log.
|
||||
context.rollback();
|
||||
|
||||
// The group is reverted back to the consumer group.
|
||||
assertEquals(consumerGroup, context.groupMetadataManager.consumerGroup(groupId));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -11340,6 +11348,242 @@ public class GroupMetadataManagerTest {
|
|||
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws ExecutionException, InterruptedException {
|
||||
String groupId = "group-id";
|
||||
String memberId1 = Uuid.randomUuid().toString();
|
||||
String oldMemberId2 = Uuid.randomUuid().toString();
|
||||
String instanceId = "instance-id";
|
||||
|
||||
Uuid fooTopicId = Uuid.randomUuid();
|
||||
String fooTopicName = "foo";
|
||||
Uuid barTopicId = Uuid.randomUuid();
|
||||
String barTopicName = "bar";
|
||||
|
||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||
|
||||
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = Collections.singletonList(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicProtocol()
|
||||
.setName("range")
|
||||
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
|
||||
Arrays.asList(fooTopicName, barTopicName),
|
||||
null,
|
||||
Arrays.asList(
|
||||
new TopicPartition(fooTopicName, 0),
|
||||
new TopicPartition(fooTopicName, 1),
|
||||
new TopicPartition(fooTopicName, 2),
|
||||
new TopicPartition(barTopicName, 0),
|
||||
new TopicPartition(barTopicName, 1)
|
||||
)
|
||||
))))
|
||||
);
|
||||
|
||||
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
|
||||
.setState(MemberState.STABLE)
|
||||
.setMemberEpoch(10)
|
||||
.setPreviousMemberEpoch(9)
|
||||
.setClientId(DEFAULT_CLIENT_ID)
|
||||
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
|
||||
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
|
||||
.setServerAssignorName("range")
|
||||
.setRebalanceTimeoutMs(45000)
|
||||
.setClassicMemberMetadata(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
|
||||
.setSessionTimeoutMs(5000)
|
||||
.setSupportedProtocols(protocols1)
|
||||
)
|
||||
.setAssignedPartitions(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 0, 1, 2),
|
||||
mkTopicAssignment(barTopicId, 0, 1)))
|
||||
.build();
|
||||
ConsumerGroupMember oldMember2 = new ConsumerGroupMember.Builder(oldMemberId2)
|
||||
.setInstanceId(instanceId)
|
||||
.setState(MemberState.STABLE)
|
||||
.setMemberEpoch(10)
|
||||
.setPreviousMemberEpoch(9)
|
||||
.setClientId(DEFAULT_CLIENT_ID)
|
||||
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
|
||||
.setSubscribedTopicNames(Collections.singletonList("foo"))
|
||||
.setServerAssignorName("range")
|
||||
.setRebalanceTimeoutMs(45000)
|
||||
.setAssignedPartitions(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 3, 4, 5)))
|
||||
.build();
|
||||
|
||||
// Consumer group with two members.
|
||||
// Member 1 uses the classic protocol and static member 2 uses the consumer protocol.
|
||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
|
||||
.withConsumerGroupAssignors(Collections.singletonList(assignor))
|
||||
.withMetadataImage(new MetadataImageBuilder()
|
||||
.addTopic(fooTopicId, fooTopicName, 6)
|
||||
.addTopic(barTopicId, barTopicName, 2)
|
||||
.addRacks()
|
||||
.build())
|
||||
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
|
||||
.withMember(member1)
|
||||
.withMember(oldMember2)
|
||||
.withAssignment(memberId1, mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 0, 1, 2),
|
||||
mkTopicAssignment(barTopicId, 0, 1)))
|
||||
.withAssignment(oldMemberId2, mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 3, 4, 5)))
|
||||
.withAssignmentEpoch(10))
|
||||
.build();
|
||||
|
||||
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
|
||||
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
|
||||
{
|
||||
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
|
||||
put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 2));
|
||||
}
|
||||
}));
|
||||
context.commit();
|
||||
|
||||
// A new member using classic protocol with the same instance id joins, scheduling the downgrade.
|
||||
JoinGroupRequestData joinRequest = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
|
||||
.withGroupId(groupId)
|
||||
.withMemberId(UNKNOWN_MEMBER_ID)
|
||||
.withGroupInstanceId(instanceId)
|
||||
.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
|
||||
.withDefaultProtocolTypeAndProtocols()
|
||||
.build();
|
||||
GroupMetadataManagerTestContext.JoinResult result = context.sendClassicGroupJoin(joinRequest);
|
||||
result.appendFuture.complete(null);
|
||||
String newMemberId2 = result.joinFuture.get().memberId();
|
||||
|
||||
ConsumerGroupMember expectedNewConsumerMember2 = new ConsumerGroupMember.Builder(oldMember2, newMemberId2)
|
||||
.setMemberEpoch(0)
|
||||
.setPreviousMemberEpoch(0)
|
||||
.build();
|
||||
ConsumerGroupMember expectedNewClassicMember2 = new ConsumerGroupMember.Builder(oldMember2, newMemberId2)
|
||||
.setPreviousMemberEpoch(0)
|
||||
.setRebalanceTimeoutMs(joinRequest.rebalanceTimeoutMs())
|
||||
.setClassicMemberMetadata(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
|
||||
.setSessionTimeoutMs(joinRequest.sessionTimeoutMs())
|
||||
.setSupportedProtocols(Collections.singletonList(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
|
||||
.setName("range")
|
||||
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
|
||||
Collections.singletonList(fooTopicName)))))))
|
||||
).build();
|
||||
|
||||
byte[] assignment1 = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
|
||||
new TopicPartition(fooTopicName, 0),
|
||||
new TopicPartition(fooTopicName, 1),
|
||||
new TopicPartition(fooTopicName, 2),
|
||||
new TopicPartition(barTopicName, 0),
|
||||
new TopicPartition(barTopicName, 1)
|
||||
))));
|
||||
byte[] assignment2 = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
|
||||
new TopicPartition(fooTopicName, 3),
|
||||
new TopicPartition(fooTopicName, 4),
|
||||
new TopicPartition(fooTopicName, 5)
|
||||
))));
|
||||
Map<String, byte[]> assignments = new HashMap<>();
|
||||
assignments.put(memberId1, assignment1);
|
||||
assignments.put(newMemberId2, assignment2);
|
||||
|
||||
ClassicGroup expectedClassicGroup = new ClassicGroup(
|
||||
new LogContext(),
|
||||
groupId,
|
||||
STABLE,
|
||||
context.time,
|
||||
10,
|
||||
Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
|
||||
Optional.of("range"),
|
||||
Optional.of(memberId1),
|
||||
Optional.of(context.time.milliseconds())
|
||||
);
|
||||
expectedClassicGroup.add(
|
||||
new ClassicGroupMember(
|
||||
memberId1,
|
||||
Optional.ofNullable(member1.instanceId()),
|
||||
member1.clientId(),
|
||||
member1.clientHost(),
|
||||
member1.rebalanceTimeoutMs(),
|
||||
member1.classicProtocolSessionTimeout().get(),
|
||||
ConsumerProtocol.PROTOCOL_TYPE,
|
||||
member1.supportedJoinGroupRequestProtocols(),
|
||||
assignment1
|
||||
)
|
||||
);
|
||||
expectedClassicGroup.add(
|
||||
new ClassicGroupMember(
|
||||
newMemberId2,
|
||||
Optional.ofNullable(oldMember2.instanceId()),
|
||||
DEFAULT_CLIENT_ID,
|
||||
DEFAULT_CLIENT_ADDRESS.toString(),
|
||||
joinRequest.rebalanceTimeoutMs(),
|
||||
joinRequest.sessionTimeoutMs(),
|
||||
joinRequest.protocolType(),
|
||||
joinRequest.protocols(),
|
||||
assignment2
|
||||
)
|
||||
);
|
||||
|
||||
List<CoordinatorRecord> expectedRecords = Arrays.asList(
|
||||
// Remove the existing member 2 that uses the consumer protocol.
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, oldMemberId2),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, oldMemberId2),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, oldMemberId2),
|
||||
|
||||
// Create the new member 2 that uses the consumer protocol.
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedNewConsumerMember2),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId2, expectedNewConsumerMember2.assignedPartitions()),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedNewConsumerMember2),
|
||||
|
||||
// Update the new member 2 to the member that uses classic protocol.
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedNewClassicMember2),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedNewClassicMember2),
|
||||
|
||||
// Remove member 1, member 2 and the consumer group.
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, newMemberId2),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, newMemberId2),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, newMemberId2),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId),
|
||||
|
||||
// Create the classic group.
|
||||
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting())
|
||||
);
|
||||
|
||||
assertEquals(expectedRecords.size(), result.records.size());
|
||||
assertRecordsEquals(expectedRecords.subList(0, 8), result.records.subList(0, 8));
|
||||
assertUnorderedListEquals(expectedRecords.subList(8, 10), result.records.subList(8, 10));
|
||||
assertUnorderedListEquals(expectedRecords.subList(10, 12), result.records.subList(10, 12));
|
||||
assertRecordEquals(expectedRecords.get(12), result.records.get(12));
|
||||
assertUnorderedListEquals(expectedRecords.subList(13, 15), result.records.subList(13, 15));
|
||||
assertRecordsEquals(expectedRecords.subList(15, 17), result.records.subList(15, 17));
|
||||
|
||||
// Leader can be either member 1 or member 2.
|
||||
try {
|
||||
assertRecordEquals(expectedRecords.get(17), result.records.get(17));
|
||||
} catch (AssertionFailedError e) {
|
||||
expectedClassicGroup.setLeaderId(Optional.of(newMemberId2));
|
||||
assertRecordEquals(
|
||||
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting()),
|
||||
result.records.get(9)
|
||||
);
|
||||
}
|
||||
|
||||
verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null);
|
||||
|
||||
// The new classic member 1 has a heartbeat timeout.
|
||||
ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = context.timer.timeout(
|
||||
classicGroupHeartbeatKey(groupId, memberId1)
|
||||
);
|
||||
assertNotNull(heartbeatTimeout);
|
||||
|
||||
// No rebalance is triggered.
|
||||
ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
|
||||
assertTrue(classicGroup.isInState(STABLE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
|
||||
String groupId = "group-id";
|
||||
|
|
|
@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.group.classic;
|
|||
|
||||
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
|
||||
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
|
||||
import org.apache.kafka.common.errors.FencedInstanceIdException;
|
||||
import org.apache.kafka.common.errors.GroupIdNotFoundException;
|
||||
|
@ -25,6 +27,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
|
|||
import org.apache.kafka.common.errors.IllegalGenerationException;
|
||||
import org.apache.kafka.common.errors.RebalanceInProgressException;
|
||||
import org.apache.kafka.common.errors.UnknownMemberIdException;
|
||||
import org.apache.kafka.common.message.JoinGroupRequestData;
|
||||
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
|
||||
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
|
||||
import org.apache.kafka.common.message.JoinGroupResponseData;
|
||||
|
@ -34,10 +37,20 @@ import org.apache.kafka.common.protocol.Errors;
|
|||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
|
||||
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
|
||||
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
|
||||
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
|
||||
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
|
||||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
|
||||
import org.apache.kafka.coordinator.group.modern.Assignment;
|
||||
import org.apache.kafka.coordinator.group.modern.MemberState;
|
||||
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
|
||||
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
|
||||
import org.apache.kafka.image.MetadataImage;
|
||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -56,17 +69,21 @@ import java.util.concurrent.CompletableFuture;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.apache.kafka.common.utils.Utils.mkSet;
|
||||
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
|
||||
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
|
||||
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
|
||||
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
|
||||
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY;
|
||||
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPARING_REBALANCE;
|
||||
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class ClassicGroupTest {
|
||||
private final String protocolType = "consumer";
|
||||
|
@ -1349,6 +1366,272 @@ public class ClassicGroupTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFromConsumerGroupWithJoiningMember() {
|
||||
MockTime time = new MockTime();
|
||||
String groupId = "group-id";
|
||||
String memberId1 = Uuid.randomUuid().toString();
|
||||
String memberId2 = Uuid.randomUuid().toString();
|
||||
String newMemberId2 = Uuid.randomUuid().toString();
|
||||
String instanceId2 = "instance-id-2";
|
||||
|
||||
Uuid fooTopicId = Uuid.randomUuid();
|
||||
String fooTopicName = "foo";
|
||||
|
||||
MetadataImage metadataImage = new MetadataImageBuilder()
|
||||
.addTopic(fooTopicId, fooTopicName, 2)
|
||||
.addRacks()
|
||||
.build();
|
||||
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(
|
||||
new SnapshotRegistry(logContext),
|
||||
groupId,
|
||||
mock(GroupCoordinatorMetricsShard.class)
|
||||
);
|
||||
consumerGroup.setGroupEpoch(10);
|
||||
consumerGroup.setTargetAssignmentEpoch(10);
|
||||
|
||||
consumerGroup.updateTargetAssignment(memberId1, new Assignment(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 0)
|
||||
)));
|
||||
consumerGroup.updateTargetAssignment(memberId2, new Assignment(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 1)
|
||||
)));
|
||||
|
||||
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = Collections.singletonList(createClassicProtocol(
|
||||
"range",
|
||||
Collections.singletonList(fooTopicName),
|
||||
Collections.singletonList(new TopicPartition(fooTopicName, 0))
|
||||
));
|
||||
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols2 = Collections.singletonList(createClassicProtocol(
|
||||
"range",
|
||||
Collections.singletonList(fooTopicName),
|
||||
Collections.singletonList(new TopicPartition(fooTopicName, 1))
|
||||
));
|
||||
|
||||
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
|
||||
.setState(MemberState.STABLE)
|
||||
.setMemberEpoch(10)
|
||||
.setPreviousMemberEpoch(9)
|
||||
.setClientId("client-id")
|
||||
.setClientHost("client-host")
|
||||
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
|
||||
.setServerAssignorName("range")
|
||||
.setRebalanceTimeoutMs(45000)
|
||||
.setClassicMemberMetadata(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
|
||||
.setSessionTimeoutMs(5000)
|
||||
.setSupportedProtocols(protocols1))
|
||||
.setAssignedPartitions(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 0)))
|
||||
.build();
|
||||
consumerGroup.updateMember(member1);
|
||||
|
||||
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
|
||||
.setInstanceId(instanceId2)
|
||||
.setState(MemberState.STABLE)
|
||||
.setMemberEpoch(10)
|
||||
.setPreviousMemberEpoch(9)
|
||||
.setClientId("client-id")
|
||||
.setClientHost("client-host")
|
||||
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
|
||||
.setServerAssignorName("range")
|
||||
.setRebalanceTimeoutMs(45000)
|
||||
.setAssignedPartitions(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 1)))
|
||||
.build();
|
||||
consumerGroup.updateMember(member2);
|
||||
|
||||
ConsumerGroupMember newMember2 = new ConsumerGroupMember.Builder(member2, newMemberId2)
|
||||
.setMemberEpoch(10)
|
||||
.setPreviousMemberEpoch(0)
|
||||
.setClientId("client-id")
|
||||
.setClientHost("client-host")
|
||||
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
|
||||
.setServerAssignorName("range")
|
||||
.setRebalanceTimeoutMs(45000)
|
||||
.setClassicMemberMetadata(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
|
||||
.setSessionTimeoutMs(5000)
|
||||
.setSupportedProtocols(protocols2))
|
||||
.setAssignedPartitions(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 1)))
|
||||
.build();
|
||||
|
||||
ClassicGroup classicGroup = ClassicGroup.fromConsumerGroup(
|
||||
consumerGroup,
|
||||
memberId2,
|
||||
newMember2,
|
||||
logContext,
|
||||
time,
|
||||
metadataImage
|
||||
);
|
||||
|
||||
ClassicGroup expectedClassicGroup = new ClassicGroup(
|
||||
logContext,
|
||||
groupId,
|
||||
STABLE,
|
||||
time,
|
||||
10,
|
||||
Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
|
||||
Optional.of("range"),
|
||||
Optional.empty(),
|
||||
Optional.of(time.milliseconds())
|
||||
);
|
||||
expectedClassicGroup.add(
|
||||
new ClassicGroupMember(
|
||||
memberId1,
|
||||
Optional.empty(),
|
||||
member1.clientId(),
|
||||
member1.clientHost(),
|
||||
member1.rebalanceTimeoutMs(),
|
||||
member1.classicProtocolSessionTimeout().get(),
|
||||
ConsumerProtocol.PROTOCOL_TYPE,
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocol()
|
||||
.setName(protocols1.get(0).name())
|
||||
.setMetadata(protocols1.get(0).metadata())
|
||||
).iterator()),
|
||||
Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(
|
||||
Collections.singletonList(new TopicPartition(fooTopicName, 0))
|
||||
)))
|
||||
)
|
||||
);
|
||||
expectedClassicGroup.add(
|
||||
new ClassicGroupMember(
|
||||
newMemberId2,
|
||||
Optional.of(instanceId2),
|
||||
newMember2.clientId(),
|
||||
newMember2.clientHost(),
|
||||
newMember2.rebalanceTimeoutMs(),
|
||||
newMember2.classicProtocolSessionTimeout().get(),
|
||||
ConsumerProtocol.PROTOCOL_TYPE,
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocol()
|
||||
.setName(protocols2.get(0).name())
|
||||
.setMetadata(protocols2.get(0).metadata())
|
||||
).iterator()),
|
||||
Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(
|
||||
Collections.singletonList(new TopicPartition(fooTopicName, 1))
|
||||
)))
|
||||
)
|
||||
);
|
||||
|
||||
assertClassicGroupEquals(expectedClassicGroup, classicGroup);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFromConsumerGroupWithoutJoiningMember() {
|
||||
MockTime time = new MockTime();
|
||||
String groupId = "group-id";
|
||||
String memberId1 = Uuid.randomUuid().toString();
|
||||
String memberId2 = Uuid.randomUuid().toString();
|
||||
String instanceId2 = "instance-id-2";
|
||||
|
||||
Uuid fooTopicId = Uuid.randomUuid();
|
||||
String fooTopicName = "foo";
|
||||
|
||||
MetadataImage metadataImage = new MetadataImageBuilder()
|
||||
.addTopic(fooTopicId, fooTopicName, 2)
|
||||
.addRacks()
|
||||
.build();
|
||||
|
||||
ConsumerGroup consumerGroup = new ConsumerGroup(
|
||||
new SnapshotRegistry(logContext),
|
||||
groupId,
|
||||
mock(GroupCoordinatorMetricsShard.class)
|
||||
);
|
||||
consumerGroup.setGroupEpoch(10);
|
||||
consumerGroup.setTargetAssignmentEpoch(10);
|
||||
consumerGroup.updateTargetAssignment(memberId1, new Assignment(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 0)
|
||||
)));
|
||||
consumerGroup.updateTargetAssignment(memberId2, new Assignment(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 1)
|
||||
)));
|
||||
|
||||
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = Collections.singletonList(createClassicProtocol(
|
||||
"range",
|
||||
Collections.singletonList(fooTopicName),
|
||||
Collections.singletonList(new TopicPartition(fooTopicName, 0))
|
||||
));
|
||||
|
||||
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
|
||||
.setState(MemberState.STABLE)
|
||||
.setMemberEpoch(10)
|
||||
.setPreviousMemberEpoch(9)
|
||||
.setClientId("client-id")
|
||||
.setClientHost("client-host")
|
||||
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
|
||||
.setServerAssignorName("range")
|
||||
.setRebalanceTimeoutMs(45000)
|
||||
.setClassicMemberMetadata(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
|
||||
.setSessionTimeoutMs(5000)
|
||||
.setSupportedProtocols(protocols1))
|
||||
.setAssignedPartitions(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 0)))
|
||||
.build();
|
||||
consumerGroup.updateMember(member1);
|
||||
|
||||
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
|
||||
.setInstanceId(instanceId2)
|
||||
.setState(MemberState.STABLE)
|
||||
.setMemberEpoch(10)
|
||||
.setPreviousMemberEpoch(9)
|
||||
.setClientId("client-id")
|
||||
.setClientHost("client-host")
|
||||
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
|
||||
.setServerAssignorName("range")
|
||||
.setRebalanceTimeoutMs(45000)
|
||||
.setAssignedPartitions(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 1)))
|
||||
.build();
|
||||
consumerGroup.updateMember(member2);
|
||||
|
||||
ClassicGroup classicGroup = ClassicGroup.fromConsumerGroup(
|
||||
consumerGroup,
|
||||
memberId2,
|
||||
null,
|
||||
logContext,
|
||||
time,
|
||||
metadataImage
|
||||
);
|
||||
|
||||
ClassicGroup expectedClassicGroup = new ClassicGroup(
|
||||
logContext,
|
||||
groupId,
|
||||
STABLE,
|
||||
time,
|
||||
10,
|
||||
Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
|
||||
Optional.of("range"),
|
||||
Optional.empty(),
|
||||
Optional.of(time.milliseconds())
|
||||
);
|
||||
expectedClassicGroup.add(
|
||||
new ClassicGroupMember(
|
||||
memberId1,
|
||||
Optional.empty(),
|
||||
member1.clientId(),
|
||||
member1.clientHost(),
|
||||
member1.rebalanceTimeoutMs(),
|
||||
member1.classicProtocolSessionTimeout().get(),
|
||||
ConsumerProtocol.PROTOCOL_TYPE,
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocol()
|
||||
.setName(protocols1.get(0).name())
|
||||
.setMetadata(protocols1.get(0).metadata())
|
||||
).iterator()),
|
||||
Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(
|
||||
Collections.singletonList(new TopicPartition(fooTopicName, 0))
|
||||
)))
|
||||
)
|
||||
);
|
||||
|
||||
assertClassicGroupEquals(expectedClassicGroup, classicGroup);
|
||||
}
|
||||
|
||||
private void assertState(ClassicGroup group, ClassicGroupState targetState) {
|
||||
Set<ClassicGroupState> otherStates = new HashSet<>();
|
||||
otherStates.add(STABLE);
|
||||
|
@ -1360,4 +1643,42 @@ public class ClassicGroupTest {
|
|||
otherStates.forEach(otherState -> assertFalse(group.isInState(otherState)));
|
||||
assertTrue(group.isInState(targetState));
|
||||
}
|
||||
|
||||
private void assertClassicGroupEquals(ClassicGroup expected, ClassicGroup actual) {
|
||||
assertEquals(expected.groupId(), actual.groupId());
|
||||
assertEquals(expected.protocolName(), actual.protocolName());
|
||||
assertEquals(expected.protocolType(), actual.protocolType());
|
||||
assertEquals(expected.leaderOrNull(), actual.leaderOrNull());
|
||||
assertEquals(expected.stateAsString(), actual.stateAsString());
|
||||
assertEquals(expected.generationId(), actual.generationId());
|
||||
assertEquals(expected.allMembers().size(), actual.allMembers().size());
|
||||
expected.allMembers().forEach(expectedMember ->
|
||||
assertClassicGroupMemberEquals(expectedMember, actual.member(expectedMember.memberId())));
|
||||
}
|
||||
|
||||
private void assertClassicGroupMemberEquals(ClassicGroupMember expected, ClassicGroupMember actual) {
|
||||
assertEquals(expected.memberId(), actual.memberId());
|
||||
assertEquals(expected.groupInstanceId(), actual.groupInstanceId());
|
||||
assertEquals(expected.clientId(), actual.clientId());
|
||||
assertEquals(expected.clientHost(), actual.clientHost());
|
||||
assertEquals(expected.rebalanceTimeoutMs(), actual.rebalanceTimeoutMs());
|
||||
assertEquals(expected.sessionTimeoutMs(), actual.sessionTimeoutMs());
|
||||
assertEquals(expected.protocolType(), actual.protocolType());
|
||||
assertEquals(expected.supportedProtocols(), actual.supportedProtocols());
|
||||
assertArrayEquals(expected.assignment(), actual.assignment());
|
||||
}
|
||||
|
||||
private ConsumerGroupMemberMetadataValue.ClassicProtocol createClassicProtocol(
|
||||
String protocolName,
|
||||
List<String> subscribedTopics,
|
||||
List<TopicPartition> assignedTopicPartitions
|
||||
) {
|
||||
return new ConsumerGroupMemberMetadataValue.ClassicProtocol()
|
||||
.setName(protocolName)
|
||||
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
|
||||
subscribedTopics,
|
||||
null,
|
||||
assignedTopicPartitions
|
||||
))));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.kafka.coordinator.group.modern.consumer;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
|
||||
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.Uuid;
|
||||
|
@ -25,15 +26,21 @@ import org.apache.kafka.common.errors.StaleMemberEpochException;
|
|||
import org.apache.kafka.common.errors.UnknownMemberIdException;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
|
||||
import org.apache.kafka.common.message.JoinGroupRequestData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
|
||||
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
|
||||
import org.apache.kafka.coordinator.group.Group;
|
||||
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
|
||||
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
|
||||
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
|
||||
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
|
||||
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
|
||||
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
|
||||
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
|
||||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
|
||||
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
|
||||
import org.apache.kafka.coordinator.group.modern.Assignment;
|
||||
|
@ -56,10 +63,14 @@ import java.util.OptionalLong;
|
|||
|
||||
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
||||
import static org.apache.kafka.common.utils.Utils.mkMap;
|
||||
import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
|
||||
import static org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals;
|
||||
import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals;
|
||||
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
|
||||
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
|
||||
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
|
||||
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
|
||||
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
@ -1435,4 +1446,156 @@ public class ConsumerGroupTest {
|
|||
consumerGroup.updateMember(member2);
|
||||
assertEquals(1, consumerGroup.numClassicProtocolMembers());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateGroupTombstoneRecordsWithReplacedMember() {
|
||||
String groupId = "group";
|
||||
String memberId1 = "member-1";
|
||||
String memberId2 = "member-2";
|
||||
String newMemberId2 = "new-member-2";
|
||||
|
||||
ConsumerGroup consumerGroup = createConsumerGroup(groupId);
|
||||
List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = new ArrayList<>();
|
||||
protocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
|
||||
.setName("range")
|
||||
.setMetadata(new byte[0]));
|
||||
|
||||
ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
|
||||
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
|
||||
.setSupportedProtocols(protocols))
|
||||
.build();
|
||||
consumerGroup.updateMember(member1);
|
||||
|
||||
ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
|
||||
.setInstanceId("instance-id-2")
|
||||
.build();
|
||||
consumerGroup.updateMember(member2);
|
||||
|
||||
List<CoordinatorRecord> records = new ArrayList<>();
|
||||
consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records, memberId2, newMemberId2);
|
||||
|
||||
List<CoordinatorRecord> expectedRecords = Arrays.asList(
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, newMemberId2),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, newMemberId2),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, newMemberId2),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
|
||||
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)
|
||||
);
|
||||
assertEquals(expectedRecords.size(), records.size());
|
||||
assertUnorderedListEquals(expectedRecords.subList(0, 2), records.subList(0, 2));
|
||||
assertUnorderedListEquals(expectedRecords.subList(2, 4), records.subList(2, 4));
|
||||
assertRecordEquals(expectedRecords.get(4), records.get(4));
|
||||
assertUnorderedListEquals(expectedRecords.subList(5, 7), records.subList(5, 7));
|
||||
assertRecordsEquals(expectedRecords.subList(7, 9), records.subList(7, 9));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFromClassicGroup() {
|
||||
MockTime time = new MockTime();
|
||||
LogContext logContext = new LogContext();
|
||||
String groupId = "group-id";
|
||||
String memberId = Uuid.randomUuid().toString();
|
||||
|
||||
Uuid fooTopicId = Uuid.randomUuid();
|
||||
String fooTopicName = "foo";
|
||||
Uuid barTopicId = Uuid.randomUuid();
|
||||
String barTopicName = "bar";
|
||||
|
||||
MetadataImage metadataImage = new MetadataImageBuilder()
|
||||
.addTopic(fooTopicId, fooTopicName, 1)
|
||||
.addTopic(barTopicId, barTopicName, 1)
|
||||
.addRacks()
|
||||
.build();
|
||||
|
||||
ClassicGroup classicGroup = new ClassicGroup(
|
||||
logContext,
|
||||
groupId,
|
||||
STABLE,
|
||||
time,
|
||||
10,
|
||||
Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
|
||||
Optional.of("range"),
|
||||
Optional.empty(),
|
||||
Optional.of(time.milliseconds())
|
||||
);
|
||||
|
||||
ClassicGroupMember member = new ClassicGroupMember(
|
||||
memberId,
|
||||
Optional.empty(),
|
||||
"client-id",
|
||||
"client-host",
|
||||
5000,
|
||||
500,
|
||||
ConsumerProtocol.PROTOCOL_TYPE,
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
|
||||
new JoinGroupRequestData.JoinGroupRequestProtocol()
|
||||
.setName("range")
|
||||
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
|
||||
Arrays.asList(fooTopicName, barTopicName),
|
||||
null,
|
||||
Arrays.asList(
|
||||
new TopicPartition(fooTopicName, 0),
|
||||
new TopicPartition(barTopicName, 0))))))
|
||||
).iterator()),
|
||||
Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
|
||||
new TopicPartition(fooTopicName, 0),
|
||||
new TopicPartition(barTopicName, 0)
|
||||
))))
|
||||
);
|
||||
classicGroup.add(member);
|
||||
|
||||
ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
|
||||
new SnapshotRegistry(logContext),
|
||||
mock(GroupCoordinatorMetricsShard.class),
|
||||
classicGroup,
|
||||
metadataImage.topics()
|
||||
);
|
||||
|
||||
ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
|
||||
new SnapshotRegistry(logContext),
|
||||
groupId,
|
||||
mock(GroupCoordinatorMetricsShard.class)
|
||||
);
|
||||
expectedConsumerGroup.setGroupEpoch(10);
|
||||
expectedConsumerGroup.setTargetAssignmentEpoch(10);
|
||||
expectedConsumerGroup.updateTargetAssignment(memberId, new Assignment(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 0)
|
||||
)));
|
||||
expectedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(memberId)
|
||||
.setMemberEpoch(classicGroup.generationId())
|
||||
.setState(MemberState.STABLE)
|
||||
.setPreviousMemberEpoch(classicGroup.generationId())
|
||||
.setInstanceId(null)
|
||||
.setRackId(null)
|
||||
.setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
|
||||
.setClientId(member.clientId())
|
||||
.setClientHost(member.clientHost())
|
||||
.setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
|
||||
.setAssignedPartitions(mkAssignment(
|
||||
mkTopicAssignment(fooTopicId, 0),
|
||||
mkTopicAssignment(barTopicId, 0)))
|
||||
.setClassicMemberMetadata(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
|
||||
.setSessionTimeoutMs(member.sessionTimeoutMs())
|
||||
.setSupportedProtocols(Collections.singletonList(
|
||||
new ConsumerGroupMemberMetadataValue.ClassicProtocol()
|
||||
.setName("range")
|
||||
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
|
||||
Arrays.asList(fooTopicName, barTopicName),
|
||||
null,
|
||||
Arrays.asList(
|
||||
new TopicPartition(fooTopicName, 0),
|
||||
new TopicPartition(barTopicName, 0)))))))))
|
||||
.build());
|
||||
|
||||
assertEquals(expectedConsumerGroup.groupId(), consumerGroup.groupId());
|
||||
assertEquals(expectedConsumerGroup.groupEpoch(), consumerGroup.groupEpoch());
|
||||
assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
|
||||
assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor());
|
||||
assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue