MINOR: fix NPE in KS `Topology` for new `AutoOffsetReset` (#18780)

Introduced via KIP-1106.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-02-03 17:24:47 -08:00 committed by GitHub
parent ab8ef87c7f
commit ce6f078192
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 76 additions and 21 deletions

View File

@ -169,7 +169,14 @@ public class Topology {
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
final String name,
final String... topics) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, null, null, topics);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
null,
null,
null,
topics
);
return this;
}
@ -215,7 +222,14 @@ public class Topology {
public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
final String name,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, null, null, topicPattern);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
null,
null,
null,
topicPattern
);
return this;
}
@ -304,7 +318,14 @@ public class Topology {
final TimestampExtractor timestampExtractor,
final String name,
final String... topics) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null, topics);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
timestampExtractor,
null,
null,
topics
);
return this;
}
@ -351,7 +372,14 @@ public class Topology {
final TimestampExtractor timestampExtractor,
final String name,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, null, null, topicPattern);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
timestampExtractor,
null,
null,
topicPattern
);
return this;
}
@ -457,7 +485,14 @@ public class Topology {
final Deserializer<?> keyDeserializer,
final Deserializer<?> valueDeserializer,
final String... topics) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer, valueDeserializer, topics);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
null,
keyDeserializer,
valueDeserializer,
topics
);
return this;
}
@ -514,7 +549,14 @@ public class Topology {
final Deserializer<?> keyDeserializer,
final Deserializer<?> valueDeserializer,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, null, keyDeserializer, valueDeserializer, topicPattern);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
null,
keyDeserializer,
valueDeserializer,
topicPattern
);
return this;
}
@ -571,7 +613,14 @@ public class Topology {
final Deserializer<?> keyDeserializer,
final Deserializer<?> valueDeserializer,
final String... topics) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, keyDeserializer, valueDeserializer, topics);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topics
);
return this;
}
@ -634,7 +683,14 @@ public class Topology {
final Deserializer<?> keyDeserializer,
final Deserializer<?> valueDeserializer,
final Pattern topicPattern) {
internalTopologyBuilder.addSource(new AutoOffsetResetInternal(offsetReset), name, timestampExtractor, keyDeserializer, valueDeserializer, topicPattern);
internalTopologyBuilder.addSource(
offsetReset == null ? null : new AutoOffsetResetInternal(offsetReset),
name,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topicPattern
);
return this;
}

View File

@ -146,8 +146,7 @@ public class TopologyTest {
@Test
public void shouldNotAllowNullProcessorSupplierWhenAddingProcessor() {
assertThrows(NullPointerException.class, () -> topology.addProcessor("name",
(ProcessorSupplier<Object, Object, Object, Object>) null));
assertThrows(NullPointerException.class, () -> topology.addProcessor("name", null));
}
@Test
@ -376,6 +375,7 @@ public class TopologyTest {
}
}
@SuppressWarnings("resource")
@Test
public void shouldThrowOnUnassignedStateStoreAccess() {
final String sourceNodeName = "source";
@ -411,7 +411,7 @@ public class TopologyTest {
@Override
public Processor<Object, Object, Object, Object> get() {
return new Processor<Object, Object, Object, Object>() {
return new Processor<>() {
@Override
public void init(final ProcessorContext<Object, Object> context) {
context.getStateStore(STORE_NAME);
@ -1157,7 +1157,7 @@ public class TopologyTest {
public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
final StreamsBuilder builder = new StreamsBuilder();
final TopicNameExtractor<Object, Object> topicNameExtractor = new TopicNameExtractor<Object, Object>() {
final TopicNameExtractor<Object, Object> topicNameExtractor = new TopicNameExtractor<>() {
@Override
public String extract(final Object key, final Object value, final RecordContext recordContext) {
return recordContext.topic() + "-" + key;
@ -2257,16 +2257,16 @@ public class TopologyTest {
private TopologyDescription.Source addSource(final String sourceName,
final String... sourceTopic) {
topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, null, null, sourceTopic);
final StringBuilder allSourceTopics = new StringBuilder(sourceTopic[0]);
for (int i = 1; i < sourceTopic.length; ++i) {
allSourceTopics.append(", ").append(sourceTopic[i]);
}
topology.addSource((AutoOffsetReset) null, sourceName, null, null, null, sourceTopic);
return new InternalTopologyBuilder.Source(sourceName, new HashSet<>(Arrays.asList(sourceTopic)), null);
}
@SuppressWarnings("deprecation")
private TopologyDescription.Source addSource(final String sourceName,
final Pattern sourcePattern) {
// we still test the old `Topology.AutoOffsetReset` here, to increase test coverage
// (cf `addSource` about which used the new one)
// When can rewrite this to the new one, when the old one is removed
topology.addSource((Topology.AutoOffsetReset) null, sourceName, null, null, null, sourcePattern);
return new InternalTopologyBuilder.Source(sourceName, null, sourcePattern);
}
@ -2338,7 +2338,6 @@ public class TopologyTest {
return expectedSinkNode;
}
@Deprecated // testing old PAPI
private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
final String sourceName,
final String globalTopicName,
@ -2441,17 +2440,17 @@ public class TopologyTest {
topology.addSource("source", "topic");
topology.addProcessor(
"p1",
() -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()),
() -> record -> System.out.println("Processing: " + random.nextInt()),
"source"
);
topology.addProcessor(
"p2",
() -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()),
() -> record -> System.out.println("Processing: " + random.nextInt()),
"p1"
);
topology.addProcessor(
"p3",
() -> (Processor<Object, Object, Object, Object>) record -> System.out.println("Processing: " + random.nextInt()),
() -> record -> System.out.println("Processing: " + random.nextInt()),
"p2"
);
assertThat(counter.numWrappedProcessors(), is(3));