This commit is contained in:
Nikita Shupletsov 2025-10-07 09:14:56 -07:00 committed by GitHub
commit 45505f4719
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 202 additions and 83 deletions

View File

@ -281,6 +281,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
streamsApplicationProperties = props(groupProtocol); streamsApplicationProperties = props(groupProtocol);
final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology(); final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
shouldPassMetrics(topology, FIRST_INSTANCE_CLIENT);
shouldPassMetrics(topology, SECOND_INSTANCE_CLIENT);
}
private void shouldPassMetrics(final Topology topology, final int clientInstance) throws Exception {
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) { try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
@ -292,8 +297,8 @@ public class KafkaStreamsTelemetryIntegrationTest {
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics().stream().map(KafkaMetric::metricName).toList(); final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(clientInstance).passedMetrics().stream().map(KafkaMetric::metricName).toList();
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList(); final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(clientInstance).passedMetrics.stream().map(KafkaMetric::metricName).toList();
assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size()); assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());

View File

@ -71,6 +71,17 @@ public interface StateStore {
*/ */
void init(final StateStoreContext stateStoreContext, final StateStore root); void init(final StateStoreContext stateStoreContext, final StateStore root);
/**
* Assigns the store to a stream thread.
* <p>
* This function is called from the final stream thread,
* thus can be used to initialize resources that might require to know the running thread, e.g. metrics.
* </p>
* To access the thread use {@link Thread#currentThread()}
*/
default void assignThread() { }
/** /**
* Flush any cached data * Flush any cached data
*/ */

View File

@ -48,6 +48,11 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
throw new UnsupportedOperationException(ERROR_MESSAGE); throw new UnsupportedOperationException(ERROR_MESSAGE);
} }
@Override
public void assignThread() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override @Override
public void close() { public void close() {
throw new UnsupportedOperationException(ERROR_MESSAGE); throw new UnsupportedOperationException(ERROR_MESSAGE);

View File

@ -78,6 +78,11 @@ public class ReadOnlyTask implements Task {
throw new UnsupportedOperationException("This task is read-only"); throw new UnsupportedOperationException("This task is read-only");
} }
@Override
public void assignThread() {
throw new UnsupportedOperationException("This task is read-only");
}
@Override @Override
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) { public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
throw new UnsupportedOperationException("This task is read-only"); throw new UnsupportedOperationException("This task is read-only");

View File

@ -27,6 +27,7 @@ import org.apache.kafka.streams.TopologyConfig.TaskConfig;
import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@ -45,8 +46,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
*/ */
public class StandbyTask extends AbstractTask implements Task { public class StandbyTask extends AbstractTask implements Task {
private final boolean eosEnabled; private final boolean eosEnabled;
private final Sensor closeTaskSensor; private Sensor closeTaskSensor;
private final Sensor updateSensor; private Sensor updateSensor;
private final StreamsMetricsImpl streamsMetrics; private final StreamsMetricsImpl streamsMetrics;
protected final InternalProcessorContext<?, ?> processorContext; protected final InternalProcessorContext<?, ?> processorContext;
@ -83,8 +84,6 @@ public class StandbyTask extends AbstractTask implements Task {
this.streamsMetrics = streamsMetrics; this.streamsMetrics = streamsMetrics;
processorContext.transitionToStandby(cache); processorContext.transitionToStandby(cache);
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
this.eosEnabled = config.eosEnabled; this.eosEnabled = config.eosEnabled;
} }
@ -129,6 +128,15 @@ public class StandbyTask extends AbstractTask implements Task {
} }
} }
@Override
public void assignThread() {
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
for (final StateStore stateStore : topology.stateStores()) {
stateStore.assignThread();
}
}
@Override @Override
public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) { public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
throw new IllegalStateException("Standby task " + id + " should never be completing restoration"); throw new IllegalStateException("Standby task " + id + " should never be completing restoration");

View File

@ -42,6 +42,7 @@ import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
@ -278,6 +279,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
} }
} }
@Override
public void assignThread() {
for (final StateStore stateStore : topology.stateStores()) {
stateStore.assignThread();
}
}
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) { public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
mainConsumer.pause(partitionsForOffsetReset); mainConsumer.pause(partitionsForOffsetReset);
resetOffsetsForPartitions.addAll(partitionsForOffsetReset); resetOffsetsForPartitions.addAll(partitionsForOffsetReset);

View File

@ -110,6 +110,8 @@ public interface Task {
*/ */
void initializeIfNeeded(); void initializeIfNeeded();
void assignThread();
default void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) { default void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -343,6 +343,8 @@ public class TaskManager {
final TaskId taskId = entry.getKey(); final TaskId taskId = entry.getKey();
final Task task = stateDirectory.removeStartupTask(taskId); final Task task = stateDirectory.removeStartupTask(taskId);
if (task != null) { if (task != null) {
task.assignThread();
// replace our dummy values with the real ones, now we know our thread and assignment // replace our dummy values with the real ones, now we know our thread and assignment
final Set<TopicPartition> inputPartitions = entry.getValue(); final Set<TopicPartition> inputPartitions = entry.getValue();
task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions); task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions);
@ -928,6 +930,7 @@ public class TaskManager {
for (final Task task : tasks.allTasks()) { for (final Task task : tasks.allTasks()) {
try { try {
task.initializeIfNeeded(); task.initializeIfNeeded();
task.assignThread();
task.clearTaskTimeout(); task.clearTaskTimeout();
} catch (final LockException lockException) { } catch (final LockException lockException) {
// it is possible that if there are multiple threads within the instance that one thread // it is possible that if there are multiple threads within the instance that one thread
@ -1082,6 +1085,7 @@ public class TaskManager {
try { try {
if (canTryInitializeTask(task.id(), nowMs)) { if (canTryInitializeTask(task.id(), nowMs)) {
task.initializeIfNeeded(); task.initializeIfNeeded();
task.assignThread();
taskIdToBackoffRecord.remove(task.id()); taskIdToBackoffRecord.remove(task.id());
stateUpdater.add(task); stateUpdater.add(task);
} else { } else {

View File

@ -27,7 +27,6 @@ import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
@ -243,16 +242,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
public void init(final StateStoreContext stateStoreContext, final StateStore root) { public void init(final StateStoreContext stateStoreContext, final StateStore root) {
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext); this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position"); final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile); this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint); this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
@ -276,6 +265,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
); );
} }
@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
}
@Override @Override
public void flush() { public void flush() {
segments.flush(); segments.flush();

View File

@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati
import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueIterator;
@ -294,16 +293,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
public void init(final StateStoreContext stateStoreContext, final StateStore root) { public void init(final StateStoreContext stateStoreContext, final StateStore root) {
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext); this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position"); final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile); this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint); this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
@ -325,6 +314,15 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
false); false);
} }
@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
}
@Override @Override
public void flush() { public void flush() {
segments.flush(); segments.flush();

View File

@ -104,6 +104,11 @@ public class CachingKeyValueStore
} }
}); });
super.init(stateStoreContext, root); super.init(stateStoreContext, root);
}
@Override
public void assignThread() {
super.assignThread();
// save the stream thread as we only ever want to trigger a flush // save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread. // when the stream thread is the current thread.
streamThread = Thread.currentThread(); streamThread = Thread.currentThread();

View File

@ -25,10 +25,10 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper; import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.PositionBound;
@ -78,6 +78,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
private StateStoreContext stateStoreContext; private StateStoreContext stateStoreContext;
private final Position position; private final Position position;
private TaskId taskId;
InMemorySessionStore(final String name, InMemorySessionStore(final String name,
final long retentionPeriod, final long retentionPeriod,
@ -97,22 +98,14 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
public void init(final StateStoreContext stateStoreContext, public void init(final StateStoreContext stateStoreContext,
final StateStore root) { final StateStore root) {
this.stateStoreContext = stateStoreContext; this.stateStoreContext = stateStoreContext;
final String threadId = Thread.currentThread().getName(); taskId = stateStoreContext.taskId();
final String taskName = stateStoreContext.taskId().toString();
// The provided context is not required to implement InternalProcessorContext, // The provided context is not required to implement InternalProcessorContext,
// If it doesn't, we can't record this metric. // If it doesn't, we can't record this metric.
if (stateStoreContext instanceof InternalProcessorContext) { if (stateStoreContext instanceof InternalProcessorContext) {
this.context = (InternalProcessorContext<?, ?>) stateStoreContext; this.context = (InternalProcessorContext<?, ?>) stateStoreContext;
final StreamsMetricsImpl metrics = this.context.metrics();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
} else { } else {
this.context = null; this.context = null;
expiredRecordSensor = null;
} }
if (root != null) { if (root != null) {
@ -140,6 +133,19 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
open = true; open = true;
} }
@Override
public void assignThread() {
if (context != null) {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
taskId.toString(),
this.context.metrics()
);
} else {
expiredRecordSensor = null;
}
}
@Override @Override
public Position getPosition() { public Position getPosition() {
return position; return position;

View File

@ -202,6 +202,14 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
taskId = context.taskId().toString(); taskId = context.taskId().toString();
streamsMetrics = context.metrics(); streamsMetrics = context.metrics();
this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
updateBufferMetrics();
open = true;
partition = context.taskId().partition();
}
@Override
public void assignThread() {
bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor( bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor(
taskId, taskId,
METRIC_SCOPE, METRIC_SCOPE,
@ -214,11 +222,6 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
storeName, storeName,
streamsMetrics streamsMetrics
); );
this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
updateBufferMetrics();
open = true;
partition = context.taskId().partition();
} }
@Override @Override

View File

@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati
import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.PositionBound;
@ -104,15 +103,6 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final StateStore root) { final StateStore root) {
this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext); this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
if (root != null) { if (root != null) {
final boolean consistencyEnabled = StreamsConfig.InternalConfig.getBoolean( final boolean consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
stateStoreContext.appConfigs(), stateStoreContext.appConfigs(),
@ -142,6 +132,15 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
open = true; open = true;
} }
@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
}
@Override @Override
public Position getPosition() { public Position getPosition() {
return position; return position;

View File

@ -65,6 +65,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
@Override @Override
public void openExisting(final StateStoreContext context, final long streamTime) { public void openExisting(final StateStoreContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId()); metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
metricsRecorder.assignThread();
super.openExisting(context, streamTime); super.openExisting(context, streamTime);
} }
} }

View File

@ -126,6 +126,11 @@ public class KeyValueStoreWrapper<K, V> implements StateStore {
store.init(stateStoreContext, root); store.init(stateStoreContext, root);
} }
@Override
public void assignThread() {
store.assignThread();
}
@Override @Override
public void flush() { public void flush() {
store.flush(); store.flush();

View File

@ -99,6 +99,11 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
store.init(stateStoreContext, root); store.init(stateStoreContext, root);
} }
@Override
public void assignThread() {
store.assignThread();
}
@Override @Override
public void flush() { public void flush() {
store.flush(); store.flush();

View File

@ -141,6 +141,11 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
throw new UnsupportedOperationException("cannot initialize a logical segment"); throw new UnsupportedOperationException("cannot initialize a logical segment");
} }
@Override
public void assignThread() {
throw new UnsupportedOperationException("nothing to reassign");
}
@Override @Override
public void flush() { public void flush() {
throw new UnsupportedOperationException("nothing to flush for logical segment"); throw new UnsupportedOperationException("nothing to flush for logical segment");

View File

@ -105,6 +105,7 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
@Override @Override
public void openExisting(final StateStoreContext context, final long streamTime) { public void openExisting(final StateStoreContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId()); metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
metricsRecorder.assignThread();
physicalStore.openDB(context.appConfigs(), context.stateDir()); physicalStore.openDB(context.appConfigs(), context.stateDir());
} }

View File

@ -128,13 +128,15 @@ public class MeteredKeyValueStore<K, V>
initStoreSerde(stateStoreContext); initStoreSerde(stateStoreContext);
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics(); streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
registerMetrics();
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
super.init(stateStoreContext, root); super.init(stateStoreContext, root);
} }
@Override
public void assignThread() {
registerMetrics();
super.assignThread();
}
private void registerMetrics() { private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics); putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(taskId.toString(), metricsScope, name(), streamsMetrics); putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
@ -150,6 +152,7 @@ public class MeteredKeyValueStore<K, V>
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.sum()); (config, now) -> openIterators.sum());
openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics); openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics);
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
} }
@Override @Override

View File

@ -108,12 +108,15 @@ public class MeteredSessionStore<K, V>
initStoreSerde(stateStoreContext); initStoreSerde(stateStoreContext);
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics(); streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
registerMetrics();
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
super.init(stateStoreContext, root); super.init(stateStoreContext, root);
} }
@Override
public void assignThread() {
registerMetrics();
super.assignThread();
}
private void registerMetrics() { private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics); putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics); fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
@ -129,6 +132,8 @@ public class MeteredSessionStore<K, V>
return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : null; return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : null;
} }
); );
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
} }
@Override @Override

View File

@ -179,6 +179,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
false); false);
} }
@Override
public void assignThread() {
metricsRecorder.assignThread();
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
void openDB(final Map<String, Object> configs, final File stateDir) { void openDB(final Map<String, Object> configs, final File stateDir) {
// initialize the default rocksdb options // initialize the default rocksdb options

View File

@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati
import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.PositionBound;
@ -352,16 +351,6 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
public void init(final StateStoreContext stateStoreContext, final StateStore root) { public void init(final StateStoreContext stateStoreContext, final StateStore root) {
this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext); this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
metricsRecorder.init(ProcessorContextUtils.metricsImpl(stateStoreContext), stateStoreContext.taskId()); metricsRecorder.init(ProcessorContextUtils.metricsImpl(stateStoreContext), stateStoreContext.taskId());
final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position"); final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
@ -386,6 +375,16 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
); );
} }
@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
metricsRecorder.assignThread();
}
// VisibleForTesting // VisibleForTesting
void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) { void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {

View File

@ -65,6 +65,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
@Override @Override
public void openExisting(final StateStoreContext context, final long streamTime) { public void openExisting(final StateStoreContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId()); metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
metricsRecorder.assignThread();
super.openExisting(context, streamTime); super.openExisting(context, streamTime);
} }
} }

