KAFKA-13161; Update replica partition state and replica fetcher state on follower update (#11189)

When processing a topics delta, make sure that the replica manager's partition state and the replica fetcher state match the information included in the delta. Also ensure that delayed operations are completed only after the follower state change has been applied, since it is that state change which allows them to be completed.
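The ordering matters here. Below is a minimal, self-contained Scala sketch of the become-follower sequencing this change enforces; Partition, FetcherManager, and the method signatures are simplified stand-ins for illustration, not the real ReplicaManager API.

// Hypothetical, simplified sketch of the become-follower ordering; all types
// and signatures are illustrative stand-ins, not Kafka's real classes.
final case class TopicPartition(topic: String, partition: Int)

trait Partition {
  // Returns true only if the follower state change was actually applied.
  def makeFollower(leaderEpoch: Int): Boolean
}

trait FetcherManager {
  def removeFetcherForPartitions(partitions: Set[TopicPartition]): Unit
  def addFetcherForPartitions(partitions: Set[TopicPartition]): Unit
}

def applyFollowerDelta(
  partitions: Map[TopicPartition, Partition],
  leaderEpoch: Int,
  fetchers: FetcherManager,
  completeDelayedRequests: TopicPartition => Unit
): Unit = {
  // 1. Apply the partition state change first, keeping only the partitions
  //    that actually transitioned to follower.
  val becameFollower: Set[TopicPartition] =
    partitions.filter { case (_, partition) => partition.makeFollower(leaderEpoch) }.keySet

  // 2. Restart fetchers only for the partitions that transitioned, not for
  //    everything named in the delta.
  fetchers.removeFetcherForPartitions(becameFollower)
  fetchers.addFetcherForPartitions(becameFollower)

  // 3. Complete delayed fetch/produce operations last, after the state change
  //    that allows them to be completed (e.g. with NOT_LEADER_OR_FOLLOWER).
  becameFollower.foreach(completeDelayedRequests)
}

The key point, reflected in the diff below, is that the fetcher restart and the delayed-operation completion are driven by the set of partitions that actually became followers, and both happen after the partition state has been updated.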

Reviewers: Jason Gustafson <jason@confluent.io>
Author: José Armando García Sancio, 2021-08-10 12:06:30 -07:00 (committed by GitHub)
Parent: a5daae20b5
Commit: 0837fba997
4 changed files with 566 additions and 234 deletions

File: DelayedProduce.scala

@@ -74,11 +74,12 @@ class DelayedProduce(delayMs: Long,
   * The delayed produce operation can be completed if every partition
   * it produces to is satisfied by one of the following:
   *
-  * Case A: This broker is no longer the leader: set an error in response
-  * Case B: This broker is the leader:
-  *   B.1 - If there was a local error thrown while checking if at least requiredAcks
+  * Case A: Replica not assigned to partition
+  * Case B: Replica is no longer the leader of this partition
+  * Case C: This broker is the leader:
+  *   C.1 - If there was a local error thrown while checking if at least requiredAcks
   *         replicas have caught up to this operation: set an error in response
-  *   B.2 - Otherwise, set the response with no error.
+  *   C.2 - Otherwise, set the response with no error.
   */
  override def tryComplete(): Boolean = {
    // check for each partition if it still has pending acks
@@ -95,7 +96,7 @@ class DelayedProduce(delayMs: Long,
            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
          }
-        // Case B.1 || B.2
+        // Case B || C.1 || C.2
        if (error != Errors.NONE || hasEnough) {
          status.acksPending = false
          status.responseStatus.error = error
@@ -103,7 +104,7 @@ class DelayedProduce(delayMs: Long,
      }
    }
-    // check if every partition has satisfied at least one of case A or B
+    // check if every partition has satisfied at least one of case A, B or C
    if (!produceMetadata.produceStatus.values.exists(_.acksPending))
      forceComplete()
    else
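For reference, here is a condensed, hypothetical sketch of the case A/B/C analysis described in the updated comment; the types below are illustrative stand-ins, not DelayedProduce's real internals.

// Hypothetical stand-in types sketching the completion cases for a delayed produce.
sealed trait PartitionCheck
case object ReplicaNotAssigned extends PartitionCheck                           // Case A
case object NoLongerLeader extends PartitionCheck                               // Case B
final case class StillLeader(localError: Option[String], hasEnough: Boolean)
  extends PartitionCheck                                                        // Case C

def ackNoLongerPending(check: PartitionCheck): Boolean = check match {
  case ReplicaNotAssigned => true  // Case A: respond with an error, stop waiting
  case NoLongerLeader     => true  // Case B: respond with an error, stop waiting
  case StillLeader(localError, hasEnough) =>
    // Case C.1: a local error was raised, or C.2: enough replicas caught up.
    localError.isDefined || hasEnough
}

// The delayed produce can be forced to complete once every partition it wrote
// to satisfies case A, B, or C.
def canComplete(checks: Iterable[PartitionCheck]): Boolean =
  checks.forall(ackNoLongerPending)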

File: ReplicaManager.scala

@@ -2148,8 +2148,8 @@ class ReplicaManager(val config: KafkaConfig,
      if (localLog(tp).isEmpty)
        markPartitionOffline(tp)
    }
-    newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded(_))
-    newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded(_))
+    newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded)
+    newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded)

    replicaFetcherManager.shutdownIdleFetcherThreads()
    replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
@@ -2193,7 +2193,6 @@ class ReplicaManager(val config: KafkaConfig,
                     newLocalFollowers: mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = {
    stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " +
      "local followers.")
-    replicaFetcherManager.removeFetcherForPartitions(newLocalFollowers.keySet)
    val shuttingDown = isShuttingDown.get()
    val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, InitialFetchState]
    val newFollowerTopicSet = new mutable.HashSet[String]
@@ -2202,13 +2201,6 @@ class ReplicaManager(val config: KafkaConfig,
      try {
        newFollowerTopicSet.add(tp.topic())

-        completeDelayedFetchOrProduceRequests(tp)
-
-        // Create the local replica even if the leader is unavailable. This is required
-        // to ensure that we include the partition's high watermark in the checkpoint
-        // file (see KAFKA-1647)
-        partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
-
        if (shuttingDown) {
          stateChangeLogger.trace(s"Unable to start fetching ${tp} with topic " +
            s"ID ${info.topicId} because the replica manager is shutting down.")
@@ -2216,15 +2208,30 @@ class ReplicaManager(val config: KafkaConfig,
          val listenerName = config.interBrokerListenerName.value()
          val leader = info.partition.leader
          Option(newImage.cluster().broker(leader)).flatMap(_.node(listenerName).asScala) match {
-            case None => stateChangeLogger.trace(s"Unable to start fetching ${tp} " +
-              s"with topic ID ${info.topicId} from leader ${leader} because it is not " +
-              "alive.")
+            case None =>
+              stateChangeLogger.trace(
+                s"Unable to start fetching ${tp} with topic ID ${info.topicId} from leader " +
+                s"${leader} because it is not alive."
+              )
+
+              // Create the local replica even if the leader is unavailable. This is required
+              // to ensure that we include the partition's high watermark in the checkpoint
+              // file (see KAFKA-1647)
+              partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId))
            case Some(node) =>
+              val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+              if (partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))) {
                val leaderEndPoint = new BrokerEndPoint(node.id(), node.host(), node.port())
                val log = partition.localLogOrException
                val fetchOffset = initialFetchOffset(log)
                partitionsToMakeFollower.put(tp,
                  InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
+              } else {
+                stateChangeLogger.info(
+                  s"Skipped the become-follower state change after marking its partition as " +
+                  s"follower for partition $tp with id ${info.topicId} and partition state $state."
+                )
+              }
          }
        }
        changedPartitions.add(partition)
@@ -2235,8 +2242,16 @@ class ReplicaManager(val config: KafkaConfig,
        }
      }
    }
-    updateLeaderAndFollowerMetrics(newFollowerTopicSet)
+
+    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)
+    stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
+
    replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower)
+    stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions")
+
+    partitionsToMakeFollower.keySet.foreach(completeDelayedFetchOrProduceRequests)
+
+    updateLeaderAndFollowerMetrics(newFollowerTopicSet)
  }

  def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {

File: ReplicaManagerTest.scala

@@ -17,6 +17,14 @@
package kafka.server

+import java.io.File
+import java.net.InetAddress
+import java.nio.file.Files
+import java.util
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.stream.IntStream
+import java.util.{Collections, Optional, Properties}
import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.log._
@@ -30,7 +38,9 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
+import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
@@ -41,25 +51,14 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
+import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, TopicImage, TopicsDelta, TopicsImage}
+import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.mockito.{ArgumentMatchers, Mockito}
-import java.io.File
-import java.net.InetAddress
-import java.nio.file.Files
-import java.util
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Collections, Optional, Properties}
-import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord}
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.image.{TopicImage, TopicsDelta, TopicsImage}
-import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
-import org.mockito.{ArgumentMatchers, Mockito}

import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
@@ -1009,6 +1008,7 @@ class ReplicaManagerTest {
      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
      expectTruncation = expectTruncation, localLogOffset = Some(10), extraProps = extraProps, topicId = Some(topicId))

+    try {
      // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
      val tp = new TopicPartition(topic, topicPartition)
      val partition = replicaManager.createPartition(tp)
@@ -1034,6 +1034,11 @@ class ReplicaManagerTest {
      // Truncation should have happened once
      EasyMock.verify(mockLogMgr)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
  }

  @Test
@@ -1085,6 +1090,7 @@ class ReplicaManagerTest {
      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
      leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId))

+    try {
      val brokerList = Seq[Integer](0, 1).asJava

      val tp0 = new TopicPartition(topic, 0)
@@ -1119,6 +1125,11 @@ class ReplicaManagerTest {
      // But only leader will compute preferred replica
      assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
  }

  @Test
@@ -1136,6 +1147,7 @@ class ReplicaManagerTest {
      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
      leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId))

+    try {
      val brokerList = Seq[Integer](0, 1).asJava

      val tp0 = new TopicPartition(topic, 0)
@@ -1170,6 +1182,11 @@ class ReplicaManagerTest {
      // Returns a preferred replica (should just be the leader, which is None)
      assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
  }

  @Test
@@ -1281,6 +1298,7 @@ class ReplicaManagerTest {
  def testFetchFollowerNotAllowedForOlderClients(): Unit = {
    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1))

+    try {
      val tp0 = new TopicPartition(topic, 0)
      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
@@ -1314,6 +1332,11 @@ class ReplicaManagerTest {
      fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None)
      assertNotNull(fetchResult.get)
      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
  }

  @Test
@@ -1367,6 +1390,7 @@ class ReplicaManagerTest {
    val mockTimer = new MockTimer(time)
    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))

+    try {
      val tp0 = new TopicPartition(topic, 0)
      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
@@ -1410,6 +1434,11 @@ class ReplicaManagerTest {
      assertNotNull(fetchResult.get)
      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
  }

  @Test
@@ -1417,6 +1446,7 @@ class ReplicaManagerTest {
    val mockTimer = new MockTimer(time)
    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1))

+    try {
      val tp0 = new TopicPartition(topic, 0)
      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
@@ -1461,6 +1491,11 @@ class ReplicaManagerTest {
      assertNotNull(fetchResult.get)
      assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.get.error)
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
  }

  @Test
@@ -1573,7 +1608,7 @@ class ReplicaManagerTest {
      Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
    replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())

-    val produceResult = sendProducerAppend(replicaManager, tp0)
+    val produceResult = sendProducerAppend(replicaManager, tp0, 3)
    assertNull(produceResult.get)

    Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
@@ -1588,17 +1623,22 @@ class ReplicaManagerTest {
    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResult.get.error)
  }

-  private def sendProducerAppend(replicaManager: ReplicaManager,
-                                 topicPartition: TopicPartition): AtomicReference[PartitionResponse] = {
+  private def sendProducerAppend(
+    replicaManager: ReplicaManager,
+    topicPartition: TopicPartition,
+    numOfRecords: Int
+  ): AtomicReference[PartitionResponse] = {
    val produceResult = new AtomicReference[PartitionResponse]()
    def callback(response: Map[TopicPartition, PartitionResponse]): Unit = {
      produceResult.set(response(topicPartition))
    }
-    val records = MemoryRecords.withRecords(CompressionType.NONE,
-      new SimpleRecord("a".getBytes()),
-      new SimpleRecord("b".getBytes()),
-      new SimpleRecord("c".getBytes())
+
+    val records = MemoryRecords.withRecords(
+      CompressionType.NONE,
+      IntStream
+        .range(0, numOfRecords)
+        .mapToObj(i => new SimpleRecord(i.toString.getBytes))
+        .toArray(Array.ofDim[SimpleRecord]): _*
    )

    replicaManager.appendRecords(
@@ -2828,4 +2868,280 @@ class ReplicaManagerTest {
        Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
      replicaManager.calculateDeltaChanges(TEST_DELTA))
  }
@Test
def testDeltaFromLeaderToFollower(): Unit = {
val localId = 1
val otherId = localId + 1
val numOfRecords = 3
val epoch = 100
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try {
// Make the local replica the leader
val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch))
replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch))
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(epoch, leaderPartition.getLeaderEpoch)
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
// Send a produce request and advance the highwatermark
val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
fetchMessages(
replicaManager,
otherId,
topicPartition,
new PartitionData(numOfRecords, 0, Int.MaxValue, Optional.empty()),
Int.MaxValue,
IsolationLevel.READ_UNCOMMITTED,
None
)
assertEquals(Errors.NONE, leaderResponse.get.error)
// Change the local replica to follower
val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1))
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1))
// Append on a follower should fail
val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
assertFalse(followerPartition.isLeader)
assertEquals(epoch + 1, followerPartition.getLeaderEpoch)
val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
} finally {
replicaManager.shutdown()
}
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testDeltaFromFollowerToLeader(): Unit = {
val localId = 1
val otherId = localId + 1
val numOfRecords = 3
val epoch = 100
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try {
// Make the local replica the follower
val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch))
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch))
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
assertFalse(followerPartition.isLeader)
assertEquals(epoch, followerPartition.getLeaderEpoch)
val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
// Append on a follower should fail
val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
// Change the local replica to leader
val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch + 1))
replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch + 1))
// Send a produce request and advance the highwatermark
val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
fetchMessages(
replicaManager,
otherId,
topicPartition,
new PartitionData(numOfRecords, 0, Int.MaxValue, Optional.empty()),
Int.MaxValue,
IsolationLevel.READ_UNCOMMITTED,
None
)
assertEquals(Errors.NONE, leaderResponse.get.error)
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(epoch + 1, leaderPartition.getLeaderEpoch)
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
} finally {
replicaManager.shutdown()
}
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testDeltaFollowerWithNoChange(): Unit = {
val localId = 1
val otherId = localId + 1
val epoch = 100
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try {
// Make the local replica the follower
val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch))
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch))
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
assertFalse(followerPartition.isLeader)
assertEquals(epoch, followerPartition.getLeaderEpoch)
val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
// Apply the same delta again
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch))
// Check that the state stays the same
val HostedPartition.Online(noChangePartition) = replicaManager.getPartition(topicPartition)
assertFalse(noChangePartition.isLeader)
assertEquals(epoch, noChangePartition.getLeaderEpoch)
val noChangeFetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), noChangeFetcher.map(_.sourceBroker))
} finally {
replicaManager.shutdown()
}
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testDeltaToFollowerCompletesProduce(): Unit = {
val localId = 1
val otherId = localId + 1
val numOfRecords = 3
val epoch = 100
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try {
// Make the local replica the leader
val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch))
replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch))
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(epoch, leaderPartition.getLeaderEpoch)
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
// Send a produce request
val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
// Change the local replica to follower
val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1))
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1))
// Check that the produce failed because it changed to follower before replicating
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error)
} finally {
replicaManager.shutdown()
}
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testDeltaToFollowerCompletesFetch(): Unit = {
val localId = 1
val otherId = localId + 1
val epoch = 100
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try {
// Make the local replica the leader
val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch))
replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch))
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(epoch, leaderPartition.getLeaderEpoch)
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
// Send a fetch request
val fetchCallback = fetchMessages(
replicaManager,
otherId,
topicPartition,
new PartitionData(0, 0, Int.MaxValue, Optional.empty()),
Int.MaxValue,
IsolationLevel.READ_UNCOMMITTED,
None
)
// Change the local replica to follower
val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1))
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1))
// Check that the fetch completed with an error because the replica changed to follower before the fetch could be satisfied
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchCallback.assertFired.error)
} finally {
replicaManager.shutdown()
}
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
private def topicsImage(replica: Int, isLeader: Boolean, epoch: Int): TopicsImage = {
val leader = if (isLeader) replica else replica + 1
val topicsById = new util.HashMap[Uuid, TopicImage]()
val topicsByName = new util.HashMap[String, TopicImage]()
val fooPartitions = new util.HashMap[Integer, PartitionRegistration]()
fooPartitions.put(0, new PartitionRegistration(Array(replica, replica + 1),
Array(replica, replica + 1), Replicas.NONE, Replicas.NONE, leader, epoch, epoch))
val foo = new TopicImage("foo", FOO_UUID, fooPartitions)
topicsById.put(FOO_UUID, foo)
topicsByName.put("foo", foo)
new TopicsImage(topicsById, topicsByName)
}
private def topicsDelta(replica: Int, isLeader: Boolean, epoch: Int): TopicsDelta = {
val leader = if (isLeader) replica else replica + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)
delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
delta.replay(new PartitionRecord().setPartitionId(0).
setTopicId(FOO_UUID).
setReplicas(util.Arrays.asList(replica, replica + 1)).
setIsr(util.Arrays.asList(replica, replica + 1)).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(leader).
setLeaderEpoch(epoch).
setPartitionEpoch(epoch))
delta
}
private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = {
new MetadataImage(
FeaturesImage.EMPTY,
ClusterImageTest.IMAGE1,
topicsImage,
ConfigurationsImage.EMPTY,
ClientQuotasImage.EMPTY
)
}
}

File: ClusterImageTest.java

@@ -46,7 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40)
public class ClusterImageTest {
-    final static ClusterImage IMAGE1;
+    public final static ClusterImage IMAGE1;

    static final List<ApiMessageAndVersion> DELTA1_RECORDS;