KAFKA-14248; Fix flaky test PlaintextAdminIntegrationTest.testCreateTopicsReturnsConfigs (#12669)

The test is failing intermittently because we do not wait for propagation of the altered config (LogRetentionTimeMillisProp) across all brokers before proceeding ahead with the test.

This PR makes the following changes:
1. Wait for propagation of altered configuration to propagate to all brokers.
2. Use the existing `killBroker` utility method which waits for shutdown using `awaitshutdown`.
3. Improve code readability by using `TestUtils.incrementalAlterConfigs` to send alter config requests.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Divij Vaidya 2022-09-30 01:24:03 +02:00 committed by GitHub
parent 281e178352
commit bc95aa2116
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 38 additions and 44 deletions

View File

@ -33,7 +33,6 @@ import kafka.server.{Defaults, DynamicConfig, KafkaConfig}
import kafka.utils.TestUtils._
import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.HostResolver
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
@ -159,22 +158,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
waitForTopics(client, List(), topics)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk")) // KRaft mode will be supported in KAFKA-13910
def testMetadataRefresh(quorum: String): Unit = {
client = Admin.create(createConfig)
val topics = Seq("mytopic")
val newTopics = Seq(new NewTopic("mytopic", 3, 3.toShort))
client.createTopics(newTopics.asJava).all.get()
waitForTopics(client, expectedPresent = topics, expectedMissing = List())
val controller = brokers.find(_.config.brokerId == brokers.flatMap(_.metadataCache.getControllerId).head).get
controller.shutdown()
controller.awaitShutdown()
val topicDesc = client.describeTopics(topics.asJava).allTopicNames.get()
assertEquals(topics.toSet, topicDesc.keySet.asScala)
}
/**
* describe should not auto create topics
*/
@ -821,10 +804,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@ValueSource(strings = Array("zk", "kraft"))
def testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords(quorum: String): Unit = {
val leaders = createTopic(topic, replicationFactor = brokerCount)
val followerIndex = if (leaders(0) != brokers(0).config.brokerId) 0 else 1
val followerIndex = if (leaders(0) != brokers.head.config.brokerId) 0 else 1
def waitForFollowerLog(expectedStartOffset: Long, expectedEndOffset: Long): Unit = {
TestUtils.waitUntilTrue(() => brokers(followerIndex).replicaManager.localLog(topicPartition) != None,
TestUtils.waitUntilTrue(() => brokers(followerIndex).replicaManager.localLog(topicPartition).isDefined,
"Expected follower to create replica for partition")
// wait until the follower discovers that log start offset moved beyond its HW
@ -862,6 +845,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val result1 = client.deleteRecords(Map(topicPartition -> RecordsToDelete.beforeOffset(117L)).asJava)
result1.all().get()
restartDeadBrokers()
TestUtils.waitForBrokersInIsr(client, topicPartition, Set(followerIndex))
waitForFollowerLog(expectedStartOffset=117L, expectedEndOffset=200L)
}
@ -1522,7 +1506,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// Now change the preferred leader to 1
changePreferredLeader(prefer1)
// but shut it down...
brokers(1).shutdown()
killBroker(1)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))
def assertPreferredLeaderNotAvailable(
@ -1576,9 +1560,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition1, broker1)
brokers(broker2).shutdown()
killBroker(broker2)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
brokers(broker1).shutdown()
killBroker(broker1)
TestUtils.assertNoLeader(client, partition1)
brokers(broker2).startup()
@ -1610,9 +1594,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition1, broker1)
TestUtils.assertLeader(client, partition2, broker1)
brokers(broker2).shutdown()
killBroker(broker2)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(broker2))
brokers(broker1).shutdown()
killBroker(broker1)
TestUtils.assertNoLeader(client, partition1)
TestUtils.assertNoLeader(client, partition2)
brokers(broker2).startup()
@ -1648,9 +1632,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition1, broker1)
TestUtils.assertLeader(client, partition2, broker1)
brokers(broker2).shutdown()
killBroker(broker2)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
brokers(broker1).shutdown()
killBroker(broker1)
TestUtils.assertNoLeader(client, partition1)
TestUtils.assertLeader(client, partition2, broker3)
brokers(broker2).startup()
@ -1708,9 +1692,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition1, broker1)
brokers(broker2).shutdown()
killBroker(broker2)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
brokers(broker1).shutdown()
killBroker(broker1)
TestUtils.assertNoLeader(client, partition1)
val electResult = client.electLeaders(ElectionType.UNCLEAN, Set(partition1).asJava)
@ -1737,7 +1721,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition1, broker1)
brokers(broker1).shutdown()
killBroker(broker1)
TestUtils.assertLeader(client, partition1, broker2)
brokers(broker1).startup()
@ -1769,9 +1753,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition1, broker1)
TestUtils.assertLeader(client, partition2, broker1)
brokers(broker2).shutdown()
killBroker(broker2)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1), Set(broker2))
brokers(broker1).shutdown()
killBroker(broker1)
TestUtils.assertNoLeader(client, partition1)
TestUtils.assertLeader(client, partition2, broker3)
brokers(broker2).startup()
@ -2505,7 +2489,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val alterResult = client.incrementalAlterConfigs(Map(
topicResource -> topicAlterConfigs
).asJava)
alterResult.all().get()
alterResult.all().get(15, TimeUnit.SECONDS)
ensureConsistentKRaftMetadata()
val config = client.describeConfigs(List(topicResource).asJava).all().get().get(topicResource).get(LogConfig.LeaderReplicationThrottledReplicasProp)
@ -2523,19 +2507,29 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
def testCreateTopicsReturnsConfigs(quorum: String): Unit = {
client = Admin.create(super.createConfig)
val alterMap = new util.HashMap[ConfigResource, util.Collection[AlterConfigOp]]
alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, ""), util.Arrays.asList(
new AlterConfigOp(new ConfigEntry(KafkaConfig.LogRetentionTimeMillisProp, "10800000"), OpType.SET)))
(brokers.map(_.config) ++ controllerServers.map(_.config)).foreach { case config =>
alterMap.put(new ConfigResource(ConfigResource.Type.BROKER, config.nodeId.toString()),
util.Arrays.asList(new AlterConfigOp(new ConfigEntry(
KafkaConfig.LogCleanerDeleteRetentionMsProp, "34"), OpType.SET)))
val newLogRetentionProperties = new Properties
newLogRetentionProperties.put(KafkaConfig.LogRetentionTimeMillisProp, "10800000")
TestUtils.incrementalAlterConfigs(null, client, newLogRetentionProperties, perBrokerConfig = false)
.all().get(15, TimeUnit.SECONDS)
val newLogCleanerDeleteRetention = new Properties
newLogCleanerDeleteRetention.put(KafkaConfig.LogCleanerDeleteRetentionMsProp, "34")
TestUtils.incrementalAlterConfigs(brokers, client, newLogCleanerDeleteRetention, perBrokerConfig = true)
.all().get(15, TimeUnit.SECONDS)
if (isKRaftTest()) {
ensureConsistentKRaftMetadata()
} else {
waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")),
s"Timed out waiting for change to ${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
waitTimeMs = 60000L)
waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
KafkaConfig.LogRetentionTimeMillisProp, "").toString.equals("10800000")),
s"Timed out waiting for change to ${KafkaConfig.LogRetentionTimeMillisProp}",
waitTimeMs = 60000L)
}
client.incrementalAlterConfigs(alterMap).all().get()
waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
KafkaConfig.LogCleanerDeleteRetentionMsProp, "").toString.equals("34")),
s"Timed out waiting for change to ${KafkaConfig.LogCleanerDeleteRetentionMsProp}",
waitTimeMs = 60000L)
val newTopics = Seq(new NewTopic("foo", Map((0: Integer) -> Seq[Integer](1, 2).asJava,
(1: Integer) -> Seq[Integer](2, 0).asJava).asJava).