diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index f5cc449db7f..7c2361565da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -213,7 +213,7 @@ public class KTableRepartitionMap implements KTableRepartitionMapS private ValueAndTimestamp> mapValue(final K key, final ValueAndTimestamp valueAndTimestamp) { return ValueAndTimestamp.make( mapper.apply(key, getValueOrNull(valueAndTimestamp)), - valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp() + valueAndTimestamp == null ? context.recordContext().timestamp() : valueAndTimestamp.timestamp() ); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 828ae3a0a79..01b694863fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -84,7 +84,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext void forward(final KIn key, final VIn value) { - forward(new Record<>(key, value, timestamp(), headers())); + forward(new Record<>(key, value, recordContext().timestamp(), headers())); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 12a6beedbcd..f417ce76b5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -119,8 +119,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { final Record toProcess = new Record<>( deserialized.key(), deserialized.value(), - processorContext.timestamp(), - processorContext.headers() + processorContext.recordContext().timestamp(), + processorContext.recordContext().headers() ); ((SourceNode) sourceNodeAndDeserializer.sourceNode()).process(toProcess); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 6a53afd07b3..8f739d0c056 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -190,7 +190,7 @@ public final class ProcessorContextImpl extends AbstractProcessorContext toForward = new Record<>( key, value, - timestamp(), + recordContext.timestamp(), headers() ); forward(toForward); @@ -204,7 +204,7 @@ public final class ProcessorContextImpl extends AbstractProcessorContext toForward = new Record<>( key, value, - toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(), + toInternal.hasTimestamp() ? toInternal.timestamp() : recordContext.timestamp(), headers() ); forward(toForward, toInternal.child()); @@ -250,11 +250,11 @@ public final class ProcessorContextImpl extends AbstractProcessorContext InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) { if (context instanceof InternalProcessorContext) { - return (InternalProcessorContext) context; + return (InternalProcessorContext) context; } else { throw new IllegalArgumentException( "This component requires internal features of Kafka Streams and must be disabled for unit tests." diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 2bb58eb6b82..5d245ef5f30 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -209,13 +209,14 @@ public class ProcessorNode { // (instead of `RuntimeException`) to work well with those languages final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( null, // only required to pass for DeserializationExceptionHandler - internalProcessorContext.topic(), - internalProcessorContext.partition(), - internalProcessorContext.offset(), - internalProcessorContext.headers(), + internalProcessorContext.recordContext().topic(), + internalProcessorContext.recordContext().partition(), + internalProcessorContext.recordContext().offset(), + internalProcessorContext.recordContext().headers(), internalProcessorContext.currentNode().name(), internalProcessorContext.taskId(), - internalProcessorContext.timestamp()); + internalProcessorContext.recordContext().timestamp() + ); final ProcessingExceptionHandler.ProcessingHandlerResponse response; try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index e95e098c26c..c7dcf135eaa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -85,9 +85,9 @@ public class SinkNode extends ProcessorNode { final ProcessorRecordContext contextForExtraction = new ProcessorRecordContext( timestamp, - context.offset(), - context.partition(), - context.topic(), + context.recordContext().offset(), + context.recordContext().partition(), + context.recordContext().topic(), record.headers() ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 5000522ed0d..b612223197e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -861,8 +861,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, final Record toProcess = new Record<>( record.key(), record.value(), - processorContext.timestamp(), - processorContext.headers() + processorContext.recordContext().timestamp(), + processorContext.recordContext().headers() ); maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index e55d8452fae..e05b6328ec8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -55,7 +55,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore indexKeySchema; private final long retentionPeriod; - protected InternalProcessorContext internalProcessorContext; + protected InternalProcessorContext internalProcessorContext; private Sensor expiredRecordSensor; protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; protected boolean consistencyEnabled = false; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index b5f05b5c475..f59271920f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -272,12 +272,14 @@ public class CachingKeyValueStore key, new LRUCacheEntry( value, - internalContext.headers(), + internalContext.recordContext().headers(), true, - internalContext.offset(), - internalContext.timestamp(), - internalContext.partition(), - internalContext.topic())); + internalContext.recordContext().offset(), + internalContext.recordContext().timestamp(), + internalContext.recordContext().partition(), + internalContext.recordContext().topic() + ) + ); StoreQueryUtils.updatePosition(position, internalContext); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index c863050f94d..00dbaa5589b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -135,12 +135,13 @@ class CachingSessionStore final LRUCacheEntry entry = new LRUCacheEntry( value, - internalContext.headers(), + internalContext.recordContext().headers(), true, - internalContext.offset(), - internalContext.timestamp(), - internalContext.partition(), - internalContext.topic()); + internalContext.recordContext().offset(), + internalContext.recordContext().timestamp(), + internalContext.recordContext().partition(), + internalContext.recordContext().topic() + ); internalContext.cache().put(cacheName, cacheFunction.cacheKey(binaryKey), entry); maxObservedTimestamp = Math.max(keySchema.segmentTimestamp(binaryKey), maxObservedTimestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index dff0ac70749..f138ff9202a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -153,12 +153,13 @@ class CachingWindowStore final LRUCacheEntry entry = new LRUCacheEntry( value, - internalContext.headers(), + internalContext.recordContext().headers(), true, - internalContext.offset(), - internalContext.timestamp(), - internalContext.partition(), - internalContext.topic()); + internalContext.recordContext().offset(), + internalContext.recordContext().timestamp(), + internalContext.recordContext().partition(), + internalContext.recordContext().topic() + ); internalContext.cache().put(cacheName, cacheFunction.cacheKey(keyBytes), entry); maxObservedTimestamp.set(Math.max(keySchema.segmentTimestamp(keyBytes), maxObservedTimestamp.get())); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index 78bcbd83a0b..5405ad9a71c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -52,7 +52,7 @@ public class ChangeLoggingKeyValueBytesStore if (wrapped() instanceof MemoryLRUCache) { ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> { // pass null to indicate removal - log(key, null, internalContext.timestamp()); + log(key, null, internalContext.recordContext().timestamp()); }); } } @@ -66,7 +66,7 @@ public class ChangeLoggingKeyValueBytesStore public void put(final Bytes key, final byte[] value) { wrapped().put(key, value); - log(key, value, internalContext.timestamp()); + log(key, value, internalContext.recordContext().timestamp()); } @Override @@ -75,7 +75,7 @@ public class ChangeLoggingKeyValueBytesStore final byte[] previous = wrapped().putIfAbsent(key, value); if (previous == null) { // then it was absent - log(key, value, internalContext.timestamp()); + log(key, value, internalContext.recordContext().timestamp()); } return previous; } @@ -84,7 +84,7 @@ public class ChangeLoggingKeyValueBytesStore public void putAll(final List> entries) { wrapped().putAll(entries); for (final KeyValue entry : entries) { - log(entry.key, entry.value, internalContext.timestamp()); + log(entry.key, entry.value, internalContext.recordContext().timestamp()); } } @@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStore @Override public byte[] delete(final Bytes key) { final byte[] oldValue = wrapped().delete(key); - log(key, null, internalContext.timestamp()); + log(key, null, internalContext.recordContext().timestamp()); return oldValue; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java index 9070fc8da5f..ba43ba30b17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java @@ -32,9 +32,9 @@ public class ChangeLoggingListValueBytesStore extends ChangeLoggingKeyValueBytes // we need to log the full new list and thus call get() on the inner store below // if the value is a tombstone, we delete the whole list and thus can save the get call if (value == null) { - log(key, null, internalContext.timestamp()); + log(key, null, internalContext.recordContext().timestamp()); } else { - log(key, wrapped().get(key), internalContext.timestamp()); + log(key, wrapped().get(key), internalContext.recordContext().timestamp()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 06097aa7a71..248889211c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -73,13 +73,13 @@ public class ChangeLoggingSessionBytesStore @Override public void remove(final Windowed sessionKey) { wrapped().remove(sessionKey); - internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.timestamp(), wrapped().getPosition()); + internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.recordContext().timestamp(), wrapped().getPosition()); } @Override public void put(final Windowed sessionKey, final byte[] aggregate) { wrapped().put(sessionKey, aggregate); - internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.timestamp(), wrapped().getPosition()); + internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.recordContext().timestamp(), wrapped().getPosition()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java index 916c9547184..b95ede1bba8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java @@ -35,7 +35,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey public void put(final Bytes key, final byte[] valueAndTimestamp) { wrapped().put(key, valueAndTimestamp); - log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp)); + log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp)); } @Override @@ -44,7 +44,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp); if (previous == null) { // then it was absent - log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp)); + log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp)); } return previous; } @@ -54,7 +54,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey wrapped().putAll(entries); for (final KeyValue entry : entries) { final byte[] valueAndTimestamp = entry.value; - log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp)); + log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp)); } } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java index 2bf87f9d2a8..5ae334f95cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java @@ -36,7 +36,7 @@ class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesS name(), key, rawValue(valueAndTimestamp), - valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.timestamp(), + valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.recordContext().timestamp(), wrapped().getPosition() ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index d5857d0456e..0d0f378af75 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -129,7 +129,7 @@ class ChangeLoggingWindowBytesStore } void log(final Bytes key, final byte[] value) { - internalContext.logChange(name(), key, value, internalContext.timestamp(), wrapped().getPosition()); + internalContext.logChange(name(), key, value, internalContext.recordContext().timestamp(), wrapped().getPosition()); } private int maybeUpdateSeqnumForDups() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 991b9b365d7..b22da77441c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -430,7 +430,7 @@ public class MeteredKeyValueStore // In that case, we _can't_ get the current timestamp, so we don't record anything. if (e2eLatencySensor.shouldRecord() && internalContext != null) { final long currentTime = time.milliseconds(); - final long e2eLatency = currentTime - internalContext.timestamp(); + final long e2eLatency = currentTime - internalContext.recordContext().timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 7fb7276bcfc..20a32bbb1f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -497,7 +497,7 @@ public class MeteredSessionStore // In that case, we _can't_ get the current timestamp, so we don't record anything. if (e2eLatencySensor.shouldRecord() && internalContext != null) { final long currentTime = time.milliseconds(); - final long e2eLatency = currentTime - internalContext.timestamp(); + final long e2eLatency = currentTime - internalContext.recordContext().timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index ed3d31e86d0..923728fc409 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -513,7 +513,7 @@ public class MeteredWindowStore // In that case, we _can't_ get the current timestamp, so we don't record anything. if (e2eLatencySensor.shouldRecord() && internalContext != null) { final long currentTime = time.milliseconds(); - final long e2eLatency = currentTime - internalContext.timestamp(); + final long e2eLatency = currentTime - internalContext.recordContext().timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java index 47cbfde4c40..7f443c3e32c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java @@ -97,12 +97,13 @@ class TimeOrderedCachingWindowStore hasIndex = timeOrderedWindowStore.hasIndex(); } + @SuppressWarnings("unchecked") private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore wrapped) { if (wrapped instanceof RocksDBTimeOrderedWindowStore) { return (RocksDBTimeOrderedWindowStore) wrapped; } if (wrapped instanceof WrappedStateStore) { - return getWrappedStore(((WrappedStateStore) wrapped).wrapped()); + return getWrappedStore(((WrappedStateStore) wrapped).wrapped()); } return null; } @@ -255,12 +256,13 @@ class TimeOrderedCachingWindowStore final LRUCacheEntry entry = new LRUCacheEntry( value, - internalContext.headers(), + internalContext.recordContext().headers(), true, - internalContext.offset(), - internalContext.timestamp(), - internalContext.partition(), - internalContext.topic()); + internalContext.recordContext().offset(), + internalContext.recordContext().timestamp(), + internalContext.recordContext().partition(), + internalContext.recordContext().topic() + ); // Put to index first so that base can be evicted later if (hasIndex) { @@ -274,10 +276,11 @@ class TimeOrderedCachingWindowStore new byte[0], new RecordHeaders(), true, - internalContext.offset(), - internalContext.timestamp(), - internalContext.partition(), - ""); + internalContext.recordContext().offset(), + internalContext.recordContext().timestamp(), + internalContext.recordContext().partition(), + "" + ); final Bytes indexKey = KeyFirstWindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0); internalContext.cache().put(cacheName, indexKeyCacheFunction.cacheKey(indexKey), emptyEntry); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index b4d5c994265..26b8d36f3b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -58,7 +58,7 @@ import static org.junit.jupiter.api.Assertions.fail; public class AbstractProcessorContextTest { private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics()); - private final AbstractProcessorContext context = new TestProcessorContext(metrics); + private final AbstractProcessorContext context = new TestProcessorContext(metrics); private final MockKeyValueStore stateStore = new MockKeyValueStore("store", false); private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); private final ProcessorRecordContext recordContext = new ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", headers); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 42c466c2120..9410ca5a978 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -593,6 +593,8 @@ public class ProcessorContextImplTest { @Test public void shouldThrowUnsupportedOperationExceptionOnForward() { context = getStandbyContext(); + context.recordContext = mock(ProcessorRecordContext.class); + assertThrows( UnsupportedOperationException.class, () -> context.forward("key", "value") @@ -602,6 +604,8 @@ public class ProcessorContextImplTest { @Test public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() { context = getStandbyContext(); + context.recordContext = mock(ProcessorRecordContext.class); + assertThrows( UnsupportedOperationException.class, () -> context.forward("key", "value", To.child("child-name")) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index a16315d363b..5b4303a1695 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -358,7 +358,7 @@ public class ProcessorNodeTest { assertEquals(internalProcessorContext.offset(), context.offset()); assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId()); assertEquals(internalProcessorContext.taskId(), context.taskId()); - assertEquals(internalProcessorContext.timestamp(), context.timestamp()); + assertEquals(internalProcessorContext.recordContext().timestamp(), context.timestamp()); assertEquals(KEY, record.key()); assertEquals(VALUE, record.value()); assertInstanceOf(RuntimeException.class, exception); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index d3243ef2fc6..9a23e657600 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -16,10 +16,12 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -75,6 +77,7 @@ public class ChangeLoggingSessionBytesStoreTest { public void shouldLogPuts() { final Bytes binaryKey = SessionKeySchema.toBinary(key1); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); store.put(key1, value1); @@ -86,6 +89,7 @@ public class ChangeLoggingSessionBytesStoreTest { public void shouldLogPutsWithPosition() { final Bytes binaryKey = SessionKeySchema.toBinary(key1); when(inner.getPosition()).thenReturn(POSITION); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); store.put(key1, value1); @@ -97,6 +101,7 @@ public class ChangeLoggingSessionBytesStoreTest { public void shouldLogRemoves() { final Bytes binaryKey = SessionKeySchema.toBinary(key1); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); store.remove(key1); store.remove(key1); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java index 03701bdcb00..1c1b713ce21 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java @@ -17,9 +17,11 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; @@ -77,8 +79,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { public void shouldLogPuts() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, valueAndTimestamp, context.timestamp()); + store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); verify(inner).put(bytesKey, valueAndTimestamp, 0); verify(context).logChange(store.name(), key, value, 42, Position.emptyPosition()); @@ -88,8 +91,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { public void shouldLogPutsWithPosition() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(POSITION); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, valueAndTimestamp, context.timestamp()); + store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); verify(inner).put(bytesKey, valueAndTimestamp, 0); verify(context).logChange(store.name(), key, value, 42, POSITION); @@ -118,9 +122,10 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1); final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, valueAndTimestamp, context.timestamp()); - store.put(bytesKey, valueAndTimestamp, context.timestamp()); + store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); + store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); verify(inner, times(2)).put(bytesKey, valueAndTimestamp, 0); verify(context).logChange(store.name(), key1, value, 42L, Position.emptyPosition()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java index 2607e56ad9f..e80a2325a2a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java @@ -17,9 +17,11 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; @@ -76,8 +78,9 @@ public class ChangeLoggingWindowBytesStoreTest { public void shouldLogPuts() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, value, context.timestamp()); + store.put(bytesKey, value, context.recordContext().timestamp()); verify(inner).put(bytesKey, value, 0); verify(context).logChange(store.name(), key, value, 0L, Position.emptyPosition()); @@ -87,8 +90,9 @@ public class ChangeLoggingWindowBytesStoreTest { public void shouldLogPutsWithPosition() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(POSITION); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, value, context.timestamp()); + store.put(bytesKey, value, context.recordContext().timestamp()); verify(inner).put(bytesKey, value, 0); verify(context).logChange(store.name(), key, value, 0L, POSITION); @@ -131,12 +135,13 @@ public class ChangeLoggingWindowBytesStoreTest { store = new ChangeLoggingWindowBytesStore(inner, true, WindowKeySchema::toStoreKeyBinary); store.init(context, store); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1); final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2); - store.put(bytesKey, value, context.timestamp()); - store.put(bytesKey, value, context.timestamp()); + store.put(bytesKey, value, context.recordContext().timestamp()); + store.put(bytesKey, value, context.recordContext().timestamp()); verify(inner, times(2)).put(bytesKey, value, 0); verify(context).logChange(store.name(), key1, value, 0L, Position.emptyPosition()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java index 8c28a9eabec..4239e3e5000 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -108,7 +108,7 @@ public class GlobalStateStoreProviderTest { Serdes.String(), Serdes.String()).build()); - final ProcessorContextImpl mockContext = mock(ProcessorContextImpl.class); + final InternalProcessorContext mockContext = mock(InternalProcessorContext.class); when(mockContext.applicationId()).thenReturn("appId"); when(mockContext.metrics()) .thenReturn( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index ba557104ebd..1c8935d1e1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -223,10 +223,10 @@ public class MeteredWindowStoreTest { @Test public void shouldPutToInnerStoreAndRecordPutMetrics() { final byte[] bytes = "a".getBytes(); - doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.timestamp())); + doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.recordContext().timestamp())); store.init(context, store); - store.put("a", "a", context.timestamp()); + store.put("a", "a", context.recordContext().timestamp()); // it suffices to verify one put metric since all put metrics are recorded by the same sensor // and the sensor is tested elsewhere diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java index f2f1d513704..7aba2434457 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java @@ -935,9 +935,9 @@ public class TimeOrderedCachingPersistentWindowStoreTest { new byte[0], new RecordHeaders(), true, - context.offset(), - context.timestamp(), - context.partition(), + context.recordContext().offset(), + context.recordContext().timestamp(), + context.recordContext().partition(), "") ); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java index a82ca8e7300..9eb9ec21b5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java @@ -941,10 +941,11 @@ public class TimeOrderedWindowStoreTest { new byte[0], new RecordHeaders(), true, - context.offset(), - context.timestamp(), - context.partition(), - "") + context.recordContext().offset(), + context.recordContext().timestamp(), + context.recordContext().partition(), + "" + ) ); underlyingStore.put(key, value, 1); diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 47ebe4bdb44..734f744f20f 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -81,12 +81,12 @@ public class NoOpProcessorContext extends AbstractProcessorContext void forward(final Record record) { - forward(record.key(), record.value()); + forwardedValues.put(record.key(), record.value()); } @Override public void forward(final Record record, final String childName) { - forward(record.key(), record.value()); + forwardedValues.put(record.key(), record.value()); } @Override @@ -96,7 +96,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext void forward(final K key, final V value, final To to) { - forward(key, value); + forwardedValues.put(key, value); } @Override diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 2fc8400239d..a4cee67ad5f 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; @@ -61,6 +62,7 @@ import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.RecordCollector; @@ -447,7 +449,6 @@ public class TopologyTestDriver implements Closeable { streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) ); globalStateTask.initialize(); - globalProcessorContext.setRecordContext(null); } else { globalStateManager = null; globalStateTask = null; @@ -492,6 +493,7 @@ public class TopologyTestDriver implements Closeable { streamsMetrics, cache ); + context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders())); task = new StreamTask( TASK_ID, @@ -511,7 +513,6 @@ public class TopologyTestDriver implements Closeable { ); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); - task.processorContext().setRecordContext(null); for (final TopicPartition tp: task.inputPartitions()) { task.updateNextOffsets(tp, new OffsetAndMetadata(0, Optional.empty(), "")); }