KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe (#18989)

This patch filters out the topic describe unauthorized topics from the
ConsumerGroupHeartbeat and ConsumerGroupDescribe response.

In ConsumerGroupHeartbeat, 
- if the request has `subscribedTopicNames` set, we directly check the
authz in `KafkaApis` and return a topic auth failure in the response if
any of the topics is denied.
- Otherwise, we check the authz only if a regex refresh is triggered and
we do it based on the acl of the consumer that triggered the refresh. If
any of the topic is denied, we filter it out from the resolved
subscription.

In ConsumerGroupDescribe, we check the authz of the coordinator
response. If any of the topic in the group is denied, we remove the
described info and add a topic auth failure to the described group.
(similar to the group auth failure)

Reviewers: David Jacot <djacot@confluent.io>, Lianet Magrans
<lmagrans@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>,
Chia-Ping Tsai <chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>,
TengYao Chi <kitingiao@gmail.com>
This commit is contained in:
Dongnuo Lyu 2025-02-26 13:05:36 -05:00 committed by GitHub
parent d17fbe1af6
commit 36f19057e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 700 additions and 43 deletions

View File

@ -58,6 +58,7 @@
<allow pkg="org.apache.kafka.coordinator.common" />
<allow pkg="org.apache.kafka.deferred" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />

View File

@ -40,6 +40,8 @@
<!-- anyone can use public classes -->
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.acl" />
<allow pkg="org.apache.kafka.common.resource" />
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.utils" />
@ -63,6 +65,7 @@
<allow pkg="org.apache.kafka.coordinator.group" />
<allow pkg="org.apache.kafka.deferred" />
<allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.server.authorizer"/>
<allow pkg="org.apache.kafka.server.common"/>
<allow pkg="org.apache.kafka.server.record"/>
<allow pkg="org.apache.kafka.server.share.persister"/>

View File

@ -55,6 +55,7 @@
<allow pkg="org.apache.kafka.coordinator.share.metrics" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.server.authorizer" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.share" />

View File

@ -28,6 +28,7 @@
// - INVALID_REQUEST (version 0+)
// - INVALID_GROUP_ID (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

View File

@ -30,6 +30,7 @@
// - UNSUPPORTED_ASSIGNOR (version 0+)
// - UNRELEASED_INSTANCE_ID (version 0+)
// - GROUP_MAX_SIZE_REACHED (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - INVALID_REGULAR_EXPRESSION (version 1+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

View File

@ -645,6 +645,7 @@ class BrokerServer(
.withGroupCoordinatorMetrics(new GroupCoordinatorMetrics(KafkaYammerMetrics.defaultRegistry, metrics))
.withGroupConfigManager(groupConfigManager)
.withPersister(persister)
.withAuthorizer(authorizer.toJava)
.build()
} else {
GroupCoordinatorAdapter(

View File

@ -72,6 +72,7 @@ import java.time.Duration
import java.util
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.stream.Collectors
import java.util.{Collections, Optional}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
@ -2531,9 +2532,24 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
if (consumerGroupHeartbeatRequest.data.subscribedTopicNames != null &&
!consumerGroupHeartbeatRequest.data.subscribedTopicNames.isEmpty) {
// Check the authorization if the subscribed topic names are provided.
// Clients are not allowed to see topics that are not authorized for Describe.
val subscribedTopicSet = consumerGroupHeartbeatRequest.data.subscribedTopicNames.asScala.toSet
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
subscribedTopicSet)(identity)
if (authorizedTopics.size < subscribedTopicSet.size) {
val responseData = new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
requestHelper.sendMaybeThrottle(request, new ConsumerGroupHeartbeatResponse(responseData))
return CompletableFuture.completedFuture[Unit](())
}
}
groupCoordinator.consumerGroupHeartbeat(
request.context,
consumerGroupHeartbeatRequest.data,
consumerGroupHeartbeatRequest.data
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(exception))
@ -2594,6 +2610,36 @@ class KafkaApis(val requestChannel: RequestChannel,
response.groups.addAll(results)
}
// Clients are not allowed to see topics that are not authorized for Describe.
if (!authorizer.isEmpty) {
val topicsToCheck = response.groups.stream()
.flatMap(group => group.members.stream)
.flatMap(member => util.stream.Stream.of(member.assignment, member.targetAssignment))
.flatMap(assignment => assignment.topicPartitions.stream)
.map(topicPartition => topicPartition.topicName)
.collect(Collectors.toSet[String])
.asScala
val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
topicsToCheck)(identity)
val updatedGroups = response.groups.stream().map { group =>
val hasUnauthorizedTopic = group.members.stream()
.flatMap(member => util.stream.Stream.of(member.assignment, member.targetAssignment))
.flatMap(assignment => assignment.topicPartitions.stream())
.anyMatch(tp => !authorizedTopics.contains(tp.topicName))
if (hasUnauthorizedTopic) {
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(group.groupId)
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage("The group has described topic(s) that the client is not authorized to describe.")
.setMembers(List.empty.asJava)
} else {
group
}
}.collect(Collectors.toList[ConsumerGroupDescribeResponseData.DescribedGroup])
response.setGroups(updatedGroups)
}
requestHelper.sendMaybeThrottle(request, new ConsumerGroupDescribeResponse(response))
}
}

