KAFKA-18819 StreamsGroupHeartbeat API and StreamsGroupDescribe API check topic describe (#19183)

This patch filters out the topic describe unauthorized topics from the
StreamsGroupHeartbeat and StreamsGroupDescribe response.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Lan Ding 2025-03-20 03:42:05 +08:00 committed by GitHub
parent fcca4056fd
commit e73719d962
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 232 additions and 9 deletions

View File

@ -238,6 +238,7 @@ public class DescribeStreamsGroupsHandler extends AdminApiHandler.Batched<Coordi
Set<CoordinatorKey> groupsToUnmap) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
case TOPIC_AUTHORIZATION_FAILED:
log.debug("`DescribeStreamsGroups` request for group id {} failed due to error {}", groupId.idValue, error);
failed.put(groupId, error.exception(errorMsg));
break;

View File

@ -35,6 +35,7 @@ import java.util.Map;
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#INVALID_GROUP_ID}
* - {@link Errors#GROUP_ID_NOT_FOUND}
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
*/
public class StreamsGroupDescribeResponse extends AbstractResponse {

View File

@ -27,6 +27,7 @@
// - INVALID_REQUEST (version 0+)
// - INVALID_GROUP_ID (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

View File

@ -2707,11 +2707,20 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(errorResponse))
return CompletableFuture.completedFuture[Unit](())
}
if (requiredTopics.nonEmpty) {
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, requiredTopics)(identity)
if (authorizedTopics.size < requiredTopics.size) {
val responseData = new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(responseData))
return CompletableFuture.completedFuture[Unit](())
}
}
}
groupCoordinator.streamsGroupHeartbeat(
request.context,
streamsGroupHeartbeatRequest.data,
streamsGroupHeartbeatRequest.data
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(exception))
@ -2795,6 +2804,50 @@ class KafkaApis(val requestChannel: RequestChannel,
response.groups.addAll(results)
}
// Clients are not allowed to see topics that are not authorized for Describe.
if (authorizer.isDefined) {
val topicsToCheck = response.groups.stream()
.filter(group => group.topology != null)
.flatMap(group => group.topology.subtopologies.stream)
.flatMap(subtopology => java.util.stream.Stream.concat(
java.util.stream.Stream.concat(
java.util.stream.Stream.concat(
subtopology.sourceTopics.stream,
subtopology.repartitionSinkTopics.stream),
subtopology.repartitionSourceTopics.stream.map(_.name)),
subtopology.stateChangelogTopics.stream.map(_.name)))
.collect(Collectors.toSet[String])
.asScala
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
topicsToCheck)(identity)
val updatedGroups = response.groups.stream.map { group =>
val hasUnauthorizedTopic = if (group.topology == null) false else
group.topology.subtopologies.stream()
.flatMap(subtopology => java.util.stream.Stream.concat(
java.util.stream.Stream.concat(
java.util.stream.Stream.concat(
subtopology.sourceTopics.stream,
subtopology.repartitionSinkTopics.stream),
subtopology.repartitionSourceTopics.stream.map(_.name)),
subtopology.stateChangelogTopics.stream.map(_.name)))
.anyMatch(topic => !authorizedTopics.contains(topic))
if (hasUnauthorizedTopic) {
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(group.groupId)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage("The described group uses topics that the client is not authorized to describe.")
.setMembers(List.empty.asJava)
.setTopology(null)
} else {
group
}
}.collect(Collectors.toList[StreamsGroupDescribeResponseData.DescribedGroup])
response.setGroups(updatedGroups)
}
requestHelper.sendMaybeThrottle(request, new StreamsGroupDescribeResponse(response))
}
}

View File

