KAFKA-19302 Move ReplicaState and Replica to server module (#19755)

1. Move `ReplicaState` and `Replica` to the server module.
2. Rewrite `ReplicaState` and `Replica` in Java.
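
For reference, the net effect for callers is a package move (kafka.cluster -> org.apache.kafka.server.replica) plus Java-style accessors in place of Scala case-class fields and named arguments. A minimal usage sketch, assuming a Mockito-mocked MetadataCache and made-up offsets (none of this is part of the commit itself):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.replica.Replica;       // previously kafka.cluster.Replica
import org.apache.kafka.server.replica.ReplicaState;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;

import java.util.Optional;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ReplicaMigrationSketch {
    public static void main(String[] args) {
        // Mock the broker epoch lookup the same way the new ReplicaTest does.
        MetadataCache metadataCache = mock(MetadataCache.class);
        when(metadataCache.getAliveBrokerEpoch(0)).thenReturn(Optional.of(1L));

        Replica replica = new Replica(0, new TopicPartition("foo", 0), metadataCache);

        // Scala named arguments are gone; the Java method takes positional parameters.
        replica.updateFetchStateOrThrow(
            new LogOffsetMetadata(5L),   // followerFetchOffsetMetadata
            1L,                          // followerStartOffset
            System.currentTimeMillis(),  // followerFetchTimeMs
            10L,                         // leaderEndOffset
            1L                           // brokerEpoch
        );

        // ReplicaState is now a Java record, so field reads become accessor calls.
        ReplicaState state = replica.stateSnapshot();
        System.out.println(state.logEndOffset() + " " + state.brokerEpoch());
    }
}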

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
PoAn Yang 2025-05-19 10:59:12 -05:00 committed by GitHub
parent 6573b4ace1
commit cff10e6541
9 changed files with 636 additions and 596 deletions

View File

@@ -44,6 +44,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.replica.Replica
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, UnexpectedAppendOffsetException}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
@@ -795,10 +796,10 @@ class Partition(val topicPartition: TopicPartition,
// lastFetchLeaderLogEndOffset.
remoteReplicas.foreach { replica =>
replica.resetReplicaState(
currentTimeMs = currentTimeMs,
leaderEndOffset = leaderEpochStartOffset,
isNewLeader = isNewLeader,
isFollowerInSync = partitionState.isr.contains(replica.brokerId)
currentTimeMs,
leaderEpochStartOffset,
isNewLeader,
partitionState.isr.contains(replica.brokerId)
)
}
@@ -1072,9 +1073,9 @@ class Partition(val topicPartition: TopicPartition,
isBrokerEpochIsrEligible(storedBrokerEpoch, cachedBrokerEpoch)
}
private def isBrokerEpochIsrEligible(storedBrokerEpoch: Option[Long], cachedBrokerEpoch: Optional[java.lang.Long]): Boolean = {
storedBrokerEpoch.isDefined && cachedBrokerEpoch.isPresent() &&
(storedBrokerEpoch.get == -1 || storedBrokerEpoch.get == cachedBrokerEpoch.get())
private def isBrokerEpochIsrEligible(storedBrokerEpoch: Optional[java.lang.Long], cachedBrokerEpoch: Optional[java.lang.Long]): Boolean = {
storedBrokerEpoch.isPresent && cachedBrokerEpoch.isPresent &&
(storedBrokerEpoch.get == -1 || storedBrokerEpoch.get == cachedBrokerEpoch.get)
}
/*
@@ -1802,7 +1803,7 @@ class Partition(val topicPartition: TopicPartition,
brokerState.setBrokerEpoch(localBrokerEpochSupplier())
} else {
val replica = remoteReplicasMap.get(brokerId)
val brokerEpoch = if (replica == null) Option.empty else replica.stateSnapshot.brokerEpoch
val brokerEpoch = if (replica == null) Optional.empty else replica.stateSnapshot.brokerEpoch
if (brokerEpoch.isEmpty) {
// There are two cases where the broker epoch can be missing:
// 1. During ISR expansion, we already held lock for the partition and did the broker epoch check, so the new

View File

@@ -1,207 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.cluster
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, UnifiedLog}
import java.util.concurrent.atomic.AtomicReference
case class ReplicaState(
// The log start offset value, kept in all replicas; for local replica it is the
// log's start offset, for remote replicas its value is only updated by follower fetch.
logStartOffset: Long,
// The log end offset value, kept in all replicas; for local replica it is the
// log's end offset, for remote replicas its value is only updated by follower fetch.
logEndOffsetMetadata: LogOffsetMetadata,
// The log end offset value at the time the leader received the last FetchRequest from this follower.
// This is used to determine the lastCaughtUpTimeMs of the follower. It is reset by the leader
// when a LeaderAndIsr request is received and might be reset when the leader appends a record
// to its log.
lastFetchLeaderLogEndOffset: Long,
// The time when the leader received the last FetchRequest from this follower.
// This is used to determine the lastCaughtUpTimeMs of the follower.
lastFetchTimeMs: Long,
// lastCaughtUpTimeMs is the largest time t such that the offset of most recent FetchRequest from this follower >=
// the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
lastCaughtUpTimeMs: Long,
// The brokerEpoch is the epoch from the Fetch request.
brokerEpoch: Option[Long]
) {
/**
* Returns the current log end offset of the replica.
*/
def logEndOffset: Long = logEndOffsetMetadata.messageOffset
/**
* Returns true when the replica is considered as "caught-up". A replica is
* considered "caught-up" when its log end offset is equals to the log end
* offset of the leader OR when its last caught up time minus the current
* time is smaller than the max replica lag.
*/
def isCaughtUp(
leaderEndOffset: Long,
currentTimeMs: Long,
replicaMaxLagMs: Long
): Boolean = {
leaderEndOffset == logEndOffset || currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs
}
}
object ReplicaState {
val Empty: ReplicaState = ReplicaState(
logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
lastFetchLeaderLogEndOffset = 0L,
lastFetchTimeMs = 0L,
lastCaughtUpTimeMs = 0L,
brokerEpoch = None : Option[Long],
)
}
class Replica(val brokerId: Int, val topicPartition: TopicPartition, val metadataCache: MetadataCache) extends Logging {
private val replicaState = new AtomicReference[ReplicaState](ReplicaState.Empty)
def stateSnapshot: ReplicaState = replicaState.get
/**
* Update the replica's fetch state only if the broker epoch is -1 or it is larger or equal to the current broker
* epoch. Otherwise, NOT_LEADER_OR_FOLLOWER exception will be thrown. This can fence fetch state update from a
* stale request.
*
* If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
* set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
*
* Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received,
* set `lastCaughtUpTimeMs` to the time when the previous fetch request was received.
*
* This is needed to enforce the semantics of ISR, i.e. a replica is in ISR if and only if it lags behind leader's LEO
* by at most `replicaLagTimeMaxMs`. These semantics allow a follower to be added to the ISR even if the offset of its
* fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at
* high frequency.
*/
def updateFetchStateOrThrow(
followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
leaderEndOffset: Long,
brokerEpoch: Long
): Unit = {
replicaState.updateAndGet { currentReplicaState =>
val cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId)
// Fence the update if it provides a stale broker epoch.
if (brokerEpoch != -1 && cachedBrokerEpoch.filter(_ > brokerEpoch).isPresent()) {
throw new NotLeaderOrFollowerException(s"Received stale fetch state update. broker epoch=$brokerEpoch " +
s"vs expected=${currentReplicaState.brokerEpoch.get}")
}
val lastCaughtUpTime = if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset) {
math.max(currentReplicaState.lastCaughtUpTimeMs, followerFetchTimeMs)
} else if (followerFetchOffsetMetadata.messageOffset >= currentReplicaState.lastFetchLeaderLogEndOffset) {
math.max(currentReplicaState.lastCaughtUpTimeMs, currentReplicaState.lastFetchTimeMs)
} else {
currentReplicaState.lastCaughtUpTimeMs
}
ReplicaState(
logStartOffset = followerStartOffset,
logEndOffsetMetadata = followerFetchOffsetMetadata,
lastFetchLeaderLogEndOffset = math.max(leaderEndOffset, currentReplicaState.lastFetchLeaderLogEndOffset),
lastFetchTimeMs = followerFetchTimeMs,
lastCaughtUpTimeMs = lastCaughtUpTime,
brokerEpoch = Option(brokerEpoch)
)
}
}
/**
* When the leader is elected or re-elected, the state of the follower is reinitialized
* accordingly.
*/
def resetReplicaState(
currentTimeMs: Long,
leaderEndOffset: Long,
isNewLeader: Boolean,
isFollowerInSync: Boolean
): Unit = {
replicaState.updateAndGet { currentReplicaState =>
// When the leader is elected or re-elected, the follower's last caught up time
// is set to the current time if the follower is in the ISR, else to 0. The latter
// is done to ensure that the high watermark is not hold back unnecessarily for
// a follower which is not in the ISR anymore.
val lastCaughtUpTimeMs = if (isFollowerInSync) currentTimeMs else 0L
if (isNewLeader) {
ReplicaState(
logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
logEndOffsetMetadata = LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
lastFetchLeaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastFetchTimeMs = 0L,
lastCaughtUpTimeMs = lastCaughtUpTimeMs,
brokerEpoch = Option.empty
)
} else {
ReplicaState(
logStartOffset = currentReplicaState.logStartOffset,
logEndOffsetMetadata = currentReplicaState.logEndOffsetMetadata,
lastFetchLeaderLogEndOffset = leaderEndOffset,
// When the leader is re-elected, the follower's last fetch time is
// set to the current time if the follower is in the ISR, else to 0.
// The latter is done to ensure that the follower is not brought back
// into the ISR before a fetch is received.
lastFetchTimeMs = if (isFollowerInSync) currentTimeMs else 0L,
lastCaughtUpTimeMs = lastCaughtUpTimeMs,
brokerEpoch = currentReplicaState.brokerEpoch
)
}
}
trace(s"Reset state of replica to $this")
}
override def toString: String = {
val replicaState = this.replicaState.get
val replicaString = new StringBuilder
replicaString.append(s"Replica(replicaId=$brokerId")
replicaString.append(s", topic=${topicPartition.topic}")
replicaString.append(s", partition=${topicPartition.partition}")
replicaString.append(s", lastCaughtUpTimeMs=${replicaState.lastCaughtUpTimeMs}")
replicaString.append(s", logStartOffset=${replicaState.logStartOffset}")
replicaString.append(s", logEndOffset=${replicaState.logEndOffsetMetadata.messageOffset}")
replicaString.append(s", logEndOffsetMetadata=${replicaState.logEndOffsetMetadata}")
replicaString.append(s", lastFetchLeaderLogEndOffset=${replicaState.lastFetchLeaderLogEndOffset}")
replicaString.append(s", brokerEpoch=${replicaState.brokerEpoch.getOrElse(-2L)}")
replicaString.append(s", lastFetchTimeMs=${replicaState.lastFetchTimeMs}")
replicaString.append(")")
replicaString.toString
}
override def equals(that: Any): Boolean = that match {
case other: Replica => brokerId == other.brokerId && topicPartition == other.topicPartition
case _ => false
}
override def hashCode: Int = 31 + topicPartition.hashCode + 17 * brokerId
}

View File

@@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.replica.Replica
import org.apache.kafka.metadata.LeaderRecoveryState
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

View File

@@ -1,350 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.cluster
import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, UnifiedLog}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.Mockito.{mock, when}
import java.util.Optional
object ReplicaTest {
val BrokerId: Int = 0
val Partition: TopicPartition = new TopicPartition("foo", 0)
val ReplicaLagTimeMaxMs: Long = 30000
}
class ReplicaTest {
import ReplicaTest._
val time = new MockTime()
var replica: Replica = _
@BeforeEach
def setup(): Unit = {
val metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.getAliveBrokerEpoch(BrokerId)).thenReturn(Optional.of(1L))
replica = new Replica(BrokerId, Partition, metadataCache)
}
private def assertReplicaState(
logStartOffset: Long,
logEndOffset: Long,
lastCaughtUpTimeMs: Long,
lastFetchLeaderLogEndOffset: Long,
lastFetchTimeMs: Long,
brokerEpoch: Option[Long] = Option[Long](1L)
): Unit = {
val replicaState = replica.stateSnapshot
assertEquals(logStartOffset, replicaState.logStartOffset,
"Unexpected Log Start Offset")
assertEquals(logEndOffset, replicaState.logEndOffset,
"Unexpected Log End Offset")
assertEquals(lastCaughtUpTimeMs, replicaState.lastCaughtUpTimeMs,
"Unexpected Last Caught Up Time")
assertEquals(lastFetchLeaderLogEndOffset, replicaState.lastFetchLeaderLogEndOffset,
"Unexpected Last Fetch Leader Log End Offset")
assertEquals(lastFetchTimeMs, replicaState.lastFetchTimeMs,
"Unexpected Last Fetch Time")
assertEquals(brokerEpoch, replicaState.brokerEpoch,
"Broker Epoch Mismatch")
}
def assertReplicaStateDoesNotChange(
op: => Unit
): Unit = {
val previousState = replica.stateSnapshot
op
assertReplicaState(
logStartOffset = previousState.logStartOffset,
logEndOffset = previousState.logEndOffset,
lastCaughtUpTimeMs = previousState.lastCaughtUpTimeMs,
lastFetchLeaderLogEndOffset = previousState.lastFetchLeaderLogEndOffset,
lastFetchTimeMs = previousState.lastFetchTimeMs
)
}
private def updateFetchState(
followerFetchOffset: Long,
followerStartOffset: Long,
leaderEndOffset: Long
): Long = {
val currentTimeMs = time.milliseconds()
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(followerFetchOffset),
followerStartOffset = followerStartOffset,
followerFetchTimeMs = currentTimeMs,
leaderEndOffset = leaderEndOffset,
brokerEpoch = 1L
)
currentTimeMs
}
private def resetReplicaState(
leaderEndOffset: Long,
isNewLeader: Boolean,
isFollowerInSync: Boolean
): Long = {
val currentTimeMs = time.milliseconds()
replica.resetReplicaState(
currentTimeMs = currentTimeMs,
leaderEndOffset = leaderEndOffset,
isNewLeader = isNewLeader,
isFollowerInSync = isFollowerInSync
)
currentTimeMs
}
private def isCaughtUp(
leaderEndOffset: Long
): Boolean = {
replica.stateSnapshot.isCaughtUp(
leaderEndOffset = leaderEndOffset,
currentTimeMs = time.milliseconds(),
replicaMaxLagMs = ReplicaLagTimeMaxMs
)
}
@Test
def testInitialState(): Unit = {
assertReplicaState(
logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
logEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastCaughtUpTimeMs = 0L,
lastFetchLeaderLogEndOffset = 0L,
lastFetchTimeMs = 0L,
brokerEpoch = Option.empty
)
}
@Test
def testUpdateFetchState(): Unit = {
val fetchTimeMs1 = updateFetchState(
followerFetchOffset = 5L,
followerStartOffset = 1L,
leaderEndOffset = 10L
)
assertReplicaState(
logStartOffset = 1L,
logEndOffset = 5L,
lastCaughtUpTimeMs = 0L,
lastFetchLeaderLogEndOffset = 10L,
lastFetchTimeMs = fetchTimeMs1
)
val fetchTimeMs2 = updateFetchState(
followerFetchOffset = 10L,
followerStartOffset = 2L,
leaderEndOffset = 15L
)
assertReplicaState(
logStartOffset = 2L,
logEndOffset = 10L,
lastCaughtUpTimeMs = fetchTimeMs1,
lastFetchLeaderLogEndOffset = 15L,
lastFetchTimeMs = fetchTimeMs2
)
val fetchTimeMs3 = updateFetchState(
followerFetchOffset = 15L,
followerStartOffset = 3L,
leaderEndOffset = 15L
)
assertReplicaState(
logStartOffset = 3L,
logEndOffset = 15L,
lastCaughtUpTimeMs = fetchTimeMs3,
lastFetchLeaderLogEndOffset = 15L,
lastFetchTimeMs = fetchTimeMs3
)
}
@Test
def testResetReplicaStateWhenLeaderIsReelectedAndReplicaIsInSync(): Unit = {
updateFetchState(
followerFetchOffset = 10L,
followerStartOffset = 1L,
leaderEndOffset = 10L
)
val resetTimeMs1 = resetReplicaState(
leaderEndOffset = 11L,
isNewLeader = false,
isFollowerInSync = true
)
assertReplicaState(
logStartOffset = 1L,
logEndOffset = 10L,
lastCaughtUpTimeMs = resetTimeMs1,
lastFetchLeaderLogEndOffset = 11L,
lastFetchTimeMs = resetTimeMs1
)
}
@Test
def testResetReplicaStateWhenLeaderIsReelectedAndReplicaIsNotInSync(): Unit = {
updateFetchState(
followerFetchOffset = 10L,
followerStartOffset = 1L,
leaderEndOffset = 10L
)
resetReplicaState(
leaderEndOffset = 11L,
isNewLeader = false,
isFollowerInSync = false
)
assertReplicaState(
logStartOffset = 1L,
logEndOffset = 10L,
lastCaughtUpTimeMs = 0L,
lastFetchLeaderLogEndOffset = 11L,
lastFetchTimeMs = 0L
)
}
@Test
def testResetReplicaStateWhenNewLeaderIsElectedAndReplicaIsInSync(): Unit = {
updateFetchState(
followerFetchOffset = 10L,
followerStartOffset = 1L,
leaderEndOffset = 10L
)
val resetTimeMs1 = resetReplicaState(
leaderEndOffset = 11L,
isNewLeader = true,
isFollowerInSync = true
)
assertReplicaState(
logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
logEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastCaughtUpTimeMs = resetTimeMs1,
lastFetchLeaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastFetchTimeMs = 0L,
brokerEpoch = Option.empty
)
}
@Test
def testResetReplicaStateWhenNewLeaderIsElectedAndReplicaIsNotInSync(): Unit = {
updateFetchState(
followerFetchOffset = 10L,
followerStartOffset = 1L,
leaderEndOffset = 10L
)
resetReplicaState(
leaderEndOffset = 11L,
isNewLeader = true,
isFollowerInSync = false
)
assertReplicaState(
logStartOffset = UnifiedLog.UNKNOWN_OFFSET,
logEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastCaughtUpTimeMs = 0L,
lastFetchLeaderLogEndOffset = UnifiedLog.UNKNOWN_OFFSET,
lastFetchTimeMs = 0L,
brokerEpoch = Option.empty
)
}
@Test
def testIsCaughtUpWhenReplicaIsCaughtUpToLogEnd(): Unit = {
assertFalse(isCaughtUp(leaderEndOffset = 10L))
updateFetchState(
followerFetchOffset = 10L,
followerStartOffset = 1L,
leaderEndOffset = 10L
)
assertTrue(isCaughtUp(leaderEndOffset = 10L))
time.sleep(ReplicaLagTimeMaxMs + 1)
assertTrue(isCaughtUp(leaderEndOffset = 10L))
}
@Test
def testIsCaughtUpWhenReplicaIsNotCaughtUpToLogEnd(): Unit = {
assertFalse(isCaughtUp(leaderEndOffset = 10L))
updateFetchState(
followerFetchOffset = 5L,
followerStartOffset = 1L,
leaderEndOffset = 10L
)
assertFalse(isCaughtUp(leaderEndOffset = 10L))
updateFetchState(
followerFetchOffset = 10L,
followerStartOffset = 1L,
leaderEndOffset = 15L
)
assertTrue(isCaughtUp(leaderEndOffset = 16L))
time.sleep(ReplicaLagTimeMaxMs + 1)
assertFalse(isCaughtUp(leaderEndOffset = 16L))
}
@Test
def testFenceStaleUpdates(): Unit = {
val metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.getAliveBrokerEpoch(BrokerId)).thenReturn(Optional.of(2L))
val replica = new Replica(BrokerId, Partition, metadataCache)
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
followerStartOffset = 1L,
followerFetchTimeMs = 1,
leaderEndOffset = 10L,
brokerEpoch = 2L
)
assertThrows(classOf[NotLeaderOrFollowerException], () => replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
followerStartOffset = 2L,
followerFetchTimeMs = 3,
leaderEndOffset = 10L,
brokerEpoch = 1L
))
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(5L),
followerStartOffset = 2L,
followerFetchTimeMs = 4,
leaderEndOffset = 10L,
brokerEpoch = -1L
)
}
}

View File

@@ -100,11 +100,11 @@ class IsrExpirationTest {
// let the follower catch up to the Leader logEndOffset - 1
for (replica <- partition0.remoteReplicas)
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 1),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = leaderLogEndOffset,
brokerEpoch = 1L)
new LogOffsetMetadata(leaderLogEndOffset - 1),
0L,
time.milliseconds,
leaderLogEndOffset,
1L)
var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
assertEquals(Set.empty[Int], partition0OSR, "No replica should be out of sync")
@@ -150,11 +150,11 @@ class IsrExpirationTest {
// Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms
for (replica <- partition0.remoteReplicas)
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 2),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = leaderLogEndOffset,
brokerEpoch = 1L)
new LogOffsetMetadata(leaderLogEndOffset - 2),
0L,
time.milliseconds,
leaderLogEndOffset,
1L)
// Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
// The replicas will no longer be in ISR. We do 2 fetches because we want to simulate the case where the replica is lagging but is not stuck
@@ -165,11 +165,11 @@ class IsrExpirationTest {
partition0.remoteReplicas.foreach { r =>
r.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset - 1),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = leaderLogEndOffset,
brokerEpoch = 1L)
new LogOffsetMetadata(leaderLogEndOffset - 1),
0L,
time.milliseconds,
leaderLogEndOffset,
1L)
}
partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
assertEquals(Set.empty[Int], partition0OSR, "No replica should be out of sync")
@@ -183,11 +183,11 @@ class IsrExpirationTest {
// Now actually make a fetch to the end of the log. The replicas should be back in ISR
partition0.remoteReplicas.foreach { r =>
r.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = leaderLogEndOffset,
brokerEpoch = 1L)
new LogOffsetMetadata(leaderLogEndOffset),
0L,
time.milliseconds,
leaderLogEndOffset,
1L)
}
partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
assertEquals(Set.empty[Int], partition0OSR, "No replica should be out of sync")
@@ -208,11 +208,11 @@ class IsrExpirationTest {
// let the follower catch up to the Leader logEndOffset
for (replica <- partition0.remoteReplicas)
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(leaderLogEndOffset),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = leaderLogEndOffset,
brokerEpoch = 1L)
new LogOffsetMetadata(leaderLogEndOffset),
0L,
time.milliseconds,
leaderLogEndOffset,
1L)
var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaLagTimeMaxMs)
assertEquals(Set.empty[Int], partition0OSR, "No replica should be out of sync")
@@ -245,11 +245,11 @@ class IsrExpirationTest {
// set lastCaughtUpTime to current time
for (replica <- partition.remoteReplicas)
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata = new LogOffsetMetadata(0L),
followerStartOffset = 0L,
followerFetchTimeMs= time.milliseconds,
leaderEndOffset = 0L,
brokerEpoch = 1L)
new LogOffsetMetadata(0L),
0L,
time.milliseconds,
0L,
1L)
// set the leader and its hw and the hw update time
partition.leaderReplicaIdOpt = Some(leaderId)

View File

@@ -20,7 +20,6 @@ package org.apache.kafka.jmh.partition;
import kafka.cluster.AlterPartitionListener;
import kafka.cluster.DelayedOperations;
import kafka.cluster.Partition;
import kafka.cluster.Replica;
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.builders.LogManagerBuilder;
@@ -31,6 +30,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.replica.Replica;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
import org.apache.kafka.storage.internals.log.CleanerConfig;

View File

@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.replica;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
public class Replica {
private static final Logger LOGGER = LoggerFactory.getLogger(Replica.class);
private final int brokerId;
private final TopicPartition topicPartition;
private final MetadataCache metadataCache;
private final AtomicReference<ReplicaState> replicaState;
public Replica(int brokerId, TopicPartition topicPartition, MetadataCache metadataCache) {
this.brokerId = brokerId;
this.topicPartition = topicPartition;
this.metadataCache = metadataCache;
this.replicaState = new AtomicReference<>(ReplicaState.EMPTY);
}
public ReplicaState stateSnapshot() {
return replicaState.get();
}
public int brokerId() {
return brokerId;
}
/**
* Update the replica's fetch state only if the broker epoch is -1 or it is larger or equal to the current broker
* epoch. Otherwise, NOT_LEADER_OR_FOLLOWER exception will be thrown. This can fence fetch state update from a
* stale request.
*
* If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
* set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
*
* Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received,
* set `lastCaughtUpTimeMs` to the time when the previous fetch request was received.
*
* This is needed to enforce the semantics of ISR, i.e. a replica is in ISR if and only if it lags behind leader's LEO
* by at most `replicaLagTimeMaxMs`. These semantics allow a follower to be added to the ISR even if the offset of its
* fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at
* high frequency.
*/
public void updateFetchStateOrThrow(
LogOffsetMetadata followerFetchOffsetMetadata,
long followerStartOffset,
long followerFetchTimeMs,
long leaderEndOffset,
long brokerEpoch
) {
replicaState.updateAndGet(currentReplicaState -> {
var cachedBrokerEpoch = metadataCache.getAliveBrokerEpoch(brokerId);
// Fence the update if it provides a stale broker epoch.
if (brokerEpoch != -1 && cachedBrokerEpoch.filter(e -> e > brokerEpoch).isPresent()) {
throw new NotLeaderOrFollowerException("Received stale fetch state update. broker epoch=" + brokerEpoch +
" vs expected=" + currentReplicaState.brokerEpoch());
}
long lastCaughtUpTime;
if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset) {
lastCaughtUpTime = Math.max(currentReplicaState.lastCaughtUpTimeMs(), followerFetchTimeMs);
} else if (followerFetchOffsetMetadata.messageOffset >= currentReplicaState.lastFetchLeaderLogEndOffset()) {
lastCaughtUpTime = Math.max(currentReplicaState.lastCaughtUpTimeMs(), currentReplicaState.lastFetchTimeMs());
} else {
lastCaughtUpTime = currentReplicaState.lastCaughtUpTimeMs();
}
return new ReplicaState(
followerStartOffset,
followerFetchOffsetMetadata,
Math.max(leaderEndOffset, currentReplicaState.lastFetchLeaderLogEndOffset()),
followerFetchTimeMs,
lastCaughtUpTime,
Optional.of(brokerEpoch)
);
});
}
/**
* When the leader is elected or re-elected, the state of the follower is reinitialized
* accordingly.
*/
public void resetReplicaState(
long currentTimeMs,
long leaderEndOffset,
boolean isNewLeader,
boolean isFollowerInSync
) {
replicaState.updateAndGet(currentReplicaState -> {
// When the leader is elected or re-elected, the follower's last caught up time
// is set to the current time if the follower is in the ISR, else to 0. The latter
// is done to ensure that the high watermark is not hold back unnecessarily for
// a follower which is not in the ISR anymore.
long lastCaughtUpTimeMs = isFollowerInSync ? currentTimeMs : 0L;
if (isNewLeader) {
return new ReplicaState(
UnifiedLog.UNKNOWN_OFFSET,
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
UnifiedLog.UNKNOWN_OFFSET,
0L,
lastCaughtUpTimeMs,
Optional.empty()
);
} else {
return new ReplicaState(
currentReplicaState.logStartOffset(),
currentReplicaState.logEndOffsetMetadata(),
leaderEndOffset,
// When the leader is re-elected, the follower's last fetch time is
// set to the current time if the follower is in the ISR, else to 0.
// The latter is done to ensure that the follower is not brought back
// into the ISR before a fetch is received.
isFollowerInSync ? currentTimeMs : 0L,
lastCaughtUpTimeMs,
currentReplicaState.brokerEpoch()
);
}
});
LOGGER.trace("Reset state of replica to {}", this);
}
@Override
public String toString() {
ReplicaState replicaState = this.replicaState.get();
return "Replica(replicaId=" + brokerId +
", topic=" + topicPartition.topic() +
", partition=" + topicPartition.partition() +
", lastCaughtUpTimeMs=" + replicaState.lastCaughtUpTimeMs() +
", logStartOffset=" + replicaState.logStartOffset() +
", logEndOffset=" + replicaState.logEndOffsetMetadata().messageOffset +
", logEndOffsetMetadata=" + replicaState.logEndOffsetMetadata() +
", lastFetchLeaderLogEndOffset=" + replicaState.lastFetchLeaderLogEndOffset() +
", brokerEpoch=" + replicaState.brokerEpoch().orElse(-2L) +
", lastFetchTimeMs=" + replicaState.lastFetchTimeMs() +
")";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Replica other = (Replica) o;
return brokerId == other.brokerId && topicPartition.equals(other.topicPartition);
}
@Override
public int hashCode() {
return 31 + topicPartition.hashCode() + 17 * brokerId;
}
}

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.replica;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import java.util.Optional;
/**
* @param logStartOffset The log start offset value, kept in all replicas; for local replica it is the log's start offset, for remote replicas its value is only updated by follower fetch.
* @param logEndOffsetMetadata The log end offset value, kept in all replicas; for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch.
* @param lastFetchLeaderLogEndOffset The log end offset value at the time the leader received the last FetchRequest from this follower. This is used to determine the lastCaughtUpTimeMs of the follower. It is reset by the leader when a LeaderAndIsr request is received and might be reset when the leader appends a record to its log.
* @param lastFetchTimeMs The time when the leader received the last FetchRequest from this follower. This is used to determine the lastCaughtUpTimeMs of the follower.
* @param lastCaughtUpTimeMs lastCaughtUpTimeMs is the largest time t such that the offset of most recent FetchRequest from this follower >= the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
* @param brokerEpoch The brokerEpoch is the epoch from the Fetch request.
*/
public record ReplicaState(
long logStartOffset,
LogOffsetMetadata logEndOffsetMetadata,
long lastFetchLeaderLogEndOffset,
long lastFetchTimeMs,
long lastCaughtUpTimeMs,
Optional<Long> brokerEpoch
) {
public static final ReplicaState EMPTY = new ReplicaState(
UnifiedLog.UNKNOWN_OFFSET,
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
0L,
0L,
0L,
Optional.empty()
);
/**
* Returns the current log end offset of the replica.
*/
public long logEndOffset() {
return logEndOffsetMetadata.messageOffset;
}
/**
* Returns true when the replica is considered as "caught-up". A replica is
* considered "caught-up" when its log end offset is equals to the log end
* offset of the leader OR when its last caught up time minus the current
* time is smaller than the max replica lag.
*/
public boolean isCaughtUp(
long leaderEndOffset,
long currentTimeMs,
long replicaMaxLagMs) {
return leaderEndOffset == logEndOffset() || currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs;
}
}
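
As a concrete illustration of the lag semantics encoded above, a replica stays "caught up" either by sitting at the leader's log end offset or by having caught up within the max lag window. A small sketch with made-up offsets and timestamps (not taken from this commit):

import org.apache.kafka.server.replica.ReplicaState;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;

import java.util.Optional;

public class IsCaughtUpSketch {
    public static void main(String[] args) {
        // A follower at offset 90 while the leader LEO is 100; it last caught up at t = 1 000 ms.
        ReplicaState state = new ReplicaState(
            0L,                          // logStartOffset
            new LogOffsetMetadata(90L),  // logEndOffsetMetadata
            100L,                        // lastFetchLeaderLogEndOffset
            1_000L,                      // lastFetchTimeMs
            1_000L,                      // lastCaughtUpTimeMs
            Optional.of(1L)              // brokerEpoch
        );
        System.out.println(state.isCaughtUp(100L, 20_000L, 30_000L)); // true: only 19 000 ms since it last caught up
        System.out.println(state.isCaughtUp(100L, 40_000L, 30_000L)); // false: 39 000 ms exceeds the 30 000 ms max lag
        System.out.println(state.isCaughtUp(90L, 40_000L, 30_000L));  // true: equal to the leader's log end offset
    }
}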

View File

@@ -0,0 +1,351 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.replica;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ReplicaTest {
private static final int BROKER_ID = 0;
private static final TopicPartition PARTITION = new TopicPartition("foo", 0);
private static final long REPLICA_LAG_TIME_MAX_MS = 30000L;
private MockTime time;
private Replica replica;
@BeforeEach
public void setup() {
time = new MockTime();
MetadataCache metadataCache = mock(MetadataCache.class);
when(metadataCache.getAliveBrokerEpoch(BROKER_ID)).thenReturn(Optional.of(1L));
replica = new Replica(BROKER_ID, PARTITION, metadataCache);
}
private void assertReplicaState(
long logStartOffset,
long logEndOffset,
long lastCaughtUpTimeMs,
long lastFetchLeaderLogEndOffset,
long lastFetchTimeMs,
Optional<Long> brokerEpoch
) {
ReplicaState replicaState = replica.stateSnapshot();
assertEquals(logStartOffset, replicaState.logStartOffset(),
"Unexpected Log Start Offset");
assertEquals(logEndOffset, replicaState.logEndOffset(),
"Unexpected Log End Offset");
assertEquals(lastCaughtUpTimeMs, replicaState.lastCaughtUpTimeMs(),
"Unexpected Last Caught Up Time");
assertEquals(lastFetchLeaderLogEndOffset, replicaState.lastFetchLeaderLogEndOffset(),
"Unexpected Last Fetch Leader Log End Offset");
assertEquals(lastFetchTimeMs, replicaState.lastFetchTimeMs(),
"Unexpected Last Fetch Time");
assertEquals(brokerEpoch, replicaState.brokerEpoch(),
"Broker Epoch Mismatch");
}
private void assertReplicaState(
long logStartOffset,
long logEndOffset,
long lastCaughtUpTimeMs,
long lastFetchLeaderLogEndOffset,
long lastFetchTimeMs
) {
assertReplicaState(logStartOffset, logEndOffset, lastCaughtUpTimeMs, lastFetchLeaderLogEndOffset,
lastFetchTimeMs, Optional.of(1L));
}
private long updateFetchState(
long followerFetchOffset,
long followerStartOffset,
long leaderEndOffset
) {
long currentTimeMs = time.milliseconds();
replica.updateFetchStateOrThrow(
new LogOffsetMetadata(followerFetchOffset),
followerStartOffset,
currentTimeMs,
leaderEndOffset,
1L
);
return currentTimeMs;
}
private long resetReplicaState(
long leaderEndOffset,
boolean isNewLeader,
boolean isFollowerInSync
) {
long currentTimeMs = time.milliseconds();
replica.resetReplicaState(
currentTimeMs,
leaderEndOffset,
isNewLeader,
isFollowerInSync
);
return currentTimeMs;
}
private boolean isCaughtUp(long leaderEndOffset) {
return replica.stateSnapshot().isCaughtUp(
leaderEndOffset,
time.milliseconds(),
REPLICA_LAG_TIME_MAX_MS
);
}
@Test
public void testInitialState() {
assertReplicaState(
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
0L,
0L,
0L,
Optional.empty()
);
}
@Test
public void testUpdateFetchState() {
long fetchTimeMs1 = updateFetchState(
5L,
1L,
10L
);
assertReplicaState(
1L,
5L,
0L,
10L,
fetchTimeMs1
);
long fetchTimeMs2 = updateFetchState(
10L,
2L,
15L
);
assertReplicaState(
2L,
10L,
fetchTimeMs1,
15L,
fetchTimeMs2
);
long fetchTimeMs3 = updateFetchState(
15L,
3L,
15L
);
assertReplicaState(
3L,
15L,
fetchTimeMs3,
15L,
fetchTimeMs3
);
}
@Test
public void testResetReplicaStateWhenLeaderIsReelectedAndReplicaIsInSync() {
updateFetchState(
10L,
1L,
10L
);
long resetTimeMs1 = resetReplicaState(
11L,
false,
true
);
assertReplicaState(
1L,
10L,
resetTimeMs1,
11L,
resetTimeMs1
);
}
@Test
public void testResetReplicaStateWhenLeaderIsReelectedAndReplicaIsNotInSync() {
updateFetchState(
10L,
1L,
10L
);
resetReplicaState(
11L,
false,
false
);
assertReplicaState(
1L,
10L,
0L,
11L,
0L
);
}
@Test
public void testResetReplicaStateWhenNewLeaderIsElectedAndReplicaIsInSync() {
updateFetchState(
10L,
1L,
10L
);
long resetTimeMs1 = resetReplicaState(
11L,
true,
true
);
assertReplicaState(
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
resetTimeMs1,
UnifiedLog.UNKNOWN_OFFSET,
0L,
Optional.empty()
);
}
@Test
public void testResetReplicaStateWhenNewLeaderIsElectedAndReplicaIsNotInSync() {
updateFetchState(
10L,
1L,
10L
);
resetReplicaState(
11L,
true,
false
);
assertReplicaState(
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
0L,
UnifiedLog.UNKNOWN_OFFSET,
0L,
Optional.empty()
);
}
@Test
public void testIsCaughtUpWhenReplicaIsCaughtUpToLogEnd() {
assertFalse(isCaughtUp(10L));
updateFetchState(
10L,
1L,
10L
);
assertTrue(isCaughtUp(10L));
time.sleep(REPLICA_LAG_TIME_MAX_MS + 1);
assertTrue(isCaughtUp(10L));
}
@Test
public void testIsCaughtUpWhenReplicaIsNotCaughtUpToLogEnd() {
assertFalse(isCaughtUp(10L));
updateFetchState(
5L,
1L,
10L
);
assertFalse(isCaughtUp(10L));
updateFetchState(
10L,
1L,
15L
);
assertTrue(isCaughtUp(16L));
time.sleep(REPLICA_LAG_TIME_MAX_MS + 1);
assertFalse(isCaughtUp(16L));
}
@Test
public void testFenceStaleUpdates() {
MetadataCache metadataCache = mock(MetadataCache.class);
when(metadataCache.getAliveBrokerEpoch(BROKER_ID)).thenReturn(Optional.of(2L));
Replica replica = new Replica(BROKER_ID, PARTITION, metadataCache);
replica.updateFetchStateOrThrow(
new LogOffsetMetadata(5L),
1L,
1,
10L,
2L
);
assertThrows(NotLeaderOrFollowerException.class, () -> replica.updateFetchStateOrThrow(
new LogOffsetMetadata(5L),
2L,
3,
10L,
1L
));
replica.updateFetchStateOrThrow(
new LogOffsetMetadata(5L),
2L,
4,
10L,
-1L
);
}
}