From ff4dff044a22ef0f3095c6f11c38a1a718ad7d13 Mon Sep 17 00:00:00 2001 From: Justine Olshan Date: Thu, 17 Mar 2022 10:00:27 -0700 Subject: [PATCH] KAFKA-13750; Client Compatability KafkaTest uses invalid idempotency configs (#11909) Reviewers: Luke Chen , David Jacot --- .../client/client_compatibility_features_test.py | 2 ++ .../apache/kafka/tools/ClientCompatibilityTest.java | 12 ++++++++++++ 2 files changed, 14 insertions(+) diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py index 15b6a939071..a20b6254f57 100644 --- a/tests/kafkatest/tests/client/client_compatibility_features_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py @@ -43,9 +43,11 @@ def get_broker_features(broker_version): if broker_version < V_0_11_0_0: features["describe-acls-supported"] = False features["describe-configs-supported"] = False + features["idempotent-producer-supported"] = False else: features["describe-acls-supported"] = True features["describe-configs-supported"] = True + features["idempotent-producer-supported"] = True return features def run_command(node, cmd, ssh_log_file): diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java index a5d6c7a835d..e351f311f72 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java @@ -88,6 +88,7 @@ public class ClientCompatibilityTest { final boolean createTopicsSupported; final boolean describeAclsSupported; final boolean describeConfigsSupported; + final boolean idempotentProducerSupported; TestConfig(Namespace res) { this.bootstrapServer = res.getString("bootstrapServer"); @@ -99,6 +100,7 @@ public class ClientCompatibilityTest { this.createTopicsSupported = res.getBoolean("createTopicsSupported"); this.describeAclsSupported = res.getBoolean("describeAclsSupported"); this.describeConfigsSupported = res.getBoolean("describeConfigsSupported"); + this.idempotentProducerSupported = res.get("idempotentProducerSupported"); } } @@ -172,6 +174,13 @@ public class ClientCompatibilityTest { .dest("describeConfigsSupported") .metavar("DESCRIBE_CONFIGS_SUPPORTED") .help("Whether describeConfigs is supported in the AdminClient."); + parser.addArgument("--idempotent-producer-supported") + .action(store()) + .required(true) + .type(Boolean.class) + .dest("idempotentProducerSupported") + .metavar("IDEMPOTENT_PRODUCER_SUPPORTED") + .help("Whether the producer supports idempotency."); Namespace res = null; try { @@ -243,6 +252,9 @@ public class ClientCompatibilityTest { public void testProduce() throws Exception { Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer); + if (!testConfig.idempotentProducerSupported) { + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false"); + } ByteArraySerializer serializer = new ByteArraySerializer(); KafkaProducer producer = new KafkaProducer<>(producerProps, serializer, serializer); ProducerRecord record1 = new ProducerRecord<>(testConfig.topic, message1);