KAFKA-10605: Deprecate old PAPI registration methods (#9448)

Add deprecation annotations to the methods replaced in KIP-478.

Reviewers: Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
John Roesler 2020-10-19 15:29:27 -05:00 committed by GitHub
parent fd71e1355b
commit 659b05f78a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 49 additions and 35 deletions

View File

@ -22,10 +22,11 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@ -54,17 +55,17 @@ import java.util.concurrent.CountDownLatch;
*/
public final class WordCountProcessorDemo {
static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> {
@Override
public Processor<String, String> get() {
return new Processor<String, String>() {
public Processor<String, String, String, String> get() {
return new Processor<String, String, String, String>() {
private ProcessorContext context;
private KeyValueStore<String, Integer> kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<String, String> context) {
this.context = context;
this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
@ -75,30 +76,27 @@ public final class WordCountProcessorDemo {
System.out.println("[" + entry.key + ", " + entry.value + "]");
context.forward(entry.key, entry.value.toString());
context.forward(new Record<>(entry.key, entry.value.toString(), timestamp));
}
}
});
this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
kvStore = context.getStateStore("Counts");
}
@Override
public void process(final String dummy, final String line) {
final String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
public void process(final Record<String, String> record) {
final String[] words = record.value().toLowerCase(Locale.getDefault()).split(" ");
for (final String word : words) {
final Integer oldValue = this.kvStore.get(word);
final Integer oldValue = kvStore.get(word);
if (oldValue == null) {
this.kvStore.put(word, 1);
kvStore.put(word, 1);
} else {
this.kvStore.put(word, oldValue + 1);
kvStore.put(word, oldValue + 1);
}
}
}
@Override
public void close() {}
};
}
}

View File

@ -17,27 +17,27 @@
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
import java.util.Iterator;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
*/
public class WordCountProcessorTest {
@SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
@Test
public void test() {
final MockProcessorContext context = new MockProcessorContext();
final MockProcessorContext<String, String> context = new MockProcessorContext<String, String>();
// Create, initialize, and register the state store.
final KeyValueStore<String, Integer> store =
@ -45,15 +45,14 @@ public class WordCountProcessorTest {
.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
// Caching is disabled by default, but FYI: caching is also not supported by MockProcessorContext.
.build();
store.init(context, store);
context.register(store, null);
store.init(context.getStateStoreContext(), store);
// Create and initialize the processor under test
final Processor<String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
final Processor<String, String, String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get();
processor.init(context);
// send a record to the processor
processor.process("key", "alpha beta gamma alpha");
processor.process(new Record<>("key", "alpha beta gamma alpha", 0L));
// note that the processor does not forward during process()
assertTrue(context.forwarded().isEmpty());
@ -62,10 +61,11 @@ public class WordCountProcessorTest {
context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
// finally, we can verify the output.
final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
assertEquals(new KeyValue<>("alpha", "2"), capturedForwards.next().keyValue());
assertEquals(new KeyValue<>("beta", "1"), capturedForwards.next().keyValue());
assertEquals(new KeyValue<>("gamma", "1"), capturedForwards.next().keyValue());
assertFalse(capturedForwards.hasNext());
final List<MockProcessorContext.CapturedForward<String, String>> expected = Arrays.asList(
new MockProcessorContext.CapturedForward<>(new Record<>("alpha", "2", 0L)),
new MockProcessorContext.CapturedForward<>(new Record<>("beta", "1", 0L)),
new MockProcessorContext.CapturedForward<>(new Record<>("gamma", "1", 0L))
);
assertThat(context.forwarded(), is(expected));
}
}

View File

@ -655,8 +655,10 @@ public class Topology {
* and process
* @return itself
* @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
* @deprecated Since 2.7.0 Use {@link this#addProcessor(String, ProcessorSupplier, String...)} instead.
*/
@SuppressWarnings("rawtypes")
@Deprecated
public synchronized Topology addProcessor(final String name,
final org.apache.kafka.streams.processor.ProcessorSupplier supplier,
final String... parentNames) {
@ -740,7 +742,9 @@ public class Topology {
* @param stateUpdateSupplier the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already registered
* @deprecated Since 2.7.0. Use {@link this#addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
*/
@Deprecated
public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
final String sourceName,
final Deserializer<K> keyDeserializer,
@ -784,7 +788,9 @@ public class Topology {
* @param stateUpdateSupplier the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already registered
* @deprecated Since 2.7.0. Use {@link this#addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
*/
@Deprecated
public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
final String sourceName,
final TimestampExtractor timestampExtractor,

View File

@ -60,7 +60,7 @@ public interface TopologyDescription {
/**
* Represents a {@link Topology#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder, String,
* org.apache.kafka.common.serialization.Deserializer, org.apache.kafka.common.serialization.Deserializer, String,
* String, org.apache.kafka.streams.processor.ProcessorSupplier) global store}.
* String, org.apache.kafka.streams.processor.api.ProcessorSupplier) global store}.
* Adding a global store results in adding a source node and one stateful processor node.
* Note, that all added global stores form a single unit (similar to a {@link Subtopology}) even if different
* global stores are not connected to each other.

View File

@ -937,6 +937,7 @@ public class KafkaStreamsTest {
}
@SuppressWarnings("unchecked")
@Deprecated // testing old PAPI
private Topology getStatefulTopology(final String inputTopic,
final String outputTopic,
final String globalTopicName,

View File

@ -382,6 +382,7 @@ public class TopologyTest {
}
}
@Deprecated // testing old PAPI
@Test(expected = TopologyException.class)
public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
EasyMock.expect(globalStoreBuilder.name()).andReturn("anyName").anyTimes();
@ -1227,6 +1228,7 @@ public class TopologyTest {
return expectedSinkNode;
}
@Deprecated // testing old PAPI
private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
final String sourceName,
final String globalTopicName,

View File

@ -293,6 +293,7 @@ public class ProcessorTopologyTest {
assertNull(store.get("key4"));
}
@Deprecated // testing old PAPI
@Test
public void testDrivingConnectedStateStoreInDifferentProcessorsTopologyWithOldAPI() {
final String storeName = "connectedStore";
@ -355,6 +356,7 @@ public class ProcessorTopologyTest {
assertNull(store.get("key4"));
}
@Deprecated // testing old PAPI
@Test
public void shouldDriveGlobalStore() {
final String storeName = "my-store";
@ -621,6 +623,7 @@ public class ProcessorTopologyTest {
return topology.getInternalBuilder("anyAppId").buildTopology();
}
@Deprecated // testing old PAPI
private ProcessorTopology createGlobalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) {
final TopologyWrapper topology = new TopologyWrapper();
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
@ -682,6 +685,7 @@ public class ProcessorTopologyTest {
.addSink("sink2", OUTPUT_TOPIC_2, constantPartitioner(partition), "child2");
}
@Deprecated // testing old PAPI
private Topology createMultiplexingTopology() {
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
@ -690,6 +694,7 @@ public class ProcessorTopologyTest {
.addSink("sink2", OUTPUT_TOPIC_2, "processor");
}
@Deprecated // testing old PAPI
private Topology createMultiplexByNameTopology() {
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
@ -698,6 +703,7 @@ public class ProcessorTopologyTest {
.addSink("sink1", OUTPUT_TOPIC_2, "processor");
}
@Deprecated // testing old PAPI
private Topology createStatefulTopology(final String storeName) {
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
@ -706,6 +712,7 @@ public class ProcessorTopologyTest {
.addSink("counts", OUTPUT_TOPIC_1, "processor");
}
@Deprecated // testing old PAPI
private Topology createConnectedStateStoreTopology(final String storeName) {
final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
return topology