KAFKA-18613: Auto-creation of internal topics in streams group heartbeat (#18981)

Implements auto-topic creation when handling the streams group
heartbeat.

Inside KafkaApis, the handler for streamsGroupHeartbeat uses the result
of the streams group heartbeat inside the group coordinator to attempt
to create all missing internal topics using AutoTopicCreationManager.
CREATE TOPIC ACLs are checked. The unit test class
AutoTopicCreationManagerTest is brought back (it was recently deleted
during a ZK removal PR), but now tests only the KRaft-related
functionality.

Reviewers: Bruno Cadonna <bruno@confluent.io>

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
This commit is contained in:
Lucas Brutschy 2025-03-03 08:48:00 +01:00 committed by GitHub
parent bf660fdeb6
commit a04dd21f26
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 520 additions and 3 deletions

View File

@ -45,6 +45,12 @@ trait AutoTopicCreationManager {
controllerMutationQuota: ControllerMutationQuota,
metadataRequestContext: Option[RequestContext]
): Seq[MetadataResponseTopic]
def createStreamsInternalTopics(
topics: Map[String, CreatableTopic],
requestContext: RequestContext
): Unit
}
class DefaultAutoTopicCreationManager(
@ -83,9 +89,30 @@ class DefaultAutoTopicCreationManager(
uncreatableTopicResponses ++ creatableTopicResponses
}
/**
 * Creates the given Kafka Streams internal topics by forwarding a CreateTopics
 * request to the controller on behalf of the original caller.
 *
 * Topics whose partition count or replication factor is unset (-1) are filled in
 * with the broker defaults before the request is sent; note that the passed-in
 * CreatableTopic instances are mutated in place. No request is sent when the
 * topic map is empty.
 *
 * @param topics         internal topics to create, keyed by topic name
 * @param requestContext context of the originating streams group heartbeat,
 *                       forwarded so the request carries the caller's principal
 */
override def createStreamsInternalTopics(
  topics: Map[String, CreatableTopic],
  requestContext: RequestContext
): Unit = {
  topics.values.foreach { topic =>
    // Fill in broker-level defaults where the caller left the values unset (-1).
    if (topic.numPartitions() == -1)
      topic.setNumPartitions(config.numPartitions)
    if (topic.replicationFactor() == -1)
      topic.setReplicationFactor(config.defaultReplicationFactor.shortValue)
  }
  if (topics.nonEmpty)
    sendCreateTopicRequest(topics, Some(requestContext))
}
private def sendCreateTopicRequest(
creatableTopics: Map[String, CreatableTopic],
metadataRequestContext: Option[RequestContext]
requestContext: Option[RequestContext]
): Seq[MetadataResponseTopic] = {
val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
@ -114,7 +141,7 @@ class DefaultAutoTopicCreationManager(
}
}
val request = metadataRequestContext.map { context =>
val request = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>

View File

@ -2664,6 +2664,8 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendMaybeThrottle(request, streamsGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
val requestContext = request.context
if (streamsGroupHeartbeatRequest.data().topology() != null) {
val requiredTopics: Seq[String] =
streamsGroupHeartbeatRequest.data().topology().subtopologies().iterator().asScala.flatMap(subtopology =>
@ -2707,7 +2709,24 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseData = response.data()
val topicsToCreate = response.creatableTopics().asScala
if (topicsToCreate.nonEmpty) {
throw new UnsupportedOperationException("Internal topic auto-creation not yet implemented.")
val createTopicUnauthorized =
if(!authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false))
authHelper.partitionSeqByAuthorized(request.context, CREATE, TOPIC, topicsToCreate.keys.toSeq)(identity[String])._2
else Set.empty
if (createTopicUnauthorized.nonEmpty) {
if (responseData.status() == null) {
responseData.setStatus(new util.ArrayList());
}
responseData.status().add(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
.setStatusDetail("Unauthorized to CREATE on topics " + createTopicUnauthorized.mkString(",") + ".")
)
} else {
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext);
}
}
requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(responseData))

View File

@ -0,0 +1,391 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Collections, Optional, Properties}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.TestUtils
import org.apache.kafka.clients.{ClientResponse, NodeApiVersions, RequestCompletionHandler}
import org.apache.kafka.common.Node
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
import org.apache.kafka.common.message.{ApiVersionsResponseData, CreateTopicsRequestData}
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfig}
import org.apache.kafka.server.config.ServerConfigs
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.never
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import scala.collection.{Map, Seq}
/**
 * Unit tests for [[DefaultAutoTopicCreationManager]], covering the KRaft-related
 * functionality only: auto-creation of broker-internal topics (group metadata,
 * transaction state, share-group state), forwarding of the caller's principal via
 * envelope requests, and creation of Kafka Streams internal topics.
 */
