mirror of https://github.com/apache/kafka.git
				
				
				
			MINOR: Support KRaft in ReplicaFetchTest (#12345)
Reviewers: Kvicii <42023367+Kvicii@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
		
							parent
							
								
									1ceaf30039
								
							
						
					
					
						commit
						a3c7017ff7
					
				| 
						 | 
				
			
			@ -17,43 +17,38 @@
 | 
			
		|||
 | 
			
		||||
package kafka.server
 | 
			
		||||
 | 
			
		||||
import scala.collection.Seq
 | 
			
		||||
 | 
			
		||||
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 | 
			
		||||
import kafka.server.QuorumTestHarness
 | 
			
		||||
import kafka.utils.TestUtils
 | 
			
		||||
import org.junit.jupiter.api.AfterEach
 | 
			
		||||
import kafka.utils.{TestInfoUtils, TestUtils}
 | 
			
		||||
import TestUtils._
 | 
			
		||||
import kafka.api.IntegrationTestHarness
 | 
			
		||||
import org.apache.kafka.clients.producer.ProducerRecord
 | 
			
		||||
import org.apache.kafka.common.TopicPartition
 | 
			
		||||
import org.apache.kafka.common.serialization.StringSerializer
 | 
			
		||||
import org.junit.jupiter.params.ParameterizedTest
 | 
			
		||||
import org.junit.jupiter.params.provider.ValueSource
 | 
			
		||||
 | 
			
		||||
class ReplicaFetchTest extends QuorumTestHarness  {
 | 
			
		||||
  var brokers: Seq[KafkaServer] = null
 | 
			
		||||
class ReplicaFetchTest extends IntegrationTestHarness {
 | 
			
		||||
  val topic1 = "foo"
 | 
			
		||||
  val topic2 = "bar"
 | 
			
		||||
 | 
			
		||||
  @BeforeEach
 | 
			
		||||
  override def setUp(testInfo: TestInfo): Unit = {
 | 
			
		||||
    super.setUp(testInfo)
 | 
			
		||||
    val props = createBrokerConfigs(2, zkConnect)
 | 
			
		||||
    brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_))
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @AfterEach
 | 
			
		||||
  override def tearDown(): Unit = {
 | 
			
		||||
    TestUtils.shutdownServers(brokers)
 | 
			
		||||
    super.tearDown()
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  def testReplicaFetcherThread(): Unit = {
 | 
			
		||||
  override def brokerCount: Int = 2
 | 
			
		||||
 | 
			
		||||
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
 | 
			
		||||
  @ValueSource(strings = Array("zk", "kraft"))
 | 
			
		||||
  def testReplicaFetcherThread(quorum: String): Unit = {
 | 
			
		||||
    val partition = 0
 | 
			
		||||
    val testMessageList1 = List("test1", "test2", "test3", "test4")
 | 
			
		||||
    val testMessageList2 = List("test5", "test6", "test7", "test8")
 | 
			
		||||
 | 
			
		||||
    // create a topic and partition and await leadership
 | 
			
		||||
    for (topic <- List(topic1,topic2)) {
 | 
			
		||||
      createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = brokers)
 | 
			
		||||
      createTopic(topic, replicationFactor = 2)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // send test messages to leader
 | 
			
		||||
| 
						 | 
				
			
			@ -69,9 +64,9 @@ class ReplicaFetchTest extends QuorumTestHarness  {
 | 
			
		|||
      var result = true
 | 
			
		||||
      for (topic <- List(topic1, topic2)) {
 | 
			
		||||
        val tp = new TopicPartition(topic, partition)
 | 
			
		||||
        val expectedOffset = brokers.head.getLogManager.getLog(tp).get.logEndOffset
 | 
			
		||||
        val expectedOffset = brokers.head.logManager.getLog(tp).get.logEndOffset
 | 
			
		||||
        result = result && expectedOffset > 0 && brokers.forall { item =>
 | 
			
		||||
          expectedOffset == item.getLogManager.getLog(tp).get.logEndOffset
 | 
			
		||||
          expectedOffset == item.logManager.getLog(tp).get.logEndOffset
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      result
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue