KAFKA-17750 Extend kafka-consumer-groups command line tool to support new consumer group (part 1) (#17958)

1) Bump validVersions of ConsumerGroupDescribeRequest.json and ConsumerGroupDescribeResponse.json to "0-1".

2) Add MemberType field to ConsumerGroupDescribeResponse.json. Default value is -1 (unknown). 0 is for classic member and 1 is for consumer member.

3) When ConsumerGroupMember#useClassicProtocol is true, return MemberType field as 0. Otherwise, return 1.

Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2024-12-04 06:08:39 +08:00 committed by GitHub
parent ac8b3dfbf0
commit fe88232b07
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 138 additions and 13 deletions

View File

@ -18,7 +18,9 @@
"type": "request",
"listeners": ["broker"],
"name": "ConsumerGroupDescribeRequest",
"validVersions": "0",
// Version 1 adds MemberType field to ConsumerGroupDescribeResponse (KIP-1099).
// For ConsumerGroupDescribeRequest, version 1 is same as version 0.
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",

View File

@ -17,7 +17,8 @@
"apiKey": 69,
"type": "response",
"name": "ConsumerGroupDescribeResponse",
"validVersions": "0",
// Version 1 adds MemberType field (KIP-1099).
"validVersions": "0-1",
"flexibleVersions": "0+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
@ -69,7 +70,9 @@
{ "name": "Assignment", "type": "Assignment", "versions": "0+",
"about": "The current assignment." },
{ "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
"about": "The target assignment." }
"about": "The target assignment." },
{ "name": "MemberType", "type": "int8", "versions": "1+", "default": "-1", "ignorable": true,
"about": "-1 for unknown. 0 for classic member. +1 for consumer member." }
]},
{ "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
"about": "32-bit bitfield to represent authorized operations for this group." }

View File

@ -20,7 +20,9 @@ import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api._
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.common.{ConsumerGroupState, Uuid}
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, Uuid}
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, DescribedGroup, TopicPartitions}
import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -30,10 +32,11 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse}
import org.junit.jupiter.api.extension.ExtendWith
import java.lang.{Byte => JByte}
import java.util.Collections
import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ -153,6 +156,7 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
.setClientHost(clientHost)
.setSubscribedTopicRegex("")
.setSubscribedTopicNames(List("bar").asJava)
.setMemberType(if (version == 0) -1.toByte else 1.toByte)
).asJava),
new DescribedGroup()
.setGroupId("grp-2")
@ -176,7 +180,8 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](2).asJava)
).asJava)),
).asJava))
.setMemberType(if (version == 0) -1.toByte else 1.toByte),
new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member1Response.memberId)
.setMemberEpoch(grp2Member1Response.memberEpoch)
@ -197,7 +202,8 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
.setTopicId(topicId)
.setTopicName("foo")
.setPartitions(List[Integer](0, 1).asJava)
).asJava)),
).asJava))
.setMemberType(if (version == 0) -1.toByte else 1.toByte),
).asJava),
)
@ -213,4 +219,108 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
admin.close()
}
}
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
def testConsumerGroupDescribeWithMigrationMember(): Unit = {
// Creates the __consumer_offsets topics because it won't be created automatically
// in this test because it does not use FindCoordinator API.
createOffsetsTopic()
// Create the topic.
val topicName = "foo"
createTopic(
topic = topicName,
numPartitions = 3
)
val groupId = "grp"
// Classic member 1 joins the classic group.
val memberId1 = joinDynamicConsumerGroupWithOldProtocol(
groupId = groupId,
metadata = ConsumerProtocol.serializeSubscription(
new ConsumerPartitionAssignor.Subscription(
Collections.singletonList(topicName),
null,
List().asJava
)
).array,
assignment = ConsumerProtocol.serializeAssignment(
new ConsumerPartitionAssignor.Assignment(
List(0, 1, 2).map(p => new TopicPartition(topicName, p)).asJava
)
).array
)._1
// The joining request with a consumer group member 2 is accepted.
val memberId2 = consumerGroupHeartbeat(
groupId = groupId,
memberId = "member-2",
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List(topicName),
topicPartitions = List.empty,
expectedError = Errors.NONE
).memberId
for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val actual = consumerGroupDescribe(
groupIds = List(groupId),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(1, actual.size)
val group = actual.head
val member1 = group.members.asScala.find(_.memberId == memberId1)
assertFalse(member1.isEmpty)
// Version 0 doesn't have memberType field, so memberType field on member 1 is -1 (unknown).
// After version 1, there is memberType field and it should be +1 (classic) for member 1.
assertEquals(if (version == 0) -1.toByte else 0.toByte, member1.get.memberType)
val member2 = group.members.asScala.find(_.memberId == memberId2)
assertFalse(member2.isEmpty)
assertEquals(if (version == 0) -1.toByte else 1.toByte, member2.get.memberType)
}
// Classic member 1 leaves group.
leaveGroup(
groupId = groupId,
memberId = memberId1,
useNewProtocol = false,
version = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)
)
// Member 1 joins as consumer group member.
consumerGroupHeartbeat(
groupId = groupId,
memberId = memberId1,
rebalanceTimeoutMs = 5 * 60 * 1000,
subscribedTopicNames = List(topicName),
topicPartitions = List.empty,
expectedError = Errors.NONE
)
// There is no classic member in the group.
for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val actual = consumerGroupDescribe(
groupIds = List(groupId),
includeAuthorizedOperations = true,
version = version.toShort,
)
assertEquals(1, actual.size)
val group = actual.head
val member1 = group.members.asScala.find(_.memberId == memberId1)
assertFalse(member1.isEmpty)
assertEquals(if (version == 0) -1.toByte else 1.toByte, member1.get.memberType)
val member2 = group.members.asScala.find(_.memberId == memberId2)
assertFalse(member2.isEmpty)
assertEquals(if (version == 0) -1.toByte else 1.toByte, member2.get.memberType)
}
}
}

