From f26974b16db7903bd350f06b5796acac52604d4c Mon Sep 17 00:00:00 2001
From: Lucas Brutschy
Date: Sat, 17 May 2025 21:20:47 +0200
Subject: [PATCH] KAFKA-19202: Enable KIP-1071 in streams_eos_test (#19700)

Enable next system test with KIP-1071.

Some of the validation inside the test did not make sense for KIP-1071.
This is because in KIP-1071, if a member leaves or joins the group, not
all members may enter a REBALANCING state. We use the wrapper introduced
in [KAFKA-19271](https://issues.apache.org/jira/browse/KAFKA-19271) to
print a log line whenever the member epoch is bumped, which is the only
way a member can "indirectly" observe that other members are rebalancing.

Reviewers: Bill Bejeck
---
 .../streams/topics/ConfiguredSubtopology.java |  7 +-
 .../KafkaStreamsTelemetryIntegrationTest.java |  6 +-
 .../kafka/streams/tests/EosTestClient.java    | 52 +++++++++++-
 .../kafka/streams/tests/EosTestDriver.java    | 53 ++++++++----
 .../kafka/streams/tests/StreamsEosTest.java   |  4 +-
 tests/kafkatest/services/streams.py           | 24 +++---
 .../tests/streams/streams_eos_test.py         | 80 +++++++++++--------
 7 files changed, 160 insertions(+), 66 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
index b8586776ebe..8ef41c4967e 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.streams.topics;
 
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 
+import java.util.Comparator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -53,9 +54,11 @@ public record ConfiguredSubtopology(Set sourceTopics,
             .setSourceTopics(sourceTopics.stream().sorted().toList())
             .setRepartitionSinkTopics(repartitionSinkTopics.stream().sorted().toList())
             .setRepartitionSourceTopics(repartitionSourceTopics.values().stream()
-                .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().toList())
+                .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo)
+                .sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList())
             .setStateChangelogTopics(stateChangelogTopics.values().stream()
-                .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().toList());
+                .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo)
+                .sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList());
     }
 
 }
\ No newline at end of file
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index eefb4e3e287..b0eb3ebf0a5 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -576,7 +576,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
 
     public static class TestConsumerWrapper extends ConsumerWrapper {
         @Override
         public void wrapConsumer(final AsyncKafkaConsumer delegate, final Map config, final Optional streamsRebalanceData) {
-            final TestingMetricsInterceptingAsynConsumer consumer = new TestingMetricsInterceptingAsynConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer(), streamsRebalanceData);
+            final TestingMetricsInterceptingAsyncConsumer consumer = new TestingMetricsInterceptingAsyncConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer(), streamsRebalanceData);
             INTERCEPTING_CONSUMERS.add(consumer);
             super.wrapConsumer(consumer, config, streamsRebalanceData);
@@ -613,10 +613,10 @@ public class KafkaStreamsTelemetryIntegrationTest {
         }
     }
 
-    public static class TestingMetricsInterceptingAsynConsumer extends AsyncKafkaConsumer implements TestingMetricsInterceptor {
+    public static class TestingMetricsInterceptingAsyncConsumer extends AsyncKafkaConsumer implements TestingMetricsInterceptor {
         private final List passedMetrics = new ArrayList<>();
 
-        public TestingMetricsInterceptingAsynConsumer(
+        public TestingMetricsInterceptingAsyncConsumer(
             final Map configs,
             final Deserializer keyDeserializer,
             final Deserializer valueDeserializer,
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index ae9d7527d7a..80a46b3437e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -16,18 +16,26 @@
  */
 package org.apache.kafka.streams.tests;
 
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.internals.ConsumerWrapper;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +46,8 @@ public class EosTestClient extends SmokeTestUtil {
     private final Properties properties;
     private final boolean withRepartitioning;
     private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
+    private static final List CAPTURING_CONSUMER_WRAPPERS = new ArrayList<>();
+    private int minGroupEpoch = 0;
 
     private KafkaStreams streams;
     private boolean uncaughtException;
@@ -46,6 +56,8 @@ public class EosTestClient extends SmokeTestUtil {
         super();
         this.properties = properties;
         this.withRepartitioning = withRepartitioning;
+        this.properties.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, CapturingConsumerWrapper.class);
+        CAPTURING_CONSUMER_WRAPPERS.clear();
     }
 
     private volatile boolean isRunning = true;
@@ -95,7 +107,8 @@ public class EosTestClient extends SmokeTestUtil {
                 streams.close(Duration.ofSeconds(60_000L));
                 streams = null;
             }
-            sleep(1000);
+            logGroupEpochBump();
+            sleep(100);
         }
     }
@@ -172,4 +185,41 @@
             System.err.flush();
         }
     }
+
+    // Used in the streams group protocol
+    // Detect a completed rebalance by checking if the group epoch has been bumped for all threads.
+    private void logGroupEpochBump() {
+        int currentMin = Integer.MAX_VALUE;
+        for (final CapturingConsumerWrapper consumer : CAPTURING_CONSUMER_WRAPPERS) {
+            final int groupEpoch = consumer.lastSeenGroupEpoch;
+            if (groupEpoch < currentMin) {
+                currentMin = groupEpoch;
+            }
+        }
+        if (currentMin > minGroupEpoch) {
+            System.out.println("MemberEpochBump");
+        }
+        if (currentMin != Integer.MAX_VALUE) {
+            minGroupEpoch = currentMin;
+        }
+    }
+
+    public static class CapturingConsumerWrapper extends ConsumerWrapper {
+
+        public volatile int lastSeenGroupEpoch = 0;
+
+        @Override
+        public void wrapConsumer(final AsyncKafkaConsumer delegate, final Map config, final Optional streamsRebalanceData) {
+            CAPTURING_CONSUMER_WRAPPERS.add(this);
+            super.wrapConsumer(delegate, config, streamsRebalanceData);
+        }
+
+        @Override
+        public ConsumerGroupMetadata groupMetadata() {
+            final ConsumerGroupMetadata consumerGroupMetadata = delegate.groupMetadata();
+            lastSeenGroupEpoch = consumerGroupMetadata.generationId();
+            return consumerGroupMetadata;
+        }
+    }
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index f62d0c480fc..0815c49db07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.tests;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.StreamsGroupDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -48,6 +49,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
@@ -171,7 +173,7 @@ public class EosTestDriver extends SmokeTestUtil {
         }
     }
 
