From 5338f8432f74054671e9da59ba6f97abb81a03f2 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Thu, 15 Oct 2015 17:28:23 -0700 Subject: [PATCH] KAFKA-2487: change kafka.examples.Consumer to use the new java consumer Author: Ashish Singh Reviewers: Guozhang Wang Closes #297 from SinghAsDev/KAFKA-2487 --- .../main/java/kafka/examples/Consumer.java | 74 +++++++++---------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 8af64d886d9..3bb93ee2992 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -16,52 +16,52 @@ */ package kafka.examples; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.Collections; import java.util.Properties; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; +import kafka.utils.ShutdownableThread; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; -public class Consumer extends Thread +public class Consumer extends ShutdownableThread { - private final ConsumerConnector consumer; + private final KafkaConsumer consumer; private final String topic; - + public Consumer(String topic) { - consumer = kafka.consumer.Consumer.createJavaConsumerConnector( - createConsumerConfig()); + super("KafkaConsumerExample", false); + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + + consumer = new KafkaConsumer<>(props); this.topic = topic; } - private static ConsumerConfig createConsumerConfig() - { - Properties props = new Properties(); - props.put("zookeeper.connect", KafkaProperties.zkConnect); - props.put("group.id", KafkaProperties.groupId); - props.put("zookeeper.session.timeout.ms", "400"); - props.put("zookeeper.sync.time.ms", "200"); - props.put("auto.commit.interval.ms", "1000"); - - return new ConsumerConfig(props); - - } - - public void run() { - Map topicCountMap = new HashMap(); - topicCountMap.put(topic, 1); - Map>> consumerMap = consumer.createMessageStreams(topicCountMap); - KafkaStream stream = consumerMap.get(topic).get(0); - for (MessageAndMetadata messageAndMetadata : stream) { - System.out.println("Received message: (" + ByteBuffer.wrap(messageAndMetadata.key()).getInt() + - ", " + - "" + new String(messageAndMetadata.message()) + ")"); + @Override + public void doWork() { + consumer.subscribe(Collections.singletonList(this.topic)); + ConsumerRecords records = consumer.poll(1000); + for (ConsumerRecord record : records) { + System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } } -} + + @Override + public String name() { + return null; + } + + @Override + public boolean isInterruptible() { + return false; + } +} \ No newline at end of file