mirror of https://github.com/apache/kafka.git
KAFKA-13858; Kraft should not shutdown metadata listener until controller shutdown is finished (#12187)
When the KRaft broker begins controlled shutdown, it immediately disables the metadata listener, so metadata changes made as part of the controlled shutdown are not propagated to the respective components. For partitions the broker follows, that is what we want: it prevents the follower from rejoining the ISR while it is still shutting down. But for partitions the broker leads, it means the leader stays active until controlled shutdown finishes and the socket server is stopped. That delay can be as long as 5 seconds, and possibly longer.

This PR revises the controlled shutdown procedure as follows:

* The broker signals to the replica manager that it is about to start the controlled shutdown.
* The broker requests a controlled shutdown from the controller.
* The controller moves leadership off the broker, removes the broker from every ISR it is a member of, and writes those changes to the metadata log.
* When the broker receives a partition metadata change, it checks whether it is still in the ISR. If it is, it updates the partition as usual. If it is not, or if no leader is defined--as would be the case if the broker was the last member of the ISR--it stops the fetcher/replica. This effectively stops all the partitions whose ISR the broker belonged to. (A simplified sketch of this follower-side rule appears below, after the commit metadata.)

When the broker is a replica of a partition but is not in its ISR, the controller does nothing and the leader epoch is not bumped. In that case the follower keeps running until the replica manager shuts down; in the meantime the replica could become in-sync and the leader could try to bring it back into the ISR. This remaining issue will be addressed separately.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent 46630a0610
commit 76477ffd2d
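To make the follower-side rule above concrete, here is a minimal, self-contained Scala sketch. It is not the actual ReplicaManager code: PartitionState, FollowerAction, and onMetadataChange are illustrative names, and NoLeader merely stands in for org.apache.kafka.metadata.LeaderConstants.NO_LEADER. The real logic lives in ReplicaManager.applyLocalFollowersDelta, shown in the diff below.

object ControlledShutdownSketch {
  // Stand-in for LeaderConstants.NO_LEADER.
  val NoLeader: Int = -1

  // Hypothetical, simplified view of a partition's metadata.
  final case class PartitionState(leader: Int, isr: Set[Int], leaderEpoch: Int)

  sealed trait FollowerAction
  case object StopReplica extends FollowerAction    // stop the fetcher and the replica
  case object RestartFetcher extends FollowerAction // leader epoch changed, restart the fetcher
  case object KeepFetcher extends FollowerAction    // nothing to do

  // Decide what to do with a partition metadata change received by a follower.
  def onMetadataChange(
    brokerId: Int,
    inControlledShutdown: Boolean,
    state: PartitionState,
    previousLeaderEpoch: Int
  ): FollowerAction = {
    if (inControlledShutdown && (state.leader == NoLeader || !state.isr.contains(brokerId))) {
      // The controller removed this broker from the ISR, or could not elect a new
      // leader because this broker was the last ISR member: stop the replica now
      // instead of waiting for the socket server to shut down.
      StopReplica
    } else if (state.leaderEpoch != previousLeaderEpoch) {
      RestartFetcher
    } else {
      KeepFetcher
    }
  }

  def main(args: Array[String]): Unit = {
    val brokerId = 0
    // Broker 0 was removed from the ISR during its controlled shutdown.
    println(onMetadataChange(brokerId, inControlledShutdown = true,
      PartitionState(leader = 1, isr = Set(1), leaderEpoch = 1), previousLeaderEpoch = 0)) // StopReplica
    // Broker 0 was the last ISR member, so the controller set the leader to NO_LEADER.
    println(onMetadataChange(brokerId, inControlledShutdown = true,
      PartitionState(leader = NoLeader, isr = Set(0), leaderEpoch = 1), previousLeaderEpoch = 0)) // StopReplica
  }
}

In the real change, the equivalent check uses isInControlledShutdown together with info.partition.leader == NO_LEADER and info.partition.isr.contains(config.brokerId), and the partitions selected this way are passed to stopPartitions, as the ReplicaManager hunks below show.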
@@ -700,13 +700,14 @@ class Partition(val topicPartition: TopicPartition,
val leaderEpochEndOffset = followerLog.logEndOffset
stateChangeLogger.info(s"Follower $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
s"offset $leaderEpochEndOffset with partition epoch ${partitionState.partitionEpoch} and " +
s"high watermark ${followerLog.highWatermark}. Previous leader epoch was $leaderEpoch.")
s"high watermark ${followerLog.highWatermark}. Current leader is ${partitionState.leader}. " +
s"Previous leader epoch was $leaderEpoch.")
} else {
stateChangeLogger.info(s"Skipped the become-follower state change for $topicPartition with topic id $topicId " +
s"and partition state $partitionState since it is already a follower with leader epoch $leaderEpoch.")
}

leaderReplicaIdOpt = Some(partitionState.leader)
leaderReplicaIdOpt = Option(partitionState.leader)
leaderEpoch = partitionState.leaderEpoch
leaderEpochStartOffsetOpt = None
partitionEpoch = partitionState.partitionEpoch
@@ -484,11 +484,9 @@ class BrokerServer(
info("shutting down")

if (config.controlledShutdownEnable) {
// Shut down the broker metadata listener, so that we don't get added to any
// more ISRs.
if (metadataListener != null) {
metadataListener.beginShutdown()
}
if (replicaManager != null)
replicaManager.beginControlledShutdown()

lifecycleManager.beginControlledShutdown()
try {
lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES)
@@ -499,6 +497,10 @@ class BrokerServer(
error("Got unexpected exception waiting for controlled shutdown future", e)
}
}

if (metadataListener != null)
metadataListener.beginShutdown()

lifecycleManager.beginShutdown()

// Stop socket server to stop accepting any more connections and requests.
@@ -513,7 +515,7 @@ class BrokerServer(
if (controlPlaneRequestProcessor != null)
CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
if (metadataListener != null) {
if (metadataListener != null) {
CoreUtils.swallow(metadataListener.close(), this)
}
metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))
@@ -21,7 +21,6 @@ import java.util.Optional
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock

import com.yammer.metrics.core.Meter
import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition}
@@ -60,6 +59,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._

import scala.jdk.CollectionConverters._
@@ -231,6 +231,8 @@ class ReplicaManager(val config: KafkaConfig,
@volatile private[server] var highWatermarkCheckpoints: Map[String, OffsetCheckpointFile] = logManager.liveLogDirs.map(dir =>
(dir.getAbsolutePath, new OffsetCheckpointFile(new File(dir, ReplicaManager.HighWatermarkFilename), logDirFailureChannel))).toMap

@volatile private var isInControlledShutdown = false

this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
protected val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
@@ -1878,6 +1880,10 @@ class ReplicaManager(val config: KafkaConfig,
removeMetric("PartitionsWithLateTransactionsCount")
}

def beginControlledShutdown(): Unit = {
isInControlledShutdown = true
}

// High watermark do not need to be checkpointed only when under unit tests
def shutdown(checkpointHW: Boolean = true): Unit = {
info("Shutting down")
@@ -2088,12 +2094,12 @@ class ReplicaManager(val config: KafkaConfig,
changedPartitions: mutable.Set[Partition],
delta: TopicsDelta,
offsetCheckpoints: OffsetCheckpoints,
newLocalLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
localLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
): Unit = {
stateChangeLogger.info(s"Transitioning ${newLocalLeaders.size} partition(s) to " +
stateChangeLogger.info(s"Transitioning ${localLeaders.size} partition(s) to " +
"local leaders.")
replicaFetcherManager.removeFetcherForPartitions(newLocalLeaders.keySet)
newLocalLeaders.forKeyValue { (tp, info) =>
replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
localLeaders.forKeyValue { (tp, info) =>
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
try {
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
@@ -2118,36 +2124,39 @@ class ReplicaManager(val config: KafkaConfig,
newImage: MetadataImage,
delta: TopicsDelta,
offsetCheckpoints: OffsetCheckpoints,
newLocalFollowers: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
localFollowers: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
): Unit = {
stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " +
stateChangeLogger.info(s"Transitioning ${localFollowers.size} partition(s) to " +
"local followers.")
val shuttingDown = isShuttingDown.get()
val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, Partition]
val newFollowerTopicSet = new mutable.HashSet[String]
newLocalFollowers.forKeyValue { (tp, info) =>
val partitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition]
val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
val followerTopicSet = new mutable.HashSet[String]
localFollowers.forKeyValue { (tp, info) =>
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
try {
newFollowerTopicSet.add(tp.topic)
followerTopicSet.add(tp.topic)

if (shuttingDown) {
stateChangeLogger.trace(s"Unable to start fetching $tp with topic " +
s"ID ${info.topicId} because the replica manager is shutting down.")
} else {
val leader = info.partition.leader
if (newImage.cluster.broker(leader) == null) {
stateChangeLogger.trace(s"Unable to start fetching $tp with topic ID ${info.topicId} " +
s"from leader $leader because it is not alive.")
// We always update the follower state.
// - This ensure that a replica with no leader can step down;
// - This also ensures that the local replica is created even if the leader
// is unavailable. This is required to ensure that we include the partition's
// high watermark in the checkpoint file (see KAFKA-1647).
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))

// Create the local replica even if the leader is unavailable. This is required
// to ensure that we include the partition's high watermark in the checkpoint
// file (see KAFKA-1647).
partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
} else {
val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
if (partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))) {
partitionsToMakeFollower.put(tp, partition)
}
if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
!info.partition.isr.contains(config.brokerId))) {
// During controlled shutdown, replica with no leaders and replica
// where this broker is not in the ISR are stopped.
partitionsToStopFetching.put(tp, false)
} else if (isNewLeaderEpoch) {
// Otherwise, fetcher is restarted if the leader epoch has changed.
partitionsToStartFetching.put(tp, partition)
}
}
changedPartitions.add(partition)
@@ -2170,33 +2179,47 @@ class ReplicaManager(val config: KafkaConfig,
}
}

// Stopping the fetchers must be done first in order to initialize the fetch
// position correctly.
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)
stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
if (partitionsToStartFetching.nonEmpty) {
// Stopping the fetchers must be done first in order to initialize the fetch
// position correctly.
replicaFetcherManager.removeFetcherForPartitions(partitionsToStartFetching.keySet)
stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions")

val listenerName = config.interBrokerListenerName.value
val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]
partitionsToMakeFollower.forKeyValue { (topicPartition, partition) =>
val node = partition.leaderReplicaIdOpt
.flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
.flatMap(_.node(listenerName).asScala)
.getOrElse(Node.noNode)
val log = partition.localLogOrException
partitionAndOffsets.put(topicPartition, InitialFetchState(
log.topicId,
new BrokerEndPoint(node.id, node.host, node.port),
partition.getLeaderEpoch,
initialFetchOffset(log)
))
val listenerName = config.interBrokerListenerName.value
val partitionAndOffsets = new mutable.HashMap[TopicPartition, InitialFetchState]

partitionsToStartFetching.forKeyValue { (topicPartition, partition) =>
val nodeOpt = partition.leaderReplicaIdOpt
.flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
.flatMap(_.node(listenerName).asScala)

nodeOpt match {
case Some(node) =>
val log = partition.localLogOrException
partitionAndOffsets.put(topicPartition, InitialFetchState(
log.topicId,
new BrokerEndPoint(node.id, node.host, node.port),
partition.getLeaderEpoch,
initialFetchOffset(log)
))
case None =>
stateChangeLogger.trace(s"Unable to start fetching $topicPartition with topic ID ${partition.topicId} " +
s"from leader ${partition.leaderReplicaIdOpt} because it is not alive.")
}
}

replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToStartFetching.size} partitions")

partitionsToStartFetching.keySet.foreach(completeDelayedFetchOrProduceRequests)

updateLeaderAndFollowerMetrics(followerTopicSet)
}

replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")

partitionsToMakeFollower.keySet.foreach(completeDelayedFetchOrProduceRequests)

updateLeaderAndFollowerMetrics(newFollowerTopicSet)
if (partitionsToStopFetching.nonEmpty) {
stopPartitions(partitionsToStopFetching)
stateChangeLogger.info(s"Stopped fetchers as part of controlled shutdown for ${partitionsToStopFetching.size} partitions")
}
}

def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
@@ -53,6 +53,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, ProducerIdsImage, TopicsDelta, TopicsImage}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.raft.{OffsetAndEpoch => RaftOffsetAndEpoch}
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@@ -64,7 +65,7 @@ import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito.{mock, reset, times, verify, when}
import org.mockito.Mockito.{mock, never, reset, times, verify, when}

import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
@@ -3826,8 +3827,8 @@ class ReplicaManagerTest {
assertEquals(1, followerPartition.getPartitionEpoch)

// Verify that partition's fetcher was not impacted.
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set.empty)
verify(mockReplicaFetcherManager).addFetcherForPartitions(Map.empty)
verify(mockReplicaFetcherManager, never()).removeFetcherForPartitions(any())
verify(mockReplicaFetcherManager, never()).addFetcherForPartitions(any())

reset(mockReplicaFetcherManager)
@@ -3840,7 +3841,7 @@ class ReplicaManagerTest {
.setIsr(util.Arrays.asList(localId, localId + 1, localId + 2))
.setLeader(localId + 2)
)
println(followerTopicsDelta.changedTopic(FOO_UUID).partitionChanges())

followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
@@ -3863,6 +3864,136 @@ class ReplicaManagerTest {
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}

@Test
def testReplicasAreStoppedWhileInControlledShutdownWithKRaft(): Unit = {
val localId = 0
val foo0 = new TopicPartition("foo", 0)
val foo1 = new TopicPartition("foo", 1)
val foo2 = new TopicPartition("foo", 2)

val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
val replicaManager = setupReplicaManagerWithMockedPurgatories(
timer = new MockTimer(time),
brokerId = localId,
mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
)

try {
when(mockReplicaFetcherManager.removeFetcherForPartitions(
Set(foo0, foo1))
).thenReturn(Map.empty[TopicPartition, PartitionFetchState])

var topicsDelta = new TopicsDelta(TopicsImage.EMPTY)
topicsDelta.replay(new TopicRecord()
.setName("foo")
.setTopicId(FOO_UUID)
)

// foo0 is a follower in the ISR.
topicsDelta.replay(new PartitionRecord()
.setPartitionId(0)
.setTopicId(FOO_UUID)
.setReplicas(util.Arrays.asList(localId, localId + 1))
.setIsr(util.Arrays.asList(localId, localId + 1))
.setLeader(localId + 1)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
)

// foo1 is a leader with only himself in the ISR.
topicsDelta.replay(new PartitionRecord()
.setPartitionId(1)
.setTopicId(FOO_UUID)
.setReplicas(util.Arrays.asList(localId, localId + 1))
.setIsr(util.Arrays.asList(localId))
.setLeader(localId)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
)

// foo2 is a follower NOT in the ISR.
topicsDelta.replay(new PartitionRecord()
.setPartitionId(2)
.setTopicId(FOO_UUID)
.setReplicas(util.Arrays.asList(localId, localId + 1))
.setIsr(util.Arrays.asList(localId + 1))
.setLeader(localId + 1)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
)

// Apply the delta.
var metadataImage = imageFromTopics(topicsDelta.apply())
replicaManager.applyDelta(topicsDelta, metadataImage)

// Check the state of the partitions.
val HostedPartition.Online(fooPartition0) = replicaManager.getPartition(foo0)
assertFalse(fooPartition0.isLeader)
assertEquals(0, fooPartition0.getLeaderEpoch)
assertEquals(0, fooPartition0.getPartitionEpoch)

val HostedPartition.Online(fooPartition1) = replicaManager.getPartition(foo1)
assertTrue(fooPartition1.isLeader)
assertEquals(0, fooPartition1.getLeaderEpoch)
assertEquals(0, fooPartition1.getPartitionEpoch)

val HostedPartition.Online(fooPartition2) = replicaManager.getPartition(foo2)
assertFalse(fooPartition2.isLeader)
assertEquals(0, fooPartition2.getLeaderEpoch)
assertEquals(0, fooPartition2.getPartitionEpoch)

reset(mockReplicaFetcherManager)

// The replica begins the controlled shutdown.
replicaManager.beginControlledShutdown()

// When the controller receives the controlled shutdown
// request, it does the following:
// - Shrinks the ISR of foo0 to remove this replica.
// - Sets the leader of foo1 to NO_LEADER because it cannot elect another leader.
// - Does nothing for foo2 because this replica is not in the ISR.
topicsDelta = new TopicsDelta(metadataImage.topics())
topicsDelta.replay(new PartitionChangeRecord()
.setPartitionId(0)
.setTopicId(FOO_UUID)
.setReplicas(util.Arrays.asList(localId, localId + 1))
.setIsr(util.Arrays.asList(localId + 1))
.setLeader(localId + 1)
)
topicsDelta.replay(new PartitionChangeRecord()
.setPartitionId(1)
.setTopicId(FOO_UUID)
.setReplicas(util.Arrays.asList(localId, localId + 1))
.setIsr(util.Arrays.asList(localId))
.setLeader(NO_LEADER)
)
metadataImage = imageFromTopics(topicsDelta.apply())
replicaManager.applyDelta(topicsDelta, metadataImage)

// Partition foo0 and foo1 are updated.
assertFalse(fooPartition0.isLeader)
assertEquals(1, fooPartition0.getLeaderEpoch)
assertEquals(1, fooPartition0.getPartitionEpoch)
assertFalse(fooPartition1.isLeader)
assertEquals(1, fooPartition1.getLeaderEpoch)
assertEquals(1, fooPartition1.getPartitionEpoch)

// Partition foo2 is not.
assertFalse(fooPartition2.isLeader)
assertEquals(0, fooPartition2.getLeaderEpoch)
assertEquals(0, fooPartition2.getPartitionEpoch)

// Fetcher for foo0 and foo1 are stopped.
verify(mockReplicaFetcherManager).removeFetcherForPartitions(Set(foo0, foo1))
} finally {
// Fetcher for foo2 is stopped when the replica manager shuts down
// because this replica was not in the ISR.
replicaManager.shutdown()
}

TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}

private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean): TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)