KAFKA-6627: Prevent config default values overriding ones specified through --producer-property on command line. (#6084)

* KAFKA-6627: Prevent config default values overriding ones specified through --producer-property on command line.

In Console{Producer,Consumer}, extraProducerProps (options specified in
--producer-property) is applied first, then overriden unconditionally,
even if the value is not specified explicitly (and default value is
used). This patch fixes it so that it doesn't override the existing
value set by --producer-property if it is not explicitly specified.

The contribution is my original work and I license the work to the
project under the project's open source license.

Reviewers: Sriharsha Chintalapani <sriharsha@apache.org>
This commit is contained in:
Kan Li 2019-01-11 10:29:10 -08:00 committed by Harsha
parent b4bf6232c2
commit 694da1ac1e
4 changed files with 187 additions and 22 deletions

View File

@ -152,7 +152,8 @@ object ConsoleConsumer extends Logging {
props ++= config.extraConsumerProps
setAutoOffsetResetValue(config, props)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
CommandLineUtils.maybeMergeOptions(
props, ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.options, config.isolationLevelOpt)
props
}
@ -300,7 +301,6 @@ object ConsoleConsumer extends Logging {
val bootstrapServer = options.valueOf(bootstrapServerOpt)
val keyDeserializer = options.valueOf(keyDeserializerOpt)
val valueDeserializer = options.valueOf(valueDeserializerOpt)
val isolationLevel = options.valueOf(isolationLevelOpt).toString
val formatter: MessageFormatter = messageFormatterClass.getDeclaredConstructor().newInstance().asInstanceOf[MessageFormatter]
if (keyDeserializer != null && !keyDeserializer.isEmpty) {

View File

@ -91,20 +91,31 @@ object ConsoleProducer {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.maxBlockMs.toString)
props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.requestTimeoutMs.toString)
props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
CommandLineUtils.maybeMergeOptions(
props, ProducerConfig.LINGER_MS_CONFIG, config.options, config.sendTimeoutOpt)
CommandLineUtils.maybeMergeOptions(
props, ProducerConfig.ACKS_CONFIG, config.options, config.requestRequiredAcksOpt)
CommandLineUtils.maybeMergeOptions(
props, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.options, config.requestTimeoutMsOpt)
CommandLineUtils.maybeMergeOptions(
props, ProducerConfig.RETRIES_CONFIG, config.options, config.messageSendMaxRetriesOpt)
CommandLineUtils.maybeMergeOptions(
props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.options, config.retryBackoffMsOpt)
CommandLineUtils.maybeMergeOptions(
props, ProducerConfig.SEND_BUFFER_CONFIG, config.options, config.socketBufferSizeOpt)
CommandLineUtils.maybeMergeOptions(
props, ProducerConfig.BUFFER_MEMORY_CONFIG, config.options, config.maxMemoryBytesOpt)
CommandLineUtils.maybeMergeOptions(
props, ProducerConfig.BATCH_SIZE_CONFIG, config.options, config.maxPartitionMemoryBytesOpt)
CommandLineUtils.maybeMergeOptions(
props, ProducerConfig.METADATA_MAX_AGE_CONFIG, config.options, config.metadataExpiryMsOpt)
CommandLineUtils.maybeMergeOptions(
props, ProducerConfig.MAX_BLOCK_MS_CONFIG, config.options, config.maxBlockMsOpt)
props
}
@ -218,19 +229,9 @@ object ConsoleProducer {
else compressionCodecOptionValue
else NoCompressionCodec.name
val batchSize = options.valueOf(batchSizeOpt)
val sendTimeout = options.valueOf(sendTimeoutOpt)
val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt)
val retryBackoffMs = options.valueOf(retryBackoffMsOpt)
val readerClass = options.valueOf(messageReaderOpt)
val socketBuffer = options.valueOf(socketBufferSizeOpt)
val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt).asScala)
val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt)
val maxBlockMs = options.valueOf(maxBlockMsOpt)
}
class LineMessageReader extends MessageReader {

View File

@ -96,4 +96,21 @@ object CommandLineUtils extends Logging {
}
props
}
/**
* Merge the options into {@code props} for key {@code key}, with the following precedence, from high to low:
* 1) if {@code spec} is specified on {@code options} explicitly, use the value;
* 2) if {@code props} already has {@code key} set, keep it;
* 3) otherwise, use the default value of {@code spec}.
* A {@code null} value means to remove {@code key} from the {@code props}.
*/
def maybeMergeOptions[V](props: Properties, key: String, options: OptionSet, spec: OptionSpec[V]) {
if (options.has(spec) || !props.containsKey(key)) {
val value = options.valueOf(spec)
if (value == null)
props.remove(key)
else
props.put(key, value.toString)
}
}
}

View File

@ -17,6 +17,9 @@
package kafka.utils
import java.util.Properties
import joptsimple.{OptionParser, OptionSpec}
import org.junit.Assert._
import org.junit.Test
@ -73,4 +76,148 @@ class CommandLineUtilsTest {
assertEquals("Value of second property should be 'thi=rd'", props.getProperty("third.property"), "thi=rd")
}
val props = new Properties()
val parser = new OptionParser(false)
var stringOpt : OptionSpec[String] = _
var intOpt : OptionSpec[java.lang.Integer] = _
var stringOptOptionalArg : OptionSpec[String] = _
var intOptOptionalArg : OptionSpec[java.lang.Integer] = _
var stringOptOptionalArgNoDefault : OptionSpec[String] = _
var intOptOptionalArgNoDefault : OptionSpec[java.lang.Integer] = _
def setUpOptions(): Unit = {
stringOpt = parser.accepts("str")
.withRequiredArg
.ofType(classOf[String])
.defaultsTo("default-string")
intOpt = parser.accepts("int")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(100)
stringOptOptionalArg = parser.accepts("str-opt")
.withOptionalArg
.ofType(classOf[String])
.defaultsTo("default-string-2")
intOptOptionalArg = parser.accepts("int-opt")
.withOptionalArg
.ofType(classOf[java.lang.Integer])
.defaultsTo(200)
stringOptOptionalArgNoDefault = parser.accepts("str-opt-nodef")
.withOptionalArg
.ofType(classOf[String])
intOptOptionalArgNoDefault = parser.accepts("int-opt-nodef")
.withOptionalArg
.ofType(classOf[java.lang.Integer])
}
@Test
def testMaybeMergeOptionsOverwriteExisting(): Unit = {
setUpOptions()
props.put("skey", "existing-string")
props.put("ikey", "300")
props.put("sokey", "existing-string-2")
props.put("iokey", "400")
props.put("sondkey", "existing-string-3")
props.put("iondkey", "500")
val options = parser.parse(
"--str", "some-string",
"--int", "600",
"--str-opt", "some-string-2",
"--int-opt", "700",
"--str-opt-nodef", "some-string-3",
"--int-opt-nodef", "800",
)
CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
assertEquals("some-string", props.get("skey"))
assertEquals("600", props.get("ikey"))
assertEquals("some-string-2", props.get("sokey"))
assertEquals("700", props.get("iokey"))
assertEquals("some-string-3", props.get("sondkey"))
assertEquals("800", props.get("iondkey"))
}
@Test
def testMaybeMergeOptionsDefaultOverwriteExisting(): Unit = {
setUpOptions()
props.put("sokey", "existing-string")
props.put("iokey", "300")
props.put("sondkey", "existing-string-2")
props.put("iondkey", "400")
val options = parser.parse(
"--str-opt",
"--int-opt",
"--str-opt-nodef",
"--int-opt-nodef",
)
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
assertEquals("default-string-2", props.get("sokey"))
assertEquals("200", props.get("iokey"))
assertNull(props.get("sondkey"))
assertNull(props.get("iondkey"))
}
@Test
def testMaybeMergeOptionsDefaultValueIfNotExist(): Unit = {
setUpOptions()
val options = parser.parse()
CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
assertEquals("default-string", props.get("skey"))
assertEquals("100", props.get("ikey"))
assertEquals("default-string-2", props.get("sokey"))
assertEquals("200", props.get("iokey"))
assertNull(props.get("sondkey"))
assertNull(props.get("iondkey"))
}
@Test
def testMaybeMergeOptionsNotOverwriteExisting(): Unit = {
setUpOptions()
props.put("skey", "existing-string")
props.put("ikey", "300")
props.put("sokey", "existing-string-2")
props.put("iokey", "400")
props.put("sondkey", "existing-string-3")
props.put("iondkey", "500")
val options = parser.parse()
CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
assertEquals("existing-string", props.get("skey"))
assertEquals("300", props.get("ikey"))
assertEquals("existing-string-2", props.get("sokey"))
assertEquals("400", props.get("iokey"))
assertEquals("existing-string-3", props.get("sondkey"))
assertEquals("500", props.get("iondkey"))
}
}