MINOR: Example style improvements

These are minor, but no reason to make our example code look worse than it has to.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Grant Henke

Closes #940 from ijuma/example-style-improvements
This commit is contained in:
Ismael Juma 2016-02-22 11:40:33 -08:00 committed by Gwen Shapira
parent d2527af99a
commit e3ab96b2f0
4 changed files with 21 additions and 24 deletions

View File

@ -16,9 +16,9 @@
*/
package kafka.examples;
public class KafkaConsumerProducerDemo implements KafkaProperties {
public class KafkaConsumerProducerDemo {
public static void main(String[] args) {
final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;
boolean isAsync = args.length == 0 || !args[0].trim().toLowerCase().equals("sync");
Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync);
producerThread.start();

View File

@ -16,16 +16,15 @@
*/
package kafka.examples;
public interface KafkaProperties {
String ZK_CONNECT = "127.0.0.1:2181";
String GROUP_ID = "group1";
String TOPIC = "topic1";
String KAFKA_SERVER_URL = "localhost";
int KAFKA_SERVER_PORT = 9092;
int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
int CONNECTION_TIMEOUT = 100000;
int RECONNECT_INTERVAL = 10000;
String TOPIC2 = "topic2";
String TOPIC3 = "topic3";
String CLIENT_ID = "SimpleConsumerDemoClient";
public class KafkaProperties {
public static final String TOPIC = "topic1";
public static final String KAFKA_SERVER_URL = "localhost";
public static final int KAFKA_SERVER_PORT = 9092;
public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
public static final int CONNECTION_TIMEOUT = 100000;
public static final String TOPIC2 = "topic2";
public static final String TOPIC3 = "topic3";
public static final String CLIENT_ID = "SimpleConsumerDemoClient";
private KafkaProperties() {}
}

View File

@ -35,7 +35,7 @@ public class Producer extends Thread {
props.put("client.id", "DemoProducer");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<Integer, String>(props);
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}
@ -46,18 +46,16 @@ public class Producer extends Thread {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<Integer, String>(topic,
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<Integer, String>(topic,
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr)).get();
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
@ -68,9 +66,9 @@ public class Producer extends Thread {
class DemoCallBack implements Callback {
private long startTime;
private int key;
private String message;
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;

View File

@ -71,7 +71,7 @@ public class SimpleConsumerDemo {
printMessages(fetchResponse.messageSet(KafkaProperties.TOPIC2, 0));
System.out.println("Testing single multi-fetch");
Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();
Map<String, List<Integer>> topicMap = new HashMap<>();
topicMap.put(KafkaProperties.TOPIC2, Collections.singletonList(0));
topicMap.put(KafkaProperties.TOPIC3, Collections.singletonList(0));
req = new FetchRequestBuilder()