mirror of https://github.com/apache/kafka.git
MINOR: remove unncessary helper method (#13209)
Reviewers: Christo Lolov (@clolov), Lucas Brutschy <lbrutschy@confluent.io>, Ismael Juma <ismale@confluent.io>
This commit is contained in:
parent
c2aaea3519
commit
463bb00b11
|
|
@ -34,14 +34,6 @@ public final class ProcessorContextUtils {
|
||||||
|
|
||||||
private ProcessorContextUtils() {}
|
private ProcessorContextUtils() {}
|
||||||
|
|
||||||
/**
|
|
||||||
* Note that KIP-622 would move currentSystemTimeMs to ProcessorContext,
|
|
||||||
* removing the need for this method.
|
|
||||||
*/
|
|
||||||
public static long currentSystemTime(final ProcessorContext context) {
|
|
||||||
return context.currentSystemTimeMs();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Should be removed as part of KAFKA-10217
|
* Should be removed as part of KAFKA-10217
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -190,7 +190,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
|
||||||
final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
|
final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
|
||||||
|
|
||||||
if (segment == null) {
|
if (segment == null) {
|
||||||
expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
|
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||||
LOG.warn("Skipping record for expired segment.");
|
LOG.warn("Skipping record for expired segment.");
|
||||||
} else {
|
} else {
|
||||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||||
|
|
|
||||||
|
|
@ -262,7 +262,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
|
||||||
final long segmentId = segments.segmentId(timestamp);
|
final long segmentId = segments.segmentId(timestamp);
|
||||||
final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
|
final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
|
||||||
if (segment == null) {
|
if (segment == null) {
|
||||||
expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
|
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||||
LOG.warn("Skipping record for expired segment.");
|
LOG.warn("Skipping record for expired segment.");
|
||||||
} else {
|
} else {
|
||||||
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
StoreQueryUtils.updatePosition(position, stateStoreContext);
|
||||||
|
|
|
||||||
|
|
@ -159,7 +159,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
|
||||||
observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp);
|
observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp);
|
||||||
|
|
||||||
if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
|
if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
|
||||||
expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
|
expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
|
||||||
LOG.warn("Skipping record for expired segment.");
|
LOG.warn("Skipping record for expired segment.");
|
||||||
} else {
|
} else {
|
||||||
if (value != null) {
|
if (value != null) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue