KAFKA-10038: Supports default client.id for ConsoleConsumer, ProducerPerformance, ConsumerPerformance (#11297)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Yanwen(Jason) Lin 2021-09-07 13:49:50 -07:00 committed by GitHub
parent 9d107c174b
commit 66a27af2f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 108 additions and 3 deletions

View File

@ -148,6 +148,8 @@ object ConsoleConsumer extends Logging {
props ++= config.extraConsumerProps props ++= config.extraConsumerProps
setAutoOffsetResetValue(config, props) setAutoOffsetResetValue(config, props)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "console-consumer")
CommandLineUtils.maybeMergeOptions( CommandLineUtils.maybeMergeOptions(
props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.options, config.isolationLevelOpt) props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.options, config.isolationLevelOpt)
props props

View File

@ -288,6 +288,8 @@ object ConsumerPerformance extends LazyLogging {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer]) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer]) props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false") props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null)
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client")
val numThreads = options.valueOf(numThreadsOpt).intValue val numThreads = options.valueOf(numThreadsOpt).intValue
val topic = options.valueOf(topicOpt) val topic = options.valueOf(topicOpt)

View File

@ -604,4 +604,37 @@ class ConsoleConsumerTest {
try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args)) try assertThrows(classOf[IllegalArgumentException], () => new ConsoleConsumer.ConsumerConfig(args))
finally Exit.resetExitProcedure() finally Exit.resetExitProcedure()
} }
@Test
def testClientIdOverride(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning",
"--consumer-property", "client.id=consumer-1")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
val consumerProperties = ConsoleConsumer.consumerProps(config)
//Then
assertEquals("consumer-1", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
}
@Test
def testDefaultClientId(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--from-beginning")
//When
val config = new ConsoleConsumer.ConsumerConfig(args)
val consumerProperties = ConsoleConsumer.consumerProps(config)
//Then
assertEquals("console-consumer", consumerProperties.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
}
} }

View File

@ -115,4 +115,12 @@ class ConsoleProducerTest {
assertEquals("producer-1", assertEquals("producer-1",
producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG)) producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
} }
@Test
def testDefaultClientId(): Unit = {
val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
assertEquals("console-producer",
producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG))
}
} }

View File

@ -17,10 +17,10 @@
package kafka.tools package kafka.tools
import java.io.ByteArrayOutputStream import java.io.{ByteArrayOutputStream, File, PrintWriter}
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import kafka.utils.Exit import kafka.utils.Exit
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
@ -111,6 +111,43 @@ class ConsumerPerformanceTest {
finally Exit.resetExitProcedure() finally Exit.resetExitProcedure()
} }
@Test
def testClientIdOverride(): Unit = {
val consumerConfigFile = File.createTempFile("test_consumer_config",".conf")
consumerConfigFile.deleteOnExit()
new PrintWriter(consumerConfigFile.getPath) { write("client.id=consumer-1"); close() }
//Given
val args: Array[String] = Array(
"--broker-list", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--consumer.config", consumerConfigFile.getPath
)
//When
val config = new ConsumerPerformance.ConsumerPerfConfig(args)
//Then
assertEquals("consumer-1", config.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
}
@Test
def testDefaultClientId(): Unit = {
//Given
val args: Array[String] = Array(
"--broker-list", "localhost:9092",
"--topic", "test",
"--messages", "10"
)
//When
val config = new ConsumerPerformance.ConsumerPerfConfig(args)
//Then
assertEquals("perf-consumer-client", config.props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG))
}
private def testHeaderMatchContent(detailed: Boolean, expectedOutputLineCount: Int, fun: () => Unit): Unit = { private def testHeaderMatchContent(detailed: Boolean, expectedOutputLineCount: Int, fun: () => Unit): Unit = {
Console.withOut(outContent) { Console.withOut(outContent) {
ConsumerPerformance.printHeader(detailed) ConsumerPerformance.printHeader(detailed)

View File

@ -193,6 +193,9 @@ public class ProducerPerformance {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
if (transactionsEnabled) props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); if (transactionsEnabled) props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
if (props.getProperty(ProducerConfig.CLIENT_ID_CONFIG) == null) {
props.put(ProducerConfig.CLIENT_ID_CONFIG, "perf-producer-client");
}
return props; return props;
} }

View File

@ -84,7 +84,7 @@ public class ProducerPerformanceTest {
Properties prop = ProducerPerformance.readProps(producerProps, producerConfig, transactionalId, transactionsEnabled); Properties prop = ProducerPerformance.readProps(producerProps, producerConfig, transactionalId, transactionsEnabled);
assertNotNull(prop); assertNotNull(prop);
assertEquals(5, prop.size()); assertEquals(6, prop.size());
} }
@Test @Test
@ -154,4 +154,24 @@ public class ProducerPerformanceTest {
IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random)); IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random));
assertEquals("no payload File Path or record Size provided", thrown.getMessage()); assertEquals("no payload File Path or record Size provided", thrown.getMessage());
} }
@Test
public void testClientIdOverride() throws Exception {
List<String> producerProps = Collections.singletonList("client.id=producer-1");
Properties prop = ProducerPerformance.readProps(producerProps, null, "1234", true);
assertNotNull(prop);
assertEquals("producer-1", prop.getProperty("client.id"));
}
@Test
public void testDefaultClientId() throws Exception {
List<String> producerProps = Collections.singletonList("acks=1");
Properties prop = ProducerPerformance.readProps(producerProps, null, "1234", true);
assertNotNull(prop);
assertEquals("perf-producer-client", prop.getProperty("client.id"));
}
} }