diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 7802293c9bb..3137e3e500a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -101,6 +101,19 @@ public abstract class AbstractRequest implements AbstractRequestResponse { return SendBuilder.buildRequestSend(header, data()); } + /** + * Serializes header and body without prefixing with size (unlike `toSend`, which does include a size prefix). + */ + public final ByteBuffer serializeWithHeader(RequestHeader header) { + if (header.apiKey() != apiKey) { + throw new IllegalArgumentException("Could not build request " + apiKey + " with header api key " + header.apiKey()); + } + if (header.apiVersion() != version) { + throw new IllegalArgumentException("Could not build request version " + version + " with header version " + header.apiVersion()); + } + return RequestUtils.serialize(header.data(), header.headerVersion(), data(), version); + } + // Visible for testing public final ByteBuffer serialize() { return MessageUtil.toByteBuffer(data(), version); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 725ae054d9b..8f567aa9154 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -71,6 +71,7 @@ import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartit import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs; @@ -983,6 +984,52 @@ public class RequestResponseTest { data.write(writer, cache, (short) 2); } + @Test + public void testSerializeWithHeader() { + CreatableTopicCollection topicsToCreate = new CreatableTopicCollection(1); + topicsToCreate.add(new CreatableTopic() + .setName("topic") + .setNumPartitions(3) + .setReplicationFactor((short) 2)); + + CreateTopicsRequest createTopicsRequest = new CreateTopicsRequest.Builder( + new CreateTopicsRequestData() + .setTimeoutMs(10) + .setTopics(topicsToCreate) + ).build(); + + short requestVersion = ApiKeys.CREATE_TOPICS.latestVersion(); + RequestHeader requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, requestVersion, "client", 2); + ByteBuffer serializedRequest = createTopicsRequest.serializeWithHeader(requestHeader); + + RequestHeader parsedHeader = RequestHeader.parse(serializedRequest); + assertEquals(requestHeader, parsedHeader); + + RequestAndSize parsedRequest = AbstractRequest.parseRequest( + ApiKeys.CREATE_TOPICS, requestVersion, serializedRequest); + + assertEquals(createTopicsRequest.data(), parsedRequest.request.data()); + } + + @Test + public void testSerializeWithInconsistentHeaderApiKey() { + CreateTopicsRequest createTopicsRequest = new CreateTopicsRequest.Builder( + new CreateTopicsRequestData() + ).build(); + short requestVersion = ApiKeys.CREATE_TOPICS.latestVersion(); + RequestHeader requestHeader = new RequestHeader(DELETE_TOPICS, requestVersion, "client", 2); + assertThrows(IllegalArgumentException.class, () -> createTopicsRequest.serializeWithHeader(requestHeader)); + } + + @Test + public void testSerializeWithInconsistentHeaderVersion() { + CreateTopicsRequest createTopicsRequest = new CreateTopicsRequest.Builder( + new CreateTopicsRequestData() + ).build((short) 2); + RequestHeader requestHeader = new RequestHeader(CREATE_TOPICS, (short) 1, "client", 2); + assertThrows(IllegalArgumentException.class, () -> createTopicsRequest.serializeWithHeader(requestHeader)); + } + @Test public void testJoinGroupRequestVersion0RebalanceTimeout() { final short version = 0; diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala index bac9fb7bc27..616b7a22de6 100644 --- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala +++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala @@ -17,9 +17,9 @@ package kafka.server -import java.util.{Collections, Properties} import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicReference +import java.util.{Collections, Properties} import kafka.controller.KafkaController import kafka.coordinator.group.GroupCoordinator @@ -32,8 +32,8 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreateableTopicConfig, CreateableTopicConfigCollection} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, RequestContext, RequestHeader} import scala.collection.{Map, Seq, Set, mutable} import scala.jdk.CollectionConverters._ @@ -42,7 +42,8 @@ trait AutoTopicCreationManager { def createTopics( topicNames: Set[String], - controllerMutationQuota: ControllerMutationQuota + controllerMutationQuota: ControllerMutationQuota, + metadataRequestContext: Option[RequestContext] ): Seq[MetadataResponseTopic] } @@ -77,16 +78,27 @@ class DefaultAutoTopicCreationManager( private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) + /** + * Initiate auto topic creation for the given topics. + * + * @param topics the topics to create + * @param controllerMutationQuota the controller mutation quota for topic creation + * @param metadataRequestContext defined when creating topics on behalf of the client. The goal here is to preserve + * original client principal for auditing, thus needing to wrap a plain CreateTopicsRequest + * inside Envelope to send to the controller when forwarding is enabled. + * @return auto created topic metadata responses + */ override def createTopics( topics: Set[String], - controllerMutationQuota: ControllerMutationQuota + controllerMutationQuota: ControllerMutationQuota, + metadataRequestContext: Option[RequestContext] ): Seq[MetadataResponseTopic] = { val (creatableTopics, uncreatableTopicResponses) = filterCreatableTopics(topics) val creatableTopicResponses = if (creatableTopics.isEmpty) { Seq.empty } else if (controller.isEmpty || !controller.get.isActive && channelManager.isDefined) { - sendCreateTopicRequest(creatableTopics) + sendCreateTopicRequest(creatableTopics, metadataRequestContext) } else { createTopicsInZk(creatableTopics, controllerMutationQuota) } @@ -145,7 +157,8 @@ class DefaultAutoTopicCreationManager( } private def sendCreateTopicRequest( - creatableTopics: Map[String, CreatableTopic] + creatableTopics: Map[String, CreatableTopic], + metadataRequestContext: Option[RequestContext] ): Seq[MetadataResponseTopic] = { val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size) topicsToCreate.addAll(creatableTopics.values.asJavaCollection) @@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager( .setTopics(topicsToCreate) ) - channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler { + val requestCompletionHandler = new ControllerRequestCompletionHandler { override def onTimeout(): Unit = { debug(s"Auto topic creation timed out for ${creatableTopics.keys}.") clearInflightRequests(creatableTopics) } override def onComplete(response: ClientResponse): Unit = { - debug(s"Auto topic creation completed for ${creatableTopics.keys}.") + debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody.toString}.") clearInflightRequests(creatableTopics) } - }) + } + + val channelManager = this.channelManager.getOrElse { + throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.") + } + + val request = metadataRequestContext.map { context => + val requestVersion = + channelManager.controllerApiVersions() match { + case None => + // We will rely on the Metadata request to be retried in the case + // that the latest version is not usable by the controller. + ApiKeys.CREATE_TOPICS.latestVersion() + case Some(nodeApiVersions) => + nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS) + } + + // Borrow client information such as client id and correlation id from the original request, + // in order to correlate the create request with the original metadata request. + val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, + requestVersion, + context.clientId, + context.correlationId) + ForwardingManager.buildEnvelopeRequest(context, + createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader)) + }.getOrElse(createTopicsRequest) + + channelManager.sendRequest(request, requestCompletionHandler) val creatableTopicResponses = creatableTopics.keySet.toSeq.map { topic => new MetadataResponseTopic() diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala index 788dd4c06c8..e84592b292d 100644 --- a/core/src/main/scala/kafka/server/ForwardingManager.scala +++ b/core/src/main/scala/kafka/server/ForwardingManager.scala @@ -24,7 +24,7 @@ import kafka.utils.Logging import org.apache.kafka.clients.{ClientResponse, NodeApiVersions} import org.apache.kafka.common.errors.TimeoutException import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestHeader} +import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader} import scala.compat.java8.OptionConverters._ @@ -43,6 +43,20 @@ object ForwardingManager { ): ForwardingManager = { new ForwardingManagerImpl(channelManager) } + + private[server] def buildEnvelopeRequest(context: RequestContext, + forwardRequestBuffer: ByteBuffer): EnvelopeRequest.Builder = { + val principalSerde = context.principalSerde.asScala.getOrElse( + throw new IllegalArgumentException(s"Cannot deserialize principal from request context $context " + + "since there is no serde defined") + ) + val serializedPrincipal = principalSerde.serialize(context.principal) + new EnvelopeRequest.Builder( + forwardRequestBuffer, + serializedPrincipal, + context.clientAddress.getAddress + ) + } } class ForwardingManagerImpl( @@ -61,18 +75,9 @@ class ForwardingManagerImpl( request: RequestChannel.Request, responseCallback: Option[AbstractResponse] => Unit ): Unit = { - val principalSerde = request.context.principalSerde.asScala.getOrElse( - throw new IllegalArgumentException(s"Cannot deserialize principal from request $request " + - "since there is no serde defined") - ) - val serializedPrincipal = principalSerde.serialize(request.context.principal) - val forwardRequestBuffer = request.buffer.duplicate() - forwardRequestBuffer.flip() - val envelopeRequest = new EnvelopeRequest.Builder( - forwardRequestBuffer, - serializedPrincipal, - request.context.clientAddress.getAddress - ) + val requestBuffer = request.buffer.duplicate() + requestBuffer.flip() + val envelopeRequest = ForwardingManager.buildEnvelopeRequest(request.context, requestBuffer) class ForwardingResponseHandler extends ControllerRequestCompletionHandler { override def onComplete(clientResponse: ClientResponse): Unit = { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5aad1f6cc7d..42a203c4a57 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1113,7 +1113,7 @@ class KafkaApis(val requestChannel: RequestChannel, val nonExistingTopics = topics.diff(topicResponses.map(_.name).toSet) val nonExistingTopicResponses = if (allowAutoTopicCreation) { val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request) - autoTopicCreationManager.createTopics(nonExistingTopics, controllerMutationQuota) + autoTopicCreationManager.createTopics(nonExistingTopics, controllerMutationQuota, Some(request.context)) } else { nonExistingTopics.map { topic => val error = try { @@ -1341,7 +1341,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (topicMetadata.headOption.isEmpty) { val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request) - autoTopicCreationManager.createTopics(Seq(internalTopicName).toSet, controllerMutationQuota) + autoTopicCreationManager.createTopics(Seq(internalTopicName).toSet, controllerMutationQuota, None) requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse( Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)) } else { diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala index 0bd97c36e25..39f1f9e4537 100644 --- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala @@ -17,25 +17,32 @@ package kafka.server -import java.util.Properties +import java.net.InetAddress +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicBoolean +import java.util.{Collections, Optional, Properties} import kafka.controller.KafkaController import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator import kafka.utils.TestUtils import kafka.utils.TestUtils.createBroker +import org.apache.kafka.clients.{ClientResponse, NodeApiVersions, RequestCompletionHandler} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} -import org.apache.kafka.common.message.CreateTopicsRequestData +import org.apache.kafka.common.message.{ApiVersionsResponseData, CreateTopicsRequestData} import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.network.{ClientInformation, ListenerName} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests._ -import org.junit.jupiter.api.Assertions.assertEquals +import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} +import org.apache.kafka.common.utils.{SecurityUtils, Utils} +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.{BeforeEach, Test} import org.mockito.ArgumentMatchers.any import org.mockito.invocation.InvocationOnMock -import org.mockito.{ArgumentMatchers, Mockito} +import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} import scala.collection.{Map, Seq} @@ -219,6 +226,118 @@ class AutoTopicCreationManagerTest { testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true) } + @Test + def testTopicCreationWithMetadataContextPassPrincipal(): Unit = { + val topicName = "topic" + + val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user") + val serializeIsCalled = new AtomicBoolean(false) + val principalSerde = new KafkaPrincipalSerde { + override def serialize(principal: KafkaPrincipal): Array[Byte] = { + assertEquals(principal, userPrincipal) + serializeIsCalled.set(true) + Utils.utf8(principal.toString) + } + override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes)) + } + + val requestContext = initializeRequestContext(topicName, userPrincipal, Optional.of(principalSerde)) + + autoTopicCreationManager.createTopics( + Set(topicName), UnboundedControllerMutationQuota, Some(requestContext)) + + assertTrue(serializeIsCalled.get()) + + val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]) + Mockito.verify(brokerToController).sendRequest( + argumentCaptor.capture(), + any(classOf[ControllerRequestCompletionHandler])) + val capturedRequest = argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion()) + assertEquals(userPrincipal, SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal))) + } + + @Test + def testTopicCreationWithMetadataContextWhenPrincipalSerdeNotDefined(): Unit = { + val topicName = "topic" + + val requestContext = initializeRequestContext(topicName, KafkaPrincipal.ANONYMOUS, Optional.empty()) + + // Throw upon undefined principal serde when building the forward request + assertThrows(classOf[IllegalArgumentException], () => autoTopicCreationManager.createTopics( + Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))) + } + + @Test + def testTopicCreationWithMetadataContextNoRetryUponUnsupportedVersion(): Unit = { + val topicName = "topic" + + val principalSerde = new KafkaPrincipalSerde { + override def serialize(principal: KafkaPrincipal): Array[Byte] = { + Utils.utf8(principal.toString) + } + override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes)) + } + + val requestContext = initializeRequestContext(topicName, KafkaPrincipal.ANONYMOUS, Optional.of(principalSerde)) + autoTopicCreationManager.createTopics( + Set(topicName), UnboundedControllerMutationQuota, Some(requestContext)) + autoTopicCreationManager.createTopics( + Set(topicName), UnboundedControllerMutationQuota, Some(requestContext)) + + // Should only trigger once + val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler]) + Mockito.verify(brokerToController).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + argumentCaptor.capture()) + + // Complete with unsupported version will not trigger a retry, but cleanup the inflight topics instead + val header = new RequestHeader(ApiKeys.ENVELOPE, 0, "client", 1) + val response = new EnvelopeResponse(ByteBuffer.allocate(0), Errors.UNSUPPORTED_VERSION) + val clientResponse = new ClientResponse(header, null, null, + 0, 0, false, null, null, response) + argumentCaptor.getValue.asInstanceOf[RequestCompletionHandler].onComplete(clientResponse) + Mockito.verify(brokerToController, Mockito.times(1)).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + argumentCaptor.capture()) + + // Could do the send again as inflight topics are cleared. + autoTopicCreationManager.createTopics( + Set(topicName), UnboundedControllerMutationQuota, Some(requestContext)) + Mockito.verify(brokerToController, Mockito.times(2)).sendRequest( + any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), + argumentCaptor.capture()) + } + + private def initializeRequestContext(topicName: String, + kafkaPrincipal: KafkaPrincipal, + principalSerde: Optional[KafkaPrincipalSerde]): RequestContext = { + + autoTopicCreationManager = new DefaultAutoTopicCreationManager( + config, + Some(brokerToController), + Some(adminManager), + Some(controller), + groupCoordinator, + transactionCoordinator) + + val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection + topicsCollection.add(getNewTopic(topicName)) + val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion() + .setApiKey(ApiKeys.CREATE_TOPICS.id) + .setMinVersion(0) + .setMaxVersion(0) + Mockito.when(brokerToController.controllerApiVersions()) + .thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion)))) + + Mockito.when(controller.isActive).thenReturn(false) + + val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion, + "clientId", 0) + new RequestContext(requestHeader, "1", InetAddress.getLocalHost, + kafkaPrincipal, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, principalSerde) + } + private def testErrorWithCreationInZk(error: Errors, topicName: String, isInternal: Boolean, @@ -261,9 +380,10 @@ class AutoTopicCreationManagerTest { private def createTopicAndVerifyResult(error: Errors, topicName: String, - isInternal: Boolean): Unit = { + isInternal: Boolean, + metadataContext: Option[RequestContext] = None): Unit = { val topicResponses = autoTopicCreationManager.createTopics( - Set(topicName), UnboundedControllerMutationQuota) + Set(topicName), UnboundedControllerMutationQuota, metadataContext) val expectedResponses = Seq(new MetadataResponseTopic() .setErrorCode(error.code()) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 177a33a9344..8280380b712 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -892,7 +892,7 @@ class KafkaApisTest { val capturedResponse = expectNoThrottling(request) - verifyTopicCreation(topicName, true, true, request) + val capturedRequest = verifyTopicCreation(topicName, true, true, request) EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer, autoTopicCreationManager, forwardingManager, controller, clientControllerQuotaManager, groupCoordinator, txnCoordinator) @@ -903,6 +903,8 @@ class KafkaApisTest { val response = capturedResponse.getValue.asInstanceOf[FindCoordinatorResponse] assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, response.error()) + assertTrue(capturedRequest.getValue.isEmpty) + verify(authorizer, autoTopicCreationManager) } @@ -993,7 +995,7 @@ class KafkaApisTest { val capturedResponse = expectNoThrottling(request) - verifyTopicCreation(topicName, enableAutoTopicCreation, isInternal, request) + val capturedRequest = verifyTopicCreation(topicName, enableAutoTopicCreation, isInternal, request) EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer, autoTopicCreationManager, forwardingManager, clientControllerQuotaManager, groupCoordinator, txnCoordinator) @@ -1011,26 +1013,34 @@ class KafkaApisTest { assertEquals(expectedMetadataResponse, response.topicMetadata()) + if (enableAutoTopicCreation) { + assertTrue(capturedRequest.getValue.isDefined) + assertEquals(request.context, capturedRequest.getValue.get) + } + verify(authorizer, autoTopicCreationManager) } private def verifyTopicCreation(topicName: String, enableAutoTopicCreation: Boolean, isInternal: Boolean, - request: RequestChannel.Request) = { + request: RequestChannel.Request): Capture[Option[RequestContext]] = { + val capturedRequest = EasyMock.newCapture[Option[RequestContext]]() if (enableAutoTopicCreation) { EasyMock.expect(clientControllerQuotaManager.newPermissiveQuotaFor(EasyMock.eq(request))) .andReturn(UnboundedControllerMutationQuota) EasyMock.expect(autoTopicCreationManager.createTopics( EasyMock.eq(Set(topicName)), - EasyMock.eq(UnboundedControllerMutationQuota))).andReturn( + EasyMock.eq(UnboundedControllerMutationQuota), + EasyMock.capture(capturedRequest))).andReturn( Seq(new MetadataResponseTopic() .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) .setIsInternal(isInternal) .setName(topicName)) ).once() } + capturedRequest } private def setupBrokerMetadata(hasEnoughLiveBrokers: Boolean, numBrokersNeeded: Int): Unit = {