diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 9a5889210c7..647fc77db67 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -304,7 +304,7 @@ public class ProducerConfig extends AbstractConfig { Type.STRING, "all", in("all", "-1", "0", "1"), - Importance.HIGH, + Importance.LOW, ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index f055e12c231..2784f19c025 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -172,6 +172,89 @@ public class KafkaProducerTest { config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG))); } + @Test + public void testAcksAndIdempotenceForIdempotentProducers() { + Properties baseProps = new Properties() {{ + setProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + setProperty( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + setProperty( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + }}; + + Properties validProps = new Properties() {{ + putAll(baseProps); + setProperty(ProducerConfig.ACKS_CONFIG, "0"); + setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); + }}; + ProducerConfig config = new ProducerConfig(validProps); + assertFalse( + config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), + "idempotence should be overwritten"); + assertEquals( + "0", + config.getString(ProducerConfig.ACKS_CONFIG), + "acks should be overwritten"); + + Properties validProps2 = new Properties() {{ + putAll(baseProps); + setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); + }}; + config = new ProducerConfig(validProps2); + assertTrue( + config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), + "idempotence should be set with the default value"); + assertEquals( + "-1", + config.getString(ProducerConfig.ACKS_CONFIG), + "acks should be set with the default value"); + + Properties validProps3 = new Properties() {{ + putAll(baseProps); + setProperty(ProducerConfig.ACKS_CONFIG, "all"); + setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); + }}; + config = new ProducerConfig(validProps3); + assertFalse(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), + "idempotence should be overwritten"); + assertEquals( + "-1", + config.getString(ProducerConfig.ACKS_CONFIG), + "acks should be overwritten"); + + Properties invalidProps = new Properties() {{ + putAll(baseProps); + setProperty(ProducerConfig.ACKS_CONFIG, "0"); + setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); + setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId"); + }}; + assertThrows( + ConfigException.class, + () -> new ProducerConfig(invalidProps), + "Cannot set a transactional.id without also enabling idempotence"); + + Properties invalidProps2 = new Properties() {{ + putAll(baseProps); + setProperty(ProducerConfig.ACKS_CONFIG, "1"); + setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); + }}; + assertThrows( + ConfigException.class, + () -> new ProducerConfig(invalidProps2), + "Must set acks to all in order to use the idempotent producer"); + + Properties invalidProps3 = new Properties() {{ + putAll(baseProps); + setProperty(ProducerConfig.ACKS_CONFIG, "0"); + setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); + }}; + assertThrows( + ConfigException.class, + () -> new ProducerConfig(invalidProps3), + "Must set acks to all in order to use the idempotent producer"); + } + @Test public void testMetricsReporterAutoGeneratedClientId() { Properties props = new Properties(); diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py index 55a928f7644..1aa21099521 100644 --- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py +++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py @@ -49,12 +49,14 @@ class TestVerifiableProducer(Test): @parametrize(producer_version=str(LATEST_0_9)) @parametrize(producer_version=str(LATEST_0_10_0)) @parametrize(producer_version=str(LATEST_0_10_1)) + @matrix(producer_version=[str(DEV_BRANCH)], acks=["0", "1", "-1"], enable_idempotence=[False]) + @matrix(producer_version=[str(DEV_BRANCH)], acks=["-1"], enable_idempotence=[True]) @matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all) @cluster(num_nodes=4) @matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'GSSAPI'], metadata_quorum=quorum.all) - def test_simple_run(self, producer_version, security_protocol = 'PLAINTEXT', sasl_mechanism='PLAIN', - metadata_quorum=quorum.zk): + def test_simple_run(self, producer_version, acks=None, enable_idempotence=False, security_protocol = 'PLAINTEXT', + sasl_mechanism='PLAIN', metadata_quorum=quorum.zk): """ Test that we can start VerifiableProducer on the current branch snapshot version or against the 0.8.2 jar, and verify that we can produce a small number of messages. @@ -72,6 +74,8 @@ class TestVerifiableProducer(Test): self.kafka.start() node = self.producer.nodes[0] + self.producer.enable_idempotence = enable_idempotence + self.producer.acks = acks node.version = KafkaVersion(producer_version) self.producer.start() wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,