mirror of https://github.com/apache/kafka.git
KAFKA-18486 Migrate ReplicaManagerTest RemoteFetchExpiresPerSecMetric and RemoteLogReaderMetrics with applyDelta (#19960)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
Replace `becomeLeaderOrFollower` with `applyDelta` in method RemoteFetchExpiresPerSecMetric and RemoteLogReaderMetrics Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
991a10c19f
commit
94807bcd15
|
@ -3727,25 +3727,11 @@ class ReplicaManagerTest {
|
||||||
try {
|
try {
|
||||||
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
|
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
|
||||||
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
|
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
|
||||||
val partition0Replicas = Seq[Integer](0, 1).asJava
|
|
||||||
val topicIds = Map(tp0.topic -> topicId).asJava
|
|
||||||
val leaderEpoch = 0
|
val leaderEpoch = 0
|
||||||
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
|
val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
|
||||||
Seq(
|
val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
|
||||||
new LeaderAndIsrRequest.PartitionState()
|
replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
|
||||||
.setTopicName(tp0.topic)
|
|
||||||
.setPartitionIndex(tp0.partition)
|
|
||||||
.setControllerEpoch(0)
|
|
||||||
.setLeader(leaderEpoch)
|
|
||||||
.setLeaderEpoch(0)
|
|
||||||
.setIsr(partition0Replicas)
|
|
||||||
.setPartitionEpoch(0)
|
|
||||||
.setReplicas(partition0Replicas)
|
|
||||||
.setIsNew(true)
|
|
||||||
).asJava,
|
|
||||||
topicIds,
|
|
||||||
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
|
|
||||||
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
|
|
||||||
|
|
||||||
val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty)
|
val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty)
|
||||||
val fetchOffset = 1
|
val fetchOffset = 1
|
||||||
|
@ -3840,25 +3826,11 @@ class ReplicaManagerTest {
|
||||||
try {
|
try {
|
||||||
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
|
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
|
||||||
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
|
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
|
||||||
val partition0Replicas = Seq[Integer](0, 1).asJava
|
|
||||||
val topicIds = Map(tp0.topic -> topicId).asJava
|
|
||||||
val leaderEpoch = 0
|
val leaderEpoch = 0
|
||||||
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
|
val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
|
||||||
Seq(
|
val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
|
||||||
new LeaderAndIsrRequest.PartitionState()
|
replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
|
||||||
.setTopicName(tp0.topic)
|
|
||||||
.setPartitionIndex(tp0.partition)
|
|
||||||
.setControllerEpoch(0)
|
|
||||||
.setLeader(leaderEpoch)
|
|
||||||
.setLeaderEpoch(0)
|
|
||||||
.setIsr(partition0Replicas)
|
|
||||||
.setPartitionEpoch(0)
|
|
||||||
.setReplicas(partition0Replicas)
|
|
||||||
.setIsNew(true)
|
|
||||||
).asJava,
|
|
||||||
topicIds,
|
|
||||||
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
|
|
||||||
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
|
|
||||||
|
|
||||||
val mockLog = replicaManager.getPartitionOrException(tp0).log.get
|
val mockLog = replicaManager.getPartitionOrException(tp0).log.get
|
||||||
when(mockLog.endOffsetForEpoch(anyInt())).thenReturn(Optional.of(new OffsetAndEpoch(1, 1)))
|
when(mockLog.endOffsetForEpoch(anyInt())).thenReturn(Optional.of(new OffsetAndEpoch(1, 1)))
|
||||||
|
|
Loading…
Reference in New Issue