KAFKA-9198; Complete purgatory operations on receiving StopReplica (#7701)

Force completion of delayed operations when receiving a StopReplica request. During a partition reassignment, we cannot rely on a subsequent LeaderAndIsr request to complete these operations, because the old leader may no longer be a replica of the partition. Previously, when this happened, the delayed operations were simply left to time out.

Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Ismael Juma <ismael@juma.me.uk>

Co-Authored-By: Kun Du <kidkun@users.noreply.github.com>
Jason Gustafson, 2019-11-18 15:11:42 -08:00 (committed by GitHub)
parent 305f134abf
commit 6ae1af842a
4 changed files with 155 additions and 32 deletions
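
The gist of the fix, as a minimal sketch using the existing DelayedOperationPurgatory and TopicPartitionOperationKey types. The wrapper class and method names below are illustrative only, not the actual ReplicaManager code:

    import kafka.server.{DelayedFetch, DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
    import org.apache.kafka.common.TopicPartition

    // Sketch only (hypothetical wrapper, not the actual ReplicaManager code): force any
    // delayed produce/fetch operations keyed on a partition to be re-checked so they
    // complete immediately instead of waiting for their timeout to expire.
    class StopReplicaCompletionSketch(
        delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
        delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch]) {

      def forceCompletion(topicPartition: TopicPartition): Unit = {
        val key = TopicPartitionOperationKey(topicPartition)
        // checkAndComplete re-runs tryComplete() on every operation watching this key
        delayedProducePurgatory.checkAndComplete(key)
        delayedFetchPurgatory.checkAndComplete(key)
      }
    }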


@@ -29,8 +29,11 @@ import scala.collection._

 case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) {
-  override def toString = "[startOffsetMetadata: " + startOffsetMetadata + ", " +
-    "fetchInfo: " + fetchInfo + "]"
+  override def toString: String = {
+    "[startOffsetMetadata: " + startOffsetMetadata +
+      ", fetchInfo: " + fetchInfo +
+      "]"
+  }
 }

 /**


@@ -17,7 +17,6 @@
 package kafka.server

 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.Lock

@@ -86,17 +85,15 @@ class DelayedProduce(delayMs: Long,
       trace(s"Checking produce satisfaction for $topicPartition, current status $status")
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
-        val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
-          case HostedPartition.Online(partition) =>
-            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
-          case HostedPartition.Offline =>
-            (false, Errors.KAFKA_STORAGE_ERROR)
-          case HostedPartition.None =>
-            // Case A
-            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition, expectLeader = true) match {
+          case Left(err) =>
+            // Case A
+            (false, err)
+          case Right(partition) =>
+            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
         }
         // Case B.1 || B.2
         if (error != Errors.NONE || hasEnough) {
           status.acksPending = false


@@ -383,9 +383,20 @@ class ReplicaManager(val config: KafkaConfig,
       if (logManager.getLog(topicPartition, isFuture = true).isDefined)
         logManager.asyncDelete(topicPartition, isFuture = true)
     }
+
+    // If we were the leader, we may have some operations still waiting for completion.
+    // We force completion to prevent them from timing out.
+    completeDelayedFetchOrProduceRequests(topicPartition)
+
     stateChangeLogger.trace(s"Finished handling stop replica (delete=$deletePartition) for partition $topicPartition")
   }

+  private def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit = {
+    val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
+    delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
+    delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
+  }
+
   def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors) = {
     replicaStateChangeLock synchronized {
       val responseMap = new collection.mutable.HashMap[TopicPartition, Errors]
@@ -454,12 +465,24 @@ class ReplicaManager(val config: KafkaConfig,
   }

   def getPartitionOrException(topicPartition: TopicPartition, expectLeader: Boolean): Partition = {
+    getPartitionOrError(topicPartition, expectLeader) match {
+      case Left(Errors.KAFKA_STORAGE_ERROR) =>
+        throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
+      case Left(error) =>
+        throw error.exception(s"Error while fetching partition state for $topicPartition")
+      case Right(partition) => partition
+    }
+  }
+
+  def getPartitionOrError(topicPartition: TopicPartition, expectLeader: Boolean): Either[Errors, Partition] = {
     getPartition(topicPartition) match {
       case HostedPartition.Online(partition) =>
-        partition
+        Right(partition)
       case HostedPartition.Offline =>
-        throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
+        Left(Errors.KAFKA_STORAGE_ERROR)
       case HostedPartition.None if metadataCache.contains(topicPartition) =>
         if (expectLeader) {

@@ -467,13 +490,13 @@ class ReplicaManager(val config: KafkaConfig,
           // forces clients to refresh metadata to find the new location. This can happen, for example,
           // during a partition reassignment if a produce request from the client is sent to a broker after
           // the local replica has been deleted.
-          throw new NotLeaderForPartitionException(s"Broker $localBrokerId is not a replica of $topicPartition")
+          Left(Errors.NOT_LEADER_FOR_PARTITION)
         } else {
-          throw new ReplicaNotAvailableException(s"Partition $topicPartition is not available")
+          Left(Errors.REPLICA_NOT_AVAILABLE)
         }
       case HostedPartition.None =>
-        throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist")
+        Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
     }
   }
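
With getPartitionOrError, callers can treat a missing or offline partition as a value rather than an exception. A small sketch of that caller-side pattern follows; the object and method here are hypothetical and not part of this commit:

    import kafka.cluster.Partition
    import kafka.server.ReplicaManager
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.protocol.Errors

    // Hypothetical helper (not part of this commit): fold the lookup error into a
    // per-partition response value instead of throwing and catching an exception.
    object PartitionLookupSketch {
      def withOnlinePartition[T](replicaManager: ReplicaManager, tp: TopicPartition)
                                (onError: Errors => T)(onPartition: Partition => T): T =
        replicaManager.getPartitionOrError(tp, expectLeader = true) match {
          case Left(error) => onError(error) // e.g. NOT_LEADER_FOR_PARTITION during a reassignment
          case Right(partition) => onPartition(partition)
        }
    }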
@@ -1484,9 +1507,7 @@ class ReplicaManager(val config: KafkaConfig,
     }

     partitionsToMakeFollower.foreach { partition =>
-      val topicPartitionOperationKey = TopicPartitionOperationKey(partition.topicPartition)
-      delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
-      delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
+      completeDelayedFetchOrProduceRequests(partition.topicPartition)
     }

     partitionsToMakeFollower.foreach { partition =>


@@ -49,6 +49,7 @@ import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import org.mockito.Mockito

 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq}
@@ -944,14 +945,14 @@ class ReplicaManagerTest {
       Optional.of(0))
     var fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
     assertNotNull(fetchResult.get)
-    assertEquals(fetchResult.get.error, Errors.NONE)
+    assertEquals(Errors.NONE, fetchResult.get.error)

     // Fetch from follower, with empty ClientMetadata (which implies an older version)
     partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
       Optional.of(0))
     fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None)
     assertNotNull(fetchResult.get)
-    assertEquals(fetchResult.get.error, Errors.NOT_LEADER_FOR_PARTITION)
+    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
   }

   @Test

@@ -999,7 +1000,7 @@ class ReplicaManagerTest {
     replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
     assertNotNull(fetchResult.get)
-    assertEquals(fetchResult.get.error, Errors.NOT_LEADER_FOR_PARTITION)
+    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
   }

   @Test

@@ -1048,7 +1049,7 @@ class ReplicaManagerTest {
     replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
     assertNotNull(fetchResult.get)
-    assertEquals(fetchResult.get.error, Errors.FENCED_LEADER_EPOCH)
+    assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.get.error)
   }

   @Test
@@ -1079,13 +1080,114 @@ class ReplicaManagerTest {
       Optional.of(1))
     var fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
     assertNotNull(fetchResult.get)
-    assertEquals(fetchResult.get.error, Errors.NONE)
+    assertEquals(Errors.NONE, fetchResult.get.error)

     partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
       Optional.empty())
     fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata))
     assertNotNull(fetchResult.get)
-    assertEquals(fetchResult.get.error, Errors.NONE)
+    assertEquals(Errors.NONE, fetchResult.get.error)
   }

+  @Test
+  def testClearFetchPurgatoryOnStopReplica(): Unit = {
+    // As part of a reassignment, we may send StopReplica to the old leader.
+    // In this case, we should ensure that pending purgatory operations are cancelled
+    // immediately rather than sitting around to timeout.
+    val mockTimer = new MockTimer
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
+
+    val tp0 = new TopicPartition(topic, 0)
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+
+    val partition0Replicas = Seq[Integer](0, 1).asJava
+    val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(tp0.topic)
+        .setPartitionIndex(tp0.partition)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(partition0Replicas)
+        .setZkVersion(0)
+        .setReplicas(partition0Replicas)
+        .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+
+    val partitionData = new FetchRequest.PartitionData(0L, 0L, 100,
+      Optional.of(1))
+    val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10)
+    assertNull(fetchResult.get)
+
+    Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
+
+    // We have a fetch in purgatory, now receive a stop replica request and
+    // assert that the fetch returns with a NOT_LEADER error
+    replicaManager.stopReplica(tp0, deletePartition = true)
+
+    assertNotNull(fetchResult.get)
+    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
+  }
+
+  @Test
+  def testClearProducePurgatoryOnStopReplica(): Unit = {
+    val mockTimer = new MockTimer
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))
+
+    val tp0 = new TopicPartition(topic, 0)
+    val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+    replicaManager.createPartition(tp0).createLogIfNotExists(0, isNew = false, isFutureReplica = false, offsetCheckpoints)
+
+    val partition0Replicas = Seq[Integer](0, 1).asJava
+    val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(tp0.topic)
+        .setPartitionIndex(tp0.partition)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(1)
+        .setIsr(partition0Replicas)
+        .setZkVersion(0)
+        .setReplicas(partition0Replicas)
+        .setIsNew(true)).asJava,
+      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+    replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+
+    val produceResult = sendProducerAppend(replicaManager, tp0)
+    assertNull(produceResult.get)
+
+    Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
+
+    replicaManager.stopReplica(tp0, deletePartition = true)
+
+    assertNotNull(produceResult.get)
+    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, produceResult.get.error)
+  }
+
+  private def sendProducerAppend(replicaManager: ReplicaManager,
+                                 topicPartition: TopicPartition): AtomicReference[PartitionResponse] = {
+    val produceResult = new AtomicReference[PartitionResponse]()
+    def callback(response: Map[TopicPartition, PartitionResponse]): Unit = {
+      produceResult.set(response(topicPartition))
+    }
+
+    val records = MemoryRecords.withRecords(CompressionType.NONE,
+      new SimpleRecord("a".getBytes()),
+      new SimpleRecord("b".getBytes()),
+      new SimpleRecord("c".getBytes())
+    )
+
+    replicaManager.appendRecords(
+      timeout = 10,
+      requiredAcks = -1,
+      internalTopicsAllowed = false,
+      isFromClient = true,
+      entriesPerPartition = Map(topicPartition -> records),
+      responseCallback = callback
+    )
+
+    produceResult
+  }

   private def sendConsumerFetch(replicaManager: ReplicaManager,
@@ -1370,14 +1472,14 @@ class ReplicaManagerTest {
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
     val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
-    val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
-    EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+
+    val metadataCache: MetadataCache = Mockito.mock(classOf[MetadataCache])
+    Mockito.when(metadataCache.getAliveBrokers).thenReturn(aliveBrokers)
     aliveBrokerIds.foreach { brokerId =>
-      EasyMock.expect(metadataCache.getAliveBroker(EasyMock.eq(brokerId)))
-        .andReturn(Option(createBroker(brokerId, s"host$brokerId", brokerId)))
-        .anyTimes()
+      Mockito.when(metadataCache.getAliveBroker(brokerId))
+        .thenReturn(Option(createBroker(brokerId, s"host$brokerId", brokerId)))
     }
-    EasyMock.replay(metadataCache)

     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
       purgatoryName = "Produce", timer, reaperEnabled = false)