mirror of https://github.com/apache/kafka.git
KAFKA-7660: fix parentSensors memory leak (#5953)
In StreamsMetricsImpl, the parentSensors map was keeping references to Sensors after the sensors themselves had been removed. Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
parent
4fb5520901
commit
bfbc32d9bc
|
@ -234,9 +234,11 @@ public class ProcessorNode<K, V> {
|
||||||
final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
|
final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
|
||||||
addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
|
addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
|
||||||
addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
|
addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
|
||||||
|
|
||||||
final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
|
final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
|
||||||
addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
|
addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
|
||||||
addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
|
addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
|
||||||
|
|
||||||
return sensor;
|
return sensor;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,11 +115,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
|
||||||
public final void removeAllTaskLevelSensors(final String taskName) {
|
public final void removeAllTaskLevelSensors(final String taskName) {
|
||||||
final String key = taskSensorPrefix(taskName);
|
final String key = taskSensorPrefix(taskName);
|
||||||
synchronized (taskLevelSensors) {
|
synchronized (taskLevelSensors) {
|
||||||
if (taskLevelSensors.containsKey(key)) {
|
final Deque<String> sensors = taskLevelSensors.remove(key);
|
||||||
while (!taskLevelSensors.get(key).isEmpty()) {
|
while (sensors != null && !sensors.isEmpty()) {
|
||||||
metrics.removeSensor(taskLevelSensors.get(key).pop());
|
metrics.removeSensor(sensors.pop());
|
||||||
}
|
|
||||||
taskLevelSensors.remove(key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -152,10 +150,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
|
||||||
public final void removeAllNodeLevelSensors(final String taskName, final String processorNodeName) {
|
public final void removeAllNodeLevelSensors(final String taskName, final String processorNodeName) {
|
||||||
final String key = nodeSensorPrefix(taskName, processorNodeName);
|
final String key = nodeSensorPrefix(taskName, processorNodeName);
|
||||||
synchronized (nodeLevelSensors) {
|
synchronized (nodeLevelSensors) {
|
||||||
if (nodeLevelSensors.containsKey(key)) {
|
final Deque<String> sensors = nodeLevelSensors.remove(key);
|
||||||
while (!nodeLevelSensors.get(key).isEmpty()) {
|
while (sensors != null && !sensors.isEmpty()) {
|
||||||
metrics.removeSensor(nodeLevelSensors.get(key).pop());
|
metrics.removeSensor(sensors.pop());
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -188,11 +185,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
|
||||||
public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
|
public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
|
||||||
final String key = cacheSensorPrefix(taskName, cacheName);
|
final String key = cacheSensorPrefix(taskName, cacheName);
|
||||||
synchronized (cacheLevelSensors) {
|
synchronized (cacheLevelSensors) {
|
||||||
if (cacheLevelSensors.containsKey(key)) {
|
final Deque<String> strings = cacheLevelSensors.remove(key);
|
||||||
while (!cacheLevelSensors.get(key).isEmpty()) {
|
while (strings != null && !strings.isEmpty()) {
|
||||||
metrics.removeSensor(cacheLevelSensors.get(key).pop());
|
metrics.removeSensor(strings.pop());
|
||||||
}
|
|
||||||
cacheLevelSensors.remove(key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -225,11 +220,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
|
||||||
public final void removeAllStoreLevelSensors(final String taskName, final String storeName) {
|
public final void removeAllStoreLevelSensors(final String taskName, final String storeName) {
|
||||||
final String key = storeSensorPrefix(taskName, storeName);
|
final String key = storeSensorPrefix(taskName, storeName);
|
||||||
synchronized (storeLevelSensors) {
|
synchronized (storeLevelSensors) {
|
||||||
if (storeLevelSensors.containsKey(key)) {
|
final Deque<String> sensors = storeLevelSensors.remove(key);
|
||||||
while (!storeLevelSensors.get(key).isEmpty()) {
|
while (sensors != null && !sensors.isEmpty()) {
|
||||||
metrics.removeSensor(storeLevelSensors.get(key).pop());
|
metrics.removeSensor(sensors.pop());
|
||||||
}
|
|
||||||
storeLevelSensors.remove(key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -413,12 +406,19 @@ public class StreamsMetricsImpl implements StreamsMetrics {
|
||||||
Objects.requireNonNull(sensor, "Sensor is null");
|
Objects.requireNonNull(sensor, "Sensor is null");
|
||||||
metrics.removeSensor(sensor.name());
|
metrics.removeSensor(sensor.name());
|
||||||
|
|
||||||
final Sensor parent = parentSensors.get(sensor);
|
final Sensor parent = parentSensors.remove(sensor);
|
||||||
if (parent != null) {
|
if (parent != null) {
|
||||||
metrics.removeSensor(parent.name());
|
metrics.removeSensor(parent.name());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Visible for testing
|
||||||
|
*/
|
||||||
|
Map<Sensor, Sensor> parentSensors() {
|
||||||
|
return Collections.unmodifiableMap(parentSensors);
|
||||||
|
}
|
||||||
|
|
||||||
private static String groupNameFromScope(final String scopeName) {
|
private static String groupNameFromScope(final String scopeName) {
|
||||||
return "stream-" + scopeName + "-metrics";
|
return "stream-" + scopeName + "-metrics";
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.streams.processor.internals;
|
package org.apache.kafka.streams.processor.internals.metrics;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.kafka.common.MetricName;
|
import org.apache.kafka.common.MetricName;
|
||||||
|
@ -23,12 +23,21 @@ import org.apache.kafka.common.metrics.MetricConfig;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
||||||
|
import static org.apache.kafka.common.utils.Utils.mkMap;
|
||||||
|
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
|
||||||
|
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
|
||||||
|
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.greaterThan;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
public class StreamsMetricsImplTest {
|
public class StreamsMetricsImplTest {
|
||||||
|
|
||||||
|
@ -62,6 +71,60 @@ public class StreamsMetricsImplTest {
|
||||||
|
|
||||||
final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
|
final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
|
||||||
streamsMetrics.removeSensor(sensor3);
|
streamsMetrics.removeSensor(sensor3);
|
||||||
|
|
||||||
|
assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMutiLevelSensorRemoval() {
|
||||||
|
final Metrics registry = new Metrics();
|
||||||
|
final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, "");
|
||||||
|
for (final MetricName defaultMetric : registry.metrics().keySet()) {
|
||||||
|
registry.removeMetric(defaultMetric);
|
||||||
|
}
|
||||||
|
|
||||||
|
final String taskName = "taskName";
|
||||||
|
final String operation = "operation";
|
||||||
|
final Map<String, String> taskTags = mkMap(mkEntry("tkey", "value"));
|
||||||
|
|
||||||
|
final String processorNodeName = "processorNodeName";
|
||||||
|
final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));
|
||||||
|
|
||||||
|
final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
|
||||||
|
addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
|
||||||
|
addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
|
||||||
|
|
||||||
|
final int numberOfTaskMetrics = registry.metrics().size();
|
||||||
|
|
||||||
|
final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
|
||||||
|
addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
|
||||||
|
addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
|
||||||
|
|
||||||
|
assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
|
||||||
|
|
||||||
|
metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
|
||||||
|
|
||||||
|
assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
|
||||||
|
|
||||||
|
final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
|
||||||
|
addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
|
||||||
|
addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
|
||||||
|
|
||||||
|
assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
|
||||||
|
|
||||||
|
final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
|
||||||
|
addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
|
||||||
|
addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
|
||||||
|
|
||||||
|
assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
|
||||||
|
|
||||||
|
metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
|
||||||
|
|
||||||
|
assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
|
||||||
|
|
||||||
|
metrics.removeAllTaskLevelSensors(taskName);
|
||||||
|
|
||||||
|
assertThat(registry.metrics().size(), equalTo(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -115,21 +178,21 @@ public class StreamsMetricsImplTest {
|
||||||
final String operation = "op";
|
final String operation = "op";
|
||||||
|
|
||||||
final Sensor sensor = streamsMetrics.addLatencyAndThroughputSensor(
|
final Sensor sensor = streamsMetrics.addLatencyAndThroughputSensor(
|
||||||
scope,
|
scope,
|
||||||
entity,
|
entity,
|
||||||
operation,
|
operation,
|
||||||
Sensor.RecordingLevel.INFO
|
Sensor.RecordingLevel.INFO
|
||||||
);
|
);
|
||||||
|
|
||||||
final double latency = 100.0;
|
final double latency = 100.0;
|
||||||
final MetricName totalMetricName = metrics.metricName(
|
final MetricName totalMetricName = metrics.metricName(
|
||||||
"op-total",
|
"op-total",
|
||||||
"stream-scope-metrics",
|
"stream-scope-metrics",
|
||||||
"",
|
"",
|
||||||
"client-id",
|
"client-id",
|
||||||
"",
|
"",
|
||||||
"scope-id",
|
"scope-id",
|
||||||
"entity"
|
"entity"
|
||||||
);
|
);
|
||||||
|
|
||||||
final KafkaMetric totalMetric = metrics.metric(totalMetricName);
|
final KafkaMetric totalMetric = metrics.metric(totalMetricName);
|
Loading…
Reference in New Issue