diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index fffca97400e..9a25afd4350 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -169,7 +169,14 @@ public class Topology { public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final String name, final String... topics) { - internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, null, null, topics); + internalTopologyBuilder.addSource( + offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), + name, + null, + null, + null, + topics + ); return this; } @@ -215,7 +222,14 @@ public class Topology { public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) { - internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, null, null, topicPattern); + internalTopologyBuilder.addSource( + offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), + name, + null, + null, + null, + topicPattern + ); return this; } @@ -304,7 +318,14 @@ public class Topology { final TimestampExtractor timestampExtractor, final String name, final String... topics) { - internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null, topics); + internalTopologyBuilder.addSource( + offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), + name, + timestampExtractor, + null, + null, + topics + ); return this; } @@ -351,7 +372,14 @@ public class Topology { final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) { - internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null, topicPattern); + internalTopologyBuilder.addSource( + offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), + name, + timestampExtractor, + null, + null, + topicPattern + ); return this; } @@ -457,7 +485,14 @@ public class Topology { final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String... topics) { - internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer, valueDeserializer, topics); + internalTopologyBuilder.addSource( + offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), + name, + null, + keyDeserializer, + valueDeserializer, + topics + ); return this; } @@ -514,7 +549,14 @@ public class Topology { final Deserializer keyDeserializer, final Deserializer valueDeserializer, final Pattern topicPattern) { - internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer, valueDeserializer, topicPattern); + internalTopologyBuilder.addSource( + offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), + name, + null, + keyDeserializer, + valueDeserializer, + topicPattern + ); return this; } @@ -571,7 +613,14 @@ public class Topology { final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String... topics) { - internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, keyDeserializer, valueDeserializer, topics); + internalTopologyBuilder.addSource( + offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), + name, + timestampExtractor, + keyDeserializer, + valueDeserializer, + topics + ); return this; } @@ -634,7 +683,14 @@ public class Topology { final Deserializer keyDeserializer, final Deserializer valueDeserializer, final Pattern topicPattern) { - internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern); + internalTopologyBuilder.addSource( + offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset), + name, + timestampExtractor, + keyDeserializer, + valueDeserializer, + topicPattern + ); return this; } diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 0dc0179c6e5..0cb91b12a58 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -146,8 +146,7 @@ public class TopologyTest { @Test public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() { - assertThrows(NullPointerException.class, () -> topology.addProcessor("name", - (ProcessorSupplier) null)); + assertThrows(NullPointerException.class, () -> topology.addProcessor("name", null)); } @Test @@ -376,6 +375,7 @@ public class TopologyTest { } } + @SuppressWarnings("resource") @Test public void shouldThrowOnUnassignedStateStoreAccess() { final String sourceNodeName = "source"; @@ -411,7 +411,7 @@ public class TopologyTest { @Override public Processor get() { - return new Processor() { + return new Processor<>() { @Override public void init(final ProcessorContext context) { context.getStateStore(STORE_NAME); @@ -1157,7 +1157,7 @@ public class TopologyTest { public void topologyWithDynamicRoutingShouldDescribeExtractorClass() { final StreamsBuilder builder = new StreamsBuilder(); - final TopicNameExtractor topicNameExtractor = new TopicNameExtractor() { + final TopicNameExtractor topicNameExtractor = new TopicNameExtractor<>() { @Override public String extract(final Object key, final Object value, final RecordContext recordContext) { return recordContext.topic() + "-" + key; @@ -2257,16 +2257,16 @@ public class TopologyTest { private TopologyDescription.Source addSource(final String sourceName, final String... sourceTopic) { - topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, null, null, sourceTopic); - final StringBuilder allSourceTopics = new StringBuilder(sourceTopic[0]); - for (int i = 1; i < sourceTopic.length; ++i) { - allSourceTopics.append(", ").append(sourceTopic[i]); - } + topology.addSource((AutoOffsetReset) null, sourceName, null, null, null, sourceTopic); return new InternalTopologyBuilder.Source(sourceName, new HashSet<>(Arrays.asList(sourceTopic)), null); } + @SuppressWarnings("deprecation") private TopologyDescription.Source addSource(final String sourceName, final Pattern sourcePattern) { + // we still test the old `Topology.AutoOffsetReset` here, to increase test coverage + // (cf `addSource` about which used the new one) + // When can rewrite this to the new one, when the old one is removed topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, null, null, sourcePattern); return new InternalTopologyBuilder.Source(sourceName, null, sourcePattern); } @@ -2338,7 +2338,6 @@ public class TopologyTest { return expectedSinkNode; } - @Deprecated // testing old PAPI private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName, final String sourceName, final String globalTopicName, @@ -2441,17 +2440,17 @@ public class TopologyTest { topology.addSource("source", "topic"); topology.addProcessor( "p1", - () -> (Processor) record -> System.out.println("Processing: " + random.nextInt()), + () -> record -> System.out.println("Processing: " + random.nextInt()), "source" ); topology.addProcessor( "p2", - () -> (Processor) record -> System.out.println("Processing: " + random.nextInt()), + () -> record -> System.out.println("Processing: " + random.nextInt()), "p1" ); topology.addProcessor( "p3", - () -> (Processor) record -> System.out.println("Processing: " + random.nextInt()), + () -> record -> System.out.println("Processing: " + random.nextInt()), "p2" ); assertThat(counter.numWrappedProcessors(), is(3));