KAFKA-19202: Enable KIP-1071 in streams_eos_test (#19700)
CI / build (push) Waiting to run Details

Enable next system test with KIP-1071.

Some of the validation inside the test did not make sense for KIP-1071.
This is because in KIP-1071, if a member leaves or joins the group, not
every member necessarily enters the REBALANCING state. We use the wrapper
introduced in [KAFKA-19271](https://issues.apache.org/jira/browse/KAFKA-19271)
to print a log line whenever the member epoch is bumped, which is the
only way a member can "indirectly" observe that other members are
rebalancing.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-05-17 21:20:47 +02:00 committed by GitHub
parent bff1602df3
commit f26974b16d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 160 additions and 66 deletions

View File

@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -53,9 +54,11 @@ public record ConfiguredSubtopology(Set<String> sourceTopics,
.setSourceTopics(sourceTopics.stream().sorted().toList())
.setRepartitionSinkTopics(repartitionSinkTopics.stream().sorted().toList())
.setRepartitionSourceTopics(repartitionSourceTopics.values().stream()
.map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().toList())
.map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo)
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList())
.setStateChangelogTopics(stateChangelogTopics.values().stream()
.map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().toList());
.map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo)
.sorted(Comparator.comparing(StreamsGroupDescribeResponseData.TopicInfo::name)).toList());
}
}

View File

@ -576,7 +576,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
public static class TestConsumerWrapper extends ConsumerWrapper {
@Override
public void wrapConsumer(final AsyncKafkaConsumer<byte[], byte[]> delegate, final Map<String, Object> config, final Optional<StreamsRebalanceData> streamsRebalanceData) {
final TestingMetricsInterceptingAsynConsumer<byte[], byte[]> consumer = new TestingMetricsInterceptingAsynConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer(), streamsRebalanceData);
final TestingMetricsInterceptingAsyncConsumer<byte[], byte[]> consumer = new TestingMetricsInterceptingAsyncConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer(), streamsRebalanceData);
INTERCEPTING_CONSUMERS.add(consumer);
super.wrapConsumer(consumer, config, streamsRebalanceData);
@ -613,10 +613,10 @@ public class KafkaStreamsTelemetryIntegrationTest {
}
}
public static class TestingMetricsInterceptingAsynConsumer<K, V> extends AsyncKafkaConsumer<K, V> implements TestingMetricsInterceptor {
public static class TestingMetricsInterceptingAsyncConsumer<K, V> extends AsyncKafkaConsumer<K, V> implements TestingMetricsInterceptor {
private final List<KafkaMetric> passedMetrics = new ArrayList<>();
public TestingMetricsInterceptingAsynConsumer(
public TestingMetricsInterceptingAsyncConsumer(
final Map<String, Object> configs,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,

View File

@ -16,18 +16,26 @@
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.internals.ConsumerWrapper;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -38,6 +46,8 @@ public class EosTestClient extends SmokeTestUtil {
private final Properties properties;
private final boolean withRepartitioning;
private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
private static final List<CapturingConsumerWrapper> CAPTURING_CONSUMER_WRAPPERS = new ArrayList<>();
private int minGroupEpoch = 0;
private KafkaStreams streams;
private boolean uncaughtException;
@ -46,6 +56,8 @@ public class EosTestClient extends SmokeTestUtil {
super();
this.properties = properties;
this.withRepartitioning = withRepartitioning;
this.properties.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, CapturingConsumerWrapper.class);
CAPTURING_CONSUMER_WRAPPERS.clear();
}
private volatile boolean isRunning = true;
@ -95,7 +107,8 @@ public class EosTestClient extends SmokeTestUtil {
streams.close(Duration.ofSeconds(60_000L));
streams = null;
}
sleep(1000);
logGroupEpochBump();
sleep(100);
}
}
@ -172,4 +185,41 @@ public class EosTestClient extends SmokeTestUtil {
System.err.flush();
}
}
// Used in the streams group protocol.
// Detects a completed rebalance by checking whether the epoch last observed by every
// registered consumer wrapper has advanced past the previously recorded minimum, and
// prints the "MemberEpochBump" marker to stdout when it has.
//
// The minimum over all wrappers is used so that the marker is only emitted once every
// registered wrapper has seen the newer epoch. If no wrapper has registered yet there is
// nothing to compare, so the method returns without printing; previously the
// Integer.MAX_VALUE sentinel compared greater than minGroupEpoch and the marker was
// printed on every call even though no epoch had been observed.
private void logGroupEpochBump() {
    int currentMin = Integer.MAX_VALUE;
    for (final CapturingConsumerWrapper consumer : CAPTURING_CONSUMER_WRAPPERS) {
        currentMin = Math.min(currentMin, consumer.lastSeenGroupEpoch);
    }
    if (currentMin == Integer.MAX_VALUE) {
        // Sentinel untouched: no epoch was observed, so neither print nor update the baseline.
        return;
    }
    if (currentMin > minGroupEpoch) {
        System.out.println("MemberEpochBump");
    }
    // Always track the latest minimum so the next call compares against the current state.
    minGroupEpoch = currentMin;
}
/**
 * Consumer wrapper that remembers the most recent generation id reported by the
 * wrapped consumer's group metadata.
 *
 * Each instance adds itself to CAPTURING_CONSUMER_WRAPPERS when it is asked to wrap a
 * consumer, so the enclosing client can inspect {@link #lastSeenGroupEpoch} later.
 */
public static class CapturingConsumerWrapper extends ConsumerWrapper {
    // Generation id from the latest groupMetadata() call; stays 0 until the first call.
    // NOTE(review): volatile suggests it is written and read on different threads - confirm.
    public volatile int lastSeenGroupEpoch;

