MINOR: fix throttling and status in ConnectionStressWorker

Each separate thread should have its own throttle, so that it can sleep
for an appropriate amount of time when needed.

ConnectionStressWorker should avoid recalculating the status after
shutting down the runnables.  Otherwise, if one runnable is slow to
stop, it will skew the average down in a way that doesn't reflect
reality.  This change moves the status calculation into a separate
periodic runnable that gets shut down cleanly before the other ones.

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Gwen Shapira, Stanislav Kozlovski

Closes #6533 from cmccabe/fix_connection_stress_worker
This commit is contained in:
Colin P. Mccabe 2019-04-04 14:16:56 -07:00 committed by Gwen Shapira
parent c301025484
commit a674ded0b3
1 changed files with 43 additions and 23 deletions

View File

@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
@ -52,6 +53,8 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -62,7 +65,7 @@ public class ConnectionStressWorker implements TaskWorker {
// Length of one throttle period in milliseconds; passed to
// WorkerUtils.perSecToPerPeriod and to the Throttle constructor.
private static final int THROTTLE_PERIOD_MS = 100;
// Interval in milliseconds between status reports.
// NOTE(review): REPORT_INTERVAL_MS is declared twice here (20000 and 5000).
// This looks like the old and new lines of a diff shown together -- confirm
// which value is the current one.
private static final int REPORT_INTERVAL_MS = 20000;
private static final int REPORT_INTERVAL_MS = 5000;
// Identifier of this worker; assigned in the constructor and included in log messages.
private final String id;
@ -80,12 +83,12 @@ public class ConnectionStressWorker implements TaskWorker {
// Time in milliseconds captured when the counters are reset; the baseline
// for the connections-per-second calculation.
private long startTimeMs;
// Worker-level throttle.
// NOTE(review): the connect loop visible in this view also builds its own
// local throttle -- confirm whether this field is still in use.
private Throttle throttle;
// Time in milliseconds after which a connect-loop thread publishes the next
// status report (compared against the throttle's last time).
private long nextReportTime;
// Handle for the periodically scheduled StatusUpdater task; cancelled during shutdown.
private Future<?> statusUpdaterFuture;
// Fixed-size pool (spec.numThreads() threads) that runs the connect loops.
private ExecutorService workerExecutor;
// Single-threaded scheduled executor that runs StatusUpdater every REPORT_INTERVAL_MS.
private ScheduledExecutorService statusUpdaterExecutor;
public ConnectionStressWorker(String id, ConnectionStressSpec spec) {
this.id = id;
this.spec = spec;
@ -103,11 +106,12 @@ public class ConnectionStressWorker implements TaskWorker {
synchronized (ConnectionStressWorker.this) {
this.totalConnections = 0;
this.totalFailedConnections = 0;
this.nextReportTime = 0;
this.startTimeMs = TIME.milliseconds();
}
this.throttle = new ConnectStressThrottle(WorkerUtils.
perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS));
this.statusUpdaterExecutor = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false));
this.statusUpdaterFuture = this.statusUpdaterExecutor.scheduleAtFixedRate(
new StatusUpdater(), 0, REPORT_INTERVAL_MS, TimeUnit.MILLISECONDS);
this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(),
ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false));
for (int i = 0; i < spec.numThreads(); i++) {
@ -115,17 +119,6 @@ public class ConnectionStressWorker implements TaskWorker {
}
}
/**
 * Publishes a fresh status snapshot and moves the next report time forward.
 *
 * @param lastTimeMs the time in milliseconds used to compute the average
 *                   connection rate since startTimeMs, and the base from
 *                   which the next report time is derived.
 */
private synchronized void updateStatus(long lastTimeMs) {
    long elapsedMs = lastTimeMs - startTimeMs;
    double connectsPerSec = (totalConnections * 1000.0) / elapsedMs;
    StatusData snapshot =
        new StatusData(totalConnections, totalFailedConnections, connectsPerSec);
    status.update(JsonUtil.JSON_SERDE.valueToTree(snapshot));
    // Only advance the report time once the status has been published.
    nextReportTime = lastTimeMs + REPORT_INTERVAL_MS;
}
private static class ConnectStressThrottle extends Throttle {
ConnectStressThrottle(int maxPerPeriod) {
super(maxPerPeriod, THROTTLE_PERIOD_MS);
@ -233,27 +226,45 @@ public class ConnectionStressWorker implements TaskWorker {
// Connect loop: keeps attempting connections until doneFuture completes,
// counting every attempt and every failure on the enclosing worker.
@Override
public void run() {
Stressor stressor = Stressor.fromSpec(spec);
// Each thread builds its own throttle, sized to this thread's share of the
// overall target rate (target connections per second / number of threads).
int rate = WorkerUtils.perSecToPerPeriod(
((float) spec.targetConnectionsPerSec()) / spec.numThreads(),
THROTTLE_PERIOD_MS);
Throttle throttle = new ConnectStressThrottle(rate);
try {
while (!doneFuture.isDone()) {
// presumably blocks once the per-period limit is reached -- confirm against Throttle
throttle.increment();
long lastTimeMs = throttle.lastTimeMs();
boolean success = stressor.tryConnect();
// The counters live on the enclosing worker and are updated by every
// connect-loop thread, so they are modified under the worker's monitor.
synchronized (ConnectionStressWorker.this) {
totalConnections++;
if (!success) {
totalFailedConnections++;
}
// Publish a status report once the scheduled report time has passed.
if (lastTimeMs > nextReportTime)
updateStatus(lastTimeMs);
}
}
} catch (Exception e) {
// NOTE(review): two abort calls with different labels appear here; this
// looks like the old and new lines of a diff shown together -- confirm
// which one is current.
WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture);
WorkerUtils.abort(log, "ConnectLoop", e, doneFuture);
} finally {
// Always release the stressor, whether the loop ended normally or not.
Utils.closeQuietly(stressor, "stressor");
}
}
}
/**
 * Task that publishes a snapshot of the connection statistics: total
 * connections, total failed connections, and the average number of
 * connections per second since startTimeMs.
 *
 * The counters are read while holding the enclosing worker's monitor so the
 * snapshot is consistent; the status itself is updated after the monitor is
 * released. Any exception aborts the worker through WorkerUtils.abort.
 */
private class StatusUpdater implements Runnable {
    @Override
    public void run() {
        try {
            long lastTimeMs = Time.SYSTEM.milliseconds();
            JsonNode node;
            synchronized (ConnectionStressWorker.this) {
                long elapsedMs = lastTimeMs - startTimeMs;
                // The first run can happen in the same millisecond the start
                // time was recorded. Dividing by a zero elapsed time would
                // publish NaN or Infinity, so report a rate of 0 until some
                // time has actually passed.
                double connectsPerSec = elapsedMs > 0
                    ? (totalConnections * 1000.0) / elapsedMs
                    : 0.0;
                node = JsonUtil.JSON_SERDE.valueToTree(
                    new StatusData(totalConnections, totalFailedConnections,
                        connectsPerSec));
            }
            status.update(node);
        } catch (Exception e) {
            WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
        }
    }
}
public static class StatusData {
@ -292,10 +303,19 @@ public class ConnectionStressWorker implements TaskWorker {
throw new IllegalStateException("ConnectionStressWorker is not running.");
}
log.info("{}: Deactivating ConnectionStressWorker.", id);
// Shut down the periodic status updater and perform a final update on the
// statistics. We want to do this first, before deactivating any threads.
// Otherwise, if some threads take a while to terminate, this could lead
// to a misleading rate getting reported.
this.statusUpdaterFuture.cancel(false);
this.statusUpdaterExecutor.awaitTermination(1, TimeUnit.DAYS);
this.statusUpdaterExecutor = null;
new StatusUpdater().run();
doneFuture.complete("");
workerExecutor.shutdownNow();
workerExecutor.awaitTermination(1, TimeUnit.DAYS);
updateStatus(throttle.lastTimeMs());
this.workerExecutor = null;
this.status = null;
}