From c6ea0a84ab107327d26b454cfe8e595c8befcc9c Mon Sep 17 00:00:00 2001 From: David Mao <47232755+splett2@users.noreply.github.com> Date: Mon, 6 Nov 2023 17:07:56 -0800 Subject: [PATCH] KAFKA-15780: Wait for consistent KRaft metadata when creating or deleting topics (#14695) TestUtils.createTopicWithAdmin calls waitForAllPartitionsMetadata which waits for partition(s) to be present in each brokers' metadata cache. This is a sufficient check in ZK mode because the controller sends an LISR request before sending an UpdateMetadataRequest which means that the partition in the ReplicaManager will be updated before the metadata cache. In KRaft mode, the metadata cache is updated first, so the check may return before partitions and other metadata listeners are fully initialized. Testing: Insert a Thread.sleep(100) in BrokerMetadataPublisher.onMetadataUpdate after // Publish the new metadata image to the metadata cache. metadataCache.setImage(newImage) and run EdgeCaseRequestTest.testProduceRequestWithNullClientId and the test will fail locally nearly deterministically. After the change(s), the test no longer fails. Reviewers: Justine Olshan --- .../kafka/admin/RemoteTopicCrudTest.scala | 36 +++++++++---------- .../kafka/api/BaseProducerSendTest.scala | 20 +++++------ .../kafka/api/EndToEndAuthorizationTest.scala | 2 +- .../api/PlaintextAdminIntegrationTest.scala | 4 +-- .../kafka/api/PlaintextProducerSendTest.scala | 8 ++--- .../kafka/api/ProducerCompressionTest.scala | 2 +- .../DynamicBrokerReconfigurationTest.scala | 8 ++--- .../FetchFromFollowerIntegrationTest.scala | 3 ++ .../integration/KafkaServerTestHarness.scala | 9 +++-- .../ConsumerGroupHeartbeatRequestTest.scala | 3 +- .../GroupCoordinatorBaseRequestTest.scala | 12 ++++++- .../scala/unit/kafka/utils/TestUtils.scala | 8 ++++- .../storage/TieredStorageTestContext.java | 2 +- 13 files changed, 70 insertions(+), 47 deletions(-) diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala index 6d8fbe1bbe7..7b21ef82686 100644 --- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala +++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala @@ -79,7 +79,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200") topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100") - TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig) verifyRemoteLogTopicConfigs(topicConfig) } @@ -91,7 +91,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512") topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "256") - TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig) verifyRemoteLogTopicConfigs(topicConfig) } @@ -103,7 +103,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { val topicConfig = new Properties() topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "1001") - TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig) verifyRemoteLogTopicConfigs(topicConfig) } @@ -115,7 +115,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { val topicConfig = new Properties() topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "1025") - TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig) verifyRemoteLogTopicConfigs(topicConfig) } @@ -128,7 +128,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200") assertThrowsException(classOf[InvalidConfigurationException], () => - TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig)) } @@ -140,7 +140,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") topicConfig.put(TopicConfig.RETENTION_BYTES_CONFIG, "512") assertThrowsException(classOf[InvalidConfigurationException], () => - TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig)) } @@ -151,7 +151,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact") assertThrowsException(classOf[InvalidConfigurationException], () => - TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig)) } @@ -160,7 +160,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { def testEnableRemoteLogOnExistingTopicTest(quorum: String): Unit = { val admin = createAdminClient() val topicConfig = new Properties() - TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig) val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() @@ -181,11 +181,11 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { val topicConfigWithRemoteStorage = new Properties() topicConfigWithRemoteStorage.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") val message = assertThrowsException(classOf[InvalidConfigurationException], - () => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, + () => TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfigWithRemoteStorage)) assertTrue(message.getMessage.contains("Tiered Storage functionality is disabled in the broker")) - TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor) + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor) val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName), Collections.singleton( @@ -203,7 +203,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { val admin = createAdminClient() val topicConfig = new Properties() topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig) val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() @@ -224,7 +224,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { val admin = createAdminClient() val topicConfig = new Properties() topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig) val configs = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]() @@ -245,7 +245,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { val admin = createAdminClient() val topicConfig = new Properties() topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig) // inherited local retention ms is 1000 @@ -265,7 +265,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { val admin = createAdminClient() val topicConfig = new Properties() topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, numPartitions, numReplicationFactor, + TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, controllerServers, numPartitions, numReplicationFactor, topicConfig = topicConfig) // inherited local retention bytes is 1024 @@ -288,9 +288,9 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200") topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100") - TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount, + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount, topicConfig = topicConfig) - TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers) + TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers) assertThrowsException(classOf[UnknownTopicOrPartitionException], () => TestUtils.describeTopic(createAdminClient(), testTopicName), "Topic should be deleted") TestUtils.waitUntilTrue(() => @@ -304,7 +304,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { val topicConfig = new Properties() topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") - TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount, + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount, topicConfig = topicConfig) val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head @@ -326,7 +326,7 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { val topicConfig = new Properties() topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString) - TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount, + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, controllerServers, numPartitions, brokerCount, topicConfig = topicConfig) val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 5f51d2bd41b..93f7a7212c4 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -154,7 +154,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { try { // create topic - TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2) // send a normal record val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, "key".getBytes(StandardCharsets.UTF_8), @@ -208,7 +208,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { timeoutMs: Long = 20000L): Unit = { val partition = 0 try { - TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2) val futures = for (i <- 1 to numRecords) yield { val record = new ProducerRecord(topic, partition, s"key$i".getBytes(StandardCharsets.UTF_8), @@ -263,7 +263,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") else topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime") - TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps) val recordAndFutures = for (i <- 1 to numRecords) yield { val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes(StandardCharsets.UTF_8), @@ -296,7 +296,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { try { // create topic - TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2) // non-blocking send a list of records val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes(StandardCharsets.UTF_8), @@ -329,7 +329,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { val producer = createProducer() try { - TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2, 2) val partition = 1 val now = System.currentTimeMillis() @@ -373,7 +373,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { val replicas = List(0, follower) try { - TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 3, Map(0 -> replicas)) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 3, Map(0 -> replicas)) val partition = 0 val now = System.currentTimeMillis() @@ -422,7 +422,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { val producer = createProducer(maxBlockMs = 5 * 1000L) // create topic - TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2) val partition0 = 0 var futures0 = (1 to numRecords).map { i => @@ -479,7 +479,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { def testFlush(quorum: String): Unit = { val producer = createProducer(lingerMs = Int.MaxValue, deliveryTimeoutMs = Int.MaxValue) try { - TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2, 2) val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, "value".getBytes(StandardCharsets.UTF_8)) for (_ <- 0 until 50) { @@ -499,7 +499,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) def testCloseWithZeroTimeoutFromCallerThread(quorum: String): Unit = { - TestUtils.createTopicWithAdmin(admin, topic, brokers, 2, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2, 2) val partition = 0 consumer.assign(List(new TopicPartition(topic, partition)).asJava) val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, @@ -525,7 +525,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) def testCloseWithZeroTimeoutFromSenderThread(quorum: String): Unit = { - TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2) val partition = 0 consumer.assign(List(new TopicPartition(topic, partition)).asJava) val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes(StandardCharsets.UTF_8)) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 7731efd360f..46e674c00aa 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -167,7 +167,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas // create the test topic with all the brokers as replicas val superuserAdminClient = createSuperuserAdminClient() - TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic = topic, brokers = brokers, + TestUtils.createTopicWithAdmin(admin = superuserAdminClient, topic = topic, brokers = brokers, controllers = controllerServers, numPartitions = 1, replicationFactor = 3, topicConfig = new Properties) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 5bb3533146c..23fb314fa04 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -2692,11 +2692,11 @@ object PlaintextAdminIntegrationTest { // Create topics val topic1 = "invalid-alter-configs-topic-1" val topicResource1 = new ConfigResource(ConfigResource.Type.TOPIC, topic1) - createTopicWithAdmin(admin, topic1, test.brokers, numPartitions = 1, replicationFactor = 1) + createTopicWithAdmin(admin, topic1, test.brokers, test.controllerServers, numPartitions = 1, replicationFactor = 1) val topic2 = "invalid-alter-configs-topic-2" val topicResource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2) - createTopicWithAdmin(admin, topic2, test.brokers, numPartitions = 1, replicationFactor = 1) + createTopicWithAdmin(admin, topic2, test.brokers, test.controllerServers, numPartitions = 1, replicationFactor = 1) val topicConfigEntries1 = Seq( new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "1.1"), // this value is invalid as it's above 1.0 diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index ac7b775c228..77132d919bc 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -66,7 +66,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { val producer = createProducer(batchSize = 0) val numRecords = 10; try { - TestUtils.createTopicWithAdmin(admin, topic, brokers, 2) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 2) val futures = for (i <- 1 to numRecords) yield { val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, s"value$i".getBytes(StandardCharsets.UTF_8)) producer.send(record) @@ -128,7 +128,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { // set the TopicConfig for timestamp validation to have 1 minute threshold. Note that recordTimestamp has 5 minutes diff val oneMinuteInMs: Long = 1 * 60 * 60 * 1000L topicProps.setProperty(messageTimeStampConfig, oneMinuteInMs.toString) - TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps) val producer = createProducer() try { @@ -157,7 +157,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { // set the TopicConfig for timestamp validation to be the same as the record timestamp topicProps.setProperty(messageTimeStampConfig, recordTimestamp.toString) - TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps) val producer = createProducer() @@ -178,7 +178,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { // set the TopicConfig for timestamp validation to have 10 minute threshold. Note that recordTimestamp has 5 minutes diff val tenMinutesInMs: Long = 10 * 60 * 60 * 1000L topicProps.setProperty(messageTimeStampConfig, tenMinutesInMs.toString) - TestUtils.createTopicWithAdmin(admin, topic, brokers, 1, 2, topicConfig = topicProps) + TestUtils.createTopicWithAdmin(admin, topic, brokers, controllerServers, 1, 2, topicConfig = topicProps) val producer = createProducer() diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 6135ec952ca..08f10e89083 100755 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -87,7 +87,7 @@ class ProducerCompressionTest extends QuorumTestHarness { val admin = TestUtils.createAdminClient(Seq(broker), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) try { - TestUtils.createTopicWithAdmin(admin, topic, Seq(broker)) + TestUtils.createTopicWithAdmin(admin, topic, Seq(broker), controllerServers) } finally { admin.close() } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 8eb27097371..55714993631 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -156,8 +156,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup createAdminClient(SecurityProtocol.SSL, SecureInternal) - TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, numPartitions, replicationFactor = numServers) - TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers, + TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, controllerServers, numPartitions, replicationFactor = numServers) + TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers, controllerServers, numPartitions = servers.head.config.offsetsTopicPartitions, replicationFactor = numServers, topicConfig = servers.head.groupCoordinator.groupMetadataTopicConfigs) @@ -356,7 +356,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup @ValueSource(strings = Array("zk", "kraft")) def testKeyStoreAlter(quorum: String): Unit = { val topic2 = "testtopic2" - TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers) + TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, controllerServers, numPartitions, replicationFactor = numServers) // Start a producer and consumer that work with the current broker keystore. // This should continue working while changes are made @@ -578,7 +578,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val topic2 = "testtopic2" val topicProps = new Properties topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2") - TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps) + TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, controllerServers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps) def getLogOrThrow(tp: TopicPartition): UnifiedLog = { var (logOpt, found) = TestUtils.computeUntilTrue { diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala index dd1ae064f12..d15d0147608 100644 --- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala @@ -65,6 +65,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { admin, topic, brokers, + controllerServers, replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId)) ) TestUtils.waitUntilLeaderIsKnown(brokers, new TopicPartition(topic, 0)) @@ -108,6 +109,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { admin, topic, brokers, + controllerServers, replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId)) ) @@ -135,6 +137,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest { admin, topic, brokers, + controllerServers, replicaAssignment = Map(0 -> Seq(leaderBrokerId, followerBrokerId)) ) diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 8775a8323d4..01e6a91eb85 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -152,7 +152,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { ): Unit = { if (isKRaftTest()) { resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin => - TestUtils.createOffsetsTopicWithAdmin(admin, brokers) + TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers) } } else { TestUtils.createOffsetsTopic(zkClient, servers) @@ -178,6 +178,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { admin = admin, topic = topic, brokers = brokers, + controllers = controllerServers, numPartitions = numPartitions, replicationFactor = replicationFactor, topicConfig = topicConfig @@ -211,7 +212,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { admin = admin, topic = topic, replicaAssignment = partitionReplicaAssignment, - brokers = brokers + brokers = brokers, + controllers = controllerServers ) } } else { @@ -232,7 +234,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { TestUtils.deleteTopicWithAdmin( admin = admin, topic = topic, - brokers = aliveBrokers) + brokers = aliveBrokers, + controllers = controllerServers) } } else { adminZkClient.deleteTopic(topic) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 75bad3e62cd..42bfb97bb25 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -74,7 +74,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { // in this test because it does not use FindCoordinator API. TestUtils.createOffsetsTopicWithAdmin( admin = admin, - brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala + brokers = raftCluster.brokers.collect(Collectors.toList[BrokerServer]).asScala, + controllers = raftCluster.controllerServers().asScala.toSeq ) // Heartbeat request to join the group. Note that the member subscribes diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 36bd53c4fbe..6600d669877 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -44,10 +44,19 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { } } + private def controllerServers(): Seq[ControllerServer] = { + if (cluster.isKRaftTest) { + cluster.asInstanceOf[RaftClusterInstance].controllerServers().asScala.toSeq + } else { + Seq.empty + } + } + protected def createOffsetsTopic(): Unit = { TestUtils.createOffsetsTopicWithAdmin( admin = cluster.createAdminClient(), - brokers = brokers() + brokers = brokers(), + controllers = controllerServers() ) } @@ -58,6 +67,7 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { TestUtils.createTopicWithAdmin( admin = cluster.createAdminClient(), brokers = brokers(), + controllers = controllerServers(), topic = topic, numPartitions = numPartitions ) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c909cacdda7..be859f8a17a 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -455,6 +455,7 @@ object TestUtils extends Logging { admin: Admin, topic: String, brokers: Seq[B], + controllers: Seq[ControllerServer], numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignment: collection.Map[Int, Seq[Int]] = Map.empty, @@ -492,6 +493,7 @@ object TestUtils extends Logging { // wait until we've propagated all partitions metadata to all brokers val allPartitionsMetadata = waitForAllPartitionsMetadata(brokers, topic, effectiveNumPartitions) + controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller)) (0 until effectiveNumPartitions).map { i => i -> allPartitionsMetadata.get(new TopicPartition(topic, i)).map(_.leader()).getOrElse( @@ -521,7 +523,8 @@ object TestUtils extends Logging { def createOffsetsTopicWithAdmin[B <: KafkaBroker]( admin: Admin, - brokers: Seq[B] + brokers: Seq[B], + controllers: Seq[ControllerServer] ): Map[Int, Int] = { val broker = brokers.head createTopicWithAdmin( @@ -530,6 +533,7 @@ object TestUtils extends Logging { numPartitions = broker.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp), replicationFactor = broker.config.getShort(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt, brokers = brokers, + controllers = controllers, topicConfig = broker.groupCoordinator.groupMetadataTopicConfigs, ) } @@ -538,6 +542,7 @@ object TestUtils extends Logging { admin: Admin, topic: String, brokers: Seq[B], + controllers: Seq[ControllerServer] ): Unit = { try { admin.deleteTopics(Collections.singletonList(topic)).all().get() @@ -547,6 +552,7 @@ object TestUtils extends Logging { // ignore } waitForAllPartitionsMetadata(brokers, topic, 0) + controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller)) } /** diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java index 59acae74ad3..8d475fbfe3c 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java @@ -179,7 +179,7 @@ public final class TieredStorageTestContext implements AutoCloseable { } public void deleteTopic(String topic) { - TestUtils.deleteTopicWithAdmin(admin, topic, harness.brokers()); + TestUtils.deleteTopicWithAdmin(admin, topic, harness.brokers(), harness.controllerServers()); } /**