mirror of https://github.com/apache/kafka.git
KAFKA-19529: State updater sensor names should be unique (#20262) (#20274)
CI / build (push) Has been cancelled
Details
CI / build (push) Has been cancelled
Details
All state updater threads use the same metrics instance, but do not use unique names for their sensors. This can have the following symptoms: 1) Data inserted into one sensor by one thread can affect the metrics of all state updater threads. 2) If one state updater thread is shutdown, the metrics associated to all state updater threads are removed. 3) If one state updater thread is started, while another one is removed, it can happen that a metric is registered with the `Metrics` instance, but not associated to any `Sensor` (because it is concurrently removed), which means that the metric will not be removed upon shutdown. If a thread with the same name later tries to register the same metric, we may run into a `java.lang.IllegalArgumentException: A metric named ... already exists`, as described in the ticket. This change fixes the bug giving unique names to the sensors. A test is added that there is no interference of the removal of sensors and metrics during shutdown. Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
9c83c6d1f3
commit
0179193b75
|
@ -36,6 +36,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
|
|||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.internals.Task.State;
|
||||
import org.apache.kafka.streams.processor.internals.TaskAndAction.Action;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
|
@ -89,7 +90,7 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
|
||||
|
||||
public StateUpdaterThread(final String name,
|
||||
final Metrics metrics,
|
||||
final StreamsMetricsImpl metrics,
|
||||
final ChangelogReader changelogReader) {
|
||||
super(name);
|
||||
this.changelogReader = changelogReader;
|
||||
|
@ -745,7 +746,7 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
private final Time time;
|
||||
private final Logger log;
|
||||
private final String name;
|
||||
private final Metrics metrics;
|
||||
private final StreamsMetricsImpl metrics;
|
||||
private final Consumer<byte[], byte[]> restoreConsumer;
|
||||
private final ChangelogReader changelogReader;
|
||||
private final TopologyMetadata topologyMetadata;
|
||||
|
@ -766,7 +767,7 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
private StateUpdaterThread stateUpdaterThread = null;
|
||||
|
||||
public DefaultStateUpdater(final String name,
|
||||
final Metrics metrics,
|
||||
final StreamsMetricsImpl metrics,
|
||||
final StreamsConfig config,
|
||||
final Consumer<byte[], byte[]> restoreConsumer,
|
||||
final ChangelogReader changelogReader,
|
||||
|
@ -1062,70 +1063,71 @@ public class DefaultStateUpdater implements StateUpdater {
|
|||
private final Sensor standbyRestoreRatioSensor;
|
||||
private final Sensor checkpointRatioSensor;
|
||||
|
||||
private final Deque<String> allSensorNames = new LinkedList<>();
|
||||
private final Deque<Sensor> allSensors = new LinkedList<>();
|
||||
private final Deque<MetricName> allMetricNames = new LinkedList<>();
|
||||
|
||||
private StateUpdaterMetrics(final Metrics metrics, final String threadId) {
|
||||
private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threadId) {
|
||||
final Map<String, String> threadLevelTags = new LinkedHashMap<>();
|
||||
threadLevelTags.put(THREAD_ID_TAG, threadId);
|
||||
final Metrics metricsRegistry = metrics.metricsRegistry();
|
||||
|
||||
MetricName metricName = metrics.metricName("active-restoring-tasks",
|
||||
MetricName metricName = metricsRegistry.metricName("active-restoring-tasks",
|
||||
STATE_LEVEL_GROUP,
|
||||
"The number of active tasks currently undergoing restoration",
|
||||
threadLevelTags);
|
||||
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
|
||||
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
|
||||
stateUpdaterThread.numRestoringActiveTasks() : 0);
|
||||
allMetricNames.push(metricName);
|
||||
|
||||
metricName = metrics.metricName("standby-updating-tasks",
|
||||
metricName = metricsRegistry.metricName("standby-updating-tasks",
|
||||
STATE_LEVEL_GROUP,
|
||||
"The number of standby tasks currently undergoing state update",
|
||||
threadLevelTags);
|
||||
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
|
||||
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
|
||||
stateUpdaterThread.numUpdatingStandbyTasks() : 0);
|
||||
allMetricNames.push(metricName);
|
||||
|
||||
metricName = metrics.metricName("active-paused-tasks",
|
||||
metricName = metricsRegistry.metricName("active-paused-tasks",
|
||||
STATE_LEVEL_GROUP,
|
||||
"The number of active tasks paused restoring",
|
||||
threadLevelTags);
|
||||
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
|
||||
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
|
||||
stateUpdaterThread.numPausedActiveTasks() : 0);
|
||||
allMetricNames.push(metricName);
|
||||
|
||||
metricName = metrics.metricName("standby-paused-tasks",
|
||||
metricName = metricsRegistry.metricName("standby-paused-tasks",
|
||||
STATE_LEVEL_GROUP,
|
||||
"The number of standby tasks paused state update",
|
||||
threadLevelTags);
|
||||
metrics.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
|
||||
metricsRegistry.addMetric(metricName, (config, now) -> stateUpdaterThread != null ?
|
||||
stateUpdaterThread.numPausedStandbyTasks() : 0);
|
||||
allMetricNames.push(metricName);
|
||||
|
||||
this.idleRatioSensor = metrics.sensor("idle-ratio", RecordingLevel.INFO);
|
||||
this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO);
|
||||
this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
|
||||
allSensorNames.add("idle-ratio");
|
||||
allSensors.add(this.idleRatioSensor);
|
||||
|
||||
this.activeRestoreRatioSensor = metrics.sensor("active-restore-ratio", RecordingLevel.INFO);
|
||||
this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO);
|
||||
this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
|
||||
allSensorNames.add("active-restore-ratio");
|
||||
allSensors.add(this.activeRestoreRatioSensor);
|
||||
|
||||
this.standbyRestoreRatioSensor = metrics.sensor("standby-update-ratio", RecordingLevel.INFO);
|
||||
this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO);
|
||||
this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
|
||||
allSensorNames.add("standby-update-ratio");
|
||||
allSensors.add(this.standbyRestoreRatioSensor);
|
||||
|
||||
this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio", RecordingLevel.INFO);
|
||||
this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO);
|
||||
this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg());
|
||||
allSensorNames.add("checkpoint-ratio");
|
||||
allSensors.add(this.checkpointRatioSensor);
|
||||
|
||||
this.restoreSensor = metrics.sensor("restore-records", RecordingLevel.INFO);
|
||||
this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO);
|
||||
this.restoreSensor.add(new MetricName("restore-records-rate", STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new Rate());
|
||||
this.restoreSensor.add(new MetricName("restore-call-rate", STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new WindowedCount()));
|
||||
allSensorNames.add("restore-records");
|
||||
allSensors.add(this.restoreSensor);
|
||||
}
|
||||
|
||||
void clear() {
|
||||
while (!allSensorNames.isEmpty()) {
|
||||
metrics.removeSensor(allSensorNames.pop());
|
||||
while (!allSensors.isEmpty()) {
|
||||
metrics.removeSensor(allSensors.pop());
|
||||
}
|
||||
|
||||
while (!allMetricNames.isEmpty()) {
|
||||
|
|
|
@ -646,7 +646,7 @@ public class StreamThread extends Thread implements ProcessingThread {
|
|||
final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
|
||||
final StateUpdater stateUpdater = new DefaultStateUpdater(
|
||||
name,
|
||||
streamsMetrics.metricsRegistry(),
|
||||
streamsMetrics,
|
||||
streamsConfig,
|
||||
restoreConsumer,
|
||||
changelogReader,
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
|
|||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask;
|
||||
import org.apache.kafka.streams.processor.internals.Task.State;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
|
||||
import org.hamcrest.Matcher;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
|
@ -105,7 +106,7 @@ class DefaultStateUpdaterTest {
|
|||
|
||||
// need an auto-tick timer to work for draining with timeout
|
||||
private final Time time = new MockTime(1L);
|
||||
private final Metrics metrics = new Metrics(time);
|
||||
private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new Metrics(time), "", "", time);
|
||||
private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
|
||||
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
|
||||
private final TopologyMetadata topologyMetadata = unnamedTopology().build();
|
||||
|
@ -1680,8 +1681,59 @@ class DefaultStateUpdaterTest {
|
|||
assertThat(metrics.metrics().size(), is(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldRemoveMetricsWithoutInterference() {
|
||||
final DefaultStateUpdater stateUpdater2 =
|
||||
new DefaultStateUpdater("test-state-updater2", metrics, config, null, changelogReader, topologyMetadata, time);
|
||||
final List<MetricName> threadMetrics = getMetricNames("test-state-updater");
|
||||
final List<MetricName> threadMetrics2 = getMetricNames("test-state-updater2");
|
||||
|
||||
stateUpdater.start();
|
||||
stateUpdater2.start();
|
||||
|
||||
for (final MetricName metricName : threadMetrics) {
|
||||
assertTrue(metrics.metrics().containsKey(metricName));
|
||||
}
|
||||
for (final MetricName metricName : threadMetrics2) {
|
||||
assertTrue(metrics.metrics().containsKey(metricName));
|
||||
}
|
||||
|
||||
stateUpdater2.shutdown(Duration.ofMinutes(1));
|
||||
|
||||
for (final MetricName metricName : threadMetrics) {
|
||||
assertTrue(metrics.metrics().containsKey(metricName));
|
||||
}
|
||||
for (final MetricName metricName : threadMetrics2) {
|
||||
assertFalse(metrics.metrics().containsKey(metricName));
|
||||
}
|
||||
|
||||
stateUpdater.shutdown(Duration.ofMinutes(1));
|
||||
|
||||
for (final MetricName metricName : threadMetrics) {
|
||||
assertFalse(metrics.metrics().containsKey(metricName));
|
||||
}
|
||||
for (final MetricName metricName : threadMetrics2) {
|
||||
assertFalse(metrics.metrics().containsKey(metricName));
|
||||
}
|
||||
}
|
||||
|
||||
private static List<MetricName> getMetricNames(final String threadId) {
|
||||
final Map<String, String> tagMap = Map.of("thread-id", threadId);
|
||||
return List.of(
|
||||
new MetricName("active-restoring-tasks", "stream-state-updater-metrics", "", tagMap),
|
||||
new MetricName("standby-updating-tasks", "stream-state-updater-metrics", "", tagMap),
|
||||
new MetricName("active-paused-tasks", "stream-state-updater-metrics", "", tagMap),
|
||||
new MetricName("standby-paused-tasks", "stream-state-updater-metrics", "", tagMap),
|
||||
new MetricName("idle-ratio", "stream-state-updater-metrics", "", tagMap),
|
||||
new MetricName("standby-update-ratio", "stream-state-updater-metrics", "", tagMap),
|
||||
new MetricName("checkpoint-ratio", "stream-state-updater-metrics", "", tagMap),
|
||||
new MetricName("restore-records-rate", "stream-state-updater-metrics", "", tagMap),
|
||||
new MetricName("restore-call-rate", "stream-state-updater-metrics", "", tagMap)
|
||||
);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> void verifyMetric(final Metrics metrics,
|
||||
private static <T> void verifyMetric(final StreamsMetricsImpl metrics,
|
||||
final MetricName metricName,
|
||||
final Matcher<T> matcher) {
|
||||
assertThat(metrics.metrics().get(metricName).metricName().description(), is(metricName.description()));
|
||||
|
|
Loading…
Reference in New Issue