Improve kafka client sensor registration performance by lazily calculating JMX attributes

When any metric (e.g. per-partition metric) is created or deleted,
registerMBean() is called which in turn calls getMBeanInfo().getClassName().
However, KafkaMbean.getMBeanInfo() instantiates an array of all sensors even
though we only need the class name. This costs a lot of CPU to register
sensors when consumer with large partition assignment starts. For example, it
takes 5 minutes to start a consumer with 35k partitions. This patch reduces the
consumer startup time seconds.

Author: radai-rosenblatt <radai.rosenblatt@gmail.com>

Reviewers: Satish Duggana <satish.duggana@gmail.com>, Dong Lin <lindong28@gmail.com>

Closes #5011 from radai-rosenblatt/fun-with-jmx
This commit is contained in:
Radai Rosenblatt 2018-05-25 15:38:59 -07:00 committed by Dong Lin
parent 70f0d0bd3f
commit c9ec292135
2 changed files with 76 additions and 14 deletions

View File

@ -214,20 +214,25 @@ public class JmxReporter implements MetricsReporter {
@Override
public MBeanInfo getMBeanInfo() {
MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()];
int i = 0;
for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
String attribute = entry.getKey();
KafkaMetric metric = entry.getValue();
attrs[i] = new MBeanAttributeInfo(attribute,
double.class.getName(),
metric.metricName().description(),
true,
false,
false);
i += 1;
}
return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null);
return new LazyMBeanInfo(this.getClass().getName(), "", null, null, null) {
@Override
protected MBeanAttributeInfo[] buildAttributes() {
MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()];
int i = 0;
for (Map.Entry<String, KafkaMetric> entry : metrics.entrySet()) {
String attribute = entry.getKey();
KafkaMetric metric = entry.getValue();
attrs[i] = new MBeanAttributeInfo(attribute,
double.class.getName(),
metric.metricName().description(),
true,
false,
false);
i += 1;
}
return attrs;
}
};
}
@Override

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.metrics;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanConstructorInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanOperationInfo;
/**
* an MBeanInfo subclass that lazily calculates attributes
*/
public abstract class LazyMBeanInfo extends MBeanInfo {
private volatile MBeanAttributeInfo[] lazyAttrs = null;
public LazyMBeanInfo(
String className,
String description,
MBeanConstructorInfo[] constructors,
MBeanOperationInfo[] operations,
MBeanNotificationInfo[] notifications
) throws IllegalArgumentException {
super(className, description, null, constructors, operations, notifications);
}
@Override
public MBeanAttributeInfo[] getAttributes() {
MBeanAttributeInfo[] val = lazyAttrs;
if (val != null) {
return val.clone(); //match upstream behaviour
}
val = buildAttributes();
if (val == null) {
val = new MBeanAttributeInfo[0];
}
lazyAttrs = val;
return val.clone();
}
protected abstract MBeanAttributeInfo[] buildAttributes();
}