diff --git a/build.gradle b/build.gradle
index 083cbdfc848..dc7ce681d4d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1418,9 +1418,11 @@ project(':jmh-benchmarks') {
}
dependencies {
+ compile project(':core')
compile project(':clients')
compile project(':streams')
compile libs.jmhCore
+ compile libs.mockitoCore
annotationProcessor libs.jmhGeneratorAnnProcess
compile libs.jmhCoreBenchmarks
}
@@ -1431,6 +1433,9 @@ project(':jmh-benchmarks') {
}
}
+ checkstyle {
+ configProperties = checkstyleConfigProperties("import-control-jmh-benchmarks.xml")
+ }
task jmh(type: JavaExec, dependsOn: [':jmh-benchmarks:clean', ':jmh-benchmarks:shadowJar']) {
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
new file mode 100644
index 00000000000..49bd8c78692
--- /dev/null
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -0,0 +1,45 @@
+<!DOCTYPE import-control PUBLIC
+"-//Puppy Crawl//DTD Import Control 1.1//EN"
+"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<import-control pkg="org.apache.kafka.jmh">
+
+  <!-- allow rules inferred from the packages imported by the jmh-benchmarks
+       sources; the original file's exact rule list is not reproduced here -->
+  <allow pkg="java"/>
+  <allow pkg="scala"/>
+  <allow pkg="kafka"/>
+  <allow pkg="org.apache.kafka"/>
+  <allow pkg="org.openjdk.jmh"/>
+  <allow pkg="org.mockito"/>
+
+</import-control>
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 03395a4e4e5..655bdeae008 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -404,8 +404,9 @@ class Partition(val topicPartition: TopicPartition,
this.log = Some(log)
}
- def remoteReplicas: Set[Replica] =
- remoteReplicasMap.values.toSet
+ // remoteReplicas is called on the hot path and must be inexpensive
+ def remoteReplicas: Iterable[Replica] =
+ remoteReplicasMap.values
def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
inReadLock(leaderIsrUpdateLock) {
@@ -585,31 +586,41 @@ class Partition(val topicPartition: TopicPartition,
followerStartOffset: Long,
followerFetchTimeMs: Long,
leaderEndOffset: Long): Boolean = {
-
getReplica(followerId) match {
case Some(followerReplica) =>
// No need to calculate low watermark if there is no delayed DeleteRecordsRequest
val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
+ val prevFollowerEndOffset = followerReplica.logEndOffset
followerReplica.updateFetchState(
followerFetchOffsetMetadata,
followerStartOffset,
followerFetchTimeMs,
leaderEndOffset)
+
val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
// check if the LW of the partition has incremented
// since the replica's logStartOffset may have incremented
val leaderLWIncremented = newLeaderLW > oldLeaderLW
+
// check if we need to expand ISR to include this replica
// if it is not in the ISR yet
- val followerFetchOffset = followerFetchOffsetMetadata.messageOffset
- val leaderHWIncremented = maybeExpandIsr(followerReplica, followerFetchTimeMs)
+ if (!inSyncReplicaIds(followerId))
+ maybeExpandIsr(followerReplica, followerFetchTimeMs)
+
+ // check if the HW of the partition can now be incremented
+ // since the replica may already be in the ISR and its LEO has just incremented
+ val leaderHWIncremented = if (prevFollowerEndOffset != followerReplica.logEndOffset) {
+ leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
+ } else {
+ false
+ }
// some delayed operations may be unblocked after HW or LW changed
if (leaderLWIncremented || leaderHWIncremented)
tryCompleteDelayedRequests()
debug(s"Recorded replica $followerId log end offset (LEO) position " +
- s"$followerFetchOffset and log start offset $followerStartOffset.")
+ s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.")
true
case None =>
@@ -654,27 +665,20 @@ class Partition(val topicPartition: TopicPartition,
* whether a replica is in-sync, we only check HW.
*
* This function can be triggered when a replica's LEO has incremented.
- *
- * @return true if the high watermark has been updated
*/
- private def maybeExpandIsr(followerReplica: Replica, followerFetchTimeMs: Long): Boolean = {
+ private def maybeExpandIsr(followerReplica: Replica, followerFetchTimeMs: Long): Unit = {
inWriteLock(leaderIsrUpdateLock) {
// check if this replica needs to be added to the ISR
- leaderLogIfLocal match {
- case Some(leaderLog) =>
- val leaderHighwatermark = leaderLog.highWatermark
- if (!inSyncReplicaIds.contains(followerReplica.brokerId) && isFollowerInSync(followerReplica, leaderHighwatermark)) {
- val newInSyncReplicaIds = inSyncReplicaIds + followerReplica.brokerId
- info(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} " +
- s"to ${newInSyncReplicaIds.mkString(",")}")
+ leaderLogIfLocal.foreach { leaderLog =>
+ val leaderHighwatermark = leaderLog.highWatermark
+ if (!inSyncReplicaIds.contains(followerReplica.brokerId) && isFollowerInSync(followerReplica, leaderHighwatermark)) {
+ val newInSyncReplicaIds = inSyncReplicaIds + followerReplica.brokerId
+ info(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} " +
+ s"to ${newInSyncReplicaIds.mkString(",")}")
- // update ISR in ZK and cache
- expandIsr(newInSyncReplicaIds)
- }
- // check if the HW of the partition can now be incremented
- // since the replica may already be in the ISR and its LEO has just incremented
- maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs)
- case None => false // nothing to do if no longer leader
+ // update ISR in ZK and cache
+ expandIsr(newInSyncReplicaIds)
+ }
}
}
}
@@ -749,25 +753,35 @@ class Partition(val topicPartition: TopicPartition,
* since all callers of this private API acquire that lock
*/
private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
- val replicaLogEndOffsets = remoteReplicas.filter { replica =>
- curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicaIds.contains(replica.brokerId)
- }.map(_.logEndOffsetMetadata)
- val newHighWatermark = (replicaLogEndOffsets + leaderLog.logEndOffsetMetadata).min(new LogOffsetMetadata.OffsetOrdering)
- leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
- case Some(oldHighWatermark) =>
- debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
- true
-
- case None =>
- def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
- case (brokerId, logEndOffsetMetadata) => s"replica $brokerId: $logEndOffsetMetadata"
+ inReadLock(leaderIsrUpdateLock) {
+ // maybeIncrementLeaderHW is in the hot path, so the following code is written to
+ // avoid unnecessary collection generation
+ var newHighWatermark = leaderLog.logEndOffsetMetadata
+ remoteReplicasMap.values.foreach { replica =>
+ if (replica.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
+ (curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicaIds.contains(replica.brokerId))) {
+ newHighWatermark = replica.logEndOffsetMetadata
}
+ }
- val replicaInfo = remoteReplicas.map(replica => (replica.brokerId, replica.logEndOffsetMetadata))
- val localLogInfo = (localBrokerId, localLogOrException.logEndOffsetMetadata)
- trace(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old value. " +
- s"All current LEOs are ${(replicaInfo + localLogInfo).map(logEndOffsetString)}")
- false
+ leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {
+ case Some(oldHighWatermark) =>
+ debug(s"High watermark updated from $oldHighWatermark to $newHighWatermark")
+ true
+
+ case None =>
+ def logEndOffsetString: ((Int, LogOffsetMetadata)) => String = {
+ case (brokerId, logEndOffsetMetadata) => s"replica $brokerId: $logEndOffsetMetadata"
+ }
+
+ if (isTraceEnabled) {
+ val replicaInfo = remoteReplicas.map(replica => (replica.brokerId, replica.logEndOffsetMetadata)).toSet
+ val localLogInfo = (localBrokerId, localLogOrException.logEndOffsetMetadata)
+ trace(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old value. " +
+ s"All current LEOs are ${(replicaInfo + localLogInfo).map(logEndOffsetString)}")
+ }
+ false
+ }
}
}
@@ -779,15 +793,21 @@ class Partition(val topicPartition: TopicPartition,
def lowWatermarkIfLeader: Long = {
if (!isLeader)
throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
- val logStartOffsets = remoteReplicas.collect {
- case replica if metadataCache.getAliveBroker(replica.brokerId).nonEmpty => replica.logStartOffset
- } + localLogOrException.logStartOffset
+
+ // lowWatermarkIfLeader may be called many times when a DeleteRecordsRequest is outstanding,
+ // so care has been taken to avoid generating unnecessary collections in this code
+ var lowWaterMark = localLogOrException.logStartOffset
+ remoteReplicas.foreach { replica =>
+ if (metadataCache.getAliveBroker(replica.brokerId).nonEmpty && replica.logStartOffset < lowWaterMark) {
+ lowWaterMark = replica.logStartOffset
+ }
+ }
futureLog match {
case Some(partitionFutureLog) =>
- CoreUtils.min(logStartOffsets + partitionFutureLog.logStartOffset, 0L)
+ Math.min(lowWaterMark, partitionFutureLog.logStartOffset)
case None =>
- CoreUtils.min(logStartOffsets, 0L)
+ lowWaterMark
}
}
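
The two hot-path rewrites above (maybeIncrementLeaderHW and lowWatermarkIfLeader) apply the same technique: replace a filter/map pipeline that allocates intermediate collections on every call with a single foreach that tracks a running minimum. A minimal sketch of the pattern, using a hypothetical ReplicaState type rather than Kafka's classes:

    case class ReplicaState(brokerId: Int, logEndOffset: Long, caughtUp: Boolean)

    // Old shape: builds a filtered collection, a mapped collection, and an appended Seq per call.
    def minOffsetAllocating(leaderEndOffset: Long, replicas: Iterable[ReplicaState]): Long =
      (replicas.filter(_.caughtUp).map(_.logEndOffset).toSeq :+ leaderEndOffset).min

    // New shape: one pass over the existing values, no intermediate collections.
    def minOffsetSinglePass(leaderEndOffset: Long, replicas: Iterable[ReplicaState]): Long = {
      var min = leaderEndOffset
      replicas.foreach { r =>
        if (r.caughtUp && r.logEndOffset < min)
          min = r.logEndOffset
      }
      min
    }

The same reasoning is behind changing remoteReplicas to return the map's values as an Iterable instead of copying them into a new Set on each call.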
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 370074929ca..48fc9e9a7d5 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -341,16 +341,18 @@ class Log(@volatile var dir: File,
throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
s"log end offset $logEndOffsetMetadata")
- val oldHighWatermark = fetchHighWatermarkMetadata
+ lock.synchronized {
+ val oldHighWatermark = fetchHighWatermarkMetadata
- // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
- // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
- if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
- (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
- updateHighWatermarkMetadata(newHighWatermark)
- Some(oldHighWatermark)
- } else {
- None
+ // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
+ // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
+ if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+ (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
+ updateHighWatermarkMetadata(newHighWatermark)
+ Some(oldHighWatermark)
+ } else {
+ None
+ }
}
}
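
The Log.maybeIncrementHighWatermark change above wraps the read-check-update sequence in lock.synchronized so that the monotonicity check and the update happen atomically. Without the lock, two concurrent callers could both read the same old high watermark, both pass the check, and the smaller of the two new values could be written last, moving the high watermark backwards. A minimal sketch of the guarded pattern, with assumed simplified types rather than Kafka's Log internals:

    class MonotonicOffset {
      private val lock = new Object
      private var value: Long = 0L

      // Returns the previous value if the offset advanced, None otherwise.
      def maybeAdvance(newValue: Long): Option[Long] = lock.synchronized {
        val old = value
        if (newValue > old) {
          value = newValue
          Some(old)
        } else {
          None
        }
      }
    }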
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 12cb507d350..9579b1f56ec 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1085,8 +1085,8 @@ class ReplicaManager(val config: KafkaConfig,
.map(replica => new DefaultReplicaView(
replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
replica.logEndOffset,
- currentTimeMs - replica.lastCaughtUpTimeMs
- ))
+ currentTimeMs - replica.lastCaughtUpTimeMs))
+ .toSet
if (partition.leaderReplicaIdOpt.isDefined) {
val leaderReplica: ReplicaView = partition.leaderReplicaIdOpt
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index e843d9f3118..7854a9beb24 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -156,6 +156,7 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
+
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
new file mode 100644
index 00000000000..ee45b6419fa
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.partition;
+
+import kafka.api.ApiVersion$;
+import kafka.cluster.DelayedOperations;
+import kafka.cluster.Partition;
+import kafka.cluster.PartitionStateStore;
+import kafka.log.CleanerConfig;
+import kafka.log.Defaults;
+import kafka.log.LogConfig;
+import kafka.log.LogManager;
+import kafka.server.BrokerState;
+import kafka.server.BrokerTopicStats;
+import kafka.server.LogDirFailureChannel;
+import kafka.server.LogOffsetMetadata;
+import kafka.server.MetadataCache;
+import kafka.server.checkpoints.OffsetCheckpoints;
+import kafka.utils.KafkaScheduler;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.LeaderAndIsrRequest;
+import org.apache.kafka.common.utils.Time;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class UpdateFollowerFetchStateBenchmark {
+ private TopicPartition topicPartition = new TopicPartition(UUID.randomUUID().toString(), 0);
+ private File logDir = new File(System.getProperty("java.io.tmpdir"), topicPartition.toString());
+ private KafkaScheduler scheduler = new KafkaScheduler(1, "scheduler", true);
+ private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
+ private long nextOffset = 0;
+ private LogManager logManager;
+ private Partition partition;
+
+ @Setup(Level.Trial)
+ public void setUp() {
+ scheduler.startup();
+ LogConfig logConfig = createLogConfig();
+ List<File> logDirs = Collections.singletonList(logDir);
+ logManager = new LogManager(JavaConverters.asScalaIteratorConverter(logDirs.iterator()).asScala().toSeq(),
+ JavaConverters.asScalaIteratorConverter(new ArrayList<File>().iterator()).asScala().toSeq(),
+ new scala.collection.mutable.HashMap<>(),
+ logConfig,
+ new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false, "MD5"),
+ 1,
+ 1000L,
+ 10000L,
+ 10000L,
+ 1000L,
+ 60000,
+ scheduler,
+ new BrokerState(),
+ brokerTopicStats,
+ logDirFailureChannel,
+ Time.SYSTEM);
+ OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
+ Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), topicPartition)).thenReturn(Option.apply(0L));
+ DelayedOperations delayedOperations = new DelayedOperationsMock();
+
+ // one leader, plus two followers
+ List<Integer> replicas = new ArrayList<>();
+ replicas.add(0);
+ replicas.add(1);
+ replicas.add(2);
+ LeaderAndIsrRequest.PartitionState partitionState = new LeaderAndIsrRequest.PartitionState(
+ 0, 0, 0, replicas, 1, replicas, true);
+ PartitionStateStore partitionStateStore = Mockito.mock(PartitionStateStore.class);
+ Mockito.when(partitionStateStore.fetchTopicConfig()).thenReturn(new Properties());
+ partition = new Partition(topicPartition, 100,
+ ApiVersion$.MODULE$.latestVersion(), 0, Time.SYSTEM,
+ partitionStateStore, delayedOperations,
+ Mockito.mock(MetadataCache.class), logManager);
+ partition.makeLeader(0, partitionState, 0, offsetCheckpoints);
+ }
+
+ // use a concrete DelayedOperations subclass rather than a Mockito mock so that mocking overhead does not affect the benchmark results
+ private class DelayedOperationsMock extends DelayedOperations {
+ DelayedOperationsMock() {
+ super(topicPartition, null, null, null);
+ }
+
+ @Override
+ public int numDelayedDelete() {
+ return 0;
+ }
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown() {
+ logManager.shutdown();
+ scheduler.shutdown();
+ }
+
+ private LogConfig createLogConfig() {
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.SegmentMsProp(), Defaults.SegmentMs());
+ logProps.put(LogConfig.SegmentBytesProp(), Defaults.SegmentSize());
+ logProps.put(LogConfig.RetentionMsProp(), Defaults.RetentionMs());
+ logProps.put(LogConfig.RetentionBytesProp(), Defaults.RetentionSize());
+ logProps.put(LogConfig.SegmentJitterMsProp(), Defaults.SegmentJitterMs());
+ logProps.put(LogConfig.CleanupPolicyProp(), Defaults.CleanupPolicy());
+ logProps.put(LogConfig.MaxMessageBytesProp(), Defaults.MaxMessageSize());
+ logProps.put(LogConfig.IndexIntervalBytesProp(), Defaults.IndexInterval());
+ logProps.put(LogConfig.SegmentIndexBytesProp(), Defaults.MaxIndexSize());
+ logProps.put(LogConfig.MessageFormatVersionProp(), Defaults.MessageFormatVersion());
+ logProps.put(LogConfig.FileDeleteDelayMsProp(), Defaults.FileDeleteDelayMs());
+ return LogConfig.apply(logProps, new scala.collection.immutable.HashSet<>());
+ }
+
+ @Benchmark
+ @OutputTimeUnit(TimeUnit.NANOSECONDS)
+ public void updateFollowerFetchStateBench() {
+ // measure the impact of two follower fetches on the leader
+ partition.updateFollowerFetchState(1, new LogOffsetMetadata(nextOffset, nextOffset, 0),
+ 0, 1, nextOffset);
+ partition.updateFollowerFetchState(2, new LogOffsetMetadata(nextOffset, nextOffset, 0),
+ 0, 1, nextOffset);
+ nextOffset++;
+ }
+
+ @Benchmark
+ @OutputTimeUnit(TimeUnit.NANOSECONDS)
+ public void updateFollowerFetchStateBenchNoChange() {
+ // measure the impact of two follower fetches on the leader when the follower didn't
+ // end up fetching anything
+ partition.updateFollowerFetchState(1, new LogOffsetMetadata(nextOffset, nextOffset, 0),
+ 0, 1, 100);
+ partition.updateFollowerFetchState(2, new LogOffsetMetadata(nextOffset, nextOffset, 0),
+ 0, 1, 100);
+ }
+}
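
With the benchmark in place, the jmh-benchmarks module's jmh task shown in the build.gradle hunk above (a JavaExec task that depends on shadowJar) is presumably the entry point for running it, e.g. via ./gradlew :jmh-benchmarks:jmh; how to restrict a run to UpdateFollowerFetchStateBenchmark alone depends on how that task forwards JMH's include filters, which is outside this patch.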