diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 964da379778..5e7e7bf385c 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -74,11 +74,12 @@ class DelayedProduce(delayMs: Long, * The delayed produce operation can be completed if every partition * it produces to is satisfied by one of the following: * - * Case A: This broker is no longer the leader: set an error in response - * Case B: This broker is the leader: - * B.1 - If there was a local error thrown while checking if at least requiredAcks + * Case A: Replica not assigned to partition + * Case B: Replica is no longer the leader of this partition + * Case C: This broker is the leader: + * C.1 - If there was a local error thrown while checking if at least requiredAcks * replicas have caught up to this operation: set an error in response - * B.2 - Otherwise, set the response with no error. + * C.2 - Otherwise, set the response with no error. 
*/ override def tryComplete(): Boolean = { // check for each partition if it still has pending acks @@ -95,7 +96,7 @@ class DelayedProduce(delayMs: Long, partition.checkEnoughReplicasReachOffset(status.requiredOffset) } - // Case B.1 || B.2 + // Case B || C.1 || C.2 if (error != Errors.NONE || hasEnough) { status.acksPending = false status.responseStatus.error = error @@ -103,7 +104,7 @@ class DelayedProduce(delayMs: Long, } } - // check if every partition has satisfied at least one of case A or B + // check if every partition has satisfied at least one of case A, B or C if (!produceMetadata.produceStatus.values.exists(_.acksPending)) forceComplete() else diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index e2f65fd2022..be835a82fdf 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2148,8 +2148,8 @@ class ReplicaManager(val config: KafkaConfig, if (localLog(tp).isEmpty) markPartitionOffline(tp) } - newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded(_)) - newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded(_)) + newLocalLeaders.keySet.foreach(markPartitionOfflineIfNeeded) + newLocalFollowers.keySet.foreach(markPartitionOfflineIfNeeded) replicaFetcherManager.shutdownIdleFetcherThreads() replicaAlterLogDirsManager.shutdownIdleFetcherThreads() @@ -2193,7 +2193,6 @@ class ReplicaManager(val config: KafkaConfig, newLocalFollowers: mutable.HashMap[TopicPartition, LocalLeaderInfo]): Unit = { stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} partition(s) to " + "local followers.") - replicaFetcherManager.removeFetcherForPartitions(newLocalFollowers.keySet) val shuttingDown = isShuttingDown.get() val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, InitialFetchState] val newFollowerTopicSet = new mutable.HashSet[String] @@ -2202,13 +2201,6 @@ class ReplicaManager(val config: 
KafkaConfig, try { newFollowerTopicSet.add(tp.topic()) - completeDelayedFetchOrProduceRequests(tp) - - // Create the local replica even if the leader is unavailable. This is required - // to ensure that we include the partition's high watermark in the checkpoint - // file (see KAFKA-1647) - partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId)) - if (shuttingDown) { stateChangeLogger.trace(s"Unable to start fetching ${tp} with topic " + s"ID ${info.topicId} because the replica manager is shutting down.") @@ -2216,15 +2208,30 @@ class ReplicaManager(val config: KafkaConfig, val listenerName = config.interBrokerListenerName.value() val leader = info.partition.leader Option(newImage.cluster().broker(leader)).flatMap(_.node(listenerName).asScala) match { - case None => stateChangeLogger.trace(s"Unable to start fetching ${tp} " + - s"with topic ID ${info.topicId} from leader ${leader} because it is not " + - "alive.") + case None => + stateChangeLogger.trace( + s"Unable to start fetching ${tp} with topic ID ${info.topicId} from leader " + + s"${leader} because it is not alive." + ) + + // Create the local replica even if the leader is unavailable. 
This is required + // to ensure that we include the partition's high watermark in the checkpoint + // file (see KAFKA-1647). NOTE(review): makeFollower is not called on this path, so the partition never enters partitionsToMakeFollower; confirm that skipping it is intended. + partition.createLogIfNotExists(isNew, false, offsetCheckpoints, Some(info.topicId)) case Some(node) => - val leaderEndPoint = new BrokerEndPoint(node.id(), node.host(), node.port()) - val log = partition.localLogOrException - val fetchOffset = initialFetchOffset(log) - partitionsToMakeFollower.put(tp, - InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset)) + val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew) + if (partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))) { + val leaderEndPoint = new BrokerEndPoint(node.id(), node.host(), node.port()) + val log = partition.localLogOrException + val fetchOffset = initialFetchOffset(log) + partitionsToMakeFollower.put(tp, + InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset)) + } else { + stateChangeLogger.info( + s"Skipped the become-follower state change after marking its partition as " + + s"follower for partition $tp with id ${info.topicId} and partition state $state." 
+ ) + } } } changedPartitions.add(partition) @@ -2235,8 +2242,16 @@ class ReplicaManager(val config: KafkaConfig, } } } - updateLeaderAndFollowerMetrics(newFollowerTopicSet) + + replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet) + stateChangeLogger.info(s"Stopped fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions") + replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower) + stateChangeLogger.info(s"Started fetchers as part of become-follower for ${partitionsToMakeFollower.size} partitions") + + partitionsToMakeFollower.keySet.foreach(completeDelayedFetchOrProduceRequests) + + updateLeaderAndFollowerMetrics(newFollowerTopicSet) } def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index bcc8ac1d5be..90fc8004686 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -17,6 +17,14 @@ package kafka.server +import java.io.File +import java.net.InetAddress +import java.nio.file.Files +import java.util +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} +import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.stream.IntStream +import java.util.{Collections, Optional, Properties} import kafka.api._ import kafka.cluster.{BrokerEndPoint, Partition} import kafka.log._ @@ -30,7 +38,9 @@ import org.apache.kafka.common.message.LeaderAndIsrRequestData import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState +import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord} import 
org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record._ import org.apache.kafka.common.replica.ClientMetadata @@ -41,25 +51,14 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid} +import org.apache.kafka.image.{ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, TopicImage, TopicsDelta, TopicsImage } +import org.apache.kafka.metadata.{PartitionRegistration, Replicas} import org.easymock.EasyMock import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} -import org.mockito.{ArgumentMatchers, Mockito} - -import java.io.File -import java.net.InetAddress -import java.nio.file.Files -import java.util -import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} -import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.{Collections, Optional, Properties} -import org.apache.kafka.common.metadata.{PartitionRecord, RemoveTopicRecord, TopicRecord} -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.image.{TopicImage, TopicsDelta, TopicsImage} -import org.apache.kafka.metadata.{PartitionRegistration, Replicas} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer - +import org.mockito.{ArgumentMatchers, Mockito} import scala.collection.{Map, Seq, mutable} import scala.jdk.CollectionConverters._ @@ -1009,31 +1008,37 @@ class ReplicaManagerTest { topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = expectTruncation, localLogOffset = Some(10), extraProps = extraProps, topicId = Some(topicId)) - // Initialize partition state to follower, with leader = 
1, leaderEpoch = 1 - val tp = new TopicPartition(topic, topicPartition) - val partition = replicaManager.createPartition(tp) - val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) - partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - partition.makeFollower( - leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds), - offsetCheckpoints, - None) + try { + // Initialize partition state to follower, with leader = 1, leaderEpoch = 1 + val tp = new TopicPartition(topic, topicPartition) + val partition = replicaManager.createPartition(tp) + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + partition.makeFollower( + leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds), + offsetCheckpoints, + None) - // Make local partition a follower - because epoch increased by more than 1, truncation should - // trigger even though leader does not change - leaderEpoch += leaderEpochIncrement - val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, - controllerId, controllerEpoch, brokerEpoch, - Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava, - Collections.singletonMap(topic, topicId), - Set(new Node(followerBrokerId, "host1", 0), - new Node(leaderBrokerId, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0, - (_, followers) => assertEquals(followerBrokerId, followers.head.partitionId)) - assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS)) + // Make local partition a follower - because epoch increased by more than 1, truncation should + // trigger even though leader does not change + leaderEpoch += leaderEpochIncrement + val leaderAndIsrRequest0 = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, + controllerId, controllerEpoch, brokerEpoch, + Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, aliveBrokerIds)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(followerBrokerId, "host1", 0), + new Node(leaderBrokerId, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0, + (_, followers) => assertEquals(followerBrokerId, followers.head.partitionId)) + assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS)) - // Truncation should have happened once - EasyMock.verify(mockLogMgr) + // Truncation should have happened once + EasyMock.verify(mockLogMgr) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } @Test @@ -1085,40 +1090,46 @@ class ReplicaManagerTest { topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId)) - val brokerList = Seq[Integer](0, 1).asJava + try { + val brokerList = Seq[Integer](0, 1).asJava - val tp0 = new TopicPartition(topic, 0) + val tp0 = new TopicPartition(topic, 0) - initializeLogAndTopicId(replicaManager, tp0, topicId) + initializeLogAndTopicId(replicaManager, tp0, topicId) - // Make this replica the follower - val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, - Seq(new LeaderAndIsrPartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(1) - .setLeaderEpoch(1) - .setIsr(brokerList) - .setZkVersion(0) - .setReplicas(brokerList) - .setIsNew(false)).asJava, - Collections.singletonMap(topic, topicId), - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + // Make this replica the follower + val leaderAndIsrRequest2 = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(1) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) - val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", - InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") + val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") - val consumerResult = fetchAsConsumer(replicaManager, tp0, - new PartitionData(0, 0, 100000, Optional.empty()), - clientMetadata = Some(metadata)) + val consumerResult = fetchAsConsumer(replicaManager, tp0, + new PartitionData(0, 0, 100000, Optional.empty()), + clientMetadata = Some(metadata)) - // Fetch from follower succeeds - assertTrue(consumerResult.isFired) + // Fetch from follower succeeds + assertTrue(consumerResult.isFired) - // But only leader will compute preferred replica - assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty) + // But only leader will compute preferred replica + assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } @Test @@ -1136,40 +1147,46 @@ class ReplicaManagerTest { topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId)) - val brokerList = Seq[Integer](0, 1).asJava + try { + val brokerList = Seq[Integer](0, 1).asJava - val tp0 = new TopicPartition(topic, 0) + val tp0 = new 
TopicPartition(topic, 0) - initializeLogAndTopicId(replicaManager, tp0, topicId) + initializeLogAndTopicId(replicaManager, tp0, topicId) - // Make this replica the follower - val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, - Seq(new LeaderAndIsrPartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(1) - .setIsr(brokerList) - .setZkVersion(0) - .setReplicas(brokerList) - .setIsNew(false)).asJava, - Collections.singletonMap(topic, topicId), - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + // Make this replica the follower + val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) - val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", - InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") + val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default") - val consumerResult = fetchAsConsumer(replicaManager, tp0, - new PartitionData(0, 0, 100000, Optional.empty()), - clientMetadata = Some(metadata)) + val consumerResult = fetchAsConsumer(replicaManager, tp0, + new PartitionData(0, 0, 100000, Optional.empty()), + clientMetadata = Some(metadata)) - // Fetch from follower succeeds - 
assertTrue(consumerResult.isFired) + // Fetch from follower succeeds + assertTrue(consumerResult.isFired) - // Returns a preferred replica (should just be the leader, which is None) - assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined) + // Returns a preferred replica (should just be the leader, which is None) + assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } @Test @@ -1281,39 +1298,45 @@ class ReplicaManagerTest { def testFetchFollowerNotAllowedForOlderClients(): Unit = { val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1)) - val tp0 = new TopicPartition(topic, 0) - val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val partition0Replicas = Seq[Integer](0, 1).asJava - val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, - Seq(new LeaderAndIsrPartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(1) - .setLeaderEpoch(0) - .setIsr(partition0Replicas) - .setZkVersion(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) + try { + val tp0 = new TopicPartition(topic, 0) + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + val partition0Replicas = Seq[Integer](0, 1).asJava + val becomeFollowerRequest = new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(tp0.topic) + .setPartitionIndex(tp0.partition) + .setControllerEpoch(0) + .setLeader(1) + .setLeaderEpoch(0) + .setIsr(partition0Replicas) + .setZkVersion(0) + .setReplicas(partition0Replicas) + .setIsNew(true)).asJava, + topicIds.asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) - // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+) - val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") - var partitionData = new FetchRequest.PartitionData(0L, 0L, 100, - Optional.of(0)) - var fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata)) - assertNotNull(fetchResult.get) - assertEquals(Errors.NONE, fetchResult.get.error) + // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+) + val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") + var partitionData = new FetchRequest.PartitionData(0L, 0L, 100, + Optional.of(0)) + var fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata)) + assertNotNull(fetchResult.get) + assertEquals(Errors.NONE, fetchResult.get.error) - // Fetch from follower, with empty ClientMetadata (which implies an older version) - partitionData = new FetchRequest.PartitionData(0L, 0L, 100, - Optional.of(0)) - fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None) - assertNotNull(fetchResult.get) - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error) + // Fetch from follower, with empty ClientMetadata (which implies an older version) + partitionData = new FetchRequest.PartitionData(0L, 0L, 100, + Optional.of(0)) + fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None) + 
assertNotNull(fetchResult.get) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } @Test @@ -1367,49 +1390,55 @@ class ReplicaManagerTest { val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) - val tp0 = new TopicPartition(topic, 0) - val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val partition0Replicas = Seq[Integer](0, 1).asJava + try { + val tp0 = new TopicPartition(topic, 0) + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + val partition0Replicas = Seq[Integer](0, 1).asJava - val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, - Seq(new LeaderAndIsrPartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(1) - .setIsr(partition0Replicas) - .setZkVersion(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) + val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(tp0.topic) + .setPartitionIndex(tp0.partition) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(partition0Replicas) + .setZkVersion(0) + .setReplicas(partition0Replicas) + .setIsNew(true)).asJava, + 
topicIds.asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) - val partitionData = new FetchRequest.PartitionData(0L, 0L, 100, - Optional.empty()) - val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10) - assertNull(fetchResult.get) + val partitionData = new FetchRequest.PartitionData(0L, 0L, 100, + Optional.empty()) + val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10) + assertNull(fetchResult.get) - // Become a follower and ensure that the delayed fetch returns immediately - val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, - Seq(new LeaderAndIsrPartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(1) - .setLeaderEpoch(2) - .setIsr(partition0Replicas) - .setZkVersion(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) + // Become a follower and ensure that the delayed fetch returns immediately + val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(tp0.topic) + .setPartitionIndex(tp0.partition) + .setControllerEpoch(0) + .setLeader(1) + .setLeaderEpoch(2) + .setIsr(partition0Replicas) + .setZkVersion(0) + .setReplicas(partition0Replicas) + .setIsNew(true)).asJava, + topicIds.asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) - assertNotNull(fetchResult.get) - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error) + 
assertNotNull(fetchResult.get) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } @Test @@ -1417,50 +1446,56 @@ class ReplicaManagerTest { val mockTimer = new MockTimer(time) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1)) - val tp0 = new TopicPartition(topic, 0) - val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) - replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val partition0Replicas = Seq[Integer](0, 1).asJava + try { + val tp0 = new TopicPartition(topic, 0) + val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints) + replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) + val partition0Replicas = Seq[Integer](0, 1).asJava - val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, - Seq(new LeaderAndIsrPartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(1) - .setIsr(partition0Replicas) - .setZkVersion(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) + val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(tp0.topic) + .setPartitionIndex(tp0.partition) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(1) + .setIsr(partition0Replicas) + .setZkVersion(0) + .setReplicas(partition0Replicas) + .setIsNew(true)).asJava, + 
topicIds.asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) - val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") - val partitionData = new FetchRequest.PartitionData(0L, 0L, 100, - Optional.of(1)) - val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata), timeout = 10) - assertNull(fetchResult.get) + val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") + val partitionData = new FetchRequest.PartitionData(0L, 0L, 100, + Optional.of(1)) + val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, Some(clientMetadata), timeout = 10) + assertNull(fetchResult.get) - // Become a follower and ensure that the delayed fetch returns immediately - val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, - Seq(new LeaderAndIsrPartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(1) - .setLeaderEpoch(2) - .setIsr(partition0Replicas) - .setZkVersion(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) + // Become a follower and ensure that the delayed fetch returns immediately + val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(tp0.topic) + .setPartitionIndex(tp0.partition) + .setControllerEpoch(0) + .setLeader(1) + .setLeaderEpoch(2) + .setIsr(partition0Replicas) + .setZkVersion(0) + .setReplicas(partition0Replicas) + .setIsNew(true)).asJava, + topicIds.asJava, + Set(new Node(0, "host1", 0), new Node(1, "host2", 
1)).asJava).build() + replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) - assertNotNull(fetchResult.get) - assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.get.error) + assertNotNull(fetchResult.get) + assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.get.error) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } @Test @@ -1573,7 +1608,7 @@ class ReplicaManagerTest { Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) - val produceResult = sendProducerAppend(replicaManager, tp0) + val produceResult = sendProducerAppend(replicaManager, tp0, 3) assertNull(produceResult.get) Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true) @@ -1588,17 +1623,22 @@ class ReplicaManagerTest { assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResult.get.error) } - private def sendProducerAppend(replicaManager: ReplicaManager, - topicPartition: TopicPartition): AtomicReference[PartitionResponse] = { + private def sendProducerAppend( + replicaManager: ReplicaManager, + topicPartition: TopicPartition, + numOfRecords: Int + ): AtomicReference[PartitionResponse] = { val produceResult = new AtomicReference[PartitionResponse]() def callback(response: Map[TopicPartition, PartitionResponse]): Unit = { produceResult.set(response(topicPartition)) } - val records = MemoryRecords.withRecords(CompressionType.NONE, - new SimpleRecord("a".getBytes()), - new SimpleRecord("b".getBytes()), - new SimpleRecord("c".getBytes()) + val records = MemoryRecords.withRecords( + CompressionType.NONE, + IntStream + .range(0, numOfRecords) + .mapToObj(i => new SimpleRecord(i.toString.getBytes)) + .toArray(Array.ofDim[SimpleRecord]): _* ) replicaManager.appendRecords( @@ -2828,4 +2868,280 @@ class ReplicaManagerTest { Replicas.NONE, Replicas.NONE, 2, 123, 456)))), 
replicaManager.calculateDeltaChanges(TEST_DELTA)) } + + @Test + def testDeltaFromLeaderToFollower(): Unit = { + val localId = 1 + val otherId = localId + 1 + val numOfRecords = 3 + val epoch = 100 + val topicPartition = new TopicPartition("foo", 0) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + + try { + // Make the local replica the leader + val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch)) + replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch)) + + // Check the state of that partition and fetcher + val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) + assertTrue(leaderPartition.isLeader) + assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) + assertEquals(epoch, leaderPartition.getLeaderEpoch) + + assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) + + // Send a produce request and advance the highwatermark + val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) + fetchMessages( + replicaManager, + otherId, + topicPartition, + new PartitionData(numOfRecords, 0, Int.MaxValue, Optional.empty()), + Int.MaxValue, + IsolationLevel.READ_UNCOMMITTED, + None + ) + assertEquals(Errors.NONE, leaderResponse.get.error) + + // Change the local replica to follower + val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1)) + replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1)) + + // Append on a follower should fail + val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error) + + // Check the state of that partition and fetcher + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(epoch + 1, 
followerPartition.getLeaderEpoch) + + val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) + assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker)) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) + } + + @Test + def testDeltaFromFollowerToLeader(): Unit = { + val localId = 1 + val otherId = localId + 1 + val numOfRecords = 3 + val epoch = 100 + val topicPartition = new TopicPartition("foo", 0) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + + try { + // Make the local replica the follower + val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch)) + replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch)) + + // Check the state of that partition and fetcher + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(epoch, followerPartition.getLeaderEpoch) + + val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) + assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker)) + + // Append on a follower should fail + val followerResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error) + + // Change the local replica to leader + val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch + 1)) + replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch + 1)) + + // Send a produce request and advance the highwatermark + val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) + fetchMessages( + replicaManager, + otherId, + topicPartition, + new PartitionData(numOfRecords, 0, Int.MaxValue, Optional.empty()), + Int.MaxValue, + 
IsolationLevel.READ_UNCOMMITTED, + None + ) + assertEquals(Errors.NONE, leaderResponse.get.error) + + val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) + assertTrue(leaderPartition.isLeader) + assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) + assertEquals(epoch + 1, leaderPartition.getLeaderEpoch) + + assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) + } + + @Test + def testDeltaFollowerWithNoChange(): Unit = { + val localId = 1 + val otherId = localId + 1 + val epoch = 100 + val topicPartition = new TopicPartition("foo", 0) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + + try { + // Make the local replica the follower + val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch)) + replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch)) + + // Check the state of that partition and fetcher + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(epoch, followerPartition.getLeaderEpoch) + + val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) + assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker)) + + // Apply the same delta again + replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch)) + + // Check that the state stays the same + val HostedPartition.Online(noChangePartition) = replicaManager.getPartition(topicPartition) + assertFalse(noChangePartition.isLeader) + assertEquals(epoch, noChangePartition.getLeaderEpoch) + + val noChangeFetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) + assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), 
noChangeFetcher.map(_.sourceBroker)) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) + } + + @Test + def testDeltaToFollowerCompletesProduce(): Unit = { + val localId = 1 + val otherId = localId + 1 + val numOfRecords = 3 + val epoch = 100 + val topicPartition = new TopicPartition("foo", 0) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + + try { + // Make the local replica the leader + val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch)) + replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch)) + + // Check the state of that partition and fetcher + val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) + assertTrue(leaderPartition.isLeader) + assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) + assertEquals(epoch, leaderPartition.getLeaderEpoch) + + assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) + + // Send a produce request + val leaderResponse = sendProducerAppend(replicaManager, topicPartition, numOfRecords) + + // Change the local replica to follower + val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1)) + replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1)) + + // Check that the produce failed because it changed to follower before replicating + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, leaderResponse.get.error) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) + } + + @Test + def testDeltaToFollowerCompletesFetch(): Unit = { + val localId = 1 + val otherId = localId + 1 + val epoch = 100 + val topicPartition = new TopicPartition("foo", 0) + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + + try { + // Make the local replica the leader + val 
leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch)) + replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, epoch)) + + // Check the state of that partition and fetcher + val HostedPartition.Online(leaderPartition) = replicaManager.getPartition(topicPartition) + assertTrue(leaderPartition.isLeader) + assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds) + assertEquals(epoch, leaderPartition.getLeaderEpoch) + + assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) + + // Send a fetch request + val fetchCallback = fetchMessages( + replicaManager, + otherId, + topicPartition, + new PartitionData(0, 0, Int.MaxValue, Optional.empty()), + Int.MaxValue, + IsolationLevel.READ_UNCOMMITTED, + None + ) + + // Change the local replica to follower + val followerMetadataImage = imageFromTopics(topicsImage(localId, false, epoch + 1)) + replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, false, epoch + 1)) + + // Check that the pending fetch was completed with an error because the replica changed to follower + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchCallback.assertFired.error) + } finally { + replicaManager.shutdown() + } + + TestUtils.assertNoNonDaemonThreads(this.getClass.getName) + } + + private def topicsImage(replica: Int, isLeader: Boolean, epoch: Int): TopicsImage = { + val leader = if (isLeader) replica else replica + 1 + val topicsById = new util.HashMap[Uuid, TopicImage]() + val topicsByName = new util.HashMap[String, TopicImage]() + val fooPartitions = new util.HashMap[Integer, PartitionRegistration]() + fooPartitions.put(0, new PartitionRegistration(Array(replica, replica + 1), + Array(replica, replica + 1), Replicas.NONE, Replicas.NONE, leader, epoch, epoch)) + val foo = new TopicImage("foo", FOO_UUID, fooPartitions) + + topicsById.put(FOO_UUID, foo) + topicsByName.put("foo", foo) + + new TopicsImage(topicsById, topicsByName) + } + + private def topicsDelta(replica: Int, 
isLeader: Boolean, epoch: Int): TopicsDelta = { + val leader = if (isLeader) replica else replica + 1 + val delta = new TopicsDelta(TopicsImage.EMPTY) + delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID)) + delta.replay(new PartitionRecord().setPartitionId(0). + setTopicId(FOO_UUID). + setReplicas(util.Arrays.asList(replica, replica + 1)). + setIsr(util.Arrays.asList(replica, replica + 1)). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()). + setLeader(leader). + setLeaderEpoch(epoch). + setPartitionEpoch(epoch)) + + delta + } + + private def imageFromTopics(topicsImage: TopicsImage): MetadataImage = { + new MetadataImage( + FeaturesImage.EMPTY, + ClusterImageTest.IMAGE1, + topicsImage, + ConfigurationsImage.EMPTY, + ClientQuotasImage.EMPTY + ) + } } diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java index 703a6e165fd..6908cf2a78b 100644 --- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java @@ -46,7 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Timeout(value = 40) public class ClusterImageTest { - final static ClusterImage IMAGE1; + public final static ClusterImage IMAGE1; static final List DELTA1_RECORDS;