KAFKA-1961 Prevent deletion of _consumer_offsets topic; reviewed by Neha Narkhede, Gwen Shapira and Jun Rao

This commit is contained in:
Ted Malaska 2015-04-03 11:43:52 -07:00 committed by Neha Narkhede
parent ad722531da
commit 48f9970472
2 changed files with 52 additions and 7 deletions

View File

@ -19,7 +19,7 @@ package kafka.admin
import joptsimple._
import java.util.Properties
import kafka.common.AdminCommandFailedException
import kafka.common.{Topic, AdminCommandFailedException}
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
@ -143,15 +143,21 @@ object TopicCommand {
}
topics.foreach { topic =>
try {
ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
println("Topic %s is marked for deletion.".format(topic))
println("Note: This will have no impact if delete.topic.enable is not set to true.")
if (Topic.InternalTopics.contains(topic)) {
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic));
} else {
ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
println("Topic %s is marked for deletion.".format(topic))
println("Note: This will have no impact if delete.topic.enable is not set to true.")
}
} catch {
case e: ZkNodeExistsException =>
println("Topic %s is already marked for deletion.".format(topic))
case e2: Throwable =>
case e: AdminOperationException =>
throw e
case e: Throwable =>
throw new AdminOperationException("Error while deleting topic %s".format(topic))
}
}
}
}

View File

@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite
import kafka.utils.Logging
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import kafka.server.KafkaConfig
import kafka.server.{OffsetManager, KafkaConfig}
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils
@ -60,4 +60,43 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin
assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
}
@Test
def testTopicDeletion() {
val normalTopic = "test"
val numPartitionsOriginal = 1
// create brokers
val brokers = List(0, 1, 2)
TestUtils.createBrokersInZk(zkClient, brokers)
// create the NormalTopic
val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--topic", normalTopic))
TopicCommand.createTopic(zkClient, createOpts)
// delete the NormalTopic
val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
val deletePath = ZkUtils.getDeleteTopicPath(normalTopic)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deletePath))
TopicCommand.deleteTopic(zkClient, deleteOpts)
assertTrue("Delete path for topic should exist after deletion.", zkClient.exists(deletePath))
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--topic", OffsetManager.OffsetsTopicName))
TopicCommand.createTopic(zkClient, createOffsetTopicOpts)
// try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't
val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", OffsetManager.OffsetsTopicName))
val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(OffsetManager.OffsetsTopicName)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
}
assertFalse("Delete path for topic shouldn't exist after deletion.", zkClient.exists(deleteOffsetTopicPath))
}
}