From e3ab96b2f0b429e0fe5991a185cd980b0d490e25 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Mon, 22 Feb 2016 11:40:33 -0800 Subject: [PATCH] MINOR: Example style improvements These are minor, but no reason to make our example code look worse than it has to. Author: Ismael Juma Reviewers: Grant Henke Closes #940 from ijuma/example-style-improvements --- .../examples/KafkaConsumerProducerDemo.java | 4 ++-- .../java/kafka/examples/KafkaProperties.java | 23 +++++++++---------- .../main/java/kafka/examples/Producer.java | 16 ++++++------- .../kafka/examples/SimpleConsumerDemo.java | 2 +- 4 files changed, 21 insertions(+), 24 deletions(-) diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index e732d5c1d77..414a6f7deea 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -16,9 +16,9 @@ */ package kafka.examples; -public class KafkaConsumerProducerDemo implements KafkaProperties { +public class KafkaConsumerProducerDemo { public static void main(String[] args) { - final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true; + boolean isAsync = args.length == 0 || !args[0].trim().toLowerCase().equals("sync"); Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync); producerThread.start(); diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java b/examples/src/main/java/kafka/examples/KafkaProperties.java index b57e1bdc4d6..853d6bf5358 100644 --- a/examples/src/main/java/kafka/examples/KafkaProperties.java +++ b/examples/src/main/java/kafka/examples/KafkaProperties.java @@ -16,16 +16,15 @@ */ package kafka.examples; -public interface KafkaProperties { - String ZK_CONNECT = "127.0.0.1:2181"; - String GROUP_ID = "group1"; - String TOPIC = "topic1"; - String KAFKA_SERVER_URL = "localhost"; - int KAFKA_SERVER_PORT = 9092; - int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024; - int CONNECTION_TIMEOUT = 100000; - int RECONNECT_INTERVAL = 10000; - String TOPIC2 = "topic2"; - String TOPIC3 = "topic3"; - String CLIENT_ID = "SimpleConsumerDemoClient"; +public class KafkaProperties { + public static final String TOPIC = "topic1"; + public static final String KAFKA_SERVER_URL = "localhost"; + public static final int KAFKA_SERVER_PORT = 9092; + public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024; + public static final int CONNECTION_TIMEOUT = 100000; + public static final String TOPIC2 = "topic2"; + public static final String TOPIC3 = "topic3"; + public static final String CLIENT_ID = "SimpleConsumerDemoClient"; + + private KafkaProperties() {} } diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index 393bf1e6ec1..b83c0294617 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -35,7 +35,7 @@ public class Producer extends Thread { props.put("client.id", "DemoProducer"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - producer = new KafkaProducer(props); + producer = new KafkaProducer<>(props); this.topic = topic; this.isAsync = isAsync; } @@ -46,18 +46,16 @@ public class Producer extends Thread { String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis(); if (isAsync) { // Send asynchronously - producer.send(new ProducerRecord(topic, + producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); } else { // Send synchronously try { - producer.send(new ProducerRecord(topic, + producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get(); System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } @@ -68,9 +66,9 @@ public class Producer extends Thread { class DemoCallBack implements Callback { - private long startTime; - private int key; - private String message; + private final long startTime; + private final int key; + private final String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java index 1c568674526..7eef51ebcfd 100644 --- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java +++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java @@ -71,7 +71,7 @@ public class SimpleConsumerDemo { printMessages(fetchResponse.messageSet(KafkaProperties.TOPIC2, 0)); System.out.println("Testing single multi-fetch"); - Map> topicMap = new HashMap>(); + Map> topicMap = new HashMap<>(); topicMap.put(KafkaProperties.TOPIC2, Collections.singletonList(0)); topicMap.put(KafkaProperties.TOPIC3, Collections.singletonList(0)); req = new FetchRequestBuilder()