View File

@ -601,7 +601,8 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
private def consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(group)
.setMemberEpoch(0)).build()
.setMemberEpoch(0)
.setSubscribedTopicNames(List(topic).asJava)).build()
private def consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder(
new ConsumerGroupDescribeRequestData()
@ -2492,11 +2493,12 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupHeartbeatWithReadAcl(quorum: String): Unit = {
def testConsumerGroupHeartbeatWithGroupReadAndTopicDescribeAcl(quorum: String): Unit = {
addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = consumerGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP)
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ -2505,50 +2507,115 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
def testConsumerGroupHeartbeatWithOperationAll(quorum: String): Unit = {
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = consumerGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP)
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupHeartbeatWithoutReadAcl(quorum: String): Unit = {
def testConsumerGroupHeartbeatWithoutGroupReadOrTopicDescribeAcl(quorum: String): Unit = {
removeAllClientAcls()
val request = consumerGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP)
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupDescribeWithDescribeAcl(quorum: String): Unit = {
def testConsumerGroupHeartbeatWithoutGroupReadAcl(quorum: String): Unit = {
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = consumerGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupHeartbeatWithoutTopicDescribeAcl(quorum: String): Unit = {
addAndVerifyAcls(groupReadAcl(groupResource), groupResource)
val request = consumerGroupHeartbeatRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
private def createConsumerGroupToDescribe(): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), groupResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer")
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, group)
val consumer = createConsumer()
consumer.subscribe(Collections.singleton(topic))
consumer.poll(Duration.ofMillis(500L))
removeAllClientAcls()
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupDescribeWithGroupDescribeAndTopicDescribeAcl(quorum: String): Unit = {
createConsumerGroupToDescribe()
addAndVerifyAcls(groupDescribeAcl(groupResource), groupResource)
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = consumerGroupDescribeRequest
val resource = Set[ResourceType](GROUP)
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupDescribeWithOperationAll(quorum: String): Unit = {
createConsumerGroupToDescribe()
val allowAllOpsAcl = new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, ALL, ALLOW)
addAndVerifyAcls(Set(allowAllOpsAcl), groupResource)
addAndVerifyAcls(Set(allowAllOpsAcl), topicResource)
val request = consumerGroupDescribeRequest
val resource = Set[ResourceType](GROUP)
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = true)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupDescribeWithoutDescribeAcl(quorum: String): Unit = {
removeAllClientAcls()
def testConsumerGroupDescribeWithoutGroupDescribeAcl(quorum: String): Unit = {
createConsumerGroupToDescribe()
addAndVerifyAcls(topicDescribeAcl(topicResource), topicResource)
val request = consumerGroupDescribeRequest
val resource = Set[ResourceType](GROUP)
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupDescribeWithoutTopicDescribeAcl(quorum: String): Unit = {
createConsumerGroupToDescribe()
addAndVerifyAcls(groupDescribeAcl(groupResource), groupResource)
val request = consumerGroupDescribeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConsumerGroupDescribeWithoutGroupDescribeOrTopicDescribeAcl(quorum: String): Unit = {
createConsumerGroupToDescribe()
val request = consumerGroupDescribeRequest
val resource = Set[ResourceType](GROUP, TOPIC)
sendRequestAndVerifyResponseError(request, resource, isAuthorized = false)
}

View File

@ -24,7 +24,7 @@ import java.util.concurrent.ExecutionException
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import kafka.utils._
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecords, GroupProtocol}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation._
@ -430,7 +430,18 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
// Verify that records are consumed if all topics are authorized
consumer.subscribe(List(topic).asJava)
consumeRecordsIgnoreOneAuthorizationException(consumer)
if (groupProtocol.equals(GroupProtocol.CLASSIC)) {
consumeRecordsIgnoreOneAuthorizationException(consumer)
} else {
TestUtils.waitUntilTrue(() => {
try {
consumeRecords(consumer, numRecords, 0, topic)
true
} catch {
case _: TopicAuthorizationException => false
}
}, "Consumer didn't manage to consume the records within timeout.")
}
}
private def noConsumeWithoutDescribeAclSetup(): Unit = {

View File

@ -39,7 +39,7 @@ import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartiti
import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => LAlterConfigsResource, AlterConfigsResourceCollection => LAlterConfigsResourceCollection, AlterableConfig => LAlterableConfig, AlterableConfigCollection => LAlterableConfigCollection}
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => LAlterConfigsResourceResponse}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{DescribedGroup, TopicPartitions}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData.{DescribeShareGroupOffsetsRequestGroup, DescribeShareGroupOffsetsRequestTopic}
@ -9852,7 +9852,7 @@ class KafkaApisTest extends Logging {
}
@Test
def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = {
def testConsumerGroupHeartbeatRequestGroupAuthorizationFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group")
@ -9872,6 +9872,46 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
}
@Test
def testConsumerGroupHeartbeatRequestTopicAuthorizationFailed(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val groupId = "group"
val fooTopicName = "foo"
val barTopicName = "bar"
val zarTopicName = "zar"
val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setSubscribedTopicNames(List(fooTopicName, barTopicName, zarTopicName).asJava)
val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
groupId -> AuthorizationResult.ALLOWED,
fooTopicName -> AuthorizationResult.ALLOWED,
barTopicName -> AuthorizationResult.DENIED,
)
when(authorizer.authorize(
any[RequestContext],
any[util.List[Action]]
)).thenAnswer { invocation =>
val actions = invocation.getArgument(1, classOf[util.List[Action]])
actions.asScala.map { action =>
acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
}.asJava
}
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
featureVersions = Seq(GroupVersion.GV_1)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, response.data.errorCode)
}
@Test
def testStreamsGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
@ -10043,6 +10083,8 @@ class KafkaApisTest extends Logging {
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {
val fooTopicName = "foo"
val barTopicName = "bar"
metadataCache = mock(classOf[KRaftMetadataCache])
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
@ -10061,10 +10103,44 @@ class KafkaApisTest extends Logging {
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val member0 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member0")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName)).asJava))
.setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName)).asJava))
val member1 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member1")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName)).asJava))
.setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName),
new TopicPartitions().setTopicName(barTopicName)).asJava))
val member2 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member2")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(barTopicName)).asJava))
.setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName)).asJava))
future.complete(List(
new DescribedGroup().setGroupId(groupIds.get(0)),
new DescribedGroup().setGroupId(groupIds.get(1)),
new DescribedGroup().setGroupId(groupIds.get(2))
new DescribedGroup()
.setGroupId(groupIds.get(0))
.setMembers(List(member0).asJava),
new DescribedGroup()
.setGroupId(groupIds.get(1))
.setMembers(List(member0, member1).asJava),
new DescribedGroup()
.setGroupId(groupIds.get(2))
.setMembers(List(member2).asJava)
).asJava)
var authorizedOperationsInt = Int.MinValue
@ -10076,9 +10152,15 @@ class KafkaApisTest extends Logging {
// Can't reuse the above list here because we would not test the implementation in KafkaApis then
val describedGroups = List(
new DescribedGroup().setGroupId(groupIds.get(0)),
new DescribedGroup().setGroupId(groupIds.get(1)),
new DescribedGroup().setGroupId(groupIds.get(2))
new DescribedGroup()
.setGroupId(groupIds.get(0))
.setMembers(List(member0).asJava),
new DescribedGroup()
.setGroupId(groupIds.get(1))
.setMembers(List(member0, member1).asJava),
new DescribedGroup()
.setGroupId(groupIds.get(2))
.setMembers(List(member2).asJava)
).map(group => group.setAuthorizedOperations(authorizedOperationsInt))
val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData()
.setGroups(describedGroups.asJava)
@ -10294,6 +10376,108 @@ class KafkaApisTest extends Logging {
assertEquals(Errors.FENCED_MEMBER_EPOCH.code, response.data.groups.get(0).errorCode)
}
@Test
def testConsumerGroupDescribeFilterUnauthorizedTopics(): Unit = {
val fooTopicName = "foo"
val barTopicName = "bar"
val errorMessage = "The group has described topic(s) that the client is not authorized to describe."
metadataCache = mock(classOf[KRaftMetadataCache])
val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
.setGroupIds(groupIds)
val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
val acls = Map(
groupIds.get(0) -> AuthorizationResult.ALLOWED,
groupIds.get(1) -> AuthorizationResult.ALLOWED,
groupIds.get(2) -> AuthorizationResult.ALLOWED,
fooTopicName -> AuthorizationResult.ALLOWED,
barTopicName -> AuthorizationResult.DENIED,
)
when(authorizer.authorize(
any[RequestContext],
any[util.List[Action]]
)).thenAnswer { invocation =>
val actions = invocation.getArgument(1, classOf[util.List[Action]])
actions.asScala.map { action =>
acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
}.asJava
}
val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
when(groupCoordinator.consumerGroupDescribe(
any[RequestContext],
any[util.List[String]]
)).thenReturn(future)
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
featureVersions = Seq(GroupVersion.GV_1)
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
val member0 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member0")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName)).asJava))
.setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName)).asJava))
val member1 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member1")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName)).asJava))
.setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName),
new TopicPartitions().setTopicName(barTopicName)).asJava))
val member2 = new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member2")
.setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(barTopicName)).asJava))
.setTargetAssignment(new ConsumerGroupDescribeResponseData.Assignment()
.setTopicPartitions(List(
new TopicPartitions().setTopicName(fooTopicName)).asJava))
future.complete(List(
new DescribedGroup()
.setGroupId(groupIds.get(0))
.setMembers(List(member0).asJava),
new DescribedGroup()
.setGroupId(groupIds.get(1))
.setMembers(List(member0, member1).asJava),
new DescribedGroup()
.setGroupId(groupIds.get(2))
.setMembers(List(member2).asJava)
).asJava)
val expectedConsumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData()
.setGroups(List(
new DescribedGroup()
.setGroupId(groupIds.get(0))
.setMembers(List(member0).asJava),
new DescribedGroup()
.setGroupId(groupIds.get(1))
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(errorMessage),
new DescribedGroup()
.setGroupId(groupIds.get(2))
.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage(errorMessage)
).asJava)
val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
assertEquals(expectedConsumerGroupDescribeResponseData, response.data)
}
@Test
def testGetTelemetrySubscriptions(): Unit = {
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(

View File

@ -127,6 +127,8 @@ topic1 0 854144 855809 1665 consu
topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4</code></pre>
Note that if the consumer group uses the consumer protocol, the admin client needs DESCRIBE access to all the topics used in the group (topics the members are subscribed to). In contrast, the classic protocol does not require all topics DESCRIBE authorization.
There are a number of additional "describe" options that can be used to provide more detailed information about a consumer group:
<ul>
<li>--members: This option provides the list of all active members in the consumer group.

View File

@ -84,6 +84,7 @@ import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateResult;
@ -135,6 +136,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
private GroupCoordinatorMetrics groupCoordinatorMetrics;
private GroupConfigManager groupConfigManager;
private Persister persister;
private Optional<Authorizer> authorizer;
public Builder(
int nodeId,
@ -184,6 +186,11 @@ public class GroupCoordinatorService implements GroupCoordinator {
return this;
}
public Builder withAuthorizer(Optional<Authorizer> authorizer) {
this.authorizer = authorizer;
return this;
}
public GroupCoordinatorService build() {
requireNonNull(config, new IllegalArgumentException("Config must be set."));
requireNonNull(writer, new IllegalArgumentException("Writer must be set."));
@ -194,12 +201,14 @@ public class GroupCoordinatorService implements GroupCoordinator {
requireNonNull(groupCoordinatorMetrics, new IllegalArgumentException("GroupCoordinatorMetrics must be set."));
requireNonNull(groupConfigManager, new IllegalArgumentException("GroupConfigManager must be set."));
requireNonNull(persister, new IllegalArgumentException("Persister must be set."));
requireNonNull(authorizer, new IllegalArgumentException("Authorizer must be set."));
String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
CoordinatorShardBuilderSupplier<GroupCoordinatorShard, CoordinatorRecord> supplier = () ->
new GroupCoordinatorShard.Builder(config, groupConfigManager);
new GroupCoordinatorShard.Builder(config, groupConfigManager)
.withAuthorizer(authorizer);
CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
logContext,

View File

@ -115,6 +115,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.timeline.SnapshotRegistry;
@ -126,6 +127,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@ -153,6 +155,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
private CoordinatorExecutor<CoordinatorRecord> executor;
private CoordinatorMetrics coordinatorMetrics;
private TopicPartition topicPartition;
private Optional<Authorizer> authorizer;
public Builder(
GroupCoordinatorConfig config,
@ -216,6 +219,13 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return this;
}
public CoordinatorShardBuilder<GroupCoordinatorShard, CoordinatorRecord> withAuthorizer(
Optional<Authorizer> authorizer
) {
this.authorizer = authorizer;
return this;
}
@SuppressWarnings("NPathComplexity")
@Override
public GroupCoordinatorShard build() {
@ -236,6 +246,8 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
throw new IllegalArgumentException("TopicPartition must be set.");
if (groupConfigManager == null)
throw new IllegalArgumentException("GroupConfigManager must be set.");
if (authorizer == null)
throw new IllegalArgumentException("Authorizer must be set.");
GroupCoordinatorMetricsShard metricsShard = ((GroupCoordinatorMetrics) coordinatorMetrics)
.newMetricsShard(snapshotRegistry, topicPartition);
@ -249,6 +261,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.withConfig(config)
.withGroupConfigManager(groupConfigManager)
.withGroupCoordinatorMetricsShard(metricsShard)
.withAuthorizer(authorizer)
.build();
OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder()

View File

@ -63,6 +63,7 @@ import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
@ -144,6 +145,9 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.PartitionFactory;
@ -174,8 +178,10 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
@ -184,6 +190,8 @@ import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CON
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
import static org.apache.kafka.common.resource.PatternType.LITERAL;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.apache.kafka.coordinator.group.Group.GroupType.CLASSIC;
import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
import static org.apache.kafka.coordinator.group.Group.GroupType.SHARE;
@ -261,6 +269,7 @@ public class GroupMetadataManager {
private MetadataImage metadataImage = null;
private ShareGroupPartitionAssignor shareGroupAssignor = null;
private GroupCoordinatorMetricsShard metrics;
private Optional<Authorizer> authorizer = null;
Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
@ -312,11 +321,17 @@ public class GroupMetadataManager {
return this;
}
Builder withAuthorizer(Optional<Authorizer> authorizer) {
this.authorizer = authorizer;
return this;
}
GroupMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (time == null) time = Time.SYSTEM;
if (authorizer == null) authorizer = Optional.empty();
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
@ -341,7 +356,8 @@ public class GroupMetadataManager {
metadataImage,
config,
groupConfigManager,
shareGroupAssignor
shareGroupAssignor,
authorizer
);
}
}
@ -447,6 +463,11 @@ public class GroupMetadataManager {
*/
private final ShareGroupPartitionAssignor shareGroupAssignor;
/**
* The authorizer to validate the regex subscription topics.
*/
private final Optional<Authorizer> authorizer;
private GroupMetadataManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext,
@ -457,7 +478,8 @@ public class GroupMetadataManager {
MetadataImage metadataImage,
GroupCoordinatorConfig config,
GroupConfigManager groupConfigManager,
ShareGroupPartitionAssignor shareGroupAssignor
ShareGroupPartitionAssignor shareGroupAssignor,
Optional<Authorizer> authorizer
) {
this.logContext = logContext;
this.log = logContext.logger(GroupMetadataManager.class);
@ -478,6 +500,7 @@ public class GroupMetadataManager {
this.groupConfigManager = groupConfigManager;
this.shareGroupAssignor = shareGroupAssignor;
this.streamsGroupSessionTimeoutMs = 45000;
this.authorizer = authorizer;
}
/**
@ -1791,14 +1814,13 @@ public class GroupMetadataManager {
* is larger than the current target assignment epoch.
* 3) The member's assignment is reconciled with the target assignment.
*
* @param context The request context.
* @param groupId The group id from the request.
* @param memberId The member id from the request.
* @param memberEpoch The member epoch from the request.
* @param instanceId The instance id from the request or null.
* @param rackId The rack id from the request or null.
* @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
* @param clientId The client id.
* @param clientHost The client host.
* @param subscribedTopicNames The list of subscribed topic names from the request
* or null.
* @param subscribedTopicRegex The regular expression based subscription from the request
@ -1810,14 +1832,13 @@ public class GroupMetadataManager {
* a list of records to update the state machine.
*/
private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(
RequestContext context,
String groupId,
String memberId,
int memberEpoch,
String instanceId,
String rackId,
int rebalanceTimeoutMs,
String clientId,
String clientHost,
List<String> subscribedTopicNames,
String subscribedTopicRegex,
String assignorName,
@ -1868,8 +1889,8 @@ public class GroupMetadataManager {
.maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
.maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
.setClientId(clientId)
.setClientHost(clientHost)
.setClientId(context.clientId())
.setClientHost(context.clientAddress.toString())
.setClassicMemberMetadata(null)
.build();
@ -1885,6 +1906,7 @@ public class GroupMetadataManager {
);
bumpGroupEpoch |= maybeUpdateRegularExpressions(
context,
group,
member,
updatedMember,
@ -2472,6 +2494,7 @@ public class GroupMetadataManager {
* group. We align the refreshment of the regular expression in order to have
* them trigger only one rebalance per update.
*
* @param context The request context.
* @param group The consumer group.
* @param member The old member.
* @param updatedMember The new member.
@ -2479,6 +2502,7 @@ public class GroupMetadataManager {
* @return Whether a rebalance must be triggered.
*/
private boolean maybeUpdateRegularExpressions(
RequestContext context,
ConsumerGroup group,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
@ -2564,8 +2588,8 @@ public class GroupMetadataManager {
Set<String> regexes = Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
executor.schedule(
key,
() -> refreshRegularExpressions(groupId, log, time, metadataImage, regexes),
(result, exception) -> handleRegularExpressionsResult(groupId, result, exception)
() -> refreshRegularExpressions(context, groupId, log, time, metadataImage, authorizer, regexes),
(result, exception) -> handleRegularExpressionsResult(groupId, memberId, result, exception)
);
}
@ -2577,20 +2601,24 @@ public class GroupMetadataManager {
* as an asynchronous task in the executor. Hence, it should not access any state from
* the manager.
*
* @param groupId The group id.
* @param log The log instance.
* @param time The time instance.
* @param image The metadata image to use for listing the topics.
* @param regexes The list of regular expressions that must be resolved.
* @param context The request context.
* @param groupId The group id.
* @param log The log instance.
* @param time The time instance.
* @param image The metadata image to use for listing the topics.
* @param authorizer The authorizer.
* @param regexes The list of regular expressions that must be resolved.
* @return The list of resolved regular expressions.
*
* public for benchmarks.
*/
public static Map<String, ResolvedRegularExpression> refreshRegularExpressions(
RequestContext context,
String groupId,
Logger log,
Time time,
MetadataImage image,
Optional<Authorizer> authorizer,
Set<String> regexes
) {
long startTimeMs = time.milliseconds();
@ -2619,6 +2647,12 @@ public class GroupMetadataManager {
}
}
filterTopicDescribeAuthorizedTopics(
context,
authorizer,
resolvedRegexes
);
long version = image.provenance().lastContainedOffset();
Map<String, ResolvedRegularExpression> result = new HashMap<>(resolvedRegexes.size());
for (Map.Entry<String, Set<String>> resolvedRegex : resolvedRegexes.entrySet()) {
@ -2635,15 +2669,58 @@ public class GroupMetadataManager {
return result;
}
/**
* This method filters the topics in the resolved regexes
* that the member is authorized to describe.
*
* @param context The request context.
* @param authorizer The authorizer.
* @param resolvedRegexes The map of the regex pattern and its set of matched topics.
*/
private static void filterTopicDescribeAuthorizedTopics(
RequestContext context,
Optional<Authorizer> authorizer,
Map<String, Set<String>> resolvedRegexes
) {
if (authorizer.isEmpty()) return;
Map<String, Integer> topicNameCount = new HashMap<>();
resolvedRegexes.values().forEach(topicNames ->
topicNames.forEach(topicName ->
topicNameCount.compute(topicName, Utils::incValue)
)
);
List<Action> actions = topicNameCount.entrySet().stream().map(entry -> {
ResourcePattern resource = new ResourcePattern(TOPIC, entry.getKey(), LITERAL);
return new Action(DESCRIBE, resource, entry.getValue(), true, false);
}).collect(Collectors.toList());
List<AuthorizationResult> authorizationResults = authorizer.get().authorize(context, actions);
Set<String> deniedTopics = new HashSet<>();
IntStream.range(0, actions.size()).forEach(i -> {
if (authorizationResults.get(i) == AuthorizationResult.DENIED) {
String deniedTopic = actions.get(i).resourcePattern().name();
deniedTopics.add(deniedTopic);
}
});
resolvedRegexes.forEach((__, topicNames) -> topicNames.removeAll(deniedTopics));
}
/**
* Handle the result of the asynchronous tasks which resolves the regular expressions.
*
* @param groupId The group id.
* @param memberId The member id.
* @param resolvedRegularExpressions The resolved regular expressions.
* @param exception The exception if the resolution failed.
* @return A CoordinatorResult containing the records to mutate the group state.
*/
private CoordinatorResult<Void, CoordinatorRecord> handleRegularExpressionsResult(
String groupId,
String memberId,
Map<String, ResolvedRegularExpression> resolvedRegularExpressions,
Throwable exception
) {
@ -2654,8 +2731,8 @@ public class GroupMetadataManager {
}
if (log.isDebugEnabled()) {
log.debug("[GroupId {}] Received updated regular expressions: {}.",
groupId, resolvedRegularExpressions);
log.debug("[GroupId {}] Received updated regular expressions based on the context of member {}: {}.",
groupId, memberId, resolvedRegularExpressions);
}
List<CoordinatorRecord> records = new ArrayList<>();
@ -3803,14 +3880,13 @@ public class GroupMetadataManager {
} else {
// Otherwise, it is a regular heartbeat.
return consumerGroupHeartbeat(
context,
request.groupId(),
request.memberId(),
request.memberEpoch(),
request.instanceId(),
request.rackId(),
request.rebalanceTimeoutMs(),
context.clientId(),
context.clientAddress.toString(),
request.subscribedTopicNames(),
request.subscribedTopicRegex(),
request.serverAssignor(),

View File

@ -107,6 +107,9 @@ import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.PartitionFactory;
@ -16712,6 +16715,232 @@ public class GroupMetadataManagerTest {
);
}
@Test
public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();
String memberId2 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
String barTopicName = "bar";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
Authorizer authorizer = mock(Authorizer.class);
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build(12345L))
.withAuthorizer(authorizer)
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(List.of("foo"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)))
.build())
.withMember(new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.build())
.withSubscriptionMetadata(Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)))
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)))
.withAssignment(memberId2, mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.withResolvedRegularExpression("foo*", new ResolvedRegularExpression(
Set.of(fooTopicName), 0L, 0L))
.withAssignmentEpoch(10))
.build();
// sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
context.time.sleep(10001L);
Map<String, AuthorizationResult> acls = new HashMap<>();
acls.put(fooTopicName, AuthorizationResult.ALLOWED);
acls.put(barTopicName, AuthorizationResult.DENIED);
when(authorizer.authorize(any(), any())).thenAnswer(invocation -> {
List<Action> actions = invocation.getArgument(1, List.class);
return actions.stream()
.map(action -> acls.getOrDefault(action.resourcePattern().name(), AuthorizationResult.DENIED))
.collect(Collectors.toList());
});
// Member 2 heartbeats with a different regular expression.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(10)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignor("range")
.setTopicPartitions(Collections.emptyList()),
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
);
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Collections.singletonList(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(3, 4, 5))))),
result1.response()
);
ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignorName("range")
.build();
assertRecordsEquals(
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")
),
result1.records()
);
// Execute pending tasks.
assertEquals(
List.of(
new MockCoordinatorExecutor.ExecutorResult<>(
groupId + "-regex",
new CoordinatorResult<>(List.of(
// The resolution of the new regex is persisted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
"foo*|bar*",
new ResolvedRegularExpression(
Set.of("foo"),
12345L,
context.time.milliseconds()
)
),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
))
)
),
context.processTasks()
);
// sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS
context.time.sleep(10001L);
// Access to the bar topic is granted.
acls.put(barTopicName, AuthorizationResult.ALLOWED);
assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)
)),
memberId2, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)
))
)));
// Member 2 heartbeats again with a new regex.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(10)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo|bar*")
.setServerAssignor("range")
.setTopicPartitions(Collections.emptyList()),
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
);
expectedMember2 = new ConsumerGroupMember.Builder(memberId2)
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo|bar*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5)))
.build();
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(3, 4, 5))))),
result2.response()
);
assertRecordsEquals(
List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*"),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)
),
result2.records()
);
// A regex refresh is triggered and the bar topic is included.
assertRecordsEquals(
List.of(
// The resolution of the new regex is persisted.
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(
groupId,
"foo|bar*",
new ResolvedRegularExpression(
Set.of("foo", "bar"),
12345L,
context.time.milliseconds()
)
),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(
groupId,
Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)
)
),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12)
),
context.processTasks().get(0).result.records()
);
}
@Test
public void testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
String groupId = "fooup";

