KAFKA-9839; Broker should accept control requests with newer broker epoch (#8509)

A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker would receive the latest broker epoch before the controller: when the broker registers with ZK, there are few more instructions to process before this broker "knows" about its epoch, while the controller may already get notified and send UPDATE_METADATA request (as an example) with the new epoch. This will result in clients getting stale metadata from this broker. 

With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch.

Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
Anna Povzner 2020-04-27 12:41:30 -07:00 committed by GitHub
parent 2ca19cf603
commit bd17085ec1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 206 additions and 22 deletions

View File

@ -3084,10 +3084,9 @@ class KafkaApis(val requestChannel: RequestChannel,
// if the controller hasn't been upgraded to use KIP-380
if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
else {
val curBrokerEpoch = controller.brokerEpoch
if (brokerEpochInRequest < curBrokerEpoch) true
else if (brokerEpochInRequest == curBrokerEpoch) false
else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch")
// brokerEpochInRequest > controller.brokerEpoch is possible in rare scenarios where the controller gets notified
// about the new broker epoch and sends a control request with this epoch before the broker learns about it
brokerEpochInRequest < controller.brokerEpoch
}
}

View File

@ -95,15 +95,20 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
@Test
def testControlRequestWithCorrectBrokerEpoch(): Unit = {
testControlRequestWithBrokerEpoch(false)
testControlRequestWithBrokerEpoch(0)
}
@Test
def testControlRequestWithStaleBrokerEpoch(): Unit = {
testControlRequestWithBrokerEpoch(true)
testControlRequestWithBrokerEpoch(-1)
}
private def testControlRequestWithBrokerEpoch(isEpochInRequestStale: Boolean): Unit = {
@Test
def testControlRequestWithNewerBrokerEpoch(): Unit = {
testControlRequestWithBrokerEpoch(1)
}
private def testControlRequestWithBrokerEpoch(epochInRequestDiffFromCurrentEpoch: Long): Unit = {
val tp = new TopicPartition("new-topic", 0)
// create topic with 1 partition, 2 replicas, one on each broker
@ -128,8 +133,7 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
controllerChannelManager.startup()
val broker2 = servers(brokerId2)
val epochInRequest =
if (isEpochInRequestStale) broker2.kafkaController.brokerEpoch - 1 else broker2.kafkaController.brokerEpoch
val epochInRequest = broker2.kafkaController.brokerEpoch + epochInRequestDiffFromCurrentEpoch
try {
// Send LeaderAndIsr request with correct broker epoch
@ -151,10 +155,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
epochInRequest,
partitionStates.asJava, nodes.toSet.asJava)
if (isEpochInRequestStale) {
if (epochInRequestDiffFromCurrentEpoch < 0) {
// stale broker epoch in LEADER_AND_ISR
sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
}
else {
// broker epoch in LEADER_AND_ISR >= current broker epoch
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
TestUtils.waitUntilLeaderIsKnown(Seq(broker2), tp, 10000)
}
@ -191,10 +197,12 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
epochInRequest,
partitionStates.asJava, liveBrokers.asJava)
if (isEpochInRequestStale) {
if (epochInRequestDiffFromCurrentEpoch < 0) {
// stale broker epoch in UPDATE_METADATA
sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
}
else {
// broker epoch in UPDATE_METADATA >= current broker epoch
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
TestUtils.waitUntilMetadataIsPropagated(Seq(broker2), tp.topic, tp.partition, 10000)
assertEquals(brokerId2,
@ -217,9 +225,11 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
epochInRequest, // Correct broker epoch
false, topicStates)
if (isEpochInRequestStale) {
if (epochInRequestDiffFromCurrentEpoch < 0) {
// stale broker epoch in STOP_REPLICA
sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
} else {
// broker epoch in STOP_REPLICA >= current broker epoch
sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
assertEquals(HostedPartition.None, broker2.replicaManager.getPartition(tp))
}

View File

@ -21,8 +21,7 @@ import java.net.InetAddress
import java.nio.charset.StandardCharsets
import java.util
import java.util.Arrays.asList
import java.util.Random
import java.util.{Collections, Optional}
import java.util.{Collections, Optional, Random}
import java.util.concurrent.TimeUnit
import kafka.api.LeaderAndIsr
@ -42,7 +41,7 @@ import kafka.server.QuotaFactory.QuotaManagers
import kafka.utils.{MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.memory.MemoryPool
@ -1466,7 +1465,7 @@ class KafkaApisTest {
val leaderEpoch = 0
val tp0 = new TopicPartition("tp", 0)
val fetchData = Collections.singletonMap(tp0, new FetchRequest.PartitionData(0,0, Int.MaxValue, Optional.of(leaderEpoch)))
val fetchData = Collections.singletonMap(tp0, new FetchRequest.PartitionData(0, 0, Int.MaxValue, Optional.of(leaderEpoch)))
val fetchFromFollower = buildRequest(new FetchRequest.Builder(
ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(), 1, 1000, 0, fetchData
).build())
@ -1553,6 +1552,178 @@ class KafkaApisTest {
assertEquals(Errors.INVALID_REQUEST, response.error)
}
@Test
def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
val currentBrokerEpoch = 1239875L
testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
}
@Test
def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
val currentBrokerEpoch = 1239875L
testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
}
@Test
def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
val currentBrokerEpoch = 1239875L
testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
}
def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest)
val request = buildRequest(updateMetadataRequest)
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
EasyMock.eq(request.context.correlationId),
EasyMock.anyObject()
)).andStubReturn(
Seq()
)
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, controller, requestChannel)
createKafkaApis().handleUpdateMetadataRequest(request)
val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, updateMetadataRequest, capturedResponse)
.asInstanceOf[UpdateMetadataResponse]
assertEquals(expectedError, updateMetadataResponse.error())
EasyMock.verify(replicaManager)
}
@Test
def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
val currentBrokerEpoch = 1239875L
testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
}
@Test
def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
val currentBrokerEpoch = 1239875L
testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
}
@Test
def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
val currentBrokerEpoch = 1239875L
testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
}
def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
val controllerId = 2
val controllerEpoch = 6
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val partitionStates = Seq(
new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
.setTopicName("topicW")
.setPartitionIndex(1)
.setControllerEpoch(1)
.setLeader(0)
.setLeaderEpoch(1)
.setIsr(asList(0, 1))
.setZkVersion(2)
.setReplicas(asList(0, 1, 2))
.setIsNew(false)
).asJava
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
ApiKeys.LEADER_AND_ISR.latestVersion,
controllerId,
controllerEpoch,
brokerEpochInRequest,
partitionStates,
asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
).build()
val request = buildRequest(leaderAndIsrRequest)
val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.NONE.code)
.setPartitionErrors(asList()))
EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
EasyMock.expect(replicaManager.becomeLeaderOrFollower(
EasyMock.eq(request.context.correlationId),
EasyMock.anyObject(),
EasyMock.anyObject()
)).andStubReturn(
response
)
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, controller, requestChannel)
createKafkaApis().handleLeaderAndIsrRequest(request)
val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, capturedResponse)
.asInstanceOf[LeaderAndIsrResponse]
assertEquals(expectedError, leaderAndIsrResponse.error())
EasyMock.verify(replicaManager)
}
@Test
def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
val currentBrokerEpoch = 1239875L
testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
}
@Test
def testStopReplicaRequestWithNewerBrokerEpochIsValid(): Unit = {
val currentBrokerEpoch = 1239875L
testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
}
@Test
def testStopReplicaRequestWithStaleBrokerEpochIsRejected(): Unit = {
val currentBrokerEpoch = 1239875L
testStopReplicaRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
}
def testStopReplicaRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
val controllerId = 0
val controllerEpoch = 5
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val fooPartition = new TopicPartition("foo", 0)
val topicStates = Seq(
new StopReplicaTopicState()
.setTopicName(fooPartition.topic())
.setPartitionStates(Seq(new StopReplicaPartitionState()
.setPartitionIndex(fooPartition.partition())
.setLeaderEpoch(1)
.setDeletePartition(false)).asJava)
).asJava
val stopReplicaRequest = new StopReplicaRequest.Builder(
ApiKeys.STOP_REPLICA.latestVersion,
controllerId,
controllerEpoch,
brokerEpochInRequest,
false,
topicStates
).build()
val request = buildRequest(stopReplicaRequest)
EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
EasyMock.expect(replicaManager.stopReplicas(
EasyMock.eq(request.context.correlationId),
EasyMock.eq(controllerId),
EasyMock.eq(controllerEpoch),
EasyMock.eq(brokerEpochInRequest),
EasyMock.eq(stopReplicaRequest.partitionStates().asScala)
)).andStubReturn(
(mutable.Map(
fooPartition -> Errors.NONE
), Errors.NONE)
)
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(controller, replicaManager, requestChannel)
createKafkaApis().handleStopReplicaRequest(request)
val stopReplicaResponse = readResponse(ApiKeys.STOP_REPLICA, stopReplicaRequest, capturedResponse)
.asInstanceOf[StopReplicaResponse]
assertEquals(expectedError, stopReplicaResponse.error())
EasyMock.verify(replicaManager)
}
/**
* Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
*/
@ -1680,7 +1851,7 @@ class KafkaApisTest {
capturedResponse
}
private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {
private def createBasicMetadataRequest(topic: String, numPartitions: Int, brokerEpoch: Long): UpdateMetadataRequest = {
val replicas = List(0.asInstanceOf[Integer]).asJava
def createPartitionState(partition: Int) = new UpdateMetadataPartitionState()
@ -1703,8 +1874,12 @@ class KafkaApisTest {
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(plaintextListener.value)).asJava)
val partitionStates = (0 until numPartitions).map(createPartitionState)
val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, 0, partitionStates.asJava, Seq(broker).asJava).build()
new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, brokerEpoch, partitionStates.asJava, Seq(broker).asJava).build()
}
private def setupBasicMetadataCache(topic: String, numPartitions: Int): Unit = {
val updateMetadataRequest = createBasicMetadataRequest(topic, numPartitions, 0)
metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
}
}