diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskIdlingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskIdlingIntegrationTest.java
index f531273ab9e..fa664ea9063 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskIdlingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskIdlingIntegrationTest.java
@@ -27,7 +27,10 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
@@ -36,28 +39,37 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.*;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.*;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.hamcrest.Matchers;
-import org.junit.*;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.stream.Collectors;
 
-import static java.time.Duration.ofSeconds;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
 
 @Category({IntegrationTest.class})
 public class TaskIdlingIntegrationTest {
@@ -78,14 +90,14 @@ public class TaskIdlingIntegrationTest {
             Utils.mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
         )
     );
-    private static Properties CONSUMER_CONFIG;
-    private static Properties PRODUCER_CONFIG_1;
+    private static Properties consumerConfig;
+    private static Properties producerConfig;
 
     @BeforeClass
     public static void startCluster() throws IOException, InterruptedException {
         CLUSTER.start();
         //Use multiple partitions to ensure distribution of keys.
-        CONSUMER_CONFIG = Utils.mkProperties(
+        consumerConfig = Utils.mkProperties(
             Utils.mkMap(
                 Utils.mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
                 Utils.mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()),
@@ -93,7 +105,7 @@ public class TaskIdlingIntegrationTest {
             )
         );
 
-        PRODUCER_CONFIG_1 = Utils.mkProperties(
+        producerConfig = Utils.mkProperties(
             Utils.mkMap(
                 Utils.mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
                 Utils.mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()),
@@ -106,10 +118,10 @@ public class TaskIdlingIntegrationTest {
         CLUSTER.createTopic(STREAM_3, 1, 1);
         CLUSTER.createTopic(STREAM_4, 1, 1);
 
-        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(PRODUCER_CONFIG_1)) {
-            String[] inputs = {STREAM_1, STREAM_2, STREAM_3};
+        try (final Producer<Integer, Integer> producer = new KafkaProducer<>(producerConfig)) {
+            final String[] inputs = {STREAM_1, STREAM_2, STREAM_3};
             for (int i = 0; i < 10_000; i++) {
-                for (String input : inputs) {
+                for (final String input : inputs) {
                     producer.send(
                         new ProducerRecord<>(
                             input,
@@ -156,7 +168,7 @@ public class TaskIdlingIntegrationTest {
         final KStream<Integer, Integer> merge = stream1.merge(stream2).merge(stream3);
         final ConcurrentLinkedDeque<KeyValue<Integer, Integer>> mapSeen = new ConcurrentLinkedDeque<>();
         final KStream<Integer, Integer> map = merge.map((key, value) -> {
-            KeyValue<Integer, Integer> keyValue = new KeyValue<>(key, value);
+            final KeyValue<Integer, Integer> keyValue = new KeyValue<>(key, value);
             mapSeen.offer(keyValue);
             return keyValue;
         });
@@ -168,22 +180,22 @@ public class TaskIdlingIntegrationTest {
             )
         );
 
         final ConcurrentLinkedDeque<Record<Integer, Integer>> processSeen = new ConcurrentLinkedDeque<>();
-        KStream<Integer, String> process = map.process(
+        final KStream<Integer, String> process = map.process(
             () -> new ContextualProcessor<Integer, Integer, Integer, String>() {
                 private KeyValueStore<Integer, Integer> store;
 
                 @Override
-                public void init(ProcessorContext<Integer, String> context) {
+                public void init(final ProcessorContext<Integer, String> context) {
                     super.init(context);
                     store = context.getStateStore("STORE");
                 }
 
                 @Override
-                public void process(Record<Integer, Integer> record) {
+                public void process(final Record<Integer, Integer> record) {
                     processSeen.offer(record);
                     store.put(record.key(), record.value());
-                    String topic = String.format(
+                    final String topic = String.format(
                         "%s %d %d",
                         context().recordMetadata().get().topic(),
                         context().recordMetadata().get().partition(),
@@ -201,10 +213,10 @@ public class TaskIdlingIntegrationTest {
 
         try (
             final KafkaStreams runningStreams = IntegrationTestUtils.getRunningStreams(streamsConfig, streamsBuilder, true);
-            final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(CONSUMER_CONFIG);
+            final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerConfig);
         ) {
-            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
-            List<TopicPartition> partitions =
+            final Map<String, List<PartitionInfo>> topics = consumer.listTopics();
+            final List<TopicPartition> partitions =
                 topics
                     .get(STREAM_4)
                     .stream()
@@ -215,8 +227,8 @@ public class TaskIdlingIntegrationTest {
             consumer.seekToBeginning(partitions);
 
             while (consumerSeen.size() < (10_000 * 3)) {
-                ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofMillis(100L));
-                for (ConsumerRecord<Integer, String> record : poll) {
+                final ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofMillis(100L));
+                for (final ConsumerRecord<Integer, String> record : poll) {
                     System.out.println(record.key() + " " + record.value());
                     consumerSeen.add(record);
                 }
@@ -228,11 +240,11 @@ public class TaskIdlingIntegrationTest {
             for (int i = 0; i < 10_000; i++) {
                 for (int j = 0; j < 3; j++) {
                     assertThat(mapSeen.poll().key, Matchers.is(i));
-                    Record<Integer, Integer> processRecord = processSeen.poll();
+                    final Record<Integer, Integer> processRecord = processSeen.poll();
                     assertThat(processRecord.key(), Matchers.is(i));
                     assertThat(processRecord.timestamp(), Matchers.greaterThan(lastTimestamp));
                     lastTimestamp = processRecord.timestamp();
-                    ConsumerRecord<Integer, String> consumerRecord = consumerSeen.get(consumeIdx++);
+                    final ConsumerRecord<Integer, String> consumerRecord = consumerSeen.get(consumeIdx++);
                     assertThat(consumerRecord.key(), Matchers.is(i));
                 }
             }