KAFKA-2791; removed deprecated properties

Removed support for BLOCK_ON_BUFFER_FULL_CONFIG (block.on.buffer.full)
Removed support for METADATA_FETCH_TIMEOUT_CONFIG
Removed support for TIMEOUT_CONFIG (aka timeout.ms)

Added support for MAX_BLOCK_MS_CONFIG
Added support for REQUEST_TIMEOUT_MS_CONFIG

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #525 from benstopford/KAFKA-2791
This commit is contained in:
Ben Stopford 2015-11-13 10:42:50 -08:00 committed by Jun Rao
parent 397306cdfe
commit 4efe4ac6d7
1 changed files with 6 additions and 8 deletions

View File

@ -112,13 +112,11 @@ object ConsoleProducer {
props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString)
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString)
props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.maxBlockMs.toString)
props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString)
props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.requestTimeoutMs.toString)
props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString)
props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString)
if(config.queueEnqueueTimeoutMs != -1)
props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString)
props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString)
props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
@ -190,10 +188,10 @@ object ConsoleProducer {
.describedAs("metadata expiration interval")
.ofType(classOf[java.lang.Long])
.defaultsTo(5*60*1000L)
val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms",
"The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.")
val maxBlockMsOpt = parser.accepts("max-block-ms",
"The max time that the producer will block for during a send request")
.withRequiredArg
.describedAs("metadata fetch timeout")
.describedAs("max block on send")
.ofType(classOf[java.lang.Long])
.defaultsTo(60*1000L)
val maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
@ -276,7 +274,7 @@ object ConsoleProducer {
val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt)
val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt)
val maxBlockMs = options.valueOf(maxBlockMsOpt)
}
trait MessageReader {