KAFKA-16331: Remove EOSv1 from Kafka Streams system tests (#17108)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-09-10 17:55:03 -07:00 committed by GitHub
parent 0af75c0e41
commit 6fd973b4a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 39 additions and 76 deletions

View File

@ -742,9 +742,9 @@ public class StreamThread extends Thread implements ProcessingThread {
errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) { errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
log.error("Shutting down because the Kafka cluster seems to be on a too old version. " + log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +
"Setting {}=\"{}\"/\"{}\" requires broker version 2.5 or higher.", "Setting {}=\"{}\" requires broker version 2.5 or higher.",
StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2, StreamsConfig.EXACTLY_ONCE_BETA); StreamsConfig.EXACTLY_ONCE_V2);
} }
failedStreamThreadSensor.record(); failedStreamThreadSensor.record();
this.streamsUncaughtExceptionHandler.accept(new StreamsException(e), false); this.streamsUncaughtExceptionHandler.accept(new StreamsException(e), false);

View File

@ -68,8 +68,8 @@ public class BrokerCompatibilityTest {
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-system-test-broker-compatibility"); streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-system-test-broker-compatibility");
streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode); streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode);
@ -103,7 +103,7 @@ public class BrokerCompatibilityTest {
System.out.println("start Kafka Streams"); System.out.println("start Kafka Streams");
streams.start(); streams.start();
final boolean eosEnabled = processingMode.startsWith("exactly_once"); final boolean eosEnabled = processingMode.equals("exactly_once_v2");
System.out.println("send data"); System.out.println("send data");
final Properties producerProperties = new Properties(); final Properties producerProperties = new Properties();

View File

@ -29,7 +29,6 @@ public class StreamsEosTest {
* args ::= kafka propFileName command * args ::= kafka propFileName command
* command := "run" | "process" | "verify" * command := "run" | "process" | "verify"
*/ */
@SuppressWarnings("deprecation")
public static void main(final String[] args) throws IOException { public static void main(final String[] args) throws IOException {
if (args.length < 2) { if (args.length < 2) {
System.err.println("StreamsEosTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter"); System.err.println("StreamsEosTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
@ -49,12 +48,9 @@ public class StreamsEosTest {
} }
if ("process".equals(command) || "process-complex".equals(command)) { if ("process".equals(command) || "process-complex".equals(command)) {
if (!StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) && if (!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
!StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
System.err.println("processingGuarantee must be either " + StreamsConfig.EXACTLY_ONCE + " or " + System.err.println("processingGuarantee must be " + StreamsConfig.EXACTLY_ONCE_V2);
StreamsConfig.EXACTLY_ONCE_BETA + " or " + StreamsConfig.EXACTLY_ONCE_V2);
Exit.exit(1); Exit.exit(1);
} }
} }

View File

@ -38,7 +38,6 @@ public class StreamsSmokeTest {
* *
* @param args * @param args
*/ */
@SuppressWarnings("deprecation")
public static void main(final String[] args) throws IOException { public static void main(final String[] args) throws IOException {
if (args.length < 2) { if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter"); System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
@ -60,14 +59,10 @@ public class StreamsSmokeTest {
if ("process".equals(command)) { if ("process".equals(command)) {
if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) && if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) { !StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
System.err.println("processingGuarantee must be either " + System.err.println("processingGuarantee must be either " +
StreamsConfig.AT_LEAST_ONCE + ", " + StreamsConfig.AT_LEAST_ONCE + ", " +
StreamsConfig.EXACTLY_ONCE + ", or " +
StreamsConfig.EXACTLY_ONCE_BETA + ", or " +
StreamsConfig.EXACTLY_ONCE_V2); StreamsConfig.EXACTLY_ONCE_V2);
Exit.exit(1); Exit.exit(1);

View File

@ -384,17 +384,16 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
clean_node_enabled = True clean_node_enabled = True
def __init__(self, test_context, kafka, processing_guarantee, command): def __init__(self, test_context, kafka, command):
super(StreamsEosTestBaseService, self).__init__(test_context, super(StreamsEosTestBaseService, self).__init__(test_context,
kafka, kafka,
"org.apache.kafka.streams.tests.StreamsEosTest", "org.apache.kafka.streams.tests.StreamsEosTest",
command) command)
self.PROCESSING_GUARANTEE = processing_guarantee
def prop_file(self): def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT, properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE, streams_property.PROCESSING_GUARANTEE: "exactly_once_v2",
"acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment "acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
"session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735 "session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
} }
@ -440,24 +439,24 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
class StreamsEosTestDriverService(StreamsEosTestBaseService): class StreamsEosTestDriverService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka): def __init__(self, test_context, kafka):
super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "not-required", "run") super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run")
class StreamsEosTestJobRunnerService(StreamsEosTestBaseService): class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka, processing_guarantee): def __init__(self, test_context, kafka):
super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, processing_guarantee, "process") super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService): class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka, processing_guarantee): def __init__(self, test_context, kafka):
super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, processing_guarantee, "process-complex") super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")
class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService): class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka): def __init__(self, test_context, kafka):
super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "not-required", "verify") super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService): class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka): def __init__(self, test_context, kafka):
super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "not-required", "verify-complex") super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex")
class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService): class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):

