mirror of https://github.com/apache/kafka.git
MINOR: KRaftMetadataCache.getPartitionInfo must set all relevant fields
Fix a case where KRaftMetadataCache.getPartitionInfo was not setting all the PartitionInfo fields it should have been. Add a regression test. Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
2c1cf03a89
commit
7159f6c1a8
|
@ -228,6 +228,7 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
|
|||
flatMap(_.node(listenerName.value()).asScala).toSeq
|
||||
}
|
||||
|
||||
// Does NOT include offline replica metadata
|
||||
override def getPartitionInfo(topicName: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
|
||||
Option(_currentImage.topics().getTopic(topicName)).
|
||||
flatMap(topic => Option(topic.partitions().get(partitionId))).
|
||||
|
@ -238,7 +239,8 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
|
|||
setLeader(partition.leader).
|
||||
setLeaderEpoch(partition.leaderEpoch).
|
||||
setIsr(Replicas.toList(partition.isr)).
|
||||
setZkVersion(partition.partitionEpoch)))
|
||||
setZkVersion(partition.partitionEpoch).
|
||||
setReplicas(Replicas.toList(partition.replicas))))
|
||||
}
|
||||
|
||||
override def numPartitions(topicName: String): Option[Int] = {
|
||||
|
|
|
@ -706,4 +706,63 @@ class MetadataCacheTest {
|
|||
assertEquals(100L, metadataCache.getAliveBrokerEpoch(0).getOrElse(-1L))
|
||||
assertEquals(-1L, metadataCache.getAliveBrokerEpoch(1).getOrElse(-1L))
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(Array("cacheProvider"))
|
||||
def testGetPartitionInfo(cache: MetadataCache): Unit = {
|
||||
val topic = "topic"
|
||||
val partitionIndex = 0
|
||||
val controllerEpoch = 1
|
||||
val leader = 0
|
||||
val leaderEpoch = 0
|
||||
val isr = asList[Integer](2, 3, 0)
|
||||
val zkVersion = 3
|
||||
val replicas = asList[Integer](2, 3, 0, 1, 4)
|
||||
val offlineReplicas = asList[Integer](0)
|
||||
|
||||
val partitionStates = Seq(new UpdateMetadataPartitionState()
|
||||
.setTopicName(topic)
|
||||
.setPartitionIndex(partitionIndex)
|
||||
.setControllerEpoch(controllerEpoch)
|
||||
.setLeader(leader)
|
||||
.setLeaderEpoch(leaderEpoch)
|
||||
.setIsr(isr)
|
||||
.setZkVersion(zkVersion)
|
||||
.setReplicas(replicas)
|
||||
.setOfflineReplicas(offlineReplicas))
|
||||
|
||||
val version = ApiKeys.UPDATE_METADATA.latestVersion
|
||||
|
||||
val controllerId = 2
|
||||
val securityProtocol = SecurityProtocol.PLAINTEXT
|
||||
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
|
||||
val brokers = Seq(new UpdateMetadataBroker()
|
||||
.setId(0)
|
||||
.setRack("rack1")
|
||||
.setEndpoints(Seq(new UpdateMetadataEndpoint()
|
||||
.setHost("foo")
|
||||
.setPort(9092)
|
||||
.setSecurityProtocol(securityProtocol.id)
|
||||
.setListener(listenerName.value)).asJava))
|
||||
val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, brokerEpoch,
|
||||
partitionStates.asJava, brokers.asJava, util.Collections.emptyMap(), false).build()
|
||||
MetadataCacheTest.updateCache(cache, updateMetadataRequest)
|
||||
|
||||
val partitionState = cache.getPartitionInfo(topic, partitionIndex).get
|
||||
assertEquals(topic, partitionState.topicName())
|
||||
assertEquals(partitionIndex, partitionState.partitionIndex())
|
||||
if (cache.isInstanceOf[ZkMetadataCache]) {
|
||||
assertEquals(controllerEpoch, partitionState.controllerEpoch())
|
||||
} else {
|
||||
assertEquals(-1, partitionState.controllerEpoch())
|
||||
}
|
||||
assertEquals(leader, partitionState.leader())
|
||||
assertEquals(leaderEpoch, partitionState.leaderEpoch())
|
||||
assertEquals(isr, partitionState.isr())
|
||||
assertEquals(zkVersion, partitionState.zkVersion())
|
||||
assertEquals(replicas, partitionState.replicas())
|
||||
if (cache.isInstanceOf[ZkMetadataCache]) {
|
||||
assertEquals(offlineReplicas, partitionState.offlineReplicas())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue