From f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 24 Jan 2025 10:31:31 -0800 Subject: [PATCH] KAFKA-13722: remove usage of old ProcessorContext (#18292) We want to deprecate an remove the old ProcessorContext. Thus, we need to refactor Kafka Streams runtime code, to not make calls into the old ProcessorContext but only into new code path. Reviewers: Bill Bejeck --- .../internals/KTableRepartitionMap.java | 2 +- .../internals/GlobalProcessorContextImpl.java | 2 +- .../internals/GlobalStateUpdateTask.java | 4 ++-- .../internals/ProcessorContextImpl.java | 8 +++---- .../internals/ProcessorContextUtils.java | 12 +++------- .../processor/internals/ProcessorNode.java | 11 +++++---- .../streams/processor/internals/SinkNode.java | 6 ++--- .../processor/internals/StreamTask.java | 4 ++-- ...tDualSchemaRocksDBSegmentedBytesStore.java | 2 +- .../state/internals/CachingKeyValueStore.java | 12 ++++++---- .../state/internals/CachingSessionStore.java | 11 +++++---- .../state/internals/CachingWindowStore.java | 11 +++++---- .../ChangeLoggingKeyValueBytesStore.java | 10 ++++---- .../ChangeLoggingListValueBytesStore.java | 4 ++-- .../ChangeLoggingSessionBytesStore.java | 4 ++-- ...eLoggingTimestampedKeyValueBytesStore.java | 6 ++--- ...ngeLoggingTimestampedWindowBytesStore.java | 2 +- .../ChangeLoggingWindowBytesStore.java | 2 +- .../state/internals/MeteredKeyValueStore.java | 2 +- .../state/internals/MeteredSessionStore.java | 2 +- .../state/internals/MeteredWindowStore.java | 2 +- .../TimeOrderedCachingWindowStore.java | 23 +++++++++++-------- .../AbstractProcessorContextTest.java | 2 +- .../internals/ProcessorContextImplTest.java | 4 ++++ .../internals/ProcessorNodeTest.java | 2 +- .../ChangeLoggingSessionBytesStoreTest.java | 5 ++++ ...oggingTimestampedWindowBytesStoreTest.java | 13 +++++++---- .../ChangeLoggingWindowBytesStoreTest.java | 13 +++++++---- .../GlobalStateStoreProviderTest.java | 4 ++-- .../internals/MeteredWindowStoreTest.java | 4 ++-- ...deredCachingPersistentWindowStoreTest.java | 6 ++--- .../internals/TimeOrderedWindowStoreTest.java | 9 ++++---- .../kafka/test/NoOpProcessorContext.java | 6 ++--- .../kafka/streams/TopologyTestDriver.java | 5 ++-- 34 files changed, 119 insertions(+), 96 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index f5cc449db7f..7c2361565da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -213,7 +213,7 @@ public class KTableRepartitionMap implements KTableRepartitionMapS private ValueAndTimestamp> mapValue(final K key, final ValueAndTimestamp valueAndTimestamp) { return ValueAndTimestamp.make( mapper.apply(key, getValueOrNull(valueAndTimestamp)), - valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp() + valueAndTimestamp == null ? context.recordContext().timestamp() : valueAndTimestamp.timestamp() ); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 828ae3a0a79..01b694863fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -84,7 +84,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext void forward(final KIn key, final VIn value) { - forward(new Record<>(key, value, timestamp(), headers())); + forward(new Record<>(key, value, recordContext().timestamp(), headers())); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 12a6beedbcd..f417ce76b5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -119,8 +119,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer { final Record toProcess = new Record<>( deserialized.key(), deserialized.value(), - processorContext.timestamp(), - processorContext.headers() + processorContext.recordContext().timestamp(), + processorContext.recordContext().headers() ); ((SourceNode) sourceNodeAndDeserializer.sourceNode()).process(toProcess); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 6a53afd07b3..8f739d0c056 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -190,7 +190,7 @@ public final class ProcessorContextImpl extends AbstractProcessorContext toForward = new Record<>( key, value, - timestamp(), + recordContext.timestamp(), headers() ); forward(toForward); @@ -204,7 +204,7 @@ public final class ProcessorContextImpl extends AbstractProcessorContext toForward = new Record<>( key, value, - toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(), + toInternal.hasTimestamp() ? toInternal.timestamp() : recordContext.timestamp(), headers() ); forward(toForward, toInternal.child()); @@ -250,11 +250,11 @@ public final class ProcessorContextImpl extends AbstractProcessorContext InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) { if (context instanceof InternalProcessorContext) { - return (InternalProcessorContext) context; + return (InternalProcessorContext) context; } else { throw new IllegalArgumentException( "This component requires internal features of Kafka Streams and must be disabled for unit tests." diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 2bb58eb6b82..5d245ef5f30 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -209,13 +209,14 @@ public class ProcessorNode { // (instead of `RuntimeException`) to work well with those languages final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( null, // only required to pass for DeserializationExceptionHandler - internalProcessorContext.topic(), - internalProcessorContext.partition(), - internalProcessorContext.offset(), - internalProcessorContext.headers(), + internalProcessorContext.recordContext().topic(), + internalProcessorContext.recordContext().partition(), + internalProcessorContext.recordContext().offset(), + internalProcessorContext.recordContext().headers(), internalProcessorContext.currentNode().name(), internalProcessorContext.taskId(), - internalProcessorContext.timestamp()); + internalProcessorContext.recordContext().timestamp() + ); final ProcessingExceptionHandler.ProcessingHandlerResponse response; try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index e95e098c26c..c7dcf135eaa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -85,9 +85,9 @@ public class SinkNode extends ProcessorNode { final ProcessorRecordContext contextForExtraction = new ProcessorRecordContext( timestamp, - context.offset(), - context.partition(), - context.topic(), + context.recordContext().offset(), + context.recordContext().partition(), + context.recordContext().topic(), record.headers() ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 5000522ed0d..b612223197e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -861,8 +861,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, final Record toProcess = new Record<>( record.key(), record.value(), - processorContext.timestamp(), - processorContext.headers() + processorContext.recordContext().timestamp(), + processorContext.recordContext().headers() ); maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index e55d8452fae..e05b6328ec8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -55,7 +55,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore indexKeySchema; private final long retentionPeriod; - protected InternalProcessorContext internalProcessorContext; + protected InternalProcessorContext internalProcessorContext; private Sensor expiredRecordSensor; protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; protected boolean consistencyEnabled = false; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index b5f05b5c475..f59271920f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -272,12 +272,14 @@ public class CachingKeyValueStore key, new LRUCacheEntry( value, - internalContext.headers(), + internalContext.recordContext().headers(), true, - internalContext.offset(), - internalContext.timestamp(), - internalContext.partition(), - internalContext.topic())); + internalContext.recordContext().offset(), + internalContext.recordContext().timestamp(), + internalContext.recordContext().partition(), + internalContext.recordContext().topic() + ) + ); StoreQueryUtils.updatePosition(position, internalContext); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index c863050f94d..00dbaa5589b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -135,12 +135,13 @@ class CachingSessionStore final LRUCacheEntry entry = new LRUCacheEntry( value, - internalContext.headers(), + internalContext.recordContext().headers(), true, - internalContext.offset(), - internalContext.timestamp(), - internalContext.partition(), - internalContext.topic()); + internalContext.recordContext().offset(), + internalContext.recordContext().timestamp(), + internalContext.recordContext().partition(), + internalContext.recordContext().topic() + ); internalContext.cache().put(cacheName, cacheFunction.cacheKey(binaryKey), entry); maxObservedTimestamp = Math.max(keySchema.segmentTimestamp(binaryKey), maxObservedTimestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index dff0ac70749..f138ff9202a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -153,12 +153,13 @@ class CachingWindowStore final LRUCacheEntry entry = new LRUCacheEntry( value, - internalContext.headers(), + internalContext.recordContext().headers(), true, - internalContext.offset(), - internalContext.timestamp(), - internalContext.partition(), - internalContext.topic()); + internalContext.recordContext().offset(), + internalContext.recordContext().timestamp(), + internalContext.recordContext().partition(), + internalContext.recordContext().topic() + ); internalContext.cache().put(cacheName, cacheFunction.cacheKey(keyBytes), entry); maxObservedTimestamp.set(Math.max(keySchema.segmentTimestamp(keyBytes), maxObservedTimestamp.get())); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index 78bcbd83a0b..5405ad9a71c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -52,7 +52,7 @@ public class ChangeLoggingKeyValueBytesStore if (wrapped() instanceof MemoryLRUCache) { ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> { // pass null to indicate removal - log(key, null, internalContext.timestamp()); + log(key, null, internalContext.recordContext().timestamp()); }); } } @@ -66,7 +66,7 @@ public class ChangeLoggingKeyValueBytesStore public void put(final Bytes key, final byte[] value) { wrapped().put(key, value); - log(key, value, internalContext.timestamp()); + log(key, value, internalContext.recordContext().timestamp()); } @Override @@ -75,7 +75,7 @@ public class ChangeLoggingKeyValueBytesStore final byte[] previous = wrapped().putIfAbsent(key, value); if (previous == null) { // then it was absent - log(key, value, internalContext.timestamp()); + log(key, value, internalContext.recordContext().timestamp()); } return previous; } @@ -84,7 +84,7 @@ public class ChangeLoggingKeyValueBytesStore public void putAll(final List> entries) { wrapped().putAll(entries); for (final KeyValue entry : entries) { - log(entry.key, entry.value, internalContext.timestamp()); + log(entry.key, entry.value, internalContext.recordContext().timestamp()); } } @@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStore @Override public byte[] delete(final Bytes key) { final byte[] oldValue = wrapped().delete(key); - log(key, null, internalContext.timestamp()); + log(key, null, internalContext.recordContext().timestamp()); return oldValue; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java index 9070fc8da5f..ba43ba30b17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java @@ -32,9 +32,9 @@ public class ChangeLoggingListValueBytesStore extends ChangeLoggingKeyValueBytes // we need to log the full new list and thus call get() on the inner store below // if the value is a tombstone, we delete the whole list and thus can save the get call if (value == null) { - log(key, null, internalContext.timestamp()); + log(key, null, internalContext.recordContext().timestamp()); } else { - log(key, wrapped().get(key), internalContext.timestamp()); + log(key, wrapped().get(key), internalContext.recordContext().timestamp()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 06097aa7a71..248889211c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -73,13 +73,13 @@ public class ChangeLoggingSessionBytesStore @Override public void remove(final Windowed sessionKey) { wrapped().remove(sessionKey); - internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.timestamp(), wrapped().getPosition()); + internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.recordContext().timestamp(), wrapped().getPosition()); } @Override public void put(final Windowed sessionKey, final byte[] aggregate) { wrapped().put(sessionKey, aggregate); - internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.timestamp(), wrapped().getPosition()); + internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.recordContext().timestamp(), wrapped().getPosition()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java index 916c9547184..b95ede1bba8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java @@ -35,7 +35,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey public void put(final Bytes key, final byte[] valueAndTimestamp) { wrapped().put(key, valueAndTimestamp); - log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp)); + log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp)); } @Override @@ -44,7 +44,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp); if (previous == null) { // then it was absent - log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp)); + log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp)); } return previous; } @@ -54,7 +54,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey wrapped().putAll(entries); for (final KeyValue entry : entries) { final byte[] valueAndTimestamp = entry.value; - log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp)); + log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp)); } } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java index 2bf87f9d2a8..5ae334f95cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java @@ -36,7 +36,7 @@ class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesS name(), key, rawValue(valueAndTimestamp), - valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.timestamp(), + valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.recordContext().timestamp(), wrapped().getPosition() ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index d5857d0456e..0d0f378af75 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -129,7 +129,7 @@ class ChangeLoggingWindowBytesStore } void log(final Bytes key, final byte[] value) { - internalContext.logChange(name(), key, value, internalContext.timestamp(), wrapped().getPosition()); + internalContext.logChange(name(), key, value, internalContext.recordContext().timestamp(), wrapped().getPosition()); } private int maybeUpdateSeqnumForDups() { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 991b9b365d7..b22da77441c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -430,7 +430,7 @@ public class MeteredKeyValueStore // In that case, we _can't_ get the current timestamp, so we don't record anything. if (e2eLatencySensor.shouldRecord() && internalContext != null) { final long currentTime = time.milliseconds(); - final long e2eLatency = currentTime - internalContext.timestamp(); + final long e2eLatency = currentTime - internalContext.recordContext().timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 7fb7276bcfc..20a32bbb1f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -497,7 +497,7 @@ public class MeteredSessionStore // In that case, we _can't_ get the current timestamp, so we don't record anything. if (e2eLatencySensor.shouldRecord() && internalContext != null) { final long currentTime = time.milliseconds(); - final long e2eLatency = currentTime - internalContext.timestamp(); + final long e2eLatency = currentTime - internalContext.recordContext().timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index ed3d31e86d0..923728fc409 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -513,7 +513,7 @@ public class MeteredWindowStore // In that case, we _can't_ get the current timestamp, so we don't record anything. if (e2eLatencySensor.shouldRecord() && internalContext != null) { final long currentTime = time.milliseconds(); - final long e2eLatency = currentTime - internalContext.timestamp(); + final long e2eLatency = currentTime - internalContext.recordContext().timestamp(); e2eLatencySensor.record(e2eLatency, currentTime); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java index 47cbfde4c40..7f443c3e32c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java @@ -97,12 +97,13 @@ class TimeOrderedCachingWindowStore hasIndex = timeOrderedWindowStore.hasIndex(); } + @SuppressWarnings("unchecked") private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore wrapped) { if (wrapped instanceof RocksDBTimeOrderedWindowStore) { return (RocksDBTimeOrderedWindowStore) wrapped; } if (wrapped instanceof WrappedStateStore) { - return getWrappedStore(((WrappedStateStore) wrapped).wrapped()); + return getWrappedStore(((WrappedStateStore) wrapped).wrapped()); } return null; } @@ -255,12 +256,13 @@ class TimeOrderedCachingWindowStore final LRUCacheEntry entry = new LRUCacheEntry( value, - internalContext.headers(), + internalContext.recordContext().headers(), true, - internalContext.offset(), - internalContext.timestamp(), - internalContext.partition(), - internalContext.topic()); + internalContext.recordContext().offset(), + internalContext.recordContext().timestamp(), + internalContext.recordContext().partition(), + internalContext.recordContext().topic() + ); // Put to index first so that base can be evicted later if (hasIndex) { @@ -274,10 +276,11 @@ class TimeOrderedCachingWindowStore new byte[0], new RecordHeaders(), true, - internalContext.offset(), - internalContext.timestamp(), - internalContext.partition(), - ""); + internalContext.recordContext().offset(), + internalContext.recordContext().timestamp(), + internalContext.recordContext().partition(), + "" + ); final Bytes indexKey = KeyFirstWindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0); internalContext.cache().put(cacheName, indexKeyCacheFunction.cacheKey(indexKey), emptyEntry); } else { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index b4d5c994265..26b8d36f3b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -58,7 +58,7 @@ import static org.junit.jupiter.api.Assertions.fail; public class AbstractProcessorContextTest { private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics()); - private final AbstractProcessorContext context = new TestProcessorContext(metrics); + private final AbstractProcessorContext context = new TestProcessorContext(metrics); private final MockKeyValueStore stateStore = new MockKeyValueStore("store", false); private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); private final ProcessorRecordContext recordContext = new ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", headers); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 42c466c2120..9410ca5a978 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -593,6 +593,8 @@ public class ProcessorContextImplTest { @Test public void shouldThrowUnsupportedOperationExceptionOnForward() { context = getStandbyContext(); + context.recordContext = mock(ProcessorRecordContext.class); + assertThrows( UnsupportedOperationException.class, () -> context.forward("key", "value") @@ -602,6 +604,8 @@ public class ProcessorContextImplTest { @Test public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() { context = getStandbyContext(); + context.recordContext = mock(ProcessorRecordContext.class); + assertThrows( UnsupportedOperationException.class, () -> context.forward("key", "value", To.child("child-name")) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index a16315d363b..5b4303a1695 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -358,7 +358,7 @@ public class ProcessorNodeTest { assertEquals(internalProcessorContext.offset(), context.offset()); assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId()); assertEquals(internalProcessorContext.taskId(), context.taskId()); - assertEquals(internalProcessorContext.timestamp(), context.timestamp()); + assertEquals(internalProcessorContext.recordContext().timestamp(), context.timestamp()); assertEquals(KEY, record.key()); assertEquals(VALUE, record.value()); assertInstanceOf(RuntimeException.class, exception); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index d3243ef2fc6..9a23e657600 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -16,10 +16,12 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; @@ -75,6 +77,7 @@ public class ChangeLoggingSessionBytesStoreTest { public void shouldLogPuts() { final Bytes binaryKey = SessionKeySchema.toBinary(key1); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); store.put(key1, value1); @@ -86,6 +89,7 @@ public class ChangeLoggingSessionBytesStoreTest { public void shouldLogPutsWithPosition() { final Bytes binaryKey = SessionKeySchema.toBinary(key1); when(inner.getPosition()).thenReturn(POSITION); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); store.put(key1, value1); @@ -97,6 +101,7 @@ public class ChangeLoggingSessionBytesStoreTest { public void shouldLogRemoves() { final Bytes binaryKey = SessionKeySchema.toBinary(key1); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); store.remove(key1); store.remove(key1); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java index 03701bdcb00..1c1b713ce21 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java @@ -17,9 +17,11 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; @@ -77,8 +79,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { public void shouldLogPuts() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, valueAndTimestamp, context.timestamp()); + store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); verify(inner).put(bytesKey, valueAndTimestamp, 0); verify(context).logChange(store.name(), key, value, 42, Position.emptyPosition()); @@ -88,8 +91,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { public void shouldLogPutsWithPosition() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(POSITION); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, valueAndTimestamp, context.timestamp()); + store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); verify(inner).put(bytesKey, valueAndTimestamp, 0); verify(context).logChange(store.name(), key, value, 42, POSITION); @@ -118,9 +122,10 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1); final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, valueAndTimestamp, context.timestamp()); - store.put(bytesKey, valueAndTimestamp, context.timestamp()); + store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); + store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp()); verify(inner, times(2)).put(bytesKey, valueAndTimestamp, 0); verify(context).logChange(store.name(), key1, value, 42L, Position.emptyPosition()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java index 2607e56ad9f..e80a2325a2a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java @@ -17,9 +17,11 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.WindowStore; @@ -76,8 +78,9 @@ public class ChangeLoggingWindowBytesStoreTest { public void shouldLogPuts() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, value, context.timestamp()); + store.put(bytesKey, value, context.recordContext().timestamp()); verify(inner).put(bytesKey, value, 0); verify(context).logChange(store.name(), key, value, 0L, Position.emptyPosition()); @@ -87,8 +90,9 @@ public class ChangeLoggingWindowBytesStoreTest { public void shouldLogPutsWithPosition() { final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0); when(inner.getPosition()).thenReturn(POSITION); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); - store.put(bytesKey, value, context.timestamp()); + store.put(bytesKey, value, context.recordContext().timestamp()); verify(inner).put(bytesKey, value, 0); verify(context).logChange(store.name(), key, value, 0L, POSITION); @@ -131,12 +135,13 @@ public class ChangeLoggingWindowBytesStoreTest { store = new ChangeLoggingWindowBytesStore(inner, true, WindowKeySchema::toStoreKeyBinary); store.init(context, store); when(inner.getPosition()).thenReturn(Position.emptyPosition()); + when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders())); final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1); final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2); - store.put(bytesKey, value, context.timestamp()); - store.put(bytesKey, value, context.timestamp()); + store.put(bytesKey, value, context.recordContext().timestamp()); + store.put(bytesKey, value, context.recordContext().timestamp()); verify(inner, times(2)).put(bytesKey, value, 0); verify(context).logChange(store.name(), key1, value, 0L, Position.emptyPosition()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java index 8c28a9eabec..4239e3e5000 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -108,7 +108,7 @@ public class GlobalStateStoreProviderTest { Serdes.String(), Serdes.String()).build()); - final ProcessorContextImpl mockContext = mock(ProcessorContextImpl.class); + final InternalProcessorContext mockContext = mock(InternalProcessorContext.class); when(mockContext.applicationId()).thenReturn("appId"); when(mockContext.metrics()) .thenReturn( diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index ba557104ebd..1c8935d1e1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -223,10 +223,10 @@ public class MeteredWindowStoreTest { @Test public void shouldPutToInnerStoreAndRecordPutMetrics() { final byte[] bytes = "a".getBytes(); - doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.timestamp())); + doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.recordContext().timestamp())); store.init(context, store); - store.put("a", "a", context.timestamp()); + store.put("a", "a", context.recordContext().timestamp()); // it suffices to verify one put metric since all put metrics are recorded by the same sensor // and the sensor is tested elsewhere diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java index f2f1d513704..7aba2434457 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java @@ -935,9 +935,9 @@ public class TimeOrderedCachingPersistentWindowStoreTest { new byte[0], new RecordHeaders(), true, - context.offset(), - context.timestamp(), - context.partition(), + context.recordContext().offset(), + context.recordContext().timestamp(), + context.recordContext().partition(), "") ); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java index a82ca8e7300..9eb9ec21b5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java @@ -941,10 +941,11 @@ public class TimeOrderedWindowStoreTest { new byte[0], new RecordHeaders(), true, - context.offset(), - context.timestamp(), - context.partition(), - "") + context.recordContext().offset(), + context.recordContext().timestamp(), + context.recordContext().partition(), + "" + ) ); underlyingStore.put(key, value, 1); diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 47ebe4bdb44..734f744f20f 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -81,12 +81,12 @@ public class NoOpProcessorContext extends AbstractProcessorContext void forward(final Record record) { - forward(record.key(), record.value()); + forwardedValues.put(record.key(), record.value()); } @Override public void forward(final Record record, final String childName) { - forward(record.key(), record.value()); + forwardedValues.put(record.key(), record.value()); } @Override @@ -96,7 +96,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext void forward(final K key, final V value, final To to) { - forward(key, value); + forwardedValues.put(key, value); } @Override diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 2fc8400239d..a4cee67ad5f 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; @@ -61,6 +62,7 @@ import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.RecordCollector; @@ -447,7 +449,6 @@ public class TopologyTestDriver implements Closeable { streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) ); globalStateTask.initialize(); - globalProcessorContext.setRecordContext(null); } else { globalStateManager = null; globalStateTask = null; @@ -492,6 +493,7 @@ public class TopologyTestDriver implements Closeable { streamsMetrics, cache ); + context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders())); task = new StreamTask( TASK_ID, @@ -511,7 +513,6 @@ public class TopologyTestDriver implements Closeable { ); task.initializeIfNeeded(); task.completeRestoration(noOpResetter -> { }); - task.processorContext().setRecordContext(null); for (final TopicPartition tp: task.inputPartitions()) { task.updateNextOffsets(tp, new OffsetAndMetadata(0, Optional.empty(), "")); }