mirror of https://github.com/apache/kafka.git
KAFKA-18080 Replace DelayedItem by Long type (#17927)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
48d60efe98
commit
cebec91470
|
|
@ -20,7 +20,7 @@ package kafka.server
|
|||
import com.yammer.metrics.core.Meter
|
||||
import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
|
||||
import kafka.utils.CoreUtils.inLock
|
||||
import kafka.utils.{DelayedItem, Logging, Pool}
|
||||
import kafka.utils.{Logging, Pool}
|
||||
import org.apache.kafka.common.errors._
|
||||
import org.apache.kafka.common.internals.PartitionStates
|
||||
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
|
||||
|
|
@ -30,6 +30,7 @@ import org.apache.kafka.common.protocol.Errors
|
|||
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
|
||||
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
|
||||
import org.apache.kafka.common.requests._
|
||||
import org.apache.kafka.common.utils.Time
|
||||
import org.apache.kafka.common.{ClientIdAndBroker, InvalidRecordException, TopicPartition, Uuid}
|
||||
import org.apache.kafka.server.common.OffsetAndEpoch
|
||||
import org.apache.kafka.server.metrics.KafkaMetricsGroup
|
||||
|
|
@ -789,7 +790,7 @@ abstract class AbstractFetcherThread(name: String,
|
|||
Option(partitionStates.stateValue(partition)).foreach { currentFetchState =>
|
||||
if (!currentFetchState.isDelayed) {
|
||||
partitionStates.updateAndMoveToEnd(partition, PartitionFetchState(currentFetchState.topicId, currentFetchState.fetchOffset,
|
||||
currentFetchState.lag, currentFetchState.currentLeaderEpoch, Some(new DelayedItem(delay)),
|
||||
currentFetchState.lag, currentFetchState.currentLeaderEpoch, Some(delay),
|
||||
currentFetchState.state, currentFetchState.lastFetchedEpoch))
|
||||
}
|
||||
}
|
||||
|
|
@ -945,25 +946,27 @@ object PartitionFetchState {
|
|||
/**
|
||||
* case class to keep partition offset and its state(truncatingLog, delayed)
|
||||
* This represents a partition as being either:
|
||||
* (1) Truncating its log, for example having recently become a follower
|
||||
* (2) Delayed, for example due to an error, where we subsequently back off a bit
|
||||
* (3) ReadyForFetch, the is the active state where the thread is actively fetching data.
|
||||
* (1) Truncating its log, for example, having recently become a follower
|
||||
* (2) Delayed, for example, due to an error, where we subsequently back off a bit
|
||||
* (3) ReadyForFetch, the active state where the thread is actively fetching data.
|
||||
*/
|
||||
case class PartitionFetchState(topicId: Option[Uuid],
|
||||
fetchOffset: Long,
|
||||
lag: Option[Long],
|
||||
currentLeaderEpoch: Int,
|
||||
delay: Option[DelayedItem],
|
||||
delay: Option[Long],
|
||||
state: ReplicaState,
|
||||
lastFetchedEpoch: Option[Int]) {
|
||||
|
||||
private val dueMs = delay.map(_ + Time.SYSTEM.milliseconds)
|
||||
|
||||
def isReadyForFetch: Boolean = state == Fetching && !isDelayed
|
||||
|
||||
def isReplicaInSync: Boolean = lag.isDefined && lag.get <= 0
|
||||
|
||||
def isTruncating: Boolean = state == Truncating && !isDelayed
|
||||
|
||||
def isDelayed: Boolean = delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
|
||||
def isDelayed: Boolean = dueMs.exists(_ > Time.SYSTEM.milliseconds)
|
||||
|
||||
override def toString: String = {
|
||||
s"FetchState(topicId=$topicId" +
|
||||
|
|
@ -972,7 +975,7 @@ case class PartitionFetchState(topicId: Option[Uuid],
|
|||
s", lastFetchedEpoch=$lastFetchedEpoch" +
|
||||
s", state=$state" +
|
||||
s", lag=$lag" +
|
||||
s", delay=${delay.map(_.delayMs).getOrElse(0)}ms" +
|
||||
s", delay=${delay.getOrElse(0)}ms" +
|
||||
s")"
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,44 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.utils
|
||||
|
||||
import java.util.concurrent._
|
||||
|
||||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.math._
|
||||
|
||||
class DelayedItem(val delayMs: Long) extends Delayed with Logging {
|
||||
|
||||
private val dueMs = Time.SYSTEM.milliseconds + delayMs
|
||||
|
||||
def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
|
||||
|
||||
/**
|
||||
* The remaining delay time
|
||||
*/
|
||||
def getDelay(unit: TimeUnit): Long = {
|
||||
unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
|
||||
}
|
||||
|
||||
def compareTo(d: Delayed): Int = {
|
||||
val other = d.asInstanceOf[DelayedItem]
|
||||
java.lang.Long.compare(dueMs, other.dueMs)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -22,7 +22,7 @@ import kafka.server.AbstractFetcherThread.ResultWithPartitions
|
|||
import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
|
||||
import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
|
||||
import kafka.server.metadata.ZkMetadataCache
|
||||
import kafka.utils.{DelayedItem, TestUtils}
|
||||
import kafka.utils.TestUtils
|
||||
import org.apache.kafka.common.errors.KafkaStorageException
|
||||
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
|
||||
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
|
||||
|
|
@ -1168,7 +1168,7 @@ class ReplicaAlterLogDirsThreadTest {
|
|||
// one partition is ready and one is delayed
|
||||
val ResultWithPartitions(fetchRequest2Opt, partitionsWithError2) = thread.leader.buildFetch(Map(
|
||||
t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch, state = Fetching, lastFetchedEpoch = None),
|
||||
t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None)))
|
||||
t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, delay = Some(5000), state = Fetching, lastFetchedEpoch = None)))
|
||||
|
||||
assertTrue(fetchRequest2Opt.isDefined)
|
||||
val fetchRequest2 = fetchRequest2Opt.get
|
||||
|
|
@ -1181,8 +1181,8 @@ class ReplicaAlterLogDirsThreadTest {
|
|||
|
||||
// both partitions are delayed
|
||||
val ResultWithPartitions(fetchRequest3Opt, partitionsWithError3) = thread.leader.buildFetch(Map(
|
||||
t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None),
|
||||
t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, delay = Some(new DelayedItem(5000)), state = Fetching, lastFetchedEpoch = None)))
|
||||
t1p0 -> PartitionFetchState(Some(topicId), 140, None, leaderEpoch, delay = Some(5000), state = Fetching, lastFetchedEpoch = None),
|
||||
t1p1 -> PartitionFetchState(Some(topicId), 160, None, leaderEpoch, delay = Some(5000), state = Fetching, lastFetchedEpoch = None)))
|
||||
assertTrue(fetchRequest3Opt.isEmpty, "Expected no fetch requests since all partitions are delayed")
|
||||
assertFalse(partitionsWithError3.nonEmpty)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue