diff --git a/config/server.properties b/config/server.properties
index 46208b1523f..b1cf5c45416 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -123,7 +123,7 @@ log.retention.check.interval.ms=300000
 zookeeper.connect=localhost:2181

 # Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=6000
+zookeeper.connection.timeout.ms=18000

 ############################# Group Coordinator Settings #############################

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2089e5c74a7..85afe1102cd 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -176,7 +176,7 @@ case class SimpleAssignmentState(replicas: Seq[Int]) extends AssignmentState
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
  */
 class Partition(val topicPartition: TopicPartition,
-                replicaLagTimeMaxMs: Long,
+                val replicaLagTimeMaxMs: Long,
                 interBrokerProtocolVersion: ApiVersion,
                 localBrokerId: Int,
                 time: Time,
@@ -864,11 +864,11 @@ class Partition(val topicPartition: TopicPartition,
    */
   private def tryCompleteDelayedRequests(): Unit = delayedOperations.checkAndCompleteAll()

-  def maybeShrinkIsr(replicaMaxLagTimeMs: Long): Unit = {
+  def maybeShrinkIsr(): Unit = {
     val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
       leaderLogIfLocal match {
         case Some(leaderLog) =>
-          val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaMaxLagTimeMs)
+          val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
           if (outOfSyncReplicaIds.nonEmpty) {
             val newInSyncReplicaIds = inSyncReplicaIds -- outOfSyncReplicaIds
             assert(newInSyncReplicaIds.nonEmpty)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 17ebd278f19..4e347b22e2f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -46,7 +46,7 @@ import scala.collection.{Map, Seq}

 object Defaults {
   /** ********* Zookeeper Configuration ***********/
-  val ZkSessionTimeoutMs = 6000
+  val ZkSessionTimeoutMs = 18000
   val ZkSyncTimeMs = 2000
   val ZkEnableSecureAcls = false
   val ZkMaxInFlightRequests = 10
@@ -128,7 +128,7 @@
   val ControllerSocketTimeoutMs = RequestTimeoutMs
   val ControllerMessageQueueSize = Int.MaxValue
   val DefaultReplicationFactor = 1
-  val ReplicaLagTimeMaxMs = 10000L
+  val ReplicaLagTimeMaxMs = 30000L
   val ReplicaSocketTimeoutMs = 30 * 1000
   val ReplicaSocketReceiveBufferBytes = 64 * 1024
   val ReplicaFetchMaxBytes = 1024 * 1024
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 4e7ceda75b5..d428773fcf8 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1550,7 +1550,7 @@ class ReplicaManager(val config: KafkaConfig,

     // Shrink ISRs for non offline partitions
     allPartitions.keys.foreach { topicPartition =>
-      nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+      nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr())
     }
   }

diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index f57dd859cfa..4c527d2538c 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -946,7 +946,7 @@ class PartitionTest extends AbstractPartitionTest {
     // Invoke some operation that acquires leaderIsrUpdate write lock on one thread
     executor.submit(CoreUtils.runnable {
       while (!done.get) {
-        partitions.foreach(_.maybeShrinkIsr(10000))
+        partitions.foreach(_.maybeShrinkIsr())
       }
     })
     // Append records to partitions, one partition-per-thread
@@ -1240,11 +1240,11 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(Log.UnknownOffset, remoteReplica.logStartOffset)

     // On initialization, the replica is considered caught up and should not be removed
-    partition.maybeShrinkIsr(10000)
+    partition.maybeShrinkIsr()
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)

     // If enough time passes without a fetch update, the ISR should shrink
-    time.sleep(10001)
+    time.sleep(partition.replicaLagTimeMaxMs + 1)
     val updatedLeaderAndIsr = LeaderAndIsr(
       leader = brokerId,
       leaderEpoch = leaderEpoch,
@@ -1252,7 +1252,7 @@
       zkVersion = 1)
     when(stateStore.shrinkIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(Some(2))

-    partition.maybeShrinkIsr(10000)
+    partition.maybeShrinkIsr()
     assertEquals(Set(brokerId), partition.inSyncReplicaIds)
     assertEquals(10L, partition.localLogOrException.highWatermark)
   }
@@ -1325,7 +1325,7 @@

     // The ISR should not be shrunk because the follower has caught up with the leader at the
     // time of the first fetch.
-    partition.maybeShrinkIsr(10000)
+    partition.maybeShrinkIsr()
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
   }

@@ -1383,7 +1383,7 @@
     time.sleep(10001)

     // The ISR should not be shrunk because the follower is caught up to the leader's log end
-    partition.maybeShrinkIsr(10000)
+    partition.maybeShrinkIsr()
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
   }

@@ -1434,7 +1434,7 @@
       zkVersion = 1)
     when(stateStore.shrinkIsr(controllerEpoch, updatedLeaderAndIsr)).thenReturn(None)

-    partition.maybeShrinkIsr(10000)
+    partition.maybeShrinkIsr()
     assertEquals(Set(brokerId, remoteBrokerId), partition.inSyncReplicaIds)
     assertEquals(0L, partition.localLogOrException.highWatermark)
   }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 8bc0fb92215..a3920ce0e7f 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,13 @@
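
Note for operators reviewing this change: the raised defaults only take effect where the corresponding broker properties are left unset. As a minimal sketch (assuming a standard broker server.properties; the values shown are simply the pre-change defaults visible in this diff), the old timings can be pinned explicitly if a deployment depends on them:

    # Illustrative only: keep the timings that this patch's new defaults replace
    zookeeper.session.timeout.ms=6000
    zookeeper.connection.timeout.ms=6000
    replica.lag.time.max.ms=10000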