KAFKA-14753: Improve kafka producer example (#13354)

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Philip Nee 2023-03-07 16:25:49 -08:00 committed by GitHub
parent b19ae7857b
commit 4527e54647
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 44 additions and 21 deletions

View File

@ -27,7 +27,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Demo producer that demonstrate two modes of KafkaProducer.
* If the user uses the Async mode: The messages will be printed to stdout upon successful completion
* If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
*/
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
@ -54,8 +60,8 @@ public class Producer extends Thread {
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
}
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
this.numRecords = numRecords;
@ -70,28 +76,45 @@ public class Producer extends Thread {
public void run() {
int messageKey = 0;
int recordsSent = 0;
while (recordsSent < numRecords) {
String messageStr = "Message_" + messageKey;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageKey,
messageStr), new DemoCallBack(startTime, messageKey, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,
messageKey,
messageStr)).get();
System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
try {
while (recordsSent < numRecords) {
final long currentTimeMs = System.currentTimeMillis();
produceOnce(messageKey, recordsSent, currentTimeMs);
messageKey += 2;
recordsSent += 1;
}
messageKey += 2;
recordsSent += 1;
} catch (Exception e) {
System.out.println("Producer encountered exception:" + e);
} finally {
System.out.println("Producer sent " + numRecords + " records successfully");
this.producer.close();
latch.countDown();
}
System.out.println("Producer sent " + numRecords + " records successfully");
latch.countDown();
}
private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException {
String messageStr = "Message_" + messageKey;
if (isAsync) { // Send asynchronously
sendAsync(messageKey, messageStr, currentTimeMs);
return;
}
Future<RecordMetadata> future = send(messageKey, messageStr);
future.get();
System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
}
private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) {
this.producer.send(new ProducerRecord<>(topic,
messageKey,
messageStr),
new DemoCallBack(currentTimeMs, messageKey, messageStr));
}
private Future<RecordMetadata> send(final int messageKey, final String messageStr) {
return producer.send(new ProducerRecord<>(topic,
messageKey,
messageStr));
}
}