KAFKA-13722: remove usage of old ProcessorContext (#18292)

We want to deprecate and remove the old ProcessorContext. Thus, we need
to refactor the Kafka Streams runtime code so that it no longer calls into the old
ProcessorContext but only into the new code path.
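For illustration only (this sketch is not part of the commit): the mechanical pattern applied throughout the diff below is to stop reading record metadata through the old ProcessorContext accessors and instead read it from the ProcessorRecordContext returned by recordContext(). A minimal, hypothetical helper showing the old and the new access path, assuming an InternalProcessorContext whose record context is set:

    // Illustrative sketch only; the class and method names are made up for this example.
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
    import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

    final class RecordContextMigrationSketch {

        // Old code path: per-field accessors inherited from the old ProcessorContext.
        static <K, V> Record<K, V> buildRecordOldStyle(final InternalProcessorContext<K, V> context,
                                                       final K key,
                                                       final V value) {
            return new Record<>(key, value, context.timestamp(), context.headers());
        }

        // New code path: fetch the ProcessorRecordContext once and read the metadata from it.
        static <K, V> Record<K, V> buildRecordNewStyle(final InternalProcessorContext<K, V> context,
                                                       final K key,
                                                       final V value) {
            final ProcessorRecordContext recordContext = context.recordContext();
            return new Record<>(key, value, recordContext.timestamp(), recordContext.headers());
        }
    }

The same substitution, with topic(), partition(), and offset() also read from recordContext(), accounts for most of the changes below.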

Reviewers: Bill Bejeck <bill@confluent.io>
Matthias J. Sax 2025-01-24 10:31:31 -08:00 committed by GitHub
parent 70eab7778d
commit f13a22af0b
GPG Key ID: B5690EEEBB952194
34 changed files with 119 additions and 96 deletions

@@ -213,7 +213,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements KTableRepartitionMapS
     private ValueAndTimestamp<KeyValue<K1, V1>> mapValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
         return ValueAndTimestamp.make(
             mapper.apply(key, getValueOrNull(valueAndTimestamp)),
-            valueAndTimestamp == null ? context.timestamp() : valueAndTimestamp.timestamp()
+            valueAndTimestamp == null ? context.recordContext().timestamp() : valueAndTimestamp.timestamp()
         );
     }
 }

@@ -84,7 +84,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext<Object,
     @Override
     public <KIn, VIn> void forward(final KIn key, final VIn value) {
-        forward(new Record<>(key, value, timestamp(), headers()));
+        forward(new Record<>(key, value, recordContext().timestamp(), headers()));
     }
     /**

@@ -119,8 +119,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         final Record<Object, Object> toProcess = new Record<>(
             deserialized.key(),
             deserialized.value(),
-            processorContext.timestamp(),
-            processorContext.headers()
+            processorContext.recordContext().timestamp(),
+            processorContext.recordContext().headers()
         );
         ((SourceNode<Object, Object>) sourceNodeAndDeserializer.sourceNode()).process(toProcess);
     }

@@ -190,7 +190,7 @@ public final class ProcessorContextImpl extends AbstractProcessorContext<Object,
         final Record<K, V> toForward = new Record<>(
             key,
             value,
-            timestamp(),
+            recordContext.timestamp(),
             headers()
         );
         forward(toForward);
@@ -204,7 +204,7 @@ public final class ProcessorContextImpl extends AbstractProcessorContext<Object,
         final Record<K, V> toForward = new Record<>(
             key,
             value,
-            toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
+            toInternal.hasTimestamp() ? toInternal.timestamp() : recordContext.timestamp(),
             headers()
         );
         forward(toForward, toInternal.child());
@@ -250,11 +250,11 @@ public final class ProcessorContextImpl extends AbstractProcessorContext<Object,
         // old API processors wouldn't see the timestamps or headers of upstream
         // new API processors. But then again, from the perspective of those old-API
         // processors, even consulting the timestamp or headers when the record context
-        // is undefined is itself not well defined. Plus, I don't think we need to worry
+        // is undefined is itself not well-defined. Plus, I don't think we need to worry
         // too much about heterogeneous applications, in which the upstream processor is
         // implementing the new API and the downstream one is implementing the old API.
         // So, this seems like a fine compromise for now.
-        if (recordContext != null && (record.timestamp() != timestamp() || record.headers() != headers())) {
+        if (recordContext != null && (record.timestamp() != recordContext.timestamp() || record.headers() != recordContext.headers())) {
             recordContext = new ProcessorRecordContext(
                 record.timestamp(),
                 recordContext.offset(),

@@ -34,13 +34,6 @@ public final class ProcessorContextUtils {
     private ProcessorContextUtils() {}
-    /**
-     * Should be removed as part of KAFKA-10217
-     */
-    public static StreamsMetricsImpl metricsImpl(final ProcessorContext context) {
-        return (StreamsMetricsImpl) context.metrics();
-    }
     /**
      * Should be removed as part of KAFKA-10217
      */
@@ -71,9 +64,10 @@
         }
     }
-    public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
+    @SuppressWarnings("unchecked")
+    public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(final ProcessorContext context) {
         if (context instanceof InternalProcessorContext) {
-            return (InternalProcessorContext) context;
+            return (InternalProcessorContext<K, V>) context;
         } else {
             throw new IllegalArgumentException(
                 "This component requires internal features of Kafka Streams and must be disabled for unit tests."

@@ -209,13 +209,14 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         // (instead of `RuntimeException`) to work well with those languages
         final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
             null, // only required to pass for DeserializationExceptionHandler
-            internalProcessorContext.topic(),
-            internalProcessorContext.partition(),
-            internalProcessorContext.offset(),
-            internalProcessorContext.headers(),
+            internalProcessorContext.recordContext().topic(),
+            internalProcessorContext.recordContext().partition(),
+            internalProcessorContext.recordContext().offset(),
+            internalProcessorContext.recordContext().headers(),
             internalProcessorContext.currentNode().name(),
             internalProcessorContext.taskId(),
-            internalProcessorContext.timestamp());
+            internalProcessorContext.recordContext().timestamp()
+        );
         final ProcessingExceptionHandler.ProcessingHandlerResponse response;
         try {

@@ -85,9 +85,9 @@ public class SinkNode<KIn, VIn> extends ProcessorNode<KIn, VIn, Void, Void> {
         final ProcessorRecordContext contextForExtraction =
             new ProcessorRecordContext(
                 timestamp,
-                context.offset(),
-                context.partition(),
-                context.topic(),
+                context.recordContext().offset(),
+                context.recordContext().partition(),
+                context.recordContext().topic(),
                 record.headers()
             );

@@ -861,8 +861,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         final Record<Object, Object> toProcess = new Record<>(
             record.key(),
             record.value(),
-            processorContext.timestamp(),
-            processorContext.headers()
+            processorContext.recordContext().timestamp(),
+            processorContext.recordContext().headers()
         );
         maybeMeasureLatency(() -> currNode.process(toProcess), time, processLatencySensor);

@@ -55,7 +55,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
     protected final Optional<KeySchema> indexKeySchema;
     private final long retentionPeriod;
-    protected InternalProcessorContext internalProcessorContext;
+    protected InternalProcessorContext<?, ?> internalProcessorContext;
     private Sensor expiredRecordSensor;
     protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     protected boolean consistencyEnabled = false;

@@ -272,12 +272,14 @@ public class CachingKeyValueStore
             key,
             new LRUCacheEntry(
                 value,
-                internalContext.headers(),
+                internalContext.recordContext().headers(),
                 true,
-                internalContext.offset(),
-                internalContext.timestamp(),
-                internalContext.partition(),
-                internalContext.topic()));
+                internalContext.recordContext().offset(),
+                internalContext.recordContext().timestamp(),
+                internalContext.recordContext().partition(),
+                internalContext.recordContext().topic()
+            )
+        );
         StoreQueryUtils.updatePosition(position, internalContext);
     }

@@ -135,12 +135,13 @@ class CachingSessionStore
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
-                internalContext.headers(),
+                internalContext.recordContext().headers(),
                 true,
-                internalContext.offset(),
-                internalContext.timestamp(),
-                internalContext.partition(),
-                internalContext.topic());
+                internalContext.recordContext().offset(),
+                internalContext.recordContext().timestamp(),
+                internalContext.recordContext().partition(),
+                internalContext.recordContext().topic()
+            );
         internalContext.cache().put(cacheName, cacheFunction.cacheKey(binaryKey), entry);
         maxObservedTimestamp = Math.max(keySchema.segmentTimestamp(binaryKey), maxObservedTimestamp);

@@ -153,12 +153,13 @@ class CachingWindowStore
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
-                internalContext.headers(),
+                internalContext.recordContext().headers(),
                 true,
-                internalContext.offset(),
-                internalContext.timestamp(),
-                internalContext.partition(),
-                internalContext.topic());
+                internalContext.recordContext().offset(),
+                internalContext.recordContext().timestamp(),
+                internalContext.recordContext().partition(),
+                internalContext.recordContext().topic()
+            );
         internalContext.cache().put(cacheName, cacheFunction.cacheKey(keyBytes), entry);
         maxObservedTimestamp.set(Math.max(keySchema.segmentTimestamp(keyBytes), maxObservedTimestamp.get()));

@@ -52,7 +52,7 @@ public class ChangeLoggingKeyValueBytesStore
         if (wrapped() instanceof MemoryLRUCache) {
             ((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
                 // pass null to indicate removal
-                log(key, null, internalContext.timestamp());
+                log(key, null, internalContext.recordContext().timestamp());
             });
         }
     }
@@ -66,7 +66,7 @@ public class ChangeLoggingKeyValueBytesStore
     public void put(final Bytes key,
                     final byte[] value) {
         wrapped().put(key, value);
-        log(key, value, internalContext.timestamp());
+        log(key, value, internalContext.recordContext().timestamp());
     }
     @Override
@@ -75,7 +75,7 @@ public class ChangeLoggingKeyValueBytesStore
         final byte[] previous = wrapped().putIfAbsent(key, value);
         if (previous == null) {
             // then it was absent
-            log(key, value, internalContext.timestamp());
+            log(key, value, internalContext.recordContext().timestamp());
         }
         return previous;
     }
@@ -84,7 +84,7 @@ public class ChangeLoggingKeyValueBytesStore
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         wrapped().putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
-            log(entry.key, entry.value, internalContext.timestamp());
+            log(entry.key, entry.value, internalContext.recordContext().timestamp());
         }
     }
@@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStore
     @Override
     public byte[] delete(final Bytes key) {
         final byte[] oldValue = wrapped().delete(key);
-        log(key, null, internalContext.timestamp());
+        log(key, null, internalContext.recordContext().timestamp());
         return oldValue;
     }

@@ -32,9 +32,9 @@ public class ChangeLoggingListValueBytesStore extends ChangeLoggingKeyValueBytes
         // we need to log the full new list and thus call get() on the inner store below
         // if the value is a tombstone, we delete the whole list and thus can save the get call
         if (value == null) {
-            log(key, null, internalContext.timestamp());
+            log(key, null, internalContext.recordContext().timestamp());
         } else {
-            log(key, wrapped().get(key), internalContext.timestamp());
+            log(key, wrapped().get(key), internalContext.recordContext().timestamp());
         }
     }

@@ -73,13 +73,13 @@ public class ChangeLoggingSessionBytesStore
     @Override
     public void remove(final Windowed<Bytes> sessionKey) {
         wrapped().remove(sessionKey);
-        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, internalContext.recordContext().timestamp(), wrapped().getPosition());
     }
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
         wrapped().put(sessionKey, aggregate);
-        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.recordContext().timestamp(), wrapped().getPosition());
     }
     @Override

@@ -35,7 +35,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey
     public void put(final Bytes key,
                     final byte[] valueAndTimestamp) {
         wrapped().put(key, valueAndTimestamp);
-        log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp));
+        log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
     }
     @Override
@@ -44,7 +44,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey
         final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp);
         if (previous == null) {
             // then it was absent
-            log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp));
+            log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
         }
         return previous;
     }
@@ -54,7 +54,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore extends ChangeLoggingKey
         wrapped().putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
             final byte[] valueAndTimestamp = entry.value;
-            log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.timestamp() : timestamp(valueAndTimestamp));
+            log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp == null ? internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
         }
     }
 }

@@ -36,7 +36,7 @@ class ChangeLoggingTimestampedWindowBytesStore extends ChangeLoggingWindowBytesS
             name(),
             key,
             rawValue(valueAndTimestamp),
-            valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.timestamp(),
+            valueAndTimestamp != null ? timestamp(valueAndTimestamp) : internalContext.recordContext().timestamp(),
             wrapped().getPosition()
         );
     }

@@ -129,7 +129,7 @@ class ChangeLoggingWindowBytesStore
     }
     void log(final Bytes key, final byte[] value) {
-        internalContext.logChange(name(), key, value, internalContext.timestamp(), wrapped().getPosition());
+        internalContext.logChange(name(), key, value, internalContext.recordContext().timestamp(), wrapped().getPosition());
     }
     private int maybeUpdateSeqnumForDups() {

@@ -430,7 +430,7 @@ public class MeteredKeyValueStore<K, V>
         // In that case, we _can't_ get the current timestamp, so we don't record anything.
         if (e2eLatencySensor.shouldRecord() && internalContext != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency = currentTime - internalContext.timestamp();
+            final long e2eLatency = currentTime - internalContext.recordContext().timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }

@@ -497,7 +497,7 @@ public class MeteredSessionStore<K, V>
         // In that case, we _can't_ get the current timestamp, so we don't record anything.
         if (e2eLatencySensor.shouldRecord() && internalContext != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency = currentTime - internalContext.timestamp();
+            final long e2eLatency = currentTime - internalContext.recordContext().timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }

@@ -513,7 +513,7 @@ public class MeteredWindowStore<K, V>
         // In that case, we _can't_ get the current timestamp, so we don't record anything.
         if (e2eLatencySensor.shouldRecord() && internalContext != null) {
             final long currentTime = time.milliseconds();
-            final long e2eLatency = currentTime - internalContext.timestamp();
+            final long e2eLatency = currentTime - internalContext.recordContext().timestamp();
             e2eLatencySensor.record(e2eLatency, currentTime);
         }
     }

@@ -97,12 +97,13 @@ class TimeOrderedCachingWindowStore
         hasIndex = timeOrderedWindowStore.hasIndex();
     }
+    @SuppressWarnings("unchecked")
     private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore wrapped) {
         if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
             return (RocksDBTimeOrderedWindowStore) wrapped;
         }
         if (wrapped instanceof WrappedStateStore) {
-            return getWrappedStore(((WrappedStateStore<?, ?, ?>) wrapped).wrapped());
+            return getWrappedStore(((WrappedStateStore<?, Bytes, byte[]>) wrapped).wrapped());
         }
         return null;
     }
@@ -255,12 +256,13 @@ class TimeOrderedCachingWindowStore
         final LRUCacheEntry entry =
             new LRUCacheEntry(
                 value,
-                internalContext.headers(),
+                internalContext.recordContext().headers(),
                 true,
-                internalContext.offset(),
-                internalContext.timestamp(),
-                internalContext.partition(),
-                internalContext.topic());
+                internalContext.recordContext().offset(),
+                internalContext.recordContext().timestamp(),
+                internalContext.recordContext().partition(),
+                internalContext.recordContext().topic()
+            );
         // Put to index first so that base can be evicted later
         if (hasIndex) {
@@ -274,10 +276,11 @@ class TimeOrderedCachingWindowStore
                 new byte[0],
                 new RecordHeaders(),
                 true,
-                internalContext.offset(),
-                internalContext.timestamp(),
-                internalContext.partition(),
-                "");
+                internalContext.recordContext().offset(),
+                internalContext.recordContext().timestamp(),
+                internalContext.recordContext().partition(),
+                ""
+            );
             final Bytes indexKey = KeyFirstWindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0);
             internalContext.cache().put(cacheName, indexKeyCacheFunction.cacheKey(indexKey), emptyEntry);
         } else {

@@ -58,7 +58,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 public class AbstractProcessorContextTest {
     private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics());
-    private final AbstractProcessorContext context = new TestProcessorContext(metrics);
+    private final AbstractProcessorContext<?, ?> context = new TestProcessorContext(metrics);
     private final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
     private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
     private final ProcessorRecordContext recordContext = new ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", headers);

@@ -593,6 +593,8 @@ public class ProcessorContextImplTest {
     @Test
     public void shouldThrowUnsupportedOperationExceptionOnForward() {
         context = getStandbyContext();
+        context.recordContext = mock(ProcessorRecordContext.class);
         assertThrows(
             UnsupportedOperationException.class,
             () -> context.forward("key", "value")
@@ -602,6 +604,8 @@ public class ProcessorContextImplTest {
     @Test
     public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
         context = getStandbyContext();
+        context.recordContext = mock(ProcessorRecordContext.class);
         assertThrows(
             UnsupportedOperationException.class,
             () -> context.forward("key", "value", To.child("child-name"))

@@ -358,7 +358,7 @@ public class ProcessorNodeTest {
         assertEquals(internalProcessorContext.offset(), context.offset());
         assertEquals(internalProcessorContext.currentNode().name(), context.processorNodeId());
         assertEquals(internalProcessorContext.taskId(), context.taskId());
-        assertEquals(internalProcessorContext.timestamp(), context.timestamp());
+        assertEquals(internalProcessorContext.recordContext().timestamp(), context.timestamp());
         assertEquals(KEY, record.key());
         assertEquals(VALUE, record.value());
         assertInstanceOf(RuntimeException.class, exception);

@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -75,6 +77,7 @@ public class ChangeLoggingSessionBytesStoreTest {
     public void shouldLogPuts() {
         final Bytes binaryKey = SessionKeySchema.toBinary(key1);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
         store.put(key1, value1);
@@ -86,6 +89,7 @@ public class ChangeLoggingSessionBytesStoreTest {
     public void shouldLogPutsWithPosition() {
         final Bytes binaryKey = SessionKeySchema.toBinary(key1);
         when(inner.getPosition()).thenReturn(POSITION);
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
         store.put(key1, value1);
@@ -97,6 +101,7 @@ public class ChangeLoggingSessionBytesStoreTest {
     public void shouldLogRemoves() {
         final Bytes binaryKey = SessionKeySchema.toBinary(key1);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
         store.remove(key1);
         store.remove(key1);

@@ -17,9 +17,11 @@
 package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
@@ -77,8 +79,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
     public void shouldLogPuts() {
         final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
-        store.put(bytesKey, valueAndTimestamp, context.timestamp());
+        store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp());
         verify(inner).put(bytesKey, valueAndTimestamp, 0);
         verify(context).logChange(store.name(), key, value, 42, Position.emptyPosition());
@@ -88,8 +91,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
     public void shouldLogPutsWithPosition() {
         final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
         when(inner.getPosition()).thenReturn(POSITION);
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
-        store.put(bytesKey, valueAndTimestamp, context.timestamp());
+        store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp());
         verify(inner).put(bytesKey, valueAndTimestamp, 0);
         verify(context).logChange(store.name(), key, value, 42, POSITION);
@@ -118,9 +122,10 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
         final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
         final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
-        store.put(bytesKey, valueAndTimestamp, context.timestamp());
-        store.put(bytesKey, valueAndTimestamp, context.timestamp());
+        store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp());
+        store.put(bytesKey, valueAndTimestamp, context.recordContext().timestamp());
         verify(inner, times(2)).put(bytesKey, valueAndTimestamp, 0);
         verify(context).logChange(store.name(), key1, value, 42L, Position.emptyPosition());

@@ -17,9 +17,11 @@
 package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
@@ -76,8 +78,9 @@ public class ChangeLoggingWindowBytesStoreTest {
     public void shouldLogPuts() {
         final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
-        store.put(bytesKey, value, context.timestamp());
+        store.put(bytesKey, value, context.recordContext().timestamp());
         verify(inner).put(bytesKey, value, 0);
         verify(context).logChange(store.name(), key, value, 0L, Position.emptyPosition());
@@ -87,8 +90,9 @@ public class ChangeLoggingWindowBytesStoreTest {
     public void shouldLogPutsWithPosition() {
         final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
         when(inner.getPosition()).thenReturn(POSITION);
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
-        store.put(bytesKey, value, context.timestamp());
+        store.put(bytesKey, value, context.recordContext().timestamp());
         verify(inner).put(bytesKey, value, 0);
         verify(context).logChange(store.name(), key, value, 0L, POSITION);
@@ -131,12 +135,13 @@ public class ChangeLoggingWindowBytesStoreTest {
         store = new ChangeLoggingWindowBytesStore(inner, true, WindowKeySchema::toStoreKeyBinary);
         store.init(context, store);
         when(inner.getPosition()).thenReturn(Position.emptyPosition());
+        when(context.recordContext()).thenReturn(new ProcessorRecordContext(0, 0, 0, "topic", new RecordHeaders()));
         final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
         final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
-        store.put(bytesKey, value, context.timestamp());
-        store.put(bytesKey, value, context.timestamp());
+        store.put(bytesKey, value, context.recordContext().timestamp());
+        store.put(bytesKey, value, context.recordContext().timestamp());
         verify(inner, times(2)).put(bytesKey, value, 0);
         verify(context).logChange(store.name(), key1, value, 0L, Position.emptyPosition());

@@ -23,7 +23,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -108,7 +108,7 @@ public class GlobalStateStoreProviderTest {
                 Serdes.String(),
                 Serdes.String()).build());
-        final ProcessorContextImpl mockContext = mock(ProcessorContextImpl.class);
+        final InternalProcessorContext<?, ?> mockContext = mock(InternalProcessorContext.class);
         when(mockContext.applicationId()).thenReturn("appId");
         when(mockContext.metrics())
             .thenReturn(

@@ -223,10 +223,10 @@ public class MeteredWindowStoreTest {
     @Test
     public void shouldPutToInnerStoreAndRecordPutMetrics() {
         final byte[] bytes = "a".getBytes();
-        doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.timestamp()));
+        doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), eq(context.recordContext().timestamp()));
         store.init(context, store);
-        store.put("a", "a", context.timestamp());
+        store.put("a", "a", context.recordContext().timestamp());
         // it suffices to verify one put metric since all put metrics are recorded by the same sensor
         // and the sensor is tested elsewhere

@@ -935,9 +935,9 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
             new byte[0],
             new RecordHeaders(),
             true,
-            context.offset(),
-            context.timestamp(),
-            context.partition(),
+            context.recordContext().offset(),
+            context.recordContext().timestamp(),
+            context.recordContext().partition(),
             "")
         );

@@ -941,10 +941,11 @@ public class TimeOrderedWindowStoreTest {
             new byte[0],
             new RecordHeaders(),
             true,
-            context.offset(),
-            context.timestamp(),
-            context.partition(),
-            "")
+            context.recordContext().offset(),
+            context.recordContext().timestamp(),
+            context.recordContext().partition(),
+            ""
+            )
         );
         underlyingStore.put(key, value, 1);

@@ -81,12 +81,12 @@ public class NoOpProcessorContext extends AbstractProcessorContext<Object, Objec
     @Override
     public <K, V> void forward(final Record<K, V> record) {
-        forward(record.key(), record.value());
+        forwardedValues.put(record.key(), record.value());
     }
     @Override
     public <K, V> void forward(final Record<K, V> record, final String childName) {
-        forward(record.key(), record.value());
+        forwardedValues.put(record.key(), record.value());
     }
     @Override
@@ -96,7 +96,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext<Object, Objec
     @Override
     public <K, V> void forward(final K key, final V value, final To to) {
-        forward(key, value);
+        forwardedValues.put(key, value);
     }
     @Override

@@ -31,6 +31,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -61,6 +62,7 @@ import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -447,7 +449,6 @@ public class TopologyTestDriver implements Closeable {
                 streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
             );
             globalStateTask.initialize();
-            globalProcessorContext.setRecordContext(null);
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -492,6 +493,7 @@
             streamsMetrics,
             cache
         );
+        context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
         task = new StreamTask(
             TASK_ID,
@@ -511,7 +513,6 @@
         );
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
-        task.processorContext().setRecordContext(null);
         for (final TopicPartition tp: task.inputPartitions()) {
             task.updateNextOffsets(tp, new OffsetAndMetadata(0, Optional.empty(), ""));
         }