diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java index d42367e8336..e9bbf07e861 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java @@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.workload; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; @@ -52,6 +53,8 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,7 +65,7 @@ public class ConnectionStressWorker implements TaskWorker { private static final int THROTTLE_PERIOD_MS = 100; - private static final int REPORT_INTERVAL_MS = 20000; + private static final int REPORT_INTERVAL_MS = 5000; private final String id; @@ -80,12 +83,12 @@ public class ConnectionStressWorker implements TaskWorker { private long startTimeMs; - private Throttle throttle; - - private long nextReportTime; + private Future statusUpdaterFuture; private ExecutorService workerExecutor; + private ScheduledExecutorService statusUpdaterExecutor; + public ConnectionStressWorker(String id, ConnectionStressSpec spec) { this.id = id; this.spec = spec; @@ -103,11 +106,12 @@ public class ConnectionStressWorker implements TaskWorker { synchronized (ConnectionStressWorker.this) { this.totalConnections = 0; this.totalFailedConnections = 0; - this.nextReportTime = 0; this.startTimeMs = TIME.milliseconds(); } - this.throttle = new ConnectStressThrottle(WorkerUtils. - perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS)); + this.statusUpdaterExecutor = Executors.newScheduledThreadPool(1, + ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false)); + this.statusUpdaterFuture = this.statusUpdaterExecutor.scheduleAtFixedRate( + new StatusUpdater(), 0, REPORT_INTERVAL_MS, TimeUnit.MILLISECONDS); this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(), ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false)); for (int i = 0; i < spec.numThreads(); i++) { @@ -115,17 +119,6 @@ public class ConnectionStressWorker implements TaskWorker { } } - /** - * Update the worker's status and next status report time. - */ - private synchronized void updateStatus(long lastTimeMs) { - status.update(JsonUtil.JSON_SERDE.valueToTree( - new StatusData(totalConnections, - totalFailedConnections, - (totalConnections * 1000.0) / (lastTimeMs - startTimeMs)))); - nextReportTime = lastTimeMs + REPORT_INTERVAL_MS; - } - private static class ConnectStressThrottle extends Throttle { ConnectStressThrottle(int maxPerPeriod) { super(maxPerPeriod, THROTTLE_PERIOD_MS); @@ -233,27 +226,45 @@ public class ConnectionStressWorker implements TaskWorker { @Override public void run() { Stressor stressor = Stressor.fromSpec(spec); + int rate = WorkerUtils.perSecToPerPeriod( + ((float) spec.targetConnectionsPerSec()) / spec.numThreads(), + THROTTLE_PERIOD_MS); + Throttle throttle = new ConnectStressThrottle(rate); try { while (!doneFuture.isDone()) { throttle.increment(); - long lastTimeMs = throttle.lastTimeMs(); boolean success = stressor.tryConnect(); synchronized (ConnectionStressWorker.this) { totalConnections++; if (!success) { totalFailedConnections++; } - if (lastTimeMs > nextReportTime) - updateStatus(lastTimeMs); } } } catch (Exception e) { - WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture); + WorkerUtils.abort(log, "ConnectLoop", e, doneFuture); } finally { Utils.closeQuietly(stressor, "stressor"); } } + } + private class StatusUpdater implements Runnable { + @Override + public void run() { + try { + long lastTimeMs = Time.SYSTEM.milliseconds(); + JsonNode node = null; + synchronized (ConnectionStressWorker.this) { + node = JsonUtil.JSON_SERDE.valueToTree( + new StatusData(totalConnections, totalFailedConnections, + (totalConnections * 1000.0) / (lastTimeMs - startTimeMs))); + } + status.update(node); + } catch (Exception e) { + WorkerUtils.abort(log, "StatusUpdater", e, doneFuture); + } + } } public static class StatusData { @@ -292,10 +303,19 @@ public class ConnectionStressWorker implements TaskWorker { throw new IllegalStateException("ConnectionStressWorker is not running."); } log.info("{}: Deactivating ConnectionStressWorker.", id); + + // Shut down the periodic status updater and perform a final update on the + // statistics. We want to do this first, before deactivating any threads. + // Otherwise, if some threads take a while to terminate, this could lead + // to a misleading rate getting reported. + this.statusUpdaterFuture.cancel(false); + this.statusUpdaterExecutor.awaitTermination(1, TimeUnit.DAYS); + this.statusUpdaterExecutor = null; + new StatusUpdater().run(); + doneFuture.complete(""); workerExecutor.shutdownNow(); workerExecutor.awaitTermination(1, TimeUnit.DAYS); - updateStatus(throttle.lastTimeMs()); this.workerExecutor = null; this.status = null; }