KAFKA-14752: Kafka examples improvements - processor changes (#13516)

Reviewers: Luke Chen <showuon@gmail.com>
Author: Federico Valeri, 2023-05-11 11:19:32 +02:00 (committed by GitHub)
parent a263627adb
commit ee41328635
2 changed files with 118 additions and 104 deletions

ExactlyOnceMessageProcessor.java

@@ -20,58 +20,65 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import static java.time.Duration.ofMillis;
import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;
/**
* A demo class for how to write a customized EOS app. It takes a consume-process-produce loop.
* Important configurations and APIs are commented.
* This class implements a read-process-write application.
*/
public class ExactlyOnceMessageProcessor extends Thread {
private static final boolean READ_COMMITTED = true;
public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener {
private final String bootstrapServers;
private final String inputTopic;
private final String outputTopic;
private final String transactionalId;
private final String groupInstanceId;
private final CountDownLatch latch;
private final String transactionalId;
private volatile boolean closed;
private final KafkaProducer<Integer, String> producer;
private final KafkaConsumer<Integer, String> consumer;
private final CountDownLatch latch;
public ExactlyOnceMessageProcessor(final String inputTopic,
final String outputTopic,
final int instanceIdx,
final CountDownLatch latch) {
public ExactlyOnceMessageProcessor(String threadName,
String bootstrapServers,
String inputTopic,
String outputTopic,
CountDownLatch latch) {
super(threadName);
this.bootstrapServers = bootstrapServers;
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
this.transactionalId = "Processor-" + instanceIdx;
this.transactionalId = "tid-" + threadName;
// It is recommended to have a relatively short txn timeout in order to clear pending offsets faster.
final int transactionTimeoutMs = 10000;
int transactionTimeoutMs = 10_000;
// A unique transactional.id must be provided in order to properly use EOS.
producer = new Producer(
"processor-producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null)
.createKafkaProducer();
// Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
// Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
this.groupInstanceId = "Txn-consumer-" + instanceIdx;
this.groupInstanceId = "giid-" + threadName;
boolean readCommitted = true;
consumer = new Consumer(
"processor-consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
@@ -81,70 +88,95 @@ public class ExactlyOnceMessageProcessor extends Thread {
@Override
public void run() {
// Init transactions call should always happen first in order to clear zombie transactions from previous generation.
producer.initTransactions();
int processedRecords = 0;
long remainingRecords = Long.MAX_VALUE;
// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
int transactionTimeoutMs = 10_000;
// consumer must be in read_committed mode, which means it won't be able to read uncommitted data
boolean readCommitted = true;
try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
"processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
// called first and once to fence zombies and abort any pending transaction
producer.initTransactions();
final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
consumer.subscribe(singleton(inputTopic), this);
consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
}
Utils.printOut("Processing new records");
while (!closed && remainingRecords > 0) {
try {
ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
if (!records.isEmpty()) {
// begin a new transaction session
producer.beginTransaction();
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
printWithTxnId("Received partition assignment after rebalancing: " + partitions);
messageRemaining.set(messagesRemaining(consumer));
}
});
for (ConsumerRecord<Integer, String> record : records) {
// process the record and send downstream
ProducerRecord<Integer, String> newRecord =
new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
producer.send(newRecord);
}
int messageProcessed = 0;
while (messageRemaining.get() > 0) {
try {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
if (records.count() > 0) {
// Begin a new transaction session.
producer.beginTransaction();
for (ConsumerRecord<Integer, String> record : records) {
// Process the record and send to downstream.
ProducerRecord<Integer, String> customizedRecord = transform(record);
producer.send(customizedRecord);
// checkpoint the progress by sending offsets to group coordinator broker
// note that this API is only available for broker >= 2.5
producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
// commit the transaction including offsets
producer.commitTransaction();
processedRecords += records.count();
}
Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
// Checkpoint the progress by sending offsets to group coordinator broker.
// Note that this API is only available for broker >= 2.5.
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
// Finish the transaction. All sent records should be visible for consumption now.
producer.commitTransaction();
messageProcessed += records.count();
} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
| FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
// we can't recover from these exceptions
Utils.printErr(e.getMessage());
shutdown();
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
// invalid or no offset found without auto.reset.policy
Utils.printOut("Invalid or no offset found, using latest");
consumer.seekToEnd(emptyList());
consumer.commitSync();
} catch (KafkaException e) {
// abort the transaction and try to continue
Utils.printOut("Aborting transaction: %s", e);
producer.abortTransaction();
}
remainingRecords = getRemainingRecords(consumer);
if (remainingRecords != Long.MAX_VALUE) {
Utils.printOut("Remaining records: %d", remainingRecords);
}
} catch (ProducerFencedException e) {
throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
} catch (FencedInstanceIdException e) {
throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
} catch (KafkaException e) {
// If we have not been fenced, try to abort the transaction and continue. This will raise immediately
// if the producer has hit a fatal error.
producer.abortTransaction();
// The consumer fetch position needs to be restored to the committed offset
// before the transaction started.
resetToLastCommittedPositions(consumer);
}
messageRemaining.set(messagesRemaining(consumer));
printWithTxnId("Message remaining: " + messageRemaining);
} catch (Throwable e) {
Utils.printOut("Unhandled exception");
e.printStackTrace();
}
printWithTxnId("Finished processing " + messageProcessed + " records");
latch.countDown();
Utils.printOut("Processed %d records", processedRecords);
shutdown();
}
private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
Utils.printOut("Revoked partitions: %s", partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
Utils.printOut("Lost partitions: %s", partitions);
}
public void shutdown() {
if (!closed) {
closed = true;
latch.countDown();
}
}
private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition topicPartition : consumer.assignment()) {
offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
@@ -152,40 +184,19 @@ public class ExactlyOnceMessageProcessor extends Thread {
return offsets;
}
private void printWithTxnId(final String message) {
System.out.println(transactionalId + ": " + message);
}
private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
}
private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
// If we couldn't detect any end offset, that means we are still not able to fetch offsets.
// if we can't detect any end offset, that means we are still not able to fetch offsets
if (fullEndOffsets.isEmpty()) {
return Long.MAX_VALUE;
}
return consumer.assignment().stream().mapToLong(partition -> {
long currentPosition = consumer.position(partition);
printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
if (fullEndOffsets.containsKey(partition)) {
return fullEndOffsets.get(partition) - currentPosition;
} else {
return 0;
}
return 0;
}).sum();
}
private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {
final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
consumer.assignment().forEach(tp -> {
OffsetAndMetadata offsetAndMetadata = committed.get(tp);
if (offsetAndMetadata != null)
consumer.seek(tp, offsetAndMetadata.offset());
else
consumer.seekToBeginning(Collections.singleton(tp));
});
}
}
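
Read without the interleaved old and new lines above, the processor's transactional consume-process-produce loop reduces to the following sketch. This is an illustrative condensation, not code from the commit: the method name copyLoop, the infinite loop, and the simplified record handling are assumptions, and it uses the same client classes already imported at the top of the file.

    // Minimal sketch (not code from this commit): the transactional copy loop at the core of
    // ExactlyOnceMessageProcessor, assuming a producer created with a unique transactional.id
    // and a consumer running with isolation.level=read_committed.
    static void copyLoop(KafkaProducer<Integer, String> producer,
                         KafkaConsumer<Integer, String> consumer,
                         String outputTopic) {
        // called first and once: fences zombie producers and aborts any pending transaction
        producer.initTransactions();
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
            if (records.isEmpty()) {
                continue;
            }
            try {
                producer.beginTransaction();
                for (ConsumerRecord<Integer, String> record : records) {
                    // process each record and send it downstream
                    producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok"));
                }
                // commit the consumed offsets atomically with the produced records (brokers >= 2.5)
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (TopicPartition tp : consumer.assignment()) {
                    offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
                }
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // fatal: another instance owns the transactional.id, or the producer hit an unrecoverable error
                throw new KafkaException("Fatal error in transactional copy loop", e);
            } catch (KafkaException e) {
                // abortable: roll back; a full implementation would also rewind the consumer
                // to the last committed offsets before retrying
                producer.abortTransaction();
            }
        }
    }

Passing the consumer's group metadata to sendOffsetsToTransaction lets the broker fence offset commits from zombie group members, which is why the example prefers it over committing offsets through the consumer.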

KafkaExactlyOnceDemo.java

@@ -31,6 +31,8 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* This exactly once demo driver takes 3 arguments:
@@ -71,7 +73,7 @@ import java.util.concurrent.TimeUnit;
* {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
*/
public class KafkaExactlyOnceDemo {
public static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String INPUT_TOPIC = "input-topic";
private static final String OUTPUT_TOPIC = "output-topic";
@@ -102,11 +104,12 @@ public class KafkaExactlyOnceDemo {
CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
/* Stage 3: transactionally process all messages */
for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch);
messageProcessor.start();
}
CountDownLatch processorsLatch = new CountDownLatch(numInstances);
List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
.mapToObj(id -> new ExactlyOnceMessageProcessor(
"processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
.collect(Collectors.toList());
processors.forEach(ExactlyOnceMessageProcessor::start);
if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
throw new TimeoutException("Timeout after 5 minutes waiting for transactional message copy");
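
The Producer and Consumer helper classes referenced in both files encapsulate the client configuration, which this diff does not show. A plausible configuration for this exactly-once setup is sketched below; the class name EosClientConfigs, the property values, and the hard-coded group id are illustrative assumptions, not taken from the helpers.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Hypothetical sketch: the real helper classes in kafka.examples may set different or additional properties.
    public class EosClientConfigs {
        static Properties producerConfig(String bootstrapServers, String transactionalId) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // a unique transactional.id is required for exactly-once semantics; it implies idempotence
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            // a relatively short transaction timeout clears pending offsets faster after a failure
            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10_000);
            return props;
        }

        static Properties consumerConfig(String bootstrapServers, String groupInstanceId) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "processor-group");
            // static membership avoids unnecessary rebalances when an instance restarts
            props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
            // only read records from committed transactions
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            // offsets are committed through the producer transaction, not auto-committed
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            return props;
        }
    }

Feeding these maps to new KafkaProducer<>(...) and new KafkaConsumer<>(...) would yield clients along the lines of what the processor builds through the helpers.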