KAFKA-19422: Deflake streams_application_upgrade_test (#20004)

In this upgrade test, applications sometimes crash before the upgrade,
so it's actually triggering a bug in several older versions (2.x and
possibly others). It seems to be a rare race condition that has been
happening since 2022. Since we are not going to roll out a patch release
for Kafka Streams 2.x, we should just allow applications to crash before
the upgrade.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-06-23 17:41:05 +02:00 committed by GitHub
parent cb809e2574
commit 261e861340
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 26 additions and 15 deletions

View File

@ -108,10 +108,10 @@ class StreamsUpgradeTest(Test):
random.shuffle(self.processors)
for p in self.processors:
p.CLEAN_NODE_ENABLED = False
self.do_stop_start_bounce(p, None, to_version, counter)
self.do_stop_start_bounce(p, None, from_version, to_version, counter)
counter = counter + 1
elif bounce_type == "full":
self.restart_all_nodes_with(to_version)
self.restart_all_nodes_with(from_version, to_version)
else:
raise Exception("Unrecognized bounce_type: " + str(bounce_type))
@ -157,30 +157,36 @@ class StreamsUpgradeTest(Test):
self.wait_for_verification(self.processor2, self.processed_msg, self.processor2.STDOUT_FILE)
self.wait_for_verification(self.processor3, self.processed_msg, self.processor3.STDOUT_FILE)
def restart_all_nodes_with(self, version):
def restart_all_nodes_with(self, from_version, to_version):
self.processor1.stop_node(self.processor1.node)
self.processor2.stop_node(self.processor2.node)
self.processor3.stop_node(self.processor3.node)
# make sure the members have stopped
self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-CLOSED", self.processor1.STDOUT_FILE)
self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-CLOSED", self.processor2.STDOUT_FILE)
self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-CLOSED", self.processor3.STDOUT_FILE)
if from_version.startswith("2."):
# some older versions crash on shutdown, so we allow crashes here.
self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor1.STDOUT_FILE)
self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor2.STDOUT_FILE)
self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)", self.processor3.STDOUT_FILE)
else:
self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-CLOSED", self.processor1.STDOUT_FILE)
self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-CLOSED", self.processor2.STDOUT_FILE)
self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-CLOSED", self.processor3.STDOUT_FILE)
self.roll_logs(self.processor1, ".1-1")
self.roll_logs(self.processor2, ".1-1")
self.roll_logs(self.processor3, ".1-1")
self.set_version(self.processor1, version)
self.set_version(self.processor2, version)
self.set_version(self.processor3, version)
self.set_version(self.processor1, to_version)
self.set_version(self.processor2, to_version)
self.set_version(self.processor3, to_version)
self.processor1.start_node(self.processor1.node)
self.processor2.start_node(self.processor2.node)
self.processor3.start_node(self.processor3.node)
# double-check the version
kafka_version_str = self.get_version_string(version)
kafka_version_str = self.get_version_string(to_version)
self.wait_for_verification(self.processor1, kafka_version_str, self.processor1.LOG_FILE)
self.wait_for_verification(self.processor2, kafka_version_str, self.processor2.LOG_FILE)
self.wait_for_verification(self.processor3, kafka_version_str, self.processor3.LOG_FILE)
@ -226,8 +232,8 @@ class StreamsUpgradeTest(Test):
def purge_state_dir(self, processor):
processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter):
kafka_version_str = self.get_version_string(new_version)
def do_stop_start_bounce(self, processor, upgrade_from, from_version, to_version, counter):
kafka_version_str = self.get_version_string(to_version)
first_other_processor = None
second_other_processor = None
@ -252,7 +258,12 @@ class StreamsUpgradeTest(Test):
second_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
if from_version.startswith("2."):
# some older versions crash on shutdown, so we allow crashes here.
node.account.ssh_capture("grep -E 'SMOKE-TEST-CLIENT-(EXCEPTION|CLOSED)' %s" % processor.STDOUT_FILE, allow_fail=False)
else:
node.account.ssh_capture("grep -E 'SMOKE-TEST-CLIENT-CLOSED' %s" % processor.STDOUT_FILE, allow_fail=False)
if upgrade_from is None: # upgrade disabled -- second round of rolling bounces
roll_counter = ".1-" # second round of rolling bounces
@ -261,7 +272,7 @@ class StreamsUpgradeTest(Test):
self.roll_logs(processor, roll_counter + str(counter))
self.set_version(processor, new_version)
self.set_version(processor, to_version)
processor.set_upgrade_from(upgrade_from)
grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
@ -273,7 +284,7 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account))
err_msg="Could not detect Kafka Streams version " + to_version + " on " + str(node.account))
first_other_monitor.wait_until(self.processed_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))