class AutoTopicCreationManagerTest {

  private val requestTimeout = 100
  private var config: KafkaConfig = _
  private val metadataCache = Mockito.mock(classOf[MetadataCache])
  private val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager])
  private val groupCoordinator = Mockito.mock(classOf[GroupCoordinator])
  private val transactionCoordinator = Mockito.mock(classOf[TransactionCoordinator])
  private val shareCoordinator = Mockito.mock(classOf[ShareCoordinator])
  private var autoTopicCreationManager: AutoTopicCreationManager = _

  // Partition count and replication factor configured for the broker-internal topics.
  private val internalTopicPartitions = 2
  private val internalTopicReplicationFactor: Short = 2

  @BeforeEach
  def setup(): Unit = {
    val props = TestUtils.createBrokerConfig(1)
    props.setProperty(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
    // Replication-factor configs are set from internalTopicReplicationFactor and
    // partition-count configs from internalTopicPartitions. (The previous version
    // had the two variables crossed; both values are 2, so behavior is unchanged.)
    props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicReplicationFactor.toString)
    props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicReplicationFactor.toString)
    props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_REPLICATION_FACTOR_CONFIG, internalTopicReplicationFactor.toString)
    props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, internalTopicPartitions.toString)
    props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, internalTopicPartitions.toString)
    props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG, internalTopicPartitions.toString)
    config = KafkaConfig.fromProps(props)
    val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
    Mockito.when(metadataCache.getAliveBrokerNodes(any(classOf[ListenerName]))).thenReturn(aliveBrokers)
  }

  @Test
  def testCreateOffsetTopic(): Unit = {
    Mockito.when(groupCoordinator.groupMetadataTopicConfigs).thenReturn(new Properties)
    testCreateTopic(GROUP_METADATA_TOPIC_NAME, isInternal = true, internalTopicPartitions, internalTopicReplicationFactor)
  }

  @Test
  def testCreateTxnTopic(): Unit = {
    Mockito.when(transactionCoordinator.transactionTopicConfigs).thenReturn(new Properties)
    testCreateTopic(TRANSACTION_STATE_TOPIC_NAME, isInternal = true, internalTopicPartitions, internalTopicReplicationFactor)
  }

  @Test
  def testCreateShareStateTopic(): Unit = {
    Mockito.when(shareCoordinator.shareGroupStateTopicConfigs()).thenReturn(new Properties)
    testCreateTopic(SHARE_GROUP_STATE_TOPIC_NAME, isInternal = true, internalTopicPartitions, internalTopicReplicationFactor)
  }

  @Test
  def testCreateNonInternalTopic(): Unit = {
    testCreateTopic("topic", isInternal = false)
  }

  /**
   * Exercises topic auto-creation for the given topic and verifies that an
   * UNKNOWN_TOPIC_OR_PARTITION placeholder response is returned while exactly one
   * CreateTopics request is forwarded to the controller, even across repeated calls
   * (in-flight de-duplication).
   */
  private def testCreateTopic(topicName: String,
                              isInternal: Boolean,
                              numPartitions: Int = 1,
                              replicationFactor: Short = 1): Unit = {
    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
      config,
      brokerToController,
      groupCoordinator,
      transactionCoordinator,
      Some(shareCoordinator))

    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
    topicsCollection.add(getNewTopic(topicName, numPartitions, replicationFactor))
    val requestBody = new CreateTopicsRequest.Builder(
      new CreateTopicsRequestData()
        .setTopics(topicsCollection)
        .setTimeoutMs(requestTimeout))

    // Calling twice with the same topic will only trigger one forwarding.
    createTopicAndVerifyResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicName, isInternal)
    createTopicAndVerifyResult(Errors.UNKNOWN_TOPIC_OR_PARTITION, topicName, isInternal)

    Mockito.verify(brokerToController).sendRequest(
      ArgumentMatchers.eq(requestBody),
      any(classOf[ControllerRequestCompletionHandler]))
  }

  @Test
  def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
    val topicName = "topic"
    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
    val serializeIsCalled = new AtomicBoolean(false)
    val principalSerde = new KafkaPrincipalSerde {
      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
        assertEquals(principal, userPrincipal)
        serializeIsCalled.set(true)
        Utils.utf8(principal.toString)
      }
      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
    }
    val requestContext = initializeRequestContext(userPrincipal, Optional.of(principalSerde))
    autoTopicCreationManager.createTopics(
      Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))

    // The principal must be serialized into the forwarded envelope request.
    assertTrue(serializeIsCalled.get())
    val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
    Mockito.verify(brokerToController).sendRequest(
      argumentCaptor.capture(),
      any(classOf[ControllerRequestCompletionHandler]))
    val capturedRequest = argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
    assertEquals(userPrincipal, SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal)))
  }

  @Test
  def testTopicCreationWithMetadataContextWhenPrincipalSerdeNotDefined(): Unit = {
    val topicName = "topic"
    val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS, Optional.empty())

    // Throw upon undefined principal serde when building the forward request.
    assertThrows(classOf[IllegalArgumentException], () => autoTopicCreationManager.createTopics(
      Set(topicName), UnboundedControllerMutationQuota, Some(requestContext)))
  }

  @Test
  def testTopicCreationWithMetadataContextNoRetryUponUnsupportedVersion(): Unit = {
    val topicName = "topic"
    val principalSerde = new KafkaPrincipalSerde {
      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
        Utils.utf8(principal.toString)
      }
      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
    }
    val requestContext = initializeRequestContext(KafkaPrincipal.ANONYMOUS, Optional.of(principalSerde))
    autoTopicCreationManager.createTopics(
      Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
    autoTopicCreationManager.createTopics(
      Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))

    // Should only trigger once while the topic is still in flight.
    val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
    Mockito.verify(brokerToController).sendRequest(
      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
      argumentCaptor.capture())

    // Completing with UNSUPPORTED_VERSION does not trigger a retry, but clears
    // the in-flight topics instead.
    val header = new RequestHeader(ApiKeys.ENVELOPE, 0, "client", 1)
    val response = new EnvelopeResponse(ByteBuffer.allocate(0), Errors.UNSUPPORTED_VERSION)
    val clientResponse = new ClientResponse(header, null, null,
      0, 0, false, null, null, response)
    argumentCaptor.getValue.asInstanceOf[RequestCompletionHandler].onComplete(clientResponse)
    Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
      argumentCaptor.capture())

    // Can send again since the in-flight topics were cleared.
    autoTopicCreationManager.createTopics(
      Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
    Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
      argumentCaptor.capture())
  }

  @Test
  def testCreateStreamsInternalTopics(): Unit = {
    val topicConfig = new CreatableTopicConfigCollection()
    topicConfig.add(new CreatableTopicConfig().setName("cleanup.policy").setValue("compact"))
    val topics = Map(
      "stream-topic-1" -> new CreatableTopic().setName("stream-topic-1").setNumPartitions(3).setReplicationFactor(2).setConfigs(topicConfig),
      "stream-topic-2" -> new CreatableTopic().setName("stream-topic-2").setNumPartitions(1).setReplicationFactor(1)
    )
    val requestContext = initializeRequestContextWithUserPrincipal()
    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
      config,
      brokerToController,
      groupCoordinator,
      transactionCoordinator,
      Some(shareCoordinator))

    autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext)

    // The manager must forward exactly one envelope-wrapped CreateTopics request
    // containing both topics with the explicitly requested settings.
    val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
    Mockito.verify(brokerToController).sendRequest(
      argumentCaptor.capture(),
      any(classOf[ControllerRequestCompletionHandler]))
    val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, ApiKeys.CREATE_TOPICS.latestVersion(), "clientId", 0)
    val capturedRequest = argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
    topicsCollection.add(getNewTopic("stream-topic-1", 3, 2.toShort).setConfigs(topicConfig))
    topicsCollection.add(getNewTopic("stream-topic-2", 1, 1.toShort))
    val requestBody = new CreateTopicsRequest.Builder(
      new CreateTopicsRequestData()
        .setTopics(topicsCollection)
        .setTimeoutMs(requestTimeout))
      .build(ApiKeys.CREATE_TOPICS.latestVersion())
    val forwardedRequestBuffer = capturedRequest.requestData().duplicate()
    assertEquals(requestHeader, RequestHeader.parse(forwardedRequestBuffer))
    assertEquals(requestBody.data(), CreateTopicsRequest.parse(forwardedRequestBuffer, ApiKeys.CREATE_TOPICS.latestVersion()).data())
  }

  @Test
  def testCreateStreamsInternalTopicsWithEmptyTopics(): Unit = {
    val topics = Map.empty[String, CreatableTopic]
    val requestContext = initializeRequestContextWithUserPrincipal()
    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
      config,
      brokerToController,
      groupCoordinator,
      transactionCoordinator,
      Some(shareCoordinator))

    autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext)

    // An empty topic map must not trigger any controller request.
    Mockito.verify(brokerToController, never()).sendRequest(
      any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
      any(classOf[ControllerRequestCompletionHandler]))
  }

  @Test
  def testCreateStreamsInternalTopicsWithDefaultConfig(): Unit = {
    // -1 means "use the broker defaults" for both partitions and replication factor.
    val topics = Map(
      "stream-topic-1" -> new CreatableTopic().setName("stream-topic-1").setNumPartitions(-1).setReplicationFactor(-1)
    )
    val requestContext = initializeRequestContextWithUserPrincipal()
    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
      config,
      brokerToController,
      groupCoordinator,
      transactionCoordinator,
      Some(shareCoordinator))

    autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext)

    val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
    Mockito.verify(brokerToController).sendRequest(
      argumentCaptor.capture(),
      any(classOf[ControllerRequestCompletionHandler]))
    val capturedRequest = argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
    val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, ApiKeys.CREATE_TOPICS.latestVersion(), "clientId", 0)
    val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
    topicsCollection.add(getNewTopic("stream-topic-1", config.numPartitions, config.defaultReplicationFactor.toShort))
    val requestBody = new CreateTopicsRequest.Builder(
      new CreateTopicsRequestData()
        .setTopics(topicsCollection)
        .setTimeoutMs(requestTimeout))
      .build(ApiKeys.CREATE_TOPICS.latestVersion())
    val forwardedRequestBuffer = capturedRequest.requestData().duplicate()
    assertEquals(requestHeader, RequestHeader.parse(forwardedRequestBuffer))
    assertEquals(requestBody.data(), CreateTopicsRequest.parse(forwardedRequestBuffer, ApiKeys.CREATE_TOPICS.latestVersion()).data())
  }

  @Test
  def testCreateStreamsInternalTopicsPassesPrincipal(): Unit = {
    val topics = Map(
      "stream-topic-1" -> new CreatableTopic().setName("stream-topic-1").setNumPartitions(-1).setReplicationFactor(-1)
    )
    val requestContext = initializeRequestContextWithUserPrincipal()
    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
      config,
      brokerToController,
      groupCoordinator,
      transactionCoordinator,
      Some(shareCoordinator))

    autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext)

    // The caller's principal must survive the envelope round-trip.
    val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
    Mockito.verify(brokerToController).sendRequest(
      argumentCaptor.capture(),
      any(classOf[ControllerRequestCompletionHandler]))
    val capturedRequest = argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
    assertEquals(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user"), SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal)))
  }

  /** Builds a request context for principal "user" with a round-tripping serde. */
  private def initializeRequestContextWithUserPrincipal(): RequestContext = {
    val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
    val principalSerde = new KafkaPrincipalSerde {
      override def serialize(principal: KafkaPrincipal): Array[Byte] = {
        Utils.utf8(principal.toString)
      }
      override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
    }
    initializeRequestContext(userPrincipal, Optional.of(principalSerde))
  }

  /**
   * Instantiates the manager under test, stubs the controller API versions so
   * CREATE_TOPICS is supported, and returns a PLAINTEXT request context carrying
   * the given principal and serde.
   */
  private def initializeRequestContext(kafkaPrincipal: KafkaPrincipal,
                                       principalSerde: Optional[KafkaPrincipalSerde]): RequestContext = {
    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
      config,
      brokerToController,
      groupCoordinator,
      transactionCoordinator,
      Some(shareCoordinator))

    val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
      .setApiKey(ApiKeys.CREATE_TOPICS.id)
      .setMinVersion(ApiKeys.CREATE_TOPICS.oldestVersion())
      .setMaxVersion(ApiKeys.CREATE_TOPICS.latestVersion())
    Mockito.when(brokerToController.controllerApiVersions())
      .thenReturn(Optional.of(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))

    val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
      "clientId", 0)
    new RequestContext(requestHeader, "1", InetAddress.getLocalHost, Optional.empty(),
      kafkaPrincipal, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
      SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, principalSerde)
  }

  /** Calls createTopics for a single topic and asserts the placeholder metadata response. */
  private def createTopicAndVerifyResult(error: Errors,
                                         topicName: String,
                                         isInternal: Boolean,
                                         metadataContext: Option[RequestContext] = None): Unit = {
    val topicResponses = autoTopicCreationManager.createTopics(
      Set(topicName), UnboundedControllerMutationQuota, metadataContext)
    val expectedResponses = Seq(new MetadataResponseTopic()
      .setErrorCode(error.code())
      .setIsInternal(isInternal)
      .setName(topicName))
    assertEquals(expectedResponses, topicResponses)
  }

  /** Convenience factory for a CreatableTopic with the given settings. */
  private def getNewTopic(topicName: String, numPartitions: Int, replicationFactor: Short): CreatableTopic = {
    new CreatableTopic()
      .setName(topicName)
      .setNumPartitions(numPartitions)
      .setReplicationFactor(replicationFactor)
  }
}

