mirror of https://github.com/apache/kafka.git
KAFKA-2487: change kafka.examples.Consumer to use the new java consumer
Author: Ashish Singh <asingh@cloudera.com> Reviewers: Guozhang Wang Closes #297 from SinghAsDev/KAFKA-2487
This commit is contained in: parent 50a076d1e9 — commit 5338f8432f
@ -16,52 +16,52 @@
|
|||
*/
|
||||
package kafka.examples;
|
||||
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
import kafka.consumer.ConsumerConfig;
|
||||
import kafka.consumer.KafkaStream;
|
||||
import kafka.javaapi.consumer.ConsumerConnector;
|
||||
import kafka.message.MessageAndMetadata;
|
||||
|
||||
import kafka.utils.ShutdownableThread;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
|
||||
public class Consumer extends Thread
|
||||
public class Consumer extends ShutdownableThread
|
||||
{
|
||||
private final ConsumerConnector consumer;
|
||||
private final KafkaConsumer<Integer, String> consumer;
|
||||
private final String topic;
|
||||
|
||||
|
||||
public Consumer(String topic)
|
||||
{
|
||||
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
|
||||
createConsumerConfig());
|
||||
super("KafkaConsumerExample", false);
|
||||
Properties props = new Properties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
|
||||
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
|
||||
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
|
||||
consumer = new KafkaConsumer<>(props);
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
private static ConsumerConfig createConsumerConfig()
|
||||
{
|
||||
Properties props = new Properties();
|
||||
props.put("zookeeper.connect", KafkaProperties.zkConnect);
|
||||
props.put("group.id", KafkaProperties.groupId);
|
||||
props.put("zookeeper.session.timeout.ms", "400");
|
||||
props.put("zookeeper.sync.time.ms", "200");
|
||||
props.put("auto.commit.interval.ms", "1000");
|
||||
|
||||
return new ConsumerConfig(props);
|
||||
|
||||
}
|
||||
|
||||
public void run() {
|
||||
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
|
||||
topicCountMap.put(topic, 1);
|
||||
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
|
||||
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
|
||||
for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
|
||||
System.out.println("Received message: (" + ByteBuffer.wrap(messageAndMetadata.key()).getInt() +
|
||||
", " +
|
||||
"" + new String(messageAndMetadata.message()) + ")");
|
||||
@Override
|
||||
public void doWork() {
|
||||
consumer.subscribe(Collections.singletonList(this.topic));
|
||||
ConsumerRecords<Integer, String> records = consumer.poll(1000);
|
||||
for (ConsumerRecord<Integer, String> record : records) {
|
||||
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInterruptible() {
|
||||
return false;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue