KAFKA-16769 Remove add.source.alias.to.metrics configuration (#17323)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2024-10-01 20:03:02 +02:00 committed by GitHub
parent 5377595a5f
commit 7fb25a2b06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 63 additions and 83 deletions

View File

@ -91,11 +91,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced."; private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
public static final long OFFSET_LAG_MAX_DEFAULT = 100L; public static final long OFFSET_LAG_MAX_DEFAULT = 100L;
public static final String ADD_SOURCE_ALIAS_TO_METRICS = "add.source.alias.to.metrics";
private static final String ADD_SOURCE_ALIAS_TO_METRICS_DOC = "Deprecated. Whether to tag metrics with the source cluster alias. "
+ "Metrics have the target, topic and partition tags. When this setting is enabled, it adds the source tag. "
+ "This configuration will be removed in Kafka 4.0 and the default behavior will be to always have the source tag.";
public static final boolean ADD_SOURCE_ALIAS_TO_METRICS_DEFAULT = false;
public static final String OFFSET_SYNCS_SOURCE_PRODUCER_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-producer"; public static final String OFFSET_SYNCS_SOURCE_PRODUCER_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-producer";
public static final String OFFSET_SYNCS_TARGET_PRODUCER_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-producer"; public static final String OFFSET_SYNCS_TARGET_PRODUCER_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-producer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-admin"; public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-admin";
@ -199,10 +194,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
} }
boolean addSourceAliasToMetrics() {
return getBoolean(ADD_SOURCE_ALIAS_TO_METRICS);
}
boolean emitOffsetSyncsEnabled() { boolean emitOffsetSyncsEnabled() {
return getBoolean(EMIT_OFFSET_SYNCS_ENABLED); return getBoolean(EMIT_OFFSET_SYNCS_ENABLED);
} }
@ -318,12 +309,6 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT), in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT),
ConfigDef.Importance.LOW, ConfigDef.Importance.LOW,
OFFSET_SYNCS_TOPIC_LOCATION_DOC) OFFSET_SYNCS_TOPIC_LOCATION_DOC)
.define(
ADD_SOURCE_ALIAS_TO_METRICS,
ConfigDef.Type.BOOLEAN,
ADD_SOURCE_ALIAS_TO_METRICS_DEFAULT,
ConfigDef.Importance.LOW,
ADD_SOURCE_ALIAS_TO_METRICS_DOC)
.define( .define(
EMIT_OFFSET_SYNCS_ENABLED, EMIT_OFFSET_SYNCS_ENABLED,
ConfigDef.Type.BOOLEAN, ConfigDef.Type.BOOLEAN,

View File

@ -56,17 +56,13 @@ class MirrorSourceMetrics implements AutoCloseable {
private final Map<TopicPartition, PartitionMetrics> partitionMetrics; private final Map<TopicPartition, PartitionMetrics> partitionMetrics;
private final String source; private final String source;
private final String target; private final String target;
private final boolean addSourceAlias;
MirrorSourceMetrics(MirrorSourceTaskConfig taskConfig) { MirrorSourceMetrics(MirrorSourceTaskConfig taskConfig) {
this.target = taskConfig.targetClusterAlias(); this.target = taskConfig.targetClusterAlias();
this.source = taskConfig.sourceClusterAlias(); this.source = taskConfig.sourceClusterAlias();
this.addSourceAlias = taskConfig.addSourceAliasToMetrics();
this.metrics = new Metrics(); this.metrics = new Metrics();
Set<String> partitionTags = new HashSet<>(addSourceAlias Set<String> partitionTags = new HashSet<>(Arrays.asList("source", "target", "topic", "partition"));
? Arrays.asList("source", "target", "topic", "partition")
: Arrays.asList("target", "topic", "partition"));
recordCount = new MetricNameTemplate( recordCount = new MetricNameTemplate(
"record-count", SOURCE_CONNECTOR_GROUP, "record-count", SOURCE_CONNECTOR_GROUP,
@ -153,7 +149,7 @@ class MirrorSourceMetrics implements AutoCloseable {
String prefix = topicPartition.topic() + "-" + topicPartition.partition() + "-"; String prefix = topicPartition.topic() + "-" + topicPartition.partition() + "-";
Map<String, String> tags = new LinkedHashMap<>(); Map<String, String> tags = new LinkedHashMap<>();
if (addSourceAlias) tags.put("source", source); tags.put("source", source);
tags.put("target", target); tags.put("target", target);
tags.put("topic", topicPartition.topic()); tags.put("topic", topicPartition.topic());
tags.put("partition", Integer.toString(topicPartition.partition())); tags.put("partition", Integer.toString(topicPartition.partition()));

View File

@ -30,7 +30,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public class MirrorSourceMetricsTest { public class MirrorSourceMetricsTest {
@ -59,22 +58,6 @@ public class MirrorSourceMetricsTest {
MirrorSourceMetrics metrics = new MirrorSourceMetrics(taskConfig); MirrorSourceMetrics metrics = new MirrorSourceMetrics(taskConfig);
metrics.addReporter(reporter); metrics.addReporter(reporter);
metrics.countRecord(SOURCE_TP);
assertEquals(13, reporter.metrics.size());
Map<String, String> tags = reporter.metrics.get(0).metricName().tags();
assertEquals(TARGET, tags.get("target"));
assertEquals(SOURCE_TP.topic(), tags.get("topic"));
assertEquals(String.valueOf(SOURCE_TP.partition()), tags.get("partition"));
assertNull(tags.get("source"));
}
@Test
public void testTagsWithSourceAlias() {
configs.put(MirrorSourceConfig.ADD_SOURCE_ALIAS_TO_METRICS, "true");
MirrorSourceTaskConfig taskConfig = new MirrorSourceTaskConfig(configs);
MirrorSourceMetrics metrics = new MirrorSourceMetrics(taskConfig);
metrics.addReporter(reporter);
metrics.countRecord(SOURCE_TP); metrics.countRecord(SOURCE_TP);
assertEquals(13, reporter.metrics.size()); assertEquals(13, reporter.metrics.size());
Map<String, String> tags = reporter.metrics.get(0).metricName().tags(); Map<String, String> tags = reporter.metrics.get(0).metricName().tags();

View File

@ -22,51 +22,67 @@
<h4><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading to 4.0.0 from any version 0.8.x through 3.9.x</a></h4> <h4><a id="upgrade_4_0_0" href="#upgrade_4_0_0">Upgrading to 4.0.0 from any version 0.8.x through 3.9.x</a></h4>
<h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4.0.0</a></h5> <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4.0.0</a></h5>
<ul> <ul>
<li>A number of deprecated classes, methods,configurations and tools have been removed from the <code>clients</code>, <code>connect</code>, <code>core</code> and <code>tools</code> modules:</li> <li>A number of deprecated classes, methods, configurations and tools have been removed.
<ul> <ul>
<li> The original MirrorMaker (MM1) and related classes have been removed. Please use the Connect-based <li><b>Common</b>
MirrorMaker (MM2), as described in the <ul>
<a href="/{{version}}/documentation/#georeplication">Geo-Replication section</a>. <li>The <code>metrics.jmx.blacklist</code> and <code>metrics.jmx.whitelist</code> configurations were removed from the <code>org.apache.kafka.common.metrics.JmxReporter</code>
</li> Please use <code>metrics.jmx.exclude</code> and <code>metrics.jmx.include</code> respectively instead.
<li> </li>
The <code>kafka.common.MessageReader</code> class has been removed. Please use the </ul>
<a href="/{{version}}/javadoc/org/apache/kafka/tools/api/RecordReader.html"><code>org.apache.kafka.tools.api.RecordReader</code></a> </li>
interface to build custom readers for the <code>kafka-console-producer</code> tool. <li><b>Broker</b>
</li> <ul>
<li> Remove <code>use.incremental.alter.configs</code>. The modified behavior is identical to the previous <code>required</code> configuration, <li>The <code>delegation.token.master.key</code> configuration was removed.
therefore users should ensure that target broker is at least 2.3.0 Please use <code>delegation.token.secret.key</code> instead.
</li> </li>
<li> Remove <code>delegation.token.master.key</code>. please use <code>delegation.token.secret.key</code> instead of it. </ul>
</li> </li>
<li> <li><b>MirrorMaker</b>
The <code>kafka.tools.DefaultMessageFormatter</code> class has been removed. Please use the <code>org.apache.kafka.tools.consumer.DefaultMessageFormatter</code> class instead. <ul>
</li> <li>The original MirrorMaker (MM1) and related classes were removed. Please use the Connect-based
<li> MirrorMaker (MM2), as described in the <a href="/{{version}}/documentation/#georeplication">Geo-Replication section.</a>.
The <code>kafka.tools.LoggingMessageFormatter</code> class has been removed. Please use the <code>org.apache.kafka.tools.consumer.LoggingMessageFormatter</code> class instead. </li>
</li> <li>The <code>use.incremental.alter.configs</code> configuration was removedfrom <code>MirrorSourceConnector</code>.
<li> The modified behavior is identical to the previous <code>required</code> configuration, therefore users should ensure that brokers in the target cluster are at least running 2.3.0.
The <code>kafka.tools.NoOpMessageFormatter</code> class has been removed. Please use the <code>org.apache.kafka.tools.consumer.NoOpMessageFormatter</code> class instead. </li>
</li> <li>The <code>add.source.alias.to.metrics</code> configuration was removed from <code>MirrorSourceConnector</code>.
<li> The source cluster alias is now always added to the metrics.
The <code>--whitelist</code> option was removed from the <code>kafka-console-consumer</code> command line tool. </li>
Please use <code>--include</code> instead. </ul>
</li> </li>
<li> <li><b>Tools</b>
The <code>--whitelist</code> and <code>--blacklist</code> options were removed from the <code>org.apache.kafka.connect.transforms.ReplaceField</code>. <ul>
Please use <code>--include</code> and <code>--exclude</code> instead. <li>The <code>kafka.common.MessageReader</code> class was removed. Please use the
</li> <a href="/{{version}}/javadoc/org/apache/kafka/tools/api/RecordReader.html"><code>org.apache.kafka.tools.api.RecordReader</code></a>
<li> interface to build custom readers for the <code>kafka-console-producer</code> tool.
The <code>--delete-config</code> option in the <code>kafka-topics</code> command line tool has been deprecated. </li>
</li> <li>The <code>kafka.tools.DefaultMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.DefaultMessageFormatter</code> class instead.
<li> </li>
The <code>metrics.jmx.blacklist</code> was removed from the <code>org.apache.kafka.common.metrics.JmxReporter</code> <li>The <code>kafka.tools.LoggingMessageFormatter</code> class was removed. Please use the <code>org.apache.kafka.tools.consumer.LoggingMessageFormatter</code> class instead.
Please use <code>metrics.jmx.exclude</code> instead. </li>
</li> <li>The <code>kafka.tools.NoOpMessageFormatter</code> class has been removed. Please use the <code>org.apache.kafka.tools.consumer.NoOpMessageFormatter</code> class instead.
<li> </li>
The <code>metrics.jmx.whitelist</code> was removed from the <code>org.apache.kafka.common.metrics.JmxReporter</code> <li>The <code>--whitelist</code> option was removed from the <code>kafka-console-consumer</code> command line tool.
Please use <code>metrics.jmx.include</code> instead. Please use <code>--include</code> instead.
</li> </li>
</ul> </ul>
</li>
<li><b>Connect</b>
<ul>
<li>The <code>whitelist</code> and <code>blacklist</code> configurations were removed from the <code>org.apache.kafka.connect.transforms.ReplaceField</code> transformation.
Please use <code>include</code> and <code>exclude</code> respectively instead.
</li>
</ul>
</li>
</ul>
</li>
<li>Other changes:
<ul>
<li>The <code>--delete-config</code> option in the <code>kafka-topics</code> command line tool has been deprecated.
</li>
</ul>
</li>
</ul> </ul>
<h4><a id="upgrade_3_9_0" href="#upgrade_3_9_0">Upgrading to 3.9.0 from any version 0.8.x through 3.8.x</a></h4> <h4><a id="upgrade_3_9_0" href="#upgrade_3_9_0">Upgrading to 3.9.0 from any version 0.8.x through 3.8.x</a></h4>