KAFKA-13198: Stop replicas when reassigned (#11216)

Stop the replica and resign the coordinators when a replica gets reassigned away from a topic partition.

1. Implement localChanges in TopicsDelta and TopicDelta to return all of the partitions that were deleted, became leader and became follower for the given broker id.
2. Add tests for TopicsDelta::localChanges
3. Resign coordinators that were moved away from the consumer offset and transaction topic partitions.
4. Add replica manager tests for testing reassignment of replicas and removal of topic.
5. Add a new type LocalReplicaChanges that encapsulates topic partitions deleted, became leader and became follower.

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
José Armando García Sancio 2021-08-17 13:10:03 -07:00 committed by GitHub
parent b923ec236e
commit 9bcf4a525b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 707 additions and 240 deletions

View File

@ -113,7 +113,7 @@ class BrokerServer(
var dynamicConfigHandlers: Map[String, ConfigHandler] = null var dynamicConfigHandlers: Map[String, ConfigHandler] = null
var replicaManager: ReplicaManager = null @volatile private[this] var _replicaManager: ReplicaManager = null
var credentialProvider: CredentialProvider = null var credentialProvider: CredentialProvider = null
var tokenCache: DelegationTokenCache = null var tokenCache: DelegationTokenCache = null
@ -173,6 +173,8 @@ class BrokerServer(
true true
} }
def replicaManager: ReplicaManager = _replicaManager
def startup(): Unit = { def startup(): Unit = {
if (!maybeChangeStatus(SHUTDOWN, STARTING)) return if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
try { try {
@ -250,7 +252,7 @@ class BrokerServer(
) )
alterIsrManager.start() alterIsrManager.start()
this.replicaManager = new ReplicaManager(config, metrics, time, None, this._replicaManager = new ReplicaManager(config, metrics, time, None,
kafkaScheduler, logManager, isShuttingDown, quotaManagers, kafkaScheduler, logManager, isShuttingDown, quotaManagers,
brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager, brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
threadNamePrefix) threadNamePrefix)

View File

@ -115,7 +115,7 @@ class KafkaServer(
var logDirFailureChannel: LogDirFailureChannel = null var logDirFailureChannel: LogDirFailureChannel = null
var logManager: LogManager = null var logManager: LogManager = null
var replicaManager: ReplicaManager = null @volatile private[this] var _replicaManager: ReplicaManager = null
var adminManager: ZkAdminManager = null var adminManager: ZkAdminManager = null
var tokenManager: DelegationTokenManager = null var tokenManager: DelegationTokenManager = null
@ -170,6 +170,8 @@ class KafkaServer(
private[kafka] def featureChangeListener = _featureChangeListener private[kafka] def featureChangeListener = _featureChangeListener
def replicaManager: ReplicaManager = _replicaManager
/** /**
* Start up API for bringing up a single instance of the Kafka server. * Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
@ -308,7 +310,7 @@ class KafkaServer(
} }
alterIsrManager.start() alterIsrManager.start()
replicaManager = createReplicaManager(isShuttingDown) _replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup() replicaManager.startup()
val brokerInfo = createBrokerInfo val brokerInfo = createBrokerInfo

View File

@ -60,8 +60,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{MetadataImage, TopicsDelta} import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.PartitionRegistration
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, Set, mutable} import scala.collection.{Map, Seq, Set, mutable}
@ -84,8 +83,6 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
} }
} }
case class LocalLeaderInfo(topicId: Uuid, partition: PartitionRegistration)
/** /**
* Result metadata of a log read operation on the log * Result metadata of a log read operation on the log
* @param info @FetchDataInfo returned by the @Log read * @param info @FetchDataInfo returned by the @Log read
@ -434,7 +431,9 @@ class ReplicaManager(val config: KafkaConfig,
* @return A map from partitions to exceptions which occurred. * @return A map from partitions to exceptions which occurred.
* If no errors occurred, the map will be empty. * If no errors occurred, the map will be empty.
*/ */
protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = { protected def stopPartitions(
partitionsToStop: Map[TopicPartition, Boolean]
): Map[TopicPartition, Throwable] = {
// First stop fetchers for all partitions. // First stop fetchers for all partitions.
val partitions = partitionsToStop.keySet val partitions = partitionsToStop.keySet
replicaFetcherManager.removeFetcherForPartitions(partitions) replicaFetcherManager.removeFetcherForPartitions(partitions)
@ -2074,32 +2073,6 @@ class ReplicaManager(val config: KafkaConfig,
} }
} }
private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
: (mutable.HashMap[TopicPartition, Boolean],
mutable.HashMap[TopicPartition, LocalLeaderInfo],
mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
val deleted = new mutable.HashMap[TopicPartition, Boolean]()
delta.deletedTopicIds().forEach { topicId =>
val topicImage = delta.image().getTopic(topicId)
topicImage.partitions().keySet().forEach { partitionId =>
deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
}
}
val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
delta.changedTopics().values().forEach { topicDelta =>
topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
LocalLeaderInfo(topicDelta.id(), e.getValue))
}
topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
LocalLeaderInfo(topicDelta.id(), e.getValue))
}
}
(deleted, newLocalLeaders, newLocalFollowers)
}
/** /**
* Apply a KRaft topic change delta. * Apply a KRaft topic change delta.
* *
@ -2107,15 +2080,16 @@ class ReplicaManager(val config: KafkaConfig,
* @param delta The delta to apply. * @param delta The delta to apply.
*/ */
def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = { def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
// Before taking the lock, build some hash maps that we will need. // Before taking the lock, compute the local changes
val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta) val localChanges = delta.localChanges(config.nodeId)
replicaStateChangeLock.synchronized { replicaStateChangeLock.synchronized {
// Handle deleted partitions. We need to do this first because we might subsequently // Handle deleted partitions. We need to do this first because we might subsequently
// create new partitions with the same names as the ones we are deleting here. // create new partitions with the same names as the ones we are deleting here.
if (!deleted.isEmpty) { if (!localChanges.deletes.isEmpty) {
stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).") val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap
stopPartitions(deleted).foreach { case (topicPartition, e) => stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
stopPartitions(deletes).foreach { case (topicPartition, e) =>
if (e.isInstanceOf[KafkaStorageException]) { if (e.isInstanceOf[KafkaStorageException]) {
stateChangeLogger.error(s"Unable to delete replica ${topicPartition} because " + stateChangeLogger.error(s"Unable to delete replica ${topicPartition} because " +
"the local replica for the partition is in an offline log directory") "the local replica for the partition is in an offline log directory")
@ -2125,15 +2099,16 @@ class ReplicaManager(val config: KafkaConfig,
} }
} }
} }
// Handle partitions which we are now the leader or follower for. // Handle partitions which we are now the leader or follower for.
if (!newLocalLeaders.isEmpty || !newLocalFollowers.isEmpty) { if (!localChanges.leaders.isEmpty || !localChanges.followers.isEmpty) {
val lazyOffsetCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints) val lazyOffsetCheckpoints = new LazyOffsetCheckpoints(this.highWatermarkCheckpoints)
val changedPartitions = new mutable.HashSet[Partition] val changedPartitions = new mutable.HashSet[Partition]
if (!newLocalLeaders.isEmpty) { if (!localChanges.leaders.isEmpty) {
applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, newLocalLeaders) applyLocalLeadersDelta(changedPartitions, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala)
} }
if (!newLocalFollowers.isEmpty) { if (!localChanges.followers.isEmpty) {
applyLocalFollowersDelta(changedPartitions, newImage, delta, lazyOffsetCheckpoints, newLocalFollowers) applyLocalFollowersDelta(changedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala)
} }
maybeAddLogDirFetchers(changedPartitions, lazyOffsetCheckpoints, maybeAddLogDirFetchers(changedPartitions, lazyOffsetCheckpoints,
name => Option(newImage.topics().getTopic(name)).map(_.id())) name => Option(newImage.topics().getTopic(name)).map(_.id()))
@ -2148,8 +2123,8 @@ class ReplicaManager(val config: KafkaConfig,
if (localLog(tp).isEmpty) if (localLog(tp).isEmpty)
markPartitionOffline(tp) markPartitionOffline(tp)
} }
newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded) localChanges.leaders.keySet.forEach(markPartitionOfflineIfNeeded)
newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded) localChanges.followers.keySet.forEach(markPartitionOfflineIfNeeded)
replicaFetcherManager.shutdownIdleFetcherThreads() replicaFetcherManager.shutdownIdleFetcherThreads()
replicaAlterLogDirsManager.shutdownIdleFetcherThreads() replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
@ -2157,10 +2132,12 @@ class ReplicaManager(val config: KafkaConfig,
} }
} }
private def applyLocalLeadersDelta(changedPartitions: mutable.HashSet[Partition], private def applyLocalLeadersDelta(
delta: TopicsDelta, changedPartitions: mutable.Set[Partition],
offsetCheckpoints: OffsetCheckpoints, delta: TopicsDelta,
newLocalLeaders: mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = { offsetCheckpoints: OffsetCheckpoints,
newLocalLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
): Unit = {
stateChangeLogger.info(s"Transitioning ${newLocalLeaders.size} partition(s) to " + stateChangeLogger.info(s"Transitioning ${newLocalLeaders.size} partition(s) to " +
"local leaders.") "local leaders.")
replicaFetcherManager.removeFetcherForPartitions(newLocalLeaders.keySet) replicaFetcherManager.removeFetcherForPartitions(newLocalLeaders.keySet)
@ -2186,11 +2163,13 @@ class ReplicaManager(val config: KafkaConfig,
} }
} }
private def applyLocalFollowersDelta(changedPartitions: mutable.HashSet[Partition], private def applyLocalFollowersDelta(
newImage: MetadataImage, changedPartitions: mutable.Set[Partition],
delta: TopicsDelta, newImage: MetadataImage,
offsetCheckpoints: OffsetCheckpoints, delta: TopicsDelta,
newLocalFollowers: mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = { offsetCheckpoints: OffsetCheckpoints,
newLocalFollowers: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]
): Unit = {
stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " + stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " +
"local followers.") "local followers.")
val shuttingDown = isShuttingDown.get() val shuttingDown = isShuttingDown.get()

View File

@ -152,11 +152,16 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// Handle the case where we have new local leaders or followers for the consumer // Handle the case where we have new local leaders or followers for the consumer
// offsets topic. // offsets topic.
getTopicDelta(Topic.GROUP_METADATA_TOPIC_NAME, newImage, delta).foreach { topicDelta => getTopicDelta(Topic.GROUP_METADATA_TOPIC_NAME, newImage, delta).foreach { topicDelta =>
topicDelta.newLocalLeaders(brokerId).forEach { val changes = topicDelta.localChanges(brokerId)
entry => groupCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch)
changes.deletes.forEach { topicPartition =>
groupCoordinator.onResignation(topicPartition.partition, None)
} }
topicDelta.newLocalFollowers(brokerId).forEach { changes.leaders.forEach { (topicPartition, partitionInfo) =>
entry => groupCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch)) groupCoordinator.onElection(topicPartition.partition, partitionInfo.partition.leaderEpoch)
}
changes.followers.forEach { (topicPartition, partitionInfo) =>
groupCoordinator.onResignation(topicPartition.partition, Some(partitionInfo.partition.leaderEpoch))
} }
} }
@ -172,11 +177,16 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// If the transaction state topic changed in a way that's relevant to this broker, // If the transaction state topic changed in a way that's relevant to this broker,
// notify the transaction coordinator. // notify the transaction coordinator.
getTopicDelta(Topic.TRANSACTION_STATE_TOPIC_NAME, newImage, delta).foreach { topicDelta => getTopicDelta(Topic.TRANSACTION_STATE_TOPIC_NAME, newImage, delta).foreach { topicDelta =>
topicDelta.newLocalLeaders(brokerId).forEach { val changes = topicDelta.localChanges(brokerId)
entry => txnCoordinator.onElection(entry.getKey(), entry.getValue().leaderEpoch)
changes.deletes.forEach { topicPartition =>
txnCoordinator.onResignation(topicPartition.partition, None)
} }
topicDelta.newLocalFollowers(brokerId).forEach { changes.leaders.forEach { (topicPartition, partitionInfo) =>
entry => txnCoordinator.onResignation(entry.getKey(), Some(entry.getValue().leaderEpoch)) txnCoordinator.onElection(topicPartition.partition, partitionInfo.partition.leaderEpoch)
}
changes.followers.forEach { (topicPartition, partitionInfo) =>
txnCoordinator.onResignation(topicPartition.partition, Some(partitionInfo.partition.leaderEpoch))
} }
} }
@ -204,7 +214,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
tag.foreach { t => tag.foreach { t =>
val newProperties = newImage.configs().configProperties(configResource) val newProperties = newImage.configs().configProperties(configResource)
val maybeDefaultName = configResource.name() match { val maybeDefaultName = configResource.name() match {
case "" => ConfigEntityName.Default case "" => ConfigEntityName.Default
case k => k case k => k
} }
dynamicConfigHandlers(t).processConfigChanges(maybeDefaultName, newProperties) dynamicConfigHandlers(t).processConfigChanges(maybeDefaultName, newProperties)

View File

@ -402,6 +402,16 @@ class KRaftClusterTest {
} }
}, "Timed out waiting for replica assignments for topic foo. " + }, "Timed out waiting for replica assignments for topic foo. " +
s"Wanted: ${expectedMapping}. Got: ${currentMapping}") s"Wanted: ${expectedMapping}. Got: ${currentMapping}")
checkReplicaManager(
cluster,
List(
(0, List(true, true, false, true)),
(1, List(true, true, false, true)),
(2, List(true, true, true, true)),
(3, List(false, false, true, true))
)
)
} finally { } finally {
admin.close() admin.close()
} }
@ -410,6 +420,29 @@ class KRaftClusterTest {
} }
} }
private def checkReplicaManager(cluster: KafkaClusterTestKit, expectedHosting: List[(Int, List[Boolean])]): Unit = {
for ((brokerId, partitionsIsHosted) <- expectedHosting) {
val broker = cluster.brokers().get(brokerId)
for ((isHosted, partitionId) <- partitionsIsHosted.zipWithIndex) {
val topicPartition = new TopicPartition("foo", partitionId)
if (isHosted) {
assertNotEquals(
HostedPartition.None,
broker.replicaManager.getPartition(topicPartition),
s"topicPartition = $topicPartition"
)
} else {
assertEquals(
HostedPartition.None,
broker.replicaManager.getPartition(topicPartition),
s"topicPartition = $topicPartition"
)
}
}
}
}
private def translatePartitionInfoToSeq(partitions: util.List[TopicPartitionInfo]): Seq[Seq[Int]] = { private def translatePartitionInfoToSeq(partitions: util.List[TopicPartitionInfo]): Seq[Seq[Int]] = {
partitions.asScala.map(partition => partition.replicas().asScala.map(_.id()).toSeq).toSeq partitions.asScala.map(partition => partition.replicas().asScala.map(_.id()).toSeq).toSeq
} }

View File

@ -38,7 +38,7 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState
import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord} import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -51,8 +51,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid} import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, TopicImage, TopicsDelta, TopicsImage } import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, TopicsDelta, TopicsImage }
import org.apache.kafka.metadata.{PartitionRegistration, Replicas}
import org.easymock.EasyMock import org.easymock.EasyMock
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@ -2780,8 +2779,6 @@ class ReplicaManagerTest {
val BAR_UUID = Uuid.fromString("vApAP6y7Qx23VOfKBzbOBQ") val BAR_UUID = Uuid.fromString("vApAP6y7Qx23VOfKBzbOBQ")
val BAZ_UUID = Uuid.fromString("7wVsX2aaTk-bdGcOxLRyVQ")
@Test @Test
def testGetOrCreatePartition(): Unit = { def testGetOrCreatePartition(): Unit = {
val brokerId = 0 val brokerId = 0
@ -2799,95 +2796,25 @@ class ReplicaManagerTest {
assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID)) assertEquals(None, replicaManager.getOrCreatePartition(bar1, emptyDelta, BAR_UUID))
} }
val TEST_IMAGE = {
val topicsById = new util.HashMap[Uuid, TopicImage]()
val topicsByName = new util.HashMap[String, TopicImage]()
val fooPartitions = new util.HashMap[Integer, PartitionRegistration]()
fooPartitions.put(0, new PartitionRegistration(Array(1, 2, 3),
Array(1, 2, 3), Replicas.NONE, Replicas.NONE, 1, 100, 200))
fooPartitions.put(1, new PartitionRegistration(Array(4, 5, 6),
Array(4, 5), Replicas.NONE, Replicas.NONE, 5, 300, 400))
val foo = new TopicImage("foo", FOO_UUID, fooPartitions)
val barPartitions = new util.HashMap[Integer, PartitionRegistration]()
barPartitions.put(0, new PartitionRegistration(Array(2, 3, 4),
Array(2, 3, 4), Replicas.NONE, Replicas.NONE, 3, 100, 200))
val bar = new TopicImage("bar", BAR_UUID, barPartitions)
topicsById.put(FOO_UUID, foo)
topicsByName.put("foo", foo)
topicsById.put(BAR_UUID, bar)
topicsByName.put("bar", bar)
new TopicsImage(topicsById, topicsByName)
}
val TEST_DELTA = {
val delta = new TopicsDelta(TEST_IMAGE)
delta.replay(new RemoveTopicRecord().setTopicId(FOO_UUID))
delta.replay(new TopicRecord().setName("baz").setTopicId(BAZ_UUID))
delta.replay(new PartitionRecord().setPartitionId(0).
setTopicId(BAZ_UUID).
setReplicas(util.Arrays.asList(1, 2, 4)).
setIsr(util.Arrays.asList(1, 2, 4)).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(1).
setLeaderEpoch(123).
setPartitionEpoch(456))
delta.replay(new PartitionRecord().setPartitionId(1).
setTopicId(BAZ_UUID).
setReplicas(util.Arrays.asList(2, 4, 1)).
setIsr(util.Arrays.asList(2, 4, 1)).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(2).
setLeaderEpoch(123).
setPartitionEpoch(456))
delta.replay(new PartitionRecord().setPartitionId(2).
setTopicId(BAZ_UUID).
setReplicas(util.Arrays.asList(3, 5, 2)).
setIsr(util.Arrays.asList(3, 5, 2)).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()).
setLeader(3).
setLeaderEpoch(456).
setPartitionEpoch(789))
delta
}
@Test
def testCalculateDeltaChanges(): Unit = {
val brokerId = 1
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), brokerId)
assertEquals((
Map(new TopicPartition("foo", 0) -> true,
new TopicPartition("foo", 1) -> true),
Map(new TopicPartition("baz", 0) -> LocalLeaderInfo(BAZ_UUID,
new PartitionRegistration(Array(1, 2, 4), Array(1, 2, 4),
Replicas.NONE, Replicas.NONE, 1, 123, 456))),
Map(new TopicPartition("baz", 1) -> LocalLeaderInfo(BAZ_UUID,
new PartitionRegistration(Array(2, 4, 1), Array(2, 4, 1),
Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
replicaManager.calculateDeltaChanges(TEST_DELTA))
}
@Test @Test
def testDeltaFromLeaderToFollower(): Unit = { def testDeltaFromLeaderToFollower(): Unit = {
val localId = 1 val localId = 1
val otherId = localId + 1 val otherId = localId + 1
val numOfRecords = 3 val numOfRecords = 3
val epoch = 100
val topicPartition = new TopicPartition("foo", 0) val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try { try {
// Make the local replica the leader // Make the local replica the leader
val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch)) val leaderTopicsDelta = topicsCreateDelta(localId, true)
replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch)) val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
// Check the state of that partition and fetcher // Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
assertTrue(leaderPartition.isLeader) assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(epoch, leaderPartition.getLeaderEpoch) assertEquals(0, leaderPartition.getLeaderEpoch)
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
@ -2905,8 +2832,9 @@ class ReplicaManagerTest {
assertEquals(Errors.NONE, leaderResponse.get.error) assertEquals(Errors.NONE, leaderResponse.get.error)
// Change the local replica to follower // Change the local replica to follower
val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1)) val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1)) val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
// Append on a follower should fail // Append on a follower should fail
val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
@ -2915,7 +2843,7 @@ class ReplicaManagerTest {
// Check the state of that partition and fetcher // Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
assertFalse(followerPartition.isLeader) assertFalse(followerPartition.isLeader)
assertEquals(epoch + 1, followerPartition.getLeaderEpoch) assertEquals(1, followerPartition.getLeaderEpoch)
val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker)) assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
@ -2931,19 +2859,19 @@ class ReplicaManagerTest {
val localId = 1 val localId = 1
val otherId = localId + 1 val otherId = localId + 1
val numOfRecords = 3 val numOfRecords = 3
val epoch = 100
val topicPartition = new TopicPartition("foo", 0) val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try { try {
// Make the local replica the follower // Make the local replica the follower
val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch)) val followerTopicsDelta = topicsCreateDelta(localId, false)
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch)) val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
// Check the state of that partition and fetcher // Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
assertFalse(followerPartition.isLeader) assertFalse(followerPartition.isLeader)
assertEquals(epoch, followerPartition.getLeaderEpoch) assertEquals(0, followerPartition.getLeaderEpoch)
val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker)) assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
@ -2953,8 +2881,9 @@ class ReplicaManagerTest {
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error) assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
// Change the local replica to leader // Change the local replica to leader
val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch + 1)) val leaderTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, true)
replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch + 1)) val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
// Send a produce request and advance the highwatermark // Send a produce request and advance the highwatermark
val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
@ -2972,7 +2901,7 @@ class ReplicaManagerTest {
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
assertTrue(leaderPartition.isLeader) assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(epoch + 1, leaderPartition.getLeaderEpoch) assertEquals(1, leaderPartition.getLeaderEpoch)
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
} finally { } finally {
@ -2986,30 +2915,30 @@ class ReplicaManagerTest {
def testDeltaFollowerWithNoChange(): Unit = { def testDeltaFollowerWithNoChange(): Unit = {
val localId = 1 val localId = 1
val otherId = localId + 1 val otherId = localId + 1
val epoch = 100
val topicPartition = new TopicPartition("foo", 0) val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try { try {
// Make the local replica the follower // Make the local replica the follower
val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch)) val followerTopicsDelta = topicsCreateDelta(localId, false)
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch)) val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
// Check the state of that partition and fetcher // Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
assertFalse(followerPartition.isLeader) assertFalse(followerPartition.isLeader)
assertEquals(epoch, followerPartition.getLeaderEpoch) assertEquals(0, followerPartition.getLeaderEpoch)
val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker)) assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
// Apply the same delta again // Apply the same delta again
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch)) replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
// Check that the state stays the same // Check that the state stays the same
val HostedPartition.Online(noChangePartition) = replicaManager.getPartition(topicPartition) val HostedPartition.Online(noChangePartition) = replicaManager.getPartition(topicPartition)
assertFalse(noChangePartition.isLeader) assertFalse(noChangePartition.isLeader)
assertEquals(epoch, noChangePartition.getLeaderEpoch) assertEquals(0, noChangePartition.getLeaderEpoch)
val noChangeFetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) val noChangeFetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), noChangeFetcher.map(_.sourceBroker)) assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), noChangeFetcher.map(_.sourceBroker))
@ -3020,25 +2949,173 @@ class ReplicaManagerTest {
TestUtils.assertNoNonDaemonThreads(this.getClass.getName) TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
} }
@Test
def testDeltaFollowerToNotReplica(): Unit = {
val localId = 1
val otherId = localId + 1
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try {
// Make the local replica the follower
val followerTopicsDelta = topicsCreateDelta(localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
assertFalse(followerPartition.isLeader)
assertEquals(0, followerPartition.getLeaderEpoch)
val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
// Apply changes that remove replica
val notReplicaTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), otherId, true)
val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
assertEquals(None, replicaManager.logManager.getLog(topicPartition))
} finally {
replicaManager.shutdown()
}
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testDeltaFollowerRemovedTopic(): Unit = {
val localId = 1
val otherId = localId + 1
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try {
// Make the local replica the follower
val followerTopicsDelta = topicsCreateDelta(localId, false)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
// Check the state of that partition and fetcher
val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
assertFalse(followerPartition.isLeader)
assertEquals(0, followerPartition.getLeaderEpoch)
val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
// Apply changes that remove topic and replica
val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
assertEquals(None, replicaManager.logManager.getLog(topicPartition))
} finally {
replicaManager.shutdown()
}
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testDeltaLeaderToNotReplica(): Unit = {
val localId = 1
val otherId = localId + 1
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try {
// Make the local replica the follower
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(0, leaderPartition.getLeaderEpoch)
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
// Apply changes that remove replica
val notReplicaTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), otherId, true)
val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
assertEquals(None, replicaManager.logManager.getLog(topicPartition))
} finally {
replicaManager.shutdown()
}
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test
def testDeltaLeaderToRemovedTopic(): Unit = {
val localId = 1
val otherId = localId + 1
val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try {
// Make the local replica the follower
val leaderTopicsDelta = topicsCreateDelta(localId, true)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
// Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(0, leaderPartition.getLeaderEpoch)
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
// Apply changes that remove topic and replica
val removeTopicsDelta = topicsDeleteDelta(leaderMetadataImage.topics())
val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
replicaManager.applyDelta(removeMetadataImage, removeTopicsDelta)
// Check that the partition was removed
assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
assertEquals(None, replicaManager.logManager.getLog(topicPartition))
} finally {
replicaManager.shutdown()
}
TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
}
@Test @Test
def testDeltaToFollowerCompletesProduce(): Unit = { def testDeltaToFollowerCompletesProduce(): Unit = {
val localId = 1 val localId = 1
val otherId = localId + 1 val otherId = localId + 1
val numOfRecords = 3 val numOfRecords = 3
val epoch = 100
val topicPartition = new TopicPartition("foo", 0) val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try { try {
// Make the local replica the leader // Make the local replica the leader
val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch)) val leaderTopicsDelta = topicsCreateDelta(localId, true)
replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch)) val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
// Check the state of that partition and fetcher // Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
assertTrue(leaderPartition.isLeader) assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(epoch, leaderPartition.getLeaderEpoch) assertEquals(0, leaderPartition.getLeaderEpoch)
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
@ -3046,8 +3123,9 @@ class ReplicaManagerTest {
val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords)
// Change the local replica to follower // Change the local replica to follower
val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1)) val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1)) val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
// Check that the produce failed because it changed to follower before replicating // Check that the produce failed because it changed to follower before replicating
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error) assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error)
@ -3062,20 +3140,20 @@ class ReplicaManagerTest {
def testDeltaToFollowerCompletesFetch(): Unit = { def testDeltaToFollowerCompletesFetch(): Unit = {
val localId = 1 val localId = 1
val otherId = localId + 1 val otherId = localId + 1
val epoch = 100
val topicPartition = new TopicPartition("foo", 0) val topicPartition = new TopicPartition("foo", 0)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
try { try {
// Make the local replica the leader // Make the local replica the leader
val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch)) val leaderTopicsDelta = topicsCreateDelta(localId, true)
replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch)) val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderMetadataImage, leaderTopicsDelta)
// Check the state of that partition and fetcher // Check the state of that partition and fetcher
val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition)
assertTrue(leaderPartition.isLeader) assertTrue(leaderPartition.isLeader)
assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
assertEquals(epoch, leaderPartition.getLeaderEpoch) assertEquals(0, leaderPartition.getLeaderEpoch)
assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
@ -3091,8 +3169,9 @@ class ReplicaManagerTest {
) )
// Change the local replica to follower // Change the local replica to follower
val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1)) val followerTopicsDelta = topicsChangeDelta(leaderMetadataImage.topics(), localId, false)
replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1)) val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
// Check that the produce failed because it changed to follower before replicating // Check that the produce failed because it changed to follower before replicating
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchCallback.assertFired.error) assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchCallback.assertFired.error)
@ -3103,34 +3182,44 @@ class ReplicaManagerTest {
TestUtils.assertNoNonDaemonThreads(this.getClass.getName) TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
} }
private def topicsImage(replica: Int, isLeader: Boolean, epoch: Int): TopicsImage = {
val leader = if (isLeader) replica else replica + 1
val topicsById = new util.HashMap[Uuid, TopicImage]()
val topicsByName = new util.HashMap[String, TopicImage]()
val fooPartitions = new util.HashMap[Integer, PartitionRegistration]()
fooPartitions.put(0, new PartitionRegistration(Array(replica, replica + 1),
Array(replica, replica + 1), Replicas.NONE, Replicas.NONE, leader, epoch, epoch))
val foo = new TopicImage("foo", FOO_UUID, fooPartitions)
topicsById.put(FOO_UUID, foo) private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean): TopicsDelta = {
topicsByName.put("foo", foo) val leader = if (isStartIdLeader) startId else startId + 1
new TopicsImage(topicsById, topicsByName)
}
private def topicsDelta(replica: Int, isLeader: Boolean, epoch: Int): TopicsDelta = {
val leader = if (isLeader) replica else replica + 1
val delta = new TopicsDelta(TopicsImage.EMPTY) val delta = new TopicsDelta(TopicsImage.EMPTY)
delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID)) delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
delta.replay(new PartitionRecord().setPartitionId(0). delta.replay(
setTopicId(FOO_UUID). new PartitionRecord()
setReplicas(util.Arrays.asList(replica, replica + 1)). .setPartitionId(0)
setIsr(util.Arrays.asList(replica, replica + 1)). .setTopicId(FOO_UUID)
setRemovingReplicas(Collections.emptyList()). .setReplicas(util.Arrays.asList(startId, startId + 1))
setAddingReplicas(Collections.emptyList()). .setIsr(util.Arrays.asList(startId, startId + 1))
setLeader(leader). .setRemovingReplicas(Collections.emptyList())
setLeaderEpoch(epoch). .setAddingReplicas(Collections.emptyList())
setPartitionEpoch(epoch)) .setLeader(leader)
.setLeaderEpoch(0)
.setPartitionEpoch(0)
)
delta
}
private def topicsChangeDelta(topicsImage: TopicsImage, startId: Int, isStartIdLeader: Boolean): TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(topicsImage)
delta.replay(
new PartitionChangeRecord()
.setPartitionId(0)
.setTopicId(FOO_UUID)
.setReplicas(util.Arrays.asList(startId, startId + 1))
.setIsr(util.Arrays.asList(startId, startId + 1))
.setLeader(leader)
)
delta
}
private def topicsDeleteDelta(topicsImage: TopicsImage): TopicsDelta = {
val delta = new TopicsDelta(topicsImage)
delta.replay(new RemoveTopicRecord().setTopicId(FOO_UUID))
delta delta
} }

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.image;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.PartitionRegistration;
import java.util.Set;
import java.util.Map;
public final class LocalReplicaChanges {
private final Set<TopicPartition> deletes;
private final Map<TopicPartition, PartitionInfo> leaders;
private final Map<TopicPartition, PartitionInfo> followers;
LocalReplicaChanges(
Set<TopicPartition> deletes,
Map<TopicPartition, PartitionInfo> leaders,
Map<TopicPartition, PartitionInfo> followers
) {
this.deletes = deletes;
this.leaders = leaders;
this.followers = followers;
}
public Set<TopicPartition> deletes() {
return deletes;
}
public Map<TopicPartition, PartitionInfo> leaders() {
return leaders;
}
public Map<TopicPartition, PartitionInfo> followers() {
return followers;
}
@Override
public String toString() {
return String.format(
"LocalReplicaChanges(deletes = %s, leaders = %s, followers = %s)",
deletes,
leaders,
followers
);
}
public static final class PartitionInfo {
private final Uuid topicId;
private final PartitionRegistration partition;
public PartitionInfo(Uuid topicId, PartitionRegistration partition) {
this.topicId = topicId;
this.partition = partition;
}
@Override
public String toString() {
return String.format("PartitionInfo(topicId = %s, partition = %s)", topicId, partition);
}
public Uuid topicId() {
return topicId;
}
public PartitionRegistration partition() {
return partition;
}
}
}

View File

@ -18,17 +18,17 @@
package org.apache.kafka.image; package org.apache.kafka.image;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas; import org.apache.kafka.metadata.Replicas;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Map;
import java.util.Set;
/** /**
* Represents changes to a topic in the metadata image. * Represents changes to a topic in the metadata image.
@ -93,43 +93,49 @@ public final class TopicDelta {
} }
/** /**
* Find the partitions that we are now leading, whose partition epoch has changed. * Find the partitions that have change based on the replica given.
* *
* @param brokerId The broker id. * The changes identified are:
* @return A list of (partition ID, partition registration) entries. * 1. partitions for which the broker is not a replica anymore
* 2. partitions for which the broker is now the leader
* 3. partitions for which the broker is now a follower
*
* @param brokerId the broker id
* @return the list of partitions which the broker should remove, become leader or become follower.
*/ */
public List<Entry<Integer, PartitionRegistration>> newLocalLeaders(int brokerId) { public LocalReplicaChanges localChanges(int brokerId) {
List<Entry<Integer, PartitionRegistration>> results = new ArrayList<>(); Set<TopicPartition> deletes = new HashSet<>();
for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) { Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>();
if (entry.getValue().leader == brokerId) { Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>();
PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
if (prevPartition == null ||
prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
results.add(entry);
}
}
}
return results;
}
/**
* Find the partitions that we are now following, whose partition epoch has changed.
*
* @param brokerId The broker id.
* @return A list of (partition ID, partition registration) entries.
*/
public List<Entry<Integer, PartitionRegistration>> newLocalFollowers(int brokerId) {
List<Entry<Integer, PartitionRegistration>> results = new ArrayList<>();
for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) { for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) {
if (entry.getValue().leader != brokerId && if (!Replicas.contains(entry.getValue().replicas, brokerId)) {
Replicas.contains(entry.getValue().replicas, brokerId)) {
PartitionRegistration prevPartition = image.partitions().get(entry.getKey()); PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
if (prevPartition == null || if (prevPartition != null && Replicas.contains(prevPartition.replicas, brokerId)) {
prevPartition.partitionEpoch != entry.getValue().partitionEpoch) { deletes.add(new TopicPartition(name(), entry.getKey()));
results.add(entry); }
} else if (entry.getValue().leader == brokerId) {
PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
if (prevPartition == null || prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
leaders.put(
new TopicPartition(name(), entry.getKey()),
new LocalReplicaChanges.PartitionInfo(id(), entry.getValue())
);
}
} else if (
entry.getValue().leader != brokerId &&
Replicas.contains(entry.getValue().replicas, brokerId)
) {
PartitionRegistration prevPartition = image.partitions().get(entry.getKey());
if (prevPartition == null || prevPartition.partitionEpoch != entry.getValue().partitionEpoch) {
followers.put(
new TopicPartition(name(), entry.getKey()),
new LocalReplicaChanges.PartitionInfo(id(), entry.getValue())
);
} }
} }
} }
return results;
return new LocalReplicaChanges(deletes, leaders, followers);
} }
} }

