KAFKA-12294; forward auto topic request within envelope on behalf of clients (#10142)

When auto-creating topics in KIP-500, the broker will send a `CreateTopics` request to the controller. It is useful in this case to preserve the original principal from the corresponding `Metadata` request by wrapping the `CreateTopics` request in an envelope so that the controller may repeat the authorization and to improve auditability. This follows a similar pattern to how standard `CreateTopics` requests are forwarded to the controller.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Boyang Chen 2021-04-05 15:54:57 -07:00 committed by GitHub
parent 66ba91733c
commit cad514bff9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 271 additions and 36 deletions

View File

@ -101,6 +101,19 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return SendBuilder.buildRequestSend(header, data());
}
/**
* Serializes header and body without prefixing with size (unlike `toSend`, which does include a size prefix).
*/
public final ByteBuffer serializeWithHeader(RequestHeader header) {
if (header.apiKey() != apiKey) {
throw new IllegalArgumentException("Could not build request " + apiKey + " with header api key " + header.apiKey());
}
if (header.apiVersion() != version) {
throw new IllegalArgumentException("Could not build request version " + version + " with header version " + header.apiVersion());
}
return RequestUtils.serialize(header.data(), header.headerVersion(), data(), version);
}
// Visible for testing
public final ByteBuffer serialize() {
return MessageUtil.toByteBuffer(data(), version);

View File

@ -71,6 +71,7 @@ import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartit
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
@ -983,6 +984,52 @@ public class RequestResponseTest {
data.write(writer, cache, (short) 2);
}
@Test
public void testSerializeWithHeader() {
CreatableTopicCollection topicsToCreate = new CreatableTopicCollection(1);
topicsToCreate.add(new CreatableTopic()
.setName("topic")
.setNumPartitions(3)
.setReplicationFactor((short) 2));
CreateTopicsRequest createTopicsRequest = new CreateTopicsRequest.Builder(
new CreateTopicsRequestData()
.setTimeoutMs(10)
.setTopics(topicsToCreate)
).build();
short requestVersion = ApiKeys.CREATE_TOPICS.latestVersion();
RequestHeader requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, requestVersion, "client", 2);
ByteBuffer serializedRequest = createTopicsRequest.serializeWithHeader(requestHeader);
RequestHeader parsedHeader = RequestHeader.parse(serializedRequest);
assertEquals(requestHeader, parsedHeader);
RequestAndSize parsedRequest = AbstractRequest.parseRequest(
ApiKeys.CREATE_TOPICS, requestVersion, serializedRequest);
assertEquals(createTopicsRequest.data(), parsedRequest.request.data());
}
@Test
public void testSerializeWithInconsistentHeaderApiKey() {
CreateTopicsRequest createTopicsRequest = new CreateTopicsRequest.Builder(
new CreateTopicsRequestData()
).build();
short requestVersion = ApiKeys.CREATE_TOPICS.latestVersion();
RequestHeader requestHeader = new RequestHeader(DELETE_TOPICS, requestVersion, "client", 2);
assertThrows(IllegalArgumentException.class, () -> createTopicsRequest.serializeWithHeader(requestHeader));
}
@Test
public void testSerializeWithInconsistentHeaderVersion() {
CreateTopicsRequest createTopicsRequest = new CreateTopicsRequest.Builder(
new CreateTopicsRequestData()
).build((short) 2);
RequestHeader requestHeader = new RequestHeader(CREATE_TOPICS, (short) 1, "client", 2);
assertThrows(IllegalArgumentException.class, () -> createTopicsRequest.serializeWithHeader(requestHeader));
}
@Test
public void testJoinGroupRequestVersion0RebalanceTimeout() {
final short version = 0;

View File

@ -17,9 +17,9 @@
package kafka.server
import java.util.{Collections, Properties}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Properties}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
@ -32,8 +32,8 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreateableTopicConfig, CreateableTopicConfigCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, RequestContext, RequestHeader}
import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._
@ -42,7 +42,8 @@ trait AutoTopicCreationManager {
def createTopics(
topicNames: Set[String],
controllerMutationQuota: ControllerMutationQuota
controllerMutationQuota: ControllerMutationQuota,
metadataRequestContext: Option[RequestContext]
): Seq[MetadataResponseTopic]
}
@ -77,16 +78,27 @@ class DefaultAutoTopicCreationManager(
private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
/**
* Initiate auto topic creation for the given topics.
*
* @param topics the topics to create
* @param controllerMutationQuota the controller mutation quota for topic creation
* @param metadataRequestContext defined when creating topics on behalf of the client. The goal here is to preserve
* original client principal for auditing, thus needing to wrap a plain CreateTopicsRequest
* inside Envelope to send to the controller when forwarding is enabled.
* @return auto created topic metadata responses
*/
override def createTopics(
topics: Set[String],
controllerMutationQuota: ControllerMutationQuota
controllerMutationQuota: ControllerMutationQuota,
metadataRequestContext: Option[RequestContext]
): Seq[MetadataResponseTopic] = {
val (creatableTopics, uncreatableTopicResponses) = filterCreatableTopics(topics)
val creatableTopicResponses = if (creatableTopics.isEmpty) {
Seq.empty
} else if (controller.isEmpty || !controller.get.isActive && channelManager.isDefined) {
sendCreateTopicRequest(creatableTopics)
sendCreateTopicRequest(creatableTopics, metadataRequestContext)
} else {
createTopicsInZk(creatableTopics, controllerMutationQuota)
}
@ -145,7 +157,8 @@ class DefaultAutoTopicCreationManager(
}
private def sendCreateTopicRequest(
creatableTopics: Map[String, CreatableTopic]
creatableTopics: Map[String, CreatableTopic],
metadataRequestContext: Option[RequestContext]
): Seq[MetadataResponseTopic] = {
val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
@ -156,17 +169,44 @@ class DefaultAutoTopicCreationManager(
.setTopics(topicsToCreate)
)
channelManager.get.sendRequest(createTopicsRequest, new ControllerRequestCompletionHandler {
val requestCompletionHandler = new ControllerRequestCompletionHandler {
override def onTimeout(): Unit = {
debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
clearInflightRequests(creatableTopics)
}
override def onComplete(response: ClientResponse): Unit = {
debug(s"Auto topic creation completed for ${creatableTopics.keys}.")
debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody.toString}.")
clearInflightRequests(creatableTopics)
}
})
}
val channelManager = this.channelManager.getOrElse {
throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
}
val request = metadataRequestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions() match {
case None =>
// We will rely on the Metadata request to be retried in the case
// that the latest version is not usable by the controller.
ApiKeys.CREATE_TOPICS.latestVersion()
case Some(nodeApiVersions) =>
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
}
// Borrow client information such as client id and correlation id from the original request,
// in order to correlate the create request with the original metadata request.
val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
requestVersion,
context.clientId,
context.correlationId)
ForwardingManager.buildEnvelopeRequest(context,
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
}.getOrElse(createTopicsRequest)
channelManager.sendRequest(request, requestCompletionHandler)
val creatableTopicResponses = creatableTopics.keySet.toSeq.map { topic =>
new MetadataResponseTopic()

View File

@ -24,7 +24,7 @@ import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, NodeApiVersions}
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestHeader}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
import scala.compat.java8.OptionConverters._
@ -43,6 +43,20 @@ object ForwardingManager {
): ForwardingManager = {
new ForwardingManagerImpl(channelManager)
}
private[server] def buildEnvelopeRequest(context: RequestContext,
forwardRequestBuffer: ByteBuffer): EnvelopeRequest.Builder = {
val principalSerde = context.principalSerde.asScala.getOrElse(
throw new IllegalArgumentException(s"Cannot deserialize principal from request context $context " +
"since there is no serde defined")
)
val serializedPrincipal = principalSerde.serialize(context.principal)
new EnvelopeRequest.Builder(
forwardRequestBuffer,
serializedPrincipal,
context.clientAddress.getAddress
)
}
}
class ForwardingManagerImpl(
@ -61,18 +75,9 @@ class ForwardingManagerImpl(
request: RequestChannel.Request,
responseCallback: Option[AbstractResponse] => Unit
): Unit = {
val principalSerde = request.context.principalSerde.asScala.getOrElse(
throw new IllegalArgumentException(s"Cannot deserialize principal from request $request " +
"since there is no serde defined")
)
val serializedPrincipal = principalSerde.serialize(request.context.principal)
val forwardRequestBuffer = request.buffer.duplicate()
forwardRequestBuffer.flip()
val envelopeRequest = new EnvelopeRequest.Builder(
forwardRequestBuffer,
serializedPrincipal,
request.context.clientAddress.getAddress
)
val requestBuffer = request.buffer.duplicate()
requestBuffer.flip()
val envelopeRequest = ForwardingManager.buildEnvelopeRequest(request.context, requestBuffer)
class ForwardingResponseHandler extends ControllerRequestCompletionHandler {
override def onComplete(clientResponse: ClientResponse): Unit = {

View File

@ -1113,7 +1113,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val nonExistingTopics = topics.diff(topicResponses.map(_.name).toSet)
val nonExistingTopicResponses = if (allowAutoTopicCreation) {
val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request)
autoTopicCreationManager.createTopics(nonExistingTopics, controllerMutationQuota)
autoTopicCreationManager.createTopics(nonExistingTopics, controllerMutationQuota, Some(request.context))
} else {
nonExistingTopics.map { topic =>
val error = try {
@ -1341,7 +1341,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topicMetadata.headOption.isEmpty) {
val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request)
autoTopicCreationManager.createTopics(Seq(internalTopicName).toSet, controllerMutationQuota)
autoTopicCreationManager.createTopics(Seq(internalTopicName).toSet, controllerMutationQuota, None)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
} else {

View File

@ -17,25 +17,32 @@
package kafka.server
import java.util.Properties
import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Collections, Optional, Properties}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.TestUtils
import kafka.utils.TestUtils.createBroker
import org.apache.kafka.clients.{ClientResponse, NodeApiVersions, RequestCompletionHandler}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.{ApiVersionsResponseData, CreateTopicsRequestData}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests._
import org.junit.jupiter.api.Assertions.assertEquals
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.invocation.InvocationOnMock
import org.mockito.{ArgumentMatchers, Mockito}
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import scala.collection.{Map, Seq}
@ -219,6 +226,118 @@ class AutoTopicCreationManagerTest {
testErrorWithCreationInZk(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.TRANSACTION_STATE_TOPIC_NAME, isInternal = true)
}
@Test
def testTopicCreationWithMetadataContextPassPrincipal(): Unit = {
val topicName = "topic"
val userPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
val serializeIsCalled = new AtomicBoolean(false)
val principalSerde = new KafkaPrincipalSerde {
override def serialize(principal: KafkaPrincipal): Array[Byte] = {
assertEquals(principal, userPrincipal)
serializeIsCalled.set(true)
Utils.utf8(principal.toString)
}
override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
}
val requestContext = initializeRequestContext(topicName, userPrincipal, Optional.of(principalSerde))
autoTopicCreationManager.createTopics(
Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
assertTrue(serializeIsCalled.get())
val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
Mockito.verify(brokerToController).sendRequest(
argumentCaptor.capture(),
any(classOf[ControllerRequestCompletionHandler]))
val capturedRequest = argumentCaptor.getValue.asInstanceOf[EnvelopeRequest.Builder].build(ApiKeys.ENVELOPE.latestVersion())
assertEquals(userPrincipal, SecurityUtils.parseKafkaPrincipal(Utils.utf8(capturedRequest.requestPrincipal)))
}
@Test
def testTopicCreationWithMetadataContextWhenPrincipalSerdeNotDefined(): Unit = {
val topicName = "topic"
val requestContext = initializeRequestContext(topicName, KafkaPrincipal.ANONYMOUS, Optional.empty())
// Throw upon undefined principal serde when building the forward request
assertThrows(classOf[IllegalArgumentException], () => autoTopicCreationManager.createTopics(
Set(topicName), UnboundedControllerMutationQuota, Some(requestContext)))
}
@Test
def testTopicCreationWithMetadataContextNoRetryUponUnsupportedVersion(): Unit = {
val topicName = "topic"
val principalSerde = new KafkaPrincipalSerde {
override def serialize(principal: KafkaPrincipal): Array[Byte] = {
Utils.utf8(principal.toString)
}
override def deserialize(bytes: Array[Byte]): KafkaPrincipal = SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes))
}
val requestContext = initializeRequestContext(topicName, KafkaPrincipal.ANONYMOUS, Optional.of(principalSerde))
autoTopicCreationManager.createTopics(
Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
autoTopicCreationManager.createTopics(
Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
// Should only trigger once
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Complete with unsupported version will not trigger a retry, but cleanup the inflight topics instead
val header = new RequestHeader(ApiKeys.ENVELOPE, 0, "client", 1)
val response = new EnvelopeResponse(ByteBuffer.allocate(0), Errors.UNSUPPORTED_VERSION)
val clientResponse = new ClientResponse(header, null, null,
0, 0, false, null, null, response)
argumentCaptor.getValue.asInstanceOf[RequestCompletionHandler].onComplete(clientResponse)
Mockito.verify(brokerToController, Mockito.times(1)).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Could do the send again as inflight topics are cleared.
autoTopicCreationManager.createTopics(
Set(topicName), UnboundedControllerMutationQuota, Some(requestContext))
Mockito.verify(brokerToController, Mockito.times(2)).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
}
private def initializeRequestContext(topicName: String,
kafkaPrincipal: KafkaPrincipal,
principalSerde: Optional[KafkaPrincipalSerde]): RequestContext = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
Some(brokerToController),
Some(adminManager),
Some(controller),
groupCoordinator,
transactionCoordinator)
val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
topicsCollection.add(getNewTopic(topicName))
val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
.setApiKey(ApiKeys.CREATE_TOPICS.id)
.setMinVersion(0)
.setMaxVersion(0)
Mockito.when(brokerToController.controllerApiVersions())
.thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
Mockito.when(controller.isActive).thenReturn(false)
val requestHeader = new RequestHeader(ApiKeys.METADATA, ApiKeys.METADATA.latestVersion,
"clientId", 0)
new RequestContext(requestHeader, "1", InetAddress.getLocalHost,
kafkaPrincipal, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false, principalSerde)
}
private def testErrorWithCreationInZk(error: Errors,
topicName: String,
isInternal: Boolean,
@ -261,9 +380,10 @@ class AutoTopicCreationManagerTest {
private def createTopicAndVerifyResult(error: Errors,
topicName: String,
isInternal: Boolean): Unit = {
isInternal: Boolean,
metadataContext: Option[RequestContext] = None): Unit = {
val topicResponses = autoTopicCreationManager.createTopics(
Set(topicName), UnboundedControllerMutationQuota)
Set(topicName), UnboundedControllerMutationQuota, metadataContext)
val expectedResponses = Seq(new MetadataResponseTopic()
.setErrorCode(error.code())

View File

@ -892,7 +892,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(request)
verifyTopicCreation(topicName, true, true, request)
val capturedRequest = verifyTopicCreation(topicName, true, true, request)
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
autoTopicCreationManager, forwardingManager, controller, clientControllerQuotaManager, groupCoordinator, txnCoordinator)
@ -903,6 +903,8 @@ class KafkaApisTest {
val response = capturedResponse.getValue.asInstanceOf[FindCoordinatorResponse]
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, response.error())
assertTrue(capturedRequest.getValue.isEmpty)
verify(authorizer, autoTopicCreationManager)
}
@ -993,7 +995,7 @@ class KafkaApisTest {
val capturedResponse = expectNoThrottling(request)
verifyTopicCreation(topicName, enableAutoTopicCreation, isInternal, request)
val capturedRequest = verifyTopicCreation(topicName, enableAutoTopicCreation, isInternal, request)
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer,
autoTopicCreationManager, forwardingManager, clientControllerQuotaManager, groupCoordinator, txnCoordinator)
@ -1011,26 +1013,34 @@ class KafkaApisTest {
assertEquals(expectedMetadataResponse, response.topicMetadata())
if (enableAutoTopicCreation) {
assertTrue(capturedRequest.getValue.isDefined)
assertEquals(request.context, capturedRequest.getValue.get)
}
verify(authorizer, autoTopicCreationManager)
}
private def verifyTopicCreation(topicName: String,
enableAutoTopicCreation: Boolean,
isInternal: Boolean,
request: RequestChannel.Request) = {
request: RequestChannel.Request): Capture[Option[RequestContext]] = {
val capturedRequest = EasyMock.newCapture[Option[RequestContext]]()
if (enableAutoTopicCreation) {
EasyMock.expect(clientControllerQuotaManager.newPermissiveQuotaFor(EasyMock.eq(request)))
.andReturn(UnboundedControllerMutationQuota)
EasyMock.expect(autoTopicCreationManager.createTopics(
EasyMock.eq(Set(topicName)),
EasyMock.eq(UnboundedControllerMutationQuota))).andReturn(
EasyMock.eq(UnboundedControllerMutationQuota),
EasyMock.capture(capturedRequest))).andReturn(
Seq(new MetadataResponseTopic()
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
.setIsInternal(isInternal)
.setName(topicName))
).once()
}
capturedRequest
}
private def setupBrokerMetadata(hasEnoughLiveBrokers: Boolean, numBrokersNeeded: Int): Unit = {