mirror of https://github.com/apache/kafka.git

KAFKA-15995: Initial API + make Producer/Consumer plugins Monitorable (#17511)

Reviewers: Greg Harris <gharris1727@gmail.com>, Luke Chen <showuon@gmail.com>

commit 71314739f9
parent 15c5c075c1

@@ -87,6 +87,11 @@
     <allow class="org.apache.kafka.common.record.RecordBatch" exact-match="true" />
   </subpackage>
 
+  <subpackage name="internals">
+    <allow pkg="org.apache.kafka.common.metrics" />
+    <allow pkg="org.apache.kafka.common.metrics.internals" />
+  </subpackage>
+
   <subpackage name="message">
     <allow pkg="com.fasterxml.jackson" />
     <allow pkg="org.apache.kafka.common.protocol" />

@@ -39,6 +39,8 @@ import java.util.Map;
  * {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the interceptor to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to <code>interceptor.classes</code>, and <code>class</code> set to the ConsumerInterceptor class name.
  */
 public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {

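For illustration only (not part of this commit): an interceptor could opt into the new contract as in the sketch below. The class name, metric name, and counting logic are hypothetical; Monitorable, PluginMetrics, and Gauge are the types introduced or used by this change.

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class CountingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V>, Monitorable {

    private final AtomicLong recordsSeen = new AtomicLong();

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // The "config" and "class" tags are added automatically by the runtime
        metrics.addMetric(
                metrics.metricName("records-seen", "Total records passed through onConsume", Collections.emptyMap()),
                (Gauge<Long>) (config, now) -> recordsSeen.get());
    }

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        recordsSeen.addAndGet(records.count());
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
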
@@ -326,12 +326,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 
         List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
-        this.interceptors = new ConsumerInterceptors<>(interceptorList);
-        this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+        this.interceptors = new ConsumerInterceptors<>(interceptorList, metrics);
+        this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
         this.subscriptions = createSubscriptionState(config, logContext);
         ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(metrics.reporters(),
                 interceptorList,
-                Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
+                Arrays.asList(deserializers.keyDeserializer(), deserializers.valueDeserializer()));
         this.metadata = metadataFactory.build(config, subscriptions, logContext, clusterResourceListeners);
         final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
         metadata.bootstrap(addresses);

@@ -494,13 +494,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         this.fetchBuffer = new FetchBuffer(logContext);
         this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
-        this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
         this.time = time;
         this.metrics = new Metrics(time);
+        this.interceptors = new ConsumerInterceptors<>(Collections.emptyList(), metrics);
         this.metadata = metadata;
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.defaultApiTimeoutMs = Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
-        this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+        this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
         this.clientTelemetryReporter = Optional.empty();
 
         ConsumerMetrics metricsRegistry = new ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);

@@ -179,13 +179,13 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
 
         List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
-        this.interceptors = new ConsumerInterceptors<>(interceptorList);
-        this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+        this.interceptors = new ConsumerInterceptors<>(interceptorList, metrics);
+        this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
         this.subscriptions = createSubscriptionState(config, logContext);
         ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
                 metrics.reporters(),
                 interceptorList,
-                Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer));
+                Arrays.asList(this.deserializers.keyDeserializer(), this.deserializers.valueDeserializer()));
         this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
         List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
         this.metadata.bootstrap(addresses);

@@ -289,12 +289,12 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.metrics = new Metrics(time);
         this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
         this.groupId = Optional.ofNullable(config.getString(ConsumerConfig.GROUP_ID_CONFIG));
-        this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+        this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
         this.isolationLevel = ConsumerUtils.configuredIsolationLevel(config);
         this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
         this.assignors = assignors;
         this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
-        this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
+        this.interceptors = new ConsumerInterceptors<>(Collections.emptyList(), metrics);
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
         this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);

@@ -319,13 +319,13 @@ public class CompletedFetch {
         K key;
         V value;
         try {
-            key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+            key = keyBytes == null ? null : deserializers.keyDeserializer().deserialize(partition.topic(), headers, keyBytes);
         } catch (RuntimeException e) {
             log.error("Key Deserializers with error: {}", deserializers);
             throw newRecordDeserializationException(DeserializationExceptionOrigin.KEY, partition, timestampType, record, e, headers);
         }
         try {
-            value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
+            value = valueBytes == null ? null : deserializers.valueDeserializer().deserialize(partition.topic(), headers, valueBytes);
         } catch (RuntimeException e) {
             log.error("Value Deserializers with error: {}", deserializers);
             throw newRecordDeserializationException(DeserializationExceptionOrigin.VALUE, partition, timestampType, record, e, headers);

@@ -17,10 +17,13 @@
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -35,15 +38,15 @@ import java.util.Map;
  */
 public class ConsumerInterceptors<K, V> implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
-    private final List<ConsumerInterceptor<K, V>> interceptors;
+    private final List<Plugin<ConsumerInterceptor<K, V>>> interceptorPlugins;
 
-    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
-        this.interceptors = interceptors;
+    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors, Metrics metrics) {
+        this.interceptorPlugins = Plugin.wrapInstances(interceptors, metrics, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG);
     }
 
     /** Returns true if no interceptors are defined. All other methods will be no-ops in this case. */
     public boolean isEmpty() {
-        return interceptors.isEmpty();
+        return interceptorPlugins.isEmpty();
     }
 
     /**

@@ -62,9 +65,9 @@ public class ConsumerInterceptors<K, V> implements Closeable {
      */
     public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
         ConsumerRecords<K, V> interceptRecords = records;
-        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
             try {
-                interceptRecords = interceptor.onConsume(interceptRecords);
+                interceptRecords = interceptorPlugin.get().onConsume(interceptRecords);
             } catch (Exception e) {
                 // do not propagate interceptor exception, log and continue calling other interceptors
                 log.warn("Error executing interceptor onConsume callback", e);

@@ -83,9 +86,9 @@ public class ConsumerInterceptors<K, V> implements Closeable {
      * @param offsets A map of offsets by partition with associated metadata
      */
     public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
             try {
-                interceptor.onCommit(offsets);
+                interceptorPlugin.get().onCommit(offsets);
             } catch (Exception e) {
                 // do not propagate interceptor exception, just log
                 log.warn("Error executing interceptor onCommit callback", e);

@@ -98,9 +101,9 @@ public class ConsumerInterceptors<K, V> implements Closeable {
      */
     @Override
     public void close() {
-        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
             try {
-                interceptor.close();
+                interceptorPlugin.close();
             } catch (Exception e) {
                 log.error("Failed to close consumer interceptor ", e);
             }

@@ -19,6 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Utils;
 

@@ -28,44 +30,54 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class Deserializers<K, V> implements AutoCloseable {
 
-    public final Deserializer<K> keyDeserializer;
-    public final Deserializer<V> valueDeserializer;
+    private final Plugin<Deserializer<K>> keyDeserializerPlugin;
+    private final Plugin<Deserializer<V>> valueDeserializerPlugin;
 
-    public Deserializers(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
-        this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "Key deserializer provided to Deserializers should not be null");
-        this.valueDeserializer = Objects.requireNonNull(valueDeserializer, "Value deserializer provided to Deserializers should not be null");
-    }
-
-    public Deserializers(ConsumerConfig config) {
-        this(config, null, null);
+    public Deserializers(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Metrics metrics) {
+        this.keyDeserializerPlugin = Plugin.wrapInstance(
+                Objects.requireNonNull(keyDeserializer, "Key deserializer provided to Deserializers should not be null"),
+                metrics,
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        this.valueDeserializerPlugin = Plugin.wrapInstance(
+                Objects.requireNonNull(valueDeserializer, "Value deserializer provided to Deserializers should not be null"),
+                metrics,
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
     }
 
     @SuppressWarnings("unchecked")
-    public Deserializers(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
+    public Deserializers(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Metrics metrics) {
         String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
 
         if (keyDeserializer == null) {
-            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-            this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
+            keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+            keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), true);
         } else {
             config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-            this.keyDeserializer = keyDeserializer;
         }
+        this.keyDeserializerPlugin = Plugin.wrapInstance(keyDeserializer, metrics, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
 
         if (valueDeserializer == null) {
-            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-            this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
+            valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+            valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)), false);
         } else {
             config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-            this.valueDeserializer = valueDeserializer;
         }
+        this.valueDeserializerPlugin = Plugin.wrapInstance(valueDeserializer, metrics, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+    }
+
+    public Deserializer<K> keyDeserializer() {
+        return keyDeserializerPlugin.get();
+    }
+
+    public Deserializer<V> valueDeserializer() {
+        return valueDeserializerPlugin.get();
     }
 
     @Override
     public void close() {
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        Utils.closeQuietly(keyDeserializer, "key deserializer", firstException);
-        Utils.closeQuietly(valueDeserializer, "value deserializer", firstException);
+        Utils.closeQuietly(keyDeserializerPlugin, "key deserializer", firstException);
+        Utils.closeQuietly(valueDeserializerPlugin, "value deserializer", firstException);
         Throwable exception = firstException.get();
 
         if (exception != null) {

@@ -79,8 +91,8 @@ public class Deserializers<K, V> implements AutoCloseable {
     @Override
     public String toString() {
         return "Deserializers{" +
-                "keyDeserializer=" + keyDeserializer +
-                ", valueDeserializer=" + valueDeserializer +
+                "keyDeserializer=" + keyDeserializerPlugin.get() +
+                ", valueDeserializer=" + valueDeserializerPlugin.get() +
                 '}';
     }
 }

@@ -296,13 +296,13 @@ public class ShareCompletedFetch {
         K key;
         V value;
         try {
-            key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+            key = keyBytes == null ? null : deserializers.keyDeserializer().deserialize(partition.topic(), headers, keyBytes);
         } catch (RuntimeException e) {
             log.error("Key Deserializers with error: {}", deserializers);
             throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY, partition.topicPartition(), timestampType, record, e, headers);
         }
         try {
-            value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
+            value = valueBytes == null ? null : deserializers.valueDeserializer().deserialize(partition.topic(), headers, valueBytes);
         } catch (RuntimeException e) {
             log.error("Value Deserializers with error: {}", deserializers);
             throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, partition.topicPartition(), timestampType, record, e, headers);

@@ -257,12 +257,12 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
         this.metrics = createMetrics(config, time, reporters);
         this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
 
-        this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+        this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
         this.currentFetch = ShareFetch.empty();
         this.subscriptions = createSubscriptionState(config, logContext);
         ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
                 metrics.reporters(),
-                Arrays.asList(deserializers.keyDeserializer, deserializers.valueDeserializer));
+                Arrays.asList(deserializers.keyDeserializer(), deserializers.valueDeserializer()));
         this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
         final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
         metadata.bootstrap(addresses);

@@ -363,7 +363,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
         this.time = time;
         this.metrics = new Metrics(time);
         this.clientTelemetryReporter = Optional.empty();
-        this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
+        this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, metrics);
         this.currentFetch = ShareFetch.empty();
         this.subscriptions = subscriptions;
         this.metadata = metadata;

@@ -462,7 +462,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
         this.metrics = metrics;
         this.metadata = metadata;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
-        this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+        this.deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
         this.currentFetch = ShareFetch.empty();
         this.applicationEventHandler = applicationEventHandler;
         this.kafkaShareConsumerMetrics = new KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);

@@ -59,6 +59,7 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;

@@ -249,7 +250,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     // Visible for testing
     final Metrics metrics;
     private final KafkaProducerMetrics producerMetrics;
-    private final Partitioner partitioner;
+    private final Plugin<Partitioner> partitionerPlugin;
     private final int maxRequestSize;
     private final long totalMemorySize;
     private final ProducerMetadata metadata;

@@ -259,8 +260,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final Compression compression;
     private final Sensor errors;
     private final Time time;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
+    private final Plugin<Serializer<K>> keySerializerPlugin;
+    private final Plugin<Serializer<V>> valueSerializerPlugin;
     private final ProducerConfig producerConfig;
     private final long maxBlockTimeMs;
     private final boolean partitionerIgnoreKeys;

@@ -366,29 +367,32 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
             this.producerMetrics = new KafkaProducerMetrics(metrics);
-            this.partitioner = config.getConfiguredInstance(
-                    ProducerConfig.PARTITIONER_CLASS_CONFIG,
-                    Partitioner.class,
-                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
+            this.partitionerPlugin = Plugin.wrapInstance(
+                    config.getConfiguredInstance(
+                            ProducerConfig.PARTITIONER_CLASS_CONFIG,
+                            Partitioner.class,
+                            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),
+                    metrics,
+                    ProducerConfig.PARTITIONER_CLASS_CONFIG);
             this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
             long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
             if (keySerializer == null) {
-                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                        Serializer.class);
-                this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
+                keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+                keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
             } else {
                 config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-                this.keySerializer = keySerializer;
             }
+            this.keySerializerPlugin = Plugin.wrapInstance(keySerializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+
             if (valueSerializer == null) {
-                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                        Serializer.class);
-                this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
+                valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+                valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
             } else {
                 config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-                this.valueSerializer = valueSerializer;
             }
+            this.valueSerializerPlugin = Plugin.wrapInstance(valueSerializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
 
             List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.configuredInterceptors(config,
                     ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,

@@ -396,11 +400,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             if (interceptors != null)
                 this.interceptors = interceptors;
             else
-                this.interceptors = new ProducerInterceptors<>(interceptorList);
+                this.interceptors = new ProducerInterceptors<>(interceptorList, metrics);
             ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
                     interceptorList,
                     reporters,
-                    Arrays.asList(this.keySerializer, this.valueSerializer));
+                    Arrays.asList(this.keySerializerPlugin.get(), this.valueSerializerPlugin.get()));
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compression = configureCompression(config);

@@ -411,7 +415,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.apiVersions = apiVersions;
             this.transactionManager = configureTransactionState(config, logContext);
             // There is no need to do work required for adaptive partitioning, if we use a custom partitioner.
-            boolean enableAdaptivePartitioning = partitioner == null &&
+            boolean enableAdaptivePartitioning = partitionerPlugin.get() == null &&
                     config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
             RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
                     enableAdaptivePartitioning,

@@ -485,9 +489,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         this.log = logContext.logger(KafkaProducer.class);
         this.metrics = metrics;
         this.producerMetrics = new KafkaProducerMetrics(metrics);
-        this.partitioner = partitioner;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
+        this.partitionerPlugin = Plugin.wrapInstance(partitioner, metrics, ProducerConfig.PARTITIONER_CLASS_CONFIG);
+        this.keySerializerPlugin = Plugin.wrapInstance(keySerializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+        this.valueSerializerPlugin = Plugin.wrapInstance(valueSerializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
         this.interceptors = interceptors;
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);

@@ -972,7 +976,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             Cluster cluster = clusterAndWaitTime.cluster;
             byte[] serializedKey;
             try {
-                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
+                serializedKey = keySerializerPlugin.get().serialize(record.topic(), record.headers(), record.key());
             } catch (ClassCastException cce) {
                 throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                         " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +

@@ -980,7 +984,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
             byte[] serializedValue;
             try {
-                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
+                serializedValue = valueSerializerPlugin.get().serialize(record.topic(), record.headers(), record.value());
             } catch (ClassCastException cce) {
                 throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                         " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +

@@ -1414,9 +1418,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         Utils.closeQuietly(interceptors, "producer interceptors", firstException);
         Utils.closeQuietly(producerMetrics, "producer metrics wrapper", firstException);
         Utils.closeQuietly(metrics, "producer metrics", firstException);
-        Utils.closeQuietly(keySerializer, "producer keySerializer", firstException);
-        Utils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
-        Utils.closeQuietly(partitioner, "producer partitioner", firstException);
+        Utils.closeQuietly(keySerializerPlugin, "producer keySerializer", firstException);
+        Utils.closeQuietly(valueSerializerPlugin, "producer valueSerializer", firstException);
+        Utils.closeQuietly(partitionerPlugin, "producer partitioner", firstException);
         clientTelemetryReporter.ifPresent(reporter -> Utils.closeQuietly(reporter, "producer telemetry reporter", firstException));
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
         Throwable exception = firstException.get();

@@ -1443,8 +1447,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         if (record.partition() != null)
             return record.partition();
 
-        if (partitioner != null) {
-            int customPartition = partitioner.partition(
+        if (partitionerPlugin.get() != null) {
+            int customPartition = partitionerPlugin.get().partition(
                 record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
             if (customPartition < 0) {
                 throw new IllegalArgumentException(String.format(

@@ -23,6 +23,9 @@ import java.io.Closeable;
 
 /**
  * Partitioner Interface
+ * <br/>
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the partitioner to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to <code>partitioner.class</code>, and <code>class</code> set to the Partitioner class name.
  */
 public interface Partitioner extends Configurable, Closeable {
 

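A hypothetical sketch (not part of this commit): a Monitorable round-robin partitioner that tracks how often partition() is invoked through a sensor. The class name, sensor name, and round-robin logic are illustrative assumptions; Sensor and Rate are existing classes in org.apache.kafka.common.metrics.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class MonitorableRoundRobinPartitioner implements Partitioner, Monitorable {

    private final AtomicInteger counter = new AtomicInteger();
    private volatile Sensor calls;

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        calls = metrics.addSensor("partition-calls");
        calls.add(metrics.metricName("partition-rate", "Rate of partition() invocations", Collections.emptyMap()), new Rate());
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (calls != null)      // withPluginMetrics may not have been called yet
            calls.record();
        return Math.floorMod(counter.getAndIncrement(), cluster.partitionsForTopic(topic).size());
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
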
@@ -33,6 +33,8 @@ import org.apache.kafka.common.Configurable;
  * ProducerInterceptor callbacks may be called from multiple threads. Interceptor implementation must ensure thread-safety, if needed.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the interceptor to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to <code>interceptor.classes</code>, and <code>class</code> set to the ProducerInterceptor class name.
  */
 public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
     /**

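A producer-side sketch along the same lines (again hypothetical, not part of this commit): an interceptor that exposes the number of failed sends observed in onAcknowledgement as a gauge.

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class FailureCountingProducerInterceptor<K, V> implements ProducerInterceptor<K, V>, Monitorable {

    private final AtomicLong failures = new AtomicLong();

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        metrics.addMetric(
                metrics.metricName("send-failures", "Sends that completed with an exception", Collections.emptyMap()),
                (Gauge<Long>) (config, now) -> failures.get());
    }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        return record;      // pass records through unmodified
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            failures.incrementAndGet();
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
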
@@ -17,10 +17,13 @@
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerInterceptor;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.RecordBatch;
 
 import org.slf4j.Logger;

@@ -35,10 +38,10 @@ import java.util.List;
  */
 public class ProducerInterceptors<K, V> implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
-    private final List<ProducerInterceptor<K, V>> interceptors;
+    private final List<Plugin<ProducerInterceptor<K, V>>> interceptorPlugins;
 
-    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
-        this.interceptors = interceptors;
+    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors, Metrics metrics) {
+        this.interceptorPlugins = Plugin.wrapInstances(interceptors, metrics, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG);
     }
 
     /**

@@ -57,9 +60,9 @@ public class ProducerInterceptors<K, V> implements Closeable {
      */
     public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
         ProducerRecord<K, V> interceptRecord = record;
-        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
             try {
-                interceptRecord = interceptor.onSend(interceptRecord);
+                interceptRecord = interceptorPlugin.get().onSend(interceptRecord);
             } catch (Exception e) {
                 // do not propagate interceptor exception, log and continue calling other interceptors
                 // be careful not to throw exception from here

@@ -84,9 +87,9 @@ public class ProducerInterceptors<K, V> implements Closeable {
      * @param exception The exception thrown during processing of this record. Null if no error occurred.
      */
     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
-        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
             try {
-                interceptor.onAcknowledgement(metadata, exception);
+                interceptorPlugin.get().onAcknowledgement(metadata, exception);
             } catch (Exception e) {
                 // do not propagate interceptor exceptions, just log
                 log.warn("Error executing interceptor onAcknowledgement callback", e);

@@ -105,15 +108,15 @@ public class ProducerInterceptors<K, V> implements Closeable {
      * @param exception The exception thrown during processing of this record.
      */
     public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
-        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
             try {
                 if (record == null && interceptTopicPartition == null) {
-                    interceptor.onAcknowledgement(null, exception);
+                    interceptorPlugin.get().onAcknowledgement(null, exception);
                 } else {
                     if (interceptTopicPartition == null) {
                         interceptTopicPartition = extractTopicPartition(record);
                     }
-                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
+                    interceptorPlugin.get().onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
                             RecordBatch.NO_TIMESTAMP, -1, -1), exception);
                 }
             } catch (Exception e) {

@@ -132,9 +135,9 @@ public class ProducerInterceptors<K, V> implements Closeable {
      */
     @Override
     public void close() {
-        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
             try {
-                interceptor.close();
+                interceptorPlugin.close();
             } catch (Exception e) {
                 log.error("Failed to close producer interceptor ", e);
             }

@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+public class Plugin<T> implements Supplier<T>, AutoCloseable {
+
+    private final T instance;
+    private final Optional<PluginMetricsImpl> pluginMetrics;
+
+    private Plugin(T instance, PluginMetricsImpl pluginMetrics) {
+        this.instance = instance;
+        this.pluginMetrics = Optional.ofNullable(pluginMetrics);
+    }
+
+    public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key) {
+        return wrapInstance(instance, metrics, () -> tags(key, instance));
+    }
+
+    private static <T> Map<String, String> tags(String key, T instance) {
+        Map<String, String> tags = new LinkedHashMap<>();
+        tags.put("config", key);
+        tags.put("class", instance.getClass().getSimpleName());
+        return tags;
+    }
+
+    public static <T> List<Plugin<T>> wrapInstances(List<T> instances, Metrics metrics, String key) {
+        List<Plugin<T>> plugins = new ArrayList<>();
+        for (T instance : instances) {
+            plugins.add(wrapInstance(instance, metrics, key));
+        }
+        return plugins;
+    }
+
+    public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, Supplier<Map<String, String>> tagsSupplier) {
+        PluginMetricsImpl pluginMetrics = null;
+        if (instance instanceof Monitorable && metrics != null) {
+            pluginMetrics = new PluginMetricsImpl(metrics, tagsSupplier.get());
+            ((Monitorable) instance).withPluginMetrics(pluginMetrics);
+        }
+        return new Plugin<>(instance, pluginMetrics);
+    }
+
+    @Override
+    public T get() {
+        return instance;
+    }
+
+    @Override
+    public void close() throws Exception {
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
+        if (instance instanceof AutoCloseable) {
+            Utils.closeQuietly((AutoCloseable) instance, instance.getClass().getSimpleName(), firstException);
+        }
+        pluginMetrics.ifPresent(metrics -> Utils.closeQuietly(metrics, "pluginMetrics", firstException));
+        Throwable throwable = firstException.get();
+        if (throwable != null) throw new KafkaException("failed closing plugin", throwable);
+    }
+}

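A usage sketch of the wrapper (hypothetical caller code; MonitorableRoundRobinPartitioner is the invented class from the Partitioner sketch earlier). As the new class above shows, wrapInstance invokes withPluginMetrics when the instance implements Monitorable, get() exposes the wrapped instance on the hot path, and close() releases both the instance and any metrics it registered.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;

public class PluginUsageSketch {
    public static void main(String[] args) throws Exception {
        Metrics metrics = new Metrics();
        Partitioner partitioner = new MonitorableRoundRobinPartitioner(); // hypothetical Monitorable plugin
        // wrapInstance calls withPluginMetrics() because the instance implements Monitorable
        Plugin<Partitioner> plugin = Plugin.wrapInstance(partitioner, metrics, ProducerConfig.PARTITIONER_CLASS_CONFIG);
        Partitioner delegate = plugin.get(); // hot-path access to the wrapped instance
        // ... use delegate ...
        plugin.close();  // closes the partitioner and removes its registered metrics
        metrics.close();
    }
}
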
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+/**
+ * Plugins can implement this interface to register their own metrics.
+ */
+public interface Monitorable {
+
+    /**
+     * Provides a {@link PluginMetrics} instance from the component that instantiates the plugin.
+     * PluginMetrics can be used by the plugin to register and unregister metrics
+     * at any point in their lifecycle prior to their close method being called.
+     * Any metrics registered will be automatically removed when the plugin is closed.
+     */
+    void withPluginMetrics(PluginMetrics metrics);
+
+}

@@ -0,0 +1,75 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.metrics;

import org.apache.kafka.common.MetricName;

import java.util.Map;

/**
 * This allows plugins to register metrics and sensors.
 * Any metrics registered by the plugin are automatically removed when the plugin is closed.
 */
public interface PluginMetrics {

    /**
     * Create a {@link MetricName} with the given name, description and tags. The group will be set to "plugins".
     * Tags to uniquely identify the plugin are automatically added to the provided tags.
     *
     * @param name The name of the metric
     * @param description A human-readable description to include in the metric
     * @param tags Additional tags for the metric
     * @throws IllegalArgumentException if any of the tag names collide with the default tags for the plugin
     */
    MetricName metricName(String name, String description, Map<String, String> tags);

    /**
     * Add a metric to monitor an object that implements {@link MetricValueProvider}. This metric won't be associated with any
     * sensor. This is a way to expose existing values as metrics.
     *
     * @param metricName The name of the metric
     * @param metricValueProvider The metric value provider associated with this metric
     * @throws IllegalArgumentException if a metric with the same name already exists
     */
    void addMetric(MetricName metricName, MetricValueProvider<?> metricValueProvider);

    /**
     * Remove a metric if it exists.
     *
     * @param metricName The name of the metric
     * @throws IllegalArgumentException if a metric with this name does not exist
     */
    void removeMetric(MetricName metricName);

    /**
     * Create a {@link Sensor} with the given unique name. The name must only be unique for the plugin, so different
     * plugins can use the same names.
     *
     * @param name The sensor name
     * @return The sensor
     * @throws IllegalArgumentException if a sensor with the same name already exists for this plugin
     */
    Sensor addSensor(String name);

    /**
     * Remove a {@link Sensor} and its associated metrics.
     *
     * @param name The name of the sensor to be removed
     * @throws IllegalArgumentException if a sensor with this name does not exist
     */
    void removeSensor(String name);
}
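A hedged sketch of the addMetric path described above, exposing an existing value without going through a Sensor. All class and metric names here are invented for illustration.

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

// Hypothetical Monitorable plugin exposing the size of an internal
// queue as a gauge-style metric.
public class BufferingPlugin implements Monitorable {

    private final Queue<byte[]> buffered = new ConcurrentLinkedQueue<>();

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // No Sensor involved: the Measurable simply reads the current queue size.
        MetricName name = metrics.metricName("buffered-records", "Records currently buffered", Map.of());
        metrics.addMetric(name, (Measurable) (config, nowMs) -> buffered.size());
    }
}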
@@ -0,0 +1,113 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.metrics.internals;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.metrics.Sensor;

import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PluginMetricsImpl implements PluginMetrics, Closeable {

    private static final String GROUP = "plugins";

    private final Metrics metrics;
    private final Map<String, String> tags;
    private final Set<MetricName> metricNames = ConcurrentHashMap.newKeySet();
    private final Set<String> sensors = ConcurrentHashMap.newKeySet();
    private volatile boolean closing = false;

    public PluginMetricsImpl(Metrics metrics, Map<String, String> tags) {
        this.metrics = metrics;
        this.tags = tags;
    }

    @Override
    public MetricName metricName(String name, String description, Map<String, String> tags) {
        if (closing) throw new IllegalStateException("This PluginMetrics instance is closed");
        for (String tagName : tags.keySet()) {
            if (this.tags.containsKey(tagName)) {
                throw new IllegalArgumentException("Cannot use " + tagName + " as a tag name");
            }
        }
        Map<String, String> metricsTags = new LinkedHashMap<>(this.tags);
        metricsTags.putAll(tags);
        return metrics.metricName(name, GROUP, description, metricsTags);
    }

    @Override
    public void addMetric(MetricName metricName, MetricValueProvider<?> metricValueProvider) {
        if (closing) throw new IllegalStateException("This PluginMetrics instance is closed");
        if (metricNames.contains(metricName)) {
            throw new IllegalArgumentException("Metric " + metricName + " already exists");
        }
        metrics.addMetric(metricName, metricValueProvider);
        metricNames.add(metricName);
    }

    @Override
    public void removeMetric(MetricName metricName) {
        if (closing) throw new IllegalStateException("This PluginMetrics instance is closed");
        if (metricNames.contains(metricName)) {
            metrics.removeMetric(metricName);
            metricNames.remove(metricName);
        } else {
            throw new IllegalArgumentException("Unknown metric " + metricName);
        }
    }

    @Override
    public Sensor addSensor(String name) {
        if (closing) throw new IllegalStateException("This PluginMetrics instance is closed");
        if (sensors.contains(name)) {
            throw new IllegalArgumentException("Sensor " + name + " already exists");
        }
        Sensor sensor = metrics.sensor(name);
        sensors.add(name);
        return sensor;
    }

    @Override
    public void removeSensor(String name) {
        if (closing) throw new IllegalStateException("This PluginMetrics instance is closed");
        if (sensors.contains(name)) {
            metrics.removeSensor(name);
            sensors.remove(name);
        } else {
            throw new IllegalArgumentException("Unknown sensor " + name);
        }
    }

    @Override
    public void close() throws IOException {
        closing = true;
        for (String sensor : sensors) {
            metrics.removeSensor(sensor);
        }
        for (MetricName metricName : metricNames) {
            metrics.removeMetric(metricName);
        }
    }
}
@@ -29,7 +29,8 @@ import java.util.Map;
  * A class that implements this interface is expected to have a constructor with no parameters.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
- *
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the deserializer to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to either <code>key.deserializer</code> or <code>value.deserializer</code>, and <code>class</code> set to the Deserializer class name.
  * @param <T> Type to be deserialized into.
  */
 public interface Deserializer<T> extends Closeable {
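To make the tag behavior concrete, here is a sketch of a deserializer opting in (the commit's own tests below do the same via MockDeserializer); the class and metric names are invented.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical deserializer: any metrics it registers are tagged with
// config=key.deserializer or value.deserializer and class=TracingDeserializer.
public class TracingDeserializer implements Deserializer<String>, Monitorable {

    private long calls = 0;

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        MetricName name = metrics.metricName("calls", "Number of deserialize calls", Map.of());
        metrics.addMetric(name, (Measurable) (config, nowMs) -> calls);
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        calls++;
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}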
@@ -27,7 +27,8 @@ import java.util.Map;
  * A class that implements this interface is expected to have a constructor with no parameter.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
- *
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the serializer to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to either <code>key.serializer</code> or <code>value.serializer</code>, and <code>class</code> set to the Serializer class name.
  * @param <T> Type to be serialized from.
  */
 public interface Serializer<T> extends Closeable {
@@ -66,6 +66,8 @@ import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.network.Selectable;

@@ -101,6 +103,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.MockDeserializer;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;

@@ -3685,4 +3688,82 @@ public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupProtocol) {
         CLIENT_IDS.add(configs.get(ConsumerConfig.CLIENT_ID_CONFIG).toString());
     }
 }
+
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class)
+    void testMonitorablePlugins(GroupProtocol groupProtocol) {
+        try {
+            String clientId = "testMonitorablePlugins";
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, MonitorableDeserializer.class.getName());
+            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MonitorableDeserializer.class.getName());
+            configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MonitorableInterceptor.class.getName());
+
+            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
+            Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+
+            MetricName expectedKeyDeserializerMetric = expectedMetricName(
+                clientId,
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                MonitorableDeserializer.class);
+            assertTrue(metrics.containsKey(expectedKeyDeserializerMetric));
+            assertEquals(VALUE, metrics.get(expectedKeyDeserializerMetric).metricValue());
+
+            MetricName expectedValueDeserializerMetric = expectedMetricName(
+                clientId,
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                MonitorableDeserializer.class);
+            assertTrue(metrics.containsKey(expectedValueDeserializerMetric));
+            assertEquals(VALUE, metrics.get(expectedValueDeserializerMetric).metricValue());
+
+            MetricName expectedInterceptorMetric = expectedMetricName(
+                clientId,
+                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                MonitorableInterceptor.class);
+            assertTrue(metrics.containsKey(expectedInterceptorMetric));
+            assertEquals(VALUE, metrics.get(expectedInterceptorMetric).metricValue());
+
+            consumer.close(Duration.ZERO);
+            metrics = consumer.metrics();
+            assertFalse(metrics.containsKey(expectedKeyDeserializerMetric));
+            assertFalse(metrics.containsKey(expectedValueDeserializerMetric));
+            assertFalse(metrics.containsKey(expectedInterceptorMetric));
+        } finally {
+            MockConsumerInterceptor.resetCounters();
+        }
+    }
+
+    private MetricName expectedMetricName(String clientId, String config, Class<?> clazz) {
+        Map<String, String> expectedTags = new LinkedHashMap<>();
+        expectedTags.put("client-id", clientId);
+        expectedTags.put("config", config);
+        expectedTags.put("class", clazz.getSimpleName());
+        expectedTags.putAll(TAGS);
+        return new MetricName(NAME, "plugins", DESCRIPTION, expectedTags);
+    }
+
+    private static final String NAME = "name";
+    private static final String DESCRIPTION = "description";
+    private static final Map<String, String> TAGS = Collections.singletonMap("k", "v");
+    private static final double VALUE = 123.0;
+
+    public static class MonitorableDeserializer extends MockDeserializer implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+            metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+        }
+    }
+
+    public static class MonitorableInterceptor extends MockConsumerInterceptor implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+            metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+        }
+    }
 }
@@ -168,6 +168,7 @@ public class AsyncKafkaConsumerTest {

     private AsyncKafkaConsumer<String, String> consumer = null;
     private Time time = new MockTime(0);
+    private final Metrics metrics = new Metrics();
     private final FetchCollector<String, String> fetchCollector = mock(FetchCollector.class);
     private final ApplicationEventHandler applicationEventHandler = mock(ApplicationEventHandler.class);
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);

@@ -248,7 +249,7 @@ public class AsyncKafkaConsumerTest {
         return new AsyncKafkaConsumer<>(
             new LogContext(),
             clientId,
-            new Deserializers<>(new StringDeserializer(), new StringDeserializer()),
+            new Deserializers<>(new StringDeserializer(), new StringDeserializer(), metrics),
             fetchBuffer,
             fetchCollector,
             interceptors,

@@ -257,7 +258,7 @@ public class AsyncKafkaConsumerTest {
             backgroundEventQueue,
             backgroundEventReaper,
             rebalanceListenerInvoker,
-            new Metrics(),
+            metrics,
             subscriptions,
             metadata,
             retryBackoffMs,

@@ -671,7 +672,7 @@ public class AsyncKafkaConsumerTest {

         consumer = spy(newConsumer(
             mock(FetchBuffer.class),
-            new ConsumerInterceptors<>(Collections.emptyList()),
+            new ConsumerInterceptors<>(Collections.emptyList(), metrics),
             invoker,
             subscriptions,
             "group-id",

@@ -1543,7 +1544,7 @@ public class AsyncKafkaConsumerTest {
         SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
         consumer = newConsumer(
             mock(FetchBuffer.class),
-            new ConsumerInterceptors<>(Collections.emptyList()),
+            new ConsumerInterceptors<>(Collections.emptyList(), metrics),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
@@ -232,11 +232,11 @@ public class CompletedFetchTest {
     }

     private static Deserializers<UUID, UUID> newUuidDeserializers() {
-        return new Deserializers<>(new UUIDDeserializer(), new UUIDDeserializer());
+        return new Deserializers<>(new UUIDDeserializer(), new UUIDDeserializer(), null);
     }

     private static Deserializers<String, String> newStringDeserializers() {
-        return new Deserializers<>(new StringDeserializer(), new StringDeserializer());
+        return new Deserializers<>(new StringDeserializer(), new StringDeserializer(), null);
     }

     private static FetchConfig newFetchConfig(IsolationLevel isolationLevel, boolean checkCrcs) {
@@ -116,7 +116,7 @@ public class ConsumerInterceptorsTest {
         FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2);
         interceptorList.add(interceptor1);
         interceptorList.add(interceptor2);
-        ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList);
+        ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList, null);

         // verify that onConsumer modifies ConsumerRecords
         Map<TopicPartition, List<ConsumerRecord<Integer, Integer>>> records = new HashMap<>();

@@ -177,7 +177,7 @@ public class ConsumerInterceptorsTest {
         FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2);
         interceptorList.add(interceptor1);
         interceptorList.add(interceptor2);
-        ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList);
+        ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList, null);

         // verify that onCommit is called for all interceptors in the chain
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
@@ -712,7 +712,7 @@ public class FetchCollectorTest {
             mock(ConsumerMetadata.class),
             subscriptions,
             new FetchConfig(new ConsumerConfig(consumerProps)),
-            new Deserializers<>(new StringDeserializer(), new StringDeserializer()),
+            new Deserializers<>(new StringDeserializer(), new StringDeserializer(), null),
             mock(FetchMetricsManager.class),
             new MockTime()
         );

@@ -741,12 +741,11 @@ public class FetchCollectorTest {
         Properties p = consumerProperties(maxPollRecords);
         ConsumerConfig config = new ConsumerConfig(p);

-        deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());
-
         subscriptions = createSubscriptionState(config, logContext);
         fetchConfig = createFetchConfig(config, isolationLevel);
         Metrics metrics = createMetrics(config, time);
         metricsManager = createFetchMetricsManager(metrics);
+        deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer(), metrics);
         metadata = new ConsumerMetadata(
             0,
             1000,
@@ -3936,7 +3936,7 @@ public class FetchRequestManagerTest {
             SubscriptionState subscriptionState,
             LogContext logContext) {
         buildDependencies(metricConfig, metadataExpireMs, subscriptionState, logContext);
-        Deserializers<K, V> deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+        Deserializers<K, V> deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
         FetchConfig fetchConfig = new FetchConfig(
             minBytes,
             maxBytes,
@@ -2824,7 +2824,7 @@ public class FetcherTest {
             isolationLevel,
             apiVersions);

-        Deserializers<byte[], byte[]> deserializers = new Deserializers<>(new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        Deserializers<byte[], byte[]> deserializers = new Deserializers<>(new ByteArrayDeserializer(), new ByteArrayDeserializer(), metrics);
         FetchConfig fetchConfig = new FetchConfig(
             minBytes,
             maxBytes,

@@ -3858,7 +3858,7 @@ public class FetcherTest {
             metadata,
             subscriptionState,
             fetchConfig,
-            new Deserializers<>(keyDeserializer, valueDeserializer),
+            new Deserializers<>(keyDeserializer, valueDeserializer, metrics),
             metricsManager,
             time,
             apiVersions));
@@ -1249,7 +1249,7 @@ public class OffsetFetcherTest {
             metadata,
             subscriptions,
             fetchConfig,
-            new Deserializers<>(new ByteArrayDeserializer(), new ByteArrayDeserializer()),
+            new Deserializers<>(new ByteArrayDeserializer(), new ByteArrayDeserializer(), metrics),
             new FetchMetricsManager(metrics, metricsRegistry),
             time,
             apiVersions);
@@ -374,11 +374,11 @@ public class ShareCompletedFetchTest {
     }

     private static Deserializers<UUID, UUID> newUuidDeserializers() {
-        return new Deserializers<>(new UUIDDeserializer(), new UUIDDeserializer());
+        return new Deserializers<>(new UUIDDeserializer(), new UUIDDeserializer(), null);
     }

     private static Deserializers<String, String> newStringDeserializers() {
-        return new Deserializers<>(new StringDeserializer(), new StringDeserializer());
+        return new Deserializers<>(new StringDeserializer(), new StringDeserializer(), null);
     }

     private Records newRecords(long baseOffset, int count) {
@@ -1855,7 +1855,7 @@ public class ShareConsumeRequestManagerTest {
             SubscriptionState subscriptionState,
             LogContext logContext) {
         buildDependencies(metricConfig, subscriptionState, logContext);
-        Deserializers<K, V> deserializers = new Deserializers<>(keyDeserializer, valueDeserializer);
+        Deserializers<K, V> deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, metrics);
         int maxWaitMs = 0;
         int maxBytes = Integer.MAX_VALUE;
         int fetchSize = 1000;
@@ -236,8 +236,8 @@ public class ShareFetchCollectorTest {

         ConsumerConfig config = new ConsumerConfig(p);

-        deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer());
         Metrics metrics = createMetrics(config, Time.SYSTEM);
+        deserializers = new Deserializers<>(new StringDeserializer(), new StringDeserializer(), metrics);
         ShareFetchMetricsManager shareFetchMetricsManager = createShareFetchMetricsManager(metrics);
         Set<TopicPartition> partitionSet = new HashSet<>();
         partitionSet.add(topicAPartition0.topicPartition());
@@ -60,6 +60,8 @@ import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.network.Selectable;

@@ -113,6 +115,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;

@@ -1456,7 +1459,7 @@ public class KafkaProducerTest {
         ApiVersions apiVersions = new ApiVersions();
         apiVersions.update(NODE.idString(), nodeApiVersions);

-        ProducerInterceptors<String, String> interceptor = new ProducerInterceptors<>(Collections.emptyList());
+        ProducerInterceptors<String, String> interceptor = new ProducerInterceptors<>(Collections.emptyList(), null);

         client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

@@ -1730,7 +1733,7 @@ public class KafkaProducerTest {

         try (KafkaProducer<String, String> producer = new KafkaProducer<>(
                 new ProducerConfig(properties), new StringSerializer(), new StringSerializer(), metadata, client,
-                new ProducerInterceptors<>(Collections.emptyList()), apiVersions, time)) {
+                new ProducerInterceptors<>(Collections.emptyList(), null), apiVersions, time)) {
             producer.initTransactions();
             producer.beginTransaction();
             producer.sendOffsetsToTransaction(Collections.singletonMap(

@@ -1782,7 +1785,7 @@ public class KafkaProducerTest {
         ApiVersions apiVersions = new ApiVersions();
         apiVersions.update(NODE.idString(), nodeApiVersions);

-        ProducerInterceptors<String, String> interceptor = new ProducerInterceptors<>(Collections.emptyList());
+        ProducerInterceptors<String, String> interceptor = new ProducerInterceptors<>(Collections.emptyList(), null);

         client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some-txn", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));

@@ -2310,7 +2313,7 @@ public class KafkaProducerTest {
         String invalidTopicName = "topic abc"; // Invalid topic name due to space

         ProducerInterceptors<String, String> producerInterceptors =
-            new ProducerInterceptors<>(Collections.singletonList(new MockProducerInterceptor()));
+            new ProducerInterceptors<>(Collections.singletonList(new MockProducerInterceptor()), null);

         try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(), new StringSerializer(),
                 producerMetadata, client, producerInterceptors, time)) {

@@ -2605,7 +2608,7 @@ public class KafkaProducerTest {
         ProducerConfig producerConfig = new ProducerConfig(
             ProducerConfig.appendSerializerToConfig(configs, serializer, serializer));

-        ProducerInterceptors<T, T> interceptors = new ProducerInterceptors<>(this.interceptors);
+        ProducerInterceptors<T, T> interceptors = new ProducerInterceptors<>(this.interceptors, metrics);

         return new KafkaProducer<>(
             producerConfig,

@@ -2761,4 +2764,100 @@ public class KafkaProducerTest {
         KafkaMetric streamClientMetricTwo = new KafkaMetric(lock, metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
         return Map.of(metricNameOne, streamClientMetricOne, metricNameTwo, streamClientMetricTwo);
     }
+
+    @Test
+    void testMonitorablePlugins() {
+        try {
+            String clientId = "testMonitorablePlugins";
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MonitorableSerializer.class.getName());
+            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MonitorableSerializer.class.getName());
+            configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MonitorablePartitioner.class.getName());
+            configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MonitorableInterceptor.class.getName());
+            configs.put(MockProducerInterceptor.APPEND_STRING_PROP, "");
+
+            KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
+            Map<MetricName, ? extends Metric> metrics = producer.metrics();
+
+            MetricName expectedKeySerializerMetric = expectedMetricName(
+                clientId,
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                MonitorableSerializer.class);
+            assertTrue(metrics.containsKey(expectedKeySerializerMetric));
+            assertEquals(VALUE, metrics.get(expectedKeySerializerMetric).metricValue());
+
+            MetricName expectedValueSerializerMetric = expectedMetricName(
+                clientId,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                MonitorableSerializer.class);
+            assertTrue(metrics.containsKey(expectedValueSerializerMetric));
+            assertEquals(VALUE, metrics.get(expectedValueSerializerMetric).metricValue());
+
+            MetricName expectedPartitionerMetric = expectedMetricName(
+                clientId,
+                ProducerConfig.PARTITIONER_CLASS_CONFIG,
+                MonitorablePartitioner.class);
+            assertTrue(metrics.containsKey(expectedPartitionerMetric));
+            assertEquals(VALUE, metrics.get(expectedPartitionerMetric).metricValue());
+
+            MetricName expectedInterceptorMetric = expectedMetricName(
+                clientId,
+                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                MonitorableInterceptor.class);
+            assertTrue(metrics.containsKey(expectedInterceptorMetric));
+            assertEquals(VALUE, metrics.get(expectedInterceptorMetric).metricValue());
+
+            producer.close();
+            metrics = producer.metrics();
+            assertFalse(metrics.containsKey(expectedKeySerializerMetric));
+            assertFalse(metrics.containsKey(expectedValueSerializerMetric));
+            assertFalse(metrics.containsKey(expectedPartitionerMetric));
+            assertFalse(metrics.containsKey(expectedInterceptorMetric));
+        } finally {
+            MockProducerInterceptor.resetCounters();
+        }
+    }
+
+    private MetricName expectedMetricName(String clientId, String config, Class<?> clazz) {
+        Map<String, String> expectedTags = new LinkedHashMap<>();
+        expectedTags.put("client-id", clientId);
+        expectedTags.put("config", config);
+        expectedTags.put("class", clazz.getSimpleName());
+        expectedTags.putAll(TAGS);
+        return new MetricName(NAME, "plugins", DESCRIPTION, expectedTags);
+    }
+
+    private static final String NAME = "name";
+    private static final String DESCRIPTION = "description";
+    private static final Map<String, String> TAGS = Collections.singletonMap("k", "v");
+    private static final double VALUE = 123.0;
+
+    public static class MonitorableSerializer extends MockSerializer implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+            metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+        }
+    }
+
+    public static class MonitorablePartitioner extends MockPartitioner implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+            metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+        }
+    }
+
+    public static class MonitorableInterceptor extends MockProducerInterceptor implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+            metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+        }
+    }
 }
@@ -104,7 +104,7 @@ public class ProducerInterceptorsTest {
         AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two");
         interceptorList.add(interceptor1);
         interceptorList.add(interceptor2);
-        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
+        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList, null);

         // verify that onSend() mutates the record as expected
         ProducerRecord<Integer, String> interceptedRecord = interceptors.onSend(producerRecord);

@@ -142,7 +142,7 @@ public class ProducerInterceptorsTest {
         AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two");
         interceptorList.add(interceptor1);
         interceptorList.add(interceptor2);
-        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
+        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList, null);

         // verify onAck is called on all interceptors
         RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0);

@@ -166,7 +166,7 @@ public class ProducerInterceptorsTest {
         List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>();
         AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
         interceptorList.add(interceptor1);
-        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
+        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList, null);

         // verify that metadata contains both topic and partition
         interceptors.onSendError(producerRecord,
@@ -0,0 +1,149 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.internals;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;

import org.junit.jupiter.api.Test;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class PluginTest {

    private static final String CONFIG = "some.config";
    private static final Metrics METRICS = new Metrics();

    static class SomePlugin implements Closeable {

        PluginMetrics pluginMetrics;
        boolean closed;
        boolean throwOnClose = false;

        @Override
        public void close() throws IOException {
            if (throwOnClose) throw new RuntimeException("throw on close");
            closed = true;
        }
    }

    static class SomeMonitorablePlugin extends SomePlugin implements Monitorable {

        @Override
        public void withPluginMetrics(PluginMetrics metrics) {
            pluginMetrics = metrics;
        }
    }

    @Test
    void testWrapInstance() throws Exception {
        SomeMonitorablePlugin someMonitorablePlugin = new SomeMonitorablePlugin();
        Plugin<SomeMonitorablePlugin> pluginMonitorable = Plugin.wrapInstance(someMonitorablePlugin, METRICS, CONFIG);
        checkPlugin(pluginMonitorable, someMonitorablePlugin, true);

        someMonitorablePlugin = new SomeMonitorablePlugin();
        assertFalse(someMonitorablePlugin.closed);
        pluginMonitorable = Plugin.wrapInstance(someMonitorablePlugin, null, CONFIG);
        checkPlugin(pluginMonitorable, someMonitorablePlugin, false);

        SomePlugin somePlugin = new SomePlugin();
        assertFalse(somePlugin.closed);
        Plugin<SomePlugin> plugin = Plugin.wrapInstance(somePlugin, null, CONFIG);
        assertSame(somePlugin, plugin.get());
        assertNull(somePlugin.pluginMetrics);
        plugin.close();
        assertTrue(somePlugin.closed);
    }

    @Test
    void testWrapInstances() throws Exception {
        List<SomeMonitorablePlugin> someMonitorablePlugins = Arrays.asList(new SomeMonitorablePlugin(), new SomeMonitorablePlugin());
        List<Plugin<SomeMonitorablePlugin>> pluginsMonitorable = Plugin.wrapInstances(someMonitorablePlugins, METRICS, CONFIG);
        assertEquals(someMonitorablePlugins.size(), pluginsMonitorable.size());
        for (int i = 0; i < pluginsMonitorable.size(); i++) {
            Plugin<SomeMonitorablePlugin> plugin = pluginsMonitorable.get(i);
            SomeMonitorablePlugin somePlugin = someMonitorablePlugins.get(i);
            checkPlugin(plugin, somePlugin, true);
        }

        someMonitorablePlugins = Arrays.asList(new SomeMonitorablePlugin(), new SomeMonitorablePlugin());
        pluginsMonitorable = Plugin.wrapInstances(someMonitorablePlugins, null, CONFIG);
        assertEquals(someMonitorablePlugins.size(), pluginsMonitorable.size());
        for (int i = 0; i < pluginsMonitorable.size(); i++) {
            Plugin<SomeMonitorablePlugin> plugin = pluginsMonitorable.get(i);
            SomeMonitorablePlugin somePlugin = someMonitorablePlugins.get(i);
            checkPlugin(plugin, somePlugin, false);
        }

        List<SomePlugin> somePlugins = Arrays.asList(new SomePlugin(), new SomePlugin());
        List<Plugin<SomePlugin>> plugins = Plugin.wrapInstances(somePlugins, METRICS, CONFIG);
        assertEquals(somePlugins.size(), plugins.size());
        for (int i = 0; i < plugins.size(); i++) {
            Plugin<SomePlugin> plugin = plugins.get(i);
            SomePlugin somePlugin = somePlugins.get(i);
            assertSame(somePlugin, plugin.get());
            assertNull(somePlugin.pluginMetrics);
            plugin.close();
            assertTrue(somePlugin.closed);
        }
    }

    @Test
    public void testCloseThrows() {
        SomePlugin somePlugin = new SomePlugin();
        somePlugin.throwOnClose = true;
        Plugin<SomePlugin> plugin = Plugin.wrapInstance(somePlugin, METRICS, CONFIG);
        assertThrows(KafkaException.class, plugin::close);
    }

    @Test
    public void testUsePluginMetricsAfterClose() throws Exception {
        Plugin<SomeMonitorablePlugin> plugin = Plugin.wrapInstance(new SomeMonitorablePlugin(), METRICS, CONFIG);
        PluginMetrics pluginMetrics = plugin.get().pluginMetrics;
        plugin.close();
        assertThrows(IllegalStateException.class, () -> pluginMetrics.metricName("", "", Collections.emptyMap()));
        assertThrows(IllegalStateException.class, () -> pluginMetrics.addMetric(null, null));
        assertThrows(IllegalStateException.class, () -> pluginMetrics.removeMetric(null));
        assertThrows(IllegalStateException.class, () -> pluginMetrics.addSensor(""));
        assertThrows(IllegalStateException.class, () -> pluginMetrics.removeSensor(""));
    }

    private void checkPlugin(Plugin<SomeMonitorablePlugin> plugin, SomeMonitorablePlugin instance, boolean metricsSet) throws Exception {
        assertSame(instance, plugin.get());
        if (metricsSet) {
            assertNotNull(instance.pluginMetrics);
        } else {
            assertNull(instance.pluginMetrics);
        }
        plugin.close();
        assertTrue(instance.closed);
    }
}
@@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.metrics.internals;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class PluginMetricsImplTest {

    private final Map<String, String> extraTags = Collections.singletonMap("my-tag", "my-value");
    private Map<String, String> tags;
    private Metrics metrics;
    private int initialMetrics;

    @BeforeEach
    void setup() {
        metrics = new Metrics();
        initialMetrics = metrics.metrics().size();
        tags = new LinkedHashMap<>();
        tags.put("k1", "v1");
        tags.put("k2", "v2");
    }

    @Test
    void testMetricName() {
        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
        MetricName metricName = pmi.metricName("name", "description", extraTags);
        assertEquals("name", metricName.name());
        assertEquals("plugins", metricName.group());
        assertEquals("description", metricName.description());
        Map<String, String> expectedTags = new LinkedHashMap<>(tags);
        expectedTags.putAll(extraTags);
        assertEquals(expectedTags, metricName.tags());
    }

    @Test
    void testDuplicateTagName() {
        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
        assertThrows(IllegalArgumentException.class,
            () -> pmi.metricName("name", "description", Collections.singletonMap("k1", "value")));
    }

    @Test
    void testAddRemoveMetrics() {
        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
        MetricName metricName = pmi.metricName("name", "description", extraTags);
        pmi.addMetric(metricName, (Measurable) (config, now) -> 0.0);
        assertEquals(initialMetrics + 1, metrics.metrics().size());

        assertThrows(IllegalArgumentException.class, () -> pmi.addMetric(metricName, (Measurable) (config, now) -> 0.0));

        pmi.removeMetric(metricName);
        assertEquals(initialMetrics, metrics.metrics().size());

        assertThrows(IllegalArgumentException.class, () -> pmi.removeMetric(metricName));
    }

    @Test
    void testAddRemoveSensor() {
        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
        String sensorName = "my-sensor";
        MetricName metricName = pmi.metricName("name", "description", extraTags);
        Sensor sensor = pmi.addSensor(sensorName);
        assertEquals(initialMetrics, metrics.metrics().size());
        sensor.add(metricName, new Rate());
        sensor.add(metricName, new Max());
        assertEquals(initialMetrics + 1, metrics.metrics().size());

        assertThrows(IllegalArgumentException.class, () -> pmi.addSensor(sensorName));

        pmi.removeSensor(sensorName);
        assertEquals(initialMetrics, metrics.metrics().size());

        assertThrows(IllegalArgumentException.class, () -> pmi.removeSensor(sensorName));
    }

    @Test
    void testClose() throws IOException {
        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
        String sensorName = "my-sensor";
        MetricName metricName1 = pmi.metricName("name1", "description", extraTags);
        Sensor sensor = pmi.addSensor(sensorName);
        sensor.add(metricName1, new Rate());
        MetricName metricName2 = pmi.metricName("name2", "description", extraTags);
        pmi.addMetric(metricName2, (Measurable) (config, now) -> 1.0);

        assertEquals(initialMetrics + 2, metrics.metrics().size());
        pmi.close();
        assertEquals(initialMetrics, metrics.metrics().size());
    }
}