-    public static void verify(final String kafka, final boolean withRepartitioning) {
+    public static void verify(final String kafka, final boolean withRepartitioning, final String groupProtocol) {
         final Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
@@ -189,7 +191,7 @@
         final Map committedOffsets;
         try (final Admin adminClient = Admin.create(props)) {
-            ensureStreamsApplicationDown(adminClient);
+            ensureStreamsApplicationDown(adminClient, groupProtocol);
 
             committedOffsets = getCommittedOffsets(adminClient, withRepartitioning);
         }
@@ -248,23 +250,33 @@
         System.out.flush();
     }
 
-    private static void ensureStreamsApplicationDown(final Admin adminClient) {
-
+    private static void ensureStreamsApplicationDown(final Admin adminClient, final String groupProtocol) {
         final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-        ConsumerGroupDescription description;
+        boolean isEmpty;
         do {
-            description = getConsumerGroupDescription(adminClient);
-
-            if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) {
-                throw new RuntimeException(
-                    "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000L) + " seconds. " +
-                        "Group: " + description
-                );
+            if (Objects.equals(groupProtocol, "streams")) {
+                final StreamsGroupDescription description = getStreamsGroupDescription(adminClient);
+                isEmpty = description.members().isEmpty();
+                if (System.currentTimeMillis() > maxWaitTime && !isEmpty) {
+                    throwNotDownException(description);
+                }
+            } else {
+                final ConsumerGroupDescription description = getConsumerGroupDescription(adminClient);
+                isEmpty = description.members().isEmpty();
+                if (System.currentTimeMillis() > maxWaitTime && !isEmpty) {
+                    throwNotDownException(description);
+                }
             }
             sleep(1000L);
-        } while (!description.members().isEmpty());
+        } while (!isEmpty);
     }
+    private static void throwNotDownException(final Object description) {
+        throw new RuntimeException(
+            "Streams application not down after " + MAX_IDLE_TIME_MS / 1000L + " seconds. " +
+                "Group: " + description
+        );
+    }
 
     private static Map getCommittedOffsets(final Admin adminClient, final boolean withRepartitioning) {
@@ -636,7 +648,6 @@
         return partitions;
     }
 
-
     private static ConsumerGroupDescription getConsumerGroupDescription(final Admin adminClient) {
         final ConsumerGroupDescription description;
         try {
@@ -650,4 +661,18 @@
         }
         return description;
     }
+
+    private static StreamsGroupDescription getStreamsGroupDescription(final Admin adminClient) {
+        final StreamsGroupDescription description;
+        try {
+            description = adminClient.describeStreamsGroups(Collections.singleton(EosTestClient.APP_ID))
+                .describedGroups()
+                .get(EosTestClient.APP_ID)
+                .get(10, TimeUnit.SECONDS);
+        } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+            e.printStackTrace();
+            throw new RuntimeException("Unexpected Exception getting group description", e);
+        }
+        return description;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
index 62223d789f0..9bb57e286d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -76,10 +76,10 @@ public class StreamsEosTest {
                 new EosTestClient(streamsProperties, true).start();
                 break;
             case "verify":
-                EosTestDriver.verify(kafka, false);
+                EosTestDriver.verify(kafka, false, streamsProperties.getProperty("group.protocol"));
                 break;
             case "verify-complex":
-                EosTestDriver.verify(kafka, true);
+                EosTestDriver.verify(kafka, true, streamsProperties.getProperty("group.protocol"));
                 break;
             default:
                 System.out.println("unknown command: " + command);
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a070166c572..3a35792a387 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -387,18 +387,20 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
 
     clean_node_enabled = True
 
-    def __init__(self, test_context, kafka, command):
+    def __init__(self, test_context, kafka, command, group_protocol):
         super(StreamsEosTestBaseService, self).__init__(test_context,
                                                         kafka,
                                                         "org.apache.kafka.streams.tests.StreamsEosTest",
                                                         command)
+        self.group_protocol = group_protocol
 
     def prop_file(self):
         properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
                       streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
                       streams_property.PROCESSING_GUARANTEE: "exactly_once_v2",
                       "acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
-                      "session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
+                      "session.timeout.ms": "10000", # set back to 10s for tests. See KIP-735
+                      "group.protocol": self.group_protocol
                       }
 
         cfg = KafkaConfig(**properties)
@@ -443,24 +445,24 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
 
 class StreamsEosTestDriverService(StreamsEosTestBaseService):
     def __init__(self, test_context, kafka):
-        super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run")
+        super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run", "classic")
 
 class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
+    def __init__(self, test_context, kafka, group_protocol):
+        super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process", group_protocol)
 
 class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")
+    def __init__(self, test_context, kafka, group_protocol):
+        super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex", group_protocol)
 
 class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
+    def __init__(self, test_context, kafka, group_protocol):
+        super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify", group_protocol)
 
 class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex")
+    def __init__(self, test_context, kafka, group_protocol):
+        super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex", group_protocol)
 
 class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index b3ac41b887a..fcab8adfe91 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -15,18 +15,18 @@
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
-from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.kafka import quorum
 from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
     StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
+from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
 
-class StreamsEosTest(KafkaTest):
+class StreamsEosTest(BaseStreamsTest):
     """
     Test of Kafka Streams exactly-once semantics
    """

     def __init__(self, test_context):
-        super(StreamsEosTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
+        super(StreamsEosTest, self).__init__(test_context, num_controllers=1, num_brokers=3, topics={
             'data': {'partitions': 5, 'replication-factor': 2},
             'echo': {'partitions': 5, 'replication-factor': 2},
             'min': {'partitions': 5, 'replication-factor': 2},
@@ -38,20 +38,24 @@ class StreamsEosTest(KafkaTest):
         self.driver = StreamsEosTestDriverService(test_context, self.kafka)
         self.test_context = test_context
 
-    @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_rebalance_simple(self, metadata_quorum):
-        self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
-    @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_rebalance_complex(self, metadata_quorum):
-        self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
+    @cluster(num_nodes=8)
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_rebalance_simple(self, metadata_quorum, group_protocol):
+        self.group_protocol = group_protocol
+        self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
+    @cluster(num_nodes=8)
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_rebalance_complex(self, metadata_quorum, group_protocol):
+        self.group_protocol = group_protocol
+        self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
 
     def run_rebalance(self, processor1, processor2, processor3, verifier):
         """
@@ -79,20 +83,24 @@ class StreamsEosTest(KafkaTest):
 
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
-    @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_failure_and_recovery(self, metadata_quorum):
-        self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
-    @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_failure_and_recovery_complex(self, metadata_quorum):
-        self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
+    @cluster(num_nodes=8)
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_failure_and_recovery(self, metadata_quorum, group_protocol):
+        self.group_protocol = group_protocol
+        self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
+    @cluster(num_nodes=8)
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_failure_and_recovery_complex(self, metadata_quorum, group_protocol):
+        self.group_protocol = group_protocol
+        self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
 
     def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
         """
@@ -160,10 +168,16 @@ class StreamsEosTest(KafkaTest):
         self.wait_for_startup(monitor1, keep_alive_processor1)
 
     def wait_for_startup(self, monitor, processor):
-        self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
+        if self.group_protocol == "classic":
+            self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
+        else:
+            # In the streams group protocol, not all members will take part in the rebalance.
+            # We can indirectly observe the progress of the group by seeing the member epoch being bumped.
+            self.wait_for(monitor, processor, "MemberEpochBump")
         self.wait_for(monitor, processor, "processed [0-9]* records from topic")
 
-    def wait_for(self, monitor, processor, output):
+    @staticmethod
+    def wait_for(monitor, processor, output):
         monitor.wait_until(output,
                            timeout_sec=480,
                            err_msg=("Never saw output '%s' on " % output) + str(processor.node.account))