MINOR: Handle envelope response in AutoTopicCreationManager (#20569)

In the create topic request we send a CreateTopic request in an
Envelope, so we need to also unpack the response correctly

Reviewers: Lucas Brutschy <lucasbru@apache.org>
This commit is contained in:
Jinhe Zhang 2025-09-25 05:06:22 -04:00 committed by GitHub
parent 444ceeb325
commit 14917ae727
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 427 additions and 46 deletions

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{CreateTopicsRequest, CreateTopicsResponse, RequestContext, RequestHeader}
import org.apache.kafka.common.requests.{AbstractResponse, CreateTopicsRequest, CreateTopicsResponse, EnvelopeResponse, RequestContext, RequestHeader}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
@ -198,6 +198,22 @@ class DefaultAutoTopicCreationManager(
.setTopics(topicsToCreate)
)
// Capture request header information for proper envelope response parsing
val requestHeaderForParsing = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
ApiKeys.CREATE_TOPICS.latestVersion()
case Some(nodeApiVersions) =>
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
}
new RequestHeader(ApiKeys.CREATE_TOPICS,
requestVersion,
context.clientId,
context.correlationId)
}
val requestCompletionHandler = new ControllerRequestCompletionHandler {
override def onTimeout(): Unit = {
clearInflightRequests(creatableTopics)
@ -213,6 +229,33 @@ class DefaultAutoTopicCreationManager(
} else {
if (response.hasResponse) {
response.responseBody() match {
case envelopeResponse: EnvelopeResponse =>
// Unwrap the envelope response to get the actual CreateTopicsResponse
val envelopeError = envelopeResponse.error()
if (envelopeError != Errors.NONE) {
warn(s"Auto topic creation failed for ${creatableTopics.keys} with envelope error: ${envelopeError}")
} else {
requestHeaderForParsing match {
case Some(requestHeader) =>
try {
// Use the captured request header for proper envelope response parsing
val createTopicsResponse = AbstractResponse.parseResponse(
envelopeResponse.responseData(), requestHeader).asInstanceOf[CreateTopicsResponse]
createTopicsResponse.data().topics().forEach(topicResult => {
val error = Errors.forCode(topicResult.errorCode)
if (error != Errors.NONE) {
warn(s"Auto topic creation failed for ${topicResult.name} with error '${error.name}': ${topicResult.errorMessage}")
}
})
} catch {
case e: Exception =>
warn(s"Failed to parse envelope response for auto topic creation of ${creatableTopics.keys}", e)
}
case None =>
warn(s"Cannot parse envelope response without original request header information")
}
}
case createTopicsResponse: CreateTopicsResponse =>
createTopicsResponse.data().topics().forEach(topicResult => {
val error = Errors.forCode(topicResult.errorCode)
@ -229,26 +272,13 @@ class DefaultAutoTopicCreationManager(
}
}
val request = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
// We will rely on the Metadata request to be retried in the case
// that the latest version is not usable by the controller.
ApiKeys.CREATE_TOPICS.latestVersion()
case Some(nodeApiVersions) =>
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
}
// Borrow client information such as client id and correlation id from the original request,
// in order to correlate the create request with the original metadata request.
val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
requestVersion,
context.clientId,
context.correlationId)
val request = (requestContext, requestHeaderForParsing) match {
case (Some(context), Some(requestHeader)) =>
ForwardingManager.buildEnvelopeRequest(context,
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
}.getOrElse(createTopicsRequest)
createTopicsRequest.build(requestHeader.apiVersion()).serializeWithHeader(requestHeader))
case _ =>
createTopicsRequest
}
channelManager.sendRequest(request, requestCompletionHandler)
@ -364,6 +394,22 @@ class DefaultAutoTopicCreationManager(
.setTopics(topicsToCreate)
)
// Capture request header information for proper envelope response parsing
val requestHeaderForParsing = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
ApiKeys.CREATE_TOPICS.latestVersion()
case Some(nodeApiVersions) =>
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
}
new RequestHeader(ApiKeys.CREATE_TOPICS,
requestVersion,
context.clientId,
context.correlationId)
}
val requestCompletionHandler = new ControllerRequestCompletionHandler {
override def onTimeout(): Unit = {
clearInflightRequests(creatableTopics)
@ -382,36 +428,52 @@ class DefaultAutoTopicCreationManager(
warn(s"Auto topic creation failed for ${creatableTopics.keys} with version mismatch exception: ${versionException.getMessage}")
cacheTopicCreationErrors(creatableTopics.keys.toSet, versionException.getMessage, timeoutMs)
} else {
if (response.hasResponse) {
response.responseBody() match {
case envelopeResponse: EnvelopeResponse =>
// Unwrap the envelope response to get the actual CreateTopicsResponse
val envelopeError = envelopeResponse.error()
if (envelopeError != Errors.NONE) {
warn(s"Auto topic creation failed for ${creatableTopics.keys} with envelope error: ${envelopeError}")
cacheTopicCreationErrors(creatableTopics.keys.toSet, s"Envelope error: ${envelopeError}", timeoutMs)
} else {
requestHeaderForParsing match {
case Some(requestHeader) =>
try {
// Use the captured request header for proper envelope response parsing
val createTopicsResponse = AbstractResponse.parseResponse(
envelopeResponse.responseData(), requestHeader).asInstanceOf[CreateTopicsResponse]
cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
} catch {
case e: Exception =>
warn(s"Failed to parse envelope response for auto topic creation of ${creatableTopics.keys}", e)
cacheTopicCreationErrors(creatableTopics.keys.toSet, s"Response parsing error: ${e.getMessage}", timeoutMs)
}
case None =>
warn(s"Cannot parse envelope response without original request header information")
cacheTopicCreationErrors(creatableTopics.keys.toSet, "Missing request header for envelope parsing", timeoutMs)
}
}
case createTopicsResponse: CreateTopicsResponse =>
cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
case _ =>
case unexpectedResponse =>
warn(s"Auto topic creation request received unexpected response type: ${unexpectedResponse.getClass.getSimpleName}")
cacheTopicCreationErrors(creatableTopics.keys.toSet, s"Unexpected response type: ${unexpectedResponse.getClass.getSimpleName}", timeoutMs)
}
debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.")
}
}
}
}
val request = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
// We will rely on the Metadata request to be retried in the case
// that the latest version is not usable by the controller.
ApiKeys.CREATE_TOPICS.latestVersion()
case Some(nodeApiVersions) =>
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
}
// Borrow client information such as client id and correlation id from the original request,
// in order to correlate the create request with the original metadata request.
val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
requestVersion,
context.clientId,
context.correlationId)
val request = (requestContext, requestHeaderForParsing) match {
case (Some(context), Some(requestHeader)) =>
ForwardingManager.buildEnvelopeRequest(context,
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
}.getOrElse(createTopicsRequest)
createTopicsRequest.build(requestHeader.apiVersion()).serializeWithHeader(requestHeader))
case _ =>
createTopicsRequest
}
channelManager.sendRequest(request, requestCompletionHandler)

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopi
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, ByteBufferAccessor, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.RequestUtils
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.server.util.MockTime
@ -523,6 +524,324 @@ class AutoTopicCreationManagerTest {
assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively cleaned up")
}
@Test
def testEnvelopeResponseSuccessfulParsing(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
"test-topic" -> new CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
)
val requestContext = initializeRequestContextWithUserPrincipal()
val timeoutMs = 5000L
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Create a successful CreateTopicsResponse
val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
.setName("test-topic")
.setErrorCode(Errors.NONE.code())
.setNumPartitions(1)
.setReplicationFactor(1.toShort)
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
val correlationId = requestContext.correlationId // Use the actual correlation ID from request context
val clientId = requestContext.clientId
// Serialize the CreateTopicsResponse with header as it would appear in an envelope
val responseHeader = new ResponseHeader(correlationId, ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
val serializedResponse = RequestUtils.serialize(responseHeader.data(), responseHeader.headerVersion(),
createTopicsResponse.data(), requestVersion)
// Create an EnvelopeResponse containing the serialized CreateTopicsResponse
val envelopeResponse = new EnvelopeResponse(serializedResponse, Errors.NONE)
val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId, correlationId)
val clientResponse = new ClientResponse(requestHeader, null, null,
0, 0, false, null, null, envelopeResponse)
// Trigger the completion handler
argumentCaptor.getValue.onComplete(clientResponse)
// Verify no errors were cached (successful response)
val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
assertTrue(cachedErrors.isEmpty, "No errors should be cached for successful response")
}
@Test
def testEnvelopeResponseWithEnvelopeError(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
"test-topic" -> new CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
)
val requestContext = initializeRequestContextWithUserPrincipal()
val timeoutMs = 5000L
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Create an EnvelopeResponse with an envelope-level error
val envelopeResponse = new EnvelopeResponse(ByteBuffer.allocate(0), Errors.UNSUPPORTED_VERSION)
val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, requestContext.clientId, requestContext.correlationId)
val clientResponse = new ClientResponse(requestHeader, null, null,
0, 0, false, null, null, envelopeResponse)
// Trigger the completion handler
argumentCaptor.getValue.onComplete(clientResponse)
// Verify the envelope error was cached
val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
assertEquals(1, cachedErrors.size)
assertTrue(cachedErrors("test-topic").contains("Envelope error: UNSUPPORTED_VERSION"))
}
@Test
def testEnvelopeResponseParsingException(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
"test-topic" -> new CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
)
val requestContext = initializeRequestContextWithUserPrincipal()
val timeoutMs = 5000L
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Create an EnvelopeResponse with malformed response data that will cause parsing to fail
val malformedData = ByteBuffer.wrap("invalid response data".getBytes())
val envelopeResponse = new EnvelopeResponse(malformedData, Errors.NONE)
val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, requestContext.clientId, requestContext.correlationId)
val clientResponse = new ClientResponse(requestHeader, null, null,
0, 0, false, null, null, envelopeResponse)
// Trigger the completion handler
argumentCaptor.getValue.onComplete(clientResponse)
// Verify the parsing error was cached
val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
assertEquals(1, cachedErrors.size)
assertTrue(cachedErrors("test-topic").contains("Response parsing error:"))
}
@Test
def testEnvelopeResponseCorrelationIdMismatch(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
"test-topic" -> new CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
)
val requestContext = initializeRequestContextWithUserPrincipal()
val timeoutMs = 5000L
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Create a CreateTopicsResponse with a different correlation ID than the request
val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
.setName("test-topic")
.setErrorCode(Errors.NONE.code())
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
val requestCorrelationId = 123
val responseCorrelationId = 456 // Different correlation ID
val clientId = "test-client"
// Serialize the CreateTopicsResponse with mismatched correlation ID
val responseHeader = new ResponseHeader(responseCorrelationId, ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
val serializedResponse = RequestUtils.serialize(responseHeader.data(), responseHeader.headerVersion(),
createTopicsResponse.data(), requestVersion)
// Create an EnvelopeResponse containing the serialized CreateTopicsResponse
val envelopeResponse = new EnvelopeResponse(serializedResponse, Errors.NONE)
val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId, requestCorrelationId)
val clientResponse = new ClientResponse(requestHeader, null, null,
0, 0, false, null, null, envelopeResponse)
// Trigger the completion handler
argumentCaptor.getValue.onComplete(clientResponse)
// Verify the correlation ID mismatch error was cached
val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
assertEquals(1, cachedErrors.size)
assertTrue(cachedErrors("test-topic").contains("Response parsing error:"))
}
@Test
def testEnvelopeResponseWithTopicErrors(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
"test-topic-1" -> new CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1),
"test-topic-2" -> new CreatableTopic().setName("test-topic-2").setNumPartitions(1).setReplicationFactor(1)
)
val requestContext = initializeRequestContextWithUserPrincipal()
val timeoutMs = 5000L
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, timeoutMs)
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Create a CreateTopicsResponse with mixed success and error results
val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
// Successful topic
val successResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
.setName("test-topic-1")
.setErrorCode(Errors.NONE.code())
.setNumPartitions(1)
.setReplicationFactor(1.toShort)
createTopicsResponseData.topics().add(successResult)
// Failed topic
val errorResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
.setName("test-topic-2")
.setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
.setErrorMessage("Topic already exists")
createTopicsResponseData.topics().add(errorResult)
val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
val correlationId = requestContext.correlationId // Use the actual correlation ID from request context
val clientId = requestContext.clientId
// Serialize the CreateTopicsResponse with header
val responseHeader = new ResponseHeader(correlationId, ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
val serializedResponse = RequestUtils.serialize(responseHeader.data(), responseHeader.headerVersion(),
createTopicsResponse.data(), requestVersion)
// Create an EnvelopeResponse containing the serialized CreateTopicsResponse
val envelopeResponse = new EnvelopeResponse(serializedResponse, Errors.NONE)
val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId, correlationId)
val clientResponse = new ClientResponse(requestHeader, null, null,
0, 0, false, null, null, envelopeResponse)
// Trigger the completion handler
argumentCaptor.getValue.onComplete(clientResponse)
// Verify only the failed topic was cached
val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(
Set("test-topic-1", "test-topic-2"), mockTime.milliseconds())
assertEquals(1, cachedErrors.size, s"Expected only 1 error but found: $cachedErrors")
assertTrue(cachedErrors.contains("test-topic-2"))
assertEquals("Topic already exists", cachedErrors("test-topic-2"))
}
@Test
def testSendCreateTopicRequestEnvelopeHandling(): Unit = {
// Test the sendCreateTopicRequest method (without error caching) handles envelopes correctly
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
val requestContext = initializeRequestContextWithUserPrincipal()
// Call createTopics which uses sendCreateTopicRequest internally
autoTopicCreationManager.createTopics(
Set("test-topic"), ControllerMutationQuota.UNBOUNDED_CONTROLLER_MUTATION_QUOTA, Some(requestContext))
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Create a CreateTopicsResponse with an error
val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
.setName("test-topic")
.setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code())
.setErrorMessage("Invalid topic name")
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
val requestVersion = ApiKeys.CREATE_TOPICS.latestVersion()
val correlationId = requestContext.correlationId // Use the actual correlation ID from request context
val clientId = requestContext.clientId
// Serialize the CreateTopicsResponse with header
val responseHeader = new ResponseHeader(correlationId, ApiKeys.CREATE_TOPICS.responseHeaderVersion(requestVersion))
val serializedResponse = RequestUtils.serialize(responseHeader.data(), responseHeader.headerVersion(),
createTopicsResponse.data(), requestVersion)
// Create an EnvelopeResponse containing the serialized CreateTopicsResponse
val envelopeResponse = new EnvelopeResponse(serializedResponse, Errors.NONE)
val requestHeader = new RequestHeader(ApiKeys.ENVELOPE, 0, clientId, correlationId)
val clientResponse = new ClientResponse(requestHeader, null, null,
0, 0, false, null, null, envelopeResponse)
// Trigger the completion handler
argumentCaptor.getValue.onComplete(clientResponse)
// For sendCreateTopicRequest, errors are not cached, but we can verify the handler completed without exception
// The test passes if no exception is thrown during envelope processing
}
@Test
def testErrorCacheExpirationBasedEviction(): Unit = {
// Create manager with small cache size for testing