KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config

Author: Brian Byrne <bbyrne@confluent.io>

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Boyang Chan <boyang@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #8717 from bdbyrne/KAFKA-10033
This commit is contained in:
Brian Byrne 2020-06-06 21:04:04 +05:30 committed by Manikumar Reddy
parent 4eb3f75556
commit dd7c036956
2 changed files with 37 additions and 6 deletions

View File

@ -454,6 +454,9 @@ class AdminManager(val config: KafkaConfig,
private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean,
configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
val topic = resource.name
if (!metadataCache.contains(topic))
throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.")
adminZkClient.validateTopicConfig(topic, configProps)
validateConfigPolicy(resource, configEntriesMap)
if (!validateOnly) {

View File

@ -18,18 +18,22 @@ package kafka.server
import java.nio.charset.StandardCharsets
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig._
import kafka.utils._
import kafka.server.Constants._
import org.junit.Assert._
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.metrics.Quota
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
import kafka.integration.KafkaServerTestHarness
import kafka.utils._
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@ -202,6 +206,23 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
}
}
@Test
def testConfigChangeOnNonExistingTopicWithAdminClient(): Unit = {
val topic = TestUtils.tempTopic
val admin = createAdminClient()
try {
val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
val op = new AlterConfigOp(new ConfigEntry(FlushMessagesProp, "10000"), AlterConfigOp.OpType.SET)
admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get
fail("Should fail with UnknownTopicOrPartitionException for topic doesn't exist")
} catch {
case e: ExecutionException =>
assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException])
} finally {
admin.close()
}
}
@Test
def testProcessNotification(): Unit = {
val props = new Properties()
@ -314,4 +335,11 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
def parse(configHandler: TopicConfigHandler, value: String): Seq[Int] = {
configHandler.parseThrottledPartitions(CoreUtils.propsWith(LeaderReplicationThrottledReplicasProp, value), 102, LeaderReplicationThrottledReplicasProp)
}
private def createAdminClient(): Admin = {
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
Admin.create(props)
}
}