View File

@ -92,6 +92,11 @@ public class VersionedKeyValueToBytesStoreAdapter implements VersionedBytesStore
inner.init(stateStoreContext, root); inner.init(stateStoreContext, root);
} }
@Override
public void assignThread() {
inner.assignThread();
}
@Override @Override
public void flush() { public void flush() {
inner.flush(); inner.flush();

View File

@ -160,6 +160,11 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
store.init(stateStoreContext, root); store.init(stateStoreContext, root);
} }
@Override
public void assignThread() {
store.assignThread();
}
@Override @Override
public void flush() { public void flush() {
store.flush(); store.flush();

View File

@ -63,6 +63,11 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
wrapped.init(stateStoreContext, root); wrapped.init(stateStoreContext, root);
} }
@Override
public void assignThread() {
wrapped.assignThread();
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public boolean setFlushListener(final CacheFlushListener<K, V> listener, public boolean setFlushListener(final CacheFlushListener<K, V> listener,

View File

@ -146,11 +146,14 @@ public class RocksDBMetricsRecorder {
+ "This is a bug in Kafka Streams. " + + "This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
} }
this.taskId = taskId;
this.streamsMetrics = streamsMetrics;
}
public void assignThread() {
final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName); final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName);
initSensors(streamsMetrics, metricContext); initSensors(streamsMetrics, metricContext);
initGauges(streamsMetrics, metricContext); initGauges(streamsMetrics, metricContext);
this.taskId = taskId;
this.streamsMetrics = streamsMetrics;
} }
public void addValueProviders(final String segmentName, public void addValueProviders(final String segmentName,

View File

@ -4898,6 +4898,10 @@ public class TaskManagerTest {
} }
} }
@Override
public void assignThread() {
}
@Override @Override
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) { public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
this.partitionsForOffsetReset = partitionsForOffsetReset; this.partitionsForOffsetReset = partitionsForOffsetReset;

View File

@ -1213,6 +1213,11 @@ public class TopologyTestDriver implements Closeable {
inner.init(stateStoreContext, root); inner.init(stateStoreContext, root);
} }
@Override
public void assignThread() {
inner.assignThread();
}
@Override @Override
public void put(final K key, final V value) { public void put(final K key, final V value) {
inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP)); inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
@ -1277,6 +1282,11 @@ public class TopologyTestDriver implements Closeable {
inner.init(stateStoreContext, root); inner.init(stateStoreContext, root);
} }
@Override
public void assignThread() {
inner.assignThread();
}
@Override @Override
public void put(final K key, public void put(final K key,
final V value, final V value,