View File

@ -17,11 +17,13 @@
package org.apache.kafka.image; package org.apache.kafka.image;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.metadata.Replicas;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -162,4 +164,41 @@ public final class TopicsDelta {
public Set<Uuid> deletedTopicIds() { public Set<Uuid> deletedTopicIds() {
return deletedTopicIds; return deletedTopicIds;
} }
/**
* Find the topic partitions that have change based on the replica given.
*
* The changes identified are:
* 1. topic partitions for which the broker is not a replica anymore
* 2. topic partitions for which the broker is now the leader
* 3. topic partitions for which the broker is now a follower
*
* @param brokerId the broker id
* @return the list of topic partitions which the broker should remove, become leader or become follower.
*/
public LocalReplicaChanges localChanges(int brokerId) {
Set<TopicPartition> deletes = new HashSet<>();
Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>();
Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>();
for (TopicDelta delta : changedTopics.values()) {
LocalReplicaChanges changes = delta.localChanges(brokerId);
deletes.addAll(changes.deletes());
leaders.putAll(changes.leaders());
followers.putAll(changes.followers());
}
// Add all of the removed topic partitions to the set of locally removed partitions
deletedTopicIds().forEach(topicId -> {
TopicImage topicImage = image().getTopic(topicId);
topicImage.partitions().forEach((partitionId, prevPartition) -> {
if (Replicas.contains(prevPartition.replicas, brokerId)) {
deletes.add(new TopicPartition(topicImage.name(), partitionId));
}
});
});
return new LocalReplicaChanges(deletes, leaders, followers);
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.kafka.image; package org.apache.kafka.image;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.PartitionRecord;
@ -34,6 +35,7 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -46,13 +48,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40) @Timeout(value = 40)
public class TopicsImageTest { public class TopicsImageTest {
final static TopicsImage IMAGE1; static final TopicsImage IMAGE1;
static final List<ApiMessageAndVersion> DELTA1_RECORDS; static final List<ApiMessageAndVersion> DELTA1_RECORDS;
final static TopicsDelta DELTA1; static final TopicsDelta DELTA1;
final static TopicsImage IMAGE2; static final TopicsImage IMAGE2;
static final List<TopicImage> TOPIC_IMAGES1;
private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistration... partitions) { private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistration... partitions) {
Map<Integer, PartitionRegistration> partitionMap = new HashMap<>(); Map<Integer, PartitionRegistration> partitionMap = new HashMap<>();
@ -80,16 +84,19 @@ public class TopicsImageTest {
} }
static { static {
List<TopicImage> topics1 = Arrays.asList( TOPIC_IMAGES1 = Arrays.asList(
newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"), newTopicImage("foo", Uuid.fromString("ThIaNwRnSM2Nt9Mx1v0RvA"),
new PartitionRegistration(new int[] {2, 3, 4}, new PartitionRegistration(new int[] {2, 3, 4},
new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345), new int[] {2, 3}, Replicas.NONE, Replicas.NONE, 2, 1, 345),
new PartitionRegistration(new int[] {3, 4, 5}, new PartitionRegistration(new int[] {3, 4, 5},
new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684)), new int[] {3, 4, 5}, Replicas.NONE, Replicas.NONE, 3, 4, 684),
new PartitionRegistration(new int[] {2, 4, 5},
new int[] {2, 4, 5}, Replicas.NONE, Replicas.NONE, 2, 10, 84)),
newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"), newTopicImage("bar", Uuid.fromString("f62ptyETTjet8SL5ZeREiw"),
new PartitionRegistration(new int[] {0, 1, 2, 3, 4}, new PartitionRegistration(new int[] {0, 1, 2, 3, 4},
new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 0, 1, 345))); new int[] {0, 1, 2, 3}, new int[] {1}, new int[] {3, 4}, 0, 1, 345)));
IMAGE1 = new TopicsImage(newTopicsByIdMap(topics1), newTopicsByNameMap(topics1));
IMAGE1 = new TopicsImage(newTopicsByIdMap(TOPIC_IMAGES1), newTopicsByNameMap(TOPIC_IMAGES1));
DELTA1_RECORDS = new ArrayList<>(); DELTA1_RECORDS = new ArrayList<>();
DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord(). DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveTopicRecord().
@ -126,6 +133,220 @@ public class TopicsImageTest {
IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2)); IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
} }
private ApiMessageAndVersion newPartitionRecord(Uuid topicId, int partitionId, List<Integer> replicas) {
return new ApiMessageAndVersion(
new PartitionRecord()
.setPartitionId(partitionId)
.setTopicId(topicId)
.setReplicas(replicas)
.setIsr(replicas)
.setLeader(replicas.get(0))
.setLeaderEpoch(1)
.setPartitionEpoch(1),
PARTITION_RECORD.highestSupportedVersion()
);
}
private PartitionRegistration newPartition(int[] replicas) {
return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1);
}
@Test
public void testBasicLocalChanges() {
int localId = 3;
/* Changes already include in DELTA1_RECORDS and IMAGE1:
* foo - topic id deleted
* bar-0 - stay as follower with different partition epoch
* baz-0 - new topic to leader
*/
List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
// Create a new foo topic with a different id
Uuid newFooId = Uuid.fromString("b66ybsWIQoygs01vdjH07A");
topicRecords.add(
new ApiMessageAndVersion(
new TopicRecord().setName("foo") .setTopicId(newFooId),
TOPIC_RECORD.highestSupportedVersion()
)
);
topicRecords.add(newPartitionRecord(newFooId, 0, Arrays.asList(0, 1, 2)));
topicRecords.add(newPartitionRecord(newFooId, 1, Arrays.asList(0, 1, localId)));
// baz-1 - new partion to follower
topicRecords.add(
new ApiMessageAndVersion(
new PartitionRecord()
.setPartitionId(1)
.setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
.setReplicas(Arrays.asList(4, 2, localId))
.setIsr(Arrays.asList(4, 2, localId))
.setLeader(4)
.setLeaderEpoch(2)
.setPartitionEpoch(1),
PARTITION_RECORD.highestSupportedVersion()
)
);
TopicsDelta delta = new TopicsDelta(IMAGE1);
RecordTestUtils.replayAll(delta, topicRecords);
LocalReplicaChanges changes = delta.localChanges(localId);
assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 1))),
changes.deletes()
);
assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))),
changes.leaders().keySet()
);
assertEquals(
new HashSet<>(
Arrays.asList(new TopicPartition("baz", 1), new TopicPartition("bar", 0), new TopicPartition("foo", 1))
),
changes.followers().keySet()
);
}
@Test
public void testDeleteAfterChanges() {
int localId = 3;
Uuid zooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
List<TopicImage> topics = new ArrayList<>();
topics.add(
newTopicImage(
"zoo",
zooId,
newPartition(new int[] {localId, 1, 2})
)
);
TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
// leader to follower
topicRecords.add(
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(zooId).setPartitionId(0).setLeader(1),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
)
);
// remove zoo topic
topicRecords.add(
new ApiMessageAndVersion(
new RemoveTopicRecord().setTopicId(zooId),
REMOVE_TOPIC_RECORD.highestSupportedVersion()
)
);
TopicsDelta delta = new TopicsDelta(image);
RecordTestUtils.replayAll(delta, topicRecords);
LocalReplicaChanges changes = delta.localChanges(localId);
assertEquals(new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0))), changes.deletes());
assertEquals(Collections.emptyMap(), changes.leaders());
assertEquals(Collections.emptyMap(), changes.followers());
}
@Test
public void testLocalReassignmentChanges() {
int localId = 3;
Uuid zooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
List<TopicImage> topics = new ArrayList<>();
topics.add(
newTopicImage(
"zoo",
zooId,
newPartition(new int[] {0, 1, localId}),
newPartition(new int[] {localId, 1, 2}),
newPartition(new int[] {0, 1, localId}),
newPartition(new int[] {localId, 1, 2}),
newPartition(new int[] {0, 1, 2}),
newPartition(new int[] {0, 1, 2})
)
);
TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
// zoo-0 - follower to leader
topicRecords.add(
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(zooId).setPartitionId(0).setLeader(localId),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
)
);
// zoo-1 - leader to follower
topicRecords.add(
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(zooId).setPartitionId(1).setLeader(1),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
)
);
// zoo-2 - follower to removed
topicRecords.add(
new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(zooId)
.setPartitionId(2)
.setIsr(Arrays.asList(0, 1, 2))
.setReplicas(Arrays.asList(0, 1, 2)),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
)
);
// zoo-3 - leader to removed
topicRecords.add(
new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(zooId)
.setPartitionId(3)
.setLeader(0)
.setIsr(Arrays.asList(0, 1, 2))
.setReplicas(Arrays.asList(0, 1, 2)),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
)
);
// zoo-4 - not replica to leader
topicRecords.add(
new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(zooId)
.setPartitionId(4)
.setLeader(localId)
.setIsr(Arrays.asList(localId, 1, 2))
.setReplicas(Arrays.asList(localId, 1, 2)),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
)
);
// zoo-5 - not replica to follower
topicRecords.add(
new ApiMessageAndVersion(
new PartitionChangeRecord()
.setTopicId(zooId)
.setPartitionId(5)
.setIsr(Arrays.asList(0, 1, localId))
.setReplicas(Arrays.asList(0, 1, localId)),
PARTITION_CHANGE_RECORD.highestSupportedVersion()
)
);
TopicsDelta delta = new TopicsDelta(image);
RecordTestUtils.replayAll(delta, topicRecords);
LocalReplicaChanges changes = delta.localChanges(localId);
assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("zoo", 2), new TopicPartition("zoo", 3))),
changes.deletes()
);
assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0), new TopicPartition("zoo", 4))),
changes.leaders().keySet()
);
assertEquals(
new HashSet<>(Arrays.asList(new TopicPartition("zoo", 1), new TopicPartition("zoo", 5))),
changes.followers().keySet()
);
}
@Test @Test
public void testEmptyImageRoundTrip() throws Throwable { public void testEmptyImageRoundTrip() throws Throwable {
testToImageAndBack(TopicsImage.EMPTY); testToImageAndBack(TopicsImage.EMPTY);