MINOR: Delete task-level commit sensor (#14677)

The task-level commit metrics were removed without deprecation in KIP-447 and the corresponding PR #8218. However, we forgot to update the docs and to remove the code that creates the task-level commit sensor.
This PR removes the task-level commit metrics from the docs and removes the code that creates the task-level commit sensor. The code was effectively dead since it was only used in tests.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Bruno Cadonna 2023-11-09 15:37:13 +01:00 committed by GitHub
parent f1e58a35d7
commit 81cceedf7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 2 additions and 116 deletions

View File

@ -2956,26 +2956,6 @@ active-process-ratio metrics which have a recording level of <code>info</code>:
<td>The total number of processed records across all source processor nodes of this task.</td>
<td>kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)</td>
</tr>
<tr>
<td>commit-latency-avg</td>
<td>The average execution time in ns, for committing.</td>
<td>kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)</td>
</tr>
<tr>
<td>commit-latency-max</td>
<td>The maximum execution time in ns, for committing.</td>
<td>kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)</td>
</tr>
<tr>
<td>commit-rate</td>
<td>The average number of commit calls per second.</td>
<td>kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)</td>
</tr>
<tr>
<td>commit-total</td>
<td>The total number of commit calls.</td>
<td>kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-lateness-avg</td>
<td>The average observed lateness of records (stream time - record timestamp).</td>

View File

@ -41,12 +41,6 @@ public class TaskMetrics {
private static final String RATE_DESCRIPTION_SUFFIX = " per second";
private static final String ACTIVE_TASK_PREFIX = "active-";
private static final String COMMIT = "commit";
private static final String COMMIT_DESCRIPTION = "calls to commit";
private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_DESCRIPTION;
private static final String COMMIT_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
private static final String PUNCTUATE = "punctuate";
private static final String PUNCTUATE_DESCRIPTION = "calls to punctuate";
private static final String PUNCTUATE_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PUNCTUATE_DESCRIPTION;
@ -191,22 +185,6 @@ public class TaskMetrics {
);
}
public static Sensor commitSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics,
final Sensor... parentSensor) {
return invocationRateAndCountSensor(
threadId,
taskId,
COMMIT,
COMMIT_RATE_DESCRIPTION,
COMMIT_TOTAL_DESCRIPTION,
Sensor.RecordingLevel.DEBUG,
streamsMetrics,
parentSensor
);
}
public static Sensor restoreSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics,

View File

@ -27,8 +27,6 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORDS_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
@ -78,10 +76,6 @@ public class ThreadMetrics {
private static final String PUNCTUATE_RATE_DESCRIPTION = RATE_DESCRIPTION + PUNCTUATE_DESCRIPTION;
private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The average punctuate latency";
private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The maximum punctuate latency";
private static final String COMMIT_OVER_TASKS_DESCRIPTION =
"calls to commit over all tasks assigned to one stream thread";
private static final String COMMIT_OVER_TASKS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION;
private static final String COMMIT_OVER_TASKS_RATE_DESCRIPTION = RATE_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION;
private static final String PROCESS_RATIO_DESCRIPTION =
"The fraction of time the thread spent on processing active tasks";
private static final String PUNCTUATE_RATIO_DESCRIPTION =
@ -225,22 +219,6 @@ public class ThreadMetrics {
);
}
public static Sensor commitOverTasksSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
final Sensor commitOverTasksSensor =
streamsMetrics.threadLevelSensor(threadId, COMMIT, Sensor.RecordingLevel.DEBUG);
final Map<String, String> tagMap = streamsMetrics.taskLevelTagMap(threadId, ROLLUP_VALUE);
addInvocationRateAndCountToSensor(
commitOverTasksSensor,
TASK_LEVEL_GROUP,
tagMap,
COMMIT,
COMMIT_OVER_TASKS_RATE_DESCRIPTION,
COMMIT_OVER_TASKS_TOTAL_DESCRIPTION
);
return commitOverTasksSensor;
}
public static Sensor processRatioSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor =

View File

@ -177,30 +177,6 @@ public class TaskMetricsTest {
}
}
@Test
public void shouldGetCommitSensor() {
final String operation = "commit";
final String totalDescription = "The total number of calls to commit";
final String rateDescription = "The average number of calls to commit per second";
when(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.DEBUG)).thenReturn(expectedSensor);
when(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).thenReturn(tagMap);
try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
final Sensor sensor = TaskMetrics.commitSensor(THREAD_ID, TASK_ID, streamsMetrics);
streamsMetricsStaticMock.verify(
() -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
TASK_LEVEL_GROUP,
tagMap,
operation,
rateDescription,
totalDescription
)
);
assertThat(sensor, is(expectedSensor));
}
}
@Test
public void shouldGetEnforcedProcessingSensor() {
final String operation = "enforced-processing";

View File

@ -35,7 +35,6 @@ import org.mockito.MockedStatic;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -43,7 +42,6 @@ public class ThreadMetricsTest {
private static final String THREAD_ID = "thread-id";
private static final String THREAD_LEVEL_GROUP = "stream-thread-metrics";
private static final String TASK_LEVEL_GROUP = "stream-task-metrics";
private final Sensor expectedSensor = mock(Sensor.class);
private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class);
@ -287,32 +285,6 @@ public class ThreadMetricsTest {
}
}
@Test
public void shouldGetCommitOverTasksSensor() {
final String operation = "commit";
final String totalDescription =
"The total number of calls to commit over all tasks assigned to one stream thread";
final String rateDescription =
"The average per-second number of calls to commit over all tasks assigned to one stream thread";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).thenReturn(expectedSensor);
when(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).thenReturn(tagMap);
try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
final Sensor sensor = ThreadMetrics.commitOverTasksSensor(THREAD_ID, streamsMetrics);
streamsMetricsStaticMock.verify(
() -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
TASK_LEVEL_GROUP,
tagMap,
operation,
rateDescription,
totalDescription
)
);
assertThat(sensor, is(expectedSensor));
}
}
@Test
public void shouldGetPunctuateSensor() {
final String operation = "punctuate";
@ -371,6 +343,8 @@ public class ThreadMetricsTest {
assertThat(sensor, is(expectedSensor));
}
}
@Test
public void shouldGetCreateTaskSensor() {
final String operation = "task-created";
final String totalDescription = "The total number of newly created tasks";