KAFKA-15780: Wait for consistent KRaft metadata when creating or deleting topics (#14695)

TestUtils.createTopicWithAdmin calls waitForAllPartitionsMetadata, which waits for the partition(s) to be present in each broker's metadata cache. This check is sufficient in ZK mode because the controller sends a LeaderAndIsr (LISR) request before sending an UpdateMetadataRequest, so the partition state in the ReplicaManager is updated before the metadata cache.

In KRaft mode the ordering is reversed: the metadata cache is updated first, so the check can return before the partition state in the ReplicaManager and other metadata listeners are fully initialized.
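The fix passes the controller servers into the TestUtils create/delete helpers so that, after the partition-metadata check, the helper also waits for every broker to catch up to the controller's committed metadata offset. A minimal sketch of that wait in Scala, assuming accessors such as raftManager.replicatedLog.endOffset and sharedServer.loader.lastAppliedOffset (the diff below only shows the call sites of TestUtils.ensureConsistentKRaftMetadata, not its body):

    import kafka.server.{BrokerServer, ControllerServer, KafkaBroker}
    import kafka.utils.TestUtils

    // Sketch: block until every broker has replayed the controller's latest
    // committed metadata record, so all metadata listeners have been invoked.
    def ensureConsistentKRaftMetadata(
      brokers: Seq[KafkaBroker],
      controller: ControllerServer,
      msg: String = "Timeout waiting for controller metadata to propagate to all brokers"
    ): Unit = {
      // Highest metadata offset the controller has committed so far (assumed accessor).
      val controllerOffset = controller.raftManager.replicatedLog.endOffset().offset - 1
      TestUtils.waitUntilTrue(() => brokers.forall { broker =>
        // A broker is consistent once its metadata loader has applied that offset (assumed accessor).
        broker.asInstanceOf[BrokerServer].sharedServer.loader.lastAppliedOffset() >= controllerOffset
      }, msg)
    }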

Testing:
Insert a Thread.sleep(100) in BrokerMetadataPublisher.onMetadataUpdate after

      // Publish the new metadata image to the metadata cache.
      metadataCache.setImage(newImage)
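      // Repro only: delay the remaining publishers so the metadata cache is
      // briefly ahead of the ReplicaManager and other listeners.
      Thread.sleep(100)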
and run EdgeCaseRequestTest.testProduceRequestWithNullClientId; the test fails locally almost deterministically. After the change(s), it no longer fails.

Reviewers: Justine Olshan <jolshan@confluent.io>
Author: David Mao, 2023-11-06 17:07:56 -08:00 (committed by GitHub)
Commit: c6ea0a84ab (parent: 505e5b3eaa)
13 changed files with 70 additions and 47 deletions

View File

@@ -79,7 +79,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@@ -91,7 +91,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@@ -103,7 +103,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@@ -115,7 +115,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1025")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
verifyRemoteLogTopicConfigs(topicConfig)
}
@@ -128,7 +128,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
assertThrowsException(classOf[InvalidConfigurationException], () =>
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}
@@ -140,7 +140,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512")
assertThrowsException(classOf[InvalidConfigurationException], () =>
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}
@@ -151,7 +151,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact")
assertThrowsException(classOf[InvalidConfigurationException], () =>
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig))
}
@@ -160,7 +160,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = {
val admin = createAdminClient()
val topicConfig = new Properties()
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
@@ -181,11 +181,11 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfigWithRemoteStorage = new Properties()
topicConfigWithRemoteStorage.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
val message = assertThrowsException(classOf[InvalidConfigurationException],
() => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions,
() => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions,
numReplicationFactor, topicConfig = topicConfigWithRemoteStorage))
assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker"))
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor)
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor)
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
Collections.singleton(
@@ -203,7 +203,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
@@ -224,7 +224,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]()
@@ -245,7 +245,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// inherited local retention ms is 1000
@@ -265,7 +265,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val admin = createAdminClient()
val topicConfig = new Properties()
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor,
TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor,
topicConfig = topicConfig)
// inherited local retention bytes is 1024
@@ -288,9 +288,9 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)
TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers)
assertThrowsException(classOf[UnknownTopicOrPartitionException],
() => TestUtils.describeTopic(createAdminClient(), testTopicName), "Topic should be deleted")
TestUtils.waitUntilTrue(() =>
@@ -304,7 +304,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)
val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
@@ -326,7 +326,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString)
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount,
topicConfig = topicConfig)
val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head

View File

@@ -154,7 +154,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
try {
// create topic
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)
// send a normal record
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8),
@@ -208,7 +208,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
timeoutMs: Long = 20000L): Unit = {
val partition = 0
try {
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)
val futures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -263,7 +263,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")
else
topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime")
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps)
val recordAndFutures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes(StandardCharsets.UTF_8),
@@ -296,7 +296,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
try {
// create topic
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)
// non-blocking send a list of records
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8),
@@ -329,7 +329,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val producer = createProducer()
try {
TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2, 2)
val partition = 1
val now = System.currentTimeMillis()
@@ -373,7 +373,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val replicas = List(0, follower)
try {
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 3, Map(0 -> replicas))
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 3, Map(0 -> replicas))
val partition = 0
val now = System.currentTimeMillis()
@@ -422,7 +422,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val producer = createProducer(maxBlockMs = 5 * 1000L)
// create topic
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)
val partition0 = 0
var futures0 = (1 to numRecords).map { i =>
@@ -479,7 +479,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
def testFlush(quorum: String): Unit = {
val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue)
try {
TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2, 2)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic,
"value".getBytes(StandardCharsets.UTF_8))
for (_ <- 0 until 50) {
@@ -499,7 +499,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCloseWithZeroTimeoutFromCallerThread(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2, 2)
val partition = 0
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null,
@@ -525,7 +525,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCloseWithZeroTimeoutFromSenderThread(quorum: String): Unit = {
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2)
val partition = 0
consumer.assign(List(new TopicPartition(topic, partition)).asJava)
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8))

View File

@@ -167,7 +167,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
// create the test topic with all the brokers as replicas
val superuserAdminClient = createSuperuserAdminClient()
TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic = topic, brokers = brokers,
TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic = topic, brokers = brokers, controllers = controllerServers,
numPartitions = 1, replicationFactor = 3, topicConfig = new Properties)
}

View File

@@ -2692,11 +2692,11 @@ object PlaintextAdminIntegrationTest {
// Create topics
val topic1 = "invalid-alter-configs-topic-1"
val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1)
createTopicWithAdmin(admin, topic1, test.brokers, numPartitions = 1, replicationFactor = 1)
createTopicWithAdmin(admin, topic1, test.brokers, test.controllerServers, numPartitions = 1, replicationFactor = 1)
val topic2 = "invalid-alter-configs-topic-2"
val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2)
createTopicWithAdmin(admin, topic2, test.brokers, numPartitions = 1, replicationFactor = 1)
createTopicWithAdmin(admin, topic2, test.brokers, test.controllerServers, numPartitions = 1, replicationFactor = 1)
val topicConfigEntries1 = Seq(
new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "1.1"), // this value is invalid as it's above 1.0

View File

@@ -66,7 +66,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
val producer = createProducer(batchSize = 0)
val numRecords = 10;
try {
TestUtils.createTopicWithAdmin(admin, topic, brokers, 2)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2)
val futures = for (i <- 1 to numRecords) yield {
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, s"value$i".getBytes(StandardCharsets.UTF_8))
producer.send(record)
@@ -128,7 +128,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
// set the TopicConfig for timestamp validation to have 1 minute threshold. Note that recordTimestamp has 5 minutes diff
val oneMinuteInMs: Long = 1 * 60 * 60 * 1000L
topicProps.setProperty(messageTimeStampConfig, oneMinuteInMs.toString)
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps)
val producer = createProducer()
try {
@@ -157,7 +157,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
// set the TopicConfig for timestamp validation to be the same as the record timestamp
topicProps.setProperty(messageTimeStampConfig, recordTimestamp.toString)
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps)
val producer = createProducer()
@@ -178,7 +178,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
// set the TopicConfig for timestamp validation to have 10 minute threshold. Note that recordTimestamp has 5 minutes diff
val tenMinutesInMs: Long = 10 * 60 * 60 * 1000L
topicProps.setProperty(messageTimeStampConfig, tenMinutesInMs.toString)
TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps)
TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps)
val producer = createProducer()

View File

@@ -87,7 +87,7 @@ class ProducerCompressionTest extends QuorumTestHarness {
val admin = TestUtils.createAdminClient(Seq(broker),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
try {
TestUtils.createTopicWithAdmin(admin, topic, Seq(broker))
TestUtils.createTopicWithAdmin(admin, topic, Seq(broker), controllerServers)
} finally {
admin.close()
}

View File

@@ -156,8 +156,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
createAdminClient(SecurityProtocol.SSL, SecureInternal)
TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, numPartitions, replicationFactor = numServers)
TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers,
TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, controllerServers, numPartitions, replicationFactor = numServers)
TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers, controllerServers,
numPartitions = servers.head.config.offsetsTopicPartitions,
replicationFactor = numServers,
topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs)
@@ -356,7 +356,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
@ValueSource(strings = Array("zk", "kraft"))
def testKeyStoreAlter(quorum: String): Unit = {
val topic2 = "testtopic2"
TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, controllerServers, numPartitions, replicationFactor = numServers)
// Start a producer and consumer that work with the current broker keystore.
// This should continue working while changes are made
@@ -578,7 +578,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val topic2 = "testtopic2"
val topicProps = new Properties
topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2")
TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps)
TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, controllerServers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps)
def getLogOrThrow(tp: TopicPartition): UnifiedLog = {
var (logOpt, found) = TestUtils.computeUntilTrue {

View File

@@ -65,6 +65,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
admin,
topic,
brokers,
controllerServers,
replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
)
TestUtils.waitUntilLeaderIsKnown(brokers, new TopicPartition(topic, 0))
@@ -108,6 +109,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
admin,
topic,
brokers,
controllerServers,
replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
)
@@ -135,6 +137,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
admin,
topic,
brokers,
controllerServers,
replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId))
)

View File

@@ -152,7 +152,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
): Unit = {
if (isKRaftTest()) {
resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
TestUtils.createOffsetsTopicWithAdmin(admin, brokers)
TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers)
}
} else {
TestUtils.createOffsetsTopic(zkClient, servers)
@@ -178,6 +178,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
admin = admin,
topic = topic,
brokers = brokers,
controllers = controllerServers,
numPartitions = numPartitions,
replicationFactor = replicationFactor,
topicConfig = topicConfig
@@ -211,7 +212,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
admin = admin,
topic = topic,
replicaAssignment = partitionReplicaAssignment,
brokers = brokers
brokers = brokers,
controllers = controllerServers
)
}
} else {
@@ -232,7 +234,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
TestUtils.deleteTopicWithAdmin(
admin = admin,
topic = topic,
brokers = aliveBrokers)
brokers = aliveBrokers,
controllers = controllerServers)
}
} else {
adminZkClient.deleteTopic(topic)

View File

@@ -74,7 +74,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// in this test because it does not use FindCoordinator API.
TestUtils.createOffsetsTopicWithAdmin(
admin = admin,
brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala
brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala,
controllers = raftCluster.controllerServers().asScala.toSeq
)
// Heartbeat request to join the group. Note that the member subscribes

View File

@@ -44,10 +44,19 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
}
}
private def controllerServers(): Seq[ControllerServer] = {
if (cluster.isKRaftTest) {
cluster.asInstanceOf[RaftClusterInstance].controllerServers().asScala.toSeq
} else {
Seq.empty
}
}
protected def createOffsetsTopic(): Unit = {
TestUtils.createOffsetsTopicWithAdmin(
admin = cluster.createAdminClient(),
brokers = brokers()
brokers = brokers(),
controllers = controllerServers()
)
}
@@ -58,6 +67,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
TestUtils.createTopicWithAdmin(
admin = cluster.createAdminClient(),
brokers = brokers(),
controllers = controllerServers(),
topic = topic,
numPartitions = numPartitions
)

View File

@@ -455,6 +455,7 @@ object TestUtils extends Logging {
admin: Admin,
topic: String,
brokers: Seq[B],
controllers: Seq[ControllerServer],
numPartitions: Int = 1,
replicationFactor: Int = 1,
replicaAssignment: collection.Map[Int, Seq[Int]] = Map.empty,
@@ -492,6 +493,7 @@ object TestUtils extends Logging {
// wait until we've propagated all partitions metadata to all brokers
val allPartitionsMetadata = waitForAllPartitionsMetadata(brokers, topic, effectiveNumPartitions)
controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller))
(0 until effectiveNumPartitions).map { i =>
i -> allPartitionsMetadata.get(new TopicPartition(topic, i)).map(_.leader()).getOrElse(
@@ -521,7 +523,8 @@ object TestUtils extends Logging {
def createOffsetsTopicWithAdmin[B <: KafkaBroker](
admin: Admin,
brokers: Seq[B]
brokers: Seq[B],
controllers: Seq[ControllerServer]
): Map[Int, Int] = {
val broker = brokers.head
createTopicWithAdmin(
@ -530,6 +533,7 @@ object TestUtils extends Logging {
numPartitions = broker.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
replicationFactor = broker.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
brokers = brokers,
controllers = controllers,
topicConfig = broker.groupCoordinator.groupMetadataTopicConfigs,
)
}
@@ -538,6 +542,7 @@ object TestUtils extends Logging {
admin: Admin,
topic: String,
brokers: Seq[B],
controllers: Seq[ControllerServer]
): Unit = {
try {
admin.deleteTopics(Collections.singletonList(topic)).all().get()
@@ -547,6 +552,7 @@ object TestUtils extends Logging {
// ignore
}
waitForAllPartitionsMetadata(brokers, topic, 0)
controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller))
}
/**

View File

@@ -179,7 +179,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
}
public void deleteTopic(String topic) {
TestUtils.deleteTopicWithAdmin(admin, topic, harness.brokers());
TestUtils.deleteTopicWithAdmin(admin, topic, harness.brokers(), harness.controllerServers());
}
/**