mirror of https://github.com/apache/kafka.git
KAFKA-17625: Removing explicit ZK test parameterizations (#17638)
This PR removes ZK test parameterizations from ducktape by:
- Removing zk from quorum.all_non_upgrade
- Removing quorum.zk from @matrix and @parametrize annotations
- Changing usages of quorum.all to quorum.all_kraft
- Deleting message_format_change_test.py

The default metadata_quorum value still needs to be changed to KRaft rather than ZK, but this will be done in a follow-up PR.

Reviewers: Kirk True <kirk@kirktrue.pro>, Colin P. McCabe <cmccabe@apache.org>
parent fa124385b8
commit c5a31cd6fb
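The change is mechanical across the test suites below. As a rough sketch of the typical rewrite — the class, test name, and node count here are hypothetical placeholders, while the decorator shapes and the quorum constants come from the hunks in this commit:

```python
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.kafka import quorum


class ExampleQuorumTest(object):  # placeholder class, for illustration only

    # Before this commit, a separate @matrix entry exercised ZooKeeper:
    # @matrix(
    #     metadata_quorum=[quorum.zk],
    #     use_new_coordinator=[False]
    # )

    # After this commit, only the KRaft quorum styles are parameterized:
    @cluster(num_nodes=4)
    @matrix(
        metadata_quorum=[quorum.isolated_kraft],
        use_new_coordinator=[True, False]
    )
    def test_example(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
        pass  # hypothetical test body; the real tests live in the files touched below
```

In most files the rewrite reduces to dropping quorum.zk from a metadata_quorum list or deleting the ZK-only @matrix block outright.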
@@ -25,10 +25,8 @@ all = [zk, isolated_kraft, combined_kraft]
 # How we will parameterize tests that exercise all KRaft quorum styles
 all_kraft = [isolated_kraft, combined_kraft]
 # How we will parameterize tests that are unrelated to upgrades:
-# ["ZK"] before the KIP-500 bridge release(s)
-# ["ZK", "ISOLATED_KRAFT"] during the KIP-500 bridge release(s) and in preview releases
 # ["ISOLATED_KRAFT"] after the KIP-500 bridge release(s)
-all_non_upgrade = [zk, isolated_kraft]
+all_non_upgrade = [isolated_kraft]

 def for_test(test_context):
     # A test uses ZooKeeper if it doesn't specify a metadata quorum or if it explicitly specifies ZooKeeper
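For context on the hunk above: the quorum helper module is what the @matrix annotations below consume. Here is a simplified sketch of how its pieces fit together after this change; the string constant values and the body of for_test are illustrative assumptions (only the names and the defaulting-to-ZooKeeper comment come from the diff), and that ZooKeeper default is exactly what the follow-up PR mentioned in the commit message is expected to flip.

```python
# Simplified sketch of the quorum helper after this commit; the constant
# values and the for_test body are assumptions, not the real module.
zk = "ZK"
isolated_kraft = "ISOLATED_KRAFT"
combined_kraft = "COMBINED_KRAFT"

all = [zk, isolated_kraft, combined_kraft]  # mirrors the diff; shadows the builtin on purpose

# How we will parameterize tests that exercise all KRaft quorum styles
all_kraft = [isolated_kraft, combined_kraft]

# How we will parameterize tests that are unrelated to upgrades
all_non_upgrade = [isolated_kraft]


def for_test(test_context):
    # A test uses ZooKeeper if it doesn't specify a metadata quorum or if it
    # explicitly specifies ZooKeeper; flipping this default to KRaft is the
    # follow-up work called out in the commit message.
    args = test_context.injected_args or {}
    return args.get("metadata_quorum", zk)
```

Because the annotated tests below reference these lists and constants directly, most hunks in this commit reduce to a one-line substitution in a metadata_quorum argument.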
@@ -48,10 +48,6 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
             "Mismatched assignment: %s" % assignment

     @cluster(num_nodes=4)
-    @matrix(
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[True, False]

@@ -76,7 +76,7 @@ class OffsetValidationTest(VerifiableConsumerTest):

     @cluster(num_nodes=7)
     @matrix(
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -138,7 +138,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
     @matrix(
         clean_shutdown=[True],
         bounce_mode=["all", "rolling"],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -195,7 +195,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         static_membership=[True, False],
         bounce_mode=["all", "rolling"],
         num_bounces=[5],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -272,7 +272,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
     @cluster(num_nodes=7)
     @matrix(
         bounce_mode=["all", "rolling"],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -314,7 +314,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
     @matrix(
         num_conflict_consumers=[1, 2],
         fencing_stage=["stable", "all"],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -399,7 +399,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
     @matrix(
         clean_shutdown=[True],
         enable_autocommit=[True, False],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -454,12 +454,6 @@ class OffsetValidationTest(VerifiableConsumerTest):
             (consumer.last_commit(partition), consumer.current_position(partition))

     @cluster(num_nodes=7)
-    @matrix(
-        clean_shutdown=[True, False],
-        enable_autocommit=[True, False],
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         clean_shutdown=[True, False],
         enable_autocommit=[True, False],

@@ -511,7 +505,7 @@ class OffsetValidationTest(VerifiableConsumerTest):

     @cluster(num_nodes=7)
     @matrix(
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -574,7 +568,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
             "org.apache.kafka.clients.consumer.RoundRobinAssignor",
             "org.apache.kafka.clients.consumer.StickyAssignor",
             "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
@@ -1,106 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import matrix
-from ducktape.utils.util import wait_until
-from ducktape.mark.resource import cluster
-
-from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import config_property, KafkaService, quorum
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_11, V_2_8_0, DEV_BRANCH, KafkaVersion
-
-
-class MessageFormatChangeTest(ProduceConsumeValidateTest):
-
-    def __init__(self, test_context):
-        super(MessageFormatChangeTest, self).__init__(test_context=test_context)
-
-    def setUp(self):
-        self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None
-
-        if self.zk:
-            self.zk.start()
-
-        # Producer and consumer
-        self.producer_throughput = 10000
-        self.num_producers = 1
-        self.num_consumers = 1
-        self.messages_per_producer = 100
-
-    def produce_and_consume(self, producer_version, consumer_version, group):
-        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
-                                           self.topic,
-                                           throughput=self.producer_throughput,
-                                           message_validator=is_int,
-                                           version=KafkaVersion(producer_version))
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
-                                        self.topic, consumer_timeout_ms=30000,
-                                        message_validator=is_int, version=KafkaVersion(consumer_version))
-        self.consumer.group_id = group
-        self.run_produce_consume_validate(lambda: wait_until(
-            lambda: self.producer.each_produced_at_least(self.messages_per_producer) == True,
-            timeout_sec=120, backoff_sec=1,
-            err_msg="Producer did not produce all messages in reasonable amount of time"))
-
-    @cluster(num_nodes=12)
-    @matrix(producer_version=[str(DEV_BRANCH)], consumer_version=[str(DEV_BRANCH)], metadata_quorum=[quorum.zk])
-    @matrix(producer_version=[str(LATEST_0_10)], consumer_version=[str(LATEST_0_10)], metadata_quorum=[quorum.zk])
-    @matrix(producer_version=[str(LATEST_0_9)], consumer_version=[str(LATEST_0_9)], metadata_quorum=[quorum.zk])
-    def test_compatibility(self, producer_version, consumer_version, metadata_quorum=quorum.zk):
-        """ This tests performs the following checks:
-        The workload is a mix of 0.9.x, 0.10.x and 0.11.x producers and consumers
-        that produce to and consume from a DEV_BRANCH cluster
-        1. initially the topic is using message format 0.9.0
-        2. change the message format version for topic to 0.10.0 on the fly.
-        3. change the message format version for topic to 0.11.0 on the fly.
-        4. change the message format version for topic back to 0.10.0 on the fly (only if the client version is 0.11.0 or newer)
-        - The producers and consumers should not have any issue.
-
-        Note regarding step number 4. Downgrading the message format version is generally unsupported as it breaks
-        older clients. More concretely, if we downgrade a topic from 0.11.0 to 0.10.0 after it contains messages with
-        version 0.11.0, we will return the 0.11.0 messages without down conversion due to an optimisation in the
-        handling of fetch requests. This will break any consumer that doesn't support 0.11.0. So, in practice, step 4
-        is similar to step 2 and it didn't seem worth it to increase the cluster size to in order to add a step 5 that
-        would change the message format version for the topic back to 0.9.0.0.
-        """
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
-                                      "partitions": 3,
-                                      "replication-factor": 3,
-                                      'configs': {"min.insync.replicas": 2}}},
-                                  controller_num_nodes_override=1)
-        for node in self.kafka.nodes:
-            node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = str(V_2_8_0) # required for writing old message formats
-
-        self.kafka.start()
-        self.logger.info("First format change to 0.9.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_9))
-        self.produce_and_consume(producer_version, consumer_version, "group1")
-
-        self.logger.info("Second format change to 0.10.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
-        self.produce_and_consume(producer_version, consumer_version, "group2")
-
-        self.logger.info("Third format change to 0.11.0")
-        self.kafka.alter_message_format(self.topic, str(LATEST_0_11))
-        self.produce_and_consume(producer_version, consumer_version, "group3")
-
-        if producer_version == str(DEV_BRANCH) and consumer_version == str(DEV_BRANCH):
-            self.logger.info("Fourth format change back to 0.10.0")
-            self.kafka.alter_message_format(self.topic, str(LATEST_0_10))
-            self.produce_and_consume(producer_version, consumer_version, "group4")
@@ -175,7 +175,7 @@ class ConnectDistributedTest(Test):
     @matrix(
         exactly_once_source=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -210,7 +210,7 @@ class ConnectDistributedTest(Test):
     @matrix(
         connector_type=['source', 'exactly-once source', 'sink'],
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -247,7 +247,7 @@ class ConnectDistributedTest(Test):
     @cluster(num_nodes=5)
     @matrix(
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -277,7 +277,7 @@ class ConnectDistributedTest(Test):
     @matrix(
         connector_type=['source', 'sink'],
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -311,12 +311,6 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see task transition to the RUNNING state")

     @cluster(num_nodes=5)
-    @matrix(
-        exactly_once_source=[True, False],
-        connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         exactly_once_source=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],

@@ -366,7 +360,7 @@ class ConnectDistributedTest(Test):
     @cluster(num_nodes=5)
     @matrix(
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -422,12 +416,6 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to consume messages after resuming sink connector")

     @cluster(num_nodes=5)
-    @matrix(
-        exactly_once_source=[True, False],
-        connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         exactly_once_source=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],

@@ -643,7 +631,7 @@ class ConnectDistributedTest(Test):
         security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
         exactly_once_source=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -693,7 +681,7 @@ class ConnectDistributedTest(Test):
     @matrix(
         clean=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -823,12 +811,6 @@ class ConnectDistributedTest(Test):
         assert success, "Found validation errors:\n" + "\n ".join(errors)

     @cluster(num_nodes=7)
-    @matrix(
-        clean=[True, False],
-        connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         clean=[True, False],
         connect_protocol=['sessioned', 'compatible', 'eager'],

@@ -933,7 +915,7 @@ class ConnectDistributedTest(Test):
     @cluster(num_nodes=6)
     @matrix(
         connect_protocol=['sessioned', 'compatible', 'eager'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
@@ -138,10 +138,8 @@ class ConnectStandaloneFileTest(Test):
             return False

     @cluster(num_nodes=5)
-    @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.zk)
     @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.isolated_kraft)
     @parametrize(error_tolerance=ErrorTolerance.ALL, metadata_quorum=quorum.isolated_kraft)
-    @parametrize(error_tolerance=ErrorTolerance.NONE, metadata_quorum=quorum.zk)
     def test_skip_and_log_to_dlq(self, error_tolerance, metadata_quorum):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, topics=self.topics)

@@ -68,14 +68,6 @@ class ConsumeBenchTest(Test):
         self.logger.debug("Produce workload finished")

     @cluster(num_nodes=10)
-    @matrix(
-        topics=[
-            ["consume_bench_topic[0-5]"], # topic subscription
-            ["consume_bench_topic[0-5]:[0-4]"] # manual topic assignment
-        ],
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         topics=[
             ["consume_bench_topic[0-5]"], # topic subscription

@@ -114,10 +106,6 @@ class ConsumeBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))

     @cluster(num_nodes=10)
-    @matrix(
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]

@@ -149,10 +137,6 @@ class ConsumeBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))

     @cluster(num_nodes=10)
-    @matrix(
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]

@@ -185,10 +169,6 @@ class ConsumeBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))

     @cluster(num_nodes=10)
-    @matrix(
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]

@@ -222,10 +202,6 @@ class ConsumeBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))

     @cluster(num_nodes=10)
-    @matrix(
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]

@@ -259,10 +235,6 @@ class ConsumeBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))

     @cluster(num_nodes=10)
-    @matrix(
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
@@ -93,7 +93,7 @@ class ConsumerGroupCommandTest(Test):
     @cluster(num_nodes=3)
     @matrix(
         security_protocol=['PLAINTEXT', 'SSL'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -112,7 +112,7 @@ class ConsumerGroupCommandTest(Test):
     @cluster(num_nodes=3)
     @matrix(
         security_protocol=['PLAINTEXT', 'SSL'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
@@ -77,7 +77,7 @@ class ControllerMutationQuotaTest(Test):
             self.zk.stop()

     @cluster(num_nodes=2)
-    @matrix(metadata_quorum=quorum.all)
+    @matrix(metadata_quorum=quorum.all_kraft)
     def test_controller_mutation_quota(self, metadata_quorum=quorum.zk):
         self.partition_count = 10
         mutation_rate = 3 * self.partition_count / (self.window_num * self.window_size_seconds)
@@ -71,7 +71,7 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):

     @cluster(num_nodes=9)
     @matrix(
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
@@ -91,8 +91,6 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
         """Override this since we're adding services outside of the constructor"""
         return super(LogDirFailureTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2

-    @cluster(num_nodes=8)
-    @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.zk])
     @cluster(num_nodes=10)
     @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.isolated_kraft])
     def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type, metadata_quorum):
@@ -147,7 +147,7 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
     @matrix(
         bounce_brokers=[True, False],
         reassign_from_offset_zero=[True, False],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
@@ -52,7 +52,7 @@ class ReplicaScaleTest(Test):
         topic_count=[50],
         partition_count=[34],
         replication_factor=[3],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -119,13 +119,6 @@ class ReplicaScaleTest(Test):
         trogdor.stop()

     @cluster(num_nodes=12)
-    @matrix(
-        topic_count=[50],
-        partition_count=[34],
-        replication_factor=[3],
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         topic_count=[50],
         partition_count=[34],
@@ -38,7 +38,7 @@ class ReplicationReplicaFailureTest(EndToEndTest):

     @cluster(num_nodes=7)
     @matrix(
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
@@ -61,7 +61,7 @@ class SecurityTest(EndToEndTest):
     @matrix(
         security_protocol=['PLAINTEXT'],
         interbroker_security_protocol=['SSL'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -74,7 +74,7 @@ class SecurityTest(EndToEndTest):
     @matrix(
         security_protocol=['SSL'],
         interbroker_security_protocol=['PLAINTEXT'],
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(

@@ -157,7 +157,7 @@ class SecurityTest(EndToEndTest):

     @cluster(num_nodes=2)
     @matrix(
-        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
+        metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
@@ -217,14 +217,6 @@ class TransactionsTest(Test):
         }

     @cluster(num_nodes=9)
-    @matrix(
-        failure_mode=["hard_bounce", "clean_bounce"],
-        bounce_target=["brokers", "clients"],
-        check_order=[True, False],
-        use_group_metadata=[True, False],
-        metadata_quorum=[quorum.zk],
-        use_new_coordinator=[False]
-    )
     @matrix(
         failure_mode=["hard_bounce", "clean_bounce"],
         bounce_target=["brokers", "clients"],
@@ -217,11 +217,6 @@ class StreamsBrokerBounceTest(Test):
             num_threads=[1, 3],
             sleep_time_secs=[120],
             metadata_quorum=[quorum.isolated_kraft])
-    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            broker_type=["leader", "controller"],
-            num_threads=[1, 3],
-            sleep_time_secs=[120],
-            metadata_quorum=[quorum.zk])
     def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum):
         """
         Start a smoke test client, then kill one particular broker and ensure data is still received