diff --git a/build.gradle b/build.gradle
index 7c93a454cab..d9c455f6c8f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -681,7 +681,8 @@ project(':core') {
'genAdminClientConfigDocs', 'genProducerConfigDocs', 'genConsumerConfigDocs',
'genKafkaConfigDocs', 'genTopicConfigDocs',
':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs',
- ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs'], type: Tar) {
+ ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs',
+ ':connect:runtime:genConnectMetricsDocs'], type: Tar) {
classifier = 'site-docs'
compression = Compression.GZIP
from project.file("$rootDir/docs")
@@ -1189,6 +1190,14 @@ project(':connect:runtime') {
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "connect_transforms.html").newOutputStream()
}
+
+ task genConnectMetricsDocs(type: JavaExec) {
+ classpath = sourceSets.test.runtimeClasspath
+ main = 'org.apache.kafka.connect.runtime.ConnectMetrics'
+ if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
+ standardOutput = new File(generatedDocsDir, "connect_metrics.html").newOutputStream()
+ }
+
}
project(':connect:file') {
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index b747e4c681a..7c32eb0a643 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -23,7 +23,7 @@
files="AbstractResponse.java"/>
+ files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index eb0133745f6..974967a9046 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -46,7 +47,6 @@ import java.util.concurrent.TimeUnit;
public class ConnectMetrics {
public static final String JMX_PREFIX = "kafka.connect";
- public static final String WORKER_ID_TAG_NAME = "worker-id";
private static final Logger LOG = LoggerFactory.getLogger(ConnectMetrics.class);
@@ -54,6 +54,7 @@ public class ConnectMetrics {
private final Time time;
private final String workerId;
private final ConcurrentMap groupsByName = new ConcurrentHashMap<>();
+ private final ConnectMetricsRegistry registry = new ConnectMetricsRegistry();
/**
* Create an instance.
@@ -96,15 +97,12 @@ public class ConnectMetrics {
}
/**
- * Get or create a {@link MetricGroup} with the specified group name.
+ * Get the registry of metric names.
*
- * @param groupName the name of the metric group; may not be null and must be a
- * {@link #checkNameIsValid(String) valid name}
- * @return the {@link MetricGroup} that can be used to create metrics; never null
- * @throws IllegalArgumentException if the group name is not valid
+ * @return the registry for the Connect metrics; never null
*/
- public MetricGroup group(String groupName) {
- return group(groupName, false);
+ public ConnectMetricsRegistry registry() {
+ return registry;
}
/**
@@ -118,22 +116,7 @@ public class ConnectMetrics {
* @throws IllegalArgumentException if the group name is not valid
*/
public MetricGroup group(String groupName, String... tagKeyValues) {
- return group(groupName, false, tagKeyValues);
- }
-
- /**
- * Get or create a {@link MetricGroup} with the specified group name and the given tags.
- * Each group is uniquely identified by the name and tags.
- *
- * @param groupName the name of the metric group; may not be null and must be a
- * {@link #checkNameIsValid(String) valid name}
- * @param includeWorkerId true if the tags should include the worker ID
- * @param tagKeyValues pairs of tag name and values
- * @return the {@link MetricGroup} that can be used to create metrics; never null
- * @throws IllegalArgumentException if the group name is not valid
- */
- public MetricGroup group(String groupName, boolean includeWorkerId, String... tagKeyValues) {
- MetricGroupId groupId = groupId(groupName, includeWorkerId, tagKeyValues);
+ MetricGroupId groupId = groupId(groupName, tagKeyValues);
MetricGroup group = groupsByName.get(groupId);
if (group == null) {
group = new MetricGroup(groupId);
@@ -143,9 +126,9 @@ public class ConnectMetrics {
return group;
}
- protected MetricGroupId groupId(String groupName, boolean includeWorkerId, String... tagKeyValues) {
+ protected MetricGroupId groupId(String groupName, String... tagKeyValues) {
checkNameIsValid(groupName);
- Map tags = tags(includeWorkerId ? workerId : null, tagKeyValues);
+ Map tags = tags(tagKeyValues);
return new MetricGroupId(groupName, tags);
}
@@ -174,8 +157,8 @@ public class ConnectMetrics {
private final String str;
public MetricGroupId(String groupName, Map tags) {
- assert groupName != null;
- assert tags != null;
+ Objects.requireNonNull(groupName);
+ Objects.requireNonNull(tags);
this.groupName = groupName;
this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags));
this.hc = Objects.hash(this.groupName, this.tags);
@@ -253,21 +236,35 @@ public class ConnectMetrics {
* @param groupId the identifier of the group; may not be null and must be valid
*/
protected MetricGroup(MetricGroupId groupId) {
+ Objects.requireNonNull(groupId);
this.groupId = groupId;
sensorPrefix = "connect-sensor-group: " + groupId.toString() + ";";
}
+ /**
+ * Get the group identifier.
+ *
+ * @return the group identifier; never null
+ */
+ public MetricGroupId groupId() {
+ return groupId;
+ }
+
/**
* Create the name of a metric that belongs to this group and has the group's tags.
*
- * @param name the name of the metric/attribute; may not be null and must be valid
- * @param desc the description for the metric/attribute; may not be null
+ * @param template the name template for the metric; may not be null
* @return the metric name; never null
* @throws IllegalArgumentException if the name is not valid
*/
- public MetricName metricName(String name, String desc) {
- checkNameIsValid(name);
- return metrics.metricName(name, groupId.groupName(), desc, groupId.tags());
+ public MetricName metricName(MetricNameTemplate template) {
+ checkNameIsValid(template.name());
+ return metrics.metricInstance(template, groupId.tags());
+ }
+
+ // for testing only
+ MetricName metricName(String name) {
+ return metrics.metricName(name, groupId.groupName(), "", groupId.tags());
}
/**
@@ -275,7 +272,7 @@ public class ConnectMetrics {
*
* Do not use this to add {@link Sensor Sensors}, since they will not be removed when this group is
* {@link #close() closed}. Metrics can be added directly, as long as the metric names are obtained from
- * this group via the {@link #metricName(String, String)} method.
+ * this group via the {@link #metricName(MetricNameTemplate)} method.
*
* @return the metrics; never null
*/
@@ -295,14 +292,12 @@ public class ConnectMetrics {
/**
* Add to this group an indicator metric with a function that will be used to obtain the indicator state.
*
- * @param name the name of the metric; may not be null and must be a
- * {@link #checkNameIsValid(String) valid name}
- * @param description the description of the metric; may not be null
- * @param predicate the predicate function used to determine the indicator state; may not be null
+ * @param nameTemplate the name template for the metric; may not be null
+ * @param predicate the predicate function used to determine the indicator state; may not be null
* @throws IllegalArgumentException if the name is not valid
*/
- public void addIndicatorMetric(String name, String description, final IndicatorPredicate predicate) {
- MetricName metricName = metricName(name, description);
+ public void addIndicatorMetric(MetricNameTemplate nameTemplate, final IndicatorPredicate predicate) {
+ MetricName metricName = metricName(nameTemplate);
if (metrics().metric(metricName) == null) {
metrics().addMetric(metricName, new Measurable() {
@Override
@@ -411,17 +406,13 @@ public class ConnectMetrics {
* Create a set of tags using the supplied key and value pairs. Every tag name and value will be
* {@link #makeValidName(String) made valid} before it is used. The order of the tags will be kept.
*
- * @param workerId the worker ID that should be included first in the tags; may be null if not to be included
* @param keyValue the key and value pairs for the tags; must be an even number
* @return the map of tags that can be supplied to the {@link Metrics} methods; never null
*/
- static Map tags(String workerId, String... keyValue) {
+ static Map tags(String... keyValue) {
if ((keyValue.length % 2) != 0)
throw new IllegalArgumentException("keyValue needs to be specified in pairs");
Map tags = new LinkedHashMap<>();
- if (workerId != null && !workerId.trim().isEmpty()) {
- tags.put(WORKER_ID_TAG_NAME, makeValidName(workerId));
- }
for (int i = 0; i < keyValue.length; i += 2) {
tags.put(makeValidName(keyValue[i]), makeValidName(keyValue[i + 1]));
}
@@ -457,4 +448,15 @@ public class ConnectMetrics {
throw new IllegalArgumentException("The name '" + name + "' contains at least one invalid character");
}
}
+
+ /**
+ * Utility to generate the documentation for the Connect metrics.
+ *
+ * @param args the arguments
+ */
+ public static void main(String[] args) {
+ ConnectMetricsRegistry metrics = new ConnectMetricsRegistry();
+ System.out.println(Metrics.toHtmlTable("kafka.connect", metrics.getAllTemplates()));
+ }
+
}
\ No newline at end of file
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
new file mode 100644
index 00000000000..ee513c9a9a5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricNameTemplate;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ConnectMetricsRegistry {
+
+ public static final String CONNECTOR_TAG_NAME = "connector";
+ public static final String TASK_TAG_NAME = "task";
+ public static final String CONNECTOR_GROUP_NAME = "connector-metrics";
+ public static final String TASK_GROUP_NAME = "connector-task-metrics";
+ public static final String SOURCE_TASK_GROUP_NAME = "source-task-metrics";
+ public static final String SINK_TASK_GROUP_NAME = "sink-task-metrics";
+
+ private final List allTemplates = new ArrayList<>();
+ public final MetricNameTemplate connectorStatusRunning;
+ public final MetricNameTemplate connectorStatusPaused;
+ public final MetricNameTemplate connectorStatusFailed;
+ public final MetricNameTemplate taskStatusUnassigned;
+ public final MetricNameTemplate taskStatusRunning;
+ public final MetricNameTemplate taskStatusPaused;
+ public final MetricNameTemplate taskStatusFailed;
+ public final MetricNameTemplate taskStatusDestroyed;
+ public final MetricNameTemplate taskRunningRatio;
+ public final MetricNameTemplate taskPauseRatio;
+ public final MetricNameTemplate taskCommitTimeMax;
+ public final MetricNameTemplate taskCommitTimeAvg;
+ public final MetricNameTemplate taskBatchSizeMax;
+ public final MetricNameTemplate taskBatchSizeAvg;
+ public final MetricNameTemplate taskCommitFailurePercentage;
+ public final MetricNameTemplate taskCommitSuccessPercentage;
+ public final MetricNameTemplate sourceRecordPollRate;
+ public final MetricNameTemplate sourceRecordPollTotal;
+ public final MetricNameTemplate sourceRecordWriteRate;
+ public final MetricNameTemplate sourceRecordWriteTotal;
+ public final MetricNameTemplate sourceRecordPollBatchTimeMax;
+ public final MetricNameTemplate sourceRecordPollBatchTimeAvg;
+ public final MetricNameTemplate sourceRecordActiveCount;
+ public final MetricNameTemplate sourceRecordActiveCountMax;
+ public final MetricNameTemplate sourceRecordActiveCountAvg;
+ public final MetricNameTemplate sinkRecordReadRate;
+ public final MetricNameTemplate sinkRecordReadTotal;
+ public final MetricNameTemplate sinkRecordSendRate;
+ public final MetricNameTemplate sinkRecordSendTotal;
+ public final MetricNameTemplate sinkRecordLagMax;
+ public final MetricNameTemplate sinkRecordPartitionCount;
+ public final MetricNameTemplate sinkRecordOffsetCommitSeqNum;
+ public final MetricNameTemplate sinkRecordOffsetCommitCompletionRate;
+ public final MetricNameTemplate sinkRecordOffsetCommitCompletionTotal;
+ public final MetricNameTemplate sinkRecordOffsetCommitSkipRate;
+ public final MetricNameTemplate sinkRecordOffsetCommitSkipTotal;
+ public final MetricNameTemplate sinkRecordPutBatchTimeMax;
+ public final MetricNameTemplate sinkRecordPutBatchTimeAvg;
+ public final MetricNameTemplate sinkRecordActiveCount;
+ public final MetricNameTemplate sinkRecordActiveCountMax;
+ public final MetricNameTemplate sinkRecordActiveCountAvg;
+
+ public ConnectMetricsRegistry() {
+ this(new LinkedHashSet());
+ }
+
+ public ConnectMetricsRegistry(Set tags) {
+ /***** Connector level *****/
+ Set connectorTags = new LinkedHashSet<>(tags);
+ connectorTags.add(CONNECTOR_TAG_NAME);
+
+ connectorStatusRunning = createTemplate("status-running", CONNECTOR_GROUP_NAME,
+ "Signals whether the connector is in the running state.", connectorTags);
+ connectorStatusPaused = createTemplate("status-paused", CONNECTOR_GROUP_NAME,
+ "Signals whether the connector is in the paused state.", connectorTags);
+ connectorStatusFailed = createTemplate("status-failed", CONNECTOR_GROUP_NAME,
+ "Signals whether the connector is in the failed state.", connectorTags);
+
+ /***** Worker task level *****/
+ Set workerTaskTags = new LinkedHashSet<>(tags);
+ workerTaskTags.add(CONNECTOR_TAG_NAME);
+ workerTaskTags.add(TASK_TAG_NAME);
+
+ taskStatusUnassigned = createTemplate("status-unassigned", TASK_GROUP_NAME, "Signals whether this task is in the unassigned state.",
+ workerTaskTags);
+ taskStatusRunning = createTemplate("status-running", TASK_GROUP_NAME, "Signals whether this task is in the running state.",
+ workerTaskTags);
+ taskStatusPaused = createTemplate("status-paused", TASK_GROUP_NAME, "Signals whether this task is in the paused state.",
+ workerTaskTags);
+ taskStatusFailed = createTemplate("status-failed", TASK_GROUP_NAME, "Signals whether this task is in the failed state.",
+ workerTaskTags);
+ taskStatusDestroyed = createTemplate("status-destroyed", TASK_GROUP_NAME, "Signals whether this task is in the destroyed state.",
+ workerTaskTags);
+ taskRunningRatio = createTemplate("running-ratio", TASK_GROUP_NAME,
+ "The fraction of time this task has spent in the running state.", workerTaskTags);
+ taskPauseRatio = createTemplate("pause-ratio", TASK_GROUP_NAME, "The fraction of time this task has spent in the pause state.",
+ workerTaskTags);
+ taskCommitTimeMax = createTemplate("offset-commit-max-time-ms", TASK_GROUP_NAME,
+ "The maximum time in milliseconds taken by this task to commit offsets.", workerTaskTags);
+ taskCommitTimeAvg = createTemplate("offset-commit-avg-time-ms", TASK_GROUP_NAME,
+ "The average time in milliseconds taken by this task to commit offsets.", workerTaskTags);
+ taskBatchSizeMax = createTemplate("batch-size-max", TASK_GROUP_NAME, "The maximum size of the batches processed by the connector.",
+ workerTaskTags);
+ taskBatchSizeAvg = createTemplate("batch-size-avg", TASK_GROUP_NAME, "The average size of the batches processed by the connector.",
+ workerTaskTags);
+ taskCommitFailurePercentage = createTemplate("offset-commit-failure-percentage", TASK_GROUP_NAME,
+ "The average percentage of this task's offset commit attempts that failed.",
+ workerTaskTags);
+ taskCommitSuccessPercentage = createTemplate("offset-commit-success-percentage", TASK_GROUP_NAME,
+ "The average percentage of this task's offset commit attempts that succeeded.",
+ workerTaskTags);
+
+ /***** Source worker task level *****/
+ Set sourceTaskTags = new LinkedHashSet<>(tags);
+ sourceTaskTags.add(CONNECTOR_TAG_NAME);
+ sourceTaskTags.add(TASK_TAG_NAME);
+
+ sourceRecordPollRate = createTemplate("source-record-poll-rate", SOURCE_TASK_GROUP_NAME,
+ "The average per-second number of records produced/polled (before transformation) by " +
+ "this task belonging to the named source connector in this worker.",
+ sourceTaskTags);
+ sourceRecordPollTotal = createTemplate("source-record-poll-total", SOURCE_TASK_GROUP_NAME,
+ "The total number of records produced/polled (before transformation) by this task " +
+ "belonging to the named source connector in this worker.",
+ sourceTaskTags);
+ sourceRecordWriteRate = createTemplate("source-record-write-rate", SOURCE_TASK_GROUP_NAME,
+ "The average per-second number of records output from the transformations and written" +
+ " to Kafka for this task belonging to the named source connector in this worker. This" +
+ " is after transformations are applied and excludes any records filtered out by the " +
+ "transformations.",
+ sourceTaskTags);
+ sourceRecordWriteTotal = createTemplate("source-record-write-total", SOURCE_TASK_GROUP_NAME,
+ "The number of records output from the transformations and written to Kafka for this" +
+ " task belonging to the named source connector in this worker, since the task was " +
+ "last restarted.",
+ sourceTaskTags);
+ sourceRecordPollBatchTimeMax = createTemplate("poll-batch-max-time-ms", SOURCE_TASK_GROUP_NAME,
+ "The maximum time in milliseconds taken by this task to poll for a batch of " +
+ "source records.",
+ sourceTaskTags);
+ sourceRecordPollBatchTimeAvg = createTemplate("poll-batch-avg-time-ms", SOURCE_TASK_GROUP_NAME,
+ "The average time in milliseconds taken by this task to poll for a batch of " +
+ "source records.",
+ sourceTaskTags);
+ sourceRecordActiveCount = createTemplate("source-record-active-count", SOURCE_TASK_GROUP_NAME,
+ "The number of records that have been produced by this task but not yet completely " +
+ "written to Kafka.",
+ sourceTaskTags);
+ sourceRecordActiveCountMax = createTemplate("source-record-active-count-max", SOURCE_TASK_GROUP_NAME,
+ "The maximum number of records that have been produced by this task but not yet " +
+ "completely written to Kafka.",
+ sourceTaskTags);
+ sourceRecordActiveCountAvg = createTemplate("source-record-active-count-avg", SOURCE_TASK_GROUP_NAME,
+ "The average number of records that have been produced by this task but not yet " +
+ "completely written to Kafka.",
+ sourceTaskTags);
+
+ /***** Sink worker task level *****/
+ Set sinkTaskTags = new LinkedHashSet<>(tags);
+ sinkTaskTags.add(CONNECTOR_TAG_NAME);
+ sinkTaskTags.add(TASK_TAG_NAME);
+
+ sinkRecordReadRate = createTemplate("sink-record-read-rate", SINK_TASK_GROUP_NAME,
+ "The average per-second number of records read from Kafka for this task belonging to the" +
+ " named sink connector in this worker. This is before transformations are applied.",
+ sinkTaskTags);
+ sinkRecordReadTotal = createTemplate("sink-record-read-total", SINK_TASK_GROUP_NAME,
+ "The total number of records read from Kafka by this task belonging to the named sink " +
+ "connector in this worker, since the task was last restarted.",
+ sinkTaskTags);
+ sinkRecordSendRate = createTemplate("sink-record-send-rate", SINK_TASK_GROUP_NAME,
+ "The average per-second number of records output from the transformations and sent/put " +
+ "to this task belonging to the named sink connector in this worker. This is after " +
+ "transformations are applied and excludes any records filtered out by the " +
+ "transformations.",
+ sinkTaskTags);
+ sinkRecordSendTotal = createTemplate("sink-record-send-total", SINK_TASK_GROUP_NAME,
+ "The total number of records output from the transformations and sent/put to this task " +
+ "belonging to the named sink connector in this worker, since the task was last " +
+ "restarted.",
+ sinkTaskTags);
+ sinkRecordLagMax = createTemplate("sink-record-lag-max", SINK_TASK_GROUP_NAME,
+ "The maximum lag in terms of number of records that the sink task is behind the consumer's " +
+ "position for any topic partitions.",
+ sinkTaskTags);
+ sinkRecordPartitionCount = createTemplate("partition-count", SINK_TASK_GROUP_NAME,
+ "The number of topic partitions assigned to this task belonging to the named sink " +
+ "connector in this worker.",
+ sinkTaskTags);
+ sinkRecordOffsetCommitSeqNum = createTemplate("offset-commit-seq-no", SINK_TASK_GROUP_NAME,
+ "The current sequence number for offset commits.", sinkTaskTags);
+ sinkRecordOffsetCommitCompletionRate = createTemplate("offset-commit-completion-rate", SINK_TASK_GROUP_NAME,
+ "The average per-second number of offset commit completions that were " +
+ "completed successfully.",
+ sinkTaskTags);
+ sinkRecordOffsetCommitCompletionTotal = createTemplate("offset-commit-completion-total", SINK_TASK_GROUP_NAME,
+ "The total number of offset commit completions that were completed " +
+ "successfully.",
+ sinkTaskTags);
+ sinkRecordOffsetCommitSkipRate = createTemplate("offset-commit-skip-rate", SINK_TASK_GROUP_NAME,
+ "The average per-second number of offset commit completions that were " +
+ "received too late and skipped/ignored.",
+ sinkTaskTags);
+ sinkRecordOffsetCommitSkipTotal = createTemplate("offset-commit-skip-total", SINK_TASK_GROUP_NAME,
+ "The total number of offset commit completions that were received too late " +
+ "and skipped/ignored.",
+ sinkTaskTags);
+ sinkRecordPutBatchTimeMax = createTemplate("put-batch-max-time-ms", SINK_TASK_GROUP_NAME,
+ "The maximum time taken by this task to put a batch of sinks records.", sinkTaskTags);
+ sinkRecordPutBatchTimeAvg = createTemplate("put-batch-avg-time-ms", SINK_TASK_GROUP_NAME,
+ "The average time taken by this task to put a batch of sinks records.", sinkTaskTags);
+ sinkRecordActiveCount = createTemplate("sink-record-active-count", SINK_TASK_GROUP_NAME,
+ "The number of records that have been read from Kafka but not yet completely " +
+ "committed/flushed/acknowledged by the sink task.",
+ sinkTaskTags);
+ sinkRecordActiveCountMax = createTemplate("sink-record-active-count-max", SINK_TASK_GROUP_NAME,
+ "The maximum number of records that have been read from Kafka but not yet completely " +
+ "committed/flushed/acknowledged by the sink task.",
+ sinkTaskTags);
+ sinkRecordActiveCountAvg = createTemplate("sink-record-active-count-avg", SINK_TASK_GROUP_NAME,
+ "The average number of records that have been read from Kafka but not yet completely " +
+ "committed/flushed/acknowledged by the sink task.",
+ sinkTaskTags);
+ }
+
+ private MetricNameTemplate createTemplate(String name, String group, String doc, Set tags) {
+ MetricNameTemplate template = new MetricNameTemplate(name, group, doc, tags);
+ allTemplates.add(template);
+ return template;
+ }
+
+ public List getAllTemplates() {
+ return Collections.unmodifiableList(allTemplates);
+ }
+
+ public String connectorTagName() {
+ return CONNECTOR_TAG_NAME;
+ }
+
+ public String taskTagName() {
+ return TASK_TAG_NAME;
+ }
+
+ public String connectorGroupName() {
+ return CONNECTOR_GROUP_NAME;
+ }
+
+ public String taskGroupName() {
+ return TASK_GROUP_NAME;
+ }
+
+ public String sinkTaskGroupName() {
+ return SINK_TASK_GROUP_NAME;
+ }
+
+ public String sourceTaskGroupName() {
+ return SOURCE_TASK_GROUP_NAME;
+ }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
index 874edd5e07f..21104bd5018 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.runtime.ConnectMetrics.IndicatorPredicate;
@@ -225,19 +226,17 @@ public class WorkerConnector {
public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State initialState, ConnectorStatus.Listener delegate) {
this.delegate = delegate;
this.state = initialState;
- this.metricGroup = connectMetrics.group("connector-metrics",
- "connector", connName);
+ ConnectMetricsRegistry registry = connectMetrics.registry();
+ this.metricGroup = connectMetrics.group(registry.connectorGroupName(),
+ registry.connectorTagName(), connName);
- addStateMetric(AbstractStatus.State.RUNNING, "status-running",
- "Signals whether the connector task is in the running state.");
- addStateMetric(AbstractStatus.State.PAUSED, "status-paused",
- "Signals whether the connector task is in the paused state.");
- addStateMetric(AbstractStatus.State.FAILED, "status-failed",
- "Signals whether the connector task is in the failed state.");
+ addStateMetric(AbstractStatus.State.RUNNING, registry.connectorStatusRunning);
+ addStateMetric(AbstractStatus.State.PAUSED, registry.connectorStatusPaused);
+ addStateMetric(AbstractStatus.State.FAILED, registry.connectorStatusFailed);
}
- private void addStateMetric(final AbstractStatus.State matchingState, String name, String description) {
- metricGroup.addIndicatorMetric(name, description, new IndicatorPredicate() {
+ private void addStateMetric(final AbstractStatus.State matchingState, MetricNameTemplate nameTemplate) {
+ metricGroup.addIndicatorMetric(nameTemplate, new IndicatorPredicate() {
@Override
public boolean matches() {
return state == matchingState;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 3cb68b7586d..234ce8adf14 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -268,7 +268,9 @@ class WorkerSinkTask extends WorkerTask {
log.info("{} Sink task finished initialization and start", this);
}
- /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
+ /**
+ * Poll for new messages with the given timeout. Should only be invoked by the worker thread.
+ */
protected void poll(long timeoutMs) {
rewind();
long retryTimeout = context.timeout();
@@ -627,6 +629,8 @@ class WorkerSinkTask extends WorkerTask {
}
static class SinkTaskMetricsGroup {
+ private final ConnectorTaskId id;
+ private final ConnectMetrics metrics;
private final MetricGroup metricGroup;
private final Sensor sinkRecordRead;
private final Sensor sinkRecordSend;
@@ -641,77 +645,44 @@ class WorkerSinkTask extends WorkerTask {
private Map committedOffsets = new HashMap<>();
public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
- metricGroup = connectMetrics.group("sink-task-metrics",
- "connector", id.connector(), "task", Integer.toString(id.task()));
+ this.metrics = connectMetrics;
+ this.id = id;
+
+ ConnectMetricsRegistry registry = connectMetrics.registry();
+ metricGroup = connectMetrics
+ .group(registry.sinkTaskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(),
+ Integer.toString(id.task()));
sinkRecordRead = metricGroup.metrics().sensor("sink-record-read");
- sinkRecordRead.add(metricGroup.metricName("sink-record-read-rate",
- "The average per-second number of records read from Kafka for this task belonging to the " +
- "named sink connector in this worker. This is before transformations are applied."),
- new Rate());
- sinkRecordRead.add(metricGroup.metricName("sink-record-read-total",
- "The total number of records produced/polled (before transformation) by this task belonging " +
- "to the named sink connector in this worker, since the task was last restarted."),
- new Total());
+ sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new Rate());
+ sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new Total());
sinkRecordSend = metricGroup.metrics().sensor("sink-record-send");
- sinkRecordSend.add(metricGroup.metricName("sink-record-send-rate",
- "The average per-second number of records output from the transformations and sent/put to " +
- "this task belonging to the named sink connector in this worker. This is after transformations " +
- "are applied and excludes any records filtered out by the transformations."),
- new Rate());
- sinkRecordSend.add(metricGroup.metricName("sink-record-send-total",
- "The total number of records output from the transformations and sent/put to this task " +
- "belonging to the named sink connector in this worker, since the task was last restarted."),
- new Total());
+ sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendRate), new Rate());
+ sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new Total());
sinkRecordActiveCount = metricGroup.metrics().sensor("sink-record-active-count");
- sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count",
- "The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged" +
- "by the sink task"),
- new Value());
- sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count-max",
- "The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged" +
- "by the sink task"),
- new Max());
- sinkRecordActiveCount.add(metricGroup.metricName("sink-record-active-count-avg",
- "The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged" +
- "by the sink task"),
- new Avg());
+ sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCount), new Value());
+ sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountMax), new Max());
+ sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCountAvg), new Avg());
partitionCount = metricGroup.metrics().sensor("partition-count");
- partitionCount.add(metricGroup.metricName("partition-count",
- "The number of topic partitions assigned to this task belonging to the named sink connector in this worker."),
- new Value());
+ partitionCount.add(metricGroup.metricName(registry.sinkRecordPartitionCount), new Value());
offsetSeqNum = metricGroup.metrics().sensor("offset-seq-number");
- offsetSeqNum.add(metricGroup.metricName("offset-commit-seq-no",
- "The current sequence number for offset commits"),
- new Value());
+ offsetSeqNum.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSeqNum), new Value());
offsetCompletion = metricGroup.metrics().sensor("offset-commit-completion");
- offsetCompletion.add(metricGroup.metricName("offset-commit-completion-rate",
- "The average per-second number of offset commit completions that were completed successfully."),
- new Rate());
- offsetCompletion.add(metricGroup.metricName("offset-commit-completion-total",
- "The total number of offset commit completions that were completed successfully."),
- new Total());
+ offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionRate), new Rate());
+ offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal), new Total());
offsetCompletionSkip = metricGroup.metrics().sensor("offset-commit-completion-skip");
- offsetCompletionSkip.add(metricGroup.metricName("offset-commit-completion-skip-rate",
- "The average per-second number of offset commit completions that were received too late and skipped/ignored."),
- new Rate());
- offsetCompletionSkip.add(metricGroup.metricName("offset-commit-completion-skip-total",
- "The total number of offset commit completions that were received too late and skipped/ignored."),
- new Total());
+ offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipRate), new Rate());
+ offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal), new Total());
putBatchTime = metricGroup.metrics().sensor("put-batch-time");
- putBatchTime.add(metricGroup.metricName("put-batch-max-time-ms",
- "The maximum time taken by this task to put a batch of sinks records."),
- new Max());
- putBatchTime.add(metricGroup.metricName("put-batch-avg-time-ms",
- "The average time taken by this task to put a batch of sinks records."),
- new Avg());
+ putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeMax), new Max());
+ putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeAvg), new Avg());
}
void computeSinkRecordLag() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 9a187d305ab..9072cd47c81 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -490,51 +490,27 @@ class WorkerSourceTask extends WorkerTask {
private int activeRecordCount;
public SourceTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
- metricGroup = connectMetrics.group("source-task-metrics",
- "connector", id.connector(), "task", Integer.toString(id.task()));
+ ConnectMetricsRegistry registry = connectMetrics.registry();
+ metricGroup = connectMetrics.group(registry.sourceTaskGroupName(),
+ registry.connectorTagName(), id.connector(),
+ registry.taskTagName(), Integer.toString(id.task()));
sourceRecordPoll = metricGroup.sensor("source-record-poll");
- sourceRecordPoll.add(metricGroup.metricName("source-record-poll-rate",
- "The average per-second number of records produced/polled (before transformation) by this " +
- "task belonging to the named source connector in this worker."),
- new Rate());
- sourceRecordPoll.add(metricGroup.metricName("source-record-poll-total",
- "The number of records produced/polled (before transformation) by this task belonging to " +
- "the named source connector in this worker, since the task was last restarted."),
- new Total());
+ sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new Rate());
+ sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal), new Total());
sourceRecordWrite = metricGroup.sensor("source-record-write");
- sourceRecordWrite.add(metricGroup.metricName("source-record-write-rate",
- "The average per-second number of records output from the transformations and written to " +
- "Kafka for this task belonging to the named source connector in this worker. " +
- "This is after transformations are applied and excludes any records filtered out " +
- "by the transformations."),
- new Rate());
- sourceRecordWrite.add(metricGroup.metricName("source-record-write-total",
- "The number of records output from the transformations and written to Kafka for this task " +
- "belonging to the named source connector in this worker, since the task was last " +
- "restarted."),
- new Total());
-
+ sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteRate), new Rate());
+ sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal), new Total());
pollTime = metricGroup.sensor("poll-batch-time");
- pollTime.add(metricGroup.metricName("poll-batch-max-time-ms",
- "The maximum time in milliseconds taken by this task to poll for a batch of source records"),
- new Max());
- pollTime.add(metricGroup.metricName("poll-batch-avg-time-ms",
- "The average time in milliseconds taken by this task to poll for a batch of source records"),
- new Avg());
+ pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new Max());
+ pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeAvg), new Avg());
- sourceRecordActiveCount = metricGroup.metrics().sensor("source-record-active-count");
- sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count",
- "The number of records that have been produced by this task but not yet completely written to Kafka."),
- new Value());
- sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count-max",
- "The maximum number of records that have been produced by this task but not yet completely written to Kafka."),
- new Max());
- sourceRecordActiveCount.add(metricGroup.metricName("source-record-active-count-avg",
- "The average number of records that have been produced by this task but not yet completely written to Kafka."),
- new Avg());
+ sourceRecordActiveCount = metricGroup.metrics().sensor("sink-record-active-count");
+ sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCount), new Value());
+ sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountMax), new Max());
+ sourceRecordActiveCount.add(metricGroup.metricName(registry.sourceRecordActiveCountAvg), new Avg());
}
void close() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index e4af5164d1e..6499ac24dda 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Sensor;
@@ -307,52 +308,37 @@ abstract class WorkerTask implements Runnable {
delegateListener = statusListener;
time = connectMetrics.time();
taskStateTimer = new StateTracker();
- metricGroup = connectMetrics.group("connector-tasks",
- "connector", id.connector(), "task", Integer.toString(id.task()));
+ ConnectMetricsRegistry registry = connectMetrics.registry();
+ metricGroup = connectMetrics.group(registry.taskGroupName(),
+ registry.connectorTagName(), id.connector(),
+ registry.taskTagName(), Integer.toString(id.task()));
- addTaskStateMetric(State.UNASSIGNED, "status-unassigned",
- "Signals whether the connector task is in the unassigned state.");
- addTaskStateMetric(State.RUNNING, "status-running",
- "Signals whether the connector task is in the running state.");
- addTaskStateMetric(State.PAUSED, "status-paused",
- "Signals whether the connector task is in the paused state.");
- addTaskStateMetric(State.FAILED, "status-failed",
- "Signals whether the connector task is in the failed state.");
- addTaskStateMetric(State.DESTROYED, "status-destroyed",
- "Signals whether the connector task is in the destroyed state.");
+ addTaskStateMetric(State.UNASSIGNED, registry.taskStatusUnassigned);
+ addTaskStateMetric(State.RUNNING, registry.taskStatusRunning);
+ addTaskStateMetric(State.PAUSED, registry.taskStatusPaused);
+ addTaskStateMetric(State.FAILED, registry.taskStatusDestroyed);
+ addTaskStateMetric(State.DESTROYED, registry.taskStatusDestroyed);
- addRatioMetric(State.RUNNING, "running-ratio",
- "The fraction of time this task has spent in the running state.");
- addRatioMetric(State.PAUSED, "pause-ratio",
- "The fraction of time this task has spent in the paused state.");
+ addRatioMetric(State.RUNNING, registry.taskRunningRatio);
+ addRatioMetric(State.PAUSED, registry.taskPauseRatio);
commitTime = metricGroup.sensor("commit-time");
- commitTime.add(metricGroup.metricName("offset-commit-max-time-ms",
- "The maximum time in milliseconds taken by this task to commit offsets"),
- new Max());
- commitTime.add(metricGroup.metricName("offset-commit-avg-time-ms",
- "The average time in milliseconds taken by this task to commit offsets"),
- new Avg());
+ commitTime.add(metricGroup.metricName(registry.taskCommitTimeMax), new Max());
+ commitTime.add(metricGroup.metricName(registry.taskCommitTimeAvg), new Avg());
batchSize = metricGroup.sensor("batch-size");
- batchSize.add(metricGroup.metricName("batch-size-max",
- "The maximum size of the batches processed by the connector"),
- new Max());
- batchSize.add(metricGroup.metricName("batch-size-avg",
- "The average size of the batches processed by the connector"),
- new Avg());
+ batchSize.add(metricGroup.metricName(registry.taskBatchSizeMax), new Max());
+ batchSize.add(metricGroup.metricName(registry.taskBatchSizeAvg), new Avg());
- MetricName offsetCommitFailures = metricGroup.metricName("offset-commit-failure-percentage",
- "The average percentage of this task's offset commit attempts that failed");
- MetricName offsetCommitSucceeds = metricGroup.metricName("offset-commit-success-percentage",
- "The average percentage of this task's offset commit attempts that failed");
+ MetricName offsetCommitFailures = metricGroup.metricName(registry.taskCommitFailurePercentage);
+ MetricName offsetCommitSucceeds = metricGroup.metricName(registry.taskCommitSuccessPercentage);
Frequencies commitFrequencies = Frequencies.forBooleanValues(offsetCommitFailures, offsetCommitSucceeds);
commitAttempts = metricGroup.sensor("offset-commit-completion");
commitAttempts.add(commitFrequencies);
}
- private void addTaskStateMetric(final State matchingState, String name, String description) {
- metricGroup.addIndicatorMetric(name, description, new IndicatorPredicate() {
+ private void addTaskStateMetric(final State matchingState, MetricNameTemplate template) {
+ metricGroup.addIndicatorMetric(template, new IndicatorPredicate() {
@Override
public boolean matches() {
return matchingState == taskStateTimer.currentState();
@@ -360,8 +346,8 @@ abstract class WorkerTask implements Runnable {
});
}
- private void addRatioMetric(final State matchingState, String name, String description) {
- MetricName metricName = metricGroup.metricName(name, description);
+ private void addRatioMetric(final State matchingState, MetricNameTemplate template) {
+ MetricName metricName = metricGroup.metricName(template);
if (metricGroup.metrics().metric(metricName) == null) {
metricGroup.metrics().addMetric(metricName, new Measurable() {
@Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 6de0638456e..a16ab4115dc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -52,7 +52,8 @@ public class ConnectMetricsTest {
@After
public void tearDown() {
- if (metrics != null) metrics.stop();
+ if (metrics != null)
+ metrics.stop();
}
@Test
@@ -85,53 +86,29 @@ public class ConnectMetricsTest {
}
@Test
- public void testCreatingTagsWithNonNullWorkerId() {
- Map tags = ConnectMetrics.tags("name", "k1", "v1", "k2", "v2");
+ public void testCreatingTags() {
+ Map tags = ConnectMetrics.tags("k1", "v1", "k2", "v2");
assertEquals("v1", tags.get("k1"));
assertEquals("v2", tags.get("k2"));
- assertEquals("name", tags.get(ConnectMetrics.WORKER_ID_TAG_NAME));
- }
-
- @Test
- public void testCreatingTagsWithNullWorkerId() {
- Map tags = ConnectMetrics.tags(null, "k1", "v1", "k2", "v2");
- assertEquals("v1", tags.get("k1"));
- assertEquals("v2", tags.get("k2"));
- assertEquals(null, tags.get(ConnectMetrics.WORKER_ID_TAG_NAME));
- }
-
- @Test
- public void testCreatingTagsWithEmptyWorkerId() {
- Map tags = ConnectMetrics.tags("", "k1", "v1", "k2", "v2");
- assertEquals("v1", tags.get("k1"));
- assertEquals("v2", tags.get("k2"));
- assertEquals(null, tags.get(ConnectMetrics.WORKER_ID_TAG_NAME));
+ assertEquals(2, tags.size());
}
@Test(expected = IllegalArgumentException.class)
public void testCreatingTagsWithOddNumberOfTags() {
- ConnectMetrics.tags("name", "k1", "v1", "k2", "v2", "extra");
+ ConnectMetrics.tags("k1", "v1", "k2", "v2", "extra");
}
@Test(expected = IllegalArgumentException.class)
public void testGettingGroupWithOddNumberOfTags() {
- metrics.group("name", false, "k1", "v1", "k2", "v2", "extra");
+ metrics.group("name", "k1", "v1", "k2", "v2", "extra");
}
@Test
public void testGettingGroupWithTags() {
- MetricGroup group1 = metrics.group("name", false, "k1", "v1", "k2", "v2");
+ MetricGroup group1 = metrics.group("name", "k1", "v1", "k2", "v2");
assertEquals("v1", group1.tags().get("k1"));
assertEquals("v2", group1.tags().get("k2"));
- assertEquals(null, group1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
- }
-
- @Test
- public void testGettingGroupWithWorkerIdAndTags() {
- MetricGroup group1 = metrics.group("name", true, "k1", "v1", "k2", "v2");
- assertEquals("v1", group1.tags().get("k1"));
- assertEquals("v2", group1.tags().get("k2"));
- assertEquals(metrics.workerId(), group1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
+ assertEquals(2, group1.tags().size());
}
@Test
@@ -156,9 +133,9 @@ public class ConnectMetricsTest {
@Test
public void testMetricGroupIdIdentity() {
- MetricGroupId id1 = metrics.groupId("name", false, "k1", "v1");
- MetricGroupId id2 = metrics.groupId("name", false, "k1", "v1");
- MetricGroupId id3 = metrics.groupId("name", false, "k1", "v1", "k2", "v2");
+ MetricGroupId id1 = metrics.groupId("name", "k1", "v1");
+ MetricGroupId id2 = metrics.groupId("name", "k1", "v1");
+ MetricGroupId id3 = metrics.groupId("name", "k1", "v1", "k2", "v2");
assertEquals(id1.hashCode(), id2.hashCode());
assertEquals(id1, id2);
@@ -172,8 +149,8 @@ public class ConnectMetricsTest {
@Test
public void testMetricGroupIdWithoutTags() {
- MetricGroupId id1 = metrics.groupId("name", false);
- MetricGroupId id2 = metrics.groupId("name", false);
+ MetricGroupId id1 = metrics.groupId("name");
+ MetricGroupId id2 = metrics.groupId("name");
assertEquals(id1.hashCode(), id2.hashCode());
assertEquals(id1, id2);
@@ -183,16 +160,4 @@ public class ConnectMetricsTest {
assertNotNull(id1.tags());
assertNotNull(id2.tags());
}
-
- @Test
- public void testMetricGroupIdWithWorkerId() {
- MetricGroupId id1 = metrics.groupId("name", true);
- assertNotNull(metrics.workerId(), id1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
- assertEquals("name;worker-id=worker1", id1.toString());
-
- id1 = metrics.groupId("name", true, "k1", "v1", "k2", "v2");
- assertNotNull(metrics.workerId(), id1.tags().get(ConnectMetrics.WORKER_ID_TAG_NAME));
- assertEquals("name;worker-id=worker1;k1=v1;k2=v2", id1.toString()); // maintain order of tags
- }
-
}
\ No newline at end of file
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
index f0bb1e2ef71..6cc6db771dc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockConnectMetrics.java
@@ -74,7 +74,7 @@ public class MockConnectMetrics extends ConnectMetrics {
* @return the current value of the metric
*/
public double currentMetricValue(MetricGroup metricGroup, String name) {
- MetricName metricName = metricGroup.metricName(name, "desc");
+ MetricName metricName = metricGroup.metricName(name);
for (MetricsReporter reporter : metrics().reporters()) {
if (reporter instanceof MockMetricsReporter) {
return ((MockMetricsReporter) reporter).currentMetricValue(metricName);
@@ -92,7 +92,7 @@ public class MockConnectMetrics extends ConnectMetrics {
* @return true if the metric is still register, or false if it has been removed
*/
public boolean metricExists(MetricGroup metricGroup, String name) {
- MetricName metricName = metricGroup.metricName(name, "desc");
+ MetricName metricName = metricGroup.metricName(name);
KafkaMetric metric = metricGroup.metrics().metric(metricName);
return metric != null;
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 290cdd0626a..782d66b2ec4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -251,8 +251,8 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-seq-no", 0.0);
assertSinkMetricValue("offset-commit-completion-rate", 0.0);
assertSinkMetricValue("offset-commit-completion-total", 0.0);
- assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
- assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertSinkMetricValue("offset-commit-skip-rate", 0.0);
+ assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status-running", 1.0);
assertTaskMetricValue("status-paused", 0.0);
assertTaskMetricValue("running-ratio", 1.0);
@@ -270,8 +270,8 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-seq-no", 1.0);
assertSinkMetricValue("offset-commit-completion-rate", 0.0333);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
- assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
- assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertSinkMetricValue("offset-commit-skip-rate", 0.0);
+ assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status-running", 0.0);
assertTaskMetricValue("status-paused", 1.0);
assertTaskMetricValue("running-ratio", 0.25);
@@ -331,8 +331,8 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("offset-commit-seq-no", 0.0);
assertSinkMetricValue("offset-commit-completion-rate", 0.0);
assertSinkMetricValue("offset-commit-completion-total", 0.0);
- assertSinkMetricValue("offset-commit-completion-skip-rate", 0.0);
- assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertSinkMetricValue("offset-commit-skip-rate", 0.0);
+ assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status-running", 1.0);
assertTaskMetricValue("status-paused", 0.0);
assertTaskMetricValue("running-ratio", 1.0);
@@ -491,7 +491,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("sink-record-active-count-avg", 0.33333);
assertSinkMetricValue("offset-commit-seq-no", 1.0);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
- assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status-running", 1.0);
assertTaskMetricValue("status-paused", 0.0);
assertTaskMetricValue("running-ratio", 1.0);
@@ -559,7 +559,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("sink-record-active-count-avg", 0.333333);
assertSinkMetricValue("offset-commit-seq-no", 0.0);
assertSinkMetricValue("offset-commit-completion-total", 0.0);
- assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status-running", 1.0);
assertTaskMetricValue("status-paused", 0.0);
assertTaskMetricValue("running-ratio", 1.0);
@@ -587,7 +587,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("sink-record-active-count-avg", 0.2);
assertSinkMetricValue("offset-commit-seq-no", 1.0);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
- assertSinkMetricValue("offset-commit-completion-skip-total", 0.0);
+ assertSinkMetricValue("offset-commit-skip-total", 0.0);
assertTaskMetricValue("status-running", 1.0);
assertTaskMetricValue("status-paused", 0.0);
assertTaskMetricValue("running-ratio", 1.0);
@@ -990,7 +990,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("sink-record-active-count-avg", 0.71429);
assertSinkMetricValue("offset-commit-seq-no", 2.0);
assertSinkMetricValue("offset-commit-completion-total", 1.0);
- assertSinkMetricValue("offset-commit-completion-skip-total", 1.0);
+ assertSinkMetricValue("offset-commit-skip-total", 1.0);
assertTaskMetricValue("status-running", 1.0);
assertTaskMetricValue("status-paused", 0.0);
assertTaskMetricValue("running-ratio", 1.0);
@@ -1025,7 +1025,7 @@ public class WorkerSinkTaskTest {
assertSinkMetricValue("sink-record-active-count-avg", 0.5555555);
assertSinkMetricValue("offset-commit-seq-no", 3.0);
assertSinkMetricValue("offset-commit-completion-total", 2.0);
- assertSinkMetricValue("offset-commit-completion-skip-total", 1.0);
+ assertSinkMetricValue("offset-commit-skip-total", 1.0);
assertTaskMetricValue("status-running", 1.0);
assertTaskMetricValue("status-paused", 0.0);
assertTaskMetricValue("running-ratio", 1.0);
@@ -1099,6 +1099,7 @@ public class WorkerSinkTaskTest {
createTask(initialState);
expectInitializeTask();
+ expectPollInitialAssignment();
expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME);
expectConversionAndTransformation(1);
@@ -1110,7 +1111,8 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
- workerTask.iteration();
+ workerTask.iteration(); // iter 1 -- initial assignment
+ workerTask.iteration(); // iter 2 -- deliver 1 record
SinkRecord record = records.getValue().iterator().next();
@@ -1129,18 +1131,19 @@ public class WorkerSinkTaskTest {
createTask(initialState);
expectInitializeTask();
+ expectPollInitialAssignment();
expectConsumerPoll(1, timestamp, timestampType);
expectConversionAndTransformation(1);
Capture> records = EasyMock.newCapture(CaptureType.ALL);
-
sinkTask.put(EasyMock.capture(records));
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
- workerTask.iteration();
+ workerTask.iteration(); // iter 1 -- initial assignment
+ workerTask.iteration(); // iter 2 -- deliver 1 record
SinkRecord record = records.getValue().iterator().next();
@@ -1309,8 +1312,8 @@ public class WorkerSinkTaskTest {
sinkMetricValue("offset-commit-seq-no");
sinkMetricValue("offset-commit-completion-rate");
sinkMetricValue("offset-commit-completion-total");
- sinkMetricValue("offset-commit-completion-skip-rate");
- sinkMetricValue("offset-commit-completion-skip-total");
+ sinkMetricValue("offset-commit-skip-rate");
+ sinkMetricValue("offset-commit-skip-total");
sinkMetricValue("put-batch-max-time-ms");
sinkMetricValue("put-batch-avg-time-ms");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index b7d21d515f9..ce297575b08 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -367,6 +367,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
// converted
expectInitializeTask();
+ expectPollInitialAssignment();
expectOnePoll().andAnswer(new IAnswer