KAFKA-18486; [1/2] Update LocalLeaderEndPointTest (#18666)

This patch is a first step towards removing `ReplicaManager#becomeLeaderOrFollower`. It updates the `LocalLeaderEndPointTest` tests.

Reviewers: Christo Lolov <lolovc@amazon.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
David Jacot 2025-01-23 10:49:16 +01:00 committed by GitHub
parent 32767599c0
commit bc807083fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 71 additions and 55 deletions

View File

@ -20,38 +20,39 @@ package kafka.server
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{CoreUtils, Logging, TestUtils} import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.{Node, TopicPartition, Uuid} import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord} import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch} import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.util.{MockScheduler, MockTime} import org.apache.kafka.server.util.{MockScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogDirFailureChannel} import org.apache.kafka.storage.internals.log.{AppendOrigin, LogDirFailureChannel}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.mockito.Mockito.mock import org.mockito.Mockito.mock
import java.io.File import java.io.File
import java.util.Collections import scala.collection.Map
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
class LocalLeaderEndPointTest extends Logging { class LocalLeaderEndPointTest extends Logging {
val time = new MockTime val time = new MockTime
val topicId: Uuid = Uuid.randomUuid() val topicId = Uuid.randomUuid()
val topic = "test" val topic = "test"
val topicPartition = new TopicPartition(topic, 5) val partition = 5
val topicPartition = new TopicPartition(topic, partition)
val sourceBroker: BrokerEndPoint = new BrokerEndPoint(0, "localhost", 9092) val sourceBroker: BrokerEndPoint = new BrokerEndPoint(0, "localhost", 9092)
var replicaManager: ReplicaManager = _ var replicaManager: ReplicaManager = _
var endPoint: LeaderEndPoint = _ var endPoint: LeaderEndPoint = _
var quotaManager: QuotaManagers = _ var quotaManager: QuotaManagers = _
var image: MetadataImage = _
@BeforeEach @BeforeEach
def setUp(): Unit = { def setUp(): Unit = {
@ -70,16 +71,35 @@ class LocalLeaderEndPointTest extends Logging {
quotaManagers = quotaManager, quotaManagers = quotaManager,
metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
alterPartitionManager = alterPartitionManager) alterPartitionManager = alterPartitionManager
val partition = replicaManager.createPartition(topicPartition) )
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None) val delta = new MetadataDelta(MetadataImage.EMPTY)
// Make this replica the leader. delta.replay(new TopicRecord()
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 0) .setName(topic)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) .setTopicId(topicId)
)
delta.replay(new PartitionRecord()
.setPartitionId(partition)
.setTopicId(topicId)
.setReplicas(List[Integer](sourceBroker.id).asJava)
.setIsr(List[Integer](sourceBroker.id).asJava)
.setLeader(sourceBroker.id)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
)
image = delta.apply(MetadataProvenance.EMPTY)
replicaManager.applyDelta(delta.topicsDelta(), image)
replicaManager.getPartitionOrException(topicPartition) replicaManager.getPartitionOrException(topicPartition)
.localLogOrException .localLogOrException
endPoint = new LocalLeaderEndPoint(sourceBroker, config, replicaManager, QuotaFactory.UNBOUNDED_QUOTA) endPoint = new LocalLeaderEndPoint(
sourceBroker,
config,
replicaManager,
QuotaFactory.UNBOUNDED_QUOTA
)
} }
@AfterEach @AfterEach
@ -93,11 +113,10 @@ class LocalLeaderEndPointTest extends Logging {
appendRecords(replicaManager, topicPartition, records) appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error)) .onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals(new OffsetAndEpoch(3L, 0), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0)) assertEquals(new OffsetAndEpoch(3L, 0), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0))
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4) bumpLeaderEpoch()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
appendRecords(replicaManager, topicPartition, records) appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error)) .onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals(new OffsetAndEpoch(6L, 4), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7)) assertEquals(new OffsetAndEpoch(6L, 1), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
} }
@Test @Test
@ -106,12 +125,11 @@ class LocalLeaderEndPointTest extends Logging {
.onFire(response => assertEquals(Errors.NONE, response.error)) .onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0)) assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0))
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4) bumpLeaderEpoch()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
appendRecords(replicaManager, topicPartition, records) appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error)) .onFire(response => assertEquals(Errors.NONE, response.error))
replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _ => ()) replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _ => ())
assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7)) assertEquals(new OffsetAndEpoch(3L, 1), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
} }
@Test @Test
@ -120,13 +138,12 @@ class LocalLeaderEndPointTest extends Logging {
.onFire(response => assertEquals(Errors.NONE, response.error)) .onFire(response => assertEquals(Errors.NONE, response.error))
assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0)) assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0))
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4) bumpLeaderEpoch()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
appendRecords(replicaManager, topicPartition, records) appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error)) .onFire(response => assertEquals(Errors.NONE, response.error))
replicaManager.logManager.getLog(topicPartition).foreach(log => log._localLogStartOffset = 3) replicaManager.logManager.getLog(topicPartition).foreach(log => log._localLogStartOffset = 3)
assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7)) assertEquals(new OffsetAndEpoch(0L, 0), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
assertEquals(new OffsetAndEpoch(3L, 4), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7)) assertEquals(new OffsetAndEpoch(3L, 1), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
} }
@Test @Test
@ -137,42 +154,49 @@ class LocalLeaderEndPointTest extends Logging {
var result = endPoint.fetchEpochEndOffsets(Map( var result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition() topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition) .setPartition(topicPartition.partition)
.setLeaderEpoch(0))) .setLeaderEpoch(0)
))
var expected = Map( var expected = Map(
topicPartition -> new EpochEndOffset() topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition) .setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code) .setErrorCode(Errors.NONE.code)
.setLeaderEpoch(0) .setLeaderEpoch(0)
.setEndOffset(3L)) .setEndOffset(3L)
)
assertEquals(expected, result) assertEquals(expected, result)
// Change leader epoch and end offset, and verify the behavior again. // Change leader epoch and end offset, and verify the behavior again.
val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4) bumpLeaderEpoch()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) bumpLeaderEpoch()
assertEquals(2, replicaManager.getPartitionOrException(topicPartition).getLeaderEpoch)
appendRecords(replicaManager, topicPartition, records) appendRecords(replicaManager, topicPartition, records)
.onFire(response => assertEquals(Errors.NONE, response.error)) .onFire(response => assertEquals(Errors.NONE, response.error))
result = endPoint.fetchEpochEndOffsets(Map( result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition() topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition) .setPartition(topicPartition.partition)
.setLeaderEpoch(4))) .setLeaderEpoch(2)
))
expected = Map( expected = Map(
topicPartition -> new EpochEndOffset() topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition) .setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code) .setErrorCode(Errors.NONE.code)
.setLeaderEpoch(4) .setLeaderEpoch(2)
.setEndOffset(6L)) .setEndOffset(6L)
)
assertEquals(expected, result) assertEquals(expected, result)
// Check missing epoch: 3, we expect the API to return (leader_epoch=0, end_offset=3). // Check missing epoch: 1, we expect the API to return (leader_epoch=0, end_offset=3).
result = endPoint.fetchEpochEndOffsets(Map( result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition() topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition) .setPartition(topicPartition.partition)
.setLeaderEpoch(3))) .setLeaderEpoch(1)
))
expected = Map( expected = Map(
topicPartition -> new EpochEndOffset() topicPartition -> new EpochEndOffset()
@ -187,14 +211,16 @@ class LocalLeaderEndPointTest extends Logging {
result = endPoint.fetchEpochEndOffsets(Map( result = endPoint.fetchEpochEndOffsets(Map(
topicPartition -> new OffsetForLeaderPartition() topicPartition -> new OffsetForLeaderPartition()
.setPartition(topicPartition.partition) .setPartition(topicPartition.partition)
.setLeaderEpoch(5))) .setLeaderEpoch(5)
))
expected = Map( expected = Map(
topicPartition -> new EpochEndOffset() topicPartition -> new EpochEndOffset()
.setPartition(topicPartition.partition) .setPartition(topicPartition.partition)
.setErrorCode(Errors.NONE.code) .setErrorCode(Errors.NONE.code)
.setLeaderEpoch(-1) .setLeaderEpoch(-1)
.setEndOffset(-1L)) .setEndOffset(-1L)
)
assertEquals(expected, result) assertEquals(expected, result)
} }
@ -219,22 +245,16 @@ class LocalLeaderEndPointTest extends Logging {
} }
} }
private def buildLeaderAndIsrRequest(leaderEpoch: Int): LeaderAndIsrRequest = { private def bumpLeaderEpoch(): Unit = {
val brokerList = Seq[Integer](sourceBroker.id).asJava val delta = new MetadataDelta(image)
val topicIds = Collections.singletonMap(topic, topicId) delta.replay(new PartitionChangeRecord()
new LeaderAndIsrRequest.Builder(0, 0, 0, .setTopicId(topicId)
Seq(new LeaderAndIsrRequest.PartitionState() .setPartitionId(partition)
.setTopicName(topic) .setLeader(sourceBroker.id)
.setPartitionIndex(topicPartition.partition()) )
.setControllerEpoch(0)
.setLeader(sourceBroker.id) image = delta.apply(MetadataProvenance.EMPTY)
.setLeaderEpoch(leaderEpoch) replicaManager.applyDelta(delta.topicsDelta, image)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
topicIds,
Set(node(sourceBroker)).asJava).build()
} }
private def appendRecords(replicaManager: ReplicaManager, private def appendRecords(replicaManager: ReplicaManager,
@ -260,10 +280,6 @@ class LocalLeaderEndPointTest extends Logging {
result result
} }
private def node(endPoint: BrokerEndPoint): Node = {
new Node(endPoint.id, endPoint.host, endPoint.port)
}
private def records: MemoryRecords = { private def records: MemoryRecords = {
MemoryRecords.withRecords(Compression.NONE, MemoryRecords.withRecords(Compression.NONE,
new SimpleRecord("first message".getBytes()), new SimpleRecord("first message".getBytes()),