Revert "KAFKA-7077: Use default producer settings in Connect Worker (#11475)" (#11932)

This reverts commit 76cf7a5793.

Connect already allows users to enable idempotent producers for connectors and the Connect workers. Although Kafka producers enabled idempotency by default in 3.0, due to compatibility requirements and the fact that [KIP-318](https://cwiki.apache.org/confluence/display/KAFKA/KIP-318%3A+Make+Kafka+Connect+Source+idempotent) hasn't been explicitly approved, the changes here are reverted. A separate commit will explicitly disable idempotency in producers instantiated by Connect by default until KIP-318 is approved and scheduled for release.
This commit is contained in:
Konstantine Karantasis 2022-03-22 17:19:29 -07:00 committed by GitHub
parent a44e1ed449
commit b60f4464ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 4 additions and 0 deletions

View File

@ -648,6 +648,8 @@ public class Worker {
// These settings will execute infinite retries on retriable exceptions. They *may* be overridden via configs passed to the worker,
// but this may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, defaultClientId);
// User-specified overrides

View File

@ -213,6 +213,8 @@ public class WorkerTest extends ThreadedTest {
defaultProducerConfigs.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
defaultProducerConfigs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));
defaultProducerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
defaultProducerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
defaultProducerConfigs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
defaultConsumerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");