KAFKA-13026: Idempotent producer (KAFKA-10619) follow-up testings (#11002)

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
Cheng Tan 2021-07-26 13:45:59 -07:00 committed by GitHub
parent c807980088
commit 8ed271e1fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 90 additions and 3 deletions

View File

@ -304,7 +304,7 @@ public class ProducerConfig extends AbstractConfig {
Type.STRING,
"all",
in("all", "-1", "0", "1"),
Importance.HIGH,
Importance.LOW,
ACKS_DOC)
.define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)

View File

@ -172,6 +172,89 @@ public class KafkaProducerTest {
config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG)));
}
@Test
public void testAcksAndIdempotenceForIdempotentProducers() {
Properties baseProps = new Properties() {{
setProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
setProperty(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
setProperty(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
}};
Properties validProps = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.ACKS_CONFIG, "0");
setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
}};
ProducerConfig config = new ProducerConfig(validProps);
assertFalse(
config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
"idempotence should be overwritten");
assertEquals(
"0",
config.getString(ProducerConfig.ACKS_CONFIG),
"acks should be overwritten");
Properties validProps2 = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
}};
config = new ProducerConfig(validProps2);
assertTrue(
config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
"idempotence should be set with the default value");
assertEquals(
"-1",
config.getString(ProducerConfig.ACKS_CONFIG),
"acks should be set with the default value");
Properties validProps3 = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.ACKS_CONFIG, "all");
setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
}};
config = new ProducerConfig(validProps3);
assertFalse(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
"idempotence should be overwritten");
assertEquals(
"-1",
config.getString(ProducerConfig.ACKS_CONFIG),
"acks should be overwritten");
Properties invalidProps = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.ACKS_CONFIG, "0");
setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
}};
assertThrows(
ConfigException.class,
() -> new ProducerConfig(invalidProps),
"Cannot set a transactional.id without also enabling idempotence");
Properties invalidProps2 = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.ACKS_CONFIG, "1");
setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
}};
assertThrows(
ConfigException.class,
() -> new ProducerConfig(invalidProps2),
"Must set acks to all in order to use the idempotent producer");
Properties invalidProps3 = new Properties() {{
putAll(baseProps);
setProperty(ProducerConfig.ACKS_CONFIG, "0");
setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
}};
assertThrows(
ConfigException.class,
() -> new ProducerConfig(invalidProps3),
"Must set acks to all in order to use the idempotent producer");
}
@Test
public void testMetricsReporterAutoGeneratedClientId() {
Properties props = new Properties();

View File

@ -49,12 +49,14 @@ class TestVerifiableProducer(Test):
@parametrize(producer_version=str(LATEST_0_9))
@parametrize(producer_version=str(LATEST_0_10_0))
@parametrize(producer_version=str(LATEST_0_10_1))
@matrix(producer_version=[str(DEV_BRANCH)], acks=["0", "1", "-1"], enable_idempotence=[False])
@matrix(producer_version=[str(DEV_BRANCH)], acks=["-1"], enable_idempotence=[True])
@matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all)
@cluster(num_nodes=4)
@matrix(producer_version=[str(DEV_BRANCH)], security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN', 'GSSAPI'],
metadata_quorum=quorum.all)
def test_simple_run(self, producer_version, security_protocol = 'PLAINTEXT', sasl_mechanism='PLAIN',
metadata_quorum=quorum.zk):
def test_simple_run(self, producer_version, acks=None, enable_idempotence=False, security_protocol = 'PLAINTEXT',
sasl_mechanism='PLAIN', metadata_quorum=quorum.zk):
"""
Test that we can start VerifiableProducer on the current branch snapshot version or against the 0.8.2 jar, and
verify that we can produce a small number of messages.
@ -72,6 +74,8 @@ class TestVerifiableProducer(Test):
self.kafka.start()
node = self.producer.nodes[0]
self.producer.enable_idempotence = enable_idempotence
self.producer.acks = acks
node.version = KafkaVersion(producer_version)
self.producer.start()
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,