diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 627bd1054c3..439099950b5 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -148,6 +148,8 @@ object ConsoleConsumer extends Logging { props ++= config.extraConsumerProps setAutoOffsetResetValue(config, props) props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) + if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null) + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-consumer") CommandLineUtils.maybeMergeOptions( props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.options, config.isolationLevelOpt) props diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 44324e898c3..89428e5663b 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -288,6 +288,8 @@ object ConsumerPerformance extends LazyLogging { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer]) props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer]) props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false") + if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null) + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client") val numThreads = options.valueOf(numThreadsOpt).intValue val topic = options.valueOf(topicOpt) diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index a87800aa134..9a8b734cbe2 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -604,4 +604,37 @@ class ConsoleConsumerTest { try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) finally Exit.resetExitProcedure() } + + @Test + def testClientIdOverride(): Unit = { + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--from-beginning", + "--consumer-property", "client.id=consumer-1") + + //When + val config = new ConsoleConsumer.ConsumerConfig(args) + val consumerProperties = ConsoleConsumer.consumerProps(config) + + //Then + assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)) + } + + @Test + def testDefaultClientId(): Unit = { + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--from-beginning") + + //When + val config = new ConsoleConsumer.ConsumerConfig(args) + val consumerProperties = ConsoleConsumer.consumerProps(config) + + //Then + assertEquals("console-consumer", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)) + } } diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala index 1ead9af1e2f..84aafa18ea0 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala @@ -115,4 +115,12 @@ class ConsoleProducerTest { assertEquals("producer-1", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG)) } + + @Test + def testDefaultClientId(): Unit = { + val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs) + val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config)) + assertEquals("console-producer", + producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG)) + } } diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala index acaed483a5b..3cd31935acd 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala @@ -17,10 +17,10 @@ package kafka.tools -import java.io.ByteArrayOutputStream +import java.io.{ByteArrayOutputStream, File, PrintWriter} import java.text.SimpleDateFormat - import kafka.utils.Exit +import org.apache.kafka.clients.consumer.ConsumerConfig import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} import org.junit.jupiter.api.Test @@ -111,6 +111,43 @@ class ConsumerPerformanceTest { finally Exit.resetExitProcedure() } + @Test + def testClientIdOverride(): Unit = { + val consumerConfigFile = File.createTempFile("test_consumer_config",".conf") + consumerConfigFile.deleteOnExit() + new PrintWriter(consumerConfigFile.getPath) { write("client.id=consumer-1"); close() } + + //Given + val args: Array[String] = Array( + "--broker-list", "localhost:9092", + "--topic", "test", + "--messages", "10", + "--consumer.config", consumerConfigFile.getPath + ) + + //When + val config = new ConsumerPerformance.ConsumerPerfConfig(args) + + //Then + assertEquals("consumer-1", config.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)) + } + + @Test + def testDefaultClientId(): Unit = { + //Given + val args: Array[String] = Array( + "--broker-list", "localhost:9092", + "--topic", "test", + "--messages", "10" + ) + + //When + val config = new ConsumerPerformance.ConsumerPerfConfig(args) + + //Then + assertEquals("perf-consumer-client", config.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG)) + } + private def testHeaderMatchContent(detailed: Boolean, expectedOutputLineCount: Int, fun: () => Unit): Unit = { Console.withOut(outContent) { ConsumerPerformance.printHeader(detailed) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 1f47bbcf3e9..6967a16fa6b 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -193,6 +193,9 @@ public class ProducerPerformance { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); if (transactionsEnabled) props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + if (props.getProperty(ProducerConfig.CLIENT_ID_CONFIG) == null) { + props.put(ProducerConfig.CLIENT_ID_CONFIG, "perf-producer-client"); + } return props; } diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index be037bd9539..4c479b6497a 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -84,7 +84,7 @@ public class ProducerPerformanceTest { Properties prop = ProducerPerformance.readProps(producerProps, producerConfig, transactionalId, transactionsEnabled); assertNotNull(prop); - assertEquals(5, prop.size()); + assertEquals(6, prop.size()); } @Test @@ -154,4 +154,24 @@ public class ProducerPerformanceTest { IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random)); assertEquals("no payload File Path or record Size provided", thrown.getMessage()); } + + @Test + public void testClientIdOverride() throws Exception { + List producerProps = Collections.singletonList("client.id=producer-1"); + + Properties prop = ProducerPerformance.readProps(producerProps, null, "1234", true); + + assertNotNull(prop); + assertEquals("producer-1", prop.getProperty("client.id")); + } + + @Test + public void testDefaultClientId() throws Exception { + List producerProps = Collections.singletonList("acks=1"); + + Properties prop = ProducerPerformance.readProps(producerProps, null, "1234", true); + + assertNotNull(prop); + assertEquals("perf-producer-client", prop.getProperty("client.id")); + } }