    /**
     * Registers this wrapper in the shared list, then delegates to the superclass.
     */
    @Override
    public void wrapConsumer(final AsyncKafkaConsumer<byte[], byte[]> delegate, final Map<String, Object> config, final Optional<StreamsRebalanceData> streamsRebalanceData) {
        CAPTURING_CONSUMER_WRAPPERS.add(this);
        super.wrapConsumer(delegate, config, streamsRebalanceData);
    }

    /**
     * Fetches the group metadata from the wrapped consumer, records its generation id,
     * and returns the metadata unchanged.
     */
    @Override
    public ConsumerGroupMetadata groupMetadata() {
        final ConsumerGroupMetadata metadata = delegate.groupMetadata();
        lastSeenGroupEpoch = metadata.generationId();
        return metadata;
    }
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -48,6 +49,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
@ -171,7 +173,7 @@ public class EosTestDriver extends SmokeTestUtil {
}
}
public static void verify(final String kafka, final boolean withRepartitioning) {
public static void verify(final String kafka, final boolean withRepartitioning, final String groupProtocol) {
final Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
@ -189,7 +191,7 @@ public class EosTestDriver extends SmokeTestUtil {
final Map<TopicPartition, Long> committedOffsets;
try (final Admin adminClient = Admin.create(props)) {
ensureStreamsApplicationDown(adminClient);
ensureStreamsApplicationDown(adminClient, groupProtocol);
committedOffsets = getCommittedOffsets(adminClient, withRepartitioning);
}
@ -248,23 +250,33 @@ public class EosTestDriver extends SmokeTestUtil {
System.out.flush();
}
private static void ensureStreamsApplicationDown(final Admin adminClient) {
private static void ensureStreamsApplicationDown(final Admin adminClient, final String groupProtocol) {
final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
ConsumerGroupDescription description;
boolean isEmpty;
do {
description = getConsumerGroupDescription(adminClient);
if (Objects.equals(groupProtocol, "streams")) {
final StreamsGroupDescription description = getStreamsGroupDescription(adminClient);
isEmpty = description.members().isEmpty();
if (System.currentTimeMillis() > maxWaitTime && !isEmpty) {
throwNotDownException(description);
}
} else {
final ConsumerGroupDescription description = getConsumerGroupDescription(adminClient);
isEmpty = description.members().isEmpty();
if (System.currentTimeMillis() > maxWaitTime && !isEmpty) {
throwNotDownException(description);
}
}
sleep(1000L);
} while (!isEmpty);
}
if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) {
private static void throwNotDownException(final Object description) {
throw new RuntimeException(
"Streams application not down after " + (MAX_IDLE_TIME_MS / 1000L) + " seconds. " +
"Streams application not down after " + MAX_IDLE_TIME_MS / 1000L + " seconds. " +
"Group: " + description
);
}
sleep(1000L);
} while (!description.members().isEmpty());
}
private static Map<TopicPartition, Long> getCommittedOffsets(final Admin adminClient,
final boolean withRepartitioning) {
@ -636,7 +648,6 @@ public class EosTestDriver extends SmokeTestUtil {
return partitions;
}
private static ConsumerGroupDescription getConsumerGroupDescription(final Admin adminClient) {
final ConsumerGroupDescription description;
try {
@ -650,4 +661,18 @@ public class EosTestDriver extends SmokeTestUtil {
}
return description;
}
/**
 * Describes the streams group identified by {@code EosTestClient.APP_ID}.
 *
 * Blocks for at most 10 seconds on the describe result.
 *
 * @param adminClient admin client used to issue the describe request
 * @return the description of the streams group
 * @throws RuntimeException if the call is interrupted, fails, or times out; the
 *         original exception is attached as the cause
 */
private static StreamsGroupDescription getStreamsGroupDescription(final Admin adminClient) {
    try {
        return adminClient
            .describeStreamsGroups(Collections.singleton(EosTestClient.APP_ID))
            .describedGroups()
            .get(EosTestClient.APP_ID)
            .get(10, TimeUnit.SECONDS);
    } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException fatal) {
        // Dump the stack trace before rethrowing.
        fatal.printStackTrace();
        throw new RuntimeException("Unexpected Exception getting group description", fatal);
    }
}
}

View File

@ -76,10 +76,10 @@ public class StreamsEosTest {
new EosTestClient(streamsProperties, true).start();
break;
case "verify":
EosTestDriver.verify(kafka, false);
EosTestDriver.verify(kafka, false, streamsProperties.getProperty("group.protocol"));
break;
case "verify-complex":
EosTestDriver.verify(kafka, true);
EosTestDriver.verify(kafka, true, streamsProperties.getProperty("group.protocol"));
break;
default:
System.out.println("unknown command: " + command);

View File

@ -387,18 +387,20 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
clean_node_enabled = True
def __init__(self, test_context, kafka, command):
def __init__(self, test_context, kafka, command, group_protocol):
super(StreamsEosTestBaseService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsEosTest",
command)
self.group_protocol = group_protocol
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.PROCESSING_GUARANTEE: "exactly_once_v2",
"acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
"session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
"session.timeout.ms": "10000", # set back to 10s for tests. See KIP-735
"group.protocol": self.group_protocol
}
cfg = KafkaConfig(**properties)
@ -443,24 +445,24 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
class StreamsEosTestDriverService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run")
super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run", "classic")
class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
def __init__(self, test_context, kafka, group_protocol):
super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process", group_protocol)
class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex")
def __init__(self, test_context, kafka, group_protocol):
super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex", group_protocol)
class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
def __init__(self, test_context, kafka, group_protocol):
super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify", group_protocol)
class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex")
def __init__(self, test_context, kafka, group_protocol):
super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex", group_protocol)
class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):

View File

@ -15,18 +15,18 @@
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.kafka import quorum
from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
class StreamsEosTest(KafkaTest):
class StreamsEosTest(BaseStreamsTest):
"""
Test of Kafka Streams exactly-once semantics
"""
def __init__(self, test_context):
super(StreamsEosTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
super(StreamsEosTest, self).__init__(test_context, num_controllers=1, num_brokers=3, topics={
'data': {'partitions': 5, 'replication-factor': 2},
'echo': {'partitions': 5, 'replication-factor': 2},
'min': {'partitions': 5, 'replication-factor': 2},
@ -38,20 +38,24 @@ class StreamsEosTest(KafkaTest):
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
self.test_context = test_context
@cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_rebalance_simple(self, metadata_quorum):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_rebalance_complex(self, metadata_quorum):
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
@cluster(num_nodes=8)
@matrix(metadata_quorum=[quorum.combined_kraft],
group_protocol=["classic", "streams"])
def test_rebalance_simple(self, metadata_quorum, group_protocol):
self.group_protocol = group_protocol
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
@cluster(num_nodes=8)
@matrix(metadata_quorum=[quorum.combined_kraft],
group_protocol=["classic", "streams"])
def test_rebalance_complex(self, metadata_quorum, group_protocol):
self.group_protocol = group_protocol
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
def run_rebalance(self, processor1, processor2, processor3, verifier):
"""
@ -79,20 +83,24 @@ class StreamsEosTest(KafkaTest):
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
@cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_failure_and_recovery(self, metadata_quorum):
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.combined_kraft])
def test_failure_and_recovery_complex(self, metadata_quorum):
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
@cluster(num_nodes=8)
@matrix(metadata_quorum=[quorum.combined_kraft],
group_protocol=["classic", "streams"])
def test_failure_and_recovery(self, metadata_quorum, group_protocol):
self.group_protocol = group_protocol
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
@cluster(num_nodes=8)
@matrix(metadata_quorum=[quorum.combined_kraft],
group_protocol=["classic", "streams"])
def test_failure_and_recovery_complex(self, metadata_quorum, group_protocol):
self.group_protocol = group_protocol
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
"""
@ -160,10 +168,16 @@ class StreamsEosTest(KafkaTest):
self.wait_for_startup(monitor1, keep_alive_processor1)
def wait_for_startup(self, monitor, processor):
if self.group_protocol == "classic":
self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
else:
# In the streams group protocol, not all members will take part in the rebalance.
# We can indirectly observe the progress of the group by seeing the member epoch being bumped.
self.wait_for(monitor, processor, "MemberEpochBump")
self.wait_for(monitor, processor, "processed [0-9]* records from topic")
def wait_for(self, monitor, processor, output):
@staticmethod
def wait_for(monitor, processor, output):
monitor.wait_until(output,
timeout_sec=480,
err_msg=("Never saw output '%s' on " % output) + str(processor.node.account))