diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java index 6de6ab99d26..3fa1e4ba041 100644 --- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java +++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java @@ -66,7 +66,9 @@ public class ExactlyOnceMessageProcessor extends Thread { // It is recommended to have a relatively short txn timeout in order to clear pending offsets faster. final int transactionTimeoutMs = 10000; // A unique transactional.id must be provided in order to properly use EOS. - producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get(); + producer = new Producer( + "processor-producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null) + .createKafkaProducer(); // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data. // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances. this.groupInstanceId = "Txn-consumer-" + instanceIdx; diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index 4087367f899..ff44efe492e 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -26,7 +26,8 @@ public class KafkaConsumerProducerDemo { public static void main(String[] args) throws InterruptedException { boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync"); CountDownLatch latch = new CountDownLatch(2); - Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch); + Producer producerThread = new Producer( + "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 10000, -1, latch); producerThread.start(); Consumer consumerThread = new Consumer( diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java index 03ee8e25498..af0cfce3dd3 100644 --- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java @@ -91,7 +91,8 @@ public class KafkaExactlyOnceDemo { CountDownLatch prePopulateLatch = new CountDownLatch(1); /* Stage 2: pre-populate records */ - Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch); + Producer producerThread = new Producer( + "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, -1, prePopulateLatch); producerThread.start(); if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) { diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index e85fa16060e..36a2583954c 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -21,133 +21,165 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { - private final KafkaProducer producer; + private final String bootstrapServers; private final String topic; - private final Boolean isAsync; - private int numRecords; + private final boolean isAsync; + private final String transactionalId; + private final boolean enableIdempotency; + private final int numRecords; + private final int transactionTimeoutMs; private final CountDownLatch latch; + private volatile boolean closed; - public Producer(final String topic, - final Boolean isAsync, - final String transactionalId, - final boolean enableIdempotency, - final int numRecords, - final int transactionTimeoutMs, - final CountDownLatch latch) { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - if (transactionTimeoutMs > 0) { - props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); - } - if (transactionalId != null) { - props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); - } - props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); - producer = new KafkaProducer<>(props); - + public Producer(String threadName, + String bootstrapServers, + String topic, + boolean isAsync, + String transactionalId, + boolean enableIdempotency, + int numRecords, + int transactionTimeoutMs, + CountDownLatch latch) { + super(threadName); + this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; + this.transactionalId = transactionalId; + this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; + this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } - KafkaProducer get() { - return producer; - } - @Override public void run() { - int messageKey = 0; - int recordsSent = 0; - try { - while (recordsSent < numRecords) { - final long currentTimeMs = System.currentTimeMillis(); - produceOnce(messageKey, recordsSent, currentTimeMs); - messageKey += 2; - recordsSent += 1; + int key = 0; + int sentRecords = 0; + // the producer instance is thread safe + try (KafkaProducer producer = createKafkaProducer()) { + while (!closed && sentRecords < numRecords) { + if (isAsync) { + asyncSend(producer, key, "test" + key); + } else { + syncSend(producer, key, "test" + key); + } + key++; + sentRecords++; } - } catch (Exception e) { - System.out.println("Producer encountered exception:" + e); - } finally { - System.out.println("Producer sent " + numRecords + " records successfully"); - this.producer.close(); + } catch (Throwable e) { + Utils.printOut("Unhandled exception"); + e.printStackTrace(); + } + Utils.printOut("Sent %d records", sentRecords); + shutdown(); + } + + public void shutdown() { + if (!closed) { + closed = true; latch.countDown(); } } - private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException { - String messageStr = "Message_" + messageKey; - - if (isAsync) { // Send asynchronously - sendAsync(messageKey, messageStr, currentTimeMs); - return; + public KafkaProducer createKafkaProducer() { + Properties props = new Properties(); + // bootstrap server config is required for producer to connect to brokers + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + // client id is not required, but it's good to track the source of requests beyond just ip/port + // by allowing a logical application name to be included in server-side request logging + props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); + // key and value are just byte arrays, so we need to set appropriate serializers + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + if (transactionTimeoutMs > 0) { + // max time before the transaction coordinator proactively aborts the ongoing transaction + props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); } - Future future = send(messageKey, messageStr); - future.get(); - System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")"); + if (transactionalId != null) { + // the transactional id must be static and unique + // it is used to identify the same producer instance across process restarts + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + } + // enable duplicates protection at the partition level + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); + return new KafkaProducer<>(props); } - private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) { - this.producer.send(new ProducerRecord<>(topic, - messageKey, - messageStr), - new DemoCallBack(currentTimeMs, messageKey, messageStr)); + private void asyncSend(KafkaProducer producer, int key, String value) { + // send the record asynchronously, setting a callback to be notified of the result + // note that, even if you set a small batch.size with linger.ms=0, the send operation + // will still be blocked when buffer.memory is full or metadata are not available + producer.send(new ProducerRecord<>(topic, key, value), new ProducerCallback(key, value)); } - private Future send(final int messageKey, final String messageStr) { - return producer.send(new ProducerRecord<>(topic, - messageKey, - messageStr)); - } -} - -class DemoCallBack implements Callback { - - private final long startTime; - private final int key; - private final String message; - - public DemoCallBack(long startTime, int key, String message) { - this.startTime = startTime; - this.key = key; - this.message = message; + private RecordMetadata syncSend(KafkaProducer producer, int key, String value) + throws ExecutionException, InterruptedException { + try { + // send the record and then call get, which blocks waiting for the ack from the broker + RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, key, value)).get(); + Utils.maybePrintRecord(numRecords, key, value, metadata); + return metadata; + } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException + | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) { + Utils.printErr(e.getMessage()); + // we can't recover from these exceptions + shutdown(); + } catch (KafkaException e) { + Utils.printErr(e.getMessage()); + } + return null; } - /** - * A callback method the user can implement to provide asynchronous handling of request completion. This method will - * be called when the record sent to the server has been acknowledged. When exception is not null in the callback, - * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid. - * - * @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata - * with -1 value for all fields except for topicPartition will be returned if an error occurred. - * @param exception The exception thrown during processing of this record. Null if no error occurred. - */ - public void onCompletion(RecordMetadata metadata, Exception exception) { - long elapsedTime = System.currentTimeMillis() - startTime; - if (metadata != null) { - System.out.println( - "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + - "), " + - "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); - } else { - exception.printStackTrace(); + class ProducerCallback implements Callback { + private final int key; + private final String value; + + public ProducerCallback(int key, String value) { + this.key = key; + this.value = value; + } + + /** + * A callback method the user can implement to provide asynchronous handling of request completion. This method will + * be called when the record sent to the server has been acknowledged. When exception is not null in the callback, + * metadata will contain the special -1 value for all fields except for topicPartition, which will be valid. + * + * @param metadata The metadata for the record that was sent (i.e. the partition and offset). An empty metadata + * with -1 value for all fields except for topicPartition will be returned if an error occurred. + * @param exception The exception thrown during processing of this record. Null if no error occurred. + */ + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + Utils.printErr(exception.getMessage()); + if (!(exception instanceof RetriableException)) { + // we can't recover from these exceptions + shutdown(); + } + } else { + Utils.maybePrintRecord(numRecords, key, value, metadata); + } } } }