mirror of https://github.com/apache/kafka.git
KAFKA-14158: Remove auto.include.jmx.reporter configuration (#17360)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
5624bc7c7e
commit
3815339e05
|
@ -19,7 +19,6 @@ package org.apache.kafka.clients;
|
||||||
import org.apache.kafka.common.config.AbstractConfig;
|
import org.apache.kafka.common.config.AbstractConfig;
|
||||||
import org.apache.kafka.common.config.ConfigException;
|
import org.apache.kafka.common.config.ConfigException;
|
||||||
import org.apache.kafka.common.config.SaslConfigs;
|
import org.apache.kafka.common.config.SaslConfigs;
|
||||||
import org.apache.kafka.common.metrics.JmxReporter;
|
|
||||||
import org.apache.kafka.common.metrics.MetricsReporter;
|
import org.apache.kafka.common.metrics.MetricsReporter;
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
|
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
|
||||||
|
@ -126,14 +125,10 @@ public class CommonClientConfigs {
|
||||||
"\n" +
|
"\n" +
|
||||||
"TRACE level records all possible metrics, capturing every detail about the system's performance and operation. It's best for controlled environments where in-depth analysis is required, though it can introduce significant overhead.";
|
"TRACE level records all possible metrics, capturing every detail about the system's performance and operation. It's best for controlled environments where in-depth analysis is required, though it can introduce significant overhead.";
|
||||||
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
|
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
|
||||||
public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
|
public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation.";
|
||||||
|
|
||||||
public static final String METRICS_CONTEXT_PREFIX = "metrics.context.";
|
public static final String METRICS_CONTEXT_PREFIX = "metrics.context.";
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = "auto.include.jmx.reporter";
|
|
||||||
public static final String AUTO_INCLUDE_JMX_REPORTER_DOC = "Deprecated. Whether to automatically include JmxReporter even if it's not listed in <code>metric.reporters</code>. This configuration will be removed in Kafka 4.0, users should instead include <code>org.apache.kafka.common.metrics.JmxReporter</code> in <code>metric.reporters</code> in order to enable the JmxReporter.";
|
|
||||||
|
|
||||||
public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
|
public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
|
||||||
public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " +
|
public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " +
|
||||||
String.join(", ", SecurityProtocol.names()) + ".";
|
String.join(", ", SecurityProtocol.names()) + ".";
|
||||||
|
@ -305,15 +300,8 @@ public class CommonClientConfigs {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<MetricsReporter> metricsReporters(Map<String, Object> clientIdOverride, AbstractConfig config) {
|
public static List<MetricsReporter> metricsReporters(Map<String, Object> clientIdOverride, AbstractConfig config) {
|
||||||
List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
|
return config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
|
||||||
MetricsReporter.class, clientIdOverride);
|
MetricsReporter.class, clientIdOverride);
|
||||||
if (config.getBoolean(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG) &&
|
|
||||||
reporters.stream().noneMatch(r -> JmxReporter.class.equals(r.getClass()))) {
|
|
||||||
JmxReporter jmxReporter = new JmxReporter();
|
|
||||||
jmxReporter.configure(config.originals(clientIdOverride));
|
|
||||||
reporters.add(jmxReporter);
|
|
||||||
}
|
|
||||||
return reporters;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Optional<ClientTelemetryReporter> telemetryReporter(String clientId, AbstractConfig config) {
|
public static Optional<ClientTelemetryReporter> telemetryReporter(String clientId, AbstractConfig config) {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef;
|
||||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||||
import org.apache.kafka.common.config.SecurityConfig;
|
import org.apache.kafka.common.config.SecurityConfig;
|
||||||
|
import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
@ -120,10 +121,6 @@ public class AdminClientConfig extends AbstractConfig {
|
||||||
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
|
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
|
||||||
private static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;
|
private static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
|
|
||||||
public static final String AUTO_INCLUDE_JMX_REPORTER_DOC = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC;
|
|
||||||
|
|
||||||
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
|
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
|
||||||
private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;
|
private static final String METRICS_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;
|
||||||
|
|
||||||
|
@ -234,18 +231,17 @@ public class AdminClientConfig extends AbstractConfig {
|
||||||
Importance.LOW,
|
Importance.LOW,
|
||||||
METRICS_SAMPLE_WINDOW_MS_DOC)
|
METRICS_SAMPLE_WINDOW_MS_DOC)
|
||||||
.define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
|
.define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
|
||||||
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
|
.define(METRIC_REPORTER_CLASSES_CONFIG,
|
||||||
|
Type.LIST,
|
||||||
|
JmxReporter.class.getName(),
|
||||||
|
Importance.LOW,
|
||||||
|
METRIC_REPORTER_CLASSES_DOC)
|
||||||
.define(METRICS_RECORDING_LEVEL_CONFIG,
|
.define(METRICS_RECORDING_LEVEL_CONFIG,
|
||||||
Type.STRING,
|
Type.STRING,
|
||||||
Sensor.RecordingLevel.INFO.toString(),
|
Sensor.RecordingLevel.INFO.toString(),
|
||||||
in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),
|
in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),
|
||||||
Importance.LOW,
|
Importance.LOW,
|
||||||
METRICS_RECORDING_LEVEL_DOC)
|
METRICS_RECORDING_LEVEL_DOC)
|
||||||
.define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
|
|
||||||
Type.BOOLEAN,
|
|
||||||
true,
|
|
||||||
Importance.LOW,
|
|
||||||
AUTO_INCLUDE_JMX_REPORTER_DOC)
|
|
||||||
.define(CLIENT_DNS_LOOKUP_CONFIG,
|
.define(CLIENT_DNS_LOOKUP_CONFIG,
|
||||||
Type.STRING,
|
Type.STRING,
|
||||||
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
|
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
|
||||||
import org.apache.kafka.common.config.ConfigException;
|
import org.apache.kafka.common.config.ConfigException;
|
||||||
import org.apache.kafka.common.config.SecurityConfig;
|
import org.apache.kafka.common.config.SecurityConfig;
|
||||||
import org.apache.kafka.common.errors.InvalidConfigurationException;
|
import org.apache.kafka.common.errors.InvalidConfigurationException;
|
||||||
|
import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.requests.JoinGroupRequest;
|
import org.apache.kafka.common.requests.JoinGroupRequest;
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
|
@ -283,12 +284,6 @@ public class ConsumerConfig extends AbstractConfig {
|
||||||
*/
|
*/
|
||||||
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
|
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
|
||||||
|
|
||||||
/**
|
|
||||||
* <code>auto.include.jmx.reporter</code>
|
|
||||||
* */
|
|
||||||
@Deprecated
|
|
||||||
public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <code>check.crcs</code>
|
* <code>check.crcs</code>
|
||||||
*/
|
*/
|
||||||
|
@ -545,15 +540,10 @@ public class ConsumerConfig extends AbstractConfig {
|
||||||
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
|
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
|
||||||
.define(METRIC_REPORTER_CLASSES_CONFIG,
|
.define(METRIC_REPORTER_CLASSES_CONFIG,
|
||||||
Type.LIST,
|
Type.LIST,
|
||||||
Collections.emptyList(),
|
JmxReporter.class.getName(),
|
||||||
new ConfigDef.NonNullValidator(),
|
new ConfigDef.NonNullValidator(),
|
||||||
Importance.LOW,
|
Importance.LOW,
|
||||||
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
||||||
.define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
|
|
||||||
Type.BOOLEAN,
|
|
||||||
true,
|
|
||||||
Importance.LOW,
|
|
||||||
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
|
|
||||||
.define(KEY_DESERIALIZER_CLASS_CONFIG,
|
.define(KEY_DESERIALIZER_CLASS_CONFIG,
|
||||||
Type.CLASS,
|
Type.CLASS,
|
||||||
Importance.HIGH,
|
Importance.HIGH,
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||||
import org.apache.kafka.common.config.ConfigException;
|
import org.apache.kafka.common.config.ConfigException;
|
||||||
import org.apache.kafka.common.config.SecurityConfig;
|
import org.apache.kafka.common.config.SecurityConfig;
|
||||||
|
import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.record.CompressionType;
|
import org.apache.kafka.common.record.CompressionType;
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
|
@ -253,10 +254,6 @@ public class ProducerConfig extends AbstractConfig {
|
||||||
/** <code>metric.reporters</code> */
|
/** <code>metric.reporters</code> */
|
||||||
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
|
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
|
||||||
|
|
||||||
/** <code>auto.include.jmx.reporter</code> */
|
|
||||||
@Deprecated
|
|
||||||
public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
|
|
||||||
|
|
||||||
// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
|
// max.in.flight.requests.per.connection should be less than or equal to 5 when idempotence producer enabled to ensure message ordering
|
||||||
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
|
private static final int MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE = 5;
|
||||||
|
|
||||||
|
@ -449,15 +446,10 @@ public class ProducerConfig extends AbstractConfig {
|
||||||
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
|
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
|
||||||
.define(METRIC_REPORTER_CLASSES_CONFIG,
|
.define(METRIC_REPORTER_CLASSES_CONFIG,
|
||||||
Type.LIST,
|
Type.LIST,
|
||||||
Collections.emptyList(),
|
JmxReporter.class.getName(),
|
||||||
new ConfigDef.NonNullValidator(),
|
new ConfigDef.NonNullValidator(),
|
||||||
Importance.LOW,
|
Importance.LOW,
|
||||||
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
||||||
.define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
|
|
||||||
Type.BOOLEAN,
|
|
||||||
true,
|
|
||||||
Importance.LOW,
|
|
||||||
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
|
|
||||||
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
|
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
|
||||||
Type.INT,
|
Type.INT,
|
||||||
5,
|
5,
|
||||||
|
|
|
@ -41,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class CommonClientConfigsTest {
|
public class CommonClientConfigsTest {
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private static class TestConfig extends AbstractConfig {
|
private static class TestConfig extends AbstractConfig {
|
||||||
private static final ConfigDef CONFIG;
|
private static final ConfigDef CONFIG;
|
||||||
static {
|
static {
|
||||||
|
@ -71,15 +71,10 @@ public class CommonClientConfigsTest {
|
||||||
SaslConfigs.SASL_MECHANISM_DOC)
|
SaslConfigs.SASL_MECHANISM_DOC)
|
||||||
.define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
|
.define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
|
||||||
ConfigDef.Type.LIST,
|
ConfigDef.Type.LIST,
|
||||||
Collections.emptyList(),
|
JmxReporter.class.getName(),
|
||||||
new ConfigDef.NonNullValidator(),
|
new ConfigDef.NonNullValidator(),
|
||||||
ConfigDef.Importance.LOW,
|
ConfigDef.Importance.LOW,
|
||||||
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC);
|
||||||
.define(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG,
|
|
||||||
ConfigDef.Type.BOOLEAN,
|
|
||||||
true,
|
|
||||||
ConfigDef.Importance.LOW,
|
|
||||||
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -133,14 +128,13 @@ public class CommonClientConfigsTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void testMetricsReporters() {
|
public void testMetricsReporters() {
|
||||||
TestConfig config = new TestConfig(Collections.emptyMap());
|
TestConfig config = new TestConfig(Collections.emptyMap());
|
||||||
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters("clientId", config);
|
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters("clientId", config);
|
||||||
assertEquals(1, reporters.size());
|
assertEquals(1, reporters.size());
|
||||||
assertInstanceOf(JmxReporter.class, reporters.get(0));
|
assertInstanceOf(JmxReporter.class, reporters.get(0));
|
||||||
|
|
||||||
config = new TestConfig(Collections.singletonMap(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false"));
|
config = new TestConfig(Collections.singletonMap(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, ""));
|
||||||
reporters = CommonClientConfigs.metricsReporters("clientId", config);
|
reporters = CommonClientConfigs.metricsReporters("clientId", config);
|
||||||
assertTrue(reporters.isEmpty());
|
assertTrue(reporters.isEmpty());
|
||||||
|
|
||||||
|
|
|
@ -445,16 +445,15 @@ public class KafkaAdminClientTest {
|
||||||
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) admin.metrics.reporters().get(0);
|
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) admin.metrics.reporters().get(0);
|
||||||
|
|
||||||
assertEquals(admin.getClientId(), mockMetricsReporter.clientId);
|
assertEquals(admin.getClientId(), mockMetricsReporter.clientId);
|
||||||
assertEquals(2, admin.metrics.reporters().size());
|
assertEquals(1, admin.metrics.reporters().size());
|
||||||
admin.close();
|
admin.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void testDisableJmxReporter() {
|
public void testDisableJmxReporter() {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
||||||
props.setProperty(AdminClientConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
|
props.setProperty(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, "");
|
||||||
KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
|
KafkaAdminClient admin = (KafkaAdminClient) AdminClient.create(props);
|
||||||
assertTrue(admin.metrics.reporters().isEmpty());
|
assertTrue(admin.metrics.reporters().isEmpty());
|
||||||
admin.close();
|
admin.close();
|
||||||
|
|
|
@ -234,7 +234,7 @@ public class KafkaConsumerTest {
|
||||||
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
|
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
|
||||||
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
|
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
|
||||||
|
|
||||||
assertEquals(3, consumer.metricsRegistry().reporters().size());
|
assertEquals(2, consumer.metricsRegistry().reporters().size());
|
||||||
|
|
||||||
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metricsRegistry().reporters().stream()
|
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metricsRegistry().reporters().stream()
|
||||||
.filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
|
.filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
|
||||||
|
@ -243,12 +243,11 @@ public class KafkaConsumerTest {
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@EnumSource(GroupProtocol.class)
|
@EnumSource(GroupProtocol.class)
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void testDisableJmxAndClientTelemetryReporter(GroupProtocol groupProtocol) {
|
public void testDisableJmxAndClientTelemetryReporter(GroupProtocol groupProtocol) {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
|
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
|
||||||
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
||||||
props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
|
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");
|
||||||
props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
|
props.setProperty(ConsumerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
|
||||||
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
|
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
|
||||||
assertTrue(consumer.metricsRegistry().reporters().isEmpty());
|
assertTrue(consumer.metricsRegistry().reporters().isEmpty());
|
||||||
|
@ -269,12 +268,11 @@ public class KafkaConsumerTest {
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@EnumSource(GroupProtocol.class)
|
@EnumSource(GroupProtocol.class)
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void testExplicitlyOnlyEnableClientTelemetryReporter(GroupProtocol groupProtocol) {
|
public void testExplicitlyOnlyEnableClientTelemetryReporter(GroupProtocol groupProtocol) {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
|
props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
|
||||||
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
||||||
props.setProperty(ConsumerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
|
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");
|
||||||
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
|
consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer());
|
||||||
assertEquals(1, consumer.metricsRegistry().reporters().size());
|
assertEquals(1, consumer.metricsRegistry().reporters().size());
|
||||||
assertInstanceOf(ClientTelemetryReporter.class, consumer.metricsRegistry().reporters().get(0));
|
assertInstanceOf(ClientTelemetryReporter.class, consumer.metricsRegistry().reporters().get(0));
|
||||||
|
|
|
@ -467,7 +467,7 @@ public class KafkaProducerTest {
|
||||||
KafkaProducer<String, String> producer = new KafkaProducer<>(
|
KafkaProducer<String, String> producer = new KafkaProducer<>(
|
||||||
props, new StringSerializer(), new StringSerializer());
|
props, new StringSerializer(), new StringSerializer());
|
||||||
|
|
||||||
assertEquals(3, producer.metrics.reporters().size());
|
assertEquals(2, producer.metrics.reporters().size());
|
||||||
|
|
||||||
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream()
|
MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().stream()
|
||||||
.filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
|
.filter(reporter -> reporter instanceof MockMetricsReporter).findFirst().get();
|
||||||
|
@ -477,11 +477,10 @@ public class KafkaProducerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void testDisableJmxAndClientTelemetryReporter() {
|
public void testDisableJmxAndClientTelemetryReporter() {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
||||||
props.setProperty(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
|
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");
|
||||||
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
|
props.setProperty(ProducerConfig.ENABLE_METRICS_PUSH_CONFIG, "false");
|
||||||
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
|
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
|
||||||
assertTrue(producer.metrics.reporters().isEmpty());
|
assertTrue(producer.metrics.reporters().isEmpty());
|
||||||
|
@ -501,11 +500,10 @@ public class KafkaProducerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void testExplicitlyOnlyEnableClientTelemetryReporter() {
|
public void testExplicitlyOnlyEnableClientTelemetryReporter() {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
||||||
props.setProperty(ProducerConfig.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
|
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");
|
||||||
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
|
KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
|
||||||
assertEquals(1, producer.metrics.reporters().size());
|
assertEquals(1, producer.metrics.reporters().size());
|
||||||
assertInstanceOf(ClientTelemetryReporter.class, producer.metrics.reporters().get(0));
|
assertInstanceOf(ClientTelemetryReporter.class, producer.metrics.reporters().get(0));
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.ForwardingAdmin;
|
||||||
import org.apache.kafka.common.KafkaException;
|
import org.apache.kafka.common.KafkaException;
|
||||||
import org.apache.kafka.common.config.AbstractConfig;
|
import org.apache.kafka.common.config.AbstractConfig;
|
||||||
import org.apache.kafka.common.config.ConfigDef;
|
import org.apache.kafka.common.config.ConfigDef;
|
||||||
|
import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.KafkaMetricsContext;
|
import org.apache.kafka.common.metrics.KafkaMetricsContext;
|
||||||
import org.apache.kafka.common.metrics.MetricsContext;
|
import org.apache.kafka.common.metrics.MetricsContext;
|
||||||
import org.apache.kafka.common.metrics.MetricsReporter;
|
import org.apache.kafka.common.metrics.MetricsReporter;
|
||||||
|
@ -258,7 +259,6 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
|
||||||
return sourceClusterAlias() + "->" + targetClusterAlias() + "|" + connectorName();
|
return sourceClusterAlias() + "->" + targetClusterAlias() + "|" + connectorName();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
protected static final ConfigDef BASE_CONNECTOR_CONFIG_DEF = new ConfigDef(ConnectorConfig.configDef())
|
protected static final ConfigDef BASE_CONNECTOR_CONFIG_DEF = new ConfigDef(ConnectorConfig.configDef())
|
||||||
.define(
|
.define(
|
||||||
ENABLED,
|
ENABLED,
|
||||||
|
@ -310,7 +310,7 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
|
||||||
.define(
|
.define(
|
||||||
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
|
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
|
||||||
ConfigDef.Type.LIST,
|
ConfigDef.Type.LIST,
|
||||||
null,
|
JmxReporter.class.getName(),
|
||||||
ConfigDef.Importance.LOW,
|
ConfigDef.Importance.LOW,
|
||||||
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
||||||
.define(
|
.define(
|
||||||
|
@ -320,13 +320,7 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
|
||||||
in(Utils.enumOptions(SecurityProtocol.class)),
|
in(Utils.enumOptions(SecurityProtocol.class)),
|
||||||
ConfigDef.Importance.MEDIUM,
|
ConfigDef.Importance.MEDIUM,
|
||||||
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
|
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
|
||||||
.define(
|
.withClientSslSupport()
|
||||||
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG,
|
|
||||||
ConfigDef.Type.BOOLEAN,
|
|
||||||
true,
|
|
||||||
ConfigDef.Importance.LOW,
|
|
||||||
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC
|
|
||||||
).withClientSslSupport()
|
|
||||||
.withClientSaslSupport();
|
.withClientSaslSupport();
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
|
|
|
@ -187,15 +187,14 @@ public class MirrorConnectorConfigTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void testMetricsReporters() {
|
public void testMetricsReporters() {
|
||||||
Map<String, String> connectorProps = makeProps("metric.reporters", MockMetricsReporter.class.getName());
|
Map<String, String> connectorProps = makeProps();
|
||||||
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
|
MirrorConnectorConfig config = new TestMirrorConnectorConfig(connectorProps);
|
||||||
assertEquals(2, config.metricsReporters().size());
|
|
||||||
|
|
||||||
connectorProps.put(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
|
|
||||||
config = new TestMirrorConnectorConfig(connectorProps);
|
|
||||||
assertEquals(1, config.metricsReporters().size());
|
assertEquals(1, config.metricsReporters().size());
|
||||||
|
|
||||||
|
connectorProps.put("metric.reporters", JmxReporter.class.getName() + "," + MockMetricsReporter.class.getName());
|
||||||
|
config = new TestMirrorConnectorConfig(connectorProps);
|
||||||
|
assertEquals(2, config.metricsReporters().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.kafka.common.config.AbstractConfig;
|
||||||
import org.apache.kafka.common.config.ConfigDef;
|
import org.apache.kafka.common.config.ConfigDef;
|
||||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||||
|
import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
import org.apache.kafka.connect.errors.ConnectException;
|
import org.apache.kafka.connect.errors.ConnectException;
|
||||||
|
@ -160,9 +161,6 @@ public class WorkerConfig extends AbstractConfig {
|
||||||
public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
|
public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
|
||||||
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
|
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
|
|
||||||
|
|
||||||
public static final String TOPIC_TRACKING_ENABLE_CONFIG = "topic.tracking.enable";
|
public static final String TOPIC_TRACKING_ENABLE_CONFIG = "topic.tracking.enable";
|
||||||
protected static final String TOPIC_TRACKING_ENABLE_DOC = "Enable tracking the set of active "
|
protected static final String TOPIC_TRACKING_ENABLE_DOC = "Enable tracking the set of active "
|
||||||
+ "topics per connector during runtime.";
|
+ "topics per connector during runtime.";
|
||||||
|
@ -234,13 +232,8 @@ public class WorkerConfig extends AbstractConfig {
|
||||||
Importance.LOW,
|
Importance.LOW,
|
||||||
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
|
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
|
||||||
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
|
.define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
|
||||||
"", Importance.LOW,
|
JmxReporter.class.getName(), Importance.LOW,
|
||||||
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
||||||
.define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
|
|
||||||
Type.BOOLEAN,
|
|
||||||
true,
|
|
||||||
Importance.LOW,
|
|
||||||
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
|
|
||||||
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
|
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
|
||||||
HEADER_CONVERTER_CLASS_DEFAULT,
|
HEADER_CONVERTER_CLASS_DEFAULT,
|
||||||
Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
|
Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
|
||||||
|
|
|
@ -154,10 +154,9 @@ public class ConnectMetricsTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public void testDisableJmxReporter() {
|
public void testDisableJmxReporter() {
|
||||||
Map<String, String> props = new HashMap<>(DEFAULT_WORKER_CONFIG);
|
Map<String, String> props = new HashMap<>(DEFAULT_WORKER_CONFIG);
|
||||||
props.put(CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false");
|
props.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, "");
|
||||||
ConnectMetrics cm = new ConnectMetrics("worker-testDisableJmxReporter", new WorkerConfig(WorkerConfig.baseConfigDef(), props), new MockTime(), "cluster-1");
|
ConnectMetrics cm = new ConnectMetrics("worker-testDisableJmxReporter", new WorkerConfig(WorkerConfig.baseConfigDef(), props), new MockTime(), "cluster-1");
|
||||||
assertTrue(cm.metrics().reporters().isEmpty());
|
assertTrue(cm.metrics().reporters().isEmpty());
|
||||||
cm.stop();
|
cm.stop();
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.kafka.common.config.ConfigException;
|
||||||
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
|
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
|
||||||
import org.apache.kafka.common.errors.ClusterAuthorizationException;
|
import org.apache.kafka.common.errors.ClusterAuthorizationException;
|
||||||
import org.apache.kafka.common.internals.KafkaFutureImpl;
|
import org.apache.kafka.common.internals.KafkaFutureImpl;
|
||||||
|
import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.MetricsReporter;
|
import org.apache.kafka.common.metrics.MetricsReporter;
|
||||||
import org.apache.kafka.common.metrics.stats.Avg;
|
import org.apache.kafka.common.metrics.stats.Avg;
|
||||||
import org.apache.kafka.common.utils.LogCaptureAppender;
|
import org.apache.kafka.common.utils.LogCaptureAppender;
|
||||||
|
@ -250,7 +251,7 @@ public class WorkerTest {
|
||||||
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
|
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
|
||||||
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
|
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
|
||||||
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
|
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
|
||||||
workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
|
workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, JmxReporter.class.getName() + "," + MockMetricsReporter.class.getName());
|
||||||
workerProps.put("config.providers", "file");
|
workerProps.put("config.providers", "file");
|
||||||
workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
|
workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
|
||||||
mockFileProviderTestId = UUID.randomUUID().toString();
|
mockFileProviderTestId = UUID.randomUUID().toString();
|
||||||
|
|
|
@ -66,7 +66,7 @@ public class WorkerGroupMemberTest {
|
||||||
workerProps.put("offset.storage.topic", "topic-1");
|
workerProps.put("offset.storage.topic", "topic-1");
|
||||||
workerProps.put("config.storage.topic", "topic-1");
|
workerProps.put("config.storage.topic", "topic-1");
|
||||||
workerProps.put("status.storage.topic", "topic-1");
|
workerProps.put("status.storage.topic", "topic-1");
|
||||||
workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockConnectMetrics.MockMetricsReporter.class.getName());
|
workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, JmxReporter.class.getName() + "," + MockConnectMetrics.MockMetricsReporter.class.getName());
|
||||||
DistributedConfig config = spy(new DistributedConfig(workerProps));
|
DistributedConfig config = spy(new DistributedConfig(workerProps));
|
||||||
doReturn("cluster-1").when(config).kafkaClusterId();
|
doReturn("cluster-1").when(config).kafkaClusterId();
|
||||||
|
|
||||||
|
@ -109,7 +109,7 @@ public class WorkerGroupMemberTest {
|
||||||
workerProps.put("offset.storage.topic", "topic-1");
|
workerProps.put("offset.storage.topic", "topic-1");
|
||||||
workerProps.put("config.storage.topic", "topic-1");
|
workerProps.put("config.storage.topic", "topic-1");
|
||||||
workerProps.put("status.storage.topic", "topic-1");
|
workerProps.put("status.storage.topic", "topic-1");
|
||||||
workerProps.put("auto.include.jmx.reporter", "false");
|
workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, "");
|
||||||
DistributedConfig config = spy(new DistributedConfig(workerProps));
|
DistributedConfig config = spy(new DistributedConfig(workerProps));
|
||||||
doReturn("cluster-1").when(config).kafkaClusterId();
|
doReturn("cluster-1").when(config).kafkaClusterId();
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
|
||||||
import org.apache.kafka.common.Reconfigurable
|
import org.apache.kafka.common.Reconfigurable
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
||||||
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
|
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
|
||||||
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter}
|
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
|
||||||
import org.apache.kafka.common.config.types.Password
|
import org.apache.kafka.common.config.types.Password
|
||||||
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
|
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
|
||||||
import org.apache.kafka.common.security.authenticator.LoginManager
|
import org.apache.kafka.common.security.authenticator.LoginManager
|
||||||
|
@ -45,7 +45,6 @@ import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfi
|
||||||
import org.apache.kafka.server.telemetry.ClientTelemetry
|
import org.apache.kafka.server.telemetry.ClientTelemetry
|
||||||
import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
|
import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
|
||||||
|
|
||||||
import scala.annotation.nowarn
|
|
||||||
import scala.collection._
|
import scala.collection._
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
|
@ -946,14 +945,9 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me
|
||||||
currentReporters.remove(className).foreach(metrics.removeReporter)
|
currentReporters.remove(className).foreach(metrics.removeReporter)
|
||||||
}
|
}
|
||||||
|
|
||||||
@nowarn("cat=deprecation")
|
|
||||||
private[server] def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
|
private[server] def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
|
||||||
val reporters = mutable.Buffer[String]()
|
val reporters = mutable.Buffer[String]()
|
||||||
reporters ++= configs.get(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG).asInstanceOf[util.List[String]].asScala
|
reporters ++= configs.get(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG).asInstanceOf[util.List[String]].asScala
|
||||||
if (configs.get(MetricConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG).asInstanceOf[Boolean] &&
|
|
||||||
!reporters.contains(classOf[JmxReporter].getName)) {
|
|
||||||
reporters += classOf[JmxReporter].getName
|
|
||||||
}
|
|
||||||
reporters
|
reporters
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ import org.apache.kafka.common.config.provider.FileConfigProvider
|
||||||
import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
|
import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
|
||||||
import org.apache.kafka.common.internals.Topic
|
import org.apache.kafka.common.internals.Topic
|
||||||
import org.apache.kafka.common.message.MetadataRequestData
|
import org.apache.kafka.common.message.MetadataRequestData
|
||||||
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsContext, MetricsReporter, Quota}
|
import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetric, MetricsContext, MetricsReporter, Quota}
|
||||||
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
|
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
|
||||||
import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS}
|
import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS}
|
||||||
import org.apache.kafka.common.record.TimestampType
|
import org.apache.kafka.common.record.TimestampType
|
||||||
|
@ -955,7 +955,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
|
||||||
// Add a new metrics reporter
|
// Add a new metrics reporter
|
||||||
val newProps = new Properties
|
val newProps = new Properties
|
||||||
newProps.put(TestMetricsReporter.PollingIntervalProp, "100")
|
newProps.put(TestMetricsReporter.PollingIntervalProp, "100")
|
||||||
configureMetricsReporters(Seq(classOf[TestMetricsReporter]), newProps)
|
configureMetricsReporters(Seq(classOf[JmxReporter], classOf[TestMetricsReporter]), newProps)
|
||||||
|
|
||||||
val reporters = TestMetricsReporter.waitForReporters(servers.size)
|
val reporters = TestMetricsReporter.waitForReporters(servers.size)
|
||||||
reporters.foreach { reporter =>
|
reporters.foreach { reporter =>
|
||||||
|
|
|
@ -674,18 +674,17 @@ class DynamicBrokerConfigTest {
|
||||||
assertEquals(classOf[JmxReporter].getName, m.currentReporters.keySet.head)
|
assertEquals(classOf[JmxReporter].getName, m.currentReporters.keySet.head)
|
||||||
|
|
||||||
val props = new Properties()
|
val props = new Properties()
|
||||||
props.put(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockMetricsReporter].getName)
|
props.put(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, s"${classOf[JmxReporter].getName},${classOf[MockMetricsReporter].getName}")
|
||||||
config.dynamicConfig.updateDefaultConfig(props)
|
config.dynamicConfig.updateDefaultConfig(props)
|
||||||
assertEquals(2, m.currentReporters.size)
|
assertEquals(2, m.currentReporters.size)
|
||||||
assertEquals(Set(classOf[JmxReporter].getName, classOf[MockMetricsReporter].getName), m.currentReporters.keySet)
|
assertEquals(Set(classOf[JmxReporter].getName, classOf[MockMetricsReporter].getName), m.currentReporters.keySet)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@nowarn("cat=deprecation")
|
|
||||||
def testUpdateMetricReportersNoJmxReporter(): Unit = {
|
def testUpdateMetricReportersNoJmxReporter(): Unit = {
|
||||||
val brokerId = 0
|
val brokerId = 0
|
||||||
val origProps = TestUtils.createBrokerConfig(brokerId, null, port = 8181)
|
val origProps = TestUtils.createBrokerConfig(brokerId, null, port = 8181)
|
||||||
origProps.put(MetricConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG, "false")
|
origProps.put(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, "")
|
||||||
|
|
||||||
val config = KafkaConfig(origProps)
|
val config = KafkaConfig(origProps)
|
||||||
val serverMock = Mockito.mock(classOf[KafkaBroker])
|
val serverMock = Mockito.mock(classOf[KafkaBroker])
|
||||||
|
|
|
@ -29,6 +29,9 @@
|
||||||
<li>The <code>metrics.jmx.blacklist</code> and <code>metrics.jmx.whitelist</code> configurations were removed from the <code>org.apache.kafka.common.metrics.JmxReporter</code>
|
<li>The <code>metrics.jmx.blacklist</code> and <code>metrics.jmx.whitelist</code> configurations were removed from the <code>org.apache.kafka.common.metrics.JmxReporter</code>
|
||||||
Please use <code>metrics.jmx.exclude</code> and <code>metrics.jmx.include</code> respectively instead.
|
Please use <code>metrics.jmx.exclude</code> and <code>metrics.jmx.include</code> respectively instead.
|
||||||
</li>
|
</li>
|
||||||
|
<li>The <code>auto.include.jmx.reporter</code> configuration was removed. The <code>metric.reporters</code> configuration
|
||||||
|
is now set to <code>org.apache.kafka.common.metrics.JmxReporter</code> by default.
|
||||||
|
</li>
|
||||||
</ul>
|
</ul>
|
||||||
</li>
|
</li>
|
||||||
<li><b>Broker</b>
|
<li><b>Broker</b>
|
||||||
|
|
|
@ -18,11 +18,11 @@ package org.apache.kafka.server.metrics;
|
||||||
|
|
||||||
import org.apache.kafka.clients.CommonClientConfigs;
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
import org.apache.kafka.common.config.ConfigDef;
|
import org.apache.kafka.common.config.ConfigDef;
|
||||||
|
import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
|
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
|
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
|
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
|
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
|
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
|
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
|
||||||
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
|
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
|
||||||
|
@ -39,18 +39,13 @@ public class MetricConfigs {
|
||||||
public static final String METRIC_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;
|
public static final String METRIC_NUM_SAMPLES_DOC = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC;
|
||||||
|
|
||||||
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
|
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
|
||||||
public static final String METRIC_REPORTER_CLASSES_DEFAULT = "";
|
public static final String METRIC_REPORTER_CLASSES_DEFAULT = JmxReporter.class.getName();
|
||||||
public static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;
|
public static final String METRIC_REPORTER_CLASSES_DOC = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;
|
||||||
|
|
||||||
public static final String METRIC_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
|
public static final String METRIC_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
|
||||||
public static final String METRIC_RECORDING_LEVEL_DEFAULT = Sensor.RecordingLevel.INFO.toString();
|
public static final String METRIC_RECORDING_LEVEL_DEFAULT = Sensor.RecordingLevel.INFO.toString();
|
||||||
public static final String METRIC_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
|
public static final String METRIC_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
|
|
||||||
public static final boolean AUTO_INCLUDE_JMX_REPORTER_DEFAULT = true;
|
|
||||||
public static final String AUTO_INCLUDE_JMX_REPORTER_DOC = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC;
|
|
||||||
|
|
||||||
/** ********* Kafka Yammer Metrics Reporters Configuration ***********/
|
/** ********* Kafka Yammer Metrics Reporters Configuration ***********/
|
||||||
public static final String KAFKA_METRICS_REPORTER_CLASSES_CONFIG = "kafka.metrics.reporters";
|
public static final String KAFKA_METRICS_REPORTER_CLASSES_CONFIG = "kafka.metrics.reporters";
|
||||||
public static final String KAFKA_METRIC_REPORTER_CLASSES_DEFAULT = "";
|
public static final String KAFKA_METRIC_REPORTER_CLASSES_DEFAULT = "";
|
||||||
|
@ -76,7 +71,6 @@ public class MetricConfigs {
|
||||||
.define(METRIC_SAMPLE_WINDOW_MS_CONFIG, LONG, METRIC_SAMPLE_WINDOW_MS_DEFAULT, atLeast(1), LOW, METRIC_SAMPLE_WINDOW_MS_DOC)
|
.define(METRIC_SAMPLE_WINDOW_MS_CONFIG, LONG, METRIC_SAMPLE_WINDOW_MS_DEFAULT, atLeast(1), LOW, METRIC_SAMPLE_WINDOW_MS_DOC)
|
||||||
.define(METRIC_REPORTER_CLASSES_CONFIG, LIST, METRIC_REPORTER_CLASSES_DEFAULT, LOW, METRIC_REPORTER_CLASSES_DOC)
|
.define(METRIC_REPORTER_CLASSES_CONFIG, LIST, METRIC_REPORTER_CLASSES_DEFAULT, LOW, METRIC_REPORTER_CLASSES_DOC)
|
||||||
.define(METRIC_RECORDING_LEVEL_CONFIG, STRING, METRIC_RECORDING_LEVEL_DEFAULT, LOW, METRIC_RECORDING_LEVEL_DOC)
|
.define(METRIC_RECORDING_LEVEL_CONFIG, STRING, METRIC_RECORDING_LEVEL_DEFAULT, LOW, METRIC_RECORDING_LEVEL_DOC)
|
||||||
.define(AUTO_INCLUDE_JMX_REPORTER_CONFIG, BOOLEAN, AUTO_INCLUDE_JMX_REPORTER_DEFAULT, LOW, AUTO_INCLUDE_JMX_REPORTER_DOC)
|
|
||||||
|
|
||||||
// Kafka Yammer Metrics Reporter Configuration
|
// Kafka Yammer Metrics Reporter Configuration
|
||||||
.define(KAFKA_METRICS_REPORTER_CLASSES_CONFIG, LIST, KAFKA_METRIC_REPORTER_CLASSES_DEFAULT, LOW, KAFKA_METRICS_REPORTER_CLASSES_DOC)
|
.define(KAFKA_METRICS_REPORTER_CLASSES_CONFIG, LIST, KAFKA_METRIC_REPORTER_CLASSES_DEFAULT, LOW, KAFKA_METRICS_REPORTER_CLASSES_DOC)
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||||
import org.apache.kafka.common.config.ConfigException;
|
import org.apache.kafka.common.config.ConfigException;
|
||||||
import org.apache.kafka.common.config.TopicConfig;
|
import org.apache.kafka.common.config.TopicConfig;
|
||||||
|
import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
|
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
|
@ -631,11 +632,6 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
@SuppressWarnings("WeakerAccess")
|
@SuppressWarnings("WeakerAccess")
|
||||||
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
|
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
|
||||||
|
|
||||||
/** {@code auto.include.jmx.reporter}
|
|
||||||
* @deprecated and will be removed in 4.0.0 */
|
|
||||||
@Deprecated
|
|
||||||
public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;
|
|
||||||
|
|
||||||
/** {@code num.standby.replicas} */
|
/** {@code num.standby.replicas} */
|
||||||
@SuppressWarnings("WeakerAccess")
|
@SuppressWarnings("WeakerAccess")
|
||||||
public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
|
public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
|
||||||
|
@ -1095,7 +1091,7 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
|
CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
|
||||||
.define(METRIC_REPORTER_CLASSES_CONFIG,
|
.define(METRIC_REPORTER_CLASSES_CONFIG,
|
||||||
Type.LIST,
|
Type.LIST,
|
||||||
"",
|
JmxReporter.class.getName(),
|
||||||
Importance.LOW,
|
Importance.LOW,
|
||||||
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
||||||
.define(METRICS_RECORDING_LEVEL_CONFIG,
|
.define(METRICS_RECORDING_LEVEL_CONFIG,
|
||||||
|
@ -1110,11 +1106,6 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
atLeast(0),
|
atLeast(0),
|
||||||
Importance.LOW,
|
Importance.LOW,
|
||||||
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
|
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
|
||||||
.define(AUTO_INCLUDE_JMX_REPORTER_CONFIG,
|
|
||||||
Type.BOOLEAN,
|
|
||||||
true,
|
|
||||||
Importance.LOW,
|
|
||||||
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC)
|
|
||||||
.define(POLL_MS_CONFIG,
|
.define(POLL_MS_CONFIG,
|
||||||
Type.LONG,
|
Type.LONG,
|
||||||
100L,
|
100L,
|
||||||
|
|
Loading…
Reference in New Issue