mirror of https://github.com/apache/kafka.git
KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (#9022)
Set `replica.fetch.max.bytes` to `1` and produce multiple record batches to allow for throttling to take place. This helps avoid a race condition where the reassignment would complete more quickly than expected causing an assertion to fail some times. Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jason Gustafson <jason@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
parent
a4b923f76f
commit
99472c54f0
|
@ -50,14 +50,20 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
|
|||
/**
|
||||
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
|
||||
* test and should not reuse previous configurations unless they select their ports randomly when servers are started.
|
||||
*
|
||||
* Note the replica fetch max bytes is set to `1` in order to throttle the rate of replication for test
|
||||
* `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
|
||||
*/
|
||||
override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(
|
||||
numConfigs = 6,
|
||||
zkConnect = zkConnect,
|
||||
rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"),
|
||||
numPartitions = numPartitions,
|
||||
defaultReplicationFactor = defaultReplicationFactor
|
||||
).map(KafkaConfig.fromProps)
|
||||
defaultReplicationFactor = defaultReplicationFactor,
|
||||
).map { props =>
|
||||
props.put(KafkaConfig.ReplicaFetchMaxBytesProp, "1")
|
||||
KafkaConfig.fromProps(props)
|
||||
}
|
||||
|
||||
private val numPartitions = 1
|
||||
private val defaultReplicationFactor = 1.toShort
|
||||
|
@ -672,8 +678,13 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
|
|||
adminClient.createTopics(
|
||||
Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get()
|
||||
waitForTopicCreated(testTopicName)
|
||||
|
||||
// Produce multiple batches.
|
||||
TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
|
||||
TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
|
||||
|
||||
// Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication
|
||||
// throughput so the reassignment doesn't complete quickly.
|
||||
val brokerIds = servers.map(_.config.brokerId)
|
||||
TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1)
|
||||
|
||||
|
@ -703,6 +714,10 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
|
|||
topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
|
||||
assertEquals(s"--under-replicated-partitions shouldn't return anything: '$underReplicatedOutput'", "", underReplicatedOutput)
|
||||
|
||||
// Verify reassignment is still ongoing.
|
||||
val reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments.get().get(tp)
|
||||
assertFalse(Option(reassignments).forall(_.addingReplicas.isEmpty))
|
||||
|
||||
TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp))
|
||||
TestUtils.waitForAllReassignmentsToComplete(adminClient)
|
||||
}
|
||||
|
|
|
@ -97,7 +97,8 @@ public class CheckpointBench {
|
|||
this.scheduler = new KafkaScheduler(1, "scheduler-thread", true);
|
||||
this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
|
||||
0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
|
||||
Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, (short) 1));
|
||||
Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
|
||||
(short) 1));
|
||||
this.metrics = new Metrics();
|
||||
this.time = new MockTime();
|
||||
this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
|
||||
|
|
Loading…
Reference in New Issue