This commit is contained in:
John Roesler 2022-09-30 12:08:01 -05:00
parent e15bb6c3df
commit 1248149817
1 changed files with 39 additions and 27 deletions

View File

@ -27,7 +27,10 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
@ -36,28 +39,37 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.*; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.*; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.*; import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.time.Duration.ofSeconds;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@Category({IntegrationTest.class}) @Category({IntegrationTest.class})
public class TaskIdlingIntegrationTest { public class TaskIdlingIntegrationTest {
@ -78,14 +90,14 @@ public class TaskIdlingIntegrationTest {
Utils.mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") Utils.mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
) )
); );
private static Properties CONSUMER_CONFIG; private static Properties consumerConfig;
private static Properties PRODUCER_CONFIG_1; private static Properties producerConfig;
@BeforeClass @BeforeClass
public static void startCluster() throws IOException, InterruptedException { public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start(); CLUSTER.start();
//Use multiple partitions to ensure distribution of keys. //Use multiple partitions to ensure distribution of keys.
CONSUMER_CONFIG = Utils.mkProperties( consumerConfig = Utils.mkProperties(
Utils.mkMap( Utils.mkMap(
Utils.mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), Utils.mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
Utils.mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()), Utils.mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()),
@ -93,7 +105,7 @@ public class TaskIdlingIntegrationTest {
) )
); );
PRODUCER_CONFIG_1 = Utils.mkProperties( producerConfig = Utils.mkProperties(
Utils.mkMap( Utils.mkMap(
Utils.mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), Utils.mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
Utils.mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()), Utils.mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()),
@ -106,10 +118,10 @@ public class TaskIdlingIntegrationTest {
CLUSTER.createTopic(STREAM_3, 1, 1); CLUSTER.createTopic(STREAM_3, 1, 1);
CLUSTER.createTopic(STREAM_4, 1, 1); CLUSTER.createTopic(STREAM_4, 1, 1);
try (final Producer<Object, Object> producer = new KafkaProducer<>(PRODUCER_CONFIG_1)) { try (final Producer<Object, Object> producer = new KafkaProducer<>(producerConfig)) {
String[] inputs = {STREAM_1, STREAM_2, STREAM_3}; final String[] inputs = {STREAM_1, STREAM_2, STREAM_3};
for (int i = 0; i < 10_000; i++) { for (int i = 0; i < 10_000; i++) {
for (String input : inputs) { for (final String input : inputs) {
producer.send( producer.send(
new ProducerRecord<>( new ProducerRecord<>(
input, input,
@ -156,7 +168,7 @@ public class TaskIdlingIntegrationTest {
final KStream<Integer, Integer> merge = stream1.merge(stream2).merge(stream3); final KStream<Integer, Integer> merge = stream1.merge(stream2).merge(stream3);
final ConcurrentLinkedDeque<KeyValue<Integer, Integer>> mapSeen = new ConcurrentLinkedDeque<>(); final ConcurrentLinkedDeque<KeyValue<Integer, Integer>> mapSeen = new ConcurrentLinkedDeque<>();
final KStream<Integer, Integer> map = merge.map((key, value) -> { final KStream<Integer, Integer> map = merge.map((key, value) -> {
KeyValue<Integer, Integer> keyValue = new KeyValue<>(key, value); final KeyValue<Integer, Integer> keyValue = new KeyValue<>(key, value);
mapSeen.offer(keyValue); mapSeen.offer(keyValue);
return keyValue; return keyValue;
}); });
@ -168,22 +180,22 @@ public class TaskIdlingIntegrationTest {
) )
); );
final ConcurrentLinkedDeque<Record<Integer, Integer>> processSeen = new ConcurrentLinkedDeque<>(); final ConcurrentLinkedDeque<Record<Integer, Integer>> processSeen = new ConcurrentLinkedDeque<>();
KStream<Integer, String> process = map.process( final KStream<Integer, String> process = map.process(
() -> new ContextualProcessor<Integer, Integer, Integer, String>() { () -> new ContextualProcessor<Integer, Integer, Integer, String>() {
private KeyValueStore<Integer, Integer> store; private KeyValueStore<Integer, Integer> store;
@Override @Override
public void init(ProcessorContext<Integer, String> context) { public void init(final ProcessorContext<Integer, String> context) {
super.init(context); super.init(context);
store = context.getStateStore("STORE"); store = context.getStateStore("STORE");
} }
@Override @Override
public void process(Record<Integer, Integer> record) { public void process(final Record<Integer, Integer> record) {
processSeen.offer(record); processSeen.offer(record);
store.put(record.key(), record.value()); store.put(record.key(), record.value());
String topic = String.format( final String topic = String.format(
"%s %d %d", "%s %d %d",
context().recordMetadata().get().topic(), context().recordMetadata().get().topic(),
context().recordMetadata().get().partition(), context().recordMetadata().get().partition(),
@ -201,10 +213,10 @@ public class TaskIdlingIntegrationTest {
try ( try (
final KafkaStreams runningStreams = IntegrationTestUtils.getRunningStreams(streamsConfig, streamsBuilder, true); final KafkaStreams runningStreams = IntegrationTestUtils.getRunningStreams(streamsConfig, streamsBuilder, true);
final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(CONSUMER_CONFIG); final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerConfig);
) { ) {
Map<String, List<PartitionInfo>> topics = consumer.listTopics(); final Map<String, List<PartitionInfo>> topics = consumer.listTopics();
List<TopicPartition> partitions = final List<TopicPartition> partitions =
topics topics
.get(STREAM_4) .get(STREAM_4)
.stream() .stream()
@ -215,8 +227,8 @@ public class TaskIdlingIntegrationTest {
consumer.seekToBeginning(partitions); consumer.seekToBeginning(partitions);
while (consumerSeen.size() < (10_000 * 3)) { while (consumerSeen.size() < (10_000 * 3)) {
ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofMillis(100L)); final ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofMillis(100L));
for (ConsumerRecord<Integer, String> record : poll) { for (final ConsumerRecord<Integer, String> record : poll) {
System.out.println(record.key() + " " + record.value()); System.out.println(record.key() + " " + record.value());
consumerSeen.add(record); consumerSeen.add(record);
} }
@ -228,11 +240,11 @@ public class TaskIdlingIntegrationTest {
for (int i = 0; i < 10_000; i++) { for (int i = 0; i < 10_000; i++) {
for (int j = 0; j < 3; j++) { for (int j = 0; j < 3; j++) {
assertThat(mapSeen.poll().key, Matchers.is(i)); assertThat(mapSeen.poll().key, Matchers.is(i));
Record<Integer, Integer> processRecord = processSeen.poll(); final Record<Integer, Integer> processRecord = processSeen.poll();
assertThat(processRecord.key(), Matchers.is(i)); assertThat(processRecord.key(), Matchers.is(i));
assertThat(processRecord.timestamp(), Matchers.greaterThan(lastTimestamp)); assertThat(processRecord.timestamp(), Matchers.greaterThan(lastTimestamp));
lastTimestamp = processRecord.timestamp(); lastTimestamp = processRecord.timestamp();
ConsumerRecord<Integer, String> consumerRecord = consumerSeen.get(consumeIdx++); final ConsumerRecord<Integer, String> consumerRecord = consumerSeen.get(consumeIdx++);
assertThat(consumerRecord.key(), Matchers.is(i)); assertThat(consumerRecord.key(), Matchers.is(i));
} }
} }