mirror of https://github.com/apache/kafka.git
KAFKA-7428: ConnectionStressSpec: add "action", allow multiple clients (#5668)
This commit is contained in:
parent
f712ce69fc
commit
888423ee56
|
|
@ -24,40 +24,52 @@ import org.apache.kafka.trogdor.task.TaskController;
|
|||
import org.apache.kafka.trogdor.task.TaskSpec;
|
||||
import org.apache.kafka.trogdor.task.TaskWorker;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
/**
|
||||
* The specification for a task which connects and disconnects many times a
|
||||
* second to stress the broker.
|
||||
*/
|
||||
public class ConnectionStressSpec extends TaskSpec {
|
||||
private final String clientNode;
|
||||
private final List<String> clientNodes;
|
||||
private final String bootstrapServers;
|
||||
private final Map<String, String> commonClientConf;
|
||||
private final int targetConnectionsPerSec;
|
||||
private final int numThreads;
|
||||
private final ConnectionStressAction action;
|
||||
|
||||
enum ConnectionStressAction {
|
||||
CONNECT,
|
||||
FETCH_METADATA
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public ConnectionStressSpec(@JsonProperty("startMs") long startMs,
|
||||
@JsonProperty("durationMs") long durationMs,
|
||||
@JsonProperty("clientNode") String clientNode,
|
||||
@JsonProperty("clientNode") List<String> clientNodes,
|
||||
@JsonProperty("bootstrapServers") String bootstrapServers,
|
||||
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
|
||||
@JsonProperty("targetConnectionsPerSec") int targetConnectionsPerSec,
|
||||
@JsonProperty("numThreads") int numThreads) {
|
||||
@JsonProperty("numThreads") int numThreads,
|
||||
@JsonProperty("action") ConnectionStressAction action) {
|
||||
super(startMs, durationMs);
|
||||
this.clientNode = (clientNode == null) ? "" : clientNode;
|
||||
this.clientNodes = (clientNodes == null) ? Collections.emptyList() :
|
||||
Collections.unmodifiableList(new ArrayList<>(clientNodes));
|
||||
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
|
||||
this.commonClientConf = configOrEmptyMap(commonClientConf);
|
||||
this.targetConnectionsPerSec = targetConnectionsPerSec;
|
||||
this.numThreads = numThreads < 1 ? 1 : numThreads;
|
||||
this.action = (action == null) ? ConnectionStressAction.CONNECT : action;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String clientNode() {
|
||||
return clientNode;
|
||||
public List<String> clientNode() {
|
||||
return clientNodes;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
|
@ -80,11 +92,16 @@ public class ConnectionStressSpec extends TaskSpec {
|
|||
return numThreads;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public ConnectionStressAction action() {
|
||||
return action;
|
||||
}
|
||||
|
||||
public TaskController newController(String id) {
|
||||
return new TaskController() {
|
||||
@Override
|
||||
public Set<String> targetNodes(Topology topology) {
|
||||
return Collections.singleton(clientNode);
|
||||
return new TreeSet<>(clientNodes);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import org.apache.kafka.clients.ClientUtils;
|
|||
import org.apache.kafka.clients.ManualMetadataUpdater;
|
||||
import org.apache.kafka.clients.NetworkClient;
|
||||
import org.apache.kafka.clients.NetworkClientUtils;
|
||||
import org.apache.kafka.clients.admin.AdminClient;
|
||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.Cluster;
|
||||
|
|
@ -132,7 +133,15 @@ public class ConnectionStressWorker implements TaskWorker {
|
|||
}
|
||||
throttle.increment();
|
||||
long lastTimeMs = throttle.lastTimeMs();
|
||||
boolean success = attemptConnection(conf, updater);
|
||||
boolean success = false;
|
||||
switch (spec.action()) {
|
||||
case CONNECT:
|
||||
success = attemptConnection(conf, updater);
|
||||
break;
|
||||
case FETCH_METADATA:
|
||||
success = attemptMetadataFetch(props);
|
||||
break;
|
||||
}
|
||||
synchronized (ConnectionStressWorker.this) {
|
||||
totalConnections++;
|
||||
if (!success) {
|
||||
|
|
@ -185,6 +194,17 @@ public class ConnectionStressWorker implements TaskWorker {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean attemptMetadataFetch(Properties conf) {
|
||||
try (AdminClient client = AdminClient.create(conf)) {
|
||||
client.describeCluster().nodes().get();
|
||||
} catch (RuntimeException e) {
|
||||
return false;
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public static class StatusData {
|
||||
|
|
|
|||
Loading…
Reference in New Issue