MINOR: Optimize ConnectionStressWorker

Optimize ConnectionStressWorker by avoiding creating a new
ChannelBuilder each time we want to open a new connection.

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Gwen Shapira

Closes #6518 from cmccabe/optimize-connection-stress-worker
This commit is contained in:
Colin P. Mccabe 2019-03-29 15:02:10 -07:00 committed by Gwen Shapira
parent 981815c8d1
commit 219c22113e
1 changed files with 102 additions and 62 deletions

View File

@ -36,6 +36,7 @@ import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
@ -131,50 +132,42 @@ public class ConnectionStressWorker implements TaskWorker {
}
}
public class ConnectLoop implements Runnable {
@Override
public void run() {
try {
interface Stressor extends AutoCloseable {
static Stressor fromSpec(ConnectionStressSpec spec) {
switch (spec.action()) {
case CONNECT:
return new ConnectStressor(spec);
case FETCH_METADATA:
return new FetchMetadataStressor(spec);
}
throw new RuntimeException("invalid spec.action " + spec.action());
}
boolean tryConnect();
}
static class ConnectStressor implements Stressor {
private final AdminClientConfig conf;
private final ManualMetadataUpdater updater;
private final ChannelBuilder channelBuilder;
ConnectStressor(ConnectionStressSpec spec) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.commonClientConf());
AdminClientConfig conf = new AdminClientConfig(props);
this.conf = new AdminClientConfig(props);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
while (!doneFuture.isDone()) {
throttle.increment();
long lastTimeMs = throttle.lastTimeMs();
boolean success = false;
switch (spec.action()) {
case CONNECT:
success = attemptConnection(conf, updater);
break;
case FETCH_METADATA:
success = attemptMetadataFetch(props);
break;
}
synchronized (ConnectionStressWorker.this) {
totalConnections++;
if (!success) {
totalFailedConnections++;
}
if (lastTimeMs > nextReportTime)
updateStatus(lastTimeMs);
}
}
} catch (Exception e) {
WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture);
}
this.updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
this.channelBuilder = ClientUtils.createChannelBuilder(conf, TIME);
}
private boolean attemptConnection(AdminClientConfig conf,
ManualMetadataUpdater updater) {
@Override
public boolean tryConnect() {
try {
List<Node> nodes = updater.fetchNodes();
Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
try (ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf, TIME)) {
try (Metrics metrics = new Metrics()) {
LogContext logContext = new LogContext();
try (Selector selector = new Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
@ -197,15 +190,31 @@ public class ConnectionStressWorker implements TaskWorker {
}
}
}
}
return true;
} catch (IOException e) {
return false;
}
}
private boolean attemptMetadataFetch(Properties conf) {
try (AdminClient client = AdminClient.create(conf)) {
@Override
public void close() throws Exception {
Utils.closeQuietly(updater, "ManualMetadataUpdater");
Utils.closeQuietly(channelBuilder, "ChannelBuilder");
}
}
static class FetchMetadataStressor implements Stressor {
private final Properties props;
FetchMetadataStressor(ConnectionStressSpec spec) {
this.props = new Properties();
this.props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
WorkerUtils.addConfigsToProperties(this.props, spec.commonClientConf(), spec.commonClientConf());
}
@Override
public boolean tryConnect() {
try (AdminClient client = AdminClient.create(this.props)) {
client.describeCluster().nodes().get();
} catch (RuntimeException e) {
return false;
@ -214,6 +223,37 @@ public class ConnectionStressWorker implements TaskWorker {
}
return true;
}
@Override
public void close() throws Exception {
}
}
public class ConnectLoop implements Runnable {
@Override
public void run() {
Stressor stressor = Stressor.fromSpec(spec);
try {
while (!doneFuture.isDone()) {
throttle.increment();
long lastTimeMs = throttle.lastTimeMs();
boolean success = stressor.tryConnect();
synchronized (ConnectionStressWorker.this) {
totalConnections++;
if (!success) {
totalFailedConnections++;
}
if (lastTimeMs > nextReportTime)
updateStatus(lastTimeMs);
}
}
} catch (Exception e) {
WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture);
} finally {
Utils.closeQuietly(stressor, "stressor");
}
}
}
public static class StatusData {