KAFKA-12455: Fix OffsetValidationTest.test_broker_rolling_bounce failure with Raft (#10322)

This test was failing when used with a Raft-based metadata quorum but succeeding with a
ZooKeeper-based quorum. This patch increases the consumers' session timeouts to 30 seconds,
which fixes the Raft case and also eliminates flakiness that has historically existed in the
Zookeeper case.

This patch also fixes a minor logging bug in RaftReplicaManager.endMetadataChangeDeferral() that
was discovered during the debugging of this issue, and it adds an extra logging statement in RaftReplicaManager.handleMetadataRecords() when a single metadata batch is applied to mirror
the same logging statement that occurs when deferred metadata changes are applied.

In the Raft system test case the consumer was sometimes receiving a METADATA response with just
1 alive broker, and then when that broker rolled the consumer wouldn't know about any alive nodes.
It would have to wait until the broker returned before it could reconnect, and by that time the group
coordinator on the second broker would have timed-out the client and initiated a group rebalance. The
test explicitly checks that no rebalances occur, so the test would fail. It turns out that the reason why
the ZooKeeper configuration wasn't seeing rebalances was just plain luck. The brokers' metadata
caches in the ZooKeeper configuration show 1 alive broker even more frequently than the Raft
configuration does. If we tweak the metadata.max.age.ms value on the consumers we can easily
get the ZooKeeper test to fail, and in fact this system test has historically been flaky for the
ZooKeeper configuration. We can get the test to pass by setting session.timeout.ms=30000 (which
is longer than the roll time of any broker), or we can increase the broker count so that the client
never sees a METADATA response with just a single alive broker and therefore never loses contact
with the cluster for an extended period of time. We have plenty of system tests with 3+ brokers, so
we choose to keep this test with 2 brokers and increase the session timeout.

Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Ron Dagostino 2021-03-16 16:57:29 -04:00 committed by GitHub
parent 36c5ad4c6a
commit b96fc7892f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 10 deletions

View File

@ -132,8 +132,8 @@ class RaftReplicaManager(config: KafkaConfig,
def endMetadataChangeDeferral(onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Unit = {
val startMs = time.milliseconds()
val partitionsMadeFollower = mutable.Set[Partition]()
val partitionsMadeLeader = mutable.Set[Partition]()
var partitionsMadeFollower = Set.empty[Partition]
var partitionsMadeLeader = Set.empty[Partition]
replicaStateChangeLock synchronized {
stateChangeLogger.info(s"Applying deferred metadata changes")
val highWatermarkCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
@ -156,14 +156,10 @@ class RaftReplicaManager(config: KafkaConfig,
}
}
val partitionsMadeLeader = if (leaderPartitionStates.nonEmpty)
delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates, highWatermarkCheckpoints, None)
else
Set.empty[Partition]
val partitionsMadeFollower = if (followerPartitionStates.nonEmpty)
delegate.makeFollowers(partitionsAlreadyExisting, brokers, followerPartitionStates, highWatermarkCheckpoints, None)
else
Set.empty[Partition]
if (leaderPartitionStates.nonEmpty)
partitionsMadeLeader = delegate.makeLeaders(partitionsAlreadyExisting, leaderPartitionStates, highWatermarkCheckpoints, None)
if (followerPartitionStates.nonEmpty)
partitionsMadeFollower = delegate.makeFollowers(partitionsAlreadyExisting, brokers, followerPartitionStates, highWatermarkCheckpoints, None)
// We need to transition anything that hasn't transitioned from Deferred to Offline to the Online state.
deferredPartitionsIterator.foreach { deferredPartition =>
@ -331,6 +327,8 @@ class RaftReplicaManager(config: KafkaConfig,
replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
stateChangeLogger.info(s"Metadata batch $metadataOffset: applied ${partitionsBecomeLeader.size + partitionsBecomeFollower.size} partitions: " +
s"${partitionsBecomeLeader.size} leader(s) and ${partitionsBecomeFollower.size} follower(s)")
}
// TODO: we should move aside log directories which have been deleted rather than
// purging them from the disk immediately.

View File

@ -93,6 +93,14 @@ class OffsetValidationTest(VerifiableConsumerTest):
partition = TopicPartition(self.TOPIC, 0)
producer = self.setup_producer(self.TOPIC)
# The consumers' session timeouts must exceed the time it takes for a broker to roll. Consumers are likely
# to see cluster metadata consisting of just a single alive broker in the case where the cluster has just 2
# brokers and the cluster is rolling (which is what is happening here). When the consumer sees a single alive
# broker, and then that broker rolls, the consumer will be unable to connect to the cluster until that broker
# completes its roll. In the meantime, the consumer group will move to the group coordinator on the other
# broker, and that coordinator will fail the consumer and trigger a group rebalance if its session times out.
# This test is asserting that no rebalances occur, so we increase the session timeout for this to be the case.
self.session_timeout_sec = 30
consumer = self.setup_consumer(self.TOPIC)
producer.start()