diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
index 3fa1e4ba041..b23e567f848 100644
--- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
+++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -20,58 +20,65 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
+
+import static java.time.Duration.ofMillis;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singleton;
 
 /**
- * A demo class for how to write a customized EOS app. It takes a consume-process-produce loop.
- * Important configurations and APIs are commented.
+ * This class implements a read-process-write application.
  */
-public class ExactlyOnceMessageProcessor extends Thread {
-
-    private static final boolean READ_COMMITTED = true;
-
+public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener {
+    private final String bootstrapServers;
     private final String inputTopic;
     private final String outputTopic;
-    private final String transactionalId;
     private final String groupInstanceId;
+    private final CountDownLatch latch;
+    private final String transactionalId;
+    private volatile boolean closed;
 
     private final KafkaProducer<Integer, String> producer;
     private final KafkaConsumer<Integer, String> consumer;
-    private final CountDownLatch latch;
-
-    public ExactlyOnceMessageProcessor(final String inputTopic,
-                                       final String outputTopic,
-                                       final int instanceIdx,
-                                       final CountDownLatch latch) {
+    public ExactlyOnceMessageProcessor(String threadName,
+                                       String bootstrapServers,
+                                       String inputTopic,
+                                       String outputTopic,
+                                       CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.inputTopic = inputTopic;
         this.outputTopic = outputTopic;
-        this.transactionalId = "Processor-" + instanceIdx;
+        this.transactionalId = "tid-" + threadName;
         // It is recommended to have a relatively short txn timeout in order to clear pending offsets faster.
-        final int transactionTimeoutMs = 10000;
+        int transactionTimeoutMs = 10_000;
         // A unique transactional.id must be provided in order to properly use EOS.
         producer = new Producer(
             "processor-producer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null)
             .createKafkaProducer();
         // Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
         // Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
-        this.groupInstanceId = "Txn-consumer-" + instanceIdx;
+        this.groupInstanceId = "giid-" + threadName;
         boolean readCommitted = true;
         consumer = new Consumer(
             "processor-consumer", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null)
@@ -81,70 +88,95 @@ public class ExactlyOnceMessageProcessor extends Thread {
     @Override
     public void run() {
-        // Init transactions call should always happen first in order to clear zombie transactions from previous generation.
-        producer.initTransactions();
+        int processedRecords = 0;
+        long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
+        int transactionTimeoutMs = 10_000;
+        // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
+        boolean readCommitted = true;
+        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
+                true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
+             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
+                "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
+            // called first and once to fence zombies and abort any pending transaction
+            producer.initTransactions();
-        final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
+            consumer.subscribe(singleton(inputTopic), this);
-        consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
-            @Override
-            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-                printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
-            }
+            Utils.printOut("Processing new records");
+            while (!closed && remainingRecords > 0) {
+                try {
+                    ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
+                    if (!records.isEmpty()) {
+                        // begin a new transaction session
+                        producer.beginTransaction();
-            @Override
-            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-                printWithTxnId("Received partition assignment after rebalancing: " + partitions);
-                messageRemaining.set(messagesRemaining(consumer));
-            }
-        });
+                        for (ConsumerRecord<Integer, String> record : records) {
+                            // process the record and send downstream
+                            ProducerRecord<Integer, String> newRecord =
+                                new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
+                            producer.send(newRecord);
+                        }
-        int messageProcessed = 0;
-        while (messageRemaining.get() > 0) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
-                if (records.count() > 0) {
-                    // Begin a new transaction session.
-                    producer.beginTransaction();
-                    for (ConsumerRecord<Integer, String> record : records) {
-                        // Process the record and send to downstream.
-                        ProducerRecord<Integer, String> customizedRecord = transform(record);
-                        producer.send(customizedRecord);
+                        // checkpoint the progress by sending offsets to group coordinator broker
+                        // note that this API is only available for broker >= 2.5
+                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());
+
+                        // commit the transaction including offsets
+                        producer.commitTransaction();
+                        processedRecords += records.count();
                     }
-
-                    Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
-
-                    // Checkpoint the progress by sending offsets to group coordinator broker.
-                    // Note that this API is only available for broker >= 2.5.
-                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
-
-                    // Finish the transaction. All sent records should be visible for consumption now.
-                    producer.commitTransaction();
-                    messageProcessed += records.count();
+                } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
+                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
+                    // we can't recover from these exceptions
+                    Utils.printErr(e.getMessage());
+                    shutdown();
+                } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
+                    // invalid or no offset found without auto.reset.policy
+                    Utils.printOut("Invalid or no offset found, using latest");
+                    consumer.seekToEnd(emptyList());
+                    consumer.commitSync();
+                } catch (KafkaException e) {
+                    // abort the transaction and try to continue
+                    Utils.printOut("Aborting transaction: %s", e);
+                    producer.abortTransaction();
+                }
+                remainingRecords = getRemainingRecords(consumer);
+                if (remainingRecords != Long.MAX_VALUE) {
+                    Utils.printOut("Remaining records: %d", remainingRecords);
                 }
-            } catch (ProducerFencedException e) {
-                throw new KafkaException(String.format("The transactional.id %s has been claimed by another process", transactionalId));
-            } catch (FencedInstanceIdException e) {
-                throw new KafkaException(String.format("The group.instance.id %s has been claimed by another process", groupInstanceId));
-            } catch (KafkaException e) {
-                // If we have not been fenced, try to abort the transaction and continue. This will raise immediately
-                // if the producer has hit a fatal error.
-                producer.abortTransaction();
-
-                // The consumer fetch position needs to be restored to the committed offset
-                // before the transaction started.
-                resetToLastCommittedPositions(consumer);
            }
-
-            messageRemaining.set(messagesRemaining(consumer));
-            printWithTxnId("Message remaining: " + messageRemaining);
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
-
-        printWithTxnId("Finished processing " + messageProcessed + " records");
-        latch.countDown();
+        Utils.printOut("Processed %d records", processedRecords);
+        shutdown();
     }
-    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+        Utils.printOut("Revoked partitions: %s", partitions);
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+        Utils.printOut("Assigned partitions: %s", partitions);
+    }
+
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions) {
+        Utils.printOut("Lost partitions: %s", partitions);
+    }
+
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
+        }
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition topicPartition : consumer.assignment()) {
             offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
@@ -152,40 +184,19 @@ public class ExactlyOnceMessageProcessor extends Thread {
         return offsets;
     }
-    private void printWithTxnId(final String message) {
-        System.out.println(transactionalId + ": " + message);
-    }
-
-    private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
-        printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
-        return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
-    }
-
-    private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
+    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
-        // If we couldn't detect any end offset, that means we are still not able to fetch offsets.
+        // if we can't detect any end offset, that means we are still not able to fetch offsets
         if (fullEndOffsets.isEmpty()) {
             return Long.MAX_VALUE;
         }
-
         return consumer.assignment().stream().mapToLong(partition -> {
             long currentPosition = consumer.position(partition);
-            printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
             if (fullEndOffsets.containsKey(partition)) {
                 return fullEndOffsets.get(partition) - currentPosition;
+            } else {
+                return 0;
             }
-            return 0;
         }).sum();
     }
-
-    private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {
-        final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
-        consumer.assignment().forEach(tp -> {
-            OffsetAndMetadata offsetAndMetadata = committed.get(tp);
-            if (offsetAndMetadata != null)
-                consumer.seek(tp, offsetAndMetadata.offset());
-            else
-                consumer.seekToBeginning(Collections.singleton(tp));
-        });
-    }
 }
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index af0cfce3dd3..1a2cfcb8a24 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -31,6 +31,8 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * This exactly once demo driver takes 3 arguments:
@@ -71,7 +73,7 @@ import java.util.concurrent.TimeUnit;
  * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
  */
 public class KafkaExactlyOnceDemo {
-
+    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
     private static final String INPUT_TOPIC = "input-topic";
     private static final String OUTPUT_TOPIC = "output-topic";
@@ -102,11 +104,12 @@ public class KafkaExactlyOnceDemo {
         CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
 
         /* Stage 3: transactionally process all messages */
-        for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
-            ExactlyOnceMessageProcessor messageProcessor = new ExactlyOnceMessageProcessor(
-                INPUT_TOPIC, OUTPUT_TOPIC, instanceIdx, transactionalCopyLatch);
-            messageProcessor.start();
-        }
+        CountDownLatch processorsLatch = new CountDownLatch(numInstances);
+        List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
+            .mapToObj(id -> new ExactlyOnceMessageProcessor(
+                "processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
+            .collect(Collectors.toList());
+        processors.forEach(ExactlyOnceMessageProcessor::start);
 
         if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
             throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
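
Note for reviewers: the patch above relies on the examples module's Producer/Consumer wrapper classes. For readers who want to try the same read-process-write pattern outside the examples module, the following is a minimal, self-contained sketch using only the plain Kafka client APIs (initTransactions, beginTransaction, sendOffsetsToTransaction, commitTransaction, abortTransaction). The topic names, group id, transactional.id and bootstrap address here are illustrative assumptions, not values taken from this patch.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TransactionalCopySketch {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("transactional.id", "sketch-tid");   // must be unique per producer instance
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "sketch-group");
        consumerProps.put("enable.auto.commit", "false");       // offsets are committed through the producer
        consumerProps.put("isolation.level", "read_committed"); // only read committed data
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
             KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps)) {
            producer.initTransactions();                        // fence zombies, abort any pending transaction
            consumer.subscribe(Collections.singleton("input-topic"));
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
                if (records.isEmpty()) {
                    continue;
                }
                try {
                    producer.beginTransaction();
                    for (ConsumerRecord<Integer, String> record : records) {
                        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value() + "-ok"));
                    }
                    // bundle the consumed offsets into the same transaction
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (TopicPartition tp : consumer.assignment()) {
                        offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
                    }
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();               // outputs and offsets become visible atomically
                } catch (ProducerFencedException e) {
                    // unrecoverable: another producer with the same transactional.id took over
                    throw e;
                } catch (KafkaException e) {
                    producer.abortTransaction();                // abortable error: discard outputs and offsets, retry
                }
            }
        }
    }
}

As in the patch, the key design point is that the output records and the input offsets are committed in one transaction, so a crash between send and commit can never produce duplicates visible to a read_committed consumer.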