View File

@ -112,6 +112,7 @@ import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@ -123,6 +124,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -468,6 +470,7 @@ public class GroupMetadataManagerTestContext {
private ShareGroupPartitionAssignor shareGroupAssignor = new MockPartitionAssignor("share");
private final List<ShareGroupBuilder> shareGroupBuilders = new ArrayList<>();
private final Map<String, Object> config = new HashMap<>();
private Optional<Authorizer> authorizer = Optional.empty();
public Builder withConfig(String key, Object value) {
config.put(key, value);
@ -499,6 +502,11 @@ public class GroupMetadataManagerTestContext {
return this;
}
public Builder withAuthorizer(Authorizer authorizer) {
this.authorizer = Optional.of(authorizer);
return this;
}
public GroupMetadataManagerTestContext build() {
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (groupConfigManager == null) groupConfigManager = createConfigManager();
@ -528,6 +536,7 @@ public class GroupMetadataManagerTestContext {
.withGroupCoordinatorMetricsShard(metrics)
.withShareGroupAssignor(shareGroupAssignor)
.withGroupConfigManager(groupConfigManager)
.withAuthorizer(authorizer)
.build(),
groupConfigManager
);

View File

@ -42,6 +42,7 @@ import org.slf4j.Logger;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -118,10 +119,12 @@ public class RegexResolutionBenchmark {
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void run() {
GroupMetadataManager.refreshRegularExpressions(
null,
GROUP_ID,
LOG,
TIME,
image,
Optional.empty(),
regexes
);
}