MINOR: Expose internal topic creation errors to the user (#20325)

This PR introduces an ExpiringErrorCache that temporarily stores topic
creation errors, allowing the system to provide detailed failure reasons
in subsequent heartbeat responses.

Key Designs:

Time-based expiration: Errors are cached with a TTL based on the
streams group heartbeat interval (2x heartbeat interval). This ensures
errors remain available for at least one retry cycle while preventing
unbounded growth.    2. Priority queue for efficient expiry: Uses a
min-heap to track entries by expiration time, enabling efficient cleanup
of expired entries during cache operations.    3. Capacity enforcement:
Limits cache size to prevent memory issues under high error rates. When
capacity is exceeded, oldest entries are evicted first.    4. Reference
equality checks: Uses eq for object identity comparison when cleaning up
stale entries, avoiding expensive value comparisons while correctly
handling entry updates.

Reviewers: Lucas Brutschy <lucasbru@apache.org>
This commit is contained in:
Jinhe Zhang 2025-09-16 14:52:39 -04:00 committed by GitHub
parent b043ca2074
commit 8ba41a2d0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 913 additions and 19 deletions

View File

@ -18,6 +18,7 @@
package kafka.server package kafka.server
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Properties} import java.util.{Collections, Properties}
import kafka.coordinator.transaction.TransactionCoordinator import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging import kafka.utils.Logging
@ -35,6 +36,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
import org.apache.kafka.server.quota.ControllerMutationQuota import org.apache.kafka.server.quota.ControllerMutationQuota
import org.apache.kafka.common.utils.Time
import scala.collection.{Map, Seq, Set, mutable} import scala.collection.{Map, Seq, Set, mutable}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ -50,21 +52,96 @@ trait AutoTopicCreationManager {
def createStreamsInternalTopics( def createStreamsInternalTopics(
topics: Map[String, CreatableTopic], topics: Map[String, CreatableTopic],
requestContext: RequestContext requestContext: RequestContext,
timeoutMs: Long
): Unit ): Unit
def getStreamsInternalTopicCreationErrors(
topicNames: Set[String],
currentTimeMs: Long
): Map[String, String]
def close(): Unit = {}
} }
/**
* Thread-safe cache that stores topic creation errors with per-entry expiration.
* - Expiration: maintained by a min-heap (priority queue) on expiration time
* - Capacity: enforced by evicting entries with earliest expiration time (not LRU)
* - Updates: old entries remain in queue but are ignored via reference equality check
*/
private[server] class ExpiringErrorCache(maxSize: Int, time: Time) {
private case class Entry(topicName: String, errorMessage: String, expirationTimeMs: Long)
private val byTopic = new ConcurrentHashMap[String, Entry]()
private val expiryQueue = new java.util.PriorityQueue[Entry](11, new java.util.Comparator[Entry] {
override def compare(a: Entry, b: Entry): Int = java.lang.Long.compare(a.expirationTimeMs, b.expirationTimeMs)
})
private val lock = new ReentrantLock()
def put(topicName: String, errorMessage: String, ttlMs: Long): Unit = {
lock.lock()
try {
val currentTimeMs = time.milliseconds()
val expirationTimeMs = currentTimeMs + ttlMs
val entry = Entry(topicName, errorMessage, expirationTimeMs)
byTopic.put(topicName, entry)
expiryQueue.add(entry)
// Clean up expired entries and enforce capacity
while (!expiryQueue.isEmpty &&
(expiryQueue.peek().expirationTimeMs <= currentTimeMs || byTopic.size() > maxSize)) {
val evicted = expiryQueue.poll()
val current = byTopic.get(evicted.topicName)
if (current != null && (current eq evicted)) {
byTopic.remove(evicted.topicName)
}
}
} finally {
lock.unlock()
}
}
def getErrorsForTopics(topicNames: Set[String], currentTimeMs: Long): Map[String, String] = {
val result = mutable.Map.empty[String, String]
topicNames.foreach { topicName =>
val entry = byTopic.get(topicName)
if (entry != null && entry.expirationTimeMs > currentTimeMs) {
result.put(topicName, entry.errorMessage)
}
}
result.toMap
}
private[server] def clear(): Unit = {
lock.lock()
try {
byTopic.clear()
expiryQueue.clear()
} finally {
lock.unlock()
}
}
}
class DefaultAutoTopicCreationManager( class DefaultAutoTopicCreationManager(
config: KafkaConfig, config: KafkaConfig,
channelManager: NodeToControllerChannelManager, channelManager: NodeToControllerChannelManager,
groupCoordinator: GroupCoordinator, groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator, txnCoordinator: TransactionCoordinator,
shareCoordinator: ShareCoordinator shareCoordinator: ShareCoordinator,
time: Time,
topicErrorCacheCapacity: Int = 1000
) extends AutoTopicCreationManager with Logging { ) extends AutoTopicCreationManager with Logging {
private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
// Hardcoded default capacity; can be overridden in tests via constructor param
private val topicCreationErrorCache = new ExpiringErrorCache(topicErrorCacheCapacity, time)
/** /**
* Initiate auto topic creation for the given topics. * Initiate auto topic creation for the given topics.
* *
@ -93,13 +170,21 @@ class DefaultAutoTopicCreationManager(
override def createStreamsInternalTopics( override def createStreamsInternalTopics(
topics: Map[String, CreatableTopic], topics: Map[String, CreatableTopic],
requestContext: RequestContext requestContext: RequestContext,
timeoutMs: Long
): Unit = { ): Unit = {
if (topics.nonEmpty) { if (topics.nonEmpty) {
sendCreateTopicRequest(topics, Some(requestContext)) sendCreateTopicRequestWithErrorCaching(topics, Some(requestContext), timeoutMs)
} }
} }
override def getStreamsInternalTopicCreationErrors(
topicNames: Set[String],
currentTimeMs: Long
): Map[String, String] = {
topicCreationErrorCache.getErrorsForTopics(topicNames, currentTimeMs)
}
private def sendCreateTopicRequest( private def sendCreateTopicRequest(
creatableTopics: Map[String, CreatableTopic], creatableTopics: Map[String, CreatableTopic],
requestContext: Option[RequestContext] requestContext: Option[RequestContext]
@ -264,4 +349,101 @@ class DefaultAutoTopicCreationManager(
(creatableTopics, uncreatableTopics) (creatableTopics, uncreatableTopics)
} }
private def sendCreateTopicRequestWithErrorCaching(
creatableTopics: Map[String, CreatableTopic],
requestContext: Option[RequestContext],
timeoutMs: Long
): Seq[MetadataResponseTopic] = {
val topicsToCreate = new CreateTopicsRequestData.CreatableTopicCollection(creatableTopics.size)
topicsToCreate.addAll(creatableTopics.values.asJavaCollection)
val createTopicsRequest = new CreateTopicsRequest.Builder(
new CreateTopicsRequestData()
.setTimeoutMs(config.requestTimeoutMs)
.setTopics(topicsToCreate)
)
val requestCompletionHandler = new ControllerRequestCompletionHandler {
override def onTimeout(): Unit = {
clearInflightRequests(creatableTopics)
debug(s"Auto topic creation timed out for ${creatableTopics.keys}.")
cacheTopicCreationErrors(creatableTopics.keys.toSet, "Auto topic creation timed out.", timeoutMs)
}
override def onComplete(response: ClientResponse): Unit = {
clearInflightRequests(creatableTopics)
if (response.authenticationException() != null) {
val authException = response.authenticationException()
warn(s"Auto topic creation failed for ${creatableTopics.keys} with authentication exception: ${authException.getMessage}")
cacheTopicCreationErrors(creatableTopics.keys.toSet, authException.getMessage, timeoutMs)
} else if (response.versionMismatch() != null) {
val versionException = response.versionMismatch()
warn(s"Auto topic creation failed for ${creatableTopics.keys} with version mismatch exception: ${versionException.getMessage}")
cacheTopicCreationErrors(creatableTopics.keys.toSet, versionException.getMessage, timeoutMs)
} else {
response.responseBody() match {
case createTopicsResponse: CreateTopicsResponse =>
cacheTopicCreationErrorsFromResponse(createTopicsResponse, timeoutMs)
case _ =>
debug(s"Auto topic creation completed for ${creatableTopics.keys} with response ${response.responseBody}.")
}
}
}
}
val request = requestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
case None =>
// We will rely on the Metadata request to be retried in the case
// that the latest version is not usable by the controller.
ApiKeys.CREATE_TOPICS.latestVersion()
case Some(nodeApiVersions) =>
nodeApiVersions.latestUsableVersion(ApiKeys.CREATE_TOPICS)
}
// Borrow client information such as client id and correlation id from the original request,
// in order to correlate the create request with the original metadata request.
val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
requestVersion,
context.clientId,
context.correlationId)
ForwardingManager.buildEnvelopeRequest(context,
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
}.getOrElse(createTopicsRequest)
channelManager.sendRequest(request, requestCompletionHandler)
val creatableTopicResponses = creatableTopics.keySet.toSeq.map { topic =>
new MetadataResponseTopic()
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
.setName(topic)
.setIsInternal(Topic.isInternal(topic))
}
creatableTopicResponses
}
private def cacheTopicCreationErrors(topicNames: Set[String], errorMessage: String, ttlMs: Long): Unit = {
topicNames.foreach { topicName =>
topicCreationErrorCache.put(topicName, errorMessage, ttlMs)
}
}
private def cacheTopicCreationErrorsFromResponse(response: CreateTopicsResponse, ttlMs: Long): Unit = {
response.data().topics().forEach { topicResult =>
if (topicResult.errorCode() != Errors.NONE.code()) {
val errorMessage = Option(topicResult.errorMessage())
.filter(_.nonEmpty)
.getOrElse(Errors.forCode(topicResult.errorCode()).message())
topicCreationErrorCache.put(topicResult.name(), errorMessage, ttlMs)
debug(s"Cached topic creation error for ${topicResult.name()}: $errorMessage")
}
}
}
override def close(): Unit = {
topicCreationErrorCache.clear()
}
} }

View File

@ -387,7 +387,7 @@ class BrokerServer(
autoTopicCreationManager = new DefaultAutoTopicCreationManager( autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config, clientToControllerChannelManager, groupCoordinator, config, clientToControllerChannelManager, groupCoordinator,
transactionCoordinator, shareCoordinator) transactionCoordinator, shareCoordinator, time)
dynamicConfigHandlers = Map[ConfigType, ConfigHandler]( dynamicConfigHandlers = Map[ConfigType, ConfigHandler](
ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config, quotaManagers), ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config, quotaManagers),
@ -780,6 +780,9 @@ class BrokerServer(
if (shareCoordinator != null) if (shareCoordinator != null)
CoreUtils.swallow(shareCoordinator.shutdown(), this) CoreUtils.swallow(shareCoordinator.shutdown(), this)
if (autoTopicCreationManager != null)
CoreUtils.swallow(autoTopicCreationManager.close(), this)
if (assignmentsManager != null) if (assignmentsManager != null)
CoreUtils.swallow(assignmentsManager.close(), this) CoreUtils.swallow(assignmentsManager.close(), this)

View File

@ -2812,10 +2812,35 @@ class KafkaApis(val requestChannel: RequestChannel,
) )
} }
} else { } else {
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext); // Compute group-specific timeout for caching errors (2 * heartbeat interval)
} val heartbeatIntervalMs = Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
} .map(_.streamsHeartbeatIntervalMs().toLong)
.getOrElse(config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs().toLong)
val timeoutMs = heartbeatIntervalMs * 2
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, requestContext, timeoutMs)
// Check for cached topic creation errors only if there's already a MISSING_INTERNAL_TOPICS status
val hasMissingInternalTopicsStatus = responseData.status() != null &&
responseData.status().stream().anyMatch(s => s.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
if (hasMissingInternalTopicsStatus) {
val currentTimeMs = time.milliseconds()
val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(topicsToCreate.keys.toSet, currentTimeMs)
if (cachedErrors.nonEmpty) {
val missingInternalTopicStatus =
responseData.status().stream().filter(x => x.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code()).findFirst()
val creationErrorDetails = cachedErrors.map { case (topic, error) => s"$topic ($error)" }.mkString(", ")
if (missingInternalTopicStatus.isPresent) {
val existingDetail = Option(missingInternalTopicStatus.get().statusDetail()).getOrElse("")
missingInternalTopicStatus.get().setStatusDetail(
existingDetail + s"; Creation failed: $creationErrorDetails."
)
}
}
}
}
}
requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(responseData)) requestHelper.sendMaybeThrottle(request, new StreamsGroupHeartbeatResponse(responseData))
} }
} }

View File

@ -35,6 +35,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, ByteBufferAccessor, Errors}
import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils} import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig} import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfig} import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfig}
import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.metadata.MetadataCache
@ -45,14 +46,15 @@ import org.apache.kafka.server.quota.ControllerMutationQuota
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test} import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.never
import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito} import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
import org.mockito.Mockito.never
import scala.collection.{Map, Seq} import scala.collection.{Map, Seq}
class AutoTopicCreationManagerTest { class AutoTopicCreationManagerTest {
private val requestTimeout = 100 private val requestTimeout = 100
private val testCacheCapacity = 3
private var config: KafkaConfig = _ private var config: KafkaConfig = _
private val metadataCache = Mockito.mock(classOf[MetadataCache]) private val metadataCache = Mockito.mock(classOf[MetadataCache])
private val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager]) private val brokerToController = Mockito.mock(classOf[NodeToControllerChannelManager])
@ -60,6 +62,7 @@ class AutoTopicCreationManagerTest {
private val transactionCoordinator = Mockito.mock(classOf[TransactionCoordinator]) private val transactionCoordinator = Mockito.mock(classOf[TransactionCoordinator])
private val shareCoordinator = Mockito.mock(classOf[ShareCoordinator]) private val shareCoordinator = Mockito.mock(classOf[ShareCoordinator])
private var autoTopicCreationManager: AutoTopicCreationManager = _ private var autoTopicCreationManager: AutoTopicCreationManager = _
private val mockTime = new MockTime(0L, 0L)
private val internalTopicPartitions = 2 private val internalTopicPartitions = 2
private val internalTopicReplicationFactor: Short = 2 private val internalTopicReplicationFactor: Short = 2
@ -76,6 +79,8 @@ class AutoTopicCreationManagerTest {
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString) props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString)
props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString) props.setProperty(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString)
props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString) props.setProperty(ShareCoordinatorConfig.STATE_TOPIC_NUM_PARTITIONS_CONFIG, internalTopicReplicationFactor.toString)
// Set a short group max session timeout for testing TTL (1 second)
props.setProperty(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, "1000")
config = KafkaConfig.fromProps(props) config = KafkaConfig.fromProps(props)
val aliveBrokers = util.List.of(new Node(0, "host0", 0), new Node(1, "host1", 1)) val aliveBrokers = util.List.of(new Node(0, "host0", 0), new Node(1, "host1", 1))
@ -115,7 +120,9 @@ class AutoTopicCreationManagerTest {
brokerToController, brokerToController,
groupCoordinator, groupCoordinator,
transactionCoordinator, transactionCoordinator,
shareCoordinator) shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection val topicsCollection = new CreateTopicsRequestData.CreatableTopicCollection
topicsCollection.add(getNewTopic(topicName, numPartitions, replicationFactor)) topicsCollection.add(getNewTopic(topicName, numPartitions, replicationFactor))
@ -231,9 +238,11 @@ class AutoTopicCreationManagerTest {
brokerToController, brokerToController,
groupCoordinator, groupCoordinator,
transactionCoordinator, transactionCoordinator,
shareCoordinator) shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() * 2)
val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]) val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
Mockito.verify(brokerToController).sendRequest( Mockito.verify(brokerToController).sendRequest(
@ -267,9 +276,11 @@ class AutoTopicCreationManagerTest {
brokerToController, brokerToController,
groupCoordinator, groupCoordinator,
transactionCoordinator, transactionCoordinator,
shareCoordinator) shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() * 2)
Mockito.verify(brokerToController, never()).sendRequest( Mockito.verify(brokerToController, never()).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]), any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
@ -288,9 +299,11 @@ class AutoTopicCreationManagerTest {
brokerToController, brokerToController,
groupCoordinator, groupCoordinator,
transactionCoordinator, transactionCoordinator,
shareCoordinator) shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext) autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() * 2)
val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]) val argumentCaptor = ArgumentCaptor.forClass(classOf[AbstractRequest.Builder[_ <: AbstractRequest]])
Mockito.verify(brokerToController).sendRequest( Mockito.verify(brokerToController).sendRequest(
@ -319,7 +332,9 @@ class AutoTopicCreationManagerTest {
brokerToController, brokerToController,
groupCoordinator, groupCoordinator,
transactionCoordinator, transactionCoordinator,
shareCoordinator) shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion() val createTopicApiVersion = new ApiVersionsResponseData.ApiVersion()
.setApiKey(ApiKeys.CREATE_TOPICS.id) .setApiKey(ApiKeys.CREATE_TOPICS.id)
@ -356,4 +371,217 @@ class AutoTopicCreationManagerTest {
.setNumPartitions(numPartitions) .setNumPartitions(numPartitions)
.setReplicationFactor(replicationFactor) .setReplicationFactor(replicationFactor)
} }
@Test
def testTopicCreationErrorCaching(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
"test-topic-1" -> new CreatableTopic().setName("test-topic-1").setNumPartitions(1).setReplicationFactor(1)
)
val requestContext = initializeRequestContextWithUserPrincipal()
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() * 2)
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Simulate a CreateTopicsResponse with errors
val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
.setName("test-topic-1")
.setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
.setErrorMessage("Topic 'test-topic-1' already exists.")
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
val clientResponse = new ClientResponse(header, null, null,
0, 0, false, null, null, createTopicsResponse)
// Trigger the completion handler
argumentCaptor.getValue.onComplete(clientResponse)
// Verify that the error was cached
val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic-1"), mockTime.milliseconds())
assertEquals(1, cachedErrors.size)
assertTrue(cachedErrors.contains("test-topic-1"))
assertEquals("Topic 'test-topic-1' already exists.", cachedErrors("test-topic-1"))
}
@Test
def testGetTopicCreationErrorsWithMultipleTopics(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
val topics = Map(
"success-topic" -> new CreatableTopic().setName("success-topic").setNumPartitions(1).setReplicationFactor(1),
"failed-topic" -> new CreatableTopic().setName("failed-topic").setNumPartitions(1).setReplicationFactor(1)
)
val requestContext = initializeRequestContextWithUserPrincipal()
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() * 2)
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Simulate mixed response - one success, one failure
val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
createTopicsResponseData.topics().add(
new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
.setName("success-topic")
.setErrorCode(Errors.NONE.code())
)
createTopicsResponseData.topics().add(
new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
.setName("failed-topic")
.setErrorCode(Errors.POLICY_VIOLATION.code())
.setErrorMessage("Policy violation")
)
val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
val clientResponse = new ClientResponse(header, null, null,
0, 0, false, null, null, createTopicsResponse)
argumentCaptor.getValue.onComplete(clientResponse)
// Only the failed topic should be cached
val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("success-topic", "failed-topic", "nonexistent-topic"), mockTime.milliseconds())
assertEquals(1, cachedErrors.size)
assertTrue(cachedErrors.contains("failed-topic"))
assertEquals("Policy violation", cachedErrors("failed-topic"))
}
@Test
def testErrorCacheTTL(): Unit = {
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
topicErrorCacheCapacity = testCacheCapacity)
// First cache an error by simulating topic creation failure
val topics = Map(
"test-topic" -> new CreatableTopic().setName("test-topic").setNumPartitions(1).setReplicationFactor(1)
)
val requestContext = initializeRequestContextWithUserPrincipal()
val shortTtlMs = 1000L // Use 1 second TTL for faster testing
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, shortTtlMs)
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Simulate a CreateTopicsResponse with error
val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
.setName("test-topic")
.setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code())
.setErrorMessage("Invalid replication factor")
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
val clientResponse = new ClientResponse(header, null, null,
0, 0, false, null, null, createTopicsResponse)
// Cache the error at T0
argumentCaptor.getValue.onComplete(clientResponse)
// Verify error is cached and accessible within TTL
val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
assertEquals(1, cachedErrors.size)
assertEquals("Invalid replication factor", cachedErrors("test-topic"))
// Advance time beyond TTL
mockTime.sleep(shortTtlMs + 100) // T0 + 1.1 seconds
// Verify error is now expired and proactively cleaned up
val expiredErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(Set("test-topic"), mockTime.milliseconds())
assertTrue(expiredErrors.isEmpty, "Expired errors should be proactively cleaned up")
}
@Test
def testErrorCacheExpirationBasedEviction(): Unit = {
// Create manager with small cache size for testing
autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config,
brokerToController,
groupCoordinator,
transactionCoordinator,
shareCoordinator,
mockTime,
topicErrorCacheCapacity = 3)
val requestContext = initializeRequestContextWithUserPrincipal()
// Create 5 topics to exceed the cache size of 3
val topicNames = (1 to 5).map(i => s"test-topic-$i")
// Add errors for all 5 topics to the cache
topicNames.zipWithIndex.foreach { case (topicName, idx) =>
val topics = Map(
topicName -> new CreatableTopic().setName(topicName).setNumPartitions(1).setReplicationFactor(1)
)
autoTopicCreationManager.createStreamsInternalTopics(topics, requestContext, config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs() * 2)
val argumentCaptor = ArgumentCaptor.forClass(classOf[ControllerRequestCompletionHandler])
Mockito.verify(brokerToController, Mockito.atLeastOnce()).sendRequest(
any(classOf[AbstractRequest.Builder[_ <: AbstractRequest]]),
argumentCaptor.capture())
// Simulate error response for this topic
val createTopicsResponseData = new org.apache.kafka.common.message.CreateTopicsResponseData()
val topicResult = new org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult()
.setName(topicName)
.setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code())
.setErrorMessage(s"Topic '$topicName' already exists.")
createTopicsResponseData.topics().add(topicResult)
val createTopicsResponse = new CreateTopicsResponse(createTopicsResponseData)
val header = new RequestHeader(ApiKeys.CREATE_TOPICS, 0, "client", 1)
val clientResponse = new ClientResponse(header, null, null,
0, 0, false, null, null, createTopicsResponse)
argumentCaptor.getValue.onComplete(clientResponse)
// Advance time slightly between additions to ensure different timestamps
mockTime.sleep(10)
}
// With cache size of 3, topics 1 and 2 should have been evicted
val cachedErrors = autoTopicCreationManager.getStreamsInternalTopicCreationErrors(topicNames.toSet, mockTime.milliseconds())
// Only the last 3 topics should be in the cache (topics 3, 4, 5)
assertEquals(3, cachedErrors.size, "Cache should contain only the most recent 3 entries")
assertTrue(cachedErrors.contains("test-topic-3"), "test-topic-3 should be in cache")
assertTrue(cachedErrors.contains("test-topic-4"), "test-topic-4 should be in cache")
assertTrue(cachedErrors.contains("test-topic-5"), "test-topic-5 should be in cache")
assertTrue(!cachedErrors.contains("test-topic-1"), "test-topic-1 should have been evicted")
assertTrue(!cachedErrors.contains("test-topic-2"), "test-topic-2 should have been evicted")
}
} }

View File

@ -0,0 +1,400 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.apache.kafka.server.util.MockTime
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random
import java.util.concurrent.{CountDownLatch, TimeUnit}
class ExpiringErrorCacheTest {
private var mockTime: MockTime = _
private var cache: ExpiringErrorCache = _
@BeforeEach
def setUp(): Unit = {
mockTime = new MockTime()
}
// Basic Functionality Tests
@Test
def testPutAndGet(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
cache.put("topic1", "error1", 1000L)
cache.put("topic2", "error2", 2000L)
val errors = cache.getErrorsForTopics(Set("topic1", "topic2"), mockTime.milliseconds())
assertEquals(2, errors.size)
assertEquals("error1", errors("topic1"))
assertEquals("error2", errors("topic2"))
}
@Test
def testGetNonExistentTopic(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
cache.put("topic1", "error1", 1000L)
val errors = cache.getErrorsForTopics(Set("topic1", "topic2"), mockTime.milliseconds())
assertEquals(1, errors.size)
assertEquals("error1", errors("topic1"))
assertFalse(errors.contains("topic2"))
}
@Test
def testUpdateExistingEntry(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
cache.put("topic1", "error1", 1000L)
assertEquals("error1", cache.getErrorsForTopics(Set("topic1"), mockTime.milliseconds())("topic1"))
// Update with new error
cache.put("topic1", "error2", 2000L)
assertEquals("error2", cache.getErrorsForTopics(Set("topic1"), mockTime.milliseconds())("topic1"))
}
@Test
def testGetMultipleTopics(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
cache.put("topic1", "error1", 1000L)
cache.put("topic2", "error2", 1000L)
cache.put("topic3", "error3", 1000L)
val errors = cache.getErrorsForTopics(Set("topic1", "topic3", "topic4"), mockTime.milliseconds())
assertEquals(2, errors.size)
assertEquals("error1", errors("topic1"))
assertEquals("error3", errors("topic3"))
assertFalse(errors.contains("topic2"))
assertFalse(errors.contains("topic4"))
}
// Expiration Tests
@Test
def testExpiredEntryNotReturned(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
cache.put("topic1", "error1", 1000L)
// Entry should be available before expiration
assertEquals(1, cache.getErrorsForTopics(Set("topic1"), mockTime.milliseconds()).size)
// Advance time past expiration
mockTime.sleep(1001L)
// Entry should not be returned after expiration
assertTrue(cache.getErrorsForTopics(Set("topic1"), mockTime.milliseconds()).isEmpty)
}
@Test
def testExpiredEntriesCleanedOnPut(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
// Add entries with different TTLs
cache.put("topic1", "error1", 1000L)
cache.put("topic2", "error2", 2000L)
// Advance time to expire topic1 but not topic2
mockTime.sleep(1500L)
// Add a new entry - this should trigger cleanup
cache.put("topic3", "error3", 1000L)
// Verify only non-expired entries remain
val errors = cache.getErrorsForTopics(Set("topic1", "topic2", "topic3"), mockTime.milliseconds())
assertEquals(2, errors.size)
assertFalse(errors.contains("topic1"))
assertEquals("error2", errors("topic2"))
assertEquals("error3", errors("topic3"))
}
@Test
def testMixedExpiredAndValidEntries(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
cache.put("topic1", "error1", 500L)
cache.put("topic2", "error2", 1000L)
cache.put("topic3", "error3", 1500L)
// Advance time to expire only topic1
mockTime.sleep(600L)
val errors = cache.getErrorsForTopics(Set("topic1", "topic2", "topic3"), mockTime.milliseconds())
assertEquals(2, errors.size)
assertFalse(errors.contains("topic1"))
assertTrue(errors.contains("topic2"))
assertTrue(errors.contains("topic3"))
}
// Capacity Enforcement Tests
@Test
def testCapacityEnforcement(): Unit = {
cache = new ExpiringErrorCache(3, mockTime)
// Add 5 entries, exceeding capacity of 3
for (i <- 1 to 5) {
cache.put(s"topic$i", s"error$i", 1000L)
// Small time advance between entries to ensure different insertion order
mockTime.sleep(10L)
}
val errors = cache.getErrorsForTopics((1 to 5).map(i => s"topic$i").toSet, mockTime.milliseconds())
assertEquals(3, errors.size)
// The cache evicts by earliest expiration time
// Since all have same TTL, earliest inserted (topic1, topic2) should be evicted
assertFalse(errors.contains("topic1"))
assertFalse(errors.contains("topic2"))
assertTrue(errors.contains("topic3"))
assertTrue(errors.contains("topic4"))
assertTrue(errors.contains("topic5"))
}
@Test
def testEvictionOrder(): Unit = {
cache = new ExpiringErrorCache(3, mockTime)
// Add entries with different TTLs
cache.put("topic1", "error1", 3000L) // Expires at 3000
mockTime.sleep(100L)
cache.put("topic2", "error2", 1000L) // Expires at 1100
mockTime.sleep(100L)
cache.put("topic3", "error3", 2000L) // Expires at 2200
mockTime.sleep(100L)
cache.put("topic4", "error4", 500L) // Expires at 800
// With capacity 3, topic4 (earliest expiration) should be evicted
val errors = cache.getErrorsForTopics(Set("topic1", "topic2", "topic3", "topic4"), mockTime.milliseconds())
assertEquals(3, errors.size)
assertTrue(errors.contains("topic1"))
assertTrue(errors.contains("topic2"))
assertTrue(errors.contains("topic3"))
assertFalse(errors.contains("topic4"))
}
@Test
def testCapacityWithDifferentTTLs(): Unit = {
cache = new ExpiringErrorCache(2, mockTime)
cache.put("topic1", "error1", 5000L) // Long TTL
cache.put("topic2", "error2", 100L) // Short TTL
cache.put("topic3", "error3", 3000L) // Medium TTL
// topic2 has earliest expiration, so it should be evicted
val errors = cache.getErrorsForTopics(Set("topic1", "topic2", "topic3"), mockTime.milliseconds())
assertEquals(2, errors.size)
assertTrue(errors.contains("topic1"))
assertFalse(errors.contains("topic2"))
assertTrue(errors.contains("topic3"))
}
// Update and Stale Entry Tests
@Test
def testUpdateDoesNotLeaveStaleEntries(): Unit = {
cache = new ExpiringErrorCache(3, mockTime)
// Fill cache to capacity
cache.put("topic1", "error1", 1000L)
cache.put("topic2", "error2", 1000L)
cache.put("topic3", "error3", 1000L)
// Update topic2 with longer TTL
cache.put("topic2", "error2_updated", 5000L)
// Add new entry to trigger eviction
cache.put("topic4", "error4", 1000L)
// Should evict topic1 or topic3 (earliest expiration), not the updated topic2
val errors = cache.getErrorsForTopics(Set("topic1", "topic2", "topic3", "topic4"), mockTime.milliseconds())
assertEquals(3, errors.size)
assertTrue(errors.contains("topic2"))
assertEquals("error2_updated", errors("topic2"))
}
@Test
def testStaleEntriesInQueueHandledCorrectly(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
// Add and update same topic multiple times
cache.put("topic1", "error1", 1000L)
cache.put("topic1", "error2", 2000L)
cache.put("topic1", "error3", 3000L)
// Only latest value should be returned
val errors = cache.getErrorsForTopics(Set("topic1"), mockTime.milliseconds())
assertEquals(1, errors.size)
assertEquals("error3", errors("topic1"))
// Advance time to expire first two entries
mockTime.sleep(2500L)
// Force cleanup by adding new entry
cache.put("topic2", "error_new", 1000L)
// topic1 should still be available with latest value
val errorsAfterCleanup = cache.getErrorsForTopics(Set("topic1"), mockTime.milliseconds())
assertEquals(1, errorsAfterCleanup.size)
assertEquals("error3", errorsAfterCleanup("topic1"))
}
// Edge Cases
@Test
def testEmptyCache(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
val errors = cache.getErrorsForTopics(Set("topic1", "topic2"), mockTime.milliseconds())
assertTrue(errors.isEmpty)
}
@Test
def testSingleEntryCache(): Unit = {
cache = new ExpiringErrorCache(1, mockTime)
cache.put("topic1", "error1", 1000L)
cache.put("topic2", "error2", 1000L)
// Only most recent should remain
val errors = cache.getErrorsForTopics(Set("topic1", "topic2"), mockTime.milliseconds())
assertEquals(1, errors.size)
assertFalse(errors.contains("topic1"))
assertTrue(errors.contains("topic2"))
}
@Test
def testZeroTTL(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
cache.put("topic1", "error1", 0L)
// Entry expires immediately
assertTrue(cache.getErrorsForTopics(Set("topic1"), mockTime.milliseconds()).isEmpty)
}
@Test
def testClearOperation(): Unit = {
cache = new ExpiringErrorCache(10, mockTime)
cache.put("topic1", "error1", 1000L)
cache.put("topic2", "error2", 1000L)
assertEquals(2, cache.getErrorsForTopics(Set("topic1", "topic2"), mockTime.milliseconds()).size)
cache.clear()
assertTrue(cache.getErrorsForTopics(Set("topic1", "topic2"), mockTime.milliseconds()).isEmpty)
}
// Concurrent Access Tests
@Test
def testConcurrentPutOperations(): Unit = {
cache = new ExpiringErrorCache(100, mockTime)
val numThreads = 10
val numTopicsPerThread = 20
val latch = new CountDownLatch(numThreads)
(1 to numThreads).foreach { threadId =>
Future {
try {
for (i <- 1 to numTopicsPerThread) {
cache.put(s"topic_${threadId}_$i", s"error_${threadId}_$i", 1000L)
}
} finally {
latch.countDown()
}
}
}
assertTrue(latch.await(5, TimeUnit.SECONDS))
// Verify all entries were added
val allTopics = (1 to numThreads).flatMap { threadId =>
(1 to numTopicsPerThread).map(i => s"topic_${threadId}_$i")
}.toSet
val errors = cache.getErrorsForTopics(allTopics, mockTime.milliseconds())
assertEquals(100, errors.size) // Limited by cache capacity
}
@Test
def testConcurrentPutAndGet(): Unit = {
cache = new ExpiringErrorCache(100, mockTime)
val numOperations = 1000
val random = new Random()
val topics = (1 to 50).map(i => s"topic$i").toArray
val futures = (1 to numOperations).map { _ =>
Future {
if (random.nextBoolean()) {
// Put operation
val topic = topics(random.nextInt(topics.length))
cache.put(topic, s"error_${random.nextInt()}", 1000L)
} else {
// Get operation
val topicsToGet = Set(topics(random.nextInt(topics.length)))
cache.getErrorsForTopics(topicsToGet, mockTime.milliseconds())
}
}
}
// Wait for all operations to complete
Future.sequence(futures).map(_ => ())
}
@Test
def testConcurrentUpdates(): Unit = {
cache = new ExpiringErrorCache(50, mockTime)
val numThreads = 10
val numUpdatesPerThread = 100
val sharedTopics = (1 to 10).map(i => s"shared_topic$i").toArray
val latch = new CountDownLatch(numThreads)
(1 to numThreads).foreach { threadId =>
Future {
try {
val random = new Random()
for (i <- 1 to numUpdatesPerThread) {
val topic = sharedTopics(random.nextInt(sharedTopics.length))
cache.put(topic, s"error_thread${threadId}_update$i", 1000L)
}
} finally {
latch.countDown()
}
}
}
assertTrue(latch.await(5, TimeUnit.SECONDS))
// Verify all shared topics have some value
val errors = cache.getErrorsForTopics(sharedTopics.toSet, mockTime.milliseconds())
sharedTopics.foreach { topic =>
assertTrue(errors.contains(topic), s"Topic $topic should have a value")
assertTrue(errors(topic).startsWith("error_thread"), s"Value should be from one of the threads")
}
}
}

View File

@ -168,7 +168,8 @@ class KafkaApisTest extends Logging {
authorizer: Option[Authorizer] = None, authorizer: Option[Authorizer] = None,
configRepository: ConfigRepository = new MockConfigRepository(), configRepository: ConfigRepository = new MockConfigRepository(),
overrideProperties: Map[String, String] = Map.empty, overrideProperties: Map[String, String] = Map.empty,
featureVersions: Seq[FeatureVersion] = Seq.empty featureVersions: Seq[FeatureVersion] = Seq.empty,
autoTopicCreationManager: Option[AutoTopicCreationManager] = None
): KafkaApis = { ): KafkaApis = {
val properties = TestUtils.createBrokerConfig(brokerId) val properties = TestUtils.createBrokerConfig(brokerId)
@ -194,7 +195,7 @@ class KafkaApisTest extends Logging {
groupCoordinator = groupCoordinator, groupCoordinator = groupCoordinator,
txnCoordinator = txnCoordinator, txnCoordinator = txnCoordinator,
shareCoordinator = shareCoordinator, shareCoordinator = shareCoordinator,
autoTopicCreationManager = autoTopicCreationManager, autoTopicCreationManager = autoTopicCreationManager.getOrElse(this.autoTopicCreationManager),
brokerId = brokerId, brokerId = brokerId,
config = config, config = config,
configRepository = configRepository, configRepository = configRepository,
@ -10887,7 +10888,7 @@ class KafkaApisTest extends Logging {
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics.asJava)) future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics.asJava))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(streamsGroupHeartbeatResponse, response.data) assertEquals(streamsGroupHeartbeatResponse, response.data)
verify(autoTopicCreationManager).createStreamsInternalTopics(missingTopics, requestChannelRequest.context) verify(autoTopicCreationManager).createStreamsInternalTopics(any(), any(), anyLong())
} }
@Test @Test
@ -10947,6 +10948,61 @@ class KafkaApisTest extends Logging {
) )
} }
@Test
def testStreamsGroupHeartbeatRequestWithCachedTopicCreationErrors(): Unit = {
val features = mock(classOf[FinalizedFeatures])
when(features.finalizedFeatures()).thenReturn(util.Map.of(StreamsVersion.FEATURE_NAME, 1.toShort))
metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.features()).thenReturn(features)
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequestData().setGroupId("group")
val requestChannelRequest = buildRequest(new StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, true).build())
val future = new CompletableFuture[StreamsGroupHeartbeatResult]()
when(groupCoordinator.streamsGroupHeartbeat(
requestChannelRequest.context,
streamsGroupHeartbeatRequest
)).thenReturn(future)
// Mock AutoTopicCreationManager to return cached errors
val mockAutoTopicCreationManager = mock(classOf[AutoTopicCreationManager])
when(mockAutoTopicCreationManager.getStreamsInternalTopicCreationErrors(ArgumentMatchers.eq(Set("test-topic")), any()))
.thenReturn(Map("test-topic" -> "INVALID_REPLICATION_FACTOR"))
// Mock the createStreamsInternalTopics method to do nothing (simulate topic creation attempt)
doNothing().when(mockAutoTopicCreationManager).createStreamsInternalTopics(any(), any(), anyLong())
kafkaApis = createKafkaApis(autoTopicCreationManager = Some(mockAutoTopicCreationManager))
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
// Group coordinator returns MISSING_INTERNAL_TOPICS status and topics to create
val missingTopics = util.Map.of("test-topic", new CreatableTopic())
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")
.setStatus(util.List.of(
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code())
.setStatusDetail("Internal topics are missing: [test-topic]")
))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(Errors.NONE.code, response.data.errorCode())
assertEquals(null, response.data.errorMessage())
// Verify that the cached error was appended to the existing status detail
assertEquals(1, response.data.status().size())
val status = response.data.status().get(0)
assertEquals(StreamsGroupHeartbeatResponse.Status.MISSING_INTERNAL_TOPICS.code(), status.statusCode())
assertTrue(status.statusDetail().contains("Internal topics are missing: [test-topic]"))
assertTrue(status.statusDetail().contains("Creation failed: test-topic (INVALID_REPLICATION_FACTOR)"))
// Verify that createStreamsInternalTopics was called
verify(mockAutoTopicCreationManager).createStreamsInternalTopics(any(), any(), anyLong())
verify(mockAutoTopicCreationManager).getStreamsInternalTopicCreationErrors(ArgumentMatchers.eq(Set("test-topic")), any())
}
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = Array(true, false)) @ValueSource(booleans = Array(true, false))
def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = { def testConsumerGroupDescribe(includeAuthorizedOperations: Boolean): Unit = {