View File

@ -87,29 +87,6 @@ class StreamsBrokerCompatibility(Test):
self.consumer.stop() self.consumer.stop()
self.kafka.stop() self.kafka.stop()
@cluster(num_nodes=4)
@matrix(broker_version=[str(LATEST_0_11_0),str(LATEST_1_0),str(LATEST_1_1),str(LATEST_2_0),
str(LATEST_2_1),str(LATEST_2_2),str(LATEST_2_3),str(LATEST_2_4),
str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),
str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
str(LATEST_3_8)])
def test_compatible_brokers_eos_alpha_enabled(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once")
processor.start()
self.consumer.start()
processor.wait()
wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.")
self.consumer.stop()
self.kafka.stop()
@cluster(num_nodes=4) @cluster(num_nodes=4)
@matrix(broker_version=[str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8), @matrix(broker_version=[str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),
str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3), str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
@ -167,9 +144,9 @@ class StreamsBrokerCompatibility(Test):
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor: with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
with processor.node.account.monitor_log(processor.LOG_FILE) as log: with processor.node.account.monitor_log(processor.LOG_FILE) as log:
processor.start() processor.start()
log.wait_until('Shutting down because the Kafka cluster seems to be on a too old version. Setting processing\.guarantee="exactly_once_v2"/"exactly_once_beta" requires broker version 2\.5 or higher\.', log.wait_until('Shutting down because the Kafka cluster seems to be on a too old version. Setting processing\.guarantee="exactly_once_v2" requires broker version 2\.5 or higher\.',
timeout_sec=60, timeout_sec=60,
err_msg="Never saw 'Shutting down because the Kafka cluster seems to be on a too old version. Setting `processing.guarantee=\"exactly_once_v2\"/\"exactly_once_beta\"` requires broker version 2.5 or higher.' log message " + str(processor.node.account)) err_msg="Never saw 'Shutting down because the Kafka cluster seems to be on a too old version. Setting `processing.guarantee=\"exactly_once_v2\"` requires broker version 2.5 or higher.' log message " + str(processor.node.account))
monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException', monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException',
timeout_sec=60, timeout_sec=60,
err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException' error message " + str(processor.node.account)) err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException' error message " + str(processor.node.account))

View File

@ -39,21 +39,19 @@ class StreamsEosTest(KafkaTest):
self.test_context = test_context self.test_context = test_context
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(processing_guarantee=["exactly_once", "exactly_once_v2"], @matrix(metadata_quorum=[quorum.isolated_kraft])
metadata_quorum=[quorum.isolated_kraft]) def test_rebalance_simple(self, metadata_quorum):
def test_rebalance_simple(self, processing_guarantee, metadata_quorum): self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(processing_guarantee=["exactly_once", "exactly_once_v2"], @matrix(metadata_quorum=[quorum.isolated_kraft])
metadata_quorum=[quorum.isolated_kraft]) def test_rebalance_complex(self, metadata_quorum):
def test_rebalance_complex(self, processing_guarantee, metadata_quorum): self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka)) StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
def run_rebalance(self, processor1, processor2, processor3, verifier): def run_rebalance(self, processor1, processor2, processor3, verifier):
@ -83,21 +81,19 @@ class StreamsEosTest(KafkaTest):
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False) verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(processing_guarantee=["exactly_once", "exactly_once_v2"], @matrix(metadata_quorum=[quorum.isolated_kraft])
metadata_quorum=[quorum.isolated_kraft]) def test_failure_and_recovery(self, metadata_quorum):
def test_failure_and_recovery(self, processing_guarantee, metadata_quorum): self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(processing_guarantee=["exactly_once", "exactly_once_v2"], @matrix(metadata_quorum=[quorum.isolated_kraft])
metadata_quorum=[quorum.isolated_kraft]) def test_failure_and_recovery_complex(self, metadata_quorum):
def test_failure_and_recovery_complex(self, processing_guarantee, metadata_quorum): self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka)) StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
def run_failure_and_recovery(self, processor1, processor2, processor3, verifier): def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):

View File

@ -47,7 +47,7 @@ class StreamsSmokeTest(KafkaTest):
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka) self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
@cluster(num_nodes=8) @cluster(num_nodes=8)
@matrix(processing_guarantee=['exactly_once', 'exactly_once_v2', 'at_least_once'], @matrix(processing_guarantee=['exactly_once_v2', 'at_least_once'],
crash=[True, False], crash=[True, False],
metadata_quorum=quorum.all_non_upgrade) metadata_quorum=quorum.all_non_upgrade)
def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk): def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk):