diff --git a/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json b/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json index a581d15dee3..ff404a0966f 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json +++ b/clients/src/main/resources/common/message/ConsumerGroupDescribeRequest.json @@ -18,7 +18,9 @@ "type": "request", "listeners": ["broker"], "name": "ConsumerGroupDescribeRequest", - "validVersions": "0", + // Version 1 adds MemberType field to ConsumerGroupDescribeResponse (KIP-1099). + // For ConsumerGroupDescribeRequest, version 1 is same as version 0. + "validVersions": "0-1", "flexibleVersions": "0+", "fields": [ { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", diff --git a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json index 3c6ed4e78de..14d80e20ce2 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json @@ -17,7 +17,8 @@ "apiKey": 69, "type": "response", "name": "ConsumerGroupDescribeResponse", - "validVersions": "0", + // Version 1 adds MemberType field (KIP-1099). + "validVersions": "0-1", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) @@ -69,7 +70,9 @@ { "name": "Assignment", "type": "Assignment", "versions": "0+", "about": "The current assignment." }, { "name": "TargetAssignment", "type": "Assignment", "versions": "0+", - "about": "The target assignment." } + "about": "The target assignment." }, + { "name": "MemberType", "type": "int8", "versions": "1+", "default": "-1", "ignorable": true, + "about": "-1 for unknown. 0 for classic member. +1 for consumer member." } ]}, { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this group." } diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala index ad98d2d875e..47e287a7166 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala @@ -20,7 +20,9 @@ import org.apache.kafka.common.test.api.ClusterInstance import org.apache.kafka.common.test.api._ import org.apache.kafka.common.test.api.ClusterTestExtensions import kafka.utils.TestUtils -import org.apache.kafka.common.{ConsumerGroupState, Uuid} +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol +import org.apache.kafka.common.{ConsumerGroupState, TopicPartition, Uuid} import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, DescribedGroup, TopicPartitions} import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -30,10 +32,11 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.common.Features -import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse} import org.junit.jupiter.api.extension.ExtendWith import java.lang.{Byte => JByte} +import java.util.Collections import scala.jdk.CollectionConverters._ @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @@ -153,6 +156,7 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo .setClientHost(clientHost) .setSubscribedTopicRegex("") .setSubscribedTopicNames(List("bar").asJava) + .setMemberType(if (version == 0) -1.toByte else 1.toByte) ).asJava), new DescribedGroup() .setGroupId("grp-2") @@ -176,7 +180,8 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo .setTopicId(topicId) .setTopicName("foo") .setPartitions(List[Integer](2).asJava) - ).asJava)), + ).asJava)) + .setMemberType(if (version == 0) -1.toByte else 1.toByte), new ConsumerGroupDescribeResponseData.Member() .setMemberId(grp2Member1Response.memberId) .setMemberEpoch(grp2Member1Response.memberEpoch) @@ -197,7 +202,8 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo .setTopicId(topicId) .setTopicName("foo") .setPartitions(List[Integer](0, 1).asJava) - ).asJava)), + ).asJava)) + .setMemberType(if (version == 0) -1.toByte else 1.toByte), ).asJava), ) @@ -213,4 +219,108 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo admin.close() } } + + @ClusterTest( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + ) + ) + def testConsumerGroupDescribeWithMigrationMember(): Unit = { + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + val topicName = "foo" + createTopic( + topic = topicName, + numPartitions = 3 + ) + + val groupId = "grp" + + // Classic member 1 joins the classic group. + val memberId1 = joinDynamicConsumerGroupWithOldProtocol( + groupId = groupId, + metadata = ConsumerProtocol.serializeSubscription( + new ConsumerPartitionAssignor.Subscription( + Collections.singletonList(topicName), + null, + List().asJava + ) + ).array, + assignment = ConsumerProtocol.serializeAssignment( + new ConsumerPartitionAssignor.Assignment( + List(0, 1, 2).map(p => new TopicPartition(topicName, p)).asJava + ) + ).array + )._1 + + // The joining request with a consumer group member 2 is accepted. + val memberId2 = consumerGroupHeartbeat( + groupId = groupId, + memberId = "member-2", + rebalanceTimeoutMs = 5 * 60 * 1000, + subscribedTopicNames = List(topicName), + topicPartitions = List.empty, + expectedError = Errors.NONE + ).memberId + + for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) { + val actual = consumerGroupDescribe( + groupIds = List(groupId), + includeAuthorizedOperations = true, + version = version.toShort, + ) + assertEquals(1, actual.size) + val group = actual.head + val member1 = group.members.asScala.find(_.memberId == memberId1) + assertFalse(member1.isEmpty) + // Version 0 doesn't have memberType field, so memberType field on member 1 is -1 (unknown). + // After version 1, there is memberType field and it should be +1 (classic) for member 1. + assertEquals(if (version == 0) -1.toByte else 0.toByte, member1.get.memberType) + + val member2 = group.members.asScala.find(_.memberId == memberId2) + assertFalse(member2.isEmpty) + assertEquals(if (version == 0) -1.toByte else 1.toByte, member2.get.memberType) + } + + // Classic member 1 leaves group. + leaveGroup( + groupId = groupId, + memberId = memberId1, + useNewProtocol = false, + version = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled) + ) + + // Member 1 joins as consumer group member. + consumerGroupHeartbeat( + groupId = groupId, + memberId = memberId1, + rebalanceTimeoutMs = 5 * 60 * 1000, + subscribedTopicNames = List(topicName), + topicPartitions = List.empty, + expectedError = Errors.NONE + ) + + // There is no classic member in the group. + for (version <- ApiKeys.CONSUMER_GROUP_DESCRIBE.oldestVersion() to ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) { + val actual = consumerGroupDescribe( + groupIds = List(groupId), + includeAuthorizedOperations = true, + version = version.toShort, + ) + assertEquals(1, actual.size) + val group = actual.head + val member1 = group.members.asScala.find(_.memberId == memberId1) + assertFalse(member1.isEmpty) + assertEquals(if (version == 0) -1.toByte else 1.toByte, member1.get.memberType) + + val member2 = group.members.asScala.find(_.memberId == memberId2) + assertFalse(member2.isEmpty) + assertEquals(if (version == 0) -1.toByte else 1.toByte, member2.get.memberType) + } + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java index 2f04931bda4..69fa5cdd50c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java @@ -408,7 +408,8 @@ public class ConsumerGroupMember extends ModernGroupMember { .setInstanceId(instanceId) .setRackId(rackId) .setSubscribedTopicNames(subscribedTopicNames == null ? null : new ArrayList<>(subscribedTopicNames)) - .setSubscribedTopicRegex(subscribedTopicRegex); + .setSubscribedTopicRegex(subscribedTopicRegex) + .setMemberType(useClassicProtocol() ? (byte) 0 : (byte) 1); } private static List topicPartitionsFromMap( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java index 96122e79de3..658e6c23260 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java @@ -26,6 +26,8 @@ import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.image.MetadataImage; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Arrays; @@ -246,8 +248,9 @@ public class ConsumerGroupMemberTest { assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), member.partitionsPendingRevocation()); } - @Test - public void testAsConsumerGroupDescribeMember() { + @ParameterizedTest(name = "{displayName}.withClassicMemberMetadata={0}") + @ValueSource(booleans = {true, false}) + public void testAsConsumerGroupDescribeMember(boolean withClassicMemberMetadata) { Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); Uuid topicId3 = Uuid.randomUuid(); @@ -287,6 +290,8 @@ public class ConsumerGroupMemberTest { .setClientHost(clientHost) .setSubscribedTopicNames(subscribedTopicNames) .setSubscribedTopicRegex(subscribedTopicRegex) + .setClassicMemberMetadata(withClassicMemberMetadata ? new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata() + .setSupportedProtocols(toClassicProtocolCollection("range")) : null) .build(); ConsumerGroupDescribeResponseData.Member actual = member.asConsumerGroupDescribeMember(targetAssignment, metadataImage.topics()); @@ -315,7 +320,8 @@ public class ConsumerGroupMemberTest { .setTopicName("topic4") .setPartitions(new ArrayList<>(item.getValue())) ).collect(Collectors.toList())) - ); + ) + .setMemberType(withClassicMemberMetadata ? (byte) 0 : (byte) 1); assertEquals(expected, actual); } @@ -344,7 +350,8 @@ public class ConsumerGroupMemberTest { ConsumerGroupDescribeResponseData.Member expected = new ConsumerGroupDescribeResponseData.Member() .setMemberId(memberId.toString()) - .setSubscribedTopicRegex(""); + .setSubscribedTopicRegex("") + .setMemberType((byte) 1); ConsumerGroupDescribeResponseData.Member actual = member.asConsumerGroupDescribeMember(null, new MetadataImageBuilder() .addTopic(Uuid.randomUuid(), "foo", 3) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index fc589df97a7..9c829630f09 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -1283,9 +1283,11 @@ public class ConsumerGroupTest { new ConsumerGroupDescribeResponseData.Member() .setMemberId("member1") .setSubscribedTopicNames(Collections.singletonList("foo")) - .setSubscribedTopicRegex(""), + .setSubscribedTopicRegex("") + .setMemberType((byte) 1), new ConsumerGroupDescribeResponseData.Member().setMemberId("member2") .setSubscribedTopicRegex("") + .setMemberType((byte) 1) )); ConsumerGroupDescribeResponseData.DescribedGroup actual = group.asDescribedGroup(1, "", new MetadataImageBuilder().build().topics());