mirror of https://github.com/apache/kafka.git
Enable the next system test with KIP-1071. Some of the validation inside the test did not make sense for KIP-1071, because when a member leaves or joins the group, not all members may enter a REBALANCING state. We use the wrapper introduced in [KAFKA-19271](https://issues.apache.org/jira/browse/KAFKA-19271) to print a log line whenever the member epoch is bumped, which is the only way a member can indirectly observe that other members are rebalancing.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
parent
bff1602df3
commit
f26974b16d
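
To make the mechanism concrete, here is a minimal, self-contained sketch (not part of the commit; the epoch values are hypothetical) of the detection rule the test client uses below: the "MemberEpochBump" marker is printed only when the minimum epoch across all stream threads advances, i.e. once every member has observed the bumped epoch.

// Sketch of the epoch-bump detection rule; in the real test each value comes
// from ConsumerGroupMetadata.generationId() observed through a wrapped consumer.
public class EpochBumpSketch {

    private static int minGroupEpoch = 4; // minimum epoch seen on the previous check

    public static void main(final String[] args) {
        checkEpochs(new int[]{5, 5, 4}); // one thread lags behind: no marker printed
        checkEpochs(new int[]{5, 5, 5}); // all threads caught up: prints "MemberEpochBump"
    }

    private static void checkEpochs(final int[] lastSeenEpochs) {
        int currentMin = Integer.MAX_VALUE;
        for (final int epoch : lastSeenEpochs) {
            currentMin = Math.min(currentMin, epoch);
        }
        // Report only once ALL members have seen the new epoch.
        if (currentMin > minGroupEpoch) {
            System.out.println("MemberEpochBump");
        }
        if (currentMin != Integer.MAX_VALUE) {
            minGroupEpoch = currentMin;
        }
    }
}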
ConfiguredSubtopology.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.streams.topics;
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 
+import java.util.Comparator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -53,9 +54,11 @@ public record ConfiguredSubtopology(Set<String> sourceTopics,
             .setSourceTopics(sourceTopics.stream().sorted().toList())
             .setRepartitionSinkTopics(repartitionSinkTopics.stream().sorted().toList())
             .setRepartitionSourceTopics(repartitionSourceTopics.values().stream()
-                .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().toList())
+                .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo)
+                .sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList())
             .setStateChangelogTopics(stateChangelogTopics.values().stream()
-                .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().toList());
+                .map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo)
+                .sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList());
     }
 
 }

KafkaStreamsTelemetryIntegrationTest.java
@@ -576,7 +576,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
     public static class TestConsumerWrapper extends ConsumerWrapper {
         @Override
         public void wrapConsumer(final AsyncKafkaConsumer<byte[], byte[]> delegate, final Map<String, Object> config, final Optional<StreamsRebalanceData> streamsRebalanceData) {
-            final TestingMetricsInterceptingAsynConsumer<byte[], byte[]> consumer = new TestingMetricsInterceptingAsynConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer(), streamsRebalanceData);
+            final TestingMetricsInterceptingAsyncConsumer<byte[], byte[]> consumer = new TestingMetricsInterceptingAsyncConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer(), streamsRebalanceData);
             INTERCEPTING_CONSUMERS.add(consumer);
 
             super.wrapConsumer(consumer, config, streamsRebalanceData);
@@ -613,10 +613,10 @@ public class KafkaStreamsTelemetryIntegrationTest {
         }
     }
 
-    public static class TestingMetricsInterceptingAsynConsumer<K, V> extends AsyncKafkaConsumer<K, V> implements TestingMetricsInterceptor {
+    public static class TestingMetricsInterceptingAsyncConsumer<K, V> extends AsyncKafkaConsumer<K, V> implements TestingMetricsInterceptor {
         private final List<KafkaMetric> passedMetrics = new ArrayList<>();
 
-        public TestingMetricsInterceptingAsynConsumer(
+        public TestingMetricsInterceptingAsyncConsumer(
             final Map<String, Object> configs,
             final Deserializer<K> keyDeserializer,
             final Deserializer<V> valueDeserializer,

EosTestClient.java
@@ -16,18 +16,26 @@
  */
 package org.apache.kafka.streams.tests;
 
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.internals.ConsumerWrapper;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,6 +46,8 @@ public class EosTestClient extends SmokeTestUtil {
     private final Properties properties;
     private final boolean withRepartitioning;
     private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
+    private static final List<CapturingConsumerWrapper> CAPTURING_CONSUMER_WRAPPERS = new ArrayList<>();
+    private int minGroupEpoch = 0;
 
     private KafkaStreams streams;
     private boolean uncaughtException;
@@ -46,6 +56,8 @@ public class EosTestClient extends SmokeTestUtil {
         super();
         this.properties = properties;
         this.withRepartitioning = withRepartitioning;
+        this.properties.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, CapturingConsumerWrapper.class);
+        CAPTURING_CONSUMER_WRAPPERS.clear();
     }
 
     private volatile boolean isRunning = true;
@@ -95,7 +107,8 @@ public class EosTestClient extends SmokeTestUtil {
                 streams.close(Duration.ofSeconds(60_000L));
                 streams = null;
             }
-            sleep(1000);
+            logGroupEpochBump();
+            sleep(100);
         }
     }
@@ -172,4 +185,41 @@ public class EosTestClient extends SmokeTestUtil {
         System.err.flush();
     }
 
+    // Used in the streams group protocol.
+    // Detect a completed rebalance by checking if the group epoch has been bumped for all threads.
+    private void logGroupEpochBump() {
+        int currentMin = Integer.MAX_VALUE;
+        for (final CapturingConsumerWrapper consumer : CAPTURING_CONSUMER_WRAPPERS) {
+            final int groupEpoch = consumer.lastSeenGroupEpoch;
+            if (groupEpoch < currentMin) {
+                currentMin = groupEpoch;
+            }
+        }
+        if (currentMin > minGroupEpoch) {
+            System.out.println("MemberEpochBump");
+        }
+        if (currentMin != Integer.MAX_VALUE) {
+            minGroupEpoch = currentMin;
+        }
+    }
+
+    public static class CapturingConsumerWrapper extends ConsumerWrapper {
+
+        public volatile int lastSeenGroupEpoch = 0;
+
+        @Override
+        public void wrapConsumer(final AsyncKafkaConsumer<byte[], byte[]> delegate, final Map<String, Object> config, final Optional<StreamsRebalanceData> streamsRebalanceData) {
+            CAPTURING_CONSUMER_WRAPPERS.add(this);
+            super.wrapConsumer(delegate, config, streamsRebalanceData);
+        }
+
+        @Override
+        public ConsumerGroupMetadata groupMetadata() {
+            final ConsumerGroupMetadata consumerGroupMetadata = delegate.groupMetadata();
+            lastSeenGroupEpoch = consumerGroupMetadata.generationId();
+            return consumerGroupMetadata;
+        }
+    }
 }

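For background, the epoch that CapturingConsumerWrapper captures above is the one every Kafka consumer exposes through groupMetadata(). A stand-alone sketch of reading it follows; the bootstrap server, group id, and topic are placeholder values, and whether generationId() carries the streams member epoch depends on the group protocol in use, as the commit message describes.

// Stand-alone sketch: reading the group generation id that the wrapper captures.
// All connection details below are placeholders.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class GroupMetadataSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-test-group");          // placeholder
        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.subscribe(List.of("data")); // placeholder topic
            consumer.poll(Duration.ofSeconds(1)); // join the group and refresh metadata
            final ConsumerGroupMetadata metadata = consumer.groupMetadata();
            System.out.println("generationId=" + metadata.generationId());
        }
    }
}
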
EosTestDriver.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.tests;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.StreamsGroupDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -48,6 +49,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
@@ -171,7 +173,7 @@ public class EosTestDriver extends SmokeTestUtil {
         }
     }
 
-    public static void verify(final String kafka, final boolean withRepartitioning) {
+    public static void verify(final String kafka, final boolean withRepartitioning, final String groupProtocol) {
         final Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
@@ -189,7 +191,7 @@ public class EosTestDriver extends SmokeTestUtil {
 
         final Map<TopicPartition, Long> committedOffsets;
         try (final Admin adminClient = Admin.create(props)) {
-            ensureStreamsApplicationDown(adminClient);
+            ensureStreamsApplicationDown(adminClient, groupProtocol);
 
             committedOffsets = getCommittedOffsets(adminClient, withRepartitioning);
         }
@@ -248,23 +250,33 @@ public class EosTestDriver extends SmokeTestUtil {
         System.out.flush();
     }
 
-    private static void ensureStreamsApplicationDown(final Admin adminClient) {
+    private static void ensureStreamsApplicationDown(final Admin adminClient, final String groupProtocol) {
         final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-        ConsumerGroupDescription description;
+        boolean isEmpty;
         do {
-            description = getConsumerGroupDescription(adminClient);
-
-            if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) {
-                throw new RuntimeException(
-                    "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000L) + " seconds. " +
-                        "Group: " + description
-                );
+            if (Objects.equals(groupProtocol, "streams")) {
+                final StreamsGroupDescription description = getStreamsGroupDescription(adminClient);
+                isEmpty = description.members().isEmpty();
+                if (System.currentTimeMillis() > maxWaitTime && !isEmpty) {
+                    throwNotDownException(description);
+                }
+            } else {
+                final ConsumerGroupDescription description = getConsumerGroupDescription(adminClient);
+                isEmpty = description.members().isEmpty();
+                if (System.currentTimeMillis() > maxWaitTime && !isEmpty) {
+                    throwNotDownException(description);
+                }
             }
             sleep(1000L);
-        } while (!description.members().isEmpty());
+        } while (!isEmpty);
     }
 
+    private static void throwNotDownException(final Object description) {
+        throw new RuntimeException(
+            "Streams application not down after " + MAX_IDLE_TIME_MS / 1000L + " seconds. " +
+                "Group: " + description
+        );
+    }
+
     private static Map<TopicPartition, Long> getCommittedOffsets(final Admin adminClient,
                                                                  final boolean withRepartitioning) {
@@ -636,7 +648,6 @@ public class EosTestDriver extends SmokeTestUtil {
         return partitions;
     }
 
-
     private static ConsumerGroupDescription getConsumerGroupDescription(final Admin adminClient) {
         final ConsumerGroupDescription description;
         try {
@@ -650,4 +661,18 @@ public class EosTestDriver extends SmokeTestUtil {
         }
         return description;
     }
+
+    private static StreamsGroupDescription getStreamsGroupDescription(final Admin adminClient) {
+        final StreamsGroupDescription description;
+        try {
+            description = adminClient.describeStreamsGroups(Collections.singleton(EosTestClient.APP_ID))
+                .describedGroups()
+                .get(EosTestClient.APP_ID)
+                .get(10, TimeUnit.SECONDS);
+        } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
+            e.printStackTrace();
+            throw new RuntimeException("Unexpected Exception getting group description", e);
+        }
+        return description;
+    }
 }

StreamsEosTest.java
@@ -76,10 +76,10 @@ public class StreamsEosTest {
                 new EosTestClient(streamsProperties, true).start();
                 break;
             case "verify":
-                EosTestDriver.verify(kafka, false);
+                EosTestDriver.verify(kafka, false, streamsProperties.getProperty("group.protocol"));
                 break;
             case "verify-complex":
-                EosTestDriver.verify(kafka, true);
+                EosTestDriver.verify(kafka, true, streamsProperties.getProperty("group.protocol"));
                 break;
             default:
                 System.out.println("unknown command: " + command);

streams.py
@@ -387,18 +387,20 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
 
     clean_node_enabled = True
 
-    def __init__(self, test_context, kafka, command):
+    def __init__(self, test_context, kafka, command, group_protocol):
         super(StreamsEosTestBaseService, self).__init__(test_context,
                                                         kafka,
                                                         "org.apache.kafka.streams.tests.StreamsEosTest",
                                                         command)
+        self.group_protocol = group_protocol
 
     def prop_file(self):
         properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
                       streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
                       streams_property.PROCESSING_GUARANTEE: "exactly_once_v2",
                       "acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
-                      "session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
+                      "session.timeout.ms": "10000", # set back to 10s for tests. See KIP-735
+                      "group.protocol": self.group_protocol
                      }
 
         cfg = KafkaConfig(**properties)
@@ -443,24 +445,24 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
 
 class StreamsEosTestDriverService(StreamsEosTestBaseService):
     def __init__(self, test_context, kafka):
-        super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run")
+        super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run", "classic")
 
 class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
+    def __init__(self, test_context, kafka, group_protocol):
+        super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process", group_protocol)
 
 class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")
+    def __init__(self, test_context, kafka, group_protocol):
+        super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex", group_protocol)
 
 class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
+    def __init__(self, test_context, kafka, group_protocol):
+        super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify", group_protocol)
 
 
 class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex")
+    def __init__(self, test_context, kafka, group_protocol):
+        super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex", group_protocol)
 
 
 class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):

streams_eos_test.py
@@ -15,18 +15,18 @@
 
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
-from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.kafka import quorum
 from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
     StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
+from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
 
-class StreamsEosTest(KafkaTest):
+class StreamsEosTest(BaseStreamsTest):
     """
     Test of Kafka Streams exactly-once semantics
     """
 
     def __init__(self, test_context):
-        super(StreamsEosTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
+        super(StreamsEosTest, self).__init__(test_context, num_controllers=1, num_brokers=3, topics={
             'data': {'partitions': 5, 'replication-factor': 2},
             'echo': {'partitions': 5, 'replication-factor': 2},
             'min': {'partitions': 5, 'replication-factor': 2},
@@ -38,20 +38,24 @@ class StreamsEosTest(KafkaTest):
         self.driver = StreamsEosTestDriverService(test_context, self.kafka)
         self.test_context = test_context
 
-    @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_rebalance_simple(self, metadata_quorum):
-        self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
+    @cluster(num_nodes=8)
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_rebalance_simple(self, metadata_quorum, group_protocol):
+        self.group_protocol = group_protocol
+        self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
 
-    @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_rebalance_complex(self, metadata_quorum):
-        self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                           StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
+    @cluster(num_nodes=8)
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_rebalance_complex(self, metadata_quorum, group_protocol):
+        self.group_protocol = group_protocol
+        self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                           StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
 
     def run_rebalance(self, processor1, processor2, processor3, verifier):
         """
@@ -79,20 +83,24 @@ class StreamsEosTest(KafkaTest):
 
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
-    @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_failure_and_recovery(self, metadata_quorum):
-        self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
+    @cluster(num_nodes=8)
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_failure_and_recovery(self, metadata_quorum, group_protocol):
+        self.group_protocol = group_protocol
+        self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
 
-    @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_failure_and_recovery_complex(self, metadata_quorum):
-        self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
-                                      StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
+    @cluster(num_nodes=8)
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_failure_and_recovery_complex(self, metadata_quorum, group_protocol):
+        self.group_protocol = group_protocol
+        self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
+                                      StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
 
     def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
         """
@@ -160,10 +168,16 @@ class StreamsEosTest(KafkaTest):
         self.wait_for_startup(monitor1, keep_alive_processor1)
 
     def wait_for_startup(self, monitor, processor):
-        self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
+        if self.group_protocol == "classic":
+            self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
+        else:
+            # In the streams group protocol, not all members will take part in the rebalance.
+            # We can indirectly observe the progress of the group by seeing the member epoch being bumped.
+            self.wait_for(monitor, processor, "MemberEpochBump")
         self.wait_for(monitor, processor, "processed [0-9]* records from topic")
 
-    def wait_for(self, monitor, processor, output):
+    @staticmethod
+    def wait_for(monitor, processor, output):
         monitor.wait_until(output,
                            timeout_sec=480,
                            err_msg=("Never saw output '%s' on " % output) + str(processor.node.account))