KAFKA-9152; Improve Sensor Retrieval (#7928)

This ticket shall improve two aspects of the retrieval of sensors:
https://issues.apache.org/jira/browse/KAFKA-9152

Currently, when a sensor is retrieved with *Metrics.*Sensor() (e.g. ThreadMetrics.createTaskSensor()) after it was created with the same method *Metrics.*Sensor(), the sensor is added again to the corresponding queue in Sensors (e.g. threadLevelSensors) in StreamsMetricsImpl. Those queues are used to remove the sensors when removeAllLevelSensors() is called. Having multiple times the same sensors in this queue is not an issue from a correctness point of view. However, it would reduce the footprint to only store a sensor once in those queues.

When a sensor is retrieved, the current code attempts to create a new sensor and to add to it again the corresponding metrics. This could be avoided.

Both aspects could be improved by checking whether a sensor already exists by calling getSensor() on the Metrics object and checking the return value.

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
This commit is contained in:
highluck 2020-01-25 06:55:50 +09:00 committed by Bill Bejeck
parent 8e5faca963
commit 2e351e06b3
2 changed files with 250 additions and 56 deletions

View File

@ -46,6 +46,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.Optional;
public class StreamsMetricsImpl implements StreamsMetrics {
@ -210,11 +211,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final Sensor... parents) {
final String key = threadSensorPrefix(threadId);
synchronized (threadLevelSensors) {
threadLevelSensors.putIfAbsent(key, new LinkedList<>());
final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
threadLevelSensors.get(key).push(fullSensorName);
return sensor;
return Optional.ofNullable(metrics.getSensor(fullSensorName))
.orElseGet(() -> {
threadLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
});
}
}
@ -290,13 +292,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final Sensor... parents) {
final String key = taskSensorPrefix(threadId, taskId);
synchronized (taskLevelSensors) {
if (!taskLevelSensors.containsKey(key)) {
taskLevelSensors.put(key, new LinkedList<>());
}
final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
taskLevelSensors.get(key).push(fullSensorName);
return sensor;
return Optional.ofNullable(metrics.getSensor(fullSensorName))
.orElseGet(() -> {
taskLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
});
}
}
@ -323,17 +324,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final Sensor... parents) {
final String key = nodeSensorPrefix(threadId, taskId, processorNodeName);
synchronized (nodeLevelSensors) {
if (!nodeLevelSensors.containsKey(key)) {
nodeLevelSensors.put(key, new LinkedList<>());
}
final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
nodeLevelSensors.get(key).push(fullSensorName);
return sensor;
return Optional.ofNullable(metrics.getSensor(fullSensorName))
.orElseGet(() -> {
nodeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
});
}
}
@ -362,17 +358,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final Sensor... parents) {
final String key = cacheSensorPrefix(threadId, taskName, storeName);
synchronized (cacheLevelSensors) {
if (!cacheLevelSensors.containsKey(key)) {
cacheLevelSensors.put(key, new LinkedList<>());
}
final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
cacheLevelSensors.get(key).push(fullSensorName);
return sensor;
return Optional.ofNullable(metrics.getSensor(fullSensorName))
.orElseGet(() -> {
cacheLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
});
}
}
@ -413,15 +404,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
final Sensor... parents) {
final String key = storeSensorPrefix(threadId, taskId, storeName);
synchronized (storeLevelSensors) {
if (!storeLevelSensors.containsKey(key)) {
storeLevelSensors.put(key, new LinkedList<>());
}
final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents);
storeLevelSensors.get(key).push(fullSensorName);
return sensor;
return Optional.ofNullable(metrics.getSensor(fullSensorName))
.orElseGet(() -> {
storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
return metrics.sensor(fullSensorName, recordingLevel, parents);
});
}
}

View File

@ -193,16 +193,235 @@ public class StreamsMetricsImplTest {
streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, storeName, sensorName2, RecordingLevel.INFO);
}
private void setupGetSensorTest(final Metrics metrics,
final String level,
final RecordingLevel recordingLevel) {
final String fullSensorName =
INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + level + SENSOR_NAME_DELIMITER + sensorName1;
private void setupGetNewSensorTest(final Metrics metrics,
final String level,
final RecordingLevel recordingLevel) {
final String fullSensorName = fullSensorName(level);
expect(metrics.getSensor(fullSensorName)).andStubReturn(null);
final Sensor[] parents = {};
expect(metrics.sensor(fullSensorName, recordingLevel, parents)).andReturn(sensor);
replay(metrics);
}
private void setupGetExistingSensorTest(final Metrics metrics,
final String level) {
final String fullSensorName = fullSensorName(level);
expect(metrics.getSensor(fullSensorName)).andStubReturn(sensor);
replay(metrics);
}
private String fullSensorName(final String level) {
return INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + level + SENSOR_NAME_DELIMITER + sensorName1;
}
@Test
public void shouldGetNewThreadLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, THREAD_ID, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.threadLevelSensor(
THREAD_ID,
sensorName1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldGetExistingThreadLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics, THREAD_ID);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.threadLevelSensor(
THREAD_ID,
sensorName1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldGetNewTaskLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(metrics, THREAD_ID + ".task." + TASK_ID, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID,
TASK_ID,
sensorName1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldGetExistingTaskLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(metrics, THREAD_ID + ".task." + TASK_ID);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.taskLevelSensor(
THREAD_ID,
TASK_ID,
sensorName1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldGetNewStoreLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetNewSensorTest(
metrics,
THREAD_ID + ".task." + storeName + SENSOR_PREFIX_DELIMITER + storeName + SENSOR_PREFIX_DELIMITER
+ TASK_ID,
recordingLevel
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
THREAD_ID,
storeName,
TASK_ID,
sensorName1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldGetExistingStoreLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetExistingSensorTest(
metrics, THREAD_ID + ".task." + storeName + SENSOR_PREFIX_DELIMITER + storeName + SENSOR_PREFIX_DELIMITER
+ TASK_ID
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.storeLevelSensor(
THREAD_ID,
storeName,
TASK_ID,
sensorName1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldGetNewNodeLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorNodeName = "processorNodeName";
setupGetNewSensorTest(metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER + "node"
+ SENSOR_PREFIX_DELIMITER + processorNodeName,
recordingLevel
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID,
TASK_ID,
processorNodeName,
sensorName1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldGetExistingNodeLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorNodeName = "processorNodeName";
setupGetExistingSensorTest(
metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER
+ "node" + SENSOR_PREFIX_DELIMITER + processorNodeName
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.nodeLevelSensor(
THREAD_ID,
TASK_ID,
processorNodeName,
sensorName1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldGetNewCacheLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetNewSensorTest(
metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER
+ "cache" + SENSOR_PREFIX_DELIMITER + processorCacheName,
recordingLevel
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID,
TASK_ID,
processorCacheName,
sensorName1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldGetExistingCacheLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
final String processorCacheName = "processorNodeName";
setupGetExistingSensorTest(
metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER
+ "cache" + SENSOR_PREFIX_DELIMITER + processorCacheName
);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.cacheLevelSensor(
THREAD_ID, TASK_ID,
processorCacheName,
sensorName1,
recordingLevel
);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
@Test
public void shouldAddClientLevelImmutableMetric() {
final Metrics metrics = mock(Metrics.class);
@ -244,19 +463,6 @@ public class StreamsMetricsImplTest {
assertThat(ROLLUP_VALUE, is("all"));
}
@Test
public void shouldGetThreadLevelSensor() {
final Metrics metrics = mock(Metrics.class);
final RecordingLevel recordingLevel = RecordingLevel.INFO;
setupGetSensorTest(metrics, THREAD_ID, recordingLevel);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION);
final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID, sensorName1, recordingLevel);
verify(metrics);
assertThat(actualSensor, is(equalToObject(sensor)));
}
private void setupRemoveSensorsTest(final Metrics metrics,
final String level,
final RecordingLevel recordingLevel) {