From 609c81ec8b190f4812e7008b30fc509ee1656d68 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Mon, 6 Aug 2018 00:47:25 -0700 Subject: [PATCH] KAFKA-7183: Add a trogdor test that creates many connections to brokers (#5393) Reviewers: Ismael Juma , Rajini Sivaram --- checkstyle/import-control.xml | 1 + .../workload/ConnectionStressSpec.java | 96 ++++++++ .../workload/ConnectionStressWorker.java | 232 ++++++++++++++++++ .../kafka/trogdor/workload/Throttle.java | 12 +- 4 files changed, 338 insertions(+), 3 deletions(-) create mode 100644 tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java create mode 100644 tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 35f42e3a700..840e5516e07 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -194,6 +194,7 @@ + diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java new file mode 100644 index 00000000000..4195f9b1977 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.trogdor.common.Topology; +import org.apache.kafka.trogdor.task.TaskController; +import org.apache.kafka.trogdor.task.TaskSpec; +import org.apache.kafka.trogdor.task.TaskWorker; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * The specification for a task which connects and disconnects many times a + * second to stress the broker. + */ +public class ConnectionStressSpec extends TaskSpec { + private final String clientNode; + private final String bootstrapServers; + private final Map commonClientConf; + private final int targetConnectionsPerSec; + private final int numThreads; + + @JsonCreator + public ConnectionStressSpec(@JsonProperty("startMs") long startMs, + @JsonProperty("durationMs") long durationMs, + @JsonProperty("clientNode") String clientNode, + @JsonProperty("bootstrapServers") String bootstrapServers, + @JsonProperty("commonClientConf") Map commonClientConf, + @JsonProperty("targetConnectionsPerSec") int targetConnectionsPerSec, + @JsonProperty("numThreads") int numThreads) { + super(startMs, durationMs); + this.clientNode = (clientNode == null) ? "" : clientNode; + this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers; + this.commonClientConf = configOrEmptyMap(commonClientConf); + this.targetConnectionsPerSec = targetConnectionsPerSec; + this.numThreads = numThreads < 1 ? 1 : numThreads; + } + + @JsonProperty + public String clientNode() { + return clientNode; + } + + @JsonProperty + public String bootstrapServers() { + return bootstrapServers; + } + + @JsonProperty + public Map commonClientConf() { + return commonClientConf; + } + + @JsonProperty + public int targetConnectionsPerSec() { + return targetConnectionsPerSec; + } + + @JsonProperty + public int numThreads() { + return numThreads; + } + + public TaskController newController(String id) { + return new TaskController() { + @Override + public Set targetNodes(Topology topology) { + return Collections.singleton(clientNode); + } + }; + } + + @Override + public TaskWorker newTaskWorker(String id) { + return new ConnectionStressWorker(id, this); + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java new file mode 100644 index 00000000000..5d78de8f6fa --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.ManualMetadataUpdater; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.NetworkClientUtils; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.ChannelBuilder; +import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.common.ThreadUtils; +import org.apache.kafka.trogdor.common.WorkerUtils; +import org.apache.kafka.trogdor.task.TaskWorker; +import org.apache.kafka.trogdor.task.WorkerStatusTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ConnectionStressWorker implements TaskWorker { + private static final Logger log = LoggerFactory.getLogger(ConnectionStressWorker.class); + + private static final int THROTTLE_PERIOD_MS = 100; + + private static final int REPORT_INTERVAL_MS = 20000; + + private final String id; + + private final ConnectionStressSpec spec; + + private final AtomicBoolean running = new AtomicBoolean(false); + + private KafkaFutureImpl doneFuture; + + private WorkerStatusTracker status; + + private long totalConnections; + + private long totalFailedConnections; + + private long startTimeMs; + + private Throttle throttle; + + private long nextReportTime; + + private ExecutorService workerExecutor; + + public ConnectionStressWorker(String id, ConnectionStressSpec spec) { + this.id = id; + this.spec = spec; + } + + @Override + public void start(Platform platform, WorkerStatusTracker status, + KafkaFutureImpl doneFuture) throws Exception { + if (!running.compareAndSet(false, true)) { + throw new IllegalStateException("ConnectionStressWorker is already running."); + } + log.info("{}: Activating ConnectionStressWorker with {}", id, spec); + this.doneFuture = doneFuture; + this.status = status; + this.totalConnections = 0; + this.totalFailedConnections = 0; + this.startTimeMs = Time.SYSTEM.milliseconds(); + this.throttle = new ConnectStressThrottle(WorkerUtils. + perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS)); + this.nextReportTime = 0; + this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(), + ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false)); + for (int i = 0; i < spec.numThreads(); i++) { + this.workerExecutor.submit(new ConnectLoop()); + } + } + + private static class ConnectStressThrottle extends Throttle { + ConnectStressThrottle(int maxPerPeriod) { + super(maxPerPeriod, THROTTLE_PERIOD_MS); + } + } + + public class ConnectLoop implements Runnable { + @Override + public void run() { + try { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); + WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.commonClientConf()); + AdminClientConfig conf = new AdminClientConfig(props); + List addresses = ClientUtils.parseAndValidateAddresses( + conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); + ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes()); + while (true) { + if (doneFuture.isDone()) { + break; + } + throttle.increment(); + long lastTimeMs = throttle.lastTimeMs(); + boolean success = attemptConnection(conf, updater); + synchronized (ConnectionStressWorker.this) { + totalConnections++; + if (!success) { + totalFailedConnections++; + } + if (lastTimeMs > nextReportTime) { + status.update(JsonUtil.JSON_SERDE.valueToTree( + new StatusData(totalConnections, + totalFailedConnections, + (totalConnections * 1000.0) / (lastTimeMs - startTimeMs)))); + nextReportTime = lastTimeMs + REPORT_INTERVAL_MS; + } + } + } + } catch (Exception e) { + WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture); + } + } + + private boolean attemptConnection(AdminClientConfig conf, + ManualMetadataUpdater updater) throws Exception { + try { + List nodes = updater.fetchNodes(); + Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())); + try (ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf)) { + try (Metrics metrics = new Metrics()) { + LogContext logContext = new LogContext(); + try (Selector selector = new Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), + metrics, Time.SYSTEM, "", channelBuilder, logContext)) { + try (NetworkClient client = new NetworkClient(selector, + updater, + "ConnectionStressWorker", + 1, + 1000, + 1000, + 4096, + 4096, + 1000, + Time.SYSTEM, + false, + new ApiVersions(), + logContext)) { + NetworkClientUtils.awaitReady(client, targetNode, Time.SYSTEM, 100); + } + } + } + } + return true; + } catch (IOException e) { + return false; + } + } + } + + public static class StatusData { + private final long totalConnections; + private final long totalFailedConnections; + private final double connectsPerSec; + + @JsonCreator + StatusData(@JsonProperty("totalConnections") long totalConnections, + @JsonProperty("totalFailedConnections") long totalFailedConnections, + @JsonProperty("connectsPerSec") double connectsPerSec) { + this.totalConnections = totalConnections; + this.totalFailedConnections = totalFailedConnections; + this.connectsPerSec = connectsPerSec; + } + + @JsonProperty + public long totalConnections() { + return totalConnections; + } + + @JsonProperty + public long totalFailedConnections() { + return totalFailedConnections; + } + + @JsonProperty + public double connectsPerSec() { + return connectsPerSec; + } + } + + @Override + public void stop(Platform platform) throws Exception { + if (!running.compareAndSet(true, false)) { + throw new IllegalStateException("ConnectionStressWorker is not running."); + } + log.info("{}: Deactivating ConnectionStressWorker.", id); + doneFuture.complete(""); + workerExecutor.shutdownNow(); + workerExecutor.awaitTermination(1, TimeUnit.DAYS); + this.workerExecutor = null; + this.status = null; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java index 41f9d022793..6a99c02a5dc 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java @@ -24,12 +24,14 @@ public class Throttle { private final int periodMs; private int count; private long prevPeriod; + private long lastTimeMs; Throttle(int maxPerPeriod, int periodMs) { this.maxPerPeriod = maxPerPeriod; this.periodMs = periodMs; this.count = maxPerPeriod; this.prevPeriod = -1; + this.lastTimeMs = 0; } synchronized public boolean increment() throws InterruptedException { @@ -39,11 +41,11 @@ public class Throttle { count++; return throttled; } - long now = time().milliseconds(); - long curPeriod = now / periodMs; + lastTimeMs = time().milliseconds(); + long curPeriod = lastTimeMs / periodMs; if (curPeriod <= prevPeriod) { long nextPeriodMs = (curPeriod + 1) * periodMs; - delay(nextPeriodMs - now); + delay(nextPeriodMs - lastTimeMs); throttled = true; } else { prevPeriod = curPeriod; @@ -52,6 +54,10 @@ public class Throttle { } } + public synchronized long lastTimeMs() { + return lastTimeMs; + } + protected Time time() { return Time.SYSTEM; }