From 36f19057e1d57a8548a4548c304799fd176c359f Mon Sep 17 00:00:00 2001 From: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com> Date: Wed, 26 Feb 2025 13:05:36 -0500 Subject: [PATCH] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe (#18989) This patch filters out the topic describe unauthorized topics from the ConsumerGroupHeartbeat and ConsumerGroupDescribe response. In ConsumerGroupHeartbeat, - if the request has `subscribedTopicNames` set, we directly check the authz in `KafkaApis` and return a topic auth failure in the response if any of the topics is denied. - Otherwise, we check the authz only if a regex refresh is triggered and we do it based on the acl of the consumer that triggered the refresh. If any of the topic is denied, we filter it out from the resolved subscription. In ConsumerGroupDescribe, we check the authz of the coordinator response. If any of the topic in the group is denied, we remove the described info and add a topic auth failure to the described group. (similar to the group auth failure) Reviewers: David Jacot , Lianet Magrans , Rajini Sivaram , Chia-Ping Tsai , TaiJuWu , TengYao Chi --- .../import-control-coordinator-common.xml | 1 + .../import-control-group-coordinator.xml | 3 + .../import-control-share-coordinator.xml | 1 + .../ConsumerGroupDescribeResponse.json | 1 + .../ConsumerGroupHeartbeatResponse.json | 1 + .../scala/kafka/server/BrokerServer.scala | 1 + .../main/scala/kafka/server/KafkaApis.scala | 48 +++- .../kafka/api/AuthorizerIntegrationTest.scala | 91 ++++++- .../kafka/api/EndToEndAuthorizationTest.scala | 15 +- .../unit/kafka/server/KafkaApisTest.scala | 200 ++++++++++++++- docs/ops.html | 2 + .../group/GroupCoordinatorService.java | 11 +- .../group/GroupCoordinatorShard.java | 13 + .../group/GroupMetadataManager.java | 114 +++++++-- .../group/GroupMetadataManagerTest.java | 229 ++++++++++++++++++ .../GroupMetadataManagerTestContext.java | 9 + .../coordinator/RegexResolutionBenchmark.java | 3 + 17 files changed, 700 insertions(+), 43 deletions(-) diff --git a/checkstyle/import-control-coordinator-common.xml b/checkstyle/import-control-coordinator-common.xml index c08955fd422..bafffe80697 100644 --- a/checkstyle/import-control-coordinator-common.xml +++ b/checkstyle/import-control-coordinator-common.xml @@ -58,6 +58,7 @@ + diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml index c8a0f49d593..8b6a8d99f5e 100644 --- a/checkstyle/import-control-group-coordinator.xml +++ b/checkstyle/import-control-group-coordinator.xml @@ -40,6 +40,8 @@ + + @@ -63,6 +65,7 @@ + diff --git a/checkstyle/import-control-share-coordinator.xml b/checkstyle/import-control-share-coordinator.xml index aaea93d32e6..f430e283225 100644 --- a/checkstyle/import-control-share-coordinator.xml +++ b/checkstyle/import-control-share-coordinator.xml @@ -55,6 +55,7 @@ + diff --git a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json index 14d80e20ce2..95588551922 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json +++ b/clients/src/main/resources/common/message/ConsumerGroupDescribeResponse.json @@ -28,6 +28,7 @@ // - INVALID_REQUEST (version 0+) // - INVALID_GROUP_ID (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) + // - TOPIC_AUTHORIZATION_FAILED (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, diff --git a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json index 956cfab8262..010fc2cfe93 100644 --- a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json @@ -30,6 +30,7 @@ // - UNSUPPORTED_ASSIGNOR (version 0+) // - UNRELEASED_INSTANCE_ID (version 0+) // - GROUP_MAX_SIZE_REACHED (version 0+) + // - TOPIC_AUTHORIZATION_FAILED (version 0+) // - INVALID_REGULAR_EXPRESSION (version 1+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 5f5befb31dc..7ff50d175b5 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -645,6 +645,7 @@ class BrokerServer( .withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics)) .withGroupConfigManager(groupConfigManager) .withPersister(persister) + .withAuthorizer(authorizer.toJava) .build() } else { GroupCoordinatorAdapter( diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f3421a21bc6..1e3b35ff03c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -72,6 +72,7 @@ import java.time.Duration import java.util import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{CompletableFuture, ConcurrentHashMap} +import java.util.stream.Collectors import java.util.{Collections, Optional} import scala.annotation.nowarn import scala.collection.mutable.ArrayBuffer @@ -2531,9 +2532,24 @@ class KafkaApis(val requestChannel: RequestChannel, requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) } else { + if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null && + !consumerGroupHeartbeatRequest.data.subscribedTopicNames.isEmpty) { + // Check the authorization if the subscribed topic names are provided. + // Clients are not allowed to see topics that are not authorized for Describe. + val subscribedTopicSet = consumerGroupHeartbeatRequest.data.subscribedTopicNames.asScala.toSet + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + subscribedTopicSet)(identity) + if (authorizedTopics.size < subscribedTopicSet.size) { + val responseData = new ConsumerGroupHeartbeatResponseData() + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + requestHelper.sendMaybeThrottle(request, new ConsumerGroupHeartbeatResponse(responseData)) + return CompletableFuture.completedFuture[Unit](()) + } + } + groupCoordinator.consumerGroupHeartbeat( request.context, - consumerGroupHeartbeatRequest.data, + consumerGroupHeartbeatRequest.data ).handle[Unit] { (response, exception) => if (exception != null) { requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(exception)) @@ -2594,6 +2610,36 @@ class KafkaApis(val requestChannel: RequestChannel, response.groups.addAll(results) } + // Clients are not allowed to see topics that are not authorized for Describe. + if (!authorizer.isEmpty) { + val topicsToCheck = response.groups.stream() + .flatMap(group => group.members.stream) + .flatMap(member => util.stream.Stream.of(member.assignment, member.targetAssignment)) + .flatMap(assignment => assignment.topicPartitions.stream) + .map(topicPartition => topicPartition.topicName) + .collect(Collectors.toSet[String]) + .asScala + val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, + topicsToCheck)(identity) + val updatedGroups = response.groups.stream().map { group => + val hasUnauthorizedTopic = group.members.stream() + .flatMap(member => util.stream.Stream.of(member.assignment, member.targetAssignment)) + .flatMap(assignment => assignment.topicPartitions.stream()) + .anyMatch(tp => !authorizedTopics.contains(tp.topicName)) + + if (hasUnauthorizedTopic) { + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(group.groupId) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setErrorMessage("The group has described topic(s) that the client is not authorized to describe.") + .setMembers(List.empty.asJava) + } else { + group + } + }.collect(Collectors.toList[ConsumerGroupDescribeResponseData.DescribedGroup]) + response.setGroups(updatedGroups) + } + requestHelper.sendMaybeThrottle(request, new ConsumerGroupDescribeResponse(response)) } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 3aec9be1f49..d0ee3315b22 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -601,7 +601,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { private def consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( new ConsumerGroupHeartbeatRequestData() .setGroupId(group) - .setMemberEpoch(0)).build() + .setMemberEpoch(0) + .setSubscribedTopicNames(List(topic).asJava)).build() private def consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder( new ConsumerGroupDescribeRequestData() @@ -2492,11 +2493,12 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { @ParameterizedTest @ValueSource(strings = Array("kraft")) - def testConsumerGroupHeartbeatWithReadAcl(quorum: String): Unit = { + def testConsumerGroupHeartbeatWithGroupReadAndTopicDescribeAcl(quorum: String): Unit = { addAndVerifyAcls(groupReadAcl(groupResource), groupResource) + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) val request = consumerGroupHeartbeatRequest - val resource = Set[ResourceType](GROUP) + val resource = Set[ResourceType](GROUP, TOPIC) sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) } @@ -2505,50 +2507,115 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { def testConsumerGroupHeartbeatWithOperationAll(quorum: String): Unit = { val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) addAndVerifyAcls(Set(allowAllOpsAcl), groupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) val request = consumerGroupHeartbeatRequest - val resource = Set[ResourceType](GROUP) + val resource = Set[ResourceType](GROUP, TOPIC) sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) } @ParameterizedTest @ValueSource(strings = Array("kraft")) - def testConsumerGroupHeartbeatWithoutReadAcl(quorum: String): Unit = { + def testConsumerGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(quorum: String): Unit = { removeAllClientAcls() val request = consumerGroupHeartbeatRequest - val resource = Set[ResourceType](GROUP) + val resource = Set[ResourceType](GROUP, TOPIC) sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) } @ParameterizedTest @ValueSource(strings = Array("kraft")) - def testConsumerGroupDescribeWithDescribeAcl(quorum: String): Unit = { + def testConsumerGroupHeartbeatWithoutGroupReadAcl(quorum: String): Unit = { + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) + + val request = consumerGroupHeartbeatRequest + + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testConsumerGroupHeartbeatWithoutTopicDescribeAcl(quorum: String): Unit = { + addAndVerifyAcls(groupReadAcl(groupResource), groupResource) + + val request = consumerGroupHeartbeatRequest + + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + private def createConsumerGroupToDescribe(): Unit = { + createTopicWithBrokerPrincipal(topic) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), groupResource) + addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) + consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer") + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, group) + val consumer = createConsumer() + consumer.subscribe(Collections.singleton(topic)) + consumer.poll(Duration.ofMillis(500L)) + removeAllClientAcls() + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testConsumerGroupDescribeWithGroupDescribeAndTopicDescribeAcl(quorum: String): Unit = { + createConsumerGroupToDescribe() + addAndVerifyAcls(groupDescribeAcl(groupResource), groupResource) + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) val request = consumerGroupDescribeRequest - val resource = Set[ResourceType](GROUP) + val resource = Set[ResourceType](GROUP, TOPIC) sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) } @ParameterizedTest @ValueSource(strings = Array("kraft")) def testConsumerGroupDescribeWithOperationAll(quorum: String): Unit = { + createConsumerGroupToDescribe() + val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW) addAndVerifyAcls(Set(allowAllOpsAcl), groupResource) + addAndVerifyAcls(Set(allowAllOpsAcl), topicResource) val request = consumerGroupDescribeRequest - val resource = Set[ResourceType](GROUP) + val resource = Set[ResourceType](GROUP, TOPIC) sendRequestAndVerifyResponseError(request, resource, isAuthorized = true) } @ParameterizedTest @ValueSource(strings = Array("kraft")) - def testConsumerGroupDescribeWithoutDescribeAcl(quorum: String): Unit = { - removeAllClientAcls() + def testConsumerGroupDescribeWithoutGroupDescribeAcl(quorum: String): Unit = { + createConsumerGroupToDescribe() + + addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource) val request = consumerGroupDescribeRequest - val resource = Set[ResourceType](GROUP) + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testConsumerGroupDescribeWithoutTopicDescribeAcl(quorum: String): Unit = { + createConsumerGroupToDescribe() + + addAndVerifyAcls(groupDescribeAcl(groupResource), groupResource) + + val request = consumerGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) + sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) + } + + @ParameterizedTest + @ValueSource(strings = Array("kraft")) + def testConsumerGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(quorum: String): Unit = { + createConsumerGroupToDescribe() + + val request = consumerGroupDescribeRequest + val resource = Set[ResourceType](GROUP, TOPIC) sendRequestAndVerifyResponseError(request, resource, isAuthorized = false) } diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index fe9336a04f4..aa6d208ef8b 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -24,7 +24,7 @@ import java.util.concurrent.ExecutionException import org.apache.kafka.metadata.authorizer.StandardAuthorizer import kafka.utils._ import org.apache.kafka.clients.admin.Admin -import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords, GroupProtocol} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.acl._ import org.apache.kafka.common.acl.AclOperation._ @@ -430,7 +430,18 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas // Verify that records are consumed if all topics are authorized consumer.subscribe(List(topic).asJava) - consumeRecordsIgnoreOneAuthorizationException(consumer) + if (groupProtocol.equals(GroupProtocol.CLASSIC)) { + consumeRecordsIgnoreOneAuthorizationException(consumer) + } else { + TestUtils.waitUntilTrue(() => { + try { + consumeRecords(consumer, numRecords, 0, topic) + true + } catch { + case _: TopicAuthorizationException => false + } + }, "Consumer didn't manage to consume the records within timeout.") + } } private def noConsumeWithoutDescribeAclSetup(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index c31a75ad8da..85cb53db00f 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -39,7 +39,7 @@ import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartiti import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource, AlterConfigsResourceCollection => LAlterConfigsResourceCollection, AlterableConfig => LAlterableConfig, AlterableConfigCollection => LAlterableConfigCollection} import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse} import org.apache.kafka.common.message.ApiMessageType.ListenerType -import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{DescribedGroup, TopicPartitions} import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.{DescribeShareGroupOffsetsRequestGroup, DescribeShareGroupOffsetsRequestTopic} @@ -9852,7 +9852,7 @@ class KafkaApisTest extends Logging { } @Test - def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = { + def testConsumerGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = { metadataCache = mock(classOf[KRaftMetadataCache]) val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group") @@ -9872,6 +9872,46 @@ class KafkaApisTest extends Logging { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = { + metadataCache = mock(classOf[KRaftMetadataCache]) + val groupId = "group" + val fooTopicName = "foo" + val barTopicName = "bar" + val zarTopicName = "zar" + + val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setSubscribedTopicNames(List(fooTopicName, barTopicName, zarTopicName).asJava) + + val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + val acls = Map( + groupId -> AuthorizationResult.ALLOWED, + fooTopicName -> AuthorizationResult.ALLOWED, + barTopicName -> AuthorizationResult.DENIED, + ) + when(authorizer.authorize( + any[RequestContext], + any[util.List[Action]] + )).thenAnswer { invocation => + val actions = invocation.getArgument(1, classOf[util.List[Action]]) + actions.asScala.map { action => + acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED) + }.asJava + } + + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + featureVersions = Seq(GroupVersion.GV_1) + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.errorCode) + } + @Test def testStreamsGroupHeartbeatReturnsUnsupportedVersion(): Unit = { val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group") @@ -10043,6 +10083,8 @@ class KafkaApisTest extends Logging { @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = { + val fooTopicName = "foo" + val barTopicName = "bar" metadataCache = mock(classOf[KRaftMetadataCache]) val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava @@ -10061,10 +10103,44 @@ class KafkaApisTest extends Logging { ) kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + val member0 = new ConsumerGroupDescribeResponseData.Member() + .setMemberId("member0") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(fooTopicName)).asJava)) + .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(fooTopicName)).asJava)) + + val member1 = new ConsumerGroupDescribeResponseData.Member() + .setMemberId("member1") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(fooTopicName)).asJava)) + .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(fooTopicName), + new TopicPartitions().setTopicName(barTopicName)).asJava)) + + val member2 = new ConsumerGroupDescribeResponseData.Member() + .setMemberId("member2") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(barTopicName)).asJava)) + .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(fooTopicName)).asJava)) + future.complete(List( - new DescribedGroup().setGroupId(groupIds.get(0)), - new DescribedGroup().setGroupId(groupIds.get(1)), - new DescribedGroup().setGroupId(groupIds.get(2)) + new DescribedGroup() + .setGroupId(groupIds.get(0)) + .setMembers(List(member0).asJava), + new DescribedGroup() + .setGroupId(groupIds.get(1)) + .setMembers(List(member0, member1).asJava), + new DescribedGroup() + .setGroupId(groupIds.get(2)) + .setMembers(List(member2).asJava) ).asJava) var authorizedOperationsInt = Int.MinValue @@ -10076,9 +10152,15 @@ class KafkaApisTest extends Logging { // Can't reuse the above list here because we would not test the implementation in KafkaApis then val describedGroups = List( - new DescribedGroup().setGroupId(groupIds.get(0)), - new DescribedGroup().setGroupId(groupIds.get(1)), - new DescribedGroup().setGroupId(groupIds.get(2)) + new DescribedGroup() + .setGroupId(groupIds.get(0)) + .setMembers(List(member0).asJava), + new DescribedGroup() + .setGroupId(groupIds.get(1)) + .setMembers(List(member0, member1).asJava), + new DescribedGroup() + .setGroupId(groupIds.get(2)) + .setMembers(List(member2).asJava) ).map(group => group.setAuthorizedOperations(authorizedOperationsInt)) val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() .setGroups(describedGroups.asJava) @@ -10294,6 +10376,108 @@ class KafkaApisTest extends Logging { assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode) } + @Test + def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = { + val fooTopicName = "foo" + val barTopicName = "bar" + val errorMessage = "The group has described topic(s) that the client is not authorized to describe." + + metadataCache = mock(classOf[KRaftMetadataCache]) + + val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + .setGroupIds(groupIds) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + val acls = Map( + groupIds.get(0) -> AuthorizationResult.ALLOWED, + groupIds.get(1) -> AuthorizationResult.ALLOWED, + groupIds.get(2) -> AuthorizationResult.ALLOWED, + fooTopicName -> AuthorizationResult.ALLOWED, + barTopicName -> AuthorizationResult.DENIED, + ) + when(authorizer.authorize( + any[RequestContext], + any[util.List[Action]] + )).thenAnswer { invocation => + val actions = invocation.getArgument(1, classOf[util.List[Action]]) + actions.asScala.map { action => + acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED) + }.asJava + } + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + kafkaApis = createKafkaApis( + authorizer = Some(authorizer), + featureVersions = Seq(GroupVersion.GV_1) + ) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val member0 = new ConsumerGroupDescribeResponseData.Member() + .setMemberId("member0") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(fooTopicName)).asJava)) + .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(fooTopicName)).asJava)) + + val member1 = new ConsumerGroupDescribeResponseData.Member() + .setMemberId("member1") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(fooTopicName)).asJava)) + .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(fooTopicName), + new TopicPartitions().setTopicName(barTopicName)).asJava)) + + val member2 = new ConsumerGroupDescribeResponseData.Member() + .setMemberId("member2") + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(barTopicName)).asJava)) + .setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions(List( + new TopicPartitions().setTopicName(fooTopicName)).asJava)) + + future.complete(List( + new DescribedGroup() + .setGroupId(groupIds.get(0)) + .setMembers(List(member0).asJava), + new DescribedGroup() + .setGroupId(groupIds.get(1)) + .setMembers(List(member0, member1).asJava), + new DescribedGroup() + .setGroupId(groupIds.get(2)) + .setMembers(List(member2).asJava) + ).asJava) + + val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() + .setGroups(List( + new DescribedGroup() + .setGroupId(groupIds.get(0)) + .setMembers(List(member0).asJava), + new DescribedGroup() + .setGroupId(groupIds.get(1)) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setErrorMessage(errorMessage), + new DescribedGroup() + .setGroupId(groupIds.get(2)) + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + .setErrorMessage(errorMessage) + ).asJava) + + val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) + + assertEquals(expectedConsumerGroupDescribeResponseData, response.data) + } + @Test def testGetTelemetrySubscriptions(): Unit = { val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder( diff --git a/docs/ops.html b/docs/ops.html index 3177b49d487..be116d9f292 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -127,6 +127,8 @@ topic1 0 854144 855809 1665 consu topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 + Note that if the consumer group uses the consumer protocol, the admin client needs DESCRIBE access to all the topics used in the group (topics the members are subscribed to). In contrast, the classic protocol does not require all topics DESCRIBE authorization. + There are a number of additional "describe" options that can be used to provide more detailed information about a consumer group:
  • --members: This option provides the list of all active members in the consumer group. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 0e7004ba40c..21b9cddfea6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -84,6 +84,7 @@ import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters; import org.apache.kafka.server.share.persister.DeleteShareGroupStateResult; @@ -135,6 +136,7 @@ public class GroupCoordinatorService implements GroupCoordinator { private GroupCoordinatorMetrics groupCoordinatorMetrics; private GroupConfigManager groupConfigManager; private Persister persister; + private Optional authorizer; public Builder( int nodeId, @@ -184,6 +186,11 @@ public class GroupCoordinatorService implements GroupCoordinator { return this; } + public Builder withAuthorizer(Optional authorizer) { + this.authorizer = authorizer; + return this; + } + public GroupCoordinatorService build() { requireNonNull(config, new IllegalArgumentException("Config must be set.")); requireNonNull(writer, new IllegalArgumentException("Writer must be set.")); @@ -194,12 +201,14 @@ public class GroupCoordinatorService implements GroupCoordinator { requireNonNull(groupCoordinatorMetrics, new IllegalArgumentException("GroupCoordinatorMetrics must be set.")); requireNonNull(groupConfigManager, new IllegalArgumentException("GroupConfigManager must be set.")); requireNonNull(persister, new IllegalArgumentException("Persister must be set.")); + requireNonNull(authorizer, new IllegalArgumentException("Authorizer must be set.")); String logPrefix = String.format("GroupCoordinator id=%d", nodeId); LogContext logContext = new LogContext(String.format("[%s] ", logPrefix)); CoordinatorShardBuilderSupplier supplier = () -> - new GroupCoordinatorShard.Builder(config, groupConfigManager); + new GroupCoordinatorShard.Builder(config, groupConfigManager) + .withAuthorizer(authorizer); CoordinatorEventProcessor processor = new MultiThreadedEventProcessor( logContext, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 1cea887a000..bbe2f1431ee 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -115,6 +115,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroup; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters; import org.apache.kafka.timeline.SnapshotRegistry; @@ -126,6 +127,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -153,6 +155,7 @@ public class GroupCoordinatorShard implements CoordinatorShard executor; private CoordinatorMetrics coordinatorMetrics; private TopicPartition topicPartition; + private Optional authorizer; public Builder( GroupCoordinatorConfig config, @@ -216,6 +219,13 @@ public class GroupCoordinatorShard implements CoordinatorShard withAuthorizer( + Optional authorizer + ) { + this.authorizer = authorizer; + return this; + } + @SuppressWarnings("NPathComplexity") @Override public GroupCoordinatorShard build() { @@ -236,6 +246,8 @@ public class GroupCoordinatorShard implements CoordinatorShard authorizer = null; Builder withLogContext(LogContext logContext) { this.logContext = logContext; @@ -312,11 +321,17 @@ public class GroupMetadataManager { return this; } + Builder withAuthorizer(Optional authorizer) { + this.authorizer = authorizer; + return this; + } + GroupMetadataManager build() { if (logContext == null) logContext = new LogContext(); if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); if (metadataImage == null) metadataImage = MetadataImage.EMPTY; if (time == null) time = Time.SYSTEM; + if (authorizer == null) authorizer = Optional.empty(); if (timer == null) throw new IllegalArgumentException("Timer must be set."); @@ -341,7 +356,8 @@ public class GroupMetadataManager { metadataImage, config, groupConfigManager, - shareGroupAssignor + shareGroupAssignor, + authorizer ); } } @@ -447,6 +463,11 @@ public class GroupMetadataManager { */ private final ShareGroupPartitionAssignor shareGroupAssignor; + /** + * The authorizer to validate the regex subscription topics. + */ + private final Optional authorizer; + private GroupMetadataManager( SnapshotRegistry snapshotRegistry, LogContext logContext, @@ -457,7 +478,8 @@ public class GroupMetadataManager { MetadataImage metadataImage, GroupCoordinatorConfig config, GroupConfigManager groupConfigManager, - ShareGroupPartitionAssignor shareGroupAssignor + ShareGroupPartitionAssignor shareGroupAssignor, + Optional authorizer ) { this.logContext = logContext; this.log = logContext.logger(GroupMetadataManager.class); @@ -478,6 +500,7 @@ public class GroupMetadataManager { this.groupConfigManager = groupConfigManager; this.shareGroupAssignor = shareGroupAssignor; this.streamsGroupSessionTimeoutMs = 45000; + this.authorizer = authorizer; } /** @@ -1791,14 +1814,13 @@ public class GroupMetadataManager { * is larger than the current target assignment epoch. * 3) The member's assignment is reconciled with the target assignment. * + * @param context The request context. * @param groupId The group id from the request. * @param memberId The member id from the request. * @param memberEpoch The member epoch from the request. * @param instanceId The instance id from the request or null. * @param rackId The rack id from the request or null. * @param rebalanceTimeoutMs The rebalance timeout from the request or -1. - * @param clientId The client id. - * @param clientHost The client host. * @param subscribedTopicNames The list of subscribed topic names from the request * or null. * @param subscribedTopicRegex The regular expression based subscription from the request @@ -1810,14 +1832,13 @@ public class GroupMetadataManager { * a list of records to update the state machine. */ private CoordinatorResult consumerGroupHeartbeat( + RequestContext context, String groupId, String memberId, int memberEpoch, String instanceId, String rackId, int rebalanceTimeoutMs, - String clientId, - String clientHost, List subscribedTopicNames, String subscribedTopicRegex, String assignorName, @@ -1868,8 +1889,8 @@ public class GroupMetadataManager { .maybeUpdateServerAssignorName(Optional.ofNullable(assignorName)) .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames)) .maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex)) - .setClientId(clientId) - .setClientHost(clientHost) + .setClientId(context.clientId()) + .setClientHost(context.clientAddress.toString()) .setClassicMemberMetadata(null) .build(); @@ -1885,6 +1906,7 @@ public class GroupMetadataManager { ); bumpGroupEpoch |= maybeUpdateRegularExpressions( + context, group, member, updatedMember, @@ -2472,6 +2494,7 @@ public class GroupMetadataManager { * group. We align the refreshment of the regular expression in order to have * them trigger only one rebalance per update. * + * @param context The request context. * @param group The consumer group. * @param member The old member. * @param updatedMember The new member. @@ -2479,6 +2502,7 @@ public class GroupMetadataManager { * @return Whether a rebalance must be triggered. */ private boolean maybeUpdateRegularExpressions( + RequestContext context, ConsumerGroup group, ConsumerGroupMember member, ConsumerGroupMember updatedMember, @@ -2564,8 +2588,8 @@ public class GroupMetadataManager { Set regexes = Collections.unmodifiableSet(subscribedRegularExpressions.keySet()); executor.schedule( key, - () -> refreshRegularExpressions(groupId, log, time, metadataImage, regexes), - (result, exception) -> handleRegularExpressionsResult(groupId, result, exception) + () -> refreshRegularExpressions(context, groupId, log, time, metadataImage, authorizer, regexes), + (result, exception) -> handleRegularExpressionsResult(groupId, memberId, result, exception) ); } @@ -2577,20 +2601,24 @@ public class GroupMetadataManager { * as an asynchronous task in the executor. Hence, it should not access any state from * the manager. * - * @param groupId The group id. - * @param log The log instance. - * @param time The time instance. - * @param image The metadata image to use for listing the topics. - * @param regexes The list of regular expressions that must be resolved. + * @param context The request context. + * @param groupId The group id. + * @param log The log instance. + * @param time The time instance. + * @param image The metadata image to use for listing the topics. + * @param authorizer The authorizer. + * @param regexes The list of regular expressions that must be resolved. * @return The list of resolved regular expressions. * * public for benchmarks. */ public static Map refreshRegularExpressions( + RequestContext context, String groupId, Logger log, Time time, MetadataImage image, + Optional authorizer, Set regexes ) { long startTimeMs = time.milliseconds(); @@ -2619,6 +2647,12 @@ public class GroupMetadataManager { } } + filterTopicDescribeAuthorizedTopics( + context, + authorizer, + resolvedRegexes + ); + long version = image.provenance().lastContainedOffset(); Map result = new HashMap<>(resolvedRegexes.size()); for (Map.Entry> resolvedRegex : resolvedRegexes.entrySet()) { @@ -2635,15 +2669,58 @@ public class GroupMetadataManager { return result; } + /** + * This method filters the topics in the resolved regexes + * that the member is authorized to describe. + * + * @param context The request context. + * @param authorizer The authorizer. + * @param resolvedRegexes The map of the regex pattern and its set of matched topics. + */ + private static void filterTopicDescribeAuthorizedTopics( + RequestContext context, + Optional authorizer, + Map> resolvedRegexes + ) { + if (authorizer.isEmpty()) return; + + Map topicNameCount = new HashMap<>(); + resolvedRegexes.values().forEach(topicNames -> + topicNames.forEach(topicName -> + topicNameCount.compute(topicName, Utils::incValue) + ) + ); + + List actions = topicNameCount.entrySet().stream().map(entry -> { + ResourcePattern resource = new ResourcePattern(TOPIC, entry.getKey(), LITERAL); + return new Action(DESCRIBE, resource, entry.getValue(), true, false); + }).collect(Collectors.toList()); + + List authorizationResults = authorizer.get().authorize(context, actions); + Set deniedTopics = new HashSet<>(); + IntStream.range(0, actions.size()).forEach(i -> { + if (authorizationResults.get(i) == AuthorizationResult.DENIED) { + String deniedTopic = actions.get(i).resourcePattern().name(); + deniedTopics.add(deniedTopic); + } + }); + + resolvedRegexes.forEach((__, topicNames) -> topicNames.removeAll(deniedTopics)); + } + + /** * Handle the result of the asynchronous tasks which resolves the regular expressions. * + * @param groupId The group id. + * @param memberId The member id. * @param resolvedRegularExpressions The resolved regular expressions. * @param exception The exception if the resolution failed. * @return A CoordinatorResult containing the records to mutate the group state. */ private CoordinatorResult handleRegularExpressionsResult( String groupId, + String memberId, Map resolvedRegularExpressions, Throwable exception ) { @@ -2654,8 +2731,8 @@ public class GroupMetadataManager { } if (log.isDebugEnabled()) { - log.debug("[GroupId {}] Received updated regular expressions: {}.", - groupId, resolvedRegularExpressions); + log.debug("[GroupId {}] Received updated regular expressions based on the context of member {}: {}.", + groupId, memberId, resolvedRegularExpressions); } List records = new ArrayList<>(); @@ -3803,14 +3880,13 @@ public class GroupMetadataManager { } else { // Otherwise, it is a regular heartbeat. return consumerGroupHeartbeat( + context, request.groupId(), request.memberId(), request.memberEpoch(), request.instanceId(), request.rackId(), request.rebalanceTimeoutMs(), - context.clientId(), - context.clientAddress.toString(), request.subscribedTopicNames(), request.subscribedTopicRegex(), request.serverAssignor(), diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 7d69b6e221c..8ec5ec2eb4e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -107,6 +107,9 @@ import org.apache.kafka.coordinator.group.streams.TasksTuple; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.server.authorizer.Action; +import org.apache.kafka.server.authorizer.AuthorizationResult; +import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters; import org.apache.kafka.server.share.persister.GroupTopicPartitionData; import org.apache.kafka.server.share.persister.PartitionFactory; @@ -16712,6 +16715,232 @@ public class GroupMetadataManagerTest { ); } + @Test + public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + String barTopicName = "bar"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + Authorizer authorizer = mock(Authorizer.class); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(12345L)) + .withAuthorizer(authorizer) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withMember(new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5))) + .build()) + .withSubscriptionMetadata(Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6))) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignment(memberId2, mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5))) + .withResolvedRegularExpression("foo*", new ResolvedRegularExpression( + Set.of(fooTopicName), 0L, 0L)) + .withAssignmentEpoch(10)) + .build(); + + // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS + context.time.sleep(10001L); + + Map acls = new HashMap<>(); + acls.put(fooTopicName, AuthorizationResult.ALLOWED); + acls.put(barTopicName, AuthorizationResult.DENIED); + when(authorizer.authorize(any(), any())).thenAnswer(invocation -> { + List actions = invocation.getArgument(1, List.class); + return actions.stream() + .map(action -> acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED)) + .collect(Collectors.toList()); + }); + + // Member 2 heartbeats with a different regular expression. + CoordinatorResult result1 = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignor("range") + .setTopicPartitions(Collections.emptyList()), + ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion() + ); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(Collections.singletonList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(3, 4, 5))))), + result1.response() + ); + + ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*|bar*") + .setServerAssignorName("range") + .build(); + + assertRecordsEquals( + List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*") + ), + result1.records() + ); + + // Execute pending tasks. + assertEquals( + List.of( + new MockCoordinatorExecutor.ExecutorResult<>( + groupId + "-regex", + new CoordinatorResult<>(List.of( + // The resolution of the new regex is persisted. + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord( + groupId, + "foo*|bar*", + new ResolvedRegularExpression( + Set.of("foo"), + 12345L, + context.time.milliseconds() + ) + ), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) + )) + ) + ), + context.processTasks() + ); + + // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS + context.time.sleep(10001L); + + // Access to the bar topic is granted. + acls.put(barTopicName, AuthorizationResult.ALLOWED); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of( + memberId1, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2) + )), + memberId2, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5) + )) + ))); + + // Member 2 heartbeats again with a new regex. + CoordinatorResult result2 = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo|bar*") + .setServerAssignor("range") + .setTopicPartitions(Collections.emptyList()), + ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion() + ); + + expectedMember2 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo|bar*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3, 4, 5))) + .build(); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId2) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(3, 4, 5))))), + result2.response() + ); + + assertRecordsEquals( + List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*"), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) + ), + result2.records() + ); + + // A regex refresh is triggered and the bar topic is included. + assertRecordsEquals( + List.of( + // The resolution of the new regex is persisted. + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord( + groupId, + "foo|bar*", + new ResolvedRegularExpression( + Set.of("foo", "bar"), + 12345L, + context.time.milliseconds() + ) + ), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord( + groupId, + Map.of( + fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), + barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) + ) + ), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12) + ), + context.processTasks().get(0).result.records() + ); + } + @Test public void testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() { String groupId = "fooup"; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 1b12af3f7b9..4f245296d47 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -112,6 +112,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder; import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; @@ -123,6 +124,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -468,6 +470,7 @@ public class GroupMetadataManagerTestContext { private ShareGroupPartitionAssignor shareGroupAssignor = new MockPartitionAssignor("share"); private final List shareGroupBuilders = new ArrayList<>(); private final Map config = new HashMap<>(); + private Optional authorizer = Optional.empty(); public Builder withConfig(String key, Object value) { config.put(key, value); @@ -499,6 +502,11 @@ public class GroupMetadataManagerTestContext { return this; } + public Builder withAuthorizer(Authorizer authorizer) { + this.authorizer = Optional.of(authorizer); + return this; + } + public GroupMetadataManagerTestContext build() { if (metadataImage == null) metadataImage = MetadataImage.EMPTY; if (groupConfigManager == null) groupConfigManager = createConfigManager(); @@ -528,6 +536,7 @@ public class GroupMetadataManagerTestContext { .withGroupCoordinatorMetricsShard(metrics) .withShareGroupAssignor(shareGroupAssignor) .withGroupConfigManager(groupConfigManager) + .withAuthorizer(authorizer) .build(), groupConfigManager ); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java index 08db52e4e60..35ee42836c8 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/RegexResolutionBenchmark.java @@ -42,6 +42,7 @@ import org.slf4j.Logger; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -118,10 +119,12 @@ public class RegexResolutionBenchmark { @OutputTimeUnit(TimeUnit.MILLISECONDS) public void run() { GroupMetadataManager.refreshRegularExpressions( + null, GROUP_ID, LOG, TIME, image, + Optional.empty(), regexes ); }