KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test (#15661)

The current AssignmentValidationTest only tests the EAGER assignment protocol and does not support incremental assignment, as used by the CooperativeStickyAssignor and the consumer protocol. Therefore, I subclassed the existing ConsumerEventHandler and overrode the partitions-assigned and partitions-revoked event handling methods to permit incremental changes to the current assignment.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Kirk True <ktrue@confluent.io>
This commit is contained in:
Philip Nee 2024-04-10 09:52:05 -07:00 committed by GitHub
parent e2e2f82f2b
commit dc9fbe453c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 39 additions and 8 deletions

View File

@ -19,7 +19,7 @@ import os
from ducktape.services.background_thread import BackgroundThreadService from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import TopicPartition from kafkatest.services.kafka import TopicPartition, consumer_group
from kafkatest.services.verifiable_client import VerifiableClientMixin from kafkatest.services.verifiable_client import VerifiableClientMixin
from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_0_10_0_0 from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_0_10_0_0
@ -135,6 +135,28 @@ class ConsumerEventHandler(object):
else: else:
return None return None
# This needs to be used for cooperative and consumer protocol
class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
def __init__(self, node, verify_offsets, idx):
super().__init__(node, verify_offsets, idx)
def handle_partitions_revoked(self, event):
self.revoked_count += 1
self.state = ConsumerState.Rebalancing
self.position = {}
for topic_partition in event["partitions"]:
topic = topic_partition["topic"]
partition = topic_partition["partition"]
self.assignment.remove(TopicPartition(topic, partition))
def handle_partitions_assigned(self, event):
self.assigned_count += 1
self.state = ConsumerState.Joined
for topic_partition in event["partitions"]:
topic = topic_partition["topic"]
partition = topic_partition["partition"]
self.assignment.append(TopicPartition(topic, partition))
class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService): class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
"""This service wraps org.apache.kafka.tools.VerifiableConsumer for use in """This service wraps org.apache.kafka.tools.VerifiableConsumer for use in
@ -207,7 +229,10 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
def _worker(self, idx, node): def _worker(self, idx, node):
with self.lock: with self.lock:
if node not in self.event_handlers: if node not in self.event_handlers:
self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx) if self._isEager():
self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx)
else:
self.event_handlers[node] = IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx)
handler = self.event_handlers[node] handler = self.event_handlers[node]
node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False) node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
@ -263,6 +288,9 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
else: else:
self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event)) self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event))
def _isEager(self):
return self.group_protocol == consumer_group.classic_group_protocol and self.assignment_strategy != "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
def _update_global_position(self, consumed_event, node): def _update_global_position(self, consumed_event, node):
for consumed_partition in consumed_event["partitions"]: for consumed_partition in consumed_event["partitions"]:
tp = TopicPartition(consumed_partition["topic"], consumed_partition["partition"]) tp = TopicPartition(consumed_partition["topic"], consumed_partition["partition"])

View File

@ -549,14 +549,16 @@ class AssignmentValidationTest(VerifiableConsumerTest):
@matrix( @matrix(
assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor", assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
"org.apache.kafka.clients.consumer.RoundRobinAssignor", "org.apache.kafka.clients.consumer.RoundRobinAssignor",
"org.apache.kafka.clients.consumer.StickyAssignor"], "org.apache.kafka.clients.consumer.StickyAssignor",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
metadata_quorum=[quorum.zk, quorum.isolated_kraft], metadata_quorum=[quorum.zk, quorum.isolated_kraft],
use_new_coordinator=[False] use_new_coordinator=[False]
) )
@matrix( @matrix(
assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor", assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
"org.apache.kafka.clients.consumer.RoundRobinAssignor", "org.apache.kafka.clients.consumer.RoundRobinAssignor",
"org.apache.kafka.clients.consumer.StickyAssignor"], "org.apache.kafka.clients.consumer.StickyAssignor",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
metadata_quorum=[quorum.isolated_kraft], metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True], use_new_coordinator=[True],
group_protocol=[consumer_group.classic_group_protocol], group_protocol=[consumer_group.classic_group_protocol],
@ -584,7 +586,8 @@ class AssignmentValidationTest(VerifiableConsumerTest):
for num_started, node in enumerate(consumer.nodes, 1): for num_started, node in enumerate(consumer.nodes, 1):
consumer.start_node(node) consumer.start_node(node)
self.await_members(consumer, num_started) self.await_members(consumer, num_started)
assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()), \ wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
"expected valid assignments of %d partitions when num_started %d: %s" % \ timeout_sec=15,
(self.NUM_PARTITIONS, num_started, \ err_msg="expected valid assignments of %d partitions when num_started %d: %s" % \
[(str(node.account), a) for node, a in consumer.current_assignment().items()]) (self.NUM_PARTITIONS, num_started, \
[(str(node.account), a) for node, a in consumer.current_assignment().items()]))