diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index acafe119e77..23dd02bda98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -93,8 +93,11 @@ public class ProducerConfig extends AbstractConfig { + "
"
+ "Note: This setting gives the upper bound of the batch size to be sent. If we have fewer than this many bytes accumulated "
+ "for this partition, we will 'linger' for the linger.ms
time waiting for more records to show up. "
- + "This linger.ms
setting defaults to 0, which means we'll immediately send out a record even the accumulated "
- + "batch size is under this batch.size
setting.";
+ + "This linger.ms
setting defaults to 5, which means the producer will wait for 5ms or until the record batch is "
+ + "of batch.size
(whichever happens first) before sending the record batch. Note that broker backpressure can "
+ + " result in a higher effective linger time than this setting."
+ + "The default changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in "
+ + "similar or lower producer latency despite the increased linger.";
/** partitioner.adaptive.partitioning.enable
*/
public static final String PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG = "partitioner.adaptive.partitioning.enable";
@@ -147,8 +150,10 @@ public class ProducerConfig extends AbstractConfig {
+ "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "
+ "we get " + BATCH_SIZE_CONFIG + "
worth of records for a partition it will be sent immediately regardless of this "
+ "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "
- + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting " + LINGER_MS_CONFIG + "=5
, "
- + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";
+ + "specified time waiting for more records to show up. This setting defaults to 5 (i.e. 5ms delay). Increasing " + LINGER_MS_CONFIG + "=50
, "
+ + "for example, would have the effect of reducing the number of requests sent but would add up to 50ms of latency to records sent in the absence of load."
+ + "The default changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in "
+ + "similar or lower producer latency despite the increased linger.";
/** request.timeout.ms
*/
public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
@@ -383,7 +388,7 @@ public class ProducerConfig extends AbstractConfig {
.define(PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG, Type.BOOLEAN, true, Importance.LOW, PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_DOC)
.define(PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.LOW, PARTITIONER_AVAILABILITY_TIMEOUT_MS_DOC)
.define(PARTITIONER_IGNORE_KEYS_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, PARTITIONER_IGNORE_KEYS_DOC)
- .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
+ .define(LINGER_MS_CONFIG, Type.LONG, 5, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
.define(DELIVERY_TIMEOUT_MS_CONFIG, Type.INT, 120 * 1000, atLeast(0), Importance.MEDIUM, DELIVERY_TIMEOUT_MS_DOC)
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(CommonClientConfigs.SEND_BUFFER_LOWER_BOUND), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
index 8d3e76e7448..0ee52530e57 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
@@ -34,6 +34,7 @@ import scala.jdk.CollectionConverters._
class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
val producerCount: Int = 1
val brokerCount: Int = 2
+ val defaultLingerMs: Int = 5;
serverConfig.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 2.toString)
serverConfig.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, 2.toString)
@@ -41,7 +42,7 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L.toString)
producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000.toString)
- producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000.toString)
+ producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, (10000 + defaultLingerMs).toString)
/**
* Tests that Producer gets self-recovered when a topic is deleted mid-way of produce.
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 8ea2791c5b1..66ff64f2cdc 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -717,8 +717,9 @@ class TransactionsTest extends IntegrationTestHarness {
"kraft,consumer,false",
))
def testBumpTransactionalEpochWithTV2Disabled(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = {
+ val defaultLinger = 5;
val producer = createTransactionalProducer("transactionalProducer",
- deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)
+ deliveryTimeoutMs = 5000 + defaultLinger, requestTimeoutMs = 5000)
val consumer = transactionalConsumers.head
try {
// Create a topic with RF=1 so that a single broker failure will render it unavailable
@@ -783,8 +784,9 @@ class TransactionsTest extends IntegrationTestHarness {
"kraft, consumer, true"
))
def testBumpTransactionalEpochWithTV2Enabled(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = {
+ val defaultLinger = 5;
val producer = createTransactionalProducer("transactionalProducer",
- deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)
+ deliveryTimeoutMs = 5000 + defaultLinger, requestTimeoutMs = 5000)
val consumer = transactionalConsumers.head
try {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 545e4359fdc..26c65f46603 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1349,7 +1349,8 @@ val configEntries = props.asScala.map { case (k, v) => new AlterConfigOp(new Con
private var _retries = Int.MaxValue
private var _acks = -1
private var _requestTimeoutMs = 30000
- private var _deliveryTimeoutMs = 30000
+ private val defaultLingerMs = 5;
+ private var _deliveryTimeoutMs = 30000 + defaultLingerMs
def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this }
def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 315da4bb671..083df24eca1 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -219,6 +219,9 @@
sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata>, String)
method has been removed from the Producer API.
linger.ms
changed from 0 to 5 in Apache Kafka 4.0 as the efficiency gains from larger batches typically result in
+ similar or lower producer latency despite the increased linger.
+