MINOR: Clean up process rate and latency metrics test (#8172)

Reviewers: John Roesler <vvcephei@apache.org>
This commit is contained in:
Bruno Cadonna 2020-03-02 17:22:01 +01:00 committed by GitHub
parent 75268cb8e9
commit ea0c027531
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 46 deletions

View File

@ -150,9 +150,8 @@ public class ThreadMetrics {
public static Sensor processLatencySensor(final String threadId, public static Sensor processLatencySensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) { final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, final Sensor sensor =
PROCESS + LATENCY_SUFFIX, streamsMetrics.threadLevelSensor(threadId, PROCESS + LATENCY_SUFFIX, RecordingLevel.INFO);
RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId); final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
final String threadLevelGroup = threadLevelGroup(streamsMetrics); final String threadLevelGroup = threadLevelGroup(streamsMetrics);
addAvgAndMaxToSensor( addAvgAndMaxToSensor(
@ -167,10 +166,9 @@ public class ThreadMetrics {
} }
public static Sensor processRateSensor(final String threadId, public static Sensor processRateSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) { final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, final Sensor sensor =
PROCESS + RATE_SUFFIX, streamsMetrics.threadLevelSensor(threadId, PROCESS + RATE_SUFFIX, RecordingLevel.INFO);
RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId); final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
final String threadLevelGroup = threadLevelGroup(streamsMetrics); final String threadLevelGroup = threadLevelGroup(streamsMetrics);
addRateOfSumAndSumMetricsToSensor( addRateOfSumAndSumMetricsToSensor(

View File

@ -16,7 +16,6 @@
*/ */
package org.apache.kafka.streams.processor.internals.metrics; package org.apache.kafka.streams.processor.internals.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
@ -35,9 +34,9 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock; import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@ -78,6 +77,7 @@ public class ThreadMetricsTest {
@Before @Before
public void setUp() { public void setUp() {
expect(streamsMetrics.version()).andReturn(builtInMetricsVersion).anyTimes(); expect(streamsMetrics.version()).andReturn(builtInMetricsVersion).anyTimes();
mockStatic(StreamsMetricsImpl.class);
} }
@Test @Test
@ -87,7 +87,6 @@ public class ThreadMetricsTest {
final String rateDescription = "The average per-second number of newly created tasks"; final String rateDescription = "The average per-second number of newly created tasks";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -111,7 +110,6 @@ public class ThreadMetricsTest {
final String rateDescription = "The average per-second number of closed tasks"; final String rateDescription = "The average per-second number of closed tasks";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -138,7 +136,6 @@ public class ThreadMetricsTest {
final String maxLatencyDescription = "The maximum commit latency"; final String maxLatencyDescription = "The maximum commit latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -172,7 +169,6 @@ public class ThreadMetricsTest {
final String maxLatencyDescription = "The maximum poll latency"; final String maxLatencyDescription = "The maximum poll latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -199,22 +195,19 @@ public class ThreadMetricsTest {
@Test @Test
public void shouldGetProcessLatencySensor() { public void shouldGetProcessLatencySensor() {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-latency", RecordingLevel.INFO)) final String operationLatency = "process" + LATENCY_SUFFIX;
.andReturn(expectedSensor); final String avgLatencyDescription = "The average process latency";
final String maxLatencyDescription = "The maximum process latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operationLatency, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
expect(expectedSensor.add(eq(new MetricName( StreamsMetricsImpl.addAvgAndMaxToSensor(
"process-latency-avg", expectedSensor,
threadLevelGroup, threadLevelGroup,
"The average execution time in ms for processing, across all running tasks of this thread.", tagMap,
tagMap operationLatency,
)), anyObject())).andReturn(true); avgLatencyDescription,
maxLatencyDescription
expect(expectedSensor.add(eq(new MetricName( );
"process-latency-max",
threadLevelGroup,
"The maximum execution time in ms for processing across all running tasks of this thread.",
tagMap
)), anyObject())).andReturn(true);
replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor); replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor);
final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics); final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics);
@ -225,22 +218,20 @@ public class ThreadMetricsTest {
@Test @Test
public void shouldGetProcessRateSensor() { public void shouldGetProcessRateSensor() {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, "process-rate", RecordingLevel.INFO)) final String operation = "process";
.andReturn(expectedSensor); final String operationRate = "process" + RATE_SUFFIX;
final String totalDescription = "The total number of calls to process";
final String rateDescription = "The average per-second number of calls to process";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operationRate, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
expect(expectedSensor.add(eq(new MetricName( StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor(
"process-rate", expectedSensor,
threadLevelGroup, threadLevelGroup,
"The average per-second number of calls to process", tagMap,
tagMap operation,
)), anyObject())).andReturn(true); rateDescription,
totalDescription
expect(expectedSensor.add(eq(new MetricName( );
"process-total",
threadLevelGroup,
"The total number of calls to process",
tagMap
)), anyObject())).andReturn(true);
replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor); replay(StreamsMetricsImpl.class, streamsMetrics, expectedSensor);
final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics); final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics);
@ -259,13 +250,13 @@ public class ThreadMetricsTest {
final String maxLatencyDescription = "The maximum punctuate latency"; final String maxLatencyDescription = "The maximum punctuate latency";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
tagMap, tagMap,
operation, operation,
rateDescription, totalDescription rateDescription,
totalDescription
); );
StreamsMetricsImpl.addAvgAndMaxToSensor( StreamsMetricsImpl.addAvgAndMaxToSensor(
expectedSensor, expectedSensor,
@ -291,7 +282,6 @@ public class ThreadMetricsTest {
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)) expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO))
.andReturn(expectedSensor); .andReturn(expectedSensor);
expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
threadLevelGroup, threadLevelGroup,
@ -322,7 +312,6 @@ public class ThreadMetricsTest {
"The maximum commit latency over all tasks assigned to one stream thread"; "The maximum commit latency over all tasks assigned to one stream thread";
expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).andReturn(expectedSensor); expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).andReturn(expectedSensor);
expect(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).andReturn(tagMap); expect(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).andReturn(tagMap);
mockStatic(StreamsMetricsImpl.class);
StreamsMetricsImpl.addInvocationRateAndCountToSensor( StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor, expectedSensor,
TASK_LEVEL_GROUP, TASK_LEVEL_GROUP,