View File

@ -408,7 +408,8 @@ public class ConsumerGroupMember extends ModernGroupMember {
.setInstanceId(instanceId)
.setRackId(rackId)
.setSubscribedTopicNames(subscribedTopicNames == null ? null : new ArrayList<>(subscribedTopicNames))
.setSubscribedTopicRegex(subscribedTopicRegex);
.setSubscribedTopicRegex(subscribedTopicRegex)
.setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1);
}
private static List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitionsFromMap(

View File

@ -26,6 +26,8 @@ import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.image.MetadataImage;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
@ -246,8 +248,9 @@ public class ConsumerGroupMemberTest {
assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), member.partitionsPendingRevocation());
}
@Test
public void testAsConsumerGroupDescribeMember() {
@ParameterizedTest(name = "{displayName}.withClassicMemberMetadata={0}")
@ValueSource(booleans = {true, false})
public void testAsConsumerGroupDescribeMember(boolean withClassicMemberMetadata) {
Uuid topicId1 = Uuid.randomUuid();
Uuid topicId2 = Uuid.randomUuid();
Uuid topicId3 = Uuid.randomUuid();
@ -287,6 +290,8 @@ public class ConsumerGroupMemberTest {
.setClientHost(clientHost)
.setSubscribedTopicNames(subscribedTopicNames)
.setSubscribedTopicRegex(subscribedTopicRegex)
.setClassicMemberMetadata(withClassicMemberMetadata ? new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSupportedProtocols(toClassicProtocolCollection("range")) : null)
.build();
ConsumerGroupDescribeResponseData.Member actual = member.asConsumerGroupDescribeMember(targetAssignment, metadataImage.topics());
@ -315,7 +320,8 @@ public class ConsumerGroupMemberTest {
.setTopicName("topic4")
.setPartitions(new ArrayList<>(item.getValue()))
).collect(Collectors.toList()))
);
)
.setMemberType(withClassicMemberMetadata ? (byte) 0 : (byte) 1);
assertEquals(expected, actual);
}
@ -344,7 +350,8 @@ public class ConsumerGroupMemberTest {
ConsumerGroupDescribeResponseData.Member expected = new ConsumerGroupDescribeResponseData.Member()
.setMemberId(memberId.toString())
.setSubscribedTopicRegex("");
.setSubscribedTopicRegex("")
.setMemberType((byte) 1);
ConsumerGroupDescribeResponseData.Member actual = member.asConsumerGroupDescribeMember(null,
new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "foo", 3)

View File

@ -1283,9 +1283,11 @@ public class ConsumerGroupTest {
new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.setSubscribedTopicRegex(""),
.setSubscribedTopicRegex("")
.setMemberType((byte) 1),
new ConsumerGroupDescribeResponseData.Member().setMemberId("member2")
.setSubscribedTopicRegex("")
.setMemberType((byte) 1)
));
ConsumerGroupDescribeResponseData.DescribedGroup actual = group.asDescribedGroup(1, "",
new MetadataImageBuilder().build().topics());