KAFKA-12361; Use default request.timeout.ms value for Connect producers (#10178)

Connect uses a high request timeout as a holdover from the days prior to KIP-91 when this was required to guarantee records would not get timed out in the accumulator. Having a high request timeout makes it harder for the producer to gracefully handle unclean connection terminations, which might happen in the case of sudden broker death.

Reducing that value to the default of 30 seconds should address that issue, without compromising the existing delivery guarantees of the Connect framework. Since the delivery timeout is still set to a very-high value, this change shouldn't make it more likely for `Producer::send` to throw an exception and fail the task.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Chris Egerton 2021-02-23 21:32:42 -05:00 committed by GitHub
parent 16d1439fc4
commit bc04c335fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 0 additions and 3 deletions

View File

@ -660,7 +660,6 @@ public class Worker {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker,
// but this may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

View File

@ -182,7 +182,6 @@ public class WorkerTest extends ThreadedTest {
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
defaultProducerConfigs.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
defaultProducerConfigs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

View File

@ -175,7 +175,6 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
defaultProducerConfigs.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
defaultProducerConfigs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");