diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index d5b594a2175..6c86689d37e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -46,6 +46,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.Optional; public class StreamsMetricsImpl implements StreamsMetrics { @@ -210,11 +211,12 @@ public class StreamsMetricsImpl implements StreamsMetrics { final Sensor... parents) { final String key = threadSensorPrefix(threadId); synchronized (threadLevelSensors) { - threadLevelSensors.putIfAbsent(key, new LinkedList<>()); final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); - threadLevelSensors.get(key).push(fullSensorName); - return sensor; + return Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { + threadLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName); + return metrics.sensor(fullSensorName, recordingLevel, parents); + }); } } @@ -290,13 +292,12 @@ public class StreamsMetricsImpl implements StreamsMetrics { final Sensor... parents) { final String key = taskSensorPrefix(threadId, taskId); synchronized (taskLevelSensors) { - if (!taskLevelSensors.containsKey(key)) { - taskLevelSensors.put(key, new LinkedList<>()); - } final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); - taskLevelSensors.get(key).push(fullSensorName); - return sensor; + return Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { + taskLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName); + return metrics.sensor(fullSensorName, recordingLevel, parents); + }); } } @@ -323,17 +324,12 @@ public class StreamsMetricsImpl implements StreamsMetrics { final Sensor... parents) { final String key = nodeSensorPrefix(threadId, taskId, processorNodeName); synchronized (nodeLevelSensors) { - if (!nodeLevelSensors.containsKey(key)) { - nodeLevelSensors.put(key, new LinkedList<>()); - } - final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); - - nodeLevelSensors.get(key).push(fullSensorName); - - return sensor; + return Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { + nodeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName); + return metrics.sensor(fullSensorName, recordingLevel, parents); + }); } } @@ -362,17 +358,12 @@ public class StreamsMetricsImpl implements StreamsMetrics { final Sensor... parents) { final String key = cacheSensorPrefix(threadId, taskName, storeName); synchronized (cacheLevelSensors) { - if (!cacheLevelSensors.containsKey(key)) { - cacheLevelSensors.put(key, new LinkedList<>()); - } - final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); - - cacheLevelSensors.get(key).push(fullSensorName); - - return sensor; + return Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { + cacheLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName); + return metrics.sensor(fullSensorName, recordingLevel, parents); + }); } } @@ -413,15 +404,12 @@ public class StreamsMetricsImpl implements StreamsMetrics { final Sensor... parents) { final String key = storeSensorPrefix(threadId, taskId, storeName); synchronized (storeLevelSensors) { - if (!storeLevelSensors.containsKey(key)) { - storeLevelSensors.put(key, new LinkedList<>()); - } final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); - - storeLevelSensors.get(key).push(fullSensorName); - - return sensor; + return Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { + storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName); + return metrics.sensor(fullSensorName, recordingLevel, parents); + }); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 2c980a94e5d..75057644d81 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -193,16 +193,235 @@ public class StreamsMetricsImplTest { streamsMetrics.storeLevelSensor(THREAD_ID, TASK_ID, storeName, sensorName2, RecordingLevel.INFO); } - private void setupGetSensorTest(final Metrics metrics, - final String level, - final RecordingLevel recordingLevel) { - final String fullSensorName = - INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + level + SENSOR_NAME_DELIMITER + sensorName1; + private void setupGetNewSensorTest(final Metrics metrics, + final String level, + final RecordingLevel recordingLevel) { + final String fullSensorName = fullSensorName(level); + expect(metrics.getSensor(fullSensorName)).andStubReturn(null); final Sensor[] parents = {}; expect(metrics.sensor(fullSensorName, recordingLevel, parents)).andReturn(sensor); replay(metrics); } + private void setupGetExistingSensorTest(final Metrics metrics, + final String level) { + final String fullSensorName = fullSensorName(level); + expect(metrics.getSensor(fullSensorName)).andStubReturn(sensor); + replay(metrics); + } + + private String fullSensorName(final String level) { + return INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + level + SENSOR_NAME_DELIMITER + sensorName1; + } + + @Test + public void shouldGetNewThreadLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final RecordingLevel recordingLevel = RecordingLevel.INFO; + setupGetNewSensorTest(metrics, THREAD_ID, recordingLevel); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); + + final Sensor actualSensor = streamsMetrics.threadLevelSensor( + THREAD_ID, + sensorName1, + recordingLevel + ); + + verify(metrics); + assertThat(actualSensor, is(equalToObject(sensor))); + } + + @Test + public void shouldGetExistingThreadLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final RecordingLevel recordingLevel = RecordingLevel.INFO; + setupGetExistingSensorTest(metrics, THREAD_ID); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); + + final Sensor actualSensor = streamsMetrics.threadLevelSensor( + THREAD_ID, + sensorName1, + recordingLevel + ); + + verify(metrics); + assertThat(actualSensor, is(equalToObject(sensor))); + } + + @Test + public void shouldGetNewTaskLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final RecordingLevel recordingLevel = RecordingLevel.INFO; + setupGetNewSensorTest(metrics, THREAD_ID + ".task." + TASK_ID, recordingLevel); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); + + final Sensor actualSensor = streamsMetrics.taskLevelSensor( + THREAD_ID, + TASK_ID, + sensorName1, + recordingLevel + ); + + verify(metrics); + assertThat(actualSensor, is(equalToObject(sensor))); + } + + @Test + public void shouldGetExistingTaskLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final RecordingLevel recordingLevel = RecordingLevel.INFO; + setupGetExistingSensorTest(metrics, THREAD_ID + ".task." + TASK_ID); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); + + final Sensor actualSensor = streamsMetrics.taskLevelSensor( + THREAD_ID, + TASK_ID, + sensorName1, + recordingLevel + ); + + verify(metrics); + assertThat(actualSensor, is(equalToObject(sensor))); + } + + @Test + public void shouldGetNewStoreLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final RecordingLevel recordingLevel = RecordingLevel.INFO; + setupGetNewSensorTest( + metrics, + THREAD_ID + ".task." + storeName + SENSOR_PREFIX_DELIMITER + storeName + SENSOR_PREFIX_DELIMITER + + TASK_ID, + recordingLevel + ); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); + + final Sensor actualSensor = streamsMetrics.storeLevelSensor( + THREAD_ID, + storeName, + TASK_ID, + sensorName1, + recordingLevel + ); + + verify(metrics); + assertThat(actualSensor, is(equalToObject(sensor))); + } + + @Test + public void shouldGetExistingStoreLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final RecordingLevel recordingLevel = RecordingLevel.INFO; + setupGetExistingSensorTest( + metrics, THREAD_ID + ".task." + storeName + SENSOR_PREFIX_DELIMITER + storeName + SENSOR_PREFIX_DELIMITER + + TASK_ID + ); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); + + final Sensor actualSensor = streamsMetrics.storeLevelSensor( + THREAD_ID, + storeName, + TASK_ID, + sensorName1, + recordingLevel + ); + + verify(metrics); + assertThat(actualSensor, is(equalToObject(sensor))); + } + + @Test + public void shouldGetNewNodeLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final RecordingLevel recordingLevel = RecordingLevel.INFO; + final String processorNodeName = "processorNodeName"; + setupGetNewSensorTest(metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER + "node" + + SENSOR_PREFIX_DELIMITER + processorNodeName, + recordingLevel + ); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); + + final Sensor actualSensor = streamsMetrics.nodeLevelSensor( + THREAD_ID, + TASK_ID, + processorNodeName, + sensorName1, + recordingLevel + ); + + verify(metrics); + assertThat(actualSensor, is(equalToObject(sensor))); + } + + @Test + public void shouldGetExistingNodeLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final RecordingLevel recordingLevel = RecordingLevel.INFO; + final String processorNodeName = "processorNodeName"; + setupGetExistingSensorTest( + metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER + + "node" + SENSOR_PREFIX_DELIMITER + processorNodeName + ); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); + + final Sensor actualSensor = streamsMetrics.nodeLevelSensor( + THREAD_ID, + TASK_ID, + processorNodeName, + sensorName1, + recordingLevel + ); + + verify(metrics); + assertThat(actualSensor, is(equalToObject(sensor))); + } + + @Test + public void shouldGetNewCacheLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final RecordingLevel recordingLevel = RecordingLevel.INFO; + final String processorCacheName = "processorNodeName"; + setupGetNewSensorTest( + metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER + + "cache" + SENSOR_PREFIX_DELIMITER + processorCacheName, + recordingLevel + ); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); + + final Sensor actualSensor = streamsMetrics.cacheLevelSensor( + THREAD_ID, + TASK_ID, + processorCacheName, + sensorName1, + recordingLevel + ); + + verify(metrics); + assertThat(actualSensor, is(equalToObject(sensor))); + } + + @Test + public void shouldGetExistingCacheLevelSensor() { + final Metrics metrics = mock(Metrics.class); + final RecordingLevel recordingLevel = RecordingLevel.INFO; + final String processorCacheName = "processorNodeName"; + setupGetExistingSensorTest( + metrics, THREAD_ID + ".task." + TASK_ID + SENSOR_PREFIX_DELIMITER + + "cache" + SENSOR_PREFIX_DELIMITER + processorCacheName + ); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); + + final Sensor actualSensor = streamsMetrics.cacheLevelSensor( + THREAD_ID, TASK_ID, + processorCacheName, + sensorName1, + recordingLevel + ); + + verify(metrics); + assertThat(actualSensor, is(equalToObject(sensor))); + } + @Test public void shouldAddClientLevelImmutableMetric() { final Metrics metrics = mock(Metrics.class); @@ -244,19 +463,6 @@ public class StreamsMetricsImplTest { assertThat(ROLLUP_VALUE, is("all")); } - @Test - public void shouldGetThreadLevelSensor() { - final Metrics metrics = mock(Metrics.class); - final RecordingLevel recordingLevel = RecordingLevel.INFO; - setupGetSensorTest(metrics, THREAD_ID, recordingLevel); - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, VERSION); - - final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID, sensorName1, recordingLevel); - - verify(metrics); - assertThat(actualSensor, is(equalToObject(sensor))); - } - private void setupRemoveSensorsTest(final Metrics metrics, final String level, final RecordingLevel recordingLevel) {