MINOR: Improve performance of checkpointHighWatermarks, patch 1/2 (#6741)

This PR improves high watermark checkpointing performance.

`ReplicaManager.checkpointHighWatermarks()` was found to be a major contributor to GC pressure, especially on Kafka clusters with high partition counts and low throughput.

Added a JMH benchmark for `checkpointHighWatermarks` that establishes a
performance baseline. The parameterized benchmark was run with 100, 1000 and
2000 topics.
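
For orientation, a parameterized JMH benchmark of this shape looks roughly like the sketch below. This is not the class added by the PR (all names here are illustrative), and the real benchmark exercises `ReplicaManager.checkpointHighWatermarks()` against a broker-like setup rather than a plain in-memory collection:

```scala
import java.util.concurrent.TimeUnit

import org.apache.kafka.common.TopicPartition
import org.openjdk.jmh.annotations._

@State(Scope.Benchmark)
@Fork(1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
class HighWatermarkCheckpointSketch {

  // Same parameter axis as the runs reported below: 100, 1000 and 2000 topics.
  @Param(Array("100", "1000", "2000"))
  var topicCount: Int = _

  private var partitions: Seq[TopicPartition] = _

  @Setup(Level.Trial)
  def setup(): Unit = {
    // One partition per topic stands in for the broker's partition map.
    partitions = (0 until topicCount).map(i => new TopicPartition(s"topic-$i", 0))
  }

  @Benchmark
  def buildCheckpointMap(): Map[TopicPartition, Long] = {
    // Models the per-checkpoint work: collect one high watermark per partition.
    partitions.map(tp => tp -> 0L).toMap
  }
}
```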

Modified `ReplicaManager.checkpointHighWatermarks()` to avoid extra copies and cached
the Log parent directory String to avoid frequent allocations when calculating
`File.getParent()`.
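
The caching part is small. A condensed sketch of the pattern (the real change lives in `kafka.log.Log`, shown in the diff below; this standalone class only illustrates the idea):

```scala
import java.io.File

// Illustration of the parent-directory caching pattern applied to Log.
class ParentDirCachingExample(@volatile private var _dir: File) {

  // Cache the parent directory string so hot paths such as
  // ReplicaManager.checkpointHighWatermarks do not have to call File.getParent,
  // which computes and allocates a new String on every invocation.
  @volatile private var _parentDir: String = _dir.getParent

  def dir: File = _dir
  def parentDir: String = _parentDir
  def parentDirFile: File = new File(_parentDir)

  // The directory only changes on rename, so refresh both cached values there.
  def renameDir(renamedDir: File): Unit = {
    _dir = renamedDir
    _parentDir = renamedDir.getParent
  }
}
```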

A few clean-ups:
* Changed all usages of `Log.dir.getParent` to `Log.parentDir` and `Log.dir.getParentFile` to
  `Log.parentDirFile`.
* Only expose a public accessor for `Log.dir` (consistent with `Log.parentDir`).
* Removed unused parameters in `Partition.makeLeader`, `Partition.makeFollower` and `Partition.createLogIfNotExists`.
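
Putting the pieces together, the checkpoint path now makes a single pass over the partitions and groups high watermarks by (cached) parent log dir, without building intermediate per-partition collections. Condensed from the new `ReplicaManager.checkpointHighWatermarks()` in the diff below (the surrounding class provides `allPartitions`, `nonOfflinePartitionsIterator` and `highWatermarkCheckpoints`):

```scala
def checkpointHighWatermarks(): Unit = {
  def putHw(logDirToHws: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]],
            log: Log): Unit = {
    val hws = logDirToHws.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Long]())
    hws.put(log.topicPartition, log.highWatermark)
  }

  // Pre-size the outer map; there is at most one entry per log dir, bounded by partition count.
  val logDirToHws = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]](allPartitions.size)
  nonOfflinePartitionsIterator.foreach { partition =>
    partition.log.foreach(putHw(logDirToHws, _))
    partition.futureLog.foreach(putHw(logDirToHws, _))
  }

  for ((logDir, hws) <- logDirToHws) {
    try highWatermarkCheckpoints.get(logDir).foreach(_.write(hws))
    catch {
      case e: KafkaStorageException =>
        error(s"Error while writing to highwatermark file in directory $logDir", e)
    }
  }
}
```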

Benchmark results:

| Topic Count | Ops/ms | MB/sec allocated |
|-------------|--------|------------------|
| 100         | +51%   | -91%             |
| 1000        | +143%  | -49%             |
| 2000        | +149%  | -50%             |

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Co-authored-by: Gardner Vickers <gardner@vickers.me>
Co-authored-by: Ismael Juma <ismael@juma.me.uk>
Gardner Vickers 2020-03-25 20:53:42 -07:00 committed by GitHub
parent 08759b2531
commit 8cf781ef01
19 changed files with 360 additions and 211 deletions

.gitignore

@@ -54,4 +54,5 @@ systest/
 clients/src/generated
 clients/src/generated-test
 jmh-benchmarks/generated
+jmh-benchmarks/src/main/generated
 streams/src/generated


@ -1540,10 +1540,14 @@ project(':jmh-benchmarks') {
compile project(':core') compile project(':core')
compile project(':clients') compile project(':clients')
compile project(':streams') compile project(':streams')
compile project(':core')
compile project(':clients').sourceSets.test.output
compile project(':core').sourceSets.test.output
compile libs.jmhCore compile libs.jmhCore
compile libs.mockitoCore
annotationProcessor libs.jmhGeneratorAnnProcess annotationProcessor libs.jmhGeneratorAnnProcess
compile libs.jmhCoreBenchmarks compile libs.jmhCoreBenchmarks
compile libs.mockitoCore
compile libs.slf4jlog4j
} }
jar { jar {


@@ -39,6 +39,7 @@
   <allow pkg="kafka.controller"/>
   <allow pkg="kafka.coordinator"/>
   <allow pkg="kafka.network"/>
+  <allow pkg="kafka.utils"/>
   <allow pkg="kafka.zk"/>
   <allow class="kafka.utils.Pool"/>
   <allow class="kafka.utils.KafkaScheduler"/>


@@ -284,17 +284,6 @@
     </subpackage>
   </subpackage>
-  <subpackage name="jmh">
-    <allow pkg="org.openjdk.jmh.annotations" />
-    <allow pkg="org.openjdk.jmh.runner" />
-    <allow pkg="org.openjdk.jmh.runner.options" />
-    <allow pkg="org.openjdk.jmh.infra" />
-    <allow pkg="org.apache.kafka.common" />
-    <allow pkg="org.apache.kafka.clients" />
-    <allow pkg="org.apache.kafka.streams" />
-    <allow pkg="org.github.jamm" />
-  </subpackage>
-
   <subpackage name="log4jappender">
     <allow pkg="org.apache.log4j" />
     <allow pkg="org.apache.kafka.clients" />


@@ -19,7 +19,7 @@ package kafka.cluster
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import java.util.{Optional, Properties}
-import kafka.api.{ApiVersion, LeaderAndIsr, Request}
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
 import kafka.log._
@@ -266,7 +266,7 @@ class Partition(val topicPartition: TopicPartition,
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
     inWriteLock(leaderIsrUpdateLock) {
-      val currentLogDir = localLogOrException.dir.getParent
+      val currentLogDir = localLogOrException.parentDir
       if (currentLogDir == logDir) {
         info(s"Current log directory $currentLogDir is same as requested log dir $logDir. " +
           s"Skipping future replica creation.")
@ -274,34 +274,34 @@ class Partition(val topicPartition: TopicPartition,
} else { } else {
futureLog match { futureLog match {
case Some(partitionFutureLog) => case Some(partitionFutureLog) =>
val futureLogDir = partitionFutureLog.dir.getParent val futureLogDir = partitionFutureLog.parentDir
if (futureLogDir != logDir) if (futureLogDir != logDir)
throw new IllegalStateException(s"The future log dir $futureLogDir of $topicPartition is " + throw new IllegalStateException(s"The future log dir $futureLogDir of $topicPartition is " +
s"different from the requested log dir $logDir") s"different from the requested log dir $logDir")
false false
case None => case None =>
createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true, highWatermarkCheckpoints) createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
true true
} }
} }
} }
} }
def createLogIfNotExists(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = { def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
isFutureReplica match { isFutureReplica match {
case true if futureLog.isEmpty => case true if futureLog.isEmpty =>
val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints) val log = createLog(isNew, isFutureReplica, offsetCheckpoints)
this.futureLog = Option(log) this.futureLog = Option(log)
case false if log.isEmpty => case false if log.isEmpty =>
val log = createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints) val log = createLog(isNew, isFutureReplica, offsetCheckpoints)
this.log = Option(log) this.log = Option(log)
case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.") case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
} }
} }
// Visible for testing // Visible for testing
private[cluster] def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = { private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
val fetchLogConfig = () => { def fetchLogConfig: LogConfig = {
val props = stateStore.fetchTopicConfig() val props = stateStore.fetchTopicConfig()
LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) LogConfig.fromProps(logManager.currentDefaultConfig.originals, props)
} }
@ -309,8 +309,8 @@ class Partition(val topicPartition: TopicPartition,
logManager.initializingLog(topicPartition) logManager.initializingLog(topicPartition)
var maybeLog: Option[Log] = None var maybeLog: Option[Log] = None
try { try {
val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig(), isNew, isFutureReplica) val log = logManager.getOrCreateLog(topicPartition, fetchLogConfig, isNew, isFutureReplica)
val checkpointHighWatermark = offsetCheckpoints.fetch(log.dir.getParent, topicPartition).getOrElse { val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse {
info(s"No checkpointed highwatermark is found for partition $topicPartition") info(s"No checkpointed highwatermark is found for partition $topicPartition")
0L 0L
} }
@ -319,7 +319,7 @@ class Partition(val topicPartition: TopicPartition,
maybeLog = Some(log) maybeLog = Some(log)
log log
} finally { } finally {
logManager.finishedInitializingLog(topicPartition, maybeLog, fetchLogConfig) logManager.finishedInitializingLog(topicPartition, maybeLog, () => fetchLogConfig)
} }
} }
@ -410,7 +410,7 @@ class Partition(val topicPartition: TopicPartition,
def futureReplicaDirChanged(newDestinationDir: String): Boolean = { def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
inReadLock(leaderIsrUpdateLock) { inReadLock(leaderIsrUpdateLock) {
futureLog.exists(_.dir.getParent != newDestinationDir) futureLog.exists(_.parentDir != newDestinationDir)
} }
} }
@ -478,9 +478,7 @@ class Partition(val topicPartition: TopicPartition,
* from the time when this broker was the leader last time) and setting the new leader and ISR. * from the time when this broker was the leader last time) and setting the new leader and ISR.
* If the leader replica id does not change, return false to indicate the replica manager. * If the leader replica id does not change, return false to indicate the replica manager.
*/ */
def makeLeader(controllerId: Int, def makeLeader(partitionState: LeaderAndIsrPartitionState,
partitionState: LeaderAndIsrPartitionState,
correlationId: Int,
highWatermarkCheckpoints: OffsetCheckpoints): Boolean = { highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
@ -493,7 +491,7 @@ class Partition(val topicPartition: TopicPartition,
addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt), addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt) removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
) )
createLogIfNotExists(localBrokerId, partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
val leaderLog = localLogOrException val leaderLog = localLogOrException
val leaderEpochStartOffset = leaderLog.logEndOffset val leaderEpochStartOffset = leaderLog.logEndOffset
@ -549,9 +547,7 @@ class Partition(val topicPartition: TopicPartition,
* greater (that is, no updates have been missed), return false to indicate to the * greater (that is, no updates have been missed), return false to indicate to the
* replica manager that state is already correct and the become-follower steps can be skipped * replica manager that state is already correct and the become-follower steps can be skipped
*/ */
def makeFollower(controllerId: Int, def makeFollower(partitionState: LeaderAndIsrPartitionState,
partitionState: LeaderAndIsrPartitionState,
correlationId: Int,
highWatermarkCheckpoints: OffsetCheckpoints): Boolean = { highWatermarkCheckpoints: OffsetCheckpoints): Boolean = {
inWriteLock(leaderIsrUpdateLock) { inWriteLock(leaderIsrUpdateLock) {
val newLeaderBrokerId = partitionState.leader val newLeaderBrokerId = partitionState.leader
@ -566,7 +562,7 @@ class Partition(val topicPartition: TopicPartition,
addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt), addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt),
removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt) removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
) )
createLogIfNotExists(localBrokerId, partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints)
leaderEpoch = partitionState.leaderEpoch leaderEpoch = partitionState.leaderEpoch
leaderEpochStartOffsetOpt = None leaderEpochStartOffsetOpt = None


@@ -188,7 +188,7 @@ object RollParams {
 * New log segments are created according to a configurable policy that controls the size in bytes or time interval
 * for a given segment.
 *
- * @param dir The directory in which log segments are created.
+ * @param _dir The directory in which log segments are created.
 * @param config The log configuration settings
 * @param logStartOffset The earliest offset allowed to be exposed to kafka client.
 *                       The logStartOffset can be updated by :
@@ -209,7 +209,7 @@
 * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
 */
 @threadsafe
-class Log(@volatile var dir: File,
+class Log(@volatile private var _dir: File,
           @volatile var config: LogConfig,
           @volatile var logStartOffset: Long,
           @volatile var recoveryPoint: Long,
@ -228,36 +228,17 @@ class Log(@volatile var dir: File,
/* A lock that guards all modifications to the log */ /* A lock that guards all modifications to the log */
private val lock = new Object private val lock = new Object
// The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers() // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
// After memory mapped buffer is closed, no disk IO operation should be performed for this log // After memory mapped buffer is closed, no disk IO operation should be performed for this log
@volatile private var isMemoryMappedBufferClosed = false @volatile private var isMemoryMappedBufferClosed = false
// Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
@volatile private var _parentDir: String = dir.getParent
/* last time it was flushed */ /* last time it was flushed */
private val lastFlushedTime = new AtomicLong(time.milliseconds) private val lastFlushedTime = new AtomicLong(time.milliseconds)
def initFileSize: Int = {
if (config.preallocate)
config.segmentSize
else
0
}
def updateConfig(newConfig: LogConfig): Unit = {
val oldConfig = this.config
this.config = newConfig
val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
val newRecordVersion = newConfig.messageFormatVersion.recordVersion
if (newRecordVersion.precedes(oldRecordVersion))
warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
if (newRecordVersion.value != oldRecordVersion.value)
initializeLeaderEpochCache()
}
private def checkIfMemoryMappedBufferClosed(): Unit = {
if (isMemoryMappedBufferClosed)
throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
}
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _ @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
/* The earliest offset which is part of an incomplete transaction. This is used to compute the /* The earliest offset which is part of an incomplete transaction. This is used to compute the
@ -316,6 +297,35 @@ class Log(@volatile var dir: File,
s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms") s"log end offset $logEndOffset in ${time.milliseconds() - startMs} ms")
} }
def dir: File = _dir
def parentDir: String = _parentDir
def parentDirFile: File = new File(_parentDir)
def initFileSize: Int = {
if (config.preallocate)
config.segmentSize
else
0
}
def updateConfig(newConfig: LogConfig): Unit = {
val oldConfig = this.config
this.config = newConfig
val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
val newRecordVersion = newConfig.messageFormatVersion.recordVersion
if (newRecordVersion.precedes(oldRecordVersion))
warn(s"Record format version has been downgraded from $oldRecordVersion to $newRecordVersion.")
if (newRecordVersion.value != oldRecordVersion.value)
initializeLeaderEpochCache()
}
private def checkIfMemoryMappedBufferClosed(): Unit = {
if (isMemoryMappedBufferClosed)
throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
}
def highWatermark: Long = highWatermarkMetadata.messageOffset def highWatermark: Long = highWatermarkMetadata.messageOffset
/** /**
@@ -961,7 +971,8 @@
     val renamedDir = new File(dir.getParent, name)
     Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
     if (renamedDir != dir) {
-      dir = renamedDir
+      _dir = renamedDir
+      _parentDir = renamedDir.getParent
       logSegments.foreach(_.updateDir(renamedDir))
       producerStateManager.logDir = dir
       // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference


@ -315,7 +315,7 @@ class LogCleaner(initialConfig: CleanerConfig,
} catch { } catch {
case e: LogCleaningException => case e: LogCleaningException =>
warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e) warn(s"Unexpected exception thrown when cleaning log ${e.log}. Marking its partition (${e.log.topicPartition}) as uncleanable", e)
cleanerManager.markPartitionUncleanable(e.log.dir.getParent, e.log.topicPartition) cleanerManager.markPartitionUncleanable(e.log.parentDir, e.log.topicPartition)
false false
} }
@ -365,11 +365,11 @@ class LogCleaner(initialConfig: CleanerConfig,
case _: LogCleaningAbortedException => // task can be aborted, let it go. case _: LogCleaningAbortedException => // task can be aborted, let it go.
case _: KafkaStorageException => // partition is already offline. let it go. case _: KafkaStorageException => // partition is already offline. let it go.
case e: IOException => case e: IOException =>
val logDirectory = cleanable.log.dir.getParent val logDirectory = cleanable.log.parentDir
val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException" val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${logDirectory} due to IOException"
logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e) logDirFailureChannel.maybeAddOfflineLogDir(logDirectory, msg, e)
} finally { } finally {
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.parentDirFile, endOffset)
} }
} }


@ -182,7 +182,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now) val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
// update checkpoint for logs with invalid checkpointed offsets // update checkpoint for logs with invalid checkpointed offsets
if (offsetsToClean.forceUpdateCheckpoint) if (offsetsToClean.forceUpdateCheckpoint)
updateCheckpoints(log.dir.getParentFile(), Option(topicPartition, offsetsToClean.firstDirtyOffset)) updateCheckpoints(log.parentDirFile, Option(topicPartition, offsetsToClean.firstDirtyOffset))
val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now) val compactionDelayMs = maxCompactionDelay(log, offsetsToClean.firstDirtyOffset, now)
preCleanStats.updateMaxCompactionDelay(compactionDelayMs) preCleanStats.updateMaxCompactionDelay(compactionDelayMs)
@ -379,7 +379,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
case Some(offset) => case Some(offset) =>
// Remove this partition from the checkpoint file in the source log directory // Remove this partition from the checkpoint file in the source log directory
updateCheckpoints(sourceLogDir, None) updateCheckpoints(sourceLogDir, None)
// Add offset for this partition to the checkpoint file in the source log directory // Add offset for this partition to the checkpoint file in the destination log directory
updateCheckpoints(destLogDir, Option(topicPartition, offset)) updateCheckpoints(destLogDir, Option(topicPartition, offset))
case None => case None =>
} }
@ -478,7 +478,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = { private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = {
inLock(lock) { inLock(lock) {
uncleanablePartitions.get(log.dir.getParent).exists(partitions => partitions.contains(topicPartition)) uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
} }
} }
} }


@ -199,7 +199,7 @@ class LogManager(logDirs: Seq[File],
cleaner.handleLogDirFailure(dir) cleaner.handleLogDirFailure(dir)
val offlineCurrentTopicPartitions = currentLogs.collect { val offlineCurrentTopicPartitions = currentLogs.collect {
case (tp, log) if log.dir.getParent == dir => tp case (tp, log) if log.parentDir == dir => tp
} }
offlineCurrentTopicPartitions.foreach { topicPartition => { offlineCurrentTopicPartitions.foreach { topicPartition => {
val removedLog = currentLogs.remove(topicPartition) val removedLog = currentLogs.remove(topicPartition)
@ -210,7 +210,7 @@ class LogManager(logDirs: Seq[File],
}} }}
val offlineFutureTopicPartitions = futureLogs.collect { val offlineFutureTopicPartitions = futureLogs.collect {
case (tp, log) if log.dir.getParent == dir => tp case (tp, log) if log.parentDir == dir => tp
} }
offlineFutureTopicPartitions.foreach { topicPartition => { offlineFutureTopicPartitions.foreach { topicPartition => {
val removedLog = futureLogs.remove(topicPartition) val removedLog = futureLogs.remove(topicPartition)
@ -282,7 +282,7 @@ class LogManager(logDirs: Seq[File],
} }
if (previous != null) { if (previous != null) {
if (log.isFuture) if (log.isFuture)
throw new IllegalStateException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath)) throw new IllegalStateException(s"Duplicate log directories found: ${log.dir.getAbsolutePath}, ${previous.dir.getAbsolutePath}")
else else
throw new IllegalStateException(s"Duplicate log directories for $topicPartition are found in both ${log.dir.getAbsolutePath} " + throw new IllegalStateException(s"Duplicate log directories for $topicPartition are found in both ${log.dir.getAbsolutePath} " +
s"and ${previous.dir.getAbsolutePath}. It is likely because log directory failure happened while broker was " + s"and ${previous.dir.getAbsolutePath}. It is likely because log directory failure happened while broker was " +
@ -514,7 +514,7 @@ class LogManager(logDirs: Seq[File],
if (log.truncateTo(truncateOffset)) if (log.truncateTo(truncateOffset))
affectedLogs += log affectedLogs += log
if (needToStopCleaner && !isFuture) if (needToStopCleaner && !isFuture)
cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset)
} finally { } finally {
if (needToStopCleaner && !isFuture) { if (needToStopCleaner && !isFuture) {
cleaner.resumeCleaning(Seq(topicPartition)) cleaner.resumeCleaning(Seq(topicPartition))
@ -524,7 +524,7 @@ class LogManager(logDirs: Seq[File],
} }
} }
for ((dir, logs) <- affectedLogs.groupBy(_.dir.getParentFile)) { for ((dir, logs) <- affectedLogs.groupBy(_.parentDirFile)) {
checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs) checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs)
} }
} }
@ -551,7 +551,7 @@ class LogManager(logDirs: Seq[File],
try { try {
log.truncateFullyAndStartAt(newOffset) log.truncateFullyAndStartAt(newOffset)
if (cleaner != null && !isFuture) { if (cleaner != null && !isFuture) {
cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) cleaner.maybeTruncateCheckpoint(log.parentDirFile, topicPartition, log.activeSegment.baseOffset)
} }
} finally { } finally {
if (cleaner != null && !isFuture) { if (cleaner != null && !isFuture) {
@ -559,7 +559,7 @@ class LogManager(logDirs: Seq[File],
info(s"Compaction for partition $topicPartition is resumed") info(s"Compaction for partition $topicPartition is resumed")
} }
} }
checkpointRecoveryOffsetsAndCleanSnapshot(log.dir.getParentFile, Seq(log)) checkpointRecoveryOffsetsAndCleanSnapshot(log.parentDirFile, Seq(log))
} }
} }
@ -633,8 +633,8 @@ class LogManager(logDirs: Seq[File],
// The logDir should be an absolute path // The logDir should be an absolute path
def maybeUpdatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = { def maybeUpdatePreferredLogDir(topicPartition: TopicPartition, logDir: String): Unit = {
// Do not cache the preferred log directory if either the current log or the future log for this partition exists in the specified logDir // Do not cache the preferred log directory if either the current log or the future log for this partition exists in the specified logDir
if (!getLog(topicPartition).exists(_.dir.getParent == logDir) && if (!getLog(topicPartition).exists(_.parentDir == logDir) &&
!getLog(topicPartition, isFuture = true).exists(_.dir.getParent == logDir)) !getLog(topicPartition, isFuture = true).exists(_.parentDir == logDir))
preferredLogDirs.put(topicPartition, logDir) preferredLogDirs.put(topicPartition, logDir)
} }
@ -723,7 +723,7 @@ class LogManager(logDirs: Seq[File],
if (isFuture) { if (isFuture) {
if (preferredLogDir == null) if (preferredLogDir == null)
throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory") throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")
else if (getLog(topicPartition).get.dir.getParent == preferredLogDir) else if (getLog(topicPartition).get.parentDir == preferredLogDir)
throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition") throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")
} }
@ -818,7 +818,7 @@ class LogManager(logDirs: Seq[File],
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.") info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch { } catch {
case e: KafkaStorageException => case e: KafkaStorageException =>
error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e) error(s"Exception while deleting $removedLog in dir ${removedLog.parentDir}.", e)
} }
} }
} }
@ -866,7 +866,7 @@ class LogManager(logDirs: Seq[File],
futureLogs.remove(topicPartition) futureLogs.remove(topicPartition)
currentLogs.put(topicPartition, destLog) currentLogs.put(topicPartition, destLog)
if (cleaner != null) { if (cleaner != null) {
cleaner.alterCheckpointDir(topicPartition, sourceLog.dir.getParentFile, destLog.dir.getParentFile) cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
cleaner.resumeCleaning(Seq(topicPartition)) cleaner.resumeCleaning(Seq(topicPartition))
info(s"Compaction for partition $topicPartition is resumed") info(s"Compaction for partition $topicPartition is resumed")
} }
@ -876,8 +876,8 @@ class LogManager(logDirs: Seq[File],
// Now that replica in source log directory has been successfully renamed for deletion. // Now that replica in source log directory has been successfully renamed for deletion.
// Close the log, update checkpoint files, and enqueue this log to be deleted. // Close the log, update checkpoint files, and enqueue this log to be deleted.
sourceLog.close() sourceLog.close()
checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.dir.getParentFile, ArrayBuffer.empty) checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty)
checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile) checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
addLogToBeDeleted(sourceLog) addLogToBeDeleted(sourceLog)
} catch { } catch {
case e: KafkaStorageException => case e: KafkaStorageException =>
@ -911,11 +911,11 @@ class LogManager(logDirs: Seq[File],
//We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
if (cleaner != null && !isFuture) { if (cleaner != null && !isFuture) {
cleaner.abortCleaning(topicPartition) cleaner.abortCleaning(topicPartition)
cleaner.updateCheckpoints(removedLog.dir.getParentFile) cleaner.updateCheckpoints(removedLog.parentDirFile)
} }
removedLog.renameDir(Log.logDeleteDirName(topicPartition)) removedLog.renameDir(Log.logDeleteDirName(topicPartition))
checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty) checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty)
checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile) checkpointLogStartOffsetsInDir(removedLog.parentDirFile)
addLogToBeDeleted(removedLog) addLogToBeDeleted(removedLog)
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
} else if (offlineLogDirs.nonEmpty) { } else if (offlineLogDirs.nonEmpty) {
@ -934,7 +934,7 @@ class LogManager(logDirs: Seq[File],
List(_liveLogDirs.peek()) List(_liveLogDirs.peek())
} else { } else {
// count the number of logs in each parent directory (including 0 for empty directories // count the number of logs in each parent directory (including 0 for empty directories
val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size) val logCounts = allLogs.groupBy(_.parentDir).mapValues(_.size)
val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap val zeros = _liveLogDirs.asScala.map(dir => (dir.getPath, 0)).toMap
val dirCounts = (zeros ++ logCounts).toBuffer val dirCounts = (zeros ++ logCounts).toBuffer
@ -1005,7 +1005,7 @@ class LogManager(logDirs: Seq[File],
*/ */
private def logsByDir: Map[String, Map[TopicPartition, Log]] = { private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
(this.currentLogs.toList ++ this.futureLogs.toList).toMap (this.currentLogs.toList ++ this.futureLogs.toList).toMap
.groupBy { case (_, log) => log.dir.getParent } .groupBy { case (_, log) => log.parentDir }
} }
// logDir should be an absolute path // logDir should be an absolute path


@ -480,7 +480,7 @@ class ReplicaManager(val config: KafkaConfig,
} }
def getLogDir(topicPartition: TopicPartition): Option[String] = { def getLogDir(topicPartition: TopicPartition): Option[String] = {
localLog(topicPartition).map(_.dir.getParent) localLog(topicPartition).map(_.parentDir)
} }
/** /**
@ -661,7 +661,7 @@ class ReplicaManager(val config: KafkaConfig,
* are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented. * are included. There may be future logs (which will replace the current logs of the partition in the future) on the broker after KIP-113 is implemented.
*/ */
def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = { def describeLogDirs(partitions: Set[TopicPartition]): List[DescribeLogDirsResponseData.DescribeLogDirsResult] = {
val logsByDir = logManager.allLogs.groupBy(log => log.dir.getParent) val logsByDir = logManager.allLogs.groupBy(log => log.parentDir)
config.logDirs.toSet.map { logDir: String => config.logDirs.toSet.map { logDir: String =>
val absolutePath = new File(logDir).getAbsolutePath val absolutePath = new File(logDir).getAbsolutePath
@ -1303,7 +1303,7 @@ class ReplicaManager(val config: KafkaConfig,
val leader = BrokerEndPoint(config.brokerId, "localhost", -1) val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
// Add future replica to partition's map // Add future replica to partition's map
partition.createLogIfNotExists(Request.FutureLocalReplicaId, isNew = false, isFutureReplica = true, partition.createLogIfNotExists(isNew = false, isFutureReplica = true,
highWatermarkCheckpoints) highWatermarkCheckpoints)
// pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
@ -1369,7 +1369,7 @@ class ReplicaManager(val config: KafkaConfig,
// Update the partition information to be the leader // Update the partition information to be the leader
partitionStates.foreach { case (partition, partitionState) => partitionStates.foreach { case (partition, partitionState) =>
try { try {
if (partition.makeLeader(controllerId, partitionState, correlationId, highWatermarkCheckpoints)) { if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) {
partitionsToMakeLeaders += partition partitionsToMakeLeaders += partition
stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " + stateChangeLogger.trace(s"Stopped fetchers as part of become-leader request from " +
s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " + s"controller $controllerId epoch $controllerEpoch with correlation id $correlationId for partition ${partition.topicPartition} " +
@ -1451,7 +1451,7 @@ class ReplicaManager(val config: KafkaConfig,
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match { metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
// Only change partition state when the leader is available // Only change partition state when the leader is available
case Some(_) => case Some(_) =>
if (partition.makeFollower(controllerId, partitionState, correlationId, highWatermarkCheckpoints)) if (partition.makeFollower(partitionState, highWatermarkCheckpoints))
partitionsToMakeFollower += partition partitionsToMakeFollower += partition
else else
stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " + stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
@ -1468,7 +1468,7 @@ class ReplicaManager(val config: KafkaConfig,
s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.") s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
// Create the local replica even if the leader is unavailable. This is required to ensure that we include // Create the local replica even if the leader is unavailable. This is required to ensure that we include
// the partition's high watermark in the checkpoint file (see KAFKA-1647) // the partition's high watermark in the checkpoint file (see KAFKA-1647)
partition.createLogIfNotExists(localBrokerId, isNew = partitionState.isNew, isFutureReplica = false, partition.createLogIfNotExists(isNew = partitionState.isNew, isFutureReplica = false,
highWatermarkCheckpoints) highWatermarkCheckpoints)
} }
} catch { } catch {
@@ -1600,20 +1600,25 @@
   // Flushes the highwatermark value for all partitions to the highwatermark file
   def checkpointHighWatermarks(): Unit = {
-    val localLogs = nonOfflinePartitionsIterator.flatMap { partition =>
-      val logsList: mutable.Set[Log] = mutable.Set()
-      partition.log.foreach(logsList.add)
-      partition.futureLog.foreach(logsList.add)
-      logsList
-    }.toBuffer
-    val logsByDir = localLogs.groupBy(_.dir.getParent)
-    for ((dir, logs) <- logsByDir) {
-      val hwms = logs.map(log => log.topicPartition -> log.highWatermark).toMap
-      try {
-        highWatermarkCheckpoints.get(dir).foreach(_.write(hwms))
-      } catch {
+    def putHw(logDirToCheckpoints: mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]],
+              log: Log): Unit = {
+      val checkpoints = logDirToCheckpoints.getOrElseUpdate(log.parentDir,
+        new mutable.AnyRefMap[TopicPartition, Long]())
+      checkpoints.put(log.topicPartition, log.highWatermark)
+    }
+
+    val logDirToHws = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Long]](
+      allPartitions.size)
+    nonOfflinePartitionsIterator.foreach { partition =>
+      partition.log.foreach(putHw(logDirToHws, _))
+      partition.futureLog.foreach(putHw(logDirToHws, _))
+    }
+
+    for ((logDir, hws) <- logDirToHws) {
+      try highWatermarkCheckpoints.get(logDir).foreach(_.write(hws))
+      catch {
         case e: KafkaStorageException =>
-          error(s"Error while writing to highwatermark file in directory $dir", e)
+          error(s"Error while writing to highwatermark file in directory $logDir", e)
       }
     }
   }
@ -1632,11 +1637,11 @@ class ReplicaManager(val config: KafkaConfig,
warn(s"Stopping serving replicas in dir $dir") warn(s"Stopping serving replicas in dir $dir")
replicaStateChangeLock synchronized { replicaStateChangeLock synchronized {
val newOfflinePartitions = nonOfflinePartitionsIterator.filter { partition => val newOfflinePartitions = nonOfflinePartitionsIterator.filter { partition =>
partition.log.exists { _.dir.getParent == dir } partition.log.exists { _.parentDir == dir }
}.map(_.topicPartition).toSet }.map(_.topicPartition).toSet
val partitionsWithOfflineFutureReplica = nonOfflinePartitionsIterator.filter { partition => val partitionsWithOfflineFutureReplica = nonOfflinePartitionsIterator.filter { partition =>
partition.futureLog.exists { _.dir.getParent == dir } partition.futureLog.exists { _.parentDir == dir }
}.toSet }.toSet
replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions) replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)


@ -112,7 +112,7 @@ class AssignmentStateTest(isr: List[Integer], replicas: List[Integer],
if (original.nonEmpty) if (original.nonEmpty)
partition.assignmentState = SimpleAssignmentState(original) partition.assignmentState = SimpleAssignmentState(original)
// do the test // do the test
partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints) partition.makeLeader(leaderState, offsetCheckpoints)
assertEquals(isReassigning, partition.isReassigning) assertEquals(isReassigning, partition.isReassigning)
if (adding.nonEmpty) if (adding.nonEmpty)
adding.foreach(r => assertTrue(partition.isAddingReplica(r))) adding.foreach(r => assertTrue(partition.isAddingReplica(r)))


@ -222,8 +222,8 @@ class PartitionLockTest extends Logging {
} }
} }
override def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = { override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
val log = super.createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints) val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints)
new SlowLog(log, mockTime, appendSemaphore) new SlowLog(log, mockTime, appendSemaphore)
} }
} }
@ -235,21 +235,21 @@ class PartitionLockTest extends Logging {
when(stateStore.expandIsr(ArgumentMatchers.anyInt, ArgumentMatchers.any[LeaderAndIsr])) when(stateStore.expandIsr(ArgumentMatchers.anyInt, ArgumentMatchers.any[LeaderAndIsr]))
.thenReturn(Some(2)) .thenReturn(Some(2))
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val controllerId = 0 val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava val replicas = (0 to numReplicaFetchers).map(i => Integer.valueOf(brokerId + i)).toList.asJava
val isr = replicas val isr = replicas
assertTrue("Expected become leader transition to succeed", partition.makeLeader(controllerId, new LeaderAndIsrPartitionState() assertTrue("Expected become leader transition to succeed", partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId) .setLeader(brokerId)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(isr) .setIsr(isr)
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true), 0, offsetCheckpoints)) .setIsNew(true), offsetCheckpoints))
partition partition
} }
@ -310,4 +310,4 @@ class PartitionLockTest extends Logging {
appendInfo appendInfo
} }
} }
} }


@ -109,7 +109,7 @@ class PartitionTest extends AbstractPartitionTest {
val latch = new CountDownLatch(1) val latch = new CountDownLatch(1)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath) logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath) logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints) partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
@ -153,13 +153,13 @@ class PartitionTest extends AbstractPartitionTest {
metadataCache, metadataCache,
logManager) { logManager) {
override def createLog(replicaId: Int, isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = { override def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Log = {
val log = super.createLog(replicaId, isNew, isFutureReplica, offsetCheckpoints) val log = super.createLog(isNew, isFutureReplica, offsetCheckpoints)
new SlowLog(log, mockTime, appendSemaphore) new SlowLog(log, mockTime, appendSemaphore)
} }
} }
partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
val appendThread = new Thread { val appendThread = new Thread {
override def run(): Unit = { override def run(): Unit = {
@ -180,7 +180,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava) .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
.setIsNew(false) .setIsNew(false)
assertTrue(partition.makeFollower(0, partitionState, 0, offsetCheckpoints)) assertTrue(partition.makeFollower(partitionState, offsetCheckpoints))
appendSemaphore.release() appendSemaphore.release()
appendThread.join() appendThread.join()
@ -194,7 +194,7 @@ class PartitionTest extends AbstractPartitionTest {
// active segment // active segment
def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = { def testMaybeReplaceCurrentWithFutureReplicaDifferentBaseOffsets(): Unit = {
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath) logManager.maybeUpdatePreferredLogDir(topicPartition, logDir1.getAbsolutePath)
partition.createLogIfNotExists(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = true, isFutureReplica = false, offsetCheckpoints)
logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath) logManager.maybeUpdatePreferredLogDir(topicPartition, logDir2.getAbsolutePath)
partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints) partition.maybeCreateFutureReplica(logDir2.getAbsolutePath, offsetCheckpoints)
@ -465,7 +465,6 @@ class PartitionTest extends AbstractPartitionTest {
val leader = brokerId val leader = brokerId
val follower1 = brokerId + 1 val follower1 = brokerId + 1
val follower2 = brokerId + 2 val follower2 = brokerId + 2
val controllerId = brokerId + 3
val replicas = List(leader, follower1, follower2) val replicas = List(leader, follower1, follower2)
val isr = List[Integer](leader, follower2).asJava val isr = List[Integer](leader, follower2).asJava
val leaderEpoch = 8 val leaderEpoch = 8
@ -486,7 +485,7 @@ class PartitionTest extends AbstractPartitionTest {
.setIsNew(true) .setIsNew(true)
assertTrue("Expected first makeLeader() to return 'leader changed'", assertTrue("Expected first makeLeader() to return 'leader changed'",
partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)) partition.makeLeader(leaderState, offsetCheckpoints))
assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch) assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds) assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
@ -561,7 +560,7 @@ class PartitionTest extends AbstractPartitionTest {
.setReplicas(replicas.map(Int.box).asJava) .setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false) .setIsNew(false)
assertTrue(partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints)) assertTrue(partition.makeFollower(followerState, offsetCheckpoints))
// Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition // Back to leader, this resets the startLogOffset for this epoch (to 2), we're now in the fault condition
val newLeaderState = new LeaderAndIsrPartitionState() val newLeaderState = new LeaderAndIsrPartitionState()
@ -573,7 +572,7 @@ class PartitionTest extends AbstractPartitionTest {
.setReplicas(replicas.map(Int.box).asJava) .setReplicas(replicas.map(Int.box).asJava)
.setIsNew(false) .setIsNew(false)
assertTrue(partition.makeLeader(controllerId, newLeaderState, 2, offsetCheckpoints)) assertTrue(partition.makeLeader(newLeaderState, offsetCheckpoints))
// Try to get offsets as a client // Try to get offsets as a client
fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match { fetchOffsetsForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
@ -636,34 +635,33 @@ class PartitionTest extends AbstractPartitionTest {
private def setupPartitionWithMocks(leaderEpoch: Int, private def setupPartitionWithMocks(leaderEpoch: Int,
isLeader: Boolean, isLeader: Boolean,
log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = { log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val replicas = List[Integer](brokerId, brokerId + 1).asJava val replicas = List[Integer](brokerId, brokerId + 1).asJava
val isr = replicas val isr = replicas
if (isLeader) { if (isLeader) {
assertTrue("Expected become leader transition to succeed", assertTrue("Expected become leader transition to succeed",
partition.makeLeader(controllerId, new LeaderAndIsrPartitionState() partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId) .setLeader(brokerId)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(isr) .setIsr(isr)
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true), 0, offsetCheckpoints)) .setIsNew(true), offsetCheckpoints))
assertEquals(leaderEpoch, partition.getLeaderEpoch) assertEquals(leaderEpoch, partition.getLeaderEpoch)
} else { } else {
assertTrue("Expected become follower transition to succeed", assertTrue("Expected become follower transition to succeed",
partition.makeFollower(controllerId, new LeaderAndIsrPartitionState() partition.makeFollower(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId + 1) .setLeader(brokerId + 1)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(isr) .setIsr(isr)
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true), 0, offsetCheckpoints)) .setIsNew(true), offsetCheckpoints))
assertEquals(leaderEpoch, partition.getLeaderEpoch) assertEquals(leaderEpoch, partition.getLeaderEpoch)
assertEquals(None, partition.leaderLogIfLocal) assertEquals(None, partition.leaderLogIfLocal)
} }
@ -673,7 +671,7 @@ class PartitionTest extends AbstractPartitionTest {
@Test @Test
def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = { def testAppendRecordsAsFollowerBelowLogStartOffset(): Unit = {
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val log = partition.localLogOrException val log = partition.localLogOrException
val initialLogStartOffset = 5L val initialLogStartOffset = 5L
@ -723,7 +721,6 @@ class PartitionTest extends AbstractPartitionTest {
@Test @Test
def testListOffsetIsolationLevels(): Unit = { def testListOffsetIsolationLevels(): Unit = {
val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val leaderEpoch = 5 val leaderEpoch = 5
val replicas = List[Integer](brokerId, brokerId + 1).asJava val replicas = List[Integer](brokerId, brokerId + 1).asJava
@ -731,17 +728,17 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch() doNothing().when(delayedOperations).checkAndCompleteFetch()
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed", assertTrue("Expected become leader transition to succeed",
partition.makeLeader(controllerId, new LeaderAndIsrPartitionState() partition.makeLeader(new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId) .setLeader(brokerId)
.setLeaderEpoch(leaderEpoch) .setLeaderEpoch(leaderEpoch)
.setIsr(isr) .setIsr(isr)
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true), 0, offsetCheckpoints)) .setIsNew(true), offsetCheckpoints))
assertEquals(leaderEpoch, partition.getLeaderEpoch) assertEquals(leaderEpoch, partition.getLeaderEpoch)
val records = createTransactionalRecords(List( val records = createTransactionalRecords(List(
@ -811,7 +808,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava) .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
.setIsNew(false) .setIsNew(false)
partition.makeFollower(0, partitionState, 0, offsetCheckpoints) partition.makeFollower(partitionState, offsetCheckpoints)
// Request with same leader and epoch increases by only 1, do become-follower steps // Request with same leader and epoch increases by only 1, do become-follower steps
partitionState = new LeaderAndIsrPartitionState() partitionState = new LeaderAndIsrPartitionState()
@ -822,7 +819,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava) .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
.setIsNew(false) .setIsNew(false)
assertTrue(partition.makeFollower(0, partitionState, 2, offsetCheckpoints)) assertTrue(partition.makeFollower(partitionState, offsetCheckpoints))
// Request with same leader and same epoch, skip become-follower steps // Request with same leader and same epoch, skip become-follower steps
partitionState = new LeaderAndIsrPartitionState() partitionState = new LeaderAndIsrPartitionState()
@ -832,7 +829,7 @@ class PartitionTest extends AbstractPartitionTest {
.setIsr(List[Integer](0, 1, 2, brokerId).asJava) .setIsr(List[Integer](0, 1, 2, brokerId).asJava)
.setZkVersion(1) .setZkVersion(1)
.setReplicas(List[Integer](0, 1, 2, brokerId).asJava) .setReplicas(List[Integer](0, 1, 2, brokerId).asJava)
assertFalse(partition.makeFollower(0, partitionState, 2, offsetCheckpoints)) assertFalse(partition.makeFollower(partitionState, offsetCheckpoints))
} }
@Test @Test
@ -841,7 +838,6 @@ class PartitionTest extends AbstractPartitionTest {
val leader = brokerId val leader = brokerId
val follower1 = brokerId + 1 val follower1 = brokerId + 1
val follower2 = brokerId + 2 val follower2 = brokerId + 2
val controllerId = brokerId + 3
val replicas = List[Integer](leader, follower1, follower2).asJava val replicas = List[Integer](leader, follower1, follower2).asJava
val isr = List[Integer](leader, follower2).asJava val isr = List[Integer](leader, follower2).asJava
val leaderEpoch = 8 val leaderEpoch = 8
@ -862,7 +858,7 @@ class PartitionTest extends AbstractPartitionTest {
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true) .setIsNew(true)
assertTrue("Expected first makeLeader() to return 'leader changed'", assertTrue("Expected first makeLeader() to return 'leader changed'",
partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints)) partition.makeLeader(leaderState, offsetCheckpoints))
assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch) assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds) assertEquals("ISR", Set[Integer](leader, follower2), partition.inSyncReplicaIds)
@ -898,7 +894,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(false) .setIsNew(false)
partition.makeFollower(controllerId, followerState, 1, offsetCheckpoints) partition.makeFollower(followerState, offsetCheckpoints)
val newLeaderState = new LeaderAndIsrPartitionState() val newLeaderState = new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
@ -909,7 +905,7 @@ class PartitionTest extends AbstractPartitionTest {
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(false) .setIsNew(false)
assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()", assertTrue("Expected makeLeader() to return 'leader changed' after makeFollower()",
partition.makeLeader(controllerEpoch, newLeaderState, 2, offsetCheckpoints)) partition.makeLeader(newLeaderState, offsetCheckpoints))
val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset val currentLeaderEpochStartOffset = partition.localLogOrException.logEndOffset
// append records with the latest leader epoch // append records with the latest leader epoch
@ -937,7 +933,6 @@ class PartitionTest extends AbstractPartitionTest {
*/ */
@Test @Test
def testDelayedFetchAfterAppendRecords(): Unit = { def testDelayedFetchAfterAppendRecords(): Unit = {
val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val leaderEpoch = 5 val leaderEpoch = 5
val replicaIds = List[Integer](brokerId, brokerId + 1).asJava val replicaIds = List[Integer](brokerId, brokerId + 1).asJava
@ -978,7 +973,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicaIds) .setReplicas(replicaIds)
.setIsNew(true) .setIsNew(true)
partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints) partition.makeLeader(leaderState, offsetCheckpoints)
partitions += partition partitions += partition
} }
@ -1035,8 +1030,7 @@ class PartitionTest extends AbstractPartitionTest {
} }
def createTransactionalRecords(records: Iterable[SimpleRecord], def createTransactionalRecords(records: Iterable[SimpleRecord],
baseOffset: Long, baseOffset: Long): MemoryRecords = {
partitionLeaderEpoch: Int = 0): MemoryRecords = {
val producerId = 1L val producerId = 1L
val producerEpoch = 0.toShort val producerEpoch = 0.toShort
val baseSequence = 0 val baseSequence = 0
@ -1058,7 +1052,6 @@ class PartitionTest extends AbstractPartitionTest {
val leader = brokerId val leader = brokerId
val follower1 = brokerId + 1 val follower1 = brokerId + 1
val follower2 = brokerId + 2 val follower2 = brokerId + 2
val controllerId = brokerId + 3
val replicas = List[Integer](leader, follower1, follower2).asJava val replicas = List[Integer](leader, follower1, follower2).asJava
val isr = List[Integer](leader).asJava val isr = List[Integer](leader).asJava
val leaderEpoch = 8 val leaderEpoch = 8
@ -1073,7 +1066,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true) .setIsNew(true)
partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints) partition.makeLeader(leaderState, offsetCheckpoints)
assertTrue(partition.isAtMinIsr) assertTrue(partition.isAtMinIsr)
} }
@ -1082,7 +1075,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig) val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 6, leaderEpoch = 4) seedLogData(log, numRecords = 6, leaderEpoch = 4)
val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val leaderEpoch = 5 val leaderEpoch = 5
val remoteBrokerId = brokerId + 1 val remoteBrokerId = brokerId + 1
@ -1091,12 +1083,11 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch() doNothing().when(delayedOperations).checkAndCompleteFetch()
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val initializeTimeMs = time.milliseconds() val initializeTimeMs = time.milliseconds()
assertTrue("Expected become leader transition to succeed", assertTrue("Expected become leader transition to succeed",
partition.makeLeader( partition.makeLeader(
controllerId,
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId) .setLeader(brokerId)
@ -1105,7 +1096,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true), .setIsNew(true),
0,
offsetCheckpoints)) offsetCheckpoints))
val remoteReplica = partition.getReplica(remoteBrokerId).get val remoteReplica = partition.getReplica(remoteBrokerId).get
@ -1146,7 +1136,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig) val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4) seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val leaderEpoch = 5 val leaderEpoch = 5
val remoteBrokerId = brokerId + 1 val remoteBrokerId = brokerId + 1
@ -1155,11 +1144,10 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch() doNothing().when(delayedOperations).checkAndCompleteFetch()
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue( assertTrue(
"Expected become leader transition to succeed", "Expected become leader transition to succeed",
partition.makeLeader( partition.makeLeader(
controllerId,
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId) .setLeader(brokerId)
@ -1168,7 +1156,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas.map(Int.box).asJava) .setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true), .setIsNew(true),
0,
offsetCheckpoints) offsetCheckpoints)
) )
assertEquals(Set(brokerId), partition.inSyncReplicaIds) assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@ -1213,7 +1200,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig) val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4) seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val leaderEpoch = 5 val leaderEpoch = 5
val remoteBrokerId = brokerId + 1 val remoteBrokerId = brokerId + 1
@ -1222,10 +1208,9 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch() doNothing().when(delayedOperations).checkAndCompleteFetch()
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed", assertTrue("Expected become leader transition to succeed",
partition.makeLeader( partition.makeLeader(
controllerId,
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId) .setLeader(brokerId)
@ -1234,7 +1219,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true), .setIsNew(true),
0,
offsetCheckpoints)) offsetCheckpoints))
assertEquals(Set(brokerId), partition.inSyncReplicaIds) assertEquals(Set(brokerId), partition.inSyncReplicaIds)
@ -1268,7 +1252,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig) val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4) seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val leaderEpoch = 5 val leaderEpoch = 5
val remoteBrokerId = brokerId + 1 val remoteBrokerId = brokerId + 1
@ -1278,11 +1261,10 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch() doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds() val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue( assertTrue(
"Expected become leader transition to succeed", "Expected become leader transition to succeed",
partition.makeLeader( partition.makeLeader(
controllerId,
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId) .setLeader(brokerId)
@ -1291,7 +1273,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas.map(Int.box).asJava) .setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true), .setIsNew(true),
0,
offsetCheckpoints) offsetCheckpoints)
) )
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds) assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@ -1325,7 +1306,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig) val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4) seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val leaderEpoch = 5 val leaderEpoch = 5
val remoteBrokerId = brokerId + 1 val remoteBrokerId = brokerId + 1
@ -1335,11 +1315,10 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch() doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds() val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue( assertTrue(
"Expected become leader transition to succeed", "Expected become leader transition to succeed",
partition.makeLeader( partition.makeLeader(
controllerId,
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId) .setLeader(brokerId)
@ -1348,7 +1327,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas.map(Int.box).asJava) .setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true), .setIsNew(true),
0,
offsetCheckpoints) offsetCheckpoints)
) )
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds) assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@ -1399,7 +1377,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig) val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4) seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val leaderEpoch = 5 val leaderEpoch = 5
val remoteBrokerId = brokerId + 1 val remoteBrokerId = brokerId + 1
@ -1409,11 +1386,10 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch() doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds() val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue( assertTrue(
"Expected become leader transition to succeed", "Expected become leader transition to succeed",
partition.makeLeader( partition.makeLeader(
controllerId,
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId) .setLeader(brokerId)
@ -1422,7 +1398,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas.map(Int.box).asJava) .setReplicas(replicas.map(Int.box).asJava)
.setIsNew(true), .setIsNew(true),
0,
offsetCheckpoints) offsetCheckpoints)
) )
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds) assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
@ -1458,7 +1433,6 @@ class PartitionTest extends AbstractPartitionTest {
val log = logManager.getOrCreateLog(topicPartition, logConfig) val log = logManager.getOrCreateLog(topicPartition, logConfig)
seedLogData(log, numRecords = 10, leaderEpoch = 4) seedLogData(log, numRecords = 10, leaderEpoch = 4)
val controllerId = 0
val controllerEpoch = 0 val controllerEpoch = 0
val leaderEpoch = 5 val leaderEpoch = 5
val remoteBrokerId = brokerId + 1 val remoteBrokerId = brokerId + 1
@ -1468,10 +1442,9 @@ class PartitionTest extends AbstractPartitionTest {
doNothing().when(delayedOperations).checkAndCompleteFetch() doNothing().when(delayedOperations).checkAndCompleteFetch()
val initializeTimeMs = time.milliseconds() val initializeTimeMs = time.milliseconds()
partition.createLogIfNotExists(brokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
assertTrue("Expected become leader transition to succeed", assertTrue("Expected become leader transition to succeed",
partition.makeLeader( partition.makeLeader(
controllerId,
new LeaderAndIsrPartitionState() new LeaderAndIsrPartitionState()
.setControllerEpoch(controllerEpoch) .setControllerEpoch(controllerEpoch)
.setLeader(brokerId) .setLeader(brokerId)
@ -1480,7 +1453,6 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true), .setIsNew(true),
0,
offsetCheckpoints)) offsetCheckpoints))
assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds) assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
assertEquals(0L, partition.localLogOrException.highWatermark) assertEquals(0L, partition.localLogOrException.highWatermark)
@ -1513,7 +1485,6 @@ class PartitionTest extends AbstractPartitionTest {
when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition)) when(offsetCheckpoints.fetch(logDir1.getAbsolutePath, topicPartition))
.thenReturn(Some(4L)) .thenReturn(Some(4L))
val controllerId = 0
val controllerEpoch = 3 val controllerEpoch = 3
val replicas = List[Integer](brokerId, brokerId + 1).asJava val replicas = List[Integer](brokerId, brokerId + 1).asJava
val leaderState = new LeaderAndIsrPartitionState() val leaderState = new LeaderAndIsrPartitionState()
@ -1524,7 +1495,7 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(false) .setIsNew(false)
partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints) partition.makeLeader(leaderState, offsetCheckpoints)
assertEquals(4, partition.localLogOrException.highWatermark) assertEquals(4, partition.localLogOrException.highWatermark)
} }
@ -1553,7 +1524,6 @@ class PartitionTest extends AbstractPartitionTest {
@Test @Test
def testUnderReplicatedPartitionsCorrectSemantics(): Unit = { def testUnderReplicatedPartitionsCorrectSemantics(): Unit = {
val controllerId = 0
val controllerEpoch = 3 val controllerEpoch = 3
val replicas = List[Integer](brokerId, brokerId + 1, brokerId + 2).asJava val replicas = List[Integer](brokerId, brokerId + 1, brokerId + 2).asJava
val isr = List[Integer](brokerId, brokerId + 1).asJava val isr = List[Integer](brokerId, brokerId + 1).asJava
@ -1566,11 +1536,11 @@ class PartitionTest extends AbstractPartitionTest {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(false) .setIsNew(false)
partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints) partition.makeLeader(leaderState, offsetCheckpoints)
assertTrue(partition.isUnderReplicated) assertTrue(partition.isUnderReplicated)
leaderState = leaderState.setIsr(replicas) leaderState = leaderState.setIsr(replicas)
partition.makeLeader(controllerId, leaderState, 0, offsetCheckpoints) partition.makeLeader(leaderState, offsetCheckpoints)
assertFalse(partition.isUnderReplicated) assertFalse(partition.isUnderReplicated)
} }
@ -1626,7 +1596,7 @@ class PartitionTest extends AbstractPartitionTest {
metadataCache, metadataCache,
spyLogManager) spyLogManager)
partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints) partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
// Validate that initializingLog and finishedInitializingLog was called // Validate that initializingLog and finishedInitializingLog was called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition)) verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@ -1660,7 +1630,7 @@ class PartitionTest extends AbstractPartitionTest {
metadataCache, metadataCache,
spyLogManager) spyLogManager)
partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints) partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
// Validate that initializingLog and finishedInitializingLog was called // Validate that initializingLog and finishedInitializingLog was called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition)) verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
@ -1695,7 +1665,7 @@ class PartitionTest extends AbstractPartitionTest {
metadataCache, metadataCache,
spyLogManager) spyLogManager)
partition.createLog(brokerId, isNew = true, isFutureReplica = false, offsetCheckpoints) partition.createLog(isNew = true, isFutureReplica = false, offsetCheckpoints)
// Validate that initializingLog and finishedInitializingLog was called // Validate that initializingLog and finishedInitializingLog was called
verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition)) verify(spyLogManager).initializingLog(ArgumentMatchers.eq(topicPartition))
View File
@ -89,7 +89,7 @@ class ReplicaManagerTest {
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size)) new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try { try {
val partition = rm.createPartition(new TopicPartition(topic, 1)) val partition = rm.createPartition(new TopicPartition(topic, 1))
partition.createLogIfNotExists(1, isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints)) new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
rm.checkpointHighWatermarks() rm.checkpointHighWatermarks()
} finally { } finally {
@ -109,7 +109,7 @@ class ReplicaManagerTest {
new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size)) new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size))
try { try {
val partition = rm.createPartition(new TopicPartition(topic, 1)) val partition = rm.createPartition(new TopicPartition(topic, 1))
partition.createLogIfNotExists(1, isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints)) new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
rm.checkpointHighWatermarks() rm.checkpointHighWatermarks()
} finally { } finally {
@ -164,7 +164,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava val brokerList = Seq[Integer](0, 1).asJava
val partition = rm.createPartition(new TopicPartition(topic, 0)) val partition = rm.createPartition(new TopicPartition(topic, 0))
partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints)) new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
// Make this replica the leader. // Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -216,7 +216,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava val brokerList = Seq[Integer](0, 1).asJava
val topicPartition = new TopicPartition(topic, 0) val topicPartition = new TopicPartition(topic, 0)
replicaManager.createPartition(topicPartition) replicaManager.createPartition(topicPartition)
.createLogIfNotExists(0, isNew = false, isFutureReplica = false, .createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)) new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -270,7 +270,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic, 0)) val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)) new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader. // Make this replica the leader.
@ -330,7 +330,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1).asJava val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic, 0)) val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)) new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader. // Make this replica the leader.
@ -436,7 +436,7 @@ class ReplicaManagerTest {
try { try {
val brokerList = Seq[Integer](0, 1).asJava val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic, 0)) val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)) new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
// Make this replica the leader. // Make this replica the leader.
@ -512,7 +512,7 @@ class ReplicaManagerTest {
val brokerList = Seq[Integer](0, 1, 2).asJava val brokerList = Seq[Integer](0, 1, 2).asJava
val partition = rm.createPartition(new TopicPartition(topic, 0)) val partition = rm.createPartition(new TopicPartition(topic, 0))
partition.createLogIfNotExists(0, isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints)) new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints))
// Make this replica the leader. // Make this replica the leader.
@ -668,8 +668,8 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1) val tp1 = new TopicPartition(topic, 1)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
replicaManager.createPartition(tp1).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints) replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava val partition0Replicas = Seq[Integer](0, 1).asJava
val partition1Replicas = Seq[Integer](0, 2).asJava val partition1Replicas = Seq[Integer](0, 2).asJava
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -782,10 +782,10 @@ class ReplicaManagerTest {
val tp = new TopicPartition(topic, topicPartition) val tp = new TopicPartition(topic, topicPartition)
val partition = replicaManager.createPartition(tp) val partition = replicaManager.createPartition(tp)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
partition.createLogIfNotExists(followerBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
partition.makeFollower(controllerId, partition.makeFollower(
leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds), leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
correlationId, offsetCheckpoints) offsetCheckpoints)
// Make local partition a follower - because epoch increased by more than 1, truncation should // Make local partition a follower - because epoch increased by more than 1, truncation should
// trigger even though leader does not change // trigger even though leader does not change
@ -808,7 +808,6 @@ class ReplicaManagerTest {
val topicPartition = 0 val topicPartition = 0
val followerBrokerId = 0 val followerBrokerId = 0
val leaderBrokerId = 1 val leaderBrokerId = 1
val controllerId = 0
val leaderEpoch = 1 val leaderEpoch = 1
val leaderEpochIncrement = 2 val leaderEpochIncrement = 2
val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId) val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
@ -823,11 +822,9 @@ class ReplicaManagerTest {
val partition = replicaManager.createPartition(tp) val partition = replicaManager.createPartition(tp)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
partition.createLogIfNotExists(leaderBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
partition.makeLeader( partition.makeLeader(
controllerId,
leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds), leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds),
correlationId,
offsetCheckpoints offsetCheckpoints
) )
@ -977,7 +974,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
Seq(new LeaderAndIsrPartitionState() Seq(new LeaderAndIsrPartitionState()
@ -1016,7 +1013,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -1064,7 +1061,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -1112,7 +1109,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -1154,7 +1151,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -1193,7 +1190,7 @@ class ReplicaManagerTest {
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints)
val partition0Replicas = Seq[Integer](0, 1).asJava val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
@ -1293,7 +1290,7 @@ class ReplicaManagerTest {
val mockBrokerTopicStats = new BrokerTopicStats val mockBrokerTopicStats = new BrokerTopicStats
val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
val mockLog = new Log( val mockLog = new Log(
dir = new File(new File(config.logDirs.head), s"$topic-0"), _dir = new File(new File(config.logDirs.head), s"$topic-0"),
config = LogConfig(), config = LogConfig(),
logStartOffset = 0L, logStartOffset = 0L,
recoveryPoint = 0L, recoveryPoint = 0L,
View File
@ -212,10 +212,11 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Package name="org.apache.kafka.jmh.cache.generated"/> <Package name="org.apache.kafka.jmh.cache.generated"/>
<Package name="org.apache.kafka.jmh.common.generated"/> <Package name="org.apache.kafka.jmh.common.generated"/>
<Package name="org.apache.kafka.jmh.record.generated"/> <Package name="org.apache.kafka.jmh.record.generated"/>
<Package name="org.apache.kafka.jmh.producer.generated"/>
<Package name="org.apache.kafka.jmh.partition.generated"/> <Package name="org.apache.kafka.jmh.partition.generated"/>
<Package name="org.apache.kafka.jmh.producer.generated"/>
<Package name="org.apache.kafka.jmh.fetchsession.generated"/> <Package name="org.apache.kafka.jmh.fetchsession.generated"/>
<Package name="org.apache.kafka.jmh.fetcher.generated"/> <Package name="org.apache.kafka.jmh.fetcher.generated"/>
<Package name="org.apache.kafka.jmh.server.generated"/>
</Or> </Or>
</Match> </Match>
View File
@ -159,7 +159,7 @@ public class ReplicaFetcherThreadBenchmark {
0, Time.SYSTEM, partitionStateStore, new DelayedOperationsMock(tp), 0, Time.SYSTEM, partitionStateStore, new DelayedOperationsMock(tp),
Mockito.mock(MetadataCache.class), logManager); Mockito.mock(MetadataCache.class), logManager);
partition.makeFollower(0, partitionState, 0, offsetCheckpoints); partition.makeFollower(partitionState, offsetCheckpoints);
pool.put(tp, partition); pool.put(tp, partition);
offsetAndEpochs.put(tp, new OffsetAndEpoch(0, 0)); offsetAndEpochs.put(tp, new OffsetAndEpoch(0, 0));
BaseRecords fetched = new BaseRecords() { BaseRecords fetched = new BaseRecords() {
View File
@ -121,7 +121,7 @@ public class PartitionMakeFollowerBenchmark {
ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
partitionStateStore, delayedOperations, partitionStateStore, delayedOperations,
Mockito.mock(MetadataCache.class), logManager); Mockito.mock(MetadataCache.class), logManager);
partition.createLogIfNotExists(0, true, false, offsetCheckpoints); partition.createLogIfNotExists(true, false, offsetCheckpoints);
executorService.submit((Runnable) () -> { executorService.submit((Runnable) () -> {
SimpleRecord[] simpleRecords = new SimpleRecord[] { SimpleRecord[] simpleRecords = new SimpleRecord[] {
new SimpleRecord(1L, "foo".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8)), new SimpleRecord(1L, "foo".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8)),
@ -154,7 +154,7 @@ public class PartitionMakeFollowerBenchmark {
.setZkVersion(1) .setZkVersion(1)
.setReplicas(replicas) .setReplicas(replicas)
.setIsNew(true); .setIsNew(true);
return partition.makeFollower(0, partitionState, 0, offsetCheckpoints); return partition.makeFollower(partitionState, offsetCheckpoints);
} }
private static LogConfig createLogConfig() { private static LogConfig createLogConfig() {
View File
@ -119,7 +119,7 @@ public class UpdateFollowerFetchStateBenchmark {
ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM, ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
partitionStateStore, delayedOperations, partitionStateStore, delayedOperations,
Mockito.mock(MetadataCache.class), logManager); Mockito.mock(MetadataCache.class), logManager);
partition.makeLeader(0, partitionState, 0, offsetCheckpoints); partition.makeLeader(partitionState, offsetCheckpoints);
} }
// avoid mocked DelayedOperations to avoid mocked class affecting benchmark results // avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
View File
@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.server;
import java.util.Properties;
import kafka.cluster.Partition;
import kafka.cluster.PartitionStateStore;
import kafka.log.CleanerConfig;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.LogDirFailureChannel;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.utils.KafkaScheduler;
import kafka.utils.MockTime;
import kafka.utils.Scheduler;
import kafka.utils.TestUtils;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import scala.Option;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import scala.jdk.CollectionConverters;
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@Fork(3)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
public class HighwatermarkCheckpointBench {
@Param({"100", "1000", "2000"})
public int numTopics;
@Param({"3"})
public int numPartitions;
private final String topicName = "foo";
private Scheduler scheduler;
private Metrics metrics;
private MockTime time;
private KafkaConfig brokerProperties;
private ReplicaManager replicaManager;
private QuotaFactory.QuotaManagers quotaManagers;
private LogDirFailureChannel failureChannel;
private LogManager logManager;
@Setup(Level.Trial)
public void setup() {
this.scheduler = new KafkaScheduler(1, "scheduler-thread", true);
this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, (short) 1));
this.metrics = new Metrics();
this.time = new MockTime();
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
final List<File> files =
CollectionConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
this.logManager = TestUtils.createLogManager(CollectionConverters.asScalaBuffer(files),
LogConfig.apply(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
1024 * 1024, 32 * 1024 * 1024,
Double.MAX_VALUE, 15 * 1000, true, "MD5"), time);
scheduler.startup();
final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
final MetadataCache metadataCache =
new MetadataCache(this.brokerProperties.brokerId());
this.quotaManagers =
QuotaFactory.instantiate(this.brokerProperties,
this.metrics,
this.time, "");
KafkaZkClient zkClient = new KafkaZkClient(null, false, Time.SYSTEM) {
@Override
public Properties getEntityConfigs(String rootEntityType, String sanitizedEntityName) {
return new Properties();
}
};
this.replicaManager = new ReplicaManager(
this.brokerProperties,
this.metrics,
this.time,
zkClient,
this.scheduler,
this.logManager,
new AtomicBoolean(false),
this.quotaManagers,
brokerTopicStats,
metadataCache,
this.failureChannel,
Option.empty());
replicaManager.startup();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (int topicNum = 0; topicNum < numTopics; topicNum++) {
final String topicName = this.topicName + "-" + topicNum;
for (int partitionNum = 0; partitionNum < numPartitions; partitionNum++) {
topicPartitions.add(new TopicPartition(topicName, partitionNum));
}
}
PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
for (TopicPartition topicPartition : topicPartitions) {
final Partition partition = this.replicaManager.createPartition(topicPartition);
partition.createLogIfNotExists(true, false, checkpoints);
}
replicaManager.checkpointHighWatermarks();
}
@TearDown(Level.Trial)
public void tearDown() throws Exception {
this.replicaManager.shutdown(false);
this.metrics.close();
this.scheduler.shutdown();
this.quotaManagers.shutdown();
for (File dir : CollectionConverters.asJavaCollection(logManager.liveLogDirs())) {
Utils.delete(dir);
}
}
@Benchmark
@Threads(1)
public void measureCheckpointHighWatermarks() {
this.replicaManager.checkpointHighWatermarks();
}
}
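As a usage note, the new benchmark can also be driven programmatically through the JMH runner API for quick local smoke runs instead of the jmh-benchmarks module's usual invocation. The sketch below is illustrative only and not part of this patch: the HighwatermarkCheckpointBenchRunner class name, the single fork, and the overridden numTopics value are assumptions chosen for a fast check, not repository conventions.
/*
 * Illustrative sketch only, not part of this patch: a minimal main() that
 * drives HighwatermarkCheckpointBench through the JMH runner API with a
 * reduced, single-fork configuration for a quick local smoke run.
 */
package org.apache.kafka.jmh.server;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
public class HighwatermarkCheckpointBenchRunner {
    public static void main(String[] args) throws RunnerException {
        Options opts = new OptionsBuilder()
            // Select only this benchmark class.
            .include(HighwatermarkCheckpointBench.class.getSimpleName())
            // Override the @Param topic counts for a faster run; the full sweep uses 100, 1000 and 2000.
            .param("numTopics", "100")
            .forks(1)
            .build();
        new Runner(opts).run();
    }
}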