mirror of https://github.com/apache/kafka.git
MINOR: Update `config/producer.properties` to have new producer properties
Also include some trivial clean-ups in `ProducerConfig`and `BaseProducer`. Author: Ismael Juma <ismael@juma.me.uk> Reviewers: Gwen Shapira Closes #710 from ijuma/use-new-producer-properties-in-config
This commit is contained in:
parent
b0b3e5aebf
commit
23d607dc21
|
@ -130,7 +130,7 @@ public class ProducerConfig extends AbstractConfig {
|
||||||
|
|
||||||
/** <code>max.request.size</code> */
|
/** <code>max.request.size</code> */
|
||||||
public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
|
public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
|
||||||
private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server " + "has its own cap on record size which may be different from this. This setting will limit the number of record "
|
private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request in bytes. This is also effectively a cap on the maximum record size. Note that the server " + "has its own cap on record size which may be different from this. This setting will limit the number of record "
|
||||||
+ "batches the producer will send in a single request to avoid sending huge requests.";
|
+ "batches the producer will send in a single request to avoid sending huge requests.";
|
||||||
|
|
||||||
/** <code>reconnect.backoff.ms</code> */
|
/** <code>reconnect.backoff.ms</code> */
|
||||||
|
@ -143,7 +143,7 @@ public class ProducerConfig extends AbstractConfig {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
|
public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
|
||||||
private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to "
|
private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to "
|
||||||
+ "immediately give an error. Setting this to <code>false</code> will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full.";
|
+ "immediately give an error. Setting this to <code>false</code> will accomplish that: the producer will throw a BufferExhaustedException if a record is sent and the buffer space is full.";
|
||||||
|
|
||||||
/** <code>retries</code> */
|
/** <code>retries</code> */
|
||||||
public static final String RETRIES_CONFIG = "retries";
|
public static final String RETRIES_CONFIG = "retries";
|
||||||
|
|
|
@ -18,36 +18,28 @@
|
||||||
|
|
||||||
# list of brokers used for bootstrapping knowledge about the rest of the cluster
|
# list of brokers used for bootstrapping knowledge about the rest of the cluster
|
||||||
# format: host1:port1,host2:port2 ...
|
# format: host1:port1,host2:port2 ...
|
||||||
metadata.broker.list=localhost:9092
|
bootstrap.servers=localhost:9092
|
||||||
|
|
||||||
|
# specify the compression codec for all data generated: none, gzip, snappy, lz4
|
||||||
|
compression.type=none
|
||||||
|
|
||||||
# name of the partitioner class for partitioning events; default partition spreads data randomly
|
# name of the partitioner class for partitioning events; default partition spreads data randomly
|
||||||
#partitioner.class=
|
#partitioner.class=
|
||||||
|
|
||||||
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
|
# the maximum amount of time the client will wait for the response of a request
|
||||||
producer.type=sync
|
#request.timeout.ms=
|
||||||
|
|
||||||
# specify the compression codec for all data generated: none, gzip, snappy, lz4.
|
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
|
||||||
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
|
#max.block.ms=
|
||||||
compression.codec=none
|
|
||||||
|
|
||||||
# message encoder
|
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
|
||||||
serializer.class=kafka.serializer.DefaultEncoder
|
#linger.ms=
|
||||||
|
|
||||||
# allow topic level compression
|
# the maximum size of a request in bytes
|
||||||
#compressed.topics=
|
#max.request.size=
|
||||||
|
|
||||||
############################# Async Producer #############################
|
# the default batch size in bytes when batching multiple records sent to a partition
|
||||||
# maximum time, in milliseconds, for buffering data on the producer queue
|
#batch.size=
|
||||||
#queue.buffering.max.ms=
|
|
||||||
|
|
||||||
# the maximum size of the blocking queue for buffering on the producer
|
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
|
||||||
#queue.buffering.max.messages=
|
#buffer.memory=
|
||||||
|
|
||||||
# Timeout for event enqueue:
|
|
||||||
# 0: events will be enqueued immediately or dropped if the queue is full
|
|
||||||
# -ve: enqueue will block indefinitely if the queue is full
|
|
||||||
# +ve: enqueue will block up to this many milliseconds if the queue is full
|
|
||||||
#queue.enqueue.timeout.ms=
|
|
||||||
|
|
||||||
# the number of messages batched at the producer
|
|
||||||
#batch.num.messages=
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ trait BaseProducer {
|
||||||
}
|
}
|
||||||
|
|
||||||
class NewShinyProducer(producerProps: Properties) extends BaseProducer {
|
class NewShinyProducer(producerProps: Properties) extends BaseProducer {
|
||||||
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
|
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
|
||||||
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
|
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
|
||||||
|
|
||||||
// decide whether to send synchronously based on producer properties
|
// decide whether to send synchronously based on producer properties
|
||||||
|
@ -51,7 +51,6 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer {
|
||||||
}
|
}
|
||||||
|
|
||||||
class OldProducer(producerProps: Properties) extends BaseProducer {
|
class OldProducer(producerProps: Properties) extends BaseProducer {
|
||||||
import kafka.producer.{KeyedMessage, ProducerConfig}
|
|
||||||
|
|
||||||
// default to byte array partitioner
|
// default to byte array partitioner
|
||||||
if (producerProps.getProperty("partitioner.class") == null)
|
if (producerProps.getProperty("partitioner.class") == null)
|
||||||
|
|
Loading…
Reference in New Issue