diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 35f42e3a700..840e5516e07 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -194,6 +194,7 @@
+
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
new file mode 100644
index 00000000000..4195f9b1977
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressSpec.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The specification for a task which connects and disconnects many times a
+ * second to stress the broker.
+ *
+ * Instances are immutable: every field is final and is assigned exactly once
+ * in the constructor.
+ */
+public class ConnectionStressSpec extends TaskSpec {
+    private final String clientNode;
+    private final String bootstrapServers;
+    private final Map<String, String> commonClientConf;
+    private final int targetConnectionsPerSec;
+    private final int numThreads;
+
+    /**
+     * Creates a new spec from its JSON properties.
+     *
+     * @param startMs                  passed through to the {@link TaskSpec} constructor.
+     * @param durationMs               passed through to the {@link TaskSpec} constructor.
+     * @param clientNode               the node returned as the task's only target node;
+     *                                 null is replaced by the empty string.
+     * @param bootstrapServers         the bootstrap server list handed to the worker;
+     *                                 null is replaced by the empty string.
+     * @param commonClientConf         client configuration entries; normalized through
+     *                                 the inherited configOrEmptyMap helper
+     *                                 (presumably null becomes an empty map -- confirm
+     *                                 against TaskSpec).
+     * @param targetConnectionsPerSec  the connection rate the worker throttles to;
+     *                                 stored without validation.
+     * @param numThreads               the number of connect loops to run; any value
+     *                                 below 1 is raised to 1.
+     */
+    @JsonCreator
+    public ConnectionStressSpec(@JsonProperty("startMs") long startMs,
+            @JsonProperty("durationMs") long durationMs,
+            @JsonProperty("clientNode") String clientNode,
+            @JsonProperty("bootstrapServers") String bootstrapServers,
+            @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
+            @JsonProperty("targetConnectionsPerSec") int targetConnectionsPerSec,
+            @JsonProperty("numThreads") int numThreads) {
+        super(startMs, durationMs);
+        this.clientNode = (clientNode == null) ? "" : clientNode;
+        this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
+        this.commonClientConf = configOrEmptyMap(commonClientConf);
+        this.targetConnectionsPerSec = targetConnectionsPerSec;
+        // At least one thread is always used, even if the JSON omits the field.
+        this.numThreads = numThreads < 1 ? 1 : numThreads;
+    }
+
+    /** @return the client node name; never null. */
+    @JsonProperty
+    public String clientNode() {
+        return clientNode;
+    }
+
+    /** @return the bootstrap server list; never null. */
+    @JsonProperty
+    public String bootstrapServers() {
+        return bootstrapServers;
+    }
+
+    /** @return the client configuration entries as normalized by the constructor. */
+    @JsonProperty
+    public Map<String, String> commonClientConf() {
+        return commonClientConf;
+    }
+
+    /** @return the target number of connections per second. */
+    @JsonProperty
+    public int targetConnectionsPerSec() {
+        return targetConnectionsPerSec;
+    }
+
+    /** @return the number of connect loops to run; always at least 1. */
+    @JsonProperty
+    public int numThreads() {
+        return numThreads;
+    }
+
+    /**
+     * Creates a controller whose only target node is {@link #clientNode()}.
+     *
+     * @param id the task id; not used by this controller.
+     */
+    @Override
+    public TaskController newController(String id) {
+        return new TaskController() {
+            @Override
+            public Set<String> targetNodes(Topology topology) {
+                return Collections.singleton(clientNode);
+            }
+        };
+    }
+
+    /**
+     * Creates the worker that executes this spec.
+     *
+     * @param id the task id handed to the worker.
+     */
+    @Override
+    public TaskWorker newTaskWorker(String id) {
+        return new ConnectionStressWorker(id, this);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
new file mode 100644
index 00000000000..5d78de8f6fa
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NetworkClientUtils;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.common.WorkerUtils;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A worker that repeatedly opens and closes connections to randomly chosen
+ * bootstrap nodes from a fixed pool of threads, and periodically publishes
+ * connection counts through the {@link WorkerStatusTracker}.
+ *
+ * The counters and the next report time are shared by all connect loops and
+ * are only touched while holding this worker's monitor.
+ */
+public class ConnectionStressWorker implements TaskWorker {
+    private static final Logger log = LoggerFactory.getLogger(ConnectionStressWorker.class);
+
+    /** Length of one throttle period, in milliseconds. */
+    private static final int THROTTLE_PERIOD_MS = 100;
+
+    /** Minimum spacing between two status updates, in milliseconds. */
+    private static final int REPORT_INTERVAL_MS = 20000;
+
+    private final String id;
+
+    private final ConnectionStressSpec spec;
+
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private KafkaFutureImpl<String> doneFuture;
+
+    private WorkerStatusTracker status;
+
+    private long totalConnections;
+
+    private long totalFailedConnections;
+
+    private long startTimeMs;
+
+    private Throttle throttle;
+
+    private long nextReportTime;
+
+    private ExecutorService workerExecutor;
+
+    /**
+     * @param id   the task id, used as a prefix in log messages.
+     * @param spec the spec supplying bootstrap servers, client configuration,
+     *             target rate and thread count.
+     */
+    public ConnectionStressWorker(String id, ConnectionStressSpec spec) {
+        this.id = id;
+        this.spec = spec;
+    }
+
+    /**
+     * Resets the counters and submits one {@link ConnectLoop} per configured thread.
+     *
+     * @param platform   not used by this worker.
+     * @param status     receives the periodic {@link StatusData} updates.
+     * @param doneFuture the loops exit once this future is done; {@link #stop}
+     *                   completes it.
+     * @throws IllegalStateException if the worker is already running.
+     */
+    @Override
+    public void start(Platform platform, WorkerStatusTracker status,
+                      KafkaFutureImpl<String> doneFuture) throws Exception {
+        if (!running.compareAndSet(false, true)) {
+            throw new IllegalStateException("ConnectionStressWorker is already running.");
+        }
+        log.info("{}: Activating ConnectionStressWorker with {}", id, spec);
+        this.doneFuture = doneFuture;
+        this.status = status;
+        this.totalConnections = 0;
+        this.totalFailedConnections = 0;
+        this.startTimeMs = Time.SYSTEM.milliseconds();
+        this.throttle = new ConnectStressThrottle(WorkerUtils.
+            perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS));
+        // Zero, so the first attempt with a positive throttle timestamp reports.
+        this.nextReportTime = 0;
+        this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(),
+            ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false));
+        for (int i = 0; i < spec.numThreads(); i++) {
+            this.workerExecutor.submit(new ConnectLoop());
+        }
+    }
+
+    /** A {@link Throttle} fixed to {@link #THROTTLE_PERIOD_MS}. */
+    private static class ConnectStressThrottle extends Throttle {
+        ConnectStressThrottle(int maxPerPeriod) {
+            super(maxPerPeriod, THROTTLE_PERIOD_MS);
+        }
+    }
+
+    /**
+     * The body of one worker thread: throttle, attempt a connection, update the
+     * shared counters, and repeat until the done future completes.
+     */
+    public class ConnectLoop implements Runnable {
+        @Override
+        public void run() {
+            try {
+                Properties props = new Properties();
+                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
+                // NOTE(review): the same map is passed for both configuration
+                // arguments; the spec only exposes commonClientConf. Confirm
+                // against WorkerUtils.addConfigsToProperties.
+                WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.commonClientConf());
+                AdminClientConfig conf = new AdminClientConfig(props);
+                List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
+                    conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+                ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
+                while (!doneFuture.isDone()) {
+                    throttle.increment();
+                    // Sampled before the attempt, so the time spent connecting
+                    // is not included in this timestamp.
+                    long lastTimeMs = throttle.lastTimeMs();
+                    boolean success = attemptConnection(conf, updater);
+                    synchronized (ConnectionStressWorker.this) {
+                        totalConnections++;
+                        if (!success) {
+                            totalFailedConnections++;
+                        }
+                        if (lastTimeMs > nextReportTime) {
+                            long elapsedMs = lastTimeMs - startTimeMs;
+                            // The first report can land in the same millisecond
+                            // as the start; report 0 instead of Infinity/NaN.
+                            double connectsPerSec = elapsedMs > 0 ?
+                                (totalConnections * 1000.0) / elapsedMs : 0.0;
+                            status.update(JsonUtil.JSON_SERDE.valueToTree(
+                                new StatusData(totalConnections,
+                                    totalFailedConnections,
+                                    connectsPerSec)));
+                            nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture);
+            }
+        }
+
+        /**
+         * Builds a fresh network client, waits for a randomly chosen node to
+         * become ready, then closes everything that was created.
+         *
+         * Any value returned by awaitReady is ignored; only an IOException
+         * marks the attempt as failed.
+         *
+         * @return true if no IOException was thrown, false otherwise.
+         * @throws Exception any non-IOException failure, which ends the loop.
+         */
+        private boolean attemptConnection(AdminClientConfig conf,
+                                          ManualMetadataUpdater updater) throws Exception {
+            try {
+                List<Node> nodes = updater.fetchNodes();
+                Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
+                LogContext logContext = new LogContext();
+                // Resources are closed in reverse order of declaration, which
+                // matches the close order of individually nested try blocks.
+                try (ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf);
+                     Metrics metrics = new Metrics();
+                     Selector selector = new Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+                         metrics, Time.SYSTEM, "", channelBuilder, logContext);
+                     NetworkClient client = new NetworkClient(selector,
+                         updater,
+                         "ConnectionStressWorker",
+                         1,
+                         1000,
+                         1000,
+                         4096,
+                         4096,
+                         1000,
+                         Time.SYSTEM,
+                         false,
+                         new ApiVersions(),
+                         logContext)) {
+                    NetworkClientUtils.awaitReady(client, targetNode, Time.SYSTEM, 100);
+                }
+                return true;
+            } catch (IOException e) {
+                return false;
+            }
+        }
+    }
+
+    /** The JSON status published by the worker. */
+    public static class StatusData {
+        private final long totalConnections;
+        private final long totalFailedConnections;
+        private final double connectsPerSec;
+
+        @JsonCreator
+        StatusData(@JsonProperty("totalConnections") long totalConnections,
+                   @JsonProperty("totalFailedConnections") long totalFailedConnections,
+                   @JsonProperty("connectsPerSec") double connectsPerSec) {
+            this.totalConnections = totalConnections;
+            this.totalFailedConnections = totalFailedConnections;
+            this.connectsPerSec = connectsPerSec;
+        }
+
+        /** @return the number of connection attempts made, failed ones included. */
+        @JsonProperty
+        public long totalConnections() {
+            return totalConnections;
+        }
+
+        /** @return the number of attempts that ended with an IOException. */
+        @JsonProperty
+        public long totalFailedConnections() {
+            return totalFailedConnections;
+        }
+
+        /** @return the average attempts per second since the worker started. */
+        @JsonProperty
+        public double connectsPerSec() {
+            return connectsPerSec;
+        }
+    }
+
+    /**
+     * Completes the done future, interrupts the worker threads and waits (up to
+     * one day) for them to terminate.
+     *
+     * @param platform not used by this worker.
+     * @throws IllegalStateException if the worker is not running.
+     */
+    @Override
+    public void stop(Platform platform) throws Exception {
+        if (!running.compareAndSet(true, false)) {
+            throw new IllegalStateException("ConnectionStressWorker is not running.");
+        }
+        log.info("{}: Deactivating ConnectionStressWorker.", id);
+        doneFuture.complete("");
+        workerExecutor.shutdownNow();
+        workerExecutor.awaitTermination(1, TimeUnit.DAYS);
+        this.workerExecutor = null;
+        this.status = null;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
index 41f9d022793..6a99c02a5dc 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/Throttle.java
@@ -24,12 +24,14 @@ public class Throttle {
private final int periodMs;
private int count;
private long prevPeriod;
+ private long lastTimeMs;
Throttle(int maxPerPeriod, int periodMs) {
this.maxPerPeriod = maxPerPeriod;
this.periodMs = periodMs;
this.count = maxPerPeriod;
this.prevPeriod = -1;
+ this.lastTimeMs = 0;
}
synchronized public boolean increment() throws InterruptedException {
@@ -39,11 +41,11 @@ public class Throttle {
count++;
return throttled;
}
- long now = time().milliseconds();
- long curPeriod = now / periodMs;
+ lastTimeMs = time().milliseconds();
+ long curPeriod = lastTimeMs / periodMs;
if (curPeriod <= prevPeriod) {
long nextPeriodMs = (curPeriod + 1) * periodMs;
- delay(nextPeriodMs - now);
+ delay(nextPeriodMs - lastTimeMs);
throttled = true;
} else {
prevPeriod = curPeriod;
@@ -52,6 +54,10 @@ public class Throttle {
}
}
+ public synchronized long lastTimeMs() {
+ return lastTimeMs;
+ }
+
protected Time time() {
return Time.SYSTEM;
}