@ -9883,7 +9883,7 @@ class KafkaApisTest extends Logging {
}
@Test
def testStreamsGroupHeartbeatRequestAuthorizationFailed(): Unit = {
def testStreamsGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
@ -9903,6 +9903,58 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
}
@Test
def testStreamsGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val groupId = "group"
val fooTopicName = "foo"
val barTopicName = "bar"
val zarTopicName = "zar"
val tarTopicName = "tar"
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId(groupId).setTopology(
new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(3)
.setSubtopologies(
util.List.of(new StreamsGroupHeartbeatRequestData.Subtopology().setSubtopologyId("subtopology")
.setSourceTopics(Collections.singletonList(fooTopicName))
.setRepartitionSinkTopics(Collections.singletonList(barTopicName))
.setRepartitionSourceTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(zarTopicName)))
.setStateChangelogTopics(Collections.singletonList(new StreamsGroupHeartbeatRequestData.TopicInfo().setName(tarTopicName)))
)
)
)
val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
groupId -> AuthorizationResult.ALLOWED,
fooTopicName -> AuthorizationResult.ALLOWED,
barTopicName -> AuthorizationResult.DENIED,
zarTopicName -> AuthorizationResult.ALLOWED,
tarTopicName -> AuthorizationResult.ALLOWED
)
when(authorizer.authorize(
any[RequestContext],
any[util.List[Action]]
)).thenAnswer { invocation =>
val actions = invocation.getArgument(1, classOf[util.List[Action]])
actions.asScala.map { action =>
acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
}.asJava
}
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.errorCode)
}
@Test
def testStreamsGroupHeartbeatRequestProtocolDisabled(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
@ -10230,6 +10282,8 @@ class KafkaApisTest extends Logging {
@ValueSource(booleans = Array(true, false))
def testStreamsGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val fooTopicName = "foo"
val barTopicName = "bar"
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
@ -10247,10 +10301,32 @@ class KafkaApisTest extends Logging {
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId("subtopology0")
.setSourceTopics(Collections.singletonList(fooTopicName))
val subtopology1 = new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId("subtopology1")
.setRepartitionSinkTopics(Collections.singletonList(barTopicName))
val subtopology2 = new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId("subtopology2")
.setSourceTopics(Collections.singletonList(fooTopicName))
.setRepartitionSinkTopics(Collections.singletonList(barTopicName))
future.complete(List(
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupIds.get(0))
.setTopology(new StreamsGroupDescribeResponseData.Topology()
.setSubtopologies(Collections.singletonList(subtopology0))),
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupIds.get(1))
.setTopology(new StreamsGroupDescribeResponseData.Topology()
.setSubtopologies(Collections.singletonList(subtopology1))),
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupIds.get(2))
.setTopology(new StreamsGroupDescribeResponseData.Topology()
.setSubtopologies(Collections.singletonList(subtopology2)))
).asJava)
var authorizedOperationsInt = Int.MinValue;
@ -10262,9 +10338,18 @@ class KafkaApisTest extends Logging {
// Can't reuse the above list here because we would not test the implementation in KafkaApis then
val describedGroups = List(
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(0)),
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(1)),
new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupIds.get(2))
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupIds.get(0))
.setTopology(new StreamsGroupDescribeResponseData.Topology()
.setSubtopologies(Collections.singletonList(subtopology0))),
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupIds.get(1))
.setTopology(new StreamsGroupDescribeResponseData.Topology()
.setSubtopologies(Collections.singletonList(subtopology1))),
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupIds.get(2))
.setTopology(new StreamsGroupDescribeResponseData.Topology()
.setSubtopologies(Collections.singletonList(subtopology2)))
).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
val expectedStreamsGroupDescribeResponseData = new StreamsGroupDescribeResponseData()
.setGroups(describedGroups.asJava)
@ -10353,6 +10438,88 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testStreamsGroupDescribeFilterUnauthorizedTopics(includeAuthorizedOperations: Boolean): Unit = {
val fooTopicName = "foo"
val barTopicName = "bar"
val errorMessage = "The described group uses topics that the client is not authorized to describe."
metadataCache = mock(classOf[KRaftMetadataCache])
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
val streamsGroupDescribeRequestData = new StreamsGroupDescribeRequestData()
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
streamsGroupDescribeRequestData.groupIds.addAll(groupIds)
val requestChannelRequest = buildRequest(new StreamsGroupDescribeRequest.Builder(streamsGroupDescribeRequestData, true).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
groupIds.get(0) -> AuthorizationResult.ALLOWED,
groupIds.get(1) -> AuthorizationResult.ALLOWED,
groupIds.get(2) -> AuthorizationResult.ALLOWED,
fooTopicName -> AuthorizationResult.ALLOWED,
barTopicName -> AuthorizationResult.DENIED,
)
when(authorizer.authorize(
any[RequestContext],
any[util.List[Action]]
)).thenAnswer { invocation =>
val actions = invocation.getArgument(1, classOf[util.List[Action]])
actions.asScala.map { action =>
acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
}.asJava
}
val future = new CompletableFuture[util.List[StreamsGroupDescribeResponseData.DescribedGroup]]()
when(groupCoordinator.streamsGroupDescribe(
any[RequestContext],
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val subtopology0 = new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId("subtopology0")
.setSourceTopics(Collections.singletonList(fooTopicName))
val subtopology1 = new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId("subtopology1")
.setRepartitionSinkTopics(Collections.singletonList(barTopicName))
val subtopology2 = new StreamsGroupDescribeResponseData.Subtopology()
.setSubtopologyId("subtopology2")
.setSourceTopics(Collections.singletonList(fooTopicName))
.setRepartitionSinkTopics(Collections.singletonList(barTopicName))
future.complete(List(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupIds.get(0))
.setTopology(new StreamsGroupDescribeResponseData.Topology()
.setSubtopologies(Collections.singletonList(subtopology0))),
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupIds.get(1))
.setTopology(new StreamsGroupDescribeResponseData.Topology()
.setSubtopologies(Collections.singletonList(subtopology1))),
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupIds.get(2))
.setTopology(new StreamsGroupDescribeResponseData.Topology()
.setSubtopologies(Collections.singletonList(subtopology2)))
).asJava)
val response = verifyNoThrottling[StreamsGroupDescribeResponse](requestChannelRequest)
assertNotNull(response.data)
assertEquals(3, response.data.groups.size)
assertEquals(Errors.NONE.code(), response.data.groups.get(0).errorCode())
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), response.data.groups.get(1).errorCode())
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), response.data.groups.get(2).errorCode())
assertEquals(errorMessage, response.data.groups.get(1).errorMessage())
assertEquals(errorMessage, response.data.groups.get(2).errorMessage())
}
@Test
def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
val fooTopicName = "foo"