KAFKA-19423: Deflake streams_broker_bounce_test (#20005)

The test is resizing the `__consumer_offset` topic after broker start.
This seems to be completely unsupported. The group coordinator fetches
the number of partitions for the consumer offset topic once and never
updates it. So we can be in a state where two brokers have a different
understanding of how `__consumer_offsets` are partitioned.

The result in this test can be that two group coordinators both think
they own a certain group. The test is resizing `__consumer_offsets`
right after start-up from 3 to 50. Before the broker bounce, the GC
operates on only three partitions (0-2). During the bounce, we get new
brokers that operate on (0-49). This means that two brokers can both
think, at the same time, that they own a group.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-06-20 20:27:35 +02:00 committed by GitHub
parent 4690527fab
commit 88a73c35e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 7 additions and 4 deletions

View File

@ -112,7 +112,7 @@ class StreamsBrokerBounceTest(Test):
'configs': {"min.insync.replicas": 2} },
'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": 2} },
'__consumer_offsets' : { 'partitions': 50, 'replication-factor': self.replication,
'__consumer_offsets' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": 2} }
}
@ -152,7 +152,10 @@ class StreamsBrokerBounceTest(Test):
def setup_system(self, start_processor=True, num_threads=3, group_protocol='classic'):
# Setup phase
use_streams_groups = True if group_protocol == 'streams' else False
self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=None, topics=self.topics, use_streams_groups=use_streams_groups)
self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=None, topics=self.topics, server_prop_overrides=[
["offsets.topic.num.partitions", self.partitions],
["offsets.topic.replication.factor", self.replication]
], use_streams_groups=use_streams_groups)
self.kafka.start()
# allow some time for topics to be created
@ -285,7 +288,7 @@ class StreamsBrokerBounceTest(Test):
# Set min.insync.replicas to 1 because in the last stage of the test there is only one broker left.
# Otherwise the last offset commit will never succeed and time out and potentially take longer as
# duration passed to the close method of the Kafka Streams client.
self.topics['__consumer_offsets'] = { 'partitions': 50, 'replication-factor': self.replication,
self.topics['__consumer_offsets'] = { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": 1} }
self.setup_system(group_protocol=group_protocol)