From e73719d9624efa041498776a6bc0158cb82ac3d9 Mon Sep 17 00:00:00 2001 From: Lan Ding <53332773+DL1231@users.noreply.github.com> Date: Thu, 20 Mar 2025 03:42:05 +0800 Subject: [PATCH] KAFKA-18819 StreamsGroupHeartbeat API and StreamsGroupDescribe API check topic describe (#19183) This patch filters out the topic describe unauthorized topics from the StreamsGroupHeartbeat and StreamsGroupDescribe response. Reviewers: Lucas Brutschy --- .../DescribeStreamsGroupsHandler.java | 1 + .../StreamsGroupDescribeResponse.java | 1 + .../message/StreamsGroupDescribeResponse.json | 1 + .../StreamsGroupHeartbeatResponse.json | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 55 +++++- .../unit/kafka/server/KafkaApisTest.scala | 181 +++++++++++++++++- 6 files changed, 232 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java index 8bf793bd31c..8355a78b9d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java @@ -238,6 +238,7 @@ public class DescribeStreamsGroupsHandler extends AdminApiHandler.Batched groupsToUnmap) { switch (error) { case GROUP_AUTHORIZATION_FAILED: + case TOPIC_AUTHORIZATION_FAILED: log.debug("`DescribeStreamsGroups` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception(errorMsg)); break; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java index 83db6700a4a..0439b955325 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupDescribeResponse.java @@ -35,6 +35,7 @@ import java.util.Map; * - {@link Errors#INVALID_REQUEST} * - {@link Errors#INVALID_GROUP_ID} * - {@link Errors#GROUP_ID_NOT_FOUND} + * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} */ public class StreamsGroupDescribeResponse extends AbstractResponse { diff --git a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json index 9cf2954c17f..5dff3d7bf44 100644 --- a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json @@ -27,6 +27,7 @@ // - INVALID_REQUEST (version 0+) // - INVALID_GROUP_ID (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) + // - TOPIC_AUTHORIZATION_FAILED (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json index 43b5268e205..a5f3a99f9de 100644 --- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json @@ -30,7 +30,7 @@ // - FENCED_MEMBER_EPOCH (version 0+) // - UNRELEASED_INSTANCE_ID (version 0+) // - GROUP_MAX_SIZE_REACHED (version 0+) - // - TOPIC_AUTHORIZATION_FAILED (version 0+) + // - TOPIC_AUTHORIZATION_FAILED (version 0+) // - CLUSTER_AUTHORIZATION_FAILED (version 0+) // - STREAMS_INVALID_TOPOLOGY (version 0+) // - STREAMS_INVALID_TOPOLOGY_EPOCH (version 0+) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index adf5c5a6e53..e2d8e17f950 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2707,11 +2707,20 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(errorResponse)) return CompletableFuture.completedFuture[Unit](()) } + + if (requiredTopics.nonEmpty) { + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, requiredTopics)(identity) + if (authorizedTopics.size < requiredTopics.size) { + val responseData = new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(responseData)) + return CompletableFuture.completedFuture[Unit](()) + } + } } groupCoordinator.streamsGroupHeartbeat( request.context, - streamsGroupHeartbeatRequest.data, + streamsGroupHeartbeatRequest.data ).handle[Unit] { (response, exception) => if (exception != null) { requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(exception)) @@ -2795,6 +2804,50 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Clients are not allowed to see topics that are not authorized for Describe. + if (authorizer.isDefined) { + val topicsToCheck = response.groups.stream() + .filter(group => group.topology != null) + .flatMap(group => group.topology.subtopologies.stream) + .flatMap(subtopology => java.util.stream.Stream.concat( + java.util.stream.Stream.concat( + java.util.stream.Stream.concat( + subtopology.sourceTopics.stream, + subtopology.repartitionSinkTopics.stream), + subtopology.repartitionSourceTopics.stream.map(_.name)), + subtopology.stateChangelogTopics.stream.map(_.name))) + .collect(Collectors.toSet[String]) + .asScala + + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + topicsToCheck)(identity) + + val updatedGroups = response.groups.stream.map { group => + val hasUnauthorizedTopic = if (group.topology == null) false else + group.topology.subtopologies.stream() + .flatMap(subtopology => java.util.stream.Stream.concat( + java.util.stream.Stream.concat( + java.util.stream.Stream.concat( + subtopology.sourceTopics.stream, + subtopology.repartitionSinkTopics.stream), + subtopology.repartitionSourceTopics.stream.map(_.name)), + subtopology.stateChangelogTopics.stream.map(_.name))) + .anyMatch(topic => !authorizedTopics.contains(topic)) + + if (hasUnauthorizedTopic) { + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(group.groupId) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setErrorMessage("The described group uses topics that the client is not authorized to describe.") + .setMembers(List.empty.asJava) + .setTopology(null) + } else { + group + } + }.collect(Collectors.toList[StreamsGroupDescribeResponseData.DescribedGroup]) + response.setGroups(updatedGroups) + } + requestHelper.sendMaybeThrottle(request, new StreamsGroupDescribeResponse(response)) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 70a64f47160..9c382d709c4 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -9883,7 +9883,7 @@ class KafkaApisTest extends Logging { } @Test - def testStreamsGroupHeartbeatRequestAuthorizationFailed(): Unit = { + def testStreamsGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = { metadataCache = mock(classOf[KRaftMetadataCache]) val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") @@ -9903,6 +9903,58 @@ class KafkaApisTest extends Logging { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + val groupId = "group" + val fooTopicName = "foo" + val barTopicName = "bar" + val zarTopicName = "zar" + val tarTopicName = "tar" + + val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology( + new StreamsGroupHeartbeatRequestData.Topology() + .setEpoch(3) + .setSubtopologies( + util.List.of(new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology") + .setSourceTopics(Collections.singletonList(fooTopicName)) + .setRepartitionSinkTopics(Collections.singletonList(barTopicName)) + .setRepartitionSourceTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(zarTopicName))) + .setStateChangelogTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(tarTopicName))) + ) + ) + ) + + val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + val acls = Map( + groupId -> AuthorizationResult.ALLOWED, + fooTopicName -> AuthorizationResult.ALLOWED, + barTopicName -> AuthorizationResult.DENIED, + zarTopicName -> AuthorizationResult.ALLOWED, + tarTopicName -> AuthorizationResult.ALLOWED + ) + when(authorizer.authorize( + any[RequestContext], + any[util.List[Action]] + )).thenAnswer { invocation => + val actions = invocation.getArgument(1, classOf[util.List[Action]]) + actions.asScala.map { action => + acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED) + }.asJava + } + + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.errorCode) + } + @Test def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = { metadataCache = mock(classOf[KRaftMetadataCache]) @@ -10230,6 +10282,8 @@ class KafkaApisTest extends Logging { @ValueSource(booleans = Array(true, false)) def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = { metadataCache = mock(classOf[KRaftMetadataCache]) + val fooTopicName = "foo" + val barTopicName = "bar" val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData() @@ -10247,10 +10301,32 @@ class KafkaApisTest extends Logging { ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId("subtopology0") + .setSourceTopics(Collections.singletonList(fooTopicName)) + + val subtopology1 = new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId("subtopology1") + .setRepartitionSinkTopics(Collections.singletonList(barTopicName)) + + val subtopology2 = new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId("subtopology2") + .setSourceTopics(Collections.singletonList(fooTopicName)) + .setRepartitionSinkTopics(Collections.singletonList(barTopicName)) + future.complete(List( - new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)), - new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)), - new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2)) + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupIds.get(0)) + .setTopology(new StreamsGroupDescribeResponseData.Topology() + .setSubtopologies(Collections.singletonList(subtopology0))), + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupIds.get(1)) + .setTopology(new StreamsGroupDescribeResponseData.Topology() + .setSubtopologies(Collections.singletonList(subtopology1))), + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupIds.get(2)) + .setTopology(new StreamsGroupDescribeResponseData.Topology() + .setSubtopologies(Collections.singletonList(subtopology2))) ).asJava) var authorizedOperationsInt = Int.MinValue; @@ -10262,9 +10338,18 @@ class KafkaApisTest extends Logging { // Can't reuse the above list here because we would not test the implementation in KafkaApis then val describedGroups = List( - new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)), - new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)), - new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2)) + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupIds.get(0)) + .setTopology(new StreamsGroupDescribeResponseData.Topology() + .setSubtopologies(Collections.singletonList(subtopology0))), + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupIds.get(1)) + .setTopology(new StreamsGroupDescribeResponseData.Topology() + .setSubtopologies(Collections.singletonList(subtopology1))), + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupIds.get(2)) + .setTopology(new StreamsGroupDescribeResponseData.Topology() + .setSubtopologies(Collections.singletonList(subtopology2))) ).map(group => group.setAuthorizedOperations(authorizedOperationsInt)) val expectedStreamsGroupDescribeResponseData = new StreamsGroupDescribeResponseData() .setGroups(describedGroups.asJava) @@ -10353,6 +10438,88 @@ class KafkaApisTest extends Logging { assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode) } + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testStreamsGroupDescribeFilterUnauthorizedTopics(includeAuthorizedOperations: Boolean): Unit = { + val fooTopicName = "foo" + val barTopicName = "bar" + val errorMessage = "The described group uses topics that the client is not authorized to describe." + + metadataCache = mock(classOf[KRaftMetadataCache]) + + val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava + val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData() + .setIncludeAuthorizedOperations(includeAuthorizedOperations) + streamsGroupDescribeRequestData.groupIds.addAll(groupIds) + val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + val acls = Map( + groupIds.get(0) -> AuthorizationResult.ALLOWED, + groupIds.get(1) -> AuthorizationResult.ALLOWED, + groupIds.get(2) -> AuthorizationResult.ALLOWED, + fooTopicName -> AuthorizationResult.ALLOWED, + barTopicName -> AuthorizationResult.DENIED, + ) + when(authorizer.authorize( + any[RequestContext], + any[util.List[Action]] + )).thenAnswer { invocation => + val actions = invocation.getArgument(1, classOf[util.List[Action]]) + actions.asScala.map { action => + acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED) + }.asJava + } + + val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.streamsGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams") + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId("subtopology0") + .setSourceTopics(Collections.singletonList(fooTopicName)) + + val subtopology1 = new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId("subtopology1") + .setRepartitionSinkTopics(Collections.singletonList(barTopicName)) + + val subtopology2 = new StreamsGroupDescribeResponseData.Subtopology() + .setSubtopologyId("subtopology2") + .setSourceTopics(Collections.singletonList(fooTopicName)) + .setRepartitionSinkTopics(Collections.singletonList(barTopicName)) + + future.complete(List( + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupIds.get(0)) + .setTopology(new StreamsGroupDescribeResponseData.Topology() + .setSubtopologies(Collections.singletonList(subtopology0))), + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupIds.get(1)) + .setTopology(new StreamsGroupDescribeResponseData.Topology() + .setSubtopologies(Collections.singletonList(subtopology1))), + new StreamsGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupIds.get(2)) + .setTopology(new StreamsGroupDescribeResponseData.Topology() + .setSubtopologies(Collections.singletonList(subtopology2))) + ).asJava) + + val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest) + assertNotNull(response.data) + assertEquals(3, response.data.groups.size) + assertEquals(Errors.NONE.code(), response.data.groups.get(0).errorCode()) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), response.data.groups.get(1).errorCode()) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), response.data.groups.get(2).errorCode()) + assertEquals(errorMessage, response.data.groups.get(1).errorMessage()) + assertEquals(errorMessage, response.data.groups.get(2).errorMessage()) + } + @Test def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = { val fooTopicName = "foo"