diff --git a/build.gradle b/build.gradle
index f3f01f0f5c6..08023f96d8c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2484,6 +2484,7 @@ project(':tools') {
     testImplementation project(':connect:runtime').sourceSets.test.output
     testImplementation project(':storage:storage-api').sourceSets.main.output
     testImplementation project(':storage').sourceSets.test.output
+    testImplementation project(':test-common')
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
     testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 2524bb92ebc..77d668c6994 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -283,6 +283,7 @@
+    <allow pkg="org.apache.kafka.common.test" />
@@ -316,6 +317,7 @@
+    <allow pkg="org.apache.kafka.common.test" />
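Together, the new :test-common dependency and the accompanying import-control allowances let the tools tests boot a KRaft cluster in-process via KafkaClusterTestKit, which replaces the ZooKeeper-backed QuorumTestHarness in the rig below. A minimal sketch of that lifecycle, assuming a single controller, a single broker, and a hypothetical "demo" topic (none of which are taken from this patch):

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.test.KafkaClusterTestKit;
    import org.apache.kafka.common.test.TestKitNodes;

    import java.util.Collections;

    public class KraftClusterSketch {
        public static void main(String[] args) throws Exception {
            // Sketch only: node counts and the "demo" topic are illustrative.
            try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
                    new TestKitNodes.Builder()
                        .setNumControllerNodes(1)
                        .setNumBrokerNodes(1)
                        .build()
            ).build()) {
                cluster.format();               // write meta.properties for every node
                cluster.startup();              // start controllers, then brokers
                cluster.waitForReadyBrokers();  // block until every broker is ready
                try (Admin admin = Admin.create(cluster.clientProperties())) {
                    admin.createTopics(Collections.singleton(new NewTopic("demo", 1, (short) 1))).all().get();
                }
            }
        }
    }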
diff --git a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
index 64599c688fa..a6b2f13e3c4 100644
--- a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
+++ b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
@@ -17,14 +17,12 @@
 package org.apache.kafka.tools.other;
 
 import kafka.log.UnifiedLog;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.server.QuorumTestHarness;
-import kafka.utils.EmptyTestInfo;
+import kafka.server.BrokerServer;
+import kafka.server.KafkaBroker;
 import kafka.utils.TestUtils;
 
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.MetricName;
@@ -33,9 +31,13 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.test.KafkaClusterTestKit;
+import org.apache.kafka.common.test.TestKitNodes;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.server.quota.QuotaType;
 import org.apache.kafka.tools.reassign.ReassignPartitionsCommand;
@@ -57,13 +59,11 @@ import java.nio.file.Files;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
@@ -74,8 +74,6 @@ import java.util.stream.IntStream;
 
 import javax.imageio.ImageIO;
 
 import scala.Option;
-import scala.collection.Seq;
-import scala.jdk.javaapi.CollectionConverters;
 
 import static java.nio.file.StandardOpenOption.APPEND;
 import static java.nio.file.StandardOpenOption.CREATE;
@@ -85,7 +83,7 @@ import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
  * Test rig for measuring throttling performance. Configure the parameters for a set of experiments, then execute them
  * and view the html output file, with charts, that are produced. You can also render the charts to the screen if
  * you wish.
- *
+ * <p>
  * Currently you'll need about 40GB of disk space to run these experiments (largest data written x2). Tune the msgSize
  * & #partitions and throttle to adjust this.
  */
@@ -129,7 +127,6 @@ public class ReplicationQuotasTestRig {
     static void run(ExperimentDef config, Journal journal, boolean displayChartsOnScreen) {
         Experiment experiment = new Experiment();
         try {
-            experiment.setUp(new EmptyTestInfo());
             experiment.run(config, journal, displayChartsOnScreen);
             journal.footer();
         } catch (Exception e) {
@@ -159,38 +156,46 @@ public class ReplicationQuotasTestRig {
         }
     }
 
-    static class Experiment extends QuorumTestHarness {
+    static class Experiment {
         static final String TOPIC_NAME = "my-topic";
 
         String experimentName = "unset";
-        List<KafkaServer> servers;
         Map<Integer, List<Double>> leaderRates = new HashMap<>();
         Map<Integer, List<Double>> followerRates = new HashMap<>();
+        KafkaClusterTestKit cluster;
         Admin adminClient;
 
-        void startBrokers(List<Integer> brokerIds) {
+        void startBrokers(int numBrokerNodes) {
             System.out.println("Starting Brokers");
-            servers = brokerIds.stream().map(i -> createBrokerConfig(i, zkConnect()))
-                .map(c -> TestUtils.createServer(KafkaConfig.fromProps(c), Time.SYSTEM))
-                .collect(Collectors.toList());
-            TestUtils.waitUntilBrokerMetadataIsPropagated(seq(servers), DEFAULT_MAX_WAIT_MS);
-            String brokerList = TestUtils.plaintextBootstrapServers(seq(servers));
-            adminClient = Admin.create(Collections.singletonMap(
-                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList
-            ));
+            try {
+                cluster = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder()
+                        .setNumControllerNodes(1)
+                        .setNumBrokerNodes(numBrokerNodes)
+                        .build()
+                ).build();
+                cluster.format();
+                cluster.startup();
+                cluster.waitForReadyBrokers();
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to start test Kafka cluster", e);
+            }
+
+            adminClient = Admin.create(cluster.clientProperties());
         }
 
-        @Override public void tearDown() {
+        public void tearDown() {
             Utils.closeQuietly(adminClient, "adminClient");
-            TestUtils.shutdownServers(seq(servers), true);
-            super.tearDown();
+            try {
+                cluster.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
 
-        @SuppressWarnings("unchecked")
         public void run(ExperimentDef config, Journal journal, boolean displayChartsOnScreen) throws Exception {
             experimentName = config.name;
 
-            List<Integer> brokers = IntStream.rangeClosed(100, 100 + config.brokers).boxed().collect(Collectors.toList());
             int shift = Math.round(config.brokers / 2f);
             IntSupplier nextReplicaRoundRobin = new IntSupplier() {
@@ -198,30 +203,46 @@ public class ReplicationQuotasTestRig {
                 @Override
                 public int getAsInt() {
                     count++;
-                    return 100 + (count + shift) % config.brokers;
+                    return (count + shift) % config.brokers;
                 }
             };
 
-            Map<Integer, Seq<Integer>> replicas = IntStream.rangeClosed(0, config.partitions).boxed().collect(Collectors.toMap(
+            Map<Integer, List<Integer>> replicas = IntStream.rangeClosed(0, config.partitions - 1).boxed().collect(Collectors.toMap(
                 Function.identity(),
-                partition -> seq(Collections.singleton(nextReplicaRoundRobin.getAsInt()))
+                partition -> Collections.singletonList(nextReplicaRoundRobin.getAsInt())
             ));
-            startBrokers(brokers);
-            TestUtils.createTopic(zkClient(), TOPIC_NAME, (scala.collection.Map) CollectionConverters.asScala(replicas), seq(servers));
+            startBrokers(config.brokers);
+            adminClient.createTopics(Collections.singleton(new NewTopic(TOPIC_NAME, replicas))).all().get();
+
+            TestUtils.waitUntilTrue(
+                () -> cluster.brokers().values().stream().allMatch(server -> {
+                    TopicImage image = server.metadataCache().currentImage().topics().getTopic(TOPIC_NAME);
+                    return image != null && image.partitions().values().stream().allMatch(PartitionRegistration::hasLeader);
+                }),
+                () -> "Timed out waiting for topic listing",
+                DEFAULT_MAX_WAIT_MS,
+                500L
+            );
 
             System.out.println("Writing Data");
-            KafkaProducer<byte[], byte[]> producer = createProducer(TestUtils.plaintextBootstrapServers(seq(servers)));
-
-            for (int x = 0; x < config.msgsPerPartition; x++) {
-                for (int partition = 0; partition < config.partitions; partition++) {
-                    producer.send(new ProducerRecord<>(TOPIC_NAME, partition, null, new byte[config.msgSize]));
+            try (KafkaProducer<byte[], byte[]> producer = createProducer()) {
+                for (int x = 0; x < config.msgsPerPartition; x++) {
+                    for (int partition = 0; partition < config.partitions; partition++) {
+                        producer.send(new ProducerRecord<>(TOPIC_NAME, partition, null, new byte[config.msgSize]));
+                    }
                 }
             }
 
             System.out.println("Generating Reassignment");
-            Map<TopicPartition, List<Integer>> newAssignment = ReassignPartitionsCommand.generateAssignment(adminClient,
-                json(TOPIC_NAME), brokers.stream().map(Object::toString).collect(Collectors.joining(",")), true).getKey();
+            Map<TopicPartition, List<Integer>> newAssignment = ReassignPartitionsCommand.generateAssignment(
+                adminClient,
+                json(TOPIC_NAME),
+                cluster.brokers().values().stream()
+                    .map(server -> String.valueOf(server.replicaManager().localBrokerId()))
+                    .collect(Collectors.joining(",")),
+                true
+            ).getKey();
 
             System.out.println("Starting Reassignment");
             long start = System.currentTimeMillis();
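Two details in the hunk above are easy to miss. First, KRaft test nodes are numbered from 0, so the old 100-based broker ids (and the 100 + offset in the round-robin supplier) disappear. Second, IntStream.rangeClosed(0, config.partitions) used to generate config.partitions + 1 assignment entries; the new - 1 bound makes the map line up with the partition count. A standalone check of the rotation, with an assumed broker count of 3:

    // Assumed values for illustration; the formula is the one used by nextReplicaRoundRobin.
    int brokers = 3;
    int shift = Math.round(brokers / 2f);
    int count = 0;
    for (int partition = 0; partition < 6; partition++) {
        count++;
        System.out.println("partition " + partition + " -> node " + (count + shift) % brokers);
    }
    // Prints nodes 0, 1, 2, 0, 1, 2: one replica per partition, spread evenly.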
@@ -246,12 +267,13 @@ public class ReplicationQuotasTestRig {
 
         void validateAllOffsetsMatch(ExperimentDef config) {
             //Validate that offsets are correct in all brokers
-            for (KafkaServer broker : servers) {
+            for (KafkaBroker broker : cluster.brokers().values()) {
                 for (int partitionId = 0; partitionId < config.partitions; partitionId++) {
-                    long offset = broker.getLogManager().getLog(new TopicPartition(TOPIC_NAME, partitionId), false).map(UnifiedLog::logEndOffset).getOrElse(() -> -1L);
+                    long offset = broker.logManager().getLog(new TopicPartition(TOPIC_NAME, partitionId), false)
+                        .map(UnifiedLog::logEndOffset).getOrElse(() -> -1L);
                     if (offset >= 0 && offset != config.msgsPerPartition) {
                         throw new RuntimeException(
-                            "Run failed as offsets did not match for partition " + partitionId + " on broker " + broker.config().brokerId() + ". " +
+                            "Run failed as offsets did not match for partition " + partitionId + " on broker " + broker.config().nodeId() + ". " +
                                 "Expected " + config.msgsPerPartition + " but was " + offset + "."
                         );
                     }
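validateAllOffsetsMatch still reads log end offsets directly from each broker's LogManager, which works because the rig runs in the same JVM as the cluster. A client-side equivalent using only the Admin API would look roughly like this (partitions and msgsPerPartition stand in for the config fields; the rest is an assumption, not code from this patch):

    // Hedged sketch: verify log end offsets over the wire instead of in-process.
    // Needs org.apache.kafka.clients.admin.OffsetSpec.
    Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
    for (int p = 0; p < partitions; p++) {
        latest.put(new TopicPartition("my-topic", p), OffsetSpec.latest());
    }
    adminClient.listOffsets(latest).all().get().forEach((tp, info) -> {
        if (info.offset() != msgsPerPartition) {
            throw new RuntimeException("Offset mismatch on " + tp + ": expected "
                + msgsPerPartition + " but was " + info.offset());
        }
    });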
                         );
                     }
@@ -259,7 +281,7 @@ public class ReplicationQuotasTestRig {
             }
         }
 
-        void logOutput(ExperimentDef config, Map<Integer, Seq<Integer>> replicas, Map<TopicPartition, List<Integer>> newAssignment) throws Exception {
+        void logOutput(ExperimentDef config, Map<Integer, List<Integer>> replicas, Map<TopicPartition, List<Integer>> newAssignment) throws Exception {
             List<TopicPartitionInfo> actual = adminClient.describeTopics(Collections.singleton(TOPIC_NAME))
                 .allTopicNames().get().get(TOPIC_NAME).partitions();
@@ -272,7 +294,7 @@ public class ReplicationQuotasTestRig {
             System.out.println("The replicas are " + new TreeMap<>(replicas).entrySet().stream().map(e -> "\n" + e).collect(Collectors.joining()));
             System.out.println("This is the current replica assignment:\n" + curAssignment);
             System.out.println("proposed assignment is: \n" + newAssignment);
-            System.out.println("This is the assignment we ended up with" + curAssignment);
+            System.out.println("This is the assignment we ended up with " + curAssignment);
 
             //Test Stats
             System.out.println("numBrokers: " + config.brokers);
@@ -341,29 +363,29 @@ public class ReplicationQuotasTestRig {
             return dataset;
         }
 
-        void record(Map<Integer, List<Double>> rates, int brokerId, Double currentRate) {
-            List<Double> leaderRatesBroker = rates.getOrDefault(brokerId, new ArrayList<>());
+        void record(Map<Integer, List<Double>> rates, int nodeId, Double currentRate) {
+            List<Double> leaderRatesBroker = rates.getOrDefault(nodeId, new ArrayList<>());
             leaderRatesBroker.add(currentRate);
-            rates.put(brokerId, leaderRatesBroker);
+            rates.put(nodeId, leaderRatesBroker);
         }
 
         void printRateMetrics() {
-            for (KafkaServer broker : servers) {
+            for (BrokerServer broker : cluster.brokers().values()) {
                 double leaderRate = measuredRate(broker, QuotaType.LEADER_REPLICATION);
-                if (broker.config().brokerId() == 100)
-                    LOGGER.info("waiting... Leader rate on 101 is " + leaderRate);
-                record(leaderRates, broker.config().brokerId(), leaderRate);
+                if (broker.config().nodeId() == 0)
+                    LOGGER.info("waiting... Leader rate on 1 is {}", leaderRate);
+                record(leaderRates, broker.config().nodeId(), leaderRate);
 
                 if (leaderRate > 0)
-                    LOGGER.trace("Leader Rate on " + broker.config().brokerId() + " is " + leaderRate);
+                    LOGGER.trace("Leader Rate on {} is {}", broker.config().nodeId(), leaderRate);
 
                 double followerRate = measuredRate(broker, QuotaType.FOLLOWER_REPLICATION);
-                record(followerRates, broker.config().brokerId(), followerRate);
+                record(followerRates, broker.config().nodeId(), followerRate);
 
                 if (followerRate > 0)
-                    LOGGER.trace("Follower Rate on " + broker.config().brokerId() + " is " + followerRate);
+                    LOGGER.trace("Follower Rate on {} is {}", broker.config().nodeId(), followerRate);
             }
         }
 
-        private double measuredRate(KafkaServer broker, QuotaType repType) {
+        private double measuredRate(KafkaBroker broker, QuotaType repType) {
             MetricName metricName = broker.metrics().metricName("byte-rate", repType.toString());
             return broker.metrics().metrics().containsKey(metricName) ?
                 (double) broker.metrics().metrics().get(metricName).metricValue()
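The switch from string concatenation to SLF4J {} placeholders in printRateMetrics is a small cost fix rather than a cosmetic one: with placeholders the message is only formatted when the level is enabled, which matters for a TRACE statement polled in a tight loop. Compare:

    // Concatenation always builds the string, even when TRACE is disabled:
    LOGGER.trace("Leader Rate on " + broker.config().nodeId() + " is " + leaderRate);
    // Placeholders defer formatting until the statement actually logs:
    LOGGER.trace("Leader Rate on {} is {}", broker.config().nodeId(), leaderRate);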
@@ -375,10 +397,10 @@ public class ReplicationQuotasTestRig {
             return "{\"topics\": [" + topicStr + "],\"version\":1}";
         }
 
-        KafkaProducer<byte[], byte[]> createProducer(String brokerList) {
+        KafkaProducer<byte[], byte[]> createProducer() {
             return TestUtils.createProducer(
-                brokerList,
-                0,
+                cluster.bootstrapServers(),
+                1,
                 60 * 1000L,
                 1024L * 1024L,
                 Integer.MAX_VALUE,
@@ -395,31 +417,6 @@ public class ReplicationQuotasTestRig {
                 false
             );
         }
-
-        Properties createBrokerConfig(int brokerId, String zkConnect) {
-            return TestUtils.createBrokerConfig(
-                brokerId,
-                zkConnect,
-                false, // shorten test time
-                true,
-                TestUtils.RandomPort(),
-                Option.empty(),
-                Option.empty(),
-                Option.empty(),
-                true,
-                false,
-                TestUtils.RandomPort(),
-                false,
-                TestUtils.RandomPort(),
-                false,
-                TestUtils.RandomPort(),
-                Option.empty(),
-                3,
-                false,
-                1,
-                (short) 1,
-                false);
-        }
     }
 
     static class Journal {
@@ -476,8 +473,4 @@ public class ReplicationQuotasTestRig {
             return log.getAbsolutePath();
         }
     }
-
-    private static <T> Seq<T> seq(Collection<T> seq) {
-        return CollectionConverters.asScala(seq).toSeq();
-    }
 }
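For reference, the json(TOPIC_NAME) helper above produces the topics document that generateAssignment expects, which for the rig's single topic should expand to {"topics": [{"topic": "my-topic"}],"version":1} (topicStr itself is built outside these hunks, so the per-topic object shape is inferred from the reassign tooling's format). The broker list passed alongside it is now derived from the live cluster's node ids rather than the old hard-coded 100-based range, so the rig no longer needs its own createBrokerConfig or the Scala seq(...) bridge removed at the end of the file.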