KAFKA-9033; Use consumer/producer identity in generated clientId (#7514)

By default, if the user does not configure a `client.id`, then we use a very generic identifier, such as `consumer-15`. It is more useful to include identifying information when available such as `group.id` for the consumer and `transactional.id` for the producer.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Jason Gustafson 2019-10-15 14:52:26 -07:00 committed by GitHub
parent e58401b2f0
commit b24e9f3ccb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 11 deletions

View File

@ -21,6 +21,7 @@ import static org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapt
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
@ -667,15 +668,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@SuppressWarnings("unchecked")
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
try {
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.isEmpty())
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
GroupRebalanceConfig.ProtocolType.CONSUMER);
GroupRebalanceConfig.ProtocolType.CONSUMER);
this.groupId = groupRebalanceConfig.groupId;
this.clientId = buildClientId(config.getString(CommonClientConfigs.CLIENT_ID_CONFIG), groupRebalanceConfig);
LogContext logContext;
// If group.instance.id is set, we will append it to the log context.
if (groupRebalanceConfig.groupInstanceId.isPresent()) {
logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() +
@ -854,6 +854,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.groupId = groupId;
}
private static String buildClientId(String configuredClientId, GroupRebalanceConfig rebalanceConfig) {
if (!configuredClientId.isEmpty())
return configuredClientId;
if (rebalanceConfig.groupId != null && !rebalanceConfig.groupId.isEmpty())
return "consumer-" + rebalanceConfig.groupId + "-" + rebalanceConfig.groupInstanceId.orElseGet(() ->
CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() + "");
return "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
}
private static Metrics buildMetrics(ConsumerConfig config, Time time, String clientId) {
Map<String, String> metricsTags = Collections.singletonMap(CLIENT_ID_METRIC_TAG, clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))

View File

@ -329,13 +329,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = time;
String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG), transactionalId);
LogContext logContext;
if (transactionalId == null)
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
@ -434,6 +433,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
private static String buildClientId(String configuredClientId, String transactionalId) {
if (!configuredClientId.isEmpty())
return configuredClientId;
if (transactionalId != null)
return "producer-" + transactionalId;
return "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
}
// visible for testing
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);