mirror of https://github.com/apache/kafka.git
kafka-1982; change kafka.examples.Producer to use the new java producer; patched by Ashish Singh; reviewed by Gwen Shapira, Mayuresh Gharat and Jun Rao
This commit is contained in:
parent
185eb9b59a
commit
5408931a29
|
@ -54,6 +54,26 @@ public class SerializationTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIntegerSerializer() {
|
||||
Integer[] integers = new Integer[]{
|
||||
423412424,
|
||||
-41243432
|
||||
};
|
||||
String mytopic = "testTopic";
|
||||
|
||||
Serializer<Integer> serializer = new IntegerSerializer();
|
||||
Deserializer<Integer> deserializer = new IntegerDeserializer();
|
||||
|
||||
for (Integer integer : integers) {
|
||||
assertEquals("Should get the original integer after serialization and deserialization",
|
||||
integer, deserializer.deserialize(mytopic, serializer.serialize(mytopic, integer)));
|
||||
}
|
||||
|
||||
assertEquals("Should support null in serialization and deserialization",
|
||||
null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null)));
|
||||
}
|
||||
|
||||
private SerDeser<String> getStringSerDeser(String encoder) {
|
||||
Map<String, Object> serializerConfigs = new HashMap<String, Object>();
|
||||
serializerConfigs.put("key.serializer.encoding", encoder);
|
||||
|
|
|
@ -3,6 +3,7 @@ This directory contains examples of client code that uses kafka.
|
|||
To run the demo:
|
||||
|
||||
1. Start Zookeeper and the Kafka server
|
||||
2. For simple consumer demo, run bin/java-simple-consumer-demo.sh
|
||||
3. For unlimited producer-consumer run, run bin/java-producer-consumer-demo.sh
|
||||
2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh`
|
||||
3. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync`
|
||||
4. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh`
|
||||
|
||||
|
|
|
@ -17,14 +17,15 @@
|
|||
package kafka.examples;
|
||||
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import kafka.consumer.ConsumerConfig;
|
||||
import kafka.consumer.ConsumerIterator;
|
||||
import kafka.consumer.KafkaStream;
|
||||
import kafka.javaapi.consumer.ConsumerConnector;
|
||||
import kafka.message.MessageAndMetadata;
|
||||
|
||||
|
||||
public class Consumer extends Thread
|
||||
|
@ -54,11 +55,13 @@ public class Consumer extends Thread
|
|||
|
||||
public void run() {
|
||||
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
|
||||
topicCountMap.put(topic, new Integer(1));
|
||||
topicCountMap.put(topic, 1);
|
||||
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
|
||||
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
|
||||
ConsumerIterator<byte[], byte[]> it = stream.iterator();
|
||||
while(it.hasNext())
|
||||
System.out.println(new String(it.next().message()));
|
||||
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
|
||||
for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
|
||||
System.out.println("Received message: (" + ByteBuffer.wrap(messageAndMetadata.key()).getInt() +
|
||||
", " +
|
||||
"" + new String(messageAndMetadata.message()) + ")");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,8 @@ public class KafkaConsumerProducerDemo implements KafkaProperties
|
|||
{
|
||||
public static void main(String[] args)
|
||||
{
|
||||
Producer producerThread = new Producer(KafkaProperties.topic);
|
||||
final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;
|
||||
Producer producerThread = new Producer(KafkaProperties.topic, isAsync);
|
||||
producerThread.start();
|
||||
|
||||
Consumer consumerThread = new Consumer(KafkaProperties.topic);
|
||||
|
|
|
@ -18,33 +18,88 @@ package kafka.examples;
|
|||
|
||||
|
||||
import java.util.Properties;
|
||||
import kafka.producer.KeyedMessage;
|
||||
import kafka.producer.ProducerConfig;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.apache.kafka.clients.producer.Callback;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
|
||||
public class Producer extends Thread
|
||||
{
|
||||
private final kafka.javaapi.producer.Producer<Integer, String> producer;
|
||||
private final KafkaProducer<Integer, String> producer;
|
||||
private final String topic;
|
||||
private final Properties props = new Properties();
|
||||
private final Boolean isAsync;
|
||||
|
||||
public Producer(String topic)
|
||||
public Producer(String topic, Boolean isAsync)
|
||||
{
|
||||
props.put("serializer.class", "kafka.serializer.StringEncoder");
|
||||
props.put("metadata.broker.list", "localhost:9092");
|
||||
// Use random partitioner. Don't need the key type. Just set it to Integer.
|
||||
// The message is of type String.
|
||||
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", "localhost:9092");
|
||||
props.put("client.id", "DemoProducer");
|
||||
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
|
||||
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
producer = new KafkaProducer<Integer, String>(props);
|
||||
this.topic = topic;
|
||||
this.isAsync = isAsync;
|
||||
}
|
||||
|
||||
|
||||
public void run() {
|
||||
int messageNo = 1;
|
||||
while(true)
|
||||
{
|
||||
String messageStr = new String("Message_" + messageNo);
|
||||
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
|
||||
messageNo++;
|
||||
String messageStr = "Message_" + messageNo;
|
||||
long startTime = System.currentTimeMillis();
|
||||
if (isAsync) { // Send asynchronously
|
||||
producer.send(new ProducerRecord<Integer, String>(topic,
|
||||
messageNo,
|
||||
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
|
||||
} else { // Send synchronously
|
||||
try {
|
||||
producer.send(new ProducerRecord<Integer, String>(topic,
|
||||
messageNo,
|
||||
messageStr)).get();
|
||||
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
++messageNo;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class DemoCallBack implements Callback {
|
||||
|
||||
private long startTime;
|
||||
private int key;
|
||||
private String message;
|
||||
|
||||
public DemoCallBack(long startTime, int key, String message) {
|
||||
this.startTime = startTime;
|
||||
this.key = key;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
/**
|
||||
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
|
||||
* be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
|
||||
* non-null.
|
||||
*
|
||||
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
|
||||
* occurred.
|
||||
* @param exception The exception thrown during processing of this record. Null if no error occurred.
|
||||
*/
|
||||
public void onCompletion(RecordMetadata metadata, Exception exception) {
|
||||
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||
if (metadata != null) {
|
||||
System.out.println(
|
||||
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
|
||||
"), " +
|
||||
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
|
||||
} else {
|
||||
exception.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -42,9 +42,9 @@ public class SimpleConsumerDemo {
|
|||
}
|
||||
|
||||
private static void generateData() {
|
||||
Producer producer2 = new Producer(KafkaProperties.topic2);
|
||||
Producer producer2 = new Producer(KafkaProperties.topic2, false);
|
||||
producer2.start();
|
||||
Producer producer3 = new Producer(KafkaProperties.topic3);
|
||||
Producer producer3 = new Producer(KafkaProperties.topic3, false);
|
||||
producer3.start();
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
|
|
Loading…
Reference in New Issue