KAFKA-14509; [4/4] Handle includeAuthorizedOperations in ConsumerGroupDescribe API (#16158)

This patch implements the handling of `includeAuthorizedOperations` flag in the ConsumerGroupDescribe API.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Max Riedel 2024-06-10 14:07:45 +02:00 committed by David Jacot
parent b0333f2ad5
commit db3bf4ae3d
4 changed files with 50 additions and 10 deletions

View File

@ -3831,6 +3831,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = {
val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest] val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest]
val includeAuthorizedOperations = consumerGroupDescribeRequest.data.includeAuthorizedOperations
if (!isConsumerGroupProtocolEnabled()) { if (!isConsumerGroupProtocolEnabled()) {
// The API is not supported by the "old" group coordinator (the default). If the // The API is not supported by the "old" group coordinator (the default). If the
@ -3859,6 +3860,17 @@ class KafkaApis(val requestChannel: RequestChannel,
if (exception != null) { if (exception != null) {
requestHelper.sendMaybeThrottle(request, consumerGroupDescribeRequest.getErrorResponse(exception)) requestHelper.sendMaybeThrottle(request, consumerGroupDescribeRequest.getErrorResponse(exception))
} else { } else {
if (includeAuthorizedOperations) {
results.forEach { groupResult =>
if (groupResult.errorCode == Errors.NONE.code) {
groupResult.setAuthorizedOperations(authHelper.authorizedOperations(
request,
new Resource(ResourceType.GROUP, groupResult.groupId)
))
}
}
}
if (response.groups.isEmpty) { if (response.groups.isEmpty) {
// If the response is empty, we can directly reuse the results. // If the response is empty, we can directly reuse the results.
response.setGroups(results) response.setGroups(results)

View File

@ -16,9 +16,8 @@
*/ */
package kafka.server package kafka.server
import kafka.server.GroupCoordinatorBaseRequestTest
import kafka.test.ClusterInstance import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterFeature, ClusterTest, ClusterTestDefaults, Type} import kafka.test.annotation._
import kafka.test.junit.ClusterTestExtensions import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.ConsumerGroupState import org.apache.kafka.common.ConsumerGroupState
@ -26,11 +25,15 @@ import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assign
import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData} import org.apache.kafka.common.message.{ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse} import org.apache.kafka.common.requests.{ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Features import org.apache.kafka.server.common.Features
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{Tag, Timeout} import org.junit.jupiter.api.{Tag, Timeout}
import java.lang.{Byte => JByte}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@Timeout(120) @Timeout(120)
@ -116,6 +119,9 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
val timeoutMs = 5 * 60 * 1000 val timeoutMs = 5 * 60 * 1000
val clientId = "client-id" val clientId = "client-id"
val clientHost = "/127.0.0.1" val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
// Add first group with one member. // Add first group with one member.
var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null var grp1Member1Response: ConsumerGroupHeartbeatResponseData = null
@ -162,6 +168,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
.setGroupEpoch(1) .setGroupEpoch(1)
.setAssignmentEpoch(1) .setAssignmentEpoch(1)
.setAssignorName("uniform") .setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List( .setMembers(List(
new ConsumerGroupDescribeResponseData.Member() new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp1Member1Response.memberId) .setMemberId(grp1Member1Response.memberId)
@ -177,6 +184,7 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
.setGroupEpoch(grp2Member2Response.memberEpoch) .setGroupEpoch(grp2Member2Response.memberEpoch)
.setAssignmentEpoch(grp2Member2Response.memberEpoch) .setAssignmentEpoch(grp2Member2Response.memberEpoch)
.setAssignorName("range") .setAssignorName("range")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List( .setMembers(List(
new ConsumerGroupDescribeResponseData.Member() new ConsumerGroupDescribeResponseData.Member()
.setMemberId(grp2Member2Response.memberId) .setMemberId(grp2Member2Response.memberId)
@ -219,7 +227,8 @@ class ConsumerGroupDescribeRequestsTest(cluster: ClusterInstance) extends GroupC
val actual = consumerGroupDescribe( val actual = consumerGroupDescribe(
groupIds = List("grp-1", "grp-2"), groupIds = List("grp-1", "grp-2"),
version = version.toShort includeAuthorizedOperations = true,
version = version.toShort,
) )
assertEquals(expected, actual) assertEquals(expected, actual)

View File

@ -421,10 +421,13 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
protected def consumerGroupDescribe( protected def consumerGroupDescribe(
groupIds: List[String], groupIds: List[String],
includeAuthorizedOperations: Boolean,
version: Short = ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled) version: Short = ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
): List[ConsumerGroupDescribeResponseData.DescribedGroup] = { ): List[ConsumerGroupDescribeResponseData.DescribedGroup] = {
val consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder( val consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder(
new ConsumerGroupDescribeRequestData().setGroupIds(groupIds.asJava) new ConsumerGroupDescribeRequestData()
.setGroupIds(groupIds.asJava)
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
).build(version) ).build(version)
val consumerGroupDescribeResponse = connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest) val consumerGroupDescribeResponse = connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest)

View File

@ -76,6 +76,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig} import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1} import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
@ -92,6 +93,7 @@ import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._ import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import java.lang.{Byte => JByte}
import java.net.InetAddress import java.net.InetAddress
import java.nio.charset.StandardCharsets import java.nio.charset.StandardCharsets
import java.time.Duration import java.time.Duration
@ -7115,8 +7117,9 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
} }
@Test @ParameterizedTest
def testConsumerGroupDescribe(): Unit = { @ValueSource(booleans = Array(true, false))
def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache]) metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.features()).thenReturn { when(metadataCache.features()).thenReturn {
new FinalizedFeatures( new FinalizedFeatures(
@ -7129,6 +7132,7 @@ class KafkaApisTest extends Logging {
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
consumerGroupDescribeRequestData.groupIds.addAll(groupIds) consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())
@ -7143,15 +7147,27 @@ class KafkaApisTest extends Logging {
) )
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching) kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
future.complete(List(
new DescribedGroup().setGroupId(groupIds.get(0)),
new DescribedGroup().setGroupId(groupIds.get(1)),
new DescribedGroup().setGroupId(groupIds.get(2))
).asJava)
var authorizedOperationsInt = Int.MinValue;
if (includeAuthorizedOperations) {
authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
}
// Can't reuse the above list here because we would not test the implementation in KafkaApis then
val describedGroups = List( val describedGroups = List(
new DescribedGroup().setGroupId(groupIds.get(0)), new DescribedGroup().setGroupId(groupIds.get(0)),
new DescribedGroup().setGroupId(groupIds.get(1)), new DescribedGroup().setGroupId(groupIds.get(1)),
new DescribedGroup().setGroupId(groupIds.get(2)) new DescribedGroup().setGroupId(groupIds.get(2))
).asJava ).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
future.complete(describedGroups)
val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData()
.setGroups(describedGroups) .setGroups(describedGroups.asJava)
val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)