Renamed the method to better reflect it's purpose.

Moved thread assignment from init to assignThread in CachingKeyValueStore.
Moved restoreSensor creation to assignThread. it didn't work as is, the behavior will not be that much different.
This commit is contained in:
Nikita 2025-09-26 14:14:36 -07:00
parent 576d072707
commit 9566c6429e
No known key found for this signature in database
38 changed files with 60 additions and 57 deletions

View File

@ -365,7 +365,7 @@ public class IQv2IntegrationTest {
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -821,7 +821,7 @@ public class VersionedKeyValueStoreIntegrationTest {
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -71,7 +71,7 @@ public interface StateStore {
*/
void init(final StateStoreContext stateStoreContext, final StateStore root);
void initMetricsIfNeeded();
void assignThread();
/**
* Flush any cached data

View File

@ -49,7 +49,7 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

View File

@ -79,7 +79,7 @@ public class ReadOnlyTask implements Task {
}
@Override
public void initializeMetricsIfNeeded() {
public void assignThread() {
throw new UnsupportedOperationException("This task is read-only");
}

View File

@ -129,11 +129,11 @@ public class StandbyTask extends AbstractTask implements Task {
}
@Override
public void initializeMetricsIfNeeded() {
public void assignThread() {
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
for (final StateStore stateStore : topology.stateStores()) {
stateStore.initMetricsIfNeeded();
stateStore.assignThread();
}
}

View File

@ -280,9 +280,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
@Override
public void initializeMetricsIfNeeded() {
public void assignThread() {
for (final StateStore stateStore : topology.stateStores()) {
stateStore.initMetricsIfNeeded();
stateStore.assignThread();
}
}

View File

@ -110,7 +110,7 @@ public interface Task {
*/
void initializeIfNeeded();
void initializeMetricsIfNeeded();
void assignThread();
default void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
throw new UnsupportedOperationException();

View File

@ -343,7 +343,7 @@ public class TaskManager {
final TaskId taskId = entry.getKey();
final Task task = stateDirectory.removeStartupTask(taskId);
if (task != null) {
task.initializeMetricsIfNeeded();
task.assignThread();
// replace our dummy values with the real ones, now we know our thread and assignment
final Set<TopicPartition> inputPartitions = entry.getValue();
@ -930,7 +930,7 @@ public class TaskManager {
for (final Task task : tasks.allTasks()) {
try {
task.initializeIfNeeded();
task.initializeMetricsIfNeeded();
task.assignThread();
task.clearTaskTimeout();
} catch (final LockException lockException) {
// it is possible that if there are multiple threads within the instance that one thread
@ -1085,7 +1085,7 @@ public class TaskManager {
try {
if (canTryInitializeTask(task.id(), nowMs)) {
task.initializeIfNeeded();
task.initializeMetricsIfNeeded();
task.assignThread();
taskIdToBackoffRecord.remove(task.id());
stateUpdater.add(task);
} else {

View File

@ -273,7 +273,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
registerMetrics();
}

View File

@ -322,7 +322,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
registerMetrics();
}

View File

@ -104,6 +104,11 @@ public class CachingKeyValueStore
}
});
super.init(stateStoreContext, root);
}
@Override
public void assignThread() {
super.assignThread();
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
streamThread = Thread.currentThread();

View File

@ -99,7 +99,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -135,7 +135,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
if (context != null) {
registerMetrics();
} else {

View File

@ -209,7 +209,7 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
registerMetrics();
}

View File

@ -140,7 +140,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
registerMetrics();
}

View File

@ -65,7 +65,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
@Override
public void openExisting(final StateStoreContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
metricsRecorder.initMetricsIfNeeded();
metricsRecorder.assignThread();
super.openExisting(context, streamTime);
}
}

View File

@ -127,8 +127,8 @@ public class KeyValueStoreWrapper<K, V> implements StateStore {
}
@Override
public void initMetricsIfNeeded() {
store.initMetricsIfNeeded();
public void assignThread() {
store.assignThread();
}
@Override

View File

@ -100,8 +100,8 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
}
@Override
public void initMetricsIfNeeded() {
store.initMetricsIfNeeded();
public void assignThread() {
store.assignThread();
}
@Override

View File

@ -142,7 +142,7 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
throw new UnsupportedOperationException("nothing to reassign");
}

View File

@ -105,7 +105,7 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
@Override
public void openExisting(final StateStoreContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
metricsRecorder.initMetricsIfNeeded();
metricsRecorder.assignThread();
physicalStore.openDB(context.appConfigs(), context.stateDir());
}

View File

@ -112,7 +112,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -106,6 +106,7 @@ public class MeteredKeyValueStore<K, V>
(query, positionBound, config, store) -> runKeyQuery(query, positionBound, config)
)
);
private Sensor restoreSensor;
MeteredKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
final String metricsScope,
@ -127,17 +128,13 @@ public class MeteredKeyValueStore<K, V>
initStoreSerde(stateStoreContext);
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
final Sensor restoreSensor =
StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(stateStoreContext, root), time, restoreSensor);
super.init(stateStoreContext, root);
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
registerMetrics();
super.initMetricsIfNeeded();
super.assignThread();
}
private void registerMetrics() {
@ -155,6 +152,7 @@ public class MeteredKeyValueStore<K, V>
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.sum());
openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics);
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
}
protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {

View File

@ -180,8 +180,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
@Override
public void initMetricsIfNeeded() {
metricsRecorder.initMetricsIfNeeded();
public void assignThread() {
metricsRecorder.assignThread();
}
@SuppressWarnings("unchecked")

View File

@ -195,7 +195,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -383,9 +383,9 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
registerMetrics();
metricsRecorder.initMetricsIfNeeded();
metricsRecorder.assignThread();
}
private void registerMetrics() {

View File

@ -65,7 +65,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
@Override
public void openExisting(final StateStoreContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
metricsRecorder.initMetricsIfNeeded();
metricsRecorder.assignThread();
super.openExisting(context, streamTime);
}
}

View File

@ -93,8 +93,8 @@ public class VersionedKeyValueToBytesStoreAdapter implements VersionedBytesStore
}
@Override
public void initMetricsIfNeeded() {
inner.initMetricsIfNeeded();
public void assignThread() {
inner.assignThread();
}
@Override

View File

@ -161,8 +161,8 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
}
@Override
public void initMetricsIfNeeded() {
store.initMetricsIfNeeded();
public void assignThread() {
store.assignThread();
}
@Override

View File

@ -64,8 +64,8 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
}
@Override
public void initMetricsIfNeeded() {
wrapped.initMetricsIfNeeded();
public void assignThread() {
wrapped.assignThread();
}
@SuppressWarnings("unchecked")

View File

@ -150,7 +150,7 @@ public class RocksDBMetricsRecorder {
this.streamsMetrics = streamsMetrics;
}
public void initMetricsIfNeeded() {
public void assignThread() {
final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName);
initSensors(streamsMetrics, metricContext);
initGauges(streamsMetrics, metricContext);

View File

@ -4922,7 +4922,7 @@ public class TaskManagerTest {
}
@Override
public void initializeMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -59,7 +59,7 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
public void init(final StateStoreContext stateStoreContext, final StateStore root) {}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -381,7 +381,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>,
public void init(final StateStoreContext stateStoreContext, final StateStore root) {}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -65,7 +65,7 @@ public class MockKeyValueStore implements KeyValueStore<Object, Object> {
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -89,7 +89,7 @@ public class NoOpReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V>, Sta
}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -187,7 +187,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V
public void init(StateStoreContext stateStoreContext, StateStore root) {}
@Override
public void initMetricsIfNeeded() {
public void assignThread() {
}
@Override

View File

@ -1214,8 +1214,8 @@ public class TopologyTestDriver implements Closeable {
}
@Override
public void initMetricsIfNeeded() {
inner.initMetricsIfNeeded();
public void assignThread() {
inner.assignThread();
}
@Override
@ -1283,8 +1283,8 @@ public class TopologyTestDriver implements Closeable {
}
@Override
public void initMetricsIfNeeded() {
inner.initMetricsIfNeeded();
public void assignThread() {
inner.assignThread();
}
@Override