mirror of https://github.com/apache/kafka.git
MINOR: fix flaky sys test for static membership (#20594)
Fixing flakiness seen on this test, where static consumers could not join as expected after shutting down previous consumers with the same instance ID, and logs showed `UnreleasedInstanceIdException`. I expect the flakiness could happen if a consumer with instanceId1 is closed but not effectively removed from the group due to leave group fail/delayed (the leave group request is sent on a best effort, not retried if fails or times out). Fix by adding check to ensure the group is empty before attempting to reuse the instance ID Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
857b1e92cc
commit
9e9d2a23ef
|
@ -313,7 +313,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
|
||||||
num_rebalances = consumer.num_rebalances()
|
num_rebalances = consumer.num_rebalances()
|
||||||
conflict_consumer.start()
|
conflict_consumer.start()
|
||||||
if group_protocol == consumer_group.classic_group_protocol:
|
if group_protocol == consumer_group.classic_group_protocol:
|
||||||
# Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail.
|
# Classic protocol: conflicting members should join, and the initial ones with conflicting instance id should fail.
|
||||||
self.await_members(conflict_consumer, num_conflict_consumers)
|
self.await_members(conflict_consumer, num_conflict_consumers)
|
||||||
self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
|
self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers)
|
||||||
|
|
||||||
|
@ -332,6 +332,11 @@ class OffsetValidationTest(VerifiableConsumerTest):
|
||||||
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
|
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
|
||||||
timeout_sec=60,
|
timeout_sec=60,
|
||||||
err_msg="Timed out waiting for the consumer to shutdown")
|
err_msg="Timed out waiting for the consumer to shutdown")
|
||||||
|
# Wait until the group becomes empty to ensure the instance ID is released.
|
||||||
|
# We use the 50-second timeout because the consumer session timeout is 45 seconds.
|
||||||
|
wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="empty"),
|
||||||
|
timeout_sec=50,
|
||||||
|
err_msg="Timed out waiting for the consumers to be removed from the group.")
|
||||||
conflict_consumer.start()
|
conflict_consumer.start()
|
||||||
self.await_members(conflict_consumer, num_conflict_consumers)
|
self.await_members(conflict_consumer, num_conflict_consumers)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue