KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode (#12487)

This patch prevents offline or in-controller-shutdown replicas from being added back to the ISR and therefore to become leaders in ZK mode. This is an extra line of defense to ensure that it never happens. This is a continuation of the work done in KIP-841.

Reviewers: David Mao <dmao@confluent.io>, Jason Gustafson <jason@confluent.io>, Jun Rao <jun@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
Justine Olshan 2022-08-10 01:25:35 -07:00 committed by GitHub
parent ac64693434
commit 163d00b3e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 137 additions and 40 deletions

View File

@ -26,7 +26,7 @@ import kafka.log._
import kafka.metrics.KafkaMetricsGroup
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.server.metadata.KRaftMetadataCache
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
import kafka.utils._
import kafka.zookeeper.ZooKeeperClientException
@ -881,11 +881,16 @@ class Partition(val topicPartition: TopicPartition,
private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
metadataCache match {
// In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
// allowed to join the ISR. This does not apply to ZK mode.
// allowed to join the ISR.
case kRaftMetadataCache: KRaftMetadataCache =>
!kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
!kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
// In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,
// the controller will block them from being added to ISR.
case zkMetadataCache: ZkMetadataCache =>
zkMetadataCache.hasAliveBroker(followerReplicaId)
case _ => true
}
}

View File

@ -2364,7 +2364,23 @@ class KafkaController(val config: KafkaConfig,
)
None
} else {
Some(tp -> newLeaderAndIsr)
// Pull out replicas being added to ISR and verify they are all online.
// If a replica is not online, reject the update as specified in KIP-841.
val ineligibleReplicas = newLeaderAndIsr.isr.toSet -- controllerContext.liveBrokerIds
if (ineligibleReplicas.nonEmpty) {
info(s"Rejecting AlterPartition request from node $brokerId for $tp because " +
s"it specified ineligible replicas $ineligibleReplicas in the new ISR ${newLeaderAndIsr.isr}."
)
if (alterPartitionRequestVersion > 1) {
partitionResponses(tp) = Left(Errors.INELIGIBLE_REPLICA)
} else {
partitionResponses(tp) = Left(Errors.OPERATION_NOT_ATTEMPTED)
}
None
} else {
Some(tp -> newLeaderAndIsr)
}
}
case None =>

View File

@ -46,7 +46,7 @@ import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{CountDownLatch, Semaphore}
import kafka.server.epoch.LeaderEpochFileCache
import kafka.server.metadata.KRaftMetadataCache
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.replica.ClientMetadata
@ -55,6 +55,8 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@ -1375,8 +1377,11 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(alterPartitionListener.failures.get, 1)
}
@Test
def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testIsrNotExpandedIfReplicaIsFencedOrShutdown(quorum: String): Unit = {
val kraft = quorum == "kraft"
val log = logManager.getOrCreateLog(topicPartition, topicId = None)
seedLogData(log, numRecords = 10, leaderEpoch = 4)
@ -1386,7 +1391,19 @@ class PartitionTest extends AbstractPartitionTest {
val replicas = List(brokerId, remoteBrokerId)
val isr = Set(brokerId)
val metadataCache = mock(classOf[KRaftMetadataCache])
val metadataCache: MetadataCache = if (kraft) mock(classOf[KRaftMetadataCache]) else mock(classOf[ZkMetadataCache])
// Mark the remote broker as eligible or ineligible in the metadata cache of the leader.
// When using kraft, we can make the broker ineligible by fencing it.
// In ZK mode, we must mark the broker as alive for it to be eligible.
def markRemoteReplicaEligible(eligible: Boolean): Unit = {
if (kraft) {
when(metadataCache.asInstanceOf[KRaftMetadataCache].isBrokerFenced(remoteBrokerId)).thenReturn(!eligible)
} else {
when(metadataCache.hasAliveBroker(remoteBrokerId)).thenReturn(eligible)
}
}
val partition = new Partition(
topicPartition,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
@ -1414,6 +1431,8 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(isr, partition.partitionState.isr)
assertEquals(isr, partition.partitionState.maximalIsr)
markRemoteReplicaEligible(true)
// Fetch to let the follower catch up to the log end offset and
// to check if an expansion is possible.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
@ -1430,7 +1449,7 @@ class PartitionTest extends AbstractPartitionTest {
assertEquals(replicas.toSet, partition.partitionState.maximalIsr)
assertEquals(1, alterPartitionManager.isrUpdates.size)
// Controller rejects the expansion because the broker is fenced.
// Controller rejects the expansion because the broker is fenced or offline.
alterPartitionManager.failIsrUpdate(Errors.INELIGIBLE_REPLICA)
// The leader reverts back to the previous ISR.
@ -1439,8 +1458,8 @@ class PartitionTest extends AbstractPartitionTest {
assertFalse(partition.partitionState.isInflight)
assertEquals(0, alterPartitionManager.isrUpdates.size)
// The leader eventually learns about the fenced broker.
when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(true)
// The leader eventually learns about the fenced or offline broker.
markRemoteReplicaEligible(false)
// The follower fetches again.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)
@ -1451,8 +1470,8 @@ class PartitionTest extends AbstractPartitionTest {
assertFalse(partition.partitionState.isInflight)
assertEquals(0, alterPartitionManager.isrUpdates.size)
// The broker is eventually unfenced.
when(metadataCache.isBrokerFenced(remoteBrokerId)).thenReturn(false)
// The broker is eventually unfenced or brought back online.
markRemoteReplicaEligible(true)
// The follower fetches again.
fetchFollower(partition, replicaId = remoteBrokerId, fetchOffset = log.logEndOffset)

View File

@ -26,11 +26,11 @@ import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
import kafka.utils.{LogCaptureAppender, TestUtils}
import kafka.zk.{FeatureZNodeStatus, _}
import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException}
import org.apache.kafka.common.message.AlterPartitionRequestData
import org.apache.kafka.common.message.AlterPartitionResponseData
import org.apache.kafka.common.message.{AlterPartitionRequestData, AlterPartitionResponseData}
import org.apache.kafka.common.metrics.KafkaMetric
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.{ElectionType, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.MetadataVersion
@ -40,8 +40,7 @@ import org.apache.log4j.Level
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.MethodSource
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import org.mockito.Mockito.{doAnswer, spy, verify}
import org.mockito.invocation.InvocationOnMock
@ -904,12 +903,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
).asJava)
).asJava)
val future = new CompletableFuture[AlterPartitionResponseData]()
controller.eventManager.put(AlterPartitionReceived(
alterPartitionRequest,
alterPartitionVersion,
future.complete
))
val future = alterPartitionFuture(alterPartitionRequest, alterPartitionVersion)
val expectedAlterPartitionResponse = new AlterPartitionResponseData()
.setTopics(Seq(new AlterPartitionResponseData.TopicData()
@ -968,12 +962,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
).asJava)
).asJava)
val future = new CompletableFuture[AlterPartitionResponseData]()
controller.eventManager.put(AlterPartitionReceived(
alterPartitionRequest,
ApiKeys.ALTER_PARTITION.latestVersion,
future.complete
))
val future = alterPartitionFuture(alterPartitionRequest, ApiKeys.ALTER_PARTITION.latestVersion)
val expectedAlterPartitionResponse = new AlterPartitionResponseData()
.setTopics(Seq(new AlterPartitionResponseData.TopicData()
@ -1024,12 +1013,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
).asJava)
).asJava)
val future = new CompletableFuture[AlterPartitionResponseData]()
controller.eventManager.put(AlterPartitionReceived(
alterPartitionRequest,
AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
future.complete
))
val future = alterPartitionFuture(alterPartitionRequest, AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION)
// When re-sending an ISR update, we should not get and error or any ISR changes
val expectedAlterPartitionResponse = new AlterPartitionResponseData()
@ -1056,6 +1040,73 @@ class ControllerIntegrationTest extends QuorumTestHarness {
sendAndVerifyAlterPartitionResponse(newPartitionEpoch)
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
def testShutdownBrokerNotAddedToIsr(alterPartitionVersion: Short): Unit = {
servers = makeServers(2)
val controllerId = TestUtils.waitUntilControllerElected(zkClient)
val otherBroker = servers.find(_.config.brokerId != controllerId).get
val brokerId = otherBroker.config.brokerId
val tp = new TopicPartition("t", 0)
val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
val fullIsr = List(controllerId, brokerId)
TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
// Shut down follower.
servers(brokerId).shutdown()
servers(brokerId).awaitShutdown()
val controller = getController().kafkaController
val leaderIsrAndControllerEpochMap = controller.controllerContext.partitionsLeadershipInfo
val leaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
val topicId = controller.controllerContext.topicIds(tp.topic)
val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
// We expect only the controller (online broker) to be in ISR
assertEquals(List(controllerId), leaderAndIsr.isr)
val requestTopic = new AlterPartitionRequestData.TopicData()
.setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
.setPartitionIndex(tp.partition)
.setLeaderEpoch(leaderAndIsr.leaderEpoch)
.setPartitionEpoch(leaderAndIsr.partitionEpoch)
.setNewIsr(fullIsr.map(Int.box).asJava)
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else requestTopic.setTopicName(tp.topic)
// Try to update ISR to contain the offline broker.
val alterPartitionRequest = new AlterPartitionRequestData()
.setBrokerId(controllerId)
.setBrokerEpoch(controllerEpoch)
.setTopics(Seq(requestTopic).asJava)
val future = alterPartitionFuture(alterPartitionRequest, alterPartitionVersion)
val expectedError = if (alterPartitionVersion > 1) Errors.INELIGIBLE_REPLICA else Errors.OPERATION_NOT_ATTEMPTED
val expectedResponseTopic = new AlterPartitionResponseData.TopicData()
.setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
.setPartitionIndex(tp.partition)
.setErrorCode(expectedError.code())
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)
).asJava)
if (alterPartitionVersion > 1) expectedResponseTopic.setTopicId(topicId) else expectedResponseTopic.setTopicName(tp.topic)
// We expect an ineligble replica error response for the partition.
val expectedAlterPartitionResponse = new AlterPartitionResponseData()
.setTopics(Seq(expectedResponseTopic).asJava)
val newLeaderIsrAndControllerEpochMap = controller.controllerContext.partitionsLeadershipInfo
val newLeaderAndIsr = newLeaderIsrAndControllerEpochMap(tp).leaderAndIsr
assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
assertEquals(List(controllerId), newLeaderAndIsr.isr)
// Bring replica back online.
servers(brokerId).startup()
// Wait for broker to rejoin ISR.
TestUtils.waitUntilTrue(() => fullIsr == zkClient.getTopicPartitionState(tp).get.leaderAndIsr.isr, "Replica did not rejoin ISR.")
}
@Test
def testAlterPartitionErrors(): Unit = {
servers = makeServers(2)
@ -1338,12 +1389,7 @@ class ControllerIntegrationTest extends QuorumTestHarness {
.setNewIsr(isr.toList.map(Int.box).asJava)
.setLeaderRecoveryState(leaderRecoveryState)).asJava)).asJava)
val future = new CompletableFuture[AlterPartitionResponseData]()
getController().kafkaController.eventManager.put(AlterPartitionReceived(
alterPartitionRequest,
if (topicIdOpt.isDefined) AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION else 1,
future.complete
))
val future = alterPartitionFuture(alterPartitionRequest, if (topicIdOpt.isDefined) AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION else 1)
val expectedAlterPartitionResponse = if (topLevelError != Errors.NONE) {
new AlterPartitionResponseData().setErrorCode(topLevelError.code)
@ -1818,4 +1864,15 @@ class ControllerIntegrationTest extends QuorumTestHarness {
servers.filter(s => s.config.brokerId == controllerId).head
}
private def alterPartitionFuture(alterPartitionRequest: AlterPartitionRequestData,
alterPartitionVersion: Short): CompletableFuture[AlterPartitionResponseData] = {
val future = new CompletableFuture[AlterPartitionResponseData]()
getController().kafkaController.eventManager.put(AlterPartitionReceived(
alterPartitionRequest,
alterPartitionVersion,
future.complete
))
future
}
}