View File

@ -10080,6 +10080,86 @@ class KafkaApisTest extends Logging {
assertEquals("Use of Kafka internal topics __consumer_offsets,__transaction_state,__share_group_state in a Kafka Streams topology is prohibited.", response.data.errorMessage())
}
// Verifies that when the group coordinator's streams group heartbeat result reports
// missing internal topics and no authorizer denies creation, KafkaApis hands the
// topics to the AutoTopicCreationManager together with the original request context
// and returns the coordinator's response data unchanged.
@Test
def testStreamsGroupHeartbeatRequestWithInternalTopicsToCreate(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group");
val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
// The coordinator result is completed asynchronously via this future.
val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
when(groupCoordinator.streamsGroupHeartbeat(
requestChannelRequest.context,
streamsGroupHeartbeatRequest
)).thenReturn(future)
// The streams rebalance protocol must be enabled for the handler to accept the request.
kafkaApis = createKafkaApis(
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
// Complete the heartbeat with one missing internal topic that needs creation.
val missingTopics = Map("test" -> new CreatableTopic())
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics.asJava))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(streamsGroupHeartbeatResponse, response.data)
// The missing topics must be forwarded for creation with the caller's request
// context (needed for principal forwarding to the controller).
verify(autoTopicCreationManager).createStreamsInternalTopics(missingTopics, requestChannelRequest.context)
}
// Verifies that when CREATE is denied both on the cluster resource and on the missing
// topic, KafkaApis does not attempt topic creation and instead reports a
// MISSING_INTERNAL_TOPICS status naming the unauthorized topics, while the heartbeat
// itself still succeeds (error code NONE).
@Test
def testStreamsGroupHeartbeatRequestWithInternalTopicsToCreateMissingCreateACL(): Unit = {
metadataCache = mock(classOf[KRaftMetadataCache])
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group");
val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
// The coordinator result is completed asynchronously via this future.
val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
when(groupCoordinator.streamsGroupHeartbeat(
requestChannelRequest.context,
streamsGroupHeartbeatRequest
)).thenReturn(future)
// Authorizer denies CREATE on topic "test" and CREATE on the cluster resource;
// everything else (e.g. group READ) is allowed.
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], isNotNull[util.List[Action]])).thenAnswer(invocation => {
val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]]
actions.asScala.map { action =>
if (action.resourcePattern.name.equals("test") && action.operation() == AclOperation.CREATE && action.resourcePattern().resourceType() == ResourceType.TOPIC) {
AuthorizationResult.DENIED
} else if (action.operation() == AclOperation.CREATE && action.resourcePattern().resourceType() == ResourceType.CLUSTER) {
AuthorizationResult.DENIED
} else {
AuthorizationResult.ALLOWED
}
}.asJava
})
kafkaApis = createKafkaApis(
authorizer = Some(authorizer),
overrideProperties = Map(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -> "classic,streams")
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
// Complete the heartbeat with one missing internal topic the caller may not create.
val missingTopics = Map("test" -> new CreatableTopic())
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics.asJava))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
// The heartbeat itself succeeds; only a status entry reports the ACL problem.
assertEquals(Errors.NONE.code, response.data.errorCode())
assertEquals(null, response.data.errorMessage())
assertEquals(
java.util.List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
.setStatusDetail("Unauthorized to CREATE on topics test.")
),
response.data.status()
)
}
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {