KAFKA-12829: Remove deprecated Topology#addGlobalStore of old Processor API (#16791)

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
JohnHuang 2024-08-30 07:52:52 +08:00 committed by GitHub
parent 865cdfc1cd
commit b154f58ce8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 1 additions and 154 deletions

View File

@ -820,110 +820,6 @@ public class Topology {
);
}
/**
* Adds a global {@link StateStore} to the topology.
* The {@link StateStore} sources its data from all partitions of the provided input topic.
* There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
* <p>
* A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
* of the input topic.
* <p>
* The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
* records forwarded from the {@link SourceNode}.
* The supplier should always generate a new instance each time
* {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets called. Creating a single
* {@link org.apache.kafka.streams.processor.Processor} object and returning the same object reference in
* {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} would be a violation of the supplier pattern
* and leads to runtime exceptions.
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
*
* @param storeBuilder user defined state store builder
* @param sourceName name of the {@link SourceNode} that will be automatically added
* @param keyDeserializer the {@link Deserializer} to deserialize keys with
* @param valueDeserializer the {@link Deserializer} to deserialize values with
* @param topic the topic to source the data from
* @param processorName the name of the {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* @param stateUpdateSupplier the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already registered
* @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
*/
@Deprecated
public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
final String sourceName,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final String topic,
final String processorName,
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
new StoreBuilderWrapper(storeBuilder),
sourceName,
null,
keyDeserializer,
valueDeserializer,
topic,
processorName,
() -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
true
);
return this;
}
/**
* Adds a global {@link StateStore} to the topology.
* The {@link StateStore} sources its data from all partitions of the provided input topic.
* There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
* <p>
* A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
* of the input topic.
* <p>
* The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
* records forwarded from the {@link SourceNode}.
* The supplier should always generate a new instance each time
* {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets called. Creating a single
* {@link org.apache.kafka.streams.processor.Processor} object and returning the same object reference in
* {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} would be a violation of the supplier pattern
* and leads to runtime exceptions.
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
*
* @param storeBuilder user defined key value store builder
* @param sourceName name of the {@link SourceNode} that will be automatically added
* @param timestampExtractor the stateless timestamp extractor used for this source,
* if not specified the default extractor defined in the configs will be used
* @param keyDeserializer the {@link Deserializer} to deserialize keys with
* @param valueDeserializer the {@link Deserializer} to deserialize values with
* @param topic the topic to source the data from
* @param processorName the name of the {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* @param stateUpdateSupplier the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already registered
* @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
*/
@Deprecated
public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final String topic,
final String processorName,
final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
new StoreBuilderWrapper(storeBuilder),
sourceName,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topic,
processorName,
() -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
true
);
return this;
}
/**
* Adds a global {@link StateStore} to the topology.
* The {@link StateStore} sources its data from all partitions of the provided input topic.

View File

@ -1268,35 +1268,6 @@ public class ProcessorTopologyTest {
}
@Deprecated // testing old PAPI
@Test
public void shouldDriveGlobalStore() {
final String storeName = "my-store";
final String global = "global";
final String topic = "topic";
topology.addGlobalStore(
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore(storeName),
Serdes.String(),
Serdes.String()
).withLoggingDisabled(),
global,
STRING_DESERIALIZER,
STRING_DESERIALIZER,
topic,
"processor",
define(new OldAPIStatefulProcessor(storeName)));
driver = new TopologyTestDriver(topology, props);
final TestInputTopic<String, String> inputTopic = driver.createInputTopic(topic, STRING_SERIALIZER, STRING_SERIALIZER);
final KeyValueStore<String, String> globalStore = driver.getKeyValueStore(storeName);
inputTopic.pipeInput("key1", "value1");
inputTopic.pipeInput("key2", "value2");
assertEquals("value1", globalStore.get("key1"));
assertEquals("value2", globalStore.get("key2"));
}
@Test
public void testDrivingSimpleMultiSourceTopology() {
final int partition = 10;
@ -1512,18 +1483,6 @@ public class ProcessorTopologyTest {
assertTrue(processorTopology.hasPersistentLocalStore());
}
@Test
public void inMemoryStoreShouldNotResultInPersistentGlobalStore() {
final ProcessorTopology processorTopology = createGlobalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
assertFalse(processorTopology.hasPersistentGlobalStore());
}
@Test
public void persistentGlobalStoreShouldBeDetected() {
final ProcessorTopology processorTopology = createGlobalStoreTopology(Stores.persistentKeyValueStore("my-store"));
assertTrue(processorTopology.hasPersistentGlobalStore());
}
private ProcessorTopology createLocalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) {
final TopologyWrapper topology = new TopologyWrapper();
final String processor = "processor";
@ -1535,15 +1494,7 @@ public class ProcessorTopologyTest {
return topology.getInternalBuilder("anyAppId").buildTopology();
}
@Deprecated // testing old PAPI
private ProcessorTopology createGlobalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) {
final TopologyWrapper topology = new TopologyWrapper();
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String()).withLoggingDisabled();
topology.addGlobalStore(storeBuilder, "global", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic", "processor",
define(new OldAPIStatefulProcessor(storeSupplier.name())));
return topology.getInternalBuilder("anyAppId").buildTopology();
}
private void assertNextOutputRecord(final TestRecord<String, String> record,
final String key,