mirror of https://github.com/apache/kafka.git
KAFKA-13750; Client Compatability KafkaTest uses invalid idempotency configs (#11909)
Reviewers: Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
This commit is contained in:
parent
77ad7db620
commit
ff4dff044a
|
@ -43,9 +43,11 @@ def get_broker_features(broker_version):
|
||||||
if broker_version < V_0_11_0_0:
|
if broker_version < V_0_11_0_0:
|
||||||
features["describe-acls-supported"] = False
|
features["describe-acls-supported"] = False
|
||||||
features["describe-configs-supported"] = False
|
features["describe-configs-supported"] = False
|
||||||
|
features["idempotent-producer-supported"] = False
|
||||||
else:
|
else:
|
||||||
features["describe-acls-supported"] = True
|
features["describe-acls-supported"] = True
|
||||||
features["describe-configs-supported"] = True
|
features["describe-configs-supported"] = True
|
||||||
|
features["idempotent-producer-supported"] = True
|
||||||
return features
|
return features
|
||||||
|
|
||||||
def run_command(node, cmd, ssh_log_file):
|
def run_command(node, cmd, ssh_log_file):
|
||||||
|
|
|
@ -88,6 +88,7 @@ public class ClientCompatibilityTest {
|
||||||
final boolean createTopicsSupported;
|
final boolean createTopicsSupported;
|
||||||
final boolean describeAclsSupported;
|
final boolean describeAclsSupported;
|
||||||
final boolean describeConfigsSupported;
|
final boolean describeConfigsSupported;
|
||||||
|
final boolean idempotentProducerSupported;
|
||||||
|
|
||||||
TestConfig(Namespace res) {
|
TestConfig(Namespace res) {
|
||||||
this.bootstrapServer = res.getString("bootstrapServer");
|
this.bootstrapServer = res.getString("bootstrapServer");
|
||||||
|
@ -99,6 +100,7 @@ public class ClientCompatibilityTest {
|
||||||
this.createTopicsSupported = res.getBoolean("createTopicsSupported");
|
this.createTopicsSupported = res.getBoolean("createTopicsSupported");
|
||||||
this.describeAclsSupported = res.getBoolean("describeAclsSupported");
|
this.describeAclsSupported = res.getBoolean("describeAclsSupported");
|
||||||
this.describeConfigsSupported = res.getBoolean("describeConfigsSupported");
|
this.describeConfigsSupported = res.getBoolean("describeConfigsSupported");
|
||||||
|
this.idempotentProducerSupported = res.get("idempotentProducerSupported");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,6 +174,13 @@ public class ClientCompatibilityTest {
|
||||||
.dest("describeConfigsSupported")
|
.dest("describeConfigsSupported")
|
||||||
.metavar("DESCRIBE_CONFIGS_SUPPORTED")
|
.metavar("DESCRIBE_CONFIGS_SUPPORTED")
|
||||||
.help("Whether describeConfigs is supported in the AdminClient.");
|
.help("Whether describeConfigs is supported in the AdminClient.");
|
||||||
|
parser.addArgument("--idempotent-producer-supported")
|
||||||
|
.action(store())
|
||||||
|
.required(true)
|
||||||
|
.type(Boolean.class)
|
||||||
|
.dest("idempotentProducerSupported")
|
||||||
|
.metavar("IDEMPOTENT_PRODUCER_SUPPORTED")
|
||||||
|
.help("Whether the producer supports idempotency.");
|
||||||
|
|
||||||
Namespace res = null;
|
Namespace res = null;
|
||||||
try {
|
try {
|
||||||
|
@ -243,6 +252,9 @@ public class ClientCompatibilityTest {
|
||||||
public void testProduce() throws Exception {
|
public void testProduce() throws Exception {
|
||||||
Properties producerProps = new Properties();
|
Properties producerProps = new Properties();
|
||||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
|
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
|
||||||
|
if (!testConfig.idempotentProducerSupported) {
|
||||||
|
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
|
||||||
|
}
|
||||||
ByteArraySerializer serializer = new ByteArraySerializer();
|
ByteArraySerializer serializer = new ByteArraySerializer();
|
||||||
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, serializer, serializer);
|
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, serializer, serializer);
|
||||||
ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(testConfig.topic, message1);
|
ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(testConfig.topic, message1);
|
||||||
|
|
Loading…
Reference in New Issue