From 1123a76110b5f5a5eb7ac77321655a2a8adc4f83 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Wed, 29 Jan 2025 11:13:57 -0800 Subject: [PATCH] KAFKA-13722: remove internal usage of old ProcessorContext (#18698) Reviewers: Lucas Brutschy --- .../apache/kafka/streams/kstream/KTable.java | 2 +- .../internals/KStreamKTableJoinProcessor.java | 6 +- .../streams/processor/ProcessorContext.java | 6 +- .../kafka/streams/processor/Punctuator.java | 11 +- .../internals/ProcessorContextUtils.java | 12 +- .../AbstractRocksDBSegmentedBytesStore.java | 5 +- .../ChangeLoggingKeyValueBytesStore.java | 2 +- .../internals/RocksDBSegmentedBytesStore.java | 2 +- ...RocksDBTimestampedSegmentedBytesStore.java | 2 +- .../kstream/internals/KTableImplTest.java | 120 ++++++++++-------- 10 files changed, 92 insertions(+), 76 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 1c8fb3fea39..7082355eb45 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -1054,7 +1054,7 @@ public interface KTable { * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is * an internally generated name, and "-repartition" is a fixed suffix. * - * You can retrieve all generated internal topic names via {@link Topology#describe()}. + *

You can retrieve all generated internal topic names via {@link Topology#describe()}. * *

* All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java index e81877c99e7..637d870ee7e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java @@ -53,7 +53,7 @@ class KStreamKTableJoinProcessor extends ContextualProcess private final Optional gracePeriod; private TimeOrderedKeyValueBuffer buffer; protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; - private InternalProcessorContext internalProcessorContext; + private InternalProcessorContext internalProcessorContext; private final boolean useBuffer; private final String storeName; @@ -78,7 +78,7 @@ class KStreamKTableJoinProcessor extends ContextualProcess final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); valueGetter.init(context); - internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context); + internalProcessorContext = asInternalProcessorContext(context); if (useBuffer) { if (!valueGetter.isVersioned() && gracePeriod.isPresent()) { throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join."); @@ -90,7 +90,6 @@ class KStreamKTableJoinProcessor extends ContextualProcess @Override public void process(final Record record) { - internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context()); updateObservedStreamTime(record.timestamp()); if (maybeDropRecord(record)) { return; @@ -123,7 +122,6 @@ 
class KStreamKTableJoinProcessor extends ContextualProcess observedStreamTime = Math.max(observedStreamTime, timestamp); } - @SuppressWarnings("unchecked") private void doJoin(final Record record) { final K2 mappedKey = keyMapper.apply(record.key(), record.value()); final V2 value2 = getValue2(record, mappedKey); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 3d057c5ce2b..d65244bcc86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -41,7 +41,11 @@ import java.util.Map; * We need to clean this all up (https://issues.apache.org/jira/browse/KAFKA-17131) and mark the interface * deprecated afterward. */ -@SuppressWarnings("deprecation") +@SuppressWarnings("deprecation") // Not deprecating the old context, since it is used by Transformers. See KAFKA-10603. +/* + * When we deprecate `ProcessorContext` can also deprecate `To` class, + * as it is only used in the `ProcessorContext#forward` method. + */ public interface ProcessorContext { /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java index dd533ad7ba2..9b76962f3b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java @@ -21,7 +21,8 @@ import org.apache.kafka.streams.processor.api.Record; import java.time.Duration; /** - * A functional interface used as an argument to {@link ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}. + * A functional interface used as an argument to + * {@link org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}. 
* * @see Cancellable */ @@ -30,14 +31,16 @@ public interface Punctuator { /** * Perform the scheduled periodic operation. * - *

If this method accesses {@link ProcessorContext} or + *

If this method accesses * {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record metadata like topic, * partition, and offset or {@link org.apache.kafka.streams.processor.api.RecordMetadata} won't * be available. * - *

Furthermore, for any record that is sent downstream via {@link ProcessorContext#forward(Object, Object)} + *

Furthermore, for any record that is sent downstream via + * {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record, String)} * or {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there - won't be any record metadata. If {@link ProcessorContext#forward(Object, Object)} is used, + * won't be any record metadata. If + * the old {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)} is used, it's also not possible to set records headers. * * @param timestamp when the operation is being called, depending on {@link PunctuationType} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java index 20890088999..0515f8718aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java @@ -17,8 +17,8 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import java.util.Map; @@ -64,8 +64,9 @@ public final class ProcessorContextUtils { } } - @SuppressWarnings("unchecked") - public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) { + public static InternalProcessorContext asInternalProcessorContext( + final ProcessorContext context + ) { if (context instanceof InternalProcessorContext) { return (InternalProcessorContext) context; } else { @@ -75,10 +76,9 @@ public final class ProcessorContextUtils { } } - @SuppressWarnings("unchecked") - public static InternalProcessorContext 
asInternalProcessorContext(final StateStoreContext context) { + public static InternalProcessorContext asInternalProcessorContext(final StateStoreContext context) { if (context instanceof InternalProcessorContext) { - return (InternalProcessorContext) context; + return (InternalProcessorContext) context; } else { throw new IllegalArgumentException( "This component requires internal features of Kafka Streams and must be disabled for unit tests." diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index f5b4366ae98..bde8d831919 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -53,11 +53,10 @@ public class AbstractRocksDBSegmentedBytesStore implements Se private final String name; private final AbstractSegments segments; - private final String metricScope; private final long retentionPeriod; private final KeySchema keySchema; - private InternalProcessorContext internalProcessorContext; + private InternalProcessorContext internalProcessorContext; private Sensor expiredRecordSensor; private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; private boolean consistencyEnabled = false; @@ -66,12 +65,10 @@ public class AbstractRocksDBSegmentedBytesStore implements Se private volatile boolean open; AbstractRocksDBSegmentedBytesStore(final String name, - final String metricScope, final long retentionPeriod, final KeySchema keySchema, final AbstractSegments segments) { this.name = name; - this.metricScope = metricScope; this.retentionPeriod = retentionPeriod; this.keySchema = keySchema; this.segments = segments; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index 5405ad9a71c..9c1c3f9ae76 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -131,4 +131,4 @@ public class ChangeLoggingKeyValueBytesStore void log(final Bytes key, final byte[] value, final long timestamp) { internalContext.logChange(name(), key, value, timestamp, wrapped().getPosition()); } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index e7b7198d1cf..33e787adb53 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends AbstractRocksDBSegmentedBytesSto final long retention, final long segmentInterval, final KeySchema keySchema) { - super(name, metricsScope, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval)); + super(name, retention, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval)); } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java index 39f493c761b..2f6bcc5c052 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java @@ -23,6 +23,6 @@ 
public class RocksDBTimestampedSegmentedBytesStore extends AbstractRocksDBSegmen final long retention, final long segmentInterval, final KeySchema keySchema) { - super(name, metricsScope, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval)); + super(name, retention, keySchema, new TimestampedSegments(name, metricsScope, retention, segmentInterval)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index ebc06819631..a293625dc30 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -233,7 +233,7 @@ public class KTableImplTest { final ValueMapper mapper = value -> value; final ValueJoiner joiner = (value1, value2) -> value1; final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = - () -> new ValueTransformerWithKey() { + () -> new ValueTransformerWithKey<>() { @Override public void init(final ProcessorContext context) {} @@ -247,103 +247,103 @@ public class KTableImplTest { }; assertEquals( - ((AbstractStream) table1.filter((key, value) -> false)).keySerde(), + ((AbstractStream) table1.filter((key, value) -> false)).keySerde(), consumedInternal.keySerde()); assertEquals( - ((AbstractStream) table1.filter((key, value) -> false)).valueSerde(), + ((AbstractStream) table1.filter((key, value) -> false)).valueSerde(), consumedInternal.valueSerde()); assertEquals( - ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.filter((key, 
value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(), + ((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(), consumedInternal.keySerde()); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(), + ((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(), consumedInternal.valueSerde()); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.mapValues(mapper)).keySerde(), + ((AbstractStream) table1.mapValues(mapper)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde()); + assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde()); assertEquals( - ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.toStream()).keySerde(), + ((AbstractStream) table1.toStream()).keySerde(), consumedInternal.keySerde()); assertEquals( - ((AbstractStream) table1.toStream()).valueSerde(), + ((AbstractStream) table1.toStream()).valueSerde(), consumedInternal.valueSerde()); - 
assertNull(((AbstractStream) table1.toStream(selector)).keySerde()); + assertNull(((AbstractStream) table1.toStream(selector)).keySerde()); assertEquals( - ((AbstractStream) table1.toStream(selector)).valueSerde(), + ((AbstractStream) table1.toStream(selector)).valueSerde(), consumedInternal.valueSerde()); assertEquals( - ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), + ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde()); + assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde()); assertEquals( - ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); - assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(), + assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); - assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde()); - assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde()); + assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde()); + assertNull(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde()); assertEquals( - ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.groupBy(KeyValue::new, 
Grouped.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.join(table1, joiner)).keySerde(), + ((AbstractStream) table1.join(table1, joiner)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.join(table1, joiner)).valueSerde()); + assertNull(((AbstractStream) table1.join(table1, joiner)).valueSerde()); assertEquals( - ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(), + ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde()); + assertNull(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde()); assertEquals( - ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); assertEquals( - ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(), + ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde()); + assertNull(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde()); assertEquals( - ((AbstractStream) 
table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), + ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); assertEquals( - ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), + ((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); } @@ -462,25 +462,25 @@ public class KTableImplTest { assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000007"); assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000008"); - final Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003")) + final Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003")) .getClass() .getDeclaredField("valSerializer"); - final Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004")) + final Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004")) .getClass() .getDeclaredField("valDeserializer"); valSerializerField.setAccessible(true); valDeserializerField.setAccessible(true); - assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner()); - assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner()); - assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner()); - assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner()); + assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner()); + assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner()); + assertNotNull(((ChangedSerializer) 
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner()); + assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner()); } } @Test public void shouldNotAllowNullSelectorOnToStream() { - assertThrows(NullPointerException.class, () -> table.toStream((KeyValueMapper) null)); + assertThrows(NullPointerException.class, () -> table.toStream((KeyValueMapper) null)); } @Test @@ -495,12 +495,12 @@ public class KTableImplTest { @Test public void shouldNotAllowNullMapperOnMapValues() { - assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapper) null)); + assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapper) null)); } @Test public void shouldNotAllowNullMapperOnMapValueWithKey() { - assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapperWithKey) null)); + assertThrows(NullPointerException.class, () -> table.mapValues((ValueMapperWithKey) null)); } @Test @@ -545,27 +545,42 @@ public class KTableImplTest { @Test public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() { - assertThrows(NullPointerException.class, () -> table.filter((key, value) -> false, (Materialized) null)); + assertThrows( + NullPointerException.class, + () -> table.filter((key, value) -> false, (Materialized>) null) + ); } @Test public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() { - assertThrows(NullPointerException.class, () -> table.filterNot((key, value) -> false, (Materialized) null)); + assertThrows( + NullPointerException.class, + () -> table.filterNot((key, value) -> false, (Materialized>) null) + ); } @Test public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() { - assertThrows(NullPointerException.class, () -> table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null)); + assertThrows( + NullPointerException.class, + () -> table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized>) null) + ); } 
@Test public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() { - assertThrows(NullPointerException.class, () -> table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null)); + assertThrows( + NullPointerException.class, + () -> table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized>) null) + ); } @Test public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() { - assertThrows(NullPointerException.class, () -> table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null)); + assertThrows( + NullPointerException.class, + () -> table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized>) null) + ); } @Test @@ -573,12 +588,11 @@ public class KTableImplTest { assertThrows(NullPointerException.class, () -> table.transformValues(null)); } - @SuppressWarnings("unchecked") @Test public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() { final ValueTransformerWithKeySupplier valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class); - assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized) null)); + assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized>) null)); } @Test