diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java index b479446389a..8594ea68c95 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java @@ -281,6 +281,11 @@ public class KafkaStreamsTelemetryIntegrationTest { streamsApplicationProperties = props(groupProtocol); final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology(); + shouldPassMetrics(topology, FIRST_INSTANCE_CLIENT); + shouldPassMetrics(topology, SECOND_INSTANCE_CLIENT); + } + + private void shouldPassMetrics(final Topology topology, final int clientInstance) throws Exception { try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); @@ -292,8 +297,8 @@ public class KafkaStreamsTelemetryIntegrationTest { - final List consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics().stream().map(KafkaMetric::metricName).toList(); - final List adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList(); + final List consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(clientInstance).passedMetrics().stream().map(KafkaMetric::metricName).toList(); + final List adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(clientInstance).passedMetrics.stream().map(KafkaMetric::metricName).toList(); assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 38a3e23e28a..7de5a534cac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -71,6 +71,17 @@ public interface StateStore { */ void init(final StateStoreContext stateStoreContext, final StateStore root); + + /** + * Assigns the store to a stream thread. + *

+ * This function is called from the final stream thread, + * thus can be used to initialize resources that might require to know the running thread, e.g. metrics. + *

+ * To access the thread use {@link Thread#currentThread()} + */ + default void assignThread() { } + /** * Flush any cached data */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index d9772027cb2..a2d8d1b52c9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -48,6 +48,11 @@ abstract class AbstractReadWriteDecorator extends Wr throw new UnsupportedOperationException(ERROR_MESSAGE); } + @Override + public void assignThread() { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + @Override public void close() { throw new UnsupportedOperationException(ERROR_MESSAGE); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java index dd5a2c6e1d7..acee901af33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java @@ -78,6 +78,11 @@ public class ReadOnlyTask implements Task { throw new UnsupportedOperationException("This task is read-only"); } + @Override + public void assignThread() { + throw new UnsupportedOperationException("This task is read-only"); + } + @Override public void addPartitionsForOffsetReset(final Set partitionsForOffsetReset) { throw new UnsupportedOperationException("This task is read-only"); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 4c6e6674bdb..4513e065ce7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.TopologyConfig.TaskConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; @@ -45,8 +46,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric */ public class StandbyTask extends AbstractTask implements Task { private final boolean eosEnabled; - private final Sensor closeTaskSensor; - private final Sensor updateSensor; + private Sensor closeTaskSensor; + private Sensor updateSensor; private final StreamsMetricsImpl streamsMetrics; protected final InternalProcessorContext processorContext; @@ -83,8 +84,6 @@ public class StandbyTask extends AbstractTask implements Task { this.streamsMetrics = streamsMetrics; processorContext.transitionToStandby(cache); - closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics); - updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics); this.eosEnabled = config.eosEnabled; } @@ -129,6 +128,15 @@ public class StandbyTask extends AbstractTask implements Task { } } + @Override + public void assignThread() { + closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics); + updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics); + for (final StateStore stateStore : topology.stateStores()) { + stateStore.assignThread(); + } + } + @Override public void completeRestoration(final java.util.function.Consumer> offsetResetter) { throw new IllegalStateException("Standby task " + id + " should never be completing restoration"); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 42b57e46aa4..fb15b88626d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -42,6 +42,7 @@ import org.apache.kafka.streams.errors.internals.FailedProcessingException; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.api.Record; @@ -278,6 +279,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } } + @Override + public void assignThread() { + for (final StateStore stateStore : topology.stateStores()) { + stateStore.assignThread(); + } + } + public void addPartitionsForOffsetReset(final Set partitionsForOffsetReset) { mainConsumer.pause(partitionsForOffsetReset); resetOffsetsForPartitions.addAll(partitionsForOffsetReset); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index ba09700af8a..3f338beea39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -110,6 +110,8 @@ public interface Task { */ void initializeIfNeeded(); + void assignThread(); + default void addPartitionsForOffsetReset(final Set partitionsForOffsetReset) { throw new UnsupportedOperationException(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 67d009b037f..c8a922f9533 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -343,6 +343,8 @@ public class TaskManager { final TaskId taskId = entry.getKey(); final Task task = stateDirectory.removeStartupTask(taskId); if (task != null) { + task.assignThread(); + // replace our dummy values with the real ones, now we know our thread and assignment final Set inputPartitions = entry.getValue(); task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions); @@ -928,6 +930,7 @@ public class TaskManager { for (final Task task : tasks.allTasks()) { try { task.initializeIfNeeded(); + task.assignThread(); task.clearTaskTimeout(); } catch (final LockException lockException) { // it is possible that if there are multiple threads within the instance that one thread @@ -1082,6 +1085,7 @@ public class TaskManager { try { if (canTryInitializeTask(task.id(), nowMs)) { task.initializeIfNeeded(); + task.assignThread(); taskIdToBackoffRecord.remove(task.id()); stateUpdater.add(task); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index e05b6328ec8..cd5cf764e3a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -27,7 +27,6 @@ import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; @@ -243,16 +242,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore implements Se public void init(final StateStoreContext stateStoreContext, final StateStore root) { this.internalProcessorContext = asInternalProcessorContext(stateStoreContext); - final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext); - final String threadId = Thread.currentThread().getName(); - final String taskName = stateStoreContext.taskId().toString(); - - expiredRecordSensor = TaskMetrics.droppedRecordsSensor( - threadId, - taskName, - metrics - ); - final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position"); this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile); this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint); @@ -325,6 +314,15 @@ public class AbstractRocksDBSegmentedBytesStore implements Se false); } + @Override + public void assignThread() { + expiredRecordSensor = TaskMetrics.droppedRecordsSensor( + Thread.currentThread().getName(), + internalProcessorContext.taskId().toString(), + ProcessorContextUtils.metricsImpl(internalProcessorContext) + ); + } + @Override public void flush() { segments.flush(); @@ -404,4 +402,4 @@ public class AbstractRocksDBSegmentedBytesStore implements Se public Position getPosition() { return position; } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 83343d04494..f03273b4fb6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -104,6 +104,11 @@ public class CachingKeyValueStore } }); super.init(stateStoreContext, root); + } + + @Override + public void assignThread() { + super.assignThread(); // save the stream thread as we only ever want to trigger a flush // when the stream thread is the current thread. streamThread = Thread.currentThread(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index ed2bb186886..b84c195004d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -25,10 +25,10 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; @@ -78,6 +78,7 @@ public class InMemorySessionStore implements SessionStore { private StateStoreContext stateStoreContext; private final Position position; + private TaskId taskId; InMemorySessionStore(final String name, final long retentionPeriod, @@ -97,22 +98,14 @@ public class InMemorySessionStore implements SessionStore { public void init(final StateStoreContext stateStoreContext, final StateStore root) { this.stateStoreContext = stateStoreContext; - final String threadId = Thread.currentThread().getName(); - final String taskName = stateStoreContext.taskId().toString(); + taskId = stateStoreContext.taskId(); // The provided context is not required to implement InternalProcessorContext, // If it doesn't, we can't record this metric. if (stateStoreContext instanceof InternalProcessorContext) { this.context = (InternalProcessorContext) stateStoreContext; - final StreamsMetricsImpl metrics = this.context.metrics(); - expiredRecordSensor = TaskMetrics.droppedRecordsSensor( - threadId, - taskName, - metrics - ); } else { this.context = null; - expiredRecordSensor = null; } if (root != null) { @@ -140,6 +133,19 @@ public class InMemorySessionStore implements SessionStore { open = true; } + @Override + public void assignThread() { + if (context != null) { + expiredRecordSensor = TaskMetrics.droppedRecordsSensor( + Thread.currentThread().getName(), + taskId.toString(), + this.context.metrics() + ); + } else { + expiredRecordSensor = null; + } + } + @Override public Position getPosition() { return position; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java index b1471591cb6..9b2468ba678 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java @@ -202,6 +202,14 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer implements T taskId = context.taskId().toString(); streamsMetrics = context.metrics(); + this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch); + updateBufferMetrics(); + open = true; + partition = context.taskId().partition(); + } + + @Override + public void assignThread() { bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor( taskId, METRIC_SCOPE, @@ -214,11 +222,6 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer implements T storeName, streamsMetrics ); - - this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch); - updateBufferMetrics(); - open = true; - partition = context.taskId().partition(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index d3d5ba4a20d..ef576b979e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; @@ -104,15 +103,6 @@ public class InMemoryWindowStore implements WindowStore { final StateStore root) { this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext); - final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext); - final String threadId = Thread.currentThread().getName(); - final String taskName = stateStoreContext.taskId().toString(); - expiredRecordSensor = TaskMetrics.droppedRecordsSensor( - threadId, - taskName, - metrics - ); - if (root != null) { final boolean consistencyEnabled = StreamsConfig.InternalConfig.getBoolean( stateStoreContext.appConfigs(), @@ -142,6 +132,15 @@ public class InMemoryWindowStore implements WindowStore { open = true; } + @Override + public void assignThread() { + expiredRecordSensor = TaskMetrics.droppedRecordsSensor( + Thread.currentThread().getName(), + internalProcessorContext.taskId().toString(), + ProcessorContextUtils.metricsImpl(internalProcessorContext) + ); + } + @Override public Position getPosition() { return position; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java index a18d901b83f..e7b2c39170c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java @@ -65,6 +65,7 @@ class KeyValueSegments extends AbstractSegments { @Override public void openExisting(final StateStoreContext context, final long streamTime) { metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId()); + metricsRecorder.assignThread(); super.openExisting(context, streamTime); } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java index dce28444d85..b17ce42fbf8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java @@ -126,6 +126,11 @@ public class KeyValueStoreWrapper implements StateStore { store.init(stateStoreContext, root); } + @Override + public void assignThread() { + store.assignThread(); + } + @Override public void flush() { store.flush(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java index 8e79a86bc2b..4ff06f9db41 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java @@ -99,6 +99,11 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt store.init(stateStoreContext, root); } + @Override + public void assignThread() { + store.assignThread(); + } + @Override public void flush() { store.flush(); @@ -235,4 +240,4 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt return KeyValue.pair(next.key, convertToTimestampedFormat(next.value)); } } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java index 18b371048c3..ca8635963ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java @@ -141,6 +141,11 @@ class LogicalKeyValueSegment implements Comparable, Segm throw new UnsupportedOperationException("cannot initialize a logical segment"); } + @Override + public void assignThread() { + throw new UnsupportedOperationException("nothing to reassign"); + } + @Override public void flush() { throw new UnsupportedOperationException("nothing to flush for logical segment"); @@ -368,4 +373,4 @@ class LogicalKeyValueSegment implements Comparable, Segm private static byte[] serializeLongToBytes(final long l) { return ByteBuffer.allocate(Long.BYTES).putLong(l).array(); } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java index c46a2c2788c..f682b725111 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java @@ -105,6 +105,7 @@ public class LogicalKeyValueSegments extends AbstractSegments initStoreSerde(stateStoreContext); streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics(); - registerMetrics(); - - restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); - super.init(stateStoreContext, root); } + @Override + public void assignThread() { + registerMetrics(); + super.assignThread(); + } + private void registerMetrics() { putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics); putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(taskId.toString(), metricsScope, name(), streamsMetrics); @@ -150,6 +152,7 @@ public class MeteredKeyValueStore StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, (config, now) -> openIterators.sum()); openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics); + restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 7794a6ebc51..0cb3f3b61ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -108,12 +108,15 @@ public class MeteredSessionStore initStoreSerde(stateStoreContext); streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics(); - registerMetrics(); - restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); - super.init(stateStoreContext, root); } + @Override + public void assignThread() { + registerMetrics(); + super.assignThread(); + } + private void registerMetrics() { putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics); fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics); @@ -129,6 +132,8 @@ public class MeteredSessionStore return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : null; } ); + + restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index ede618237cf..1ecb8d5713e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -179,6 +179,11 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS false); } + @Override + public void assignThread() { + metricsRecorder.assignThread(); + } + @SuppressWarnings("unchecked") void openDB(final Map configs, final File stateDir) { // initialize the default rocksdb options @@ -1016,4 +1021,4 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS return null; } } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java index 54580a26a1b..1d3fb90f930 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.PositionBound; @@ -352,16 +351,6 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore> records) { @@ -1012,4 +1011,4 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore { @Override public void openExisting(final StateStoreContext context, final long streamTime) { metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId()); + metricsRecorder.assignThread(); super.openExisting(context, streamTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java index 5daa6ed1815..072ebbe0e6b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java @@ -92,6 +92,11 @@ public class VersionedKeyValueToBytesStoreAdapter implements VersionedBytesStore inner.init(stateStoreContext, root); } + @Override + public void assignThread() { + inner.assignThread(); + } + @Override public void flush() { inner.flush(); @@ -180,4 +185,4 @@ public class VersionedKeyValueToBytesStoreAdapter implements VersionedBytesStore null, ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp())); } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java index eec2e2ff1d8..0bec148bb91 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java @@ -160,6 +160,11 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore implements S wrapped.init(stateStoreContext, root); } + @Override + public void assignThread() { + wrapped.assignThread(); + } + @SuppressWarnings("unchecked") @Override public boolean setFlushListener(final CacheFlushListener listener, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java index 10e8cb804fe..5f5f49c645f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java @@ -146,11 +146,14 @@ public class RocksDBMetricsRecorder { + "This is a bug in Kafka Streams. " + "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); } + this.taskId = taskId; + this.streamsMetrics = streamsMetrics; + } + + public void assignThread() { final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName); initSensors(streamsMetrics, metricContext); initGauges(streamsMetrics, metricContext); - this.taskId = taskId; - this.streamsMetrics = streamsMetrics; } public void addValueProviders(final String segmentName, @@ -510,4 +513,4 @@ public class RocksDBMetricsRecorder { } return (double) sum / count; } -} \ No newline at end of file +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 8d83d1e99fa..becf1d22af9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -4898,6 +4898,10 @@ public class TaskManagerTest { } } + @Override + public void assignThread() { + } + @Override public void addPartitionsForOffsetReset(final Set partitionsForOffsetReset) { this.partitionsForOffsetReset = partitionsForOffsetReset; diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 81c90d043ce..c6c984f6211 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -1213,6 +1213,11 @@ public class TopologyTestDriver implements Closeable { inner.init(stateStoreContext, root); } + @Override + public void assignThread() { + inner.assignThread(); + } + @Override public void put(final K key, final V value) { inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP)); @@ -1277,6 +1282,11 @@ public class TopologyTestDriver implements Closeable { inner.init(stateStoreContext, root); } + @Override + public void assignThread() { + inner.assignThread(); + } + @Override public void put(final K key, final V value,