KAFKA-5588: Remove deprecated --new-consumer tools option (#5097)

Reviewers: Viktor Somogyi <viktorsomogyi@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>, Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Paolo Patierno 2018-06-06 05:28:03 +02:00 committed by Ismael Juma
parent a23b28726e
commit be8808dd4b
11 changed files with 142 additions and 52 deletions

View File

@ -889,8 +889,6 @@ object ConsumerGroupCommand extends Logging {
"Pass in just a topic to delete the given topic's partition offsets and ownership information " +
"for every consumer group. For instance --topic t1" + nl +
"WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active."
val NewConsumerDoc = "Use the new consumer implementation. This is the default, so this option is deprecated and " +
"will be removed in a future release."
val TimeoutMsDoc = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
"to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, " +
"or is going through some changes)."
@ -943,7 +941,6 @@ object ConsumerGroupCommand extends Logging {
val listOpt = parser.accepts("list", ListDoc)
val describeOpt = parser.accepts("describe", DescribeDoc)
val deleteOpt = parser.accepts("delete", DeleteDoc)
val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc)
val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
.withRequiredArg
.describedAs("timeout (ms)")
@ -1011,16 +1008,9 @@ object ConsumerGroupCommand extends Logging {
if (useOldConsumer) {
if (options.has(bootstrapServerOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.")
else if (options.has(newConsumerOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.")
} else {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
if (options.has(newConsumerOpt)) {
Console.err.println(s"The --new-consumer option is deprecated and will be removed in a future major release. " +
s"The new consumer is used by default if the --bootstrap-server option is provided.")
}
if (options.has(deleteOpt) && options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, s"When deleting a consumer group the option $topicOpt is only " +
s"valid with $zkConnectOpt. The new consumer does not support topic-specific offset deletion from a consumer group.")

View File

@ -324,8 +324,6 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("metrics directory")
.ofType(classOf[java.lang.String])
val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation. This is the default, so " +
"this option is deprecated and will be removed in a future release.")
val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED (unless old consumer is used): The server to connect to.")
.withRequiredArg
.describedAs("server to connect to")
@ -397,8 +395,6 @@ object ConsoleConsumer extends Logging {
if (useOldConsumer) {
if (options.has(bootstrapServerOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is not valid with $zkConnectOpt.")
else if (options.has(newConsumerOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.")
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
if (topicOrFilterOpt.size != 1)
CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
@ -449,11 +445,6 @@ object ConsoleConsumer extends Logging {
if (!useOldConsumer) {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
if (options.has(newConsumerOpt)) {
Console.err.println("The --new-consumer option is deprecated and will be removed in a future major release. " +
"The new consumer is used by default if the --bootstrap-server option is provided.")
}
}
if (options.has(csvMetricsReporterEnabledOpt)) {

View File

@ -296,8 +296,6 @@ object ConsumerPerformance extends LazyLogging {
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation. This is the default, so " +
"this option is deprecated and will be removed in a future release.")
val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
.withRequiredArg
.describedAs("config file")
@ -325,11 +323,6 @@ object ConsumerPerformance extends LazyLogging {
if (!useOldConsumer) {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServersOpt)
if (options.has(newConsumerOpt)) {
Console.err.println("The --new-consumer option is deprecated and will be removed in a future major release. " +
"The new consumer is used by default if the --broker-list option is provided.")
}
import org.apache.kafka.clients.consumer.ConsumerConfig
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt))
props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
@ -342,8 +335,7 @@ object ConsumerPerformance extends LazyLogging {
} else {
if (options.has(bootstrapServersOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServersOpt is not valid with $zkConnectOpt.")
else if (options.has(newConsumerOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.")
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, numMessagesOpt)
props.put("group.id", options.valueOf(groupIdOpt))
props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)

View File

@ -16,6 +16,7 @@
*/
package unit.kafka.admin
import joptsimple.OptionException
import kafka.admin.ConsumerGroupCommandTest
import kafka.utils.TestUtils
import org.apache.kafka.common.protocol.Errors
@ -24,12 +25,11 @@ import org.junit.Test
class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
@Test(expected = classOf[joptsimple.OptionException])
@Test(expected = classOf[OptionException])
def testDeleteWithTopicOption() {
TestUtils.createOffsetsTopic(zkClient, servers)
val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", group, "--topic")
getConsumerGroupService(cgcArgs)
fail("Expected an error due to presence of mutually exclusive options")
}
@Test
@ -222,4 +222,10 @@ class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
result.size == 1 &&
result.keySet.contains(group) && result.get(group).contains(Errors.COORDINATOR_NOT_AVAILABLE))
}
@Test(expected = classOf[OptionException])
def testDeleteWithUnrecognizedNewConsumerOption() {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--delete", "--group", group)
getConsumerGroupService(cgcArgs)
}
}

View File

@ -16,6 +16,7 @@
*/
package kafka.admin
import joptsimple.OptionException
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.RoundRobinAssignor
import org.apache.kafka.common.TopicPartition
@ -112,12 +113,11 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
@Test(expected = classOf[joptsimple.OptionException])
@Test(expected = classOf[OptionException])
def testDescribeWithMultipleSubActions() {
TestUtils.createOffsetsTopic(zkClient, servers)
val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group, "--members", "--state")
getConsumerGroupService(cgcArgs)
fail("Expected an error due to presence of mutually exclusive options")
}
@Test
@ -662,6 +662,12 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
}
}
@Test(expected = classOf[joptsimple.OptionException])
def testDescribeWithUnrecognizedNewConsumerOption() {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--describe", "--group", group)
getConsumerGroupService(cgcArgs)
fail("Expected an error due to presence of unrecognized --new-consumer option")
}
}

View File

@ -18,6 +18,7 @@ package kafka.admin
import java.util.Properties
import joptsimple.OptionException
import org.junit.Test
import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
@ -86,4 +87,9 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
}, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
}
@Test(expected = classOf[OptionException])
def testListWithUnrecognizedNewConsumerOption() {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--list")
getConsumerGroupService(cgcArgs)
}
}

View File

@ -16,6 +16,7 @@ import java.io.{BufferedWriter, File, FileWriter}
import java.text.{ParseException, SimpleDateFormat}
import java.util.{Calendar, Date, Properties}
import joptsimple.OptionException
import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
@ -335,6 +336,13 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
adminZkClient.deleteTopic(topic)
}
@Test(expected = classOf[OptionException])
def testResetWithUnrecognizedNewConsumerOption() {
val cgcArgs = Array("--new-consumer", "--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics",
"--to-offset", "2", "--export")
getConsumerGroupService(cgcArgs)
}
private def produceMessages(topic: String, numMessages: Int): Unit = {
val records = (0 until numMessages).map(_ => new ProducerRecord[Array[Byte], Array[Byte]](topic,
new Array[Byte](100 * 1000)))

View File

@ -149,8 +149,7 @@ class ConsoleConsumerTest {
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning",
"--new-consumer") //new
"--from-beginning")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
@ -169,8 +168,7 @@ class ConsoleConsumerTest {
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--partition", "0",
"--offset", "3",
"--new-consumer") //new
"--offset", "3")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
@ -185,6 +183,25 @@ class ConsoleConsumerTest {
}
@Test(expected = classOf[IllegalArgumentException])
def shouldExitOnUnrecognizedNewConsumerOption(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
//Given
val args: Array[String] = Array(
"--new-consumer",
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning")
//When
try {
new ConsoleConsumer.ConsumerConfig(args)
} finally {
Exit.resetExitProcedure()
}
}
@Test
def testDefaultConsumer() {
//Given
@ -200,6 +217,21 @@ class ConsoleConsumerTest {
assertFalse(config.useOldConsumer)
}
@Test
def testNewConsumerRemovedOption() {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
//Then
assertFalse(config.useOldConsumer)
}
@Test
def shouldParseValidNewSimpleConsumerValidConfigWithStringOffset() {
//Given
@ -208,7 +240,6 @@ class ConsoleConsumerTest {
"--topic", "test",
"--partition", "0",
"--offset", "LatEst",
"--new-consumer", //new
"--property", "print.value=false")
//When
@ -366,9 +397,6 @@ class ConsoleConsumerTest {
@Test(expected = classOf[IllegalArgumentException])
def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningNewConsumer() {
// Override exit procedure to throw an exception instead of exiting, so we can catch the exit
// properly for this test case
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
//Given
@ -384,15 +412,10 @@ class ConsoleConsumerTest {
} finally {
Exit.resetExitProcedure()
}
fail("Expected consumer property construction to fail due to inconsistent reset options")
}
@Test(expected = classOf[IllegalArgumentException])
def shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningOldConsumer() {
// Override exit procedure to throw an exception instead of exiting, so we can catch the exit
// properly for this test case
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
//Given
@ -408,8 +431,6 @@ class ConsoleConsumerTest {
} finally {
Exit.resetExitProcedure()
}
fail("Expected consumer property construction to fail due to inconsistent reset options")
}
@Test

View File

@ -20,7 +20,8 @@ package kafka.tools
import java.io.ByteArrayOutputStream
import java.text.SimpleDateFormat
import org.junit.Assert.assertEquals
import joptsimple.OptionException
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.junit.Test
class ConsumerPerformanceTest {
@ -45,6 +46,57 @@ class ConsumerPerformanceTest {
s"${dateFormat.format(System.currentTimeMillis)}, 1.0, 1.0, 1, 1.0"))
}
@Test
def testConfigUsingNewConsumer(): Unit = {
//Given
val args: Array[String] = Array(
"--broker-list", "localhost:9092",
"--topic", "test",
"--messages", "10"
)
//When
val config = new ConsumerPerformance.ConsumerPerfConfig(args)
//Then
assertFalse(config.useOldConsumer)
assertEquals("localhost:9092", config.options.valueOf(config.bootstrapServersOpt))
assertEquals("test", config.topic)
assertEquals(10, config.numMessages)
}
@Test
def testConfigUsingOldConsumer() {
//Given
val args: Array[String] = Array(
"--zookeeper", "localhost:2181",
"--topic", "test",
"--messages", "10")
//When
val config = new ConsumerPerformance.ConsumerPerfConfig(args)
//Then
assertTrue(config.useOldConsumer)
assertEquals("localhost:2181", config.options.valueOf(config.zkConnectOpt))
assertEquals("test", config.topic)
assertEquals(10, config.numMessages)
}
@Test(expected = classOf[OptionException])
def testConfigUsingNewConsumerUnrecognizedOption(): Unit = {
//Given
val args: Array[String] = Array(
"--broker-list", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--new-consumer"
)
//When
new ConsumerPerformance.ConsumerPerfConfig(args)
}
private def testHeaderMatchContent(detailed: Boolean, useOldConsumer: Boolean, expectedOutputLineCount: Int, fun: () => Unit): Unit = {
Console.withOut(outContent) {
ConsumerPerformance.printHeader(detailed, useOldConsumer)

View File

@ -98,6 +98,14 @@
will be removed in a future version.</li>
<li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
<li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> finally removes
the <code>--new-consumer</code> option for all consumer based tools as <code>kafka-console-consumer</code>, <code>kafka-consumer-perf-test</code>
and <code>kafka-consumer-groups</code>.
The new consumer is automatically used if the bootstrap servers list is provided on the command line
otherwise, when the zookeeper connection is provided, the old consumer is used.
The <code>--new-consumer</code> option had already been ignored as the way of selecting the consumer since Kafka 1.0.0,
this KIP just removes the option.
</li>
</ul>
<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>

View File

@ -30,7 +30,7 @@ from kafkatest.services.kafka import config_property
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.security.minikdc import MiniKdc
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import DEV_BRANCH
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
Port = collections.namedtuple('Port', ['name', 'number', 'open'])
@ -584,8 +584,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
command_config = "--command-config " + command_config
if new_consumer:
cmd = "%s --new-consumer --bootstrap-server %s %s --list" % \
new_consumer_opt = ""
if node.version <= LATEST_0_10_0:
new_consumer_opt = "--new-consumer"
cmd = "%s %s --bootstrap-server %s %s --list" % \
(consumer_group_script,
new_consumer_opt,
self.bootstrap_servers(self.security_protocol),
command_config)
else:
@ -611,8 +615,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
command_config = "--command-config " + command_config
if new_consumer:
cmd = "%s --new-consumer --bootstrap-server %s %s --group %s --describe" % \
(consumer_group_script, self.bootstrap_servers(self.security_protocol), command_config, group)
new_consumer_opt = ""
if node.version <= LATEST_0_10_0:
new_consumer_opt = "--new-consumer"
cmd = "%s %s --bootstrap-server %s %s --group %s --describe" % \
(consumer_group_script,
new_consumer_opt,
self.bootstrap_servers(self.security_protocol),
command_config, group)
else:
cmd = "%s --zookeeper %s %s --group %s --describe" % \
(consumer_group_script, self.zk_connect_setting(), command_config, group)