diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java index c23c0fc9e49..c93297c97cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java @@ -34,14 +34,6 @@ public final class ProcessorContextUtils { private ProcessorContextUtils() {} - /** - * Note that KIP-622 would move currentSystemTimeMs to ProcessorContext, - * removing the need for this method. - */ - public static long currentSystemTime(final ProcessorContext context) { - return context.currentSystemTimeMs(); - } - /** * Should be removed as part of KAFKA-10217 */ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index b446a52eb5f..441c17201b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -190,7 +190,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore implements Se final long segmentId = segments.segmentId(timestamp); final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime); if (segment == null) { - expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context)); + expiredRecordSensor.record(1.0d, context.currentSystemTimeMs()); LOG.warn("Skipping record for expired segment."); } else { StoreQueryUtils.updatePosition(position, stateStoreContext); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 5122789e31a..2ddeadc3585 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -159,7 +159,7 @@ public class InMemoryWindowStore implements WindowStore { observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp); if (windowStartTimestamp <= observedStreamTime - retentionPeriod) { - expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context)); + expiredRecordSensor.record(1.0d, context.currentSystemTimeMs()); LOG.warn("Skipping record for expired segment."); } else { if (value != null) {