Change to static ts for init processor context

This commit is contained in:
Eduwer Camacaro 2025-10-06 09:27:43 -05:00
parent 7a3c276c5d
commit aae4160f79
2 changed files with 5 additions and 6 deletions

View File

@ -18,23 +18,22 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Time;
public class InitProcessorRecordContext extends ProcessorRecordContext { public class InitProcessorRecordContext extends ProcessorRecordContext {
private final Time time; private final long initTime;
private static final long NO_OFFSET = -1; private static final long NO_OFFSET = -1;
private static final int NO_PARTITION = -1; private static final int NO_PARTITION = -1;
public InitProcessorRecordContext(final Time time) { public InitProcessorRecordContext(final long currentTimestamp) {
super(ConsumerRecord.NO_TIMESTAMP, NO_OFFSET, NO_PARTITION, null, new RecordHeaders()); super(ConsumerRecord.NO_TIMESTAMP, NO_OFFSET, NO_PARTITION, null, new RecordHeaders());
this.time = time; this.initTime = currentTimestamp;
} }
@Override @Override
public long timestamp() { public long timestamp() {
return time.milliseconds(); return initTime;
} }
@Override @Override

View File

@ -1104,7 +1104,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// initialize the task by initializing all its processor nodes in the topology // initialize the task by initializing all its processor nodes in the topology
log.trace("Initializing processor nodes of the topology"); log.trace("Initializing processor nodes of the topology");
for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) { for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
final InitProcessorRecordContext initContext = new InitProcessorRecordContext(time); final InitProcessorRecordContext initContext = new InitProcessorRecordContext(time.milliseconds());
updateProcessorContext(node, time.milliseconds(), initContext); updateProcessorContext(node, time.milliseconds(), initContext);
try { try {
node.init(processorContext, processingExceptionHandler); node.init(processorContext, processingExceptionHandler);