KAFKA-13722: remove internal usage of old ProcessorContext (#18698)

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-01-29 11:13:57 -08:00 committed by GitHub
parent 20b073bbee
commit 1123a76110
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 92 additions and 76 deletions

View File

@ -1054,7 +1054,7 @@ public interface KTable<K, V> {
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
* an internally generated name, and "-repartition" is a fixed suffix.
*
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>You can retrieve all generated internal topic names via {@link Topology#describe()}.
*
* <p>
* All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update

View File

@ -53,7 +53,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
private final Optional<Duration> gracePeriod;
private TimeOrderedKeyValueBuffer<K1, V1, V1> buffer;
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private InternalProcessorContext internalProcessorContext;
private InternalProcessorContext<K1, VOut> internalProcessorContext;
private final boolean useBuffer;
private final String storeName;
@ -78,7 +78,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
valueGetter.init(context);
internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
internalProcessorContext = asInternalProcessorContext(context);
if (useBuffer) {
if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
@ -90,7 +90,6 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
@Override
public void process(final Record<K1, V1> record) {
internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
updateObservedStreamTime(record.timestamp());
if (maybeDropRecord(record)) {
return;
@ -123,7 +122,6 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
observedStreamTime = Math.max(observedStreamTime, timestamp);
}
@SuppressWarnings("unchecked")
private void doJoin(final Record<K1, V1> record) {
final K2 mappedKey = keyMapper.apply(record.key(), record.value());
final V2 value2 = getValue2(record, mappedKey);

View File

@ -41,7 +41,11 @@ import java.util.Map;
* We need to clean this all up (https://issues.apache.org/jira/browse/KAFKA-17131) and mark the interface
* deprecated afterward.
*/
@SuppressWarnings("deprecation")
@SuppressWarnings("deprecation") // Not deprecating the old context, since it is used by Transformers. See KAFKA-10603.
/*
* When we deprecate `ProcessorContext` can also deprecate `To` class,
* as it is only used in the `ProcessorContext#forward` method.
*/
public interface ProcessorContext {
/**

View File

@ -21,7 +21,8 @@ import org.apache.kafka.streams.processor.api.Record;
import java.time.Duration;
/**
* A functional interface used as an argument to {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
* A functional interface used as an argument to
* {@link org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
*
* @see Cancellable
*/
@ -30,14 +31,16 @@ public interface Punctuator {
/**
* Perform the scheduled periodic operation.
*
* <p> If this method accesses {@link ProcessorContext} or
* <p> If this method accesses {@link org.apache.kafka.streams.processor.api.ProcessorContext} or
* {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record metadata like topic,
* partition, and offset or {@link org.apache.kafka.streams.processor.api.RecordMetadata} won't
* be available.
*
* <p> Furthermore, for any record that is sent downstream via {@link ProcessorContext#forward(Object, Object)}
* <p> Furthermore, for any record that is sent downstream via
* {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}
* or {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there
* won't be any record metadata. If {@link ProcessorContext#forward(Object, Object)} is used,
* won't be any record metadata. If
* {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)} is used,
* it's also not possible to set records headers.
*
* @param timestamp when the operation is being called, depending on {@link PunctuationType}

View File

@ -17,8 +17,8 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.Map;
@ -64,8 +64,9 @@ public final class ProcessorContextUtils {
}
}
@SuppressWarnings("unchecked")
public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(final ProcessorContext context) {
public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(
final ProcessorContext<K, V> context
) {
if (context instanceof InternalProcessorContext) {
return (InternalProcessorContext<K, V>) context;
} else {
@ -75,10 +76,9 @@ public final class ProcessorContextUtils {
}
}
@SuppressWarnings("unchecked")
public static <K, V> InternalProcessorContext<K, V> asInternalProcessorContext(final StateStoreContext context) {
public static InternalProcessorContext<?, ?> asInternalProcessorContext(final StateStoreContext context) {
if (context instanceof InternalProcessorContext) {
return (InternalProcessorContext<K, V>) context;
return (InternalProcessorContext<?, ?>) context;
} else {
throw new IllegalArgumentException(
"This component requires internal features of Kafka Streams and must be disabled for unit tests."

View File

@ -53,11 +53,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
private final String name;
private final AbstractSegments<S> segments;
private final String metricScope;
private final long retentionPeriod;
private final KeySchema keySchema;
private InternalProcessorContext internalProcessorContext;
private InternalProcessorContext<?, ?> internalProcessorContext;
private Sensor expiredRecordSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private boolean consistencyEnabled = false;
@ -66,12 +65,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
private volatile boolean open;
AbstractRocksDBSegmentedBytesStore(final String name,
final String metricScope,
final long retentionPeriod,
final KeySchema keySchema,
final AbstractSegments<S> segments) {
this.name = name;
this.metricScope = metricScope;
this.retentionPeriod = retentionPeriod;
this.keySchema = keySchema;
this.segments = segments;

View File

@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends AbstractRocksDBSegmentedBytesSto
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
super(name, metricsScope, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
super(name, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
}
}

View File

@ -23,6 +23,6 @@ public class RocksDBTimestampedSegmentedBytesStore extends AbstractRocksDBSegmen
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
super(name, metricsScope, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
super(name, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval));
}
}

View File

@ -233,7 +233,7 @@ public class KTableImplTest {
final ValueMapper<String, String> mapper = value -> value;
final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1;
final ValueTransformerWithKeySupplier<String, String, String> valueTransformerWithKeySupplier =
() -> new ValueTransformerWithKey<String, String, String>() {
() -> new ValueTransformerWithKey<>() {
@Override
public void init(final ProcessorContext context) {}
@ -247,103 +247,103 @@ public class KTableImplTest {
};
assertEquals(
((AbstractStream) table1.filter((key, value) -> false)).keySerde(),
((AbstractStream<String, String>) table1.filter((key, value) -> false)).keySerde(),
consumedInternal.keySerde());
assertEquals(
((AbstractStream) table1.filter((key, value) -> false)).valueSerde(),
((AbstractStream<String, String>) table1.filter((key, value) -> false)).valueSerde(),
consumedInternal.valueSerde());
assertEquals(
((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<String, String>) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<String, String>) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(),
((AbstractStream<String, String>) table1.filterNot((key, value) -> false)).keySerde(),
consumedInternal.keySerde());
assertEquals(
((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(),
((AbstractStream<String, String>) table1.filterNot((key, value) -> false)).valueSerde(),
consumedInternal.valueSerde());
assertEquals(
((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<String, String>) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<String, String>) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream) table1.mapValues(mapper)).keySerde(),
((AbstractStream<String, String>) table1.mapValues(mapper)).keySerde(),
consumedInternal.keySerde());
assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde());
assertNull(((AbstractStream<String, String>) table1.mapValues(mapper)).valueSerde());
assertEquals(
((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<String, String>) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<String, String>) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream) table1.toStream()).keySerde(),
((AbstractStream<String, String>) table1.toStream()).keySerde(),
consumedInternal.keySerde());
assertEquals(
((AbstractStream) table1.toStream()).valueSerde(),
((AbstractStream<String, String>) table1.toStream()).valueSerde(),
consumedInternal.valueSerde());
assertNull(((AbstractStream) table1.toStream(selector)).keySerde());
assertNull(((AbstractStream<String, String>) table1.toStream(selector)).keySerde());
assertEquals(
((AbstractStream) table1.toStream(selector)).valueSerde(),
((AbstractStream<String, String>) table1.toStream(selector)).valueSerde(),
consumedInternal.valueSerde());
assertEquals(
((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
consumedInternal.keySerde());
assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
assertNull(((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
assertEquals(
((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(),
assertEquals(((AbstractStream<String, String>) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde());
assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde());
assertNull(((AbstractStream<String, String>) table1.groupBy(KeyValue::new)).keySerde());
assertNull(((AbstractStream<String, String>) table1.groupBy(KeyValue::new)).valueSerde());
assertEquals(
((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(),
((AbstractStream<String, String>) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<String, String>) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream) table1.join(table1, joiner)).keySerde(),
((AbstractStream<String, String>) table1.join(table1, joiner)).keySerde(),
consumedInternal.keySerde());
assertNull(((AbstractStream) table1.join(table1, joiner)).valueSerde());
assertNull(((AbstractStream<String, String>) table1.join(table1, joiner)).valueSerde());
assertEquals(
((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<String, String>) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<String, String>) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(),
((AbstractStream<String, String>) table1.leftJoin(table1, joiner)).keySerde(),
consumedInternal.keySerde());
assertNull(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde());
assertNull(((AbstractStream<String, String>) table1.leftJoin(table1, joiner)).valueSerde());
assertEquals(
((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<String, String>) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<String, String>) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(),
((AbstractStream<String, String>) table1.outerJoin(table1, joiner)).keySerde(),
consumedInternal.keySerde());
assertNull(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde());
assertNull(((AbstractStream<String, String>) table1.outerJoin(table1, joiner)).valueSerde());
assertEquals(
((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
((AbstractStream<String, String>) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
((AbstractStream<String, String>) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
}
@ -462,25 +462,25 @@ public class KTableImplTest {
assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000007");
assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000008");
final Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003"))
final Field valSerializerField = ((SinkNode<?, ?>) driver.getProcessor("KSTREAM-SINK-0000000003"))
.getClass()
.getDeclaredField("valSerializer");
final Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004"))
final Field valDeserializerField = ((SourceNode<?, ?>) driver.getProcessor("KSTREAM-SOURCE-0000000004"))
.getClass()
.getDeclaredField("valDeserializer");
valSerializerField.setAccessible(true);
valDeserializerField.setAccessible(true);
assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
assertNotNull(((ChangedSerializer<?>) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
assertNotNull(((ChangedDeserializer<?>) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
assertNotNull(((ChangedSerializer<?>) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
assertNotNull(((ChangedDeserializer<?>) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
}
}
@Test
public void shouldNotAllowNullSelectorOnToStream() {
assertThrows(NullPointerException.class, () -> table.toStream((KeyValueMapper) null));
assertThrows(NullPointerException.class, () -> table.toStream((KeyValueMapper<String, String, ?>) null));
}
@Test
@ -495,12 +495,12 @@ public class KTableImplTest {
@Test
public void shouldNotAllowNullMapperOnMapValues() {
assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapper) null));
assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapper<String, ?>) null));
}
@Test
public void shouldNotAllowNullMapperOnMapValueWithKey() {
assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapperWithKey) null));
assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapperWithKey<String, String, ?>) null));
}
@Test
@ -545,27 +545,42 @@ public class KTableImplTest {
@Test
public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
assertThrows(NullPointerException.class, () -> table.filter((key, value) -> false, (Materialized) null));
assertThrows(
NullPointerException.class,
() -> table.filter((key, value) -> false, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
);
}
@Test
public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
assertThrows(NullPointerException.class, () -> table.filterNot((key, value) -> false, (Materialized) null));
assertThrows(
NullPointerException.class,
() -> table.filterNot((key, value) -> false, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
);
}
@Test
public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() {
assertThrows(NullPointerException.class, () -> table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
assertThrows(
NullPointerException.class,
() -> table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
);
}
@Test
public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() {
assertThrows(NullPointerException.class, () -> table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
assertThrows(
NullPointerException.class,
() -> table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
);
}
@Test
public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
assertThrows(NullPointerException.class, () -> table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
assertThrows(
NullPointerException.class,
() -> table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
);
}
@Test
@ -573,12 +588,11 @@ public class KTableImplTest {
assertThrows(NullPointerException.class, () -> table.transformValues(null));
}
@SuppressWarnings("unchecked")
@Test
public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
final ValueTransformerWithKeySupplier<String, String, ?> valueTransformerSupplier =
mock(ValueTransformerWithKeySupplier.class);
assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized) null));
assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized<String, Object, KeyValueStore<Bytes, byte[]>>) null));
}
@Test