mirror of https://github.com/apache/kafka.git
KAFKA-16845 Migrate ReplicationQuotasTestRig to new test infra (#17089)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
5311839bd5
commit
1eb7644349
|
@ -2484,6 +2484,7 @@ project(':tools') {
|
|||
testImplementation project(':connect:runtime').sourceSets.test.output
|
||||
testImplementation project(':storage:storage-api').sourceSets.main.output
|
||||
testImplementation project(':storage').sourceSets.test.output
|
||||
testImplementation project(':test-common')
|
||||
testImplementation libs.junitJupiter
|
||||
testImplementation libs.mockitoCore
|
||||
testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
|
||||
|
|
|
@ -283,6 +283,7 @@
|
|||
|
||||
<subpackage name="tools">
|
||||
<allow pkg="org.apache.kafka.common"/>
|
||||
<allow pkg="org.apache.kafka.metadata" />
|
||||
<allow pkg="org.apache.kafka.metadata.properties" />
|
||||
<allow pkg="org.apache.kafka.network" />
|
||||
<allow pkg="org.apache.kafka.server.util" />
|
||||
|
@ -316,6 +317,7 @@
|
|||
<allow pkg="org.apache.kafka.tools" />
|
||||
<allow pkg="org.apache.kafka.tools.api" />
|
||||
<allow pkg="org.apache.kafka.tools.filter" />
|
||||
<allow pkg="org.apache.kafka.image" />
|
||||
|
||||
<subpackage name="consumer">
|
||||
<allow pkg="org.apache.kafka.tools"/>
|
||||
|
|
|
@ -17,14 +17,12 @@
|
|||
package org.apache.kafka.tools.other;
|
||||
|
||||
import kafka.log.UnifiedLog;
|
||||
import kafka.server.KafkaConfig;
|
||||
import kafka.server.KafkaServer;
|
||||
import kafka.server.QuorumTestHarness;
|
||||
import kafka.utils.EmptyTestInfo;
|
||||
import kafka.server.BrokerServer;
|
||||
import kafka.server.KafkaBroker;
|
||||
import kafka.utils.TestUtils;
|
||||
|
||||
import org.apache.kafka.clients.admin.Admin;
|
||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.MetricName;
|
||||
|
@ -33,9 +31,13 @@ import org.apache.kafka.common.TopicPartition;
|
|||
import org.apache.kafka.common.TopicPartitionInfo;
|
||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
||||
import org.apache.kafka.common.test.TestKitNodes;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.image.TopicImage;
|
||||
import org.apache.kafka.metadata.PartitionRegistration;
|
||||
import org.apache.kafka.server.quota.QuotaType;
|
||||
import org.apache.kafka.tools.reassign.ReassignPartitionsCommand;
|
||||
|
||||
|
@ -57,13 +59,11 @@ import java.nio.file.Files;
|
|||
import java.text.DecimalFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Function;
|
||||
|
@ -74,8 +74,6 @@ import java.util.stream.IntStream;
|
|||
import javax.imageio.ImageIO;
|
||||
|
||||
import scala.Option;
|
||||
import scala.collection.Seq;
|
||||
import scala.jdk.javaapi.CollectionConverters;
|
||||
|
||||
import static java.nio.file.StandardOpenOption.APPEND;
|
||||
import static java.nio.file.StandardOpenOption.CREATE;
|
||||
|
@ -85,7 +83,7 @@ import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
|
|||
* Test rig for measuring throttling performance. Configure the parameters for a set of experiments, then execute them
|
||||
* and view the html output file, with charts, that are produced. You can also render the charts to the screen if
|
||||
* you wish.
|
||||
*
|
||||
* <p>
|
||||
* Currently you'll need about 40GB of disk space to run these experiments (largest data written x2). Tune the msgSize
|
||||
* & #partitions and throttle to adjust this.
|
||||
*/
|
||||
|
@ -129,7 +127,6 @@ public class ReplicationQuotasTestRig {
|
|||
static void run(ExperimentDef config, Journal journal, boolean displayChartsOnScreen) {
|
||||
Experiment experiment = new Experiment();
|
||||
try {
|
||||
experiment.setUp(new EmptyTestInfo());
|
||||
experiment.run(config, journal, displayChartsOnScreen);
|
||||
journal.footer();
|
||||
} catch (Exception e) {
|
||||
|
@ -159,38 +156,46 @@ public class ReplicationQuotasTestRig {
|
|||
}
|
||||
}
|
||||
|
||||
static class Experiment extends QuorumTestHarness {
|
||||
static class Experiment {
|
||||
static final String TOPIC_NAME = "my-topic";
|
||||
|
||||
String experimentName = "unset";
|
||||
List<KafkaServer> servers;
|
||||
Map<Integer, List<Double>> leaderRates = new HashMap<>();
|
||||
Map<Integer, List<Double>> followerRates = new HashMap<>();
|
||||
KafkaClusterTestKit cluster;
|
||||
Admin adminClient;
|
||||
|
||||
void startBrokers(List<Integer> brokerIds) {
|
||||
void startBrokers(int numBrokerNodes) {
|
||||
System.out.println("Starting Brokers");
|
||||
servers = brokerIds.stream().map(i -> createBrokerConfig(i, zkConnect()))
|
||||
.map(c -> TestUtils.createServer(KafkaConfig.fromProps(c), Time.SYSTEM))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
TestUtils.waitUntilBrokerMetadataIsPropagated(seq(servers), DEFAULT_MAX_WAIT_MS);
|
||||
String brokerList = TestUtils.plaintextBootstrapServers(seq(servers));
|
||||
adminClient = Admin.create(Collections.singletonMap(
|
||||
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList
|
||||
));
|
||||
try {
|
||||
cluster = new KafkaClusterTestKit.Builder(
|
||||
new TestKitNodes.Builder()
|
||||
.setNumControllerNodes(1)
|
||||
.setNumBrokerNodes(numBrokerNodes)
|
||||
.build()
|
||||
).build();
|
||||
cluster.format();
|
||||
cluster.startup();
|
||||
cluster.waitForReadyBrokers();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed to start test Kafka cluster", e);
|
||||
}
|
||||
|
||||
@Override public void tearDown() {
|
||||
adminClient = Admin.create(cluster.clientProperties());
|
||||
}
|
||||
|
||||
public void tearDown() {
|
||||
Utils.closeQuietly(adminClient, "adminClient");
|
||||
TestUtils.shutdownServers(seq(servers), true);
|
||||
super.tearDown();
|
||||
try {
|
||||
cluster.close();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void run(ExperimentDef config, Journal journal, boolean displayChartsOnScreen) throws Exception {
|
||||
experimentName = config.name;
|
||||
List<Integer> brokers = IntStream.rangeClosed(100, 100 + config.brokers).boxed().collect(Collectors.toList());
|
||||
int shift = Math.round(config.brokers / 2f);
|
||||
|
||||
IntSupplier nextReplicaRoundRobin = new IntSupplier() {
|
||||
|
@ -198,30 +203,46 @@ public class ReplicationQuotasTestRig {
|
|||
|
||||
@Override public int getAsInt() {
|
||||
count++;
|
||||
return 100 + (count + shift) % config.brokers;
|
||||
return (count + shift) % config.brokers;
|
||||
}
|
||||
};
|
||||
|
||||
Map<Integer, Seq<Integer>> replicas = IntStream.rangeClosed(0, config.partitions).boxed().collect(Collectors.toMap(
|
||||
Map<Integer, List<Integer>> replicas = IntStream.rangeClosed(0, config.partitions - 1).boxed().collect(Collectors.toMap(
|
||||
Function.identity(),
|
||||
partition -> seq(Collections.singleton(nextReplicaRoundRobin.getAsInt()))
|
||||
partition -> Collections.singletonList(nextReplicaRoundRobin.getAsInt())
|
||||
));
|
||||
|
||||
startBrokers(brokers);
|
||||
TestUtils.createTopic(zkClient(), TOPIC_NAME, (scala.collection.Map) CollectionConverters.asScala(replicas), seq(servers));
|
||||
startBrokers(config.brokers);
|
||||
adminClient.createTopics(Collections.singleton(new NewTopic(TOPIC_NAME, replicas))).all().get();
|
||||
|
||||
TestUtils.waitUntilTrue(
|
||||
() -> cluster.brokers().values().stream().allMatch(server -> {
|
||||
TopicImage image = server.metadataCache().currentImage().topics().getTopic(TOPIC_NAME);
|
||||
return image != null && image.partitions().values().stream().allMatch(PartitionRegistration::hasLeader);
|
||||
}),
|
||||
() -> "Timed out waiting for topic listing",
|
||||
DEFAULT_MAX_WAIT_MS,
|
||||
500L
|
||||
);
|
||||
|
||||
System.out.println("Writing Data");
|
||||
KafkaProducer<byte[], byte[]> producer = createProducer(TestUtils.plaintextBootstrapServers(seq(servers)));
|
||||
|
||||
try (KafkaProducer<byte[], byte[]> producer = createProducer()) {
|
||||
for (int x = 0; x < config.msgsPerPartition; x++) {
|
||||
for (int partition = 0; partition < config.partitions; partition++) {
|
||||
producer.send(new ProducerRecord<>(TOPIC_NAME, partition, null, new byte[config.msgSize]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("Generating Reassignment");
|
||||
Map<TopicPartition, List<Integer>> newAssignment = ReassignPartitionsCommand.generateAssignment(adminClient,
|
||||
json(TOPIC_NAME), brokers.stream().map(Object::toString).collect(Collectors.joining(",")), true).getKey();
|
||||
Map<TopicPartition, List<Integer>> newAssignment = ReassignPartitionsCommand.generateAssignment(
|
||||
adminClient,
|
||||
json(TOPIC_NAME),
|
||||
cluster.brokers().values().stream()
|
||||
.map(server -> String.valueOf(server.replicaManager().localBrokerId()))
|
||||
.collect(Collectors.joining(",")),
|
||||
true
|
||||
).getKey();
|
||||
|
||||
System.out.println("Starting Reassignment");
|
||||
long start = System.currentTimeMillis();
|
||||
|
@ -246,12 +267,13 @@ public class ReplicationQuotasTestRig {
|
|||
|
||||
void validateAllOffsetsMatch(ExperimentDef config) {
|
||||
//Validate that offsets are correct in all brokers
|
||||
for (KafkaServer broker : servers) {
|
||||
for (KafkaBroker broker : cluster.brokers().values()) {
|
||||
for (int partitionId = 0; partitionId < config.partitions; partitionId++) {
|
||||
long offset = broker.getLogManager().getLog(new TopicPartition(TOPIC_NAME, partitionId), false).map(UnifiedLog::logEndOffset).getOrElse(() -> -1L);
|
||||
long offset = broker.logManager().getLog(new TopicPartition(TOPIC_NAME, partitionId), false)
|
||||
.map(UnifiedLog::logEndOffset).getOrElse(() -> -1L);
|
||||
if (offset >= 0 && offset != config.msgsPerPartition) {
|
||||
throw new RuntimeException(
|
||||
"Run failed as offsets did not match for partition " + partitionId + " on broker " + broker.config().brokerId() + ". " +
|
||||
"Run failed as offsets did not match for partition " + partitionId + " on broker " + broker.config().nodeId() + ". " +
|
||||
"Expected " + config.msgsPerPartition + " but was " + offset + "."
|
||||
);
|
||||
}
|
||||
|
@ -259,7 +281,7 @@ public class ReplicationQuotasTestRig {
|
|||
}
|
||||
}
|
||||
|
||||
void logOutput(ExperimentDef config, Map<Integer, Seq<Integer>> replicas, Map<TopicPartition, List<Integer>> newAssignment) throws Exception {
|
||||
void logOutput(ExperimentDef config, Map<Integer, List<Integer>> replicas, Map<TopicPartition, List<Integer>> newAssignment) throws Exception {
|
||||
List<TopicPartitionInfo> actual = adminClient.describeTopics(Collections.singleton(TOPIC_NAME))
|
||||
.allTopicNames().get().get(TOPIC_NAME).partitions();
|
||||
|
||||
|
@ -272,7 +294,7 @@ public class ReplicationQuotasTestRig {
|
|||
System.out.println("The replicas are " + new TreeMap<>(replicas).entrySet().stream().map(e -> "\n" + e).collect(Collectors.joining()));
|
||||
System.out.println("This is the current replica assignment:\n" + curAssignment);
|
||||
System.out.println("proposed assignment is: \n" + newAssignment);
|
||||
System.out.println("This is the assignment we ended up with" + curAssignment);
|
||||
System.out.println("This is the assignment we ended up with " + curAssignment);
|
||||
|
||||
//Test Stats
|
||||
System.out.println("numBrokers: " + config.brokers);
|
||||
|
@ -341,29 +363,29 @@ public class ReplicationQuotasTestRig {
|
|||
return dataset;
|
||||
}
|
||||
|
||||
void record(Map<Integer, List<Double>> rates, int brokerId, Double currentRate) {
|
||||
List<Double> leaderRatesBroker = rates.getOrDefault(brokerId, new ArrayList<>());
|
||||
void record(Map<Integer, List<Double>> rates, int nodeId, Double currentRate) {
|
||||
List<Double> leaderRatesBroker = rates.getOrDefault(nodeId, new ArrayList<>());
|
||||
leaderRatesBroker.add(currentRate);
|
||||
rates.put(brokerId, leaderRatesBroker);
|
||||
rates.put(nodeId, leaderRatesBroker);
|
||||
}
|
||||
|
||||
void printRateMetrics() {
|
||||
for (KafkaServer broker : servers) {
|
||||
for (BrokerServer broker : cluster.brokers().values()) {
|
||||
double leaderRate = measuredRate(broker, QuotaType.LEADER_REPLICATION);
|
||||
if (broker.config().brokerId() == 100)
|
||||
LOGGER.info("waiting... Leader rate on 101 is " + leaderRate);
|
||||
record(leaderRates, broker.config().brokerId(), leaderRate);
|
||||
if (broker.config().nodeId() == 0)
|
||||
LOGGER.info("waiting... Leader rate on 1 is {}", leaderRate);
|
||||
record(leaderRates, broker.config().nodeId(), leaderRate);
|
||||
if (leaderRate > 0)
|
||||
LOGGER.trace("Leader Rate on " + broker.config().brokerId() + " is " + leaderRate);
|
||||
LOGGER.trace("Leader Rate on {} is {}", broker.config().nodeId(), leaderRate);
|
||||
|
||||
double followerRate = measuredRate(broker, QuotaType.FOLLOWER_REPLICATION);
|
||||
record(followerRates, broker.config().brokerId(), followerRate);
|
||||
record(followerRates, broker.config().nodeId(), followerRate);
|
||||
if (followerRate > 0)
|
||||
LOGGER.trace("Follower Rate on " + broker.config().brokerId() + " is " + followerRate);
|
||||
LOGGER.trace("Follower Rate on {} is {}", broker.config().nodeId(), followerRate);
|
||||
}
|
||||
}
|
||||
|
||||
private double measuredRate(KafkaServer broker, QuotaType repType) {
|
||||
private double measuredRate(KafkaBroker broker, QuotaType repType) {
|
||||
MetricName metricName = broker.metrics().metricName("byte-rate", repType.toString());
|
||||
return broker.metrics().metrics().containsKey(metricName)
|
||||
? (double) broker.metrics().metrics().get(metricName).metricValue()
|
||||
|
@ -375,10 +397,10 @@ public class ReplicationQuotasTestRig {
|
|||
return "{\"topics\": [" + topicStr + "],\"version\":1}";
|
||||
}
|
||||
|
||||
KafkaProducer<byte[], byte[]> createProducer(String brokerList) {
|
||||
KafkaProducer<byte[], byte[]> createProducer() {
|
||||
return TestUtils.createProducer(
|
||||
brokerList,
|
||||
0,
|
||||
cluster.bootstrapServers(),
|
||||
1,
|
||||
60 * 1000L,
|
||||
1024L * 1024L,
|
||||
Integer.MAX_VALUE,
|
||||
|
@ -395,31 +417,6 @@ public class ReplicationQuotasTestRig {
|
|||
false
|
||||
);
|
||||
}
|
||||
|
||||
Properties createBrokerConfig(int brokerId, String zkConnect) {
|
||||
return TestUtils.createBrokerConfig(
|
||||
brokerId,
|
||||
zkConnect,
|
||||
false, // shorten test time
|
||||
true,
|
||||
TestUtils.RandomPort(),
|
||||
Option.empty(),
|
||||
Option.empty(),
|
||||
Option.empty(),
|
||||
true,
|
||||
false,
|
||||
TestUtils.RandomPort(),
|
||||
false,
|
||||
TestUtils.RandomPort(),
|
||||
false,
|
||||
TestUtils.RandomPort(),
|
||||
Option.empty(),
|
||||
3,
|
||||
false,
|
||||
1,
|
||||
(short) 1,
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
static class Journal {
|
||||
|
@ -476,8 +473,4 @@ public class ReplicationQuotasTestRig {
|
|||
return log.getAbsolutePath();
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> Seq<T> seq(Collection<T> seq) {
|
||||
return CollectionConverters.asScala(seq).toSeq();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue