KAFKA-8853; Create sustained connections test for Trogdor

This creates a test that generates sustained connections against Kafka.  There
are three different components we can stress with this, KafkaConsumer,
KafkaProducer, and AdminClient.  This test tries use minimal bandwidth per
connection to reduce overhead impacts.

This test works by creating a threadpool that creates connections and then
maintains a central pool of connections at a specified keepalive rate.  The
keepalive action varies by which component is being stressed:

  * KafkaProducer:  Sends one single produce record.  The configuration for
    the produce request uses the same key/value generator as the ProduceBench
    test.

  * KafkaConsumer: Subscribes to a single partition, seeks to the end, and
    then polls a minimal number of records.  Each consumer connection is its
    own consumer group, and defaults to 1024 bytes as FETCH_MAX_BYTES to keep
    traffic to a minimum.

  * AdminClient: Makes an API call to get the nodes in the cluster.

NOTE: This test is designed to be run alongside a ProduceBench test for a
specific topic, due to the way the Consumer test polls a single partition.
There may be no data returned by the consumer test if this is run on its own.
The connection should still be kept alive, but with no data returned.

Author: Scott Hendricks <scott.hendricks@confluent.io>

Reviewers: Stanislav Kozlovski, Gwen Shapira

Closes #7289 from scott-hendricks/trunk
This commit is contained in:
Scott Hendricks 2019-09-08 19:49:13 -07:00 committed by Gwen Shapira
parent 0f177ea6b8
commit 39afe9fe0e
3 changed files with 729 additions and 0 deletions

View File

@ -214,6 +214,8 @@
files="SignalLogger.java"/>
<suppress checks="ParameterNumber"
files="ProduceBenchSpec.java"/>
<suppress checks="ParameterNumber"
files="SustainedConnectionSpec.java"/>
<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import java.util.Collections;
import java.util.Map;
/**
* The specification for a benchmark that creates sustained connections.
*
* An example JSON representation which will result in a test that creates 27 connections (9 of each), refreshes them
* every 10 seconds using 2 threads, running against topic `topic1`, for a duration of 1 hour, and with various other
* options set:
*
* #{@code
* {
* "class": "org.apache.kafka.trogdor.workload.SustainedConnectionSpec",
* "durationMs": 3600000,
* "clientNode": "node0",
* "bootstrapServers": "localhost:9092",
* "commonClientConf": {
* "compression.type": "lz4",
* "auto.offset.reset": "earliest",
* "linger.ms": "100"
* },
* "keyGenerator": {
* "type": "sequential",
* "size": 4,
* "startOffset": 0
* },
* "valueGenerator": {
* "type": "uniformRandom",
* "size": 512,
* "seed": 0,
* "padding": 0
* },
* "producerConnectionCount": 9,
* "consumerConnectionCount": 9,
* "metadataConnectionCount": 9,
* "topicName": "test-topic1-1",
* "numThreads": 2,
* "refreshRateMs": 10000
* }
* }
*/
public class SustainedConnectionSpec extends TaskSpec {
private final String clientNode;
private final String bootstrapServers;
private final Map<String, String> producerConf;
private final Map<String, String> consumerConf;
private final Map<String, String> adminClientConf;
private final Map<String, String> commonClientConf;
private final PayloadGenerator keyGenerator;
private final PayloadGenerator valueGenerator;
private final int producerConnectionCount;
private final int consumerConnectionCount;
private final int metadataConnectionCount;
private final String topicName;
private final int numThreads;
private final int refreshRateMs;
@JsonCreator
public SustainedConnectionSpec(
@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,
@JsonProperty("clientNode") String clientNode,
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("producerConf") Map<String, String> producerConf,
@JsonProperty("consumerConf") Map<String, String> consumerConf,
@JsonProperty("adminClientConf") Map<String, String> adminClientConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@JsonProperty("producerConnectionCount") int producerConnectionCount,
@JsonProperty("consumerConnectionCount") int consumerConnectionCount,
@JsonProperty("metadataConnectionCount") int metadataConnectionCount,
@JsonProperty("topicName") String topicName,
@JsonProperty("numThreads") int numThreads,
@JsonProperty("refreshRateMs") int refreshRateMs) {
super(startMs, durationMs);
this.clientNode = clientNode == null ? "" : clientNode;
this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers;
this.producerConf = configOrEmptyMap(producerConf);
this.consumerConf = configOrEmptyMap(consumerConf);
this.adminClientConf = configOrEmptyMap(adminClientConf);
this.commonClientConf = configOrEmptyMap(commonClientConf);
this.keyGenerator = keyGenerator;
this.valueGenerator = valueGenerator;
this.producerConnectionCount = producerConnectionCount;
this.consumerConnectionCount = consumerConnectionCount;
this.metadataConnectionCount = metadataConnectionCount;
this.topicName = topicName;
this.numThreads = numThreads < 1 ? 1 : numThreads;
this.refreshRateMs = refreshRateMs < 1 ? 1 : refreshRateMs;
}
@JsonProperty
public String clientNode() {
return clientNode;
}
@JsonProperty
public String bootstrapServers() {
return bootstrapServers;
}
@JsonProperty
public Map<String, String> producerConf() {
return producerConf;
}
@JsonProperty
public Map<String, String> consumerConf() {
return consumerConf;
}
@JsonProperty
public Map<String, String> adminClientConf() {
return adminClientConf;
}
@JsonProperty
public Map<String, String> commonClientConf() {
return commonClientConf;
}
@JsonProperty
public PayloadGenerator keyGenerator() {
return keyGenerator;
}
@JsonProperty
public PayloadGenerator valueGenerator() {
return valueGenerator;
}
@JsonProperty
public int producerConnectionCount() {
return producerConnectionCount;
}
@JsonProperty
public int consumerConnectionCount() {
return consumerConnectionCount;
}
@JsonProperty
public int metadataConnectionCount() {
return metadataConnectionCount;
}
@JsonProperty
public String topicName() {
return topicName;
}
@JsonProperty
public int numThreads() {
return numThreads;
}
@JsonProperty
public int refreshRateMs() {
return refreshRateMs;
}
public TaskController newController(String id) {
return topology -> Collections.singleton(clientNode);
}
@Override
public TaskWorker newTaskWorker(String id) {
return new SustainedConnectionWorker(id, this);
}
}

View File

@ -0,0 +1,531 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class SustainedConnectionWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(SustainedConnectionWorker.class);
private static final SystemTime SYSTEM_TIME = new SystemTime();
// This is the metadata for the test itself.
private final String id;
private final SustainedConnectionSpec spec;
// These variables are used to maintain the connections.
private static final int BACKOFF_PERIOD_MS = 10;
private ExecutorService workerExecutor;
private final AtomicBoolean running = new AtomicBoolean(false);
private KafkaFutureImpl<String> doneFuture;
private ArrayList<SustainedConnection> connections;
// These variables are used when tracking the reported status of the worker.
private static final int REPORT_INTERVAL_MS = 5000;
private WorkerStatusTracker status;
private AtomicLong totalProducerConnections;
private AtomicLong totalProducerFailedConnections;
private AtomicLong totalConsumerConnections;
private AtomicLong totalConsumerFailedConnections;
private AtomicLong totalMetadataConnections;
private AtomicLong totalMetadataFailedConnections;
private AtomicLong totalAbortedThreads;
private Future<?> statusUpdaterFuture;
private ScheduledExecutorService statusUpdaterExecutor;
public SustainedConnectionWorker(String id, SustainedConnectionSpec spec) {
this.id = id;
this.spec = spec;
}
@Override
public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> doneFuture) throws Exception {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("SustainedConnectionWorker is already running.");
}
log.info("{}: Activating SustainedConnectionWorker with {}", this.id, this.spec);
this.doneFuture = doneFuture;
this.status = status;
this.connections = new ArrayList<>();
// Initialize all status reporting metrics to 0.
this.totalProducerConnections = new AtomicLong(0);
this.totalProducerFailedConnections = new AtomicLong(0);
this.totalConsumerConnections = new AtomicLong(0);
this.totalConsumerFailedConnections = new AtomicLong(0);
this.totalMetadataConnections = new AtomicLong(0);
this.totalMetadataFailedConnections = new AtomicLong(0);
this.totalAbortedThreads = new AtomicLong(0);
// Create the worker classes and add them to the list of items to act on.
for (int i = 0; i < this.spec.producerConnectionCount(); i++) {
this.connections.add(new ProducerSustainedConnection());
}
for (int i = 0; i < this.spec.consumerConnectionCount(); i++) {
this.connections.add(new ConsumerSustainedConnection());
}
for (int i = 0; i < this.spec.metadataConnectionCount(); i++) {
this.connections.add(new MetadataSustainedConnection());
}
// Create the status reporter thread and schedule it.
this.statusUpdaterExecutor = Executors.newScheduledThreadPool(1,
ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false));
this.statusUpdaterFuture = this.statusUpdaterExecutor.scheduleAtFixedRate(
new StatusUpdater(), 0, REPORT_INTERVAL_MS, TimeUnit.MILLISECONDS);
// Create the maintainer pool, add all the maintainer threads, then start it.
this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(),
ThreadUtils.createThreadFactory("SustainedConnectionWorkerThread%d", false));
for (int i = 0; i < this.spec.numThreads(); i++) {
this.workerExecutor.submit(new MaintainLoop());
}
}
private interface SustainedConnection extends AutoCloseable {
boolean needsRefresh(long milliseconds);
void refresh();
void claim();
}
private abstract class ClaimableConnection implements SustainedConnection {
protected long nextUpdate = 0;
protected boolean inUse = false;
protected long refreshRate;
@Override
public boolean needsRefresh(long milliseconds) {
return !this.inUse && (milliseconds > this.nextUpdate);
}
@Override
public void claim() {
this.inUse = true;
}
@Override
public void close() throws Exception {
this.closeQuietly();
}
protected void completeRefresh() {
this.nextUpdate = SustainedConnectionWorker.SYSTEM_TIME.milliseconds() + this.refreshRate;
this.inUse = false;
}
protected abstract void closeQuietly();
}
private class MetadataSustainedConnection extends ClaimableConnection {
private AdminClient client;
private final Properties props;
MetadataSustainedConnection() {
// These variables are used to maintain the connection itself.
this.client = null;
this.refreshRate = SustainedConnectionWorker.this.spec.refreshRateMs();
// This variable is used to maintain the connection properties.
this.props = new Properties();
this.props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SustainedConnectionWorker.this.spec.bootstrapServers());
WorkerUtils.addConfigsToProperties(
this.props, SustainedConnectionWorker.this.spec.commonClientConf(), SustainedConnectionWorker.this.spec.commonClientConf());
}
@Override
public void refresh() {
try {
if (this.client == null) {
// Housekeeping to track the number of opened connections.
SustainedConnectionWorker.this.totalMetadataConnections.incrementAndGet();
// Create the admin client connection.
this.client = AdminClient.create(this.props);
}
// Fetch some metadata to keep the connection alive.
this.client.describeCluster().nodes().get();
} catch (Throwable e) {
// Set the admin client to be recreated on the next cycle.
this.closeQuietly();
// Housekeeping to track the number of opened connections and failed connection attempts.
SustainedConnectionWorker.this.totalMetadataConnections.decrementAndGet();
SustainedConnectionWorker.this.totalMetadataFailedConnections.incrementAndGet();
SustainedConnectionWorker.log.error("Error while refreshing sustained AdminClient connection", e);
}
// Schedule this again and set to not in use.
this.completeRefresh();
}
@Override
protected void closeQuietly() {
Utils.closeQuietly(this.client, "AdminClient");
this.client = null;
}
}
private class ProducerSustainedConnection extends ClaimableConnection {
private KafkaProducer<byte[], byte[]> producer;
private List<TopicPartition> partitions;
private Iterator<TopicPartition> partitionsIterator;
private final String topicName;
private final PayloadIterator keys;
private final PayloadIterator values;
private final Properties props;
ProducerSustainedConnection() {
// These variables are used to maintain the connection itself.
this.producer = null;
this.partitions = null;
this.topicName = SustainedConnectionWorker.this.spec.topicName();
this.partitionsIterator = null;
this.keys = new PayloadIterator(SustainedConnectionWorker.this.spec.keyGenerator());
this.values = new PayloadIterator(SustainedConnectionWorker.this.spec.valueGenerator());
this.refreshRate = SustainedConnectionWorker.this.spec.refreshRateMs();
// This variable is used to maintain the connection properties.
this.props = new Properties();
this.props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SustainedConnectionWorker.this.spec.bootstrapServers());
WorkerUtils.addConfigsToProperties(
this.props, SustainedConnectionWorker.this.spec.commonClientConf(), SustainedConnectionWorker.this.spec.producerConf());
}
@Override
public void refresh() {
try {
if (this.producer == null) {
// Housekeeping to track the number of opened connections.
SustainedConnectionWorker.this.totalProducerConnections.incrementAndGet();
// Create the producer, fetch the specified topic's partitions and randomize them.
this.producer = new KafkaProducer<>(this.props, new ByteArraySerializer(), new ByteArraySerializer());
this.partitions = this.producer.partitionsFor(this.topicName).stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
Collections.shuffle(this.partitions);
}
// Create a new iterator over the partitions if the current one doesn't exist or is exhausted.
if (this.partitionsIterator == null || !this.partitionsIterator.hasNext()) {
this.partitionsIterator = this.partitions.iterator();
}
// Produce a single record and send it synchronously.
TopicPartition partition = this.partitionsIterator.next();
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
partition.topic(), partition.partition(), keys.next(), values.next());
producer.send(record).get();
} catch (Throwable e) {
// Set the producer to be recreated on the next cycle.
this.closeQuietly();
// Housekeeping to track the number of opened connections and failed connection attempts.
SustainedConnectionWorker.this.totalProducerConnections.decrementAndGet();
SustainedConnectionWorker.this.totalProducerFailedConnections.incrementAndGet();
SustainedConnectionWorker.log.error("Error while refreshing sustained KafkaProducer connection", e);
}
// Schedule this again and set to not in use.
this.completeRefresh();
}
@Override
protected void closeQuietly() {
Utils.closeQuietly(this.producer, "KafkaProducer");
this.producer = null;
this.partitions = null;
this.partitionsIterator = null;
}
}
private class ConsumerSustainedConnection extends ClaimableConnection {
private KafkaConsumer<byte[], byte[]> consumer;
private TopicPartition activePartition;
private final String topicName;
private final Random rand;
private final Properties props;
ConsumerSustainedConnection() {
// These variables are used to maintain the connection itself.
this.topicName = SustainedConnectionWorker.this.spec.topicName();
this.consumer = null;
this.activePartition = null;
this.rand = new Random();
this.refreshRate = SustainedConnectionWorker.this.spec.refreshRateMs();
// This variable is used to maintain the connection properties.
this.props = new Properties();
this.props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SustainedConnectionWorker.this.spec.bootstrapServers());
this.props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
this.props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
this.props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024);
WorkerUtils.addConfigsToProperties(
this.props, SustainedConnectionWorker.this.spec.commonClientConf(), SustainedConnectionWorker.this.spec.consumerConf());
}
@Override
public void refresh() {
try {
if (this.consumer == null) {
// Housekeeping to track the number of opened connections.
SustainedConnectionWorker.this.totalConsumerConnections.incrementAndGet();
// Create the consumer and fetch the partitions for the specified topic.
this.consumer = new KafkaConsumer<>(this.props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
List<TopicPartition> partitions = this.consumer.partitionsFor(this.topicName).stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
// Select a random partition and assign it.
this.activePartition = partitions.get(this.rand.nextInt(partitions.size()));
this.consumer.assign(Collections.singletonList(this.activePartition));
}
// The behavior when passing in an empty list is to seek to the end of all subscribed partitions.
this.consumer.seekToEnd(Collections.emptyList());
// Poll to keep the connection alive, ignoring any records returned.
this.consumer.poll(Duration.ofMillis(50));
} catch (Throwable e) {
// Set the consumer to be recreated on the next cycle.
this.closeQuietly();
// Housekeeping to track the number of opened connections and failed connection attempts.
SustainedConnectionWorker.this.totalConsumerConnections.decrementAndGet();
SustainedConnectionWorker.this.totalConsumerFailedConnections.incrementAndGet();
SustainedConnectionWorker.log.error("Error while refreshing sustained KafkaConsumer connection", e);
}
// Schedule this again and set to not in use.
this.completeRefresh();
}
@Override
protected void closeQuietly() {
Utils.closeQuietly(this.consumer, "KafkaConsumer");
this.consumer = null;
this.activePartition = null;
}
}
public class MaintainLoop implements Runnable {
@Override
public void run() {
try {
while (!doneFuture.isDone()) {
Optional<SustainedConnection> currentConnection = SustainedConnectionWorker.this.findConnectionToMaintain();
if (currentConnection.isPresent()) {
currentConnection.get().refresh();
} else {
SustainedConnectionWorker.SYSTEM_TIME.sleep(SustainedConnectionWorker.BACKOFF_PERIOD_MS);
}
}
} catch (Exception e) {
SustainedConnectionWorker.this.totalAbortedThreads.incrementAndGet();
SustainedConnectionWorker.log.error("Aborted thread while maintaining sustained connections", e);
}
}
}
private synchronized Optional<SustainedConnection> findConnectionToMaintain() {
final long milliseconds = SustainedConnectionWorker.SYSTEM_TIME.milliseconds();
for (SustainedConnection connection : this.connections) {
if (connection.needsRefresh(milliseconds)) {
connection.claim();
return Optional.of(connection);
}
}
return Optional.empty();
}
private class StatusUpdater implements Runnable {
@Override
public void run() {
try {
JsonNode node = JsonUtil.JSON_SERDE.valueToTree(
new StatusData(
SustainedConnectionWorker.this.totalProducerConnections.get(),
SustainedConnectionWorker.this.totalProducerFailedConnections.get(),
SustainedConnectionWorker.this.totalConsumerConnections.get(),
SustainedConnectionWorker.this.totalConsumerFailedConnections.get(),
SustainedConnectionWorker.this.totalMetadataConnections.get(),
SustainedConnectionWorker.this.totalMetadataFailedConnections.get(),
SustainedConnectionWorker.this.totalAbortedThreads.get(),
SustainedConnectionWorker.SYSTEM_TIME.milliseconds()));
status.update(node);
} catch (Exception e) {
SustainedConnectionWorker.log.error("Aborted test while running StatusUpdater", e);
WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
}
}
}
public static class StatusData {
private final long totalProducerConnections;
private final long totalProducerFailedConnections;
private final long totalConsumerConnections;
private final long totalConsumerFailedConnections;
private final long totalMetadataConnections;
private final long totalMetadataFailedConnections;
private final long totalAbortedThreads;
private final long updatedMs;
@JsonCreator
StatusData(@JsonProperty("totalProducerConnections") long totalProducerConnections,
@JsonProperty("totalProducerFailedConnections") long totalProducerFailedConnections,
@JsonProperty("totalConsumerConnections") long totalConsumerConnections,
@JsonProperty("totalConsumerFailedConnections") long totalConsumerFailedConnections,
@JsonProperty("totalMetadataConnections") long totalMetadataConnections,
@JsonProperty("totalMetadataFailedConnections") long totalMetadataFailedConnections,
@JsonProperty("totalAbortedThreads") long totalAbortedThreads,
@JsonProperty("updatedMs") long updatedMs) {
this.totalProducerConnections = totalProducerConnections;
this.totalProducerFailedConnections = totalProducerFailedConnections;
this.totalConsumerConnections = totalConsumerConnections;
this.totalConsumerFailedConnections = totalConsumerFailedConnections;
this.totalMetadataConnections = totalMetadataConnections;
this.totalMetadataFailedConnections = totalMetadataFailedConnections;
this.totalAbortedThreads = totalAbortedThreads;
this.updatedMs = updatedMs;
}
@JsonProperty
public long totalProducerConnections() {
return totalProducerConnections;
}
@JsonProperty
public long totalProducerFailedConnections() {
return totalProducerFailedConnections;
}
@JsonProperty
public long totalConsumerConnections() {
return totalConsumerConnections;
}
@JsonProperty
public long totalConsumerFailedConnections() {
return totalConsumerFailedConnections;
}
@JsonProperty
public long totalMetadataConnections() {
return totalMetadataConnections;
}
@JsonProperty
public long totalMetadataFailedConnections() {
return totalMetadataFailedConnections;
}
@JsonProperty
public long totalAbortedThreads() {
return totalAbortedThreads;
}
@JsonProperty
public long updatedMs() {
return updatedMs;
}
}
@Override
public void stop(Platform platform) throws Exception {
if (!running.compareAndSet(true, false)) {
throw new IllegalStateException("SustainedConnectionWorker is not running.");
}
log.info("{}: Deactivating SustainedConnectionWorker.", this.id);
// Shut down the periodic status updater and perform a final update on the
// statistics. We want to do this first, before deactivating any threads.
// Otherwise, if some threads take a while to terminate, this could lead
// to a misleading rate getting reported.
this.statusUpdaterFuture.cancel(false);
this.statusUpdaterExecutor.shutdown();
this.statusUpdaterExecutor.awaitTermination(1, TimeUnit.HOURS);
this.statusUpdaterExecutor = null;
new StatusUpdater().run();
doneFuture.complete("");
for (SustainedConnection connection : this.connections) {
connection.close();
}
workerExecutor.shutdownNow();
workerExecutor.awaitTermination(1, TimeUnit.HOURS);
this.workerExecutor = null;
this.status = null;
this.connections = null;
}
}