From 4cb20379c7abc07bb18075674553abc11954f286 Mon Sep 17 00:00:00 2001
From: Federico Valeri
Date: Wed, 27 Mar 2024 09:31:27 +0100
Subject: [PATCH] MINOR: Add retry mechanism to EOS example (#15561)

In the initial EOS example, retry logic was implemented within the
resetToLastCommittedPositions method. During refactoring, this logic
was removed because a poison pill prevented the example from reaching
the final phase of consuming from the output topic.

In this change, I suggest adding it back, but with a retry limit
defined as MAX_RETRIES. Once this limit is reached, the problematic
batch is logged and skipped, allowing the processor to move on and
process the remaining records. If some records are skipped, the
example will still hit the hard timeout (2 minutes), but only after
consuming all processed records.

Reviewers: Luke Chen
---
 .../main/java/kafka/examples/Consumer.java    |  2 +-
 .../examples/ExactlyOnceMessageProcessor.java | 55 +++++++++++++++++--
 .../main/java/kafka/examples/Producer.java    |  2 +-
 3 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index bd652e0a32e..aa971812f87 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -109,7 +109,7 @@ public class Consumer extends Thread implements ConsumerRebalanceListener {
                 }
             }
         } catch (Throwable e) {
-            Utils.printOut("Unhandled exception");
+            Utils.printErr("Unhandled exception");
             e.printStackTrace();
         }
         Utils.printOut("Fetched %d records", numRecords - remainingRecords);
diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
index cf12cc9f158..15488c9c47d 100644
--- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
+++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -49,6 +50,8 @@ import static java.util.Collections.singleton;
  * This class implements a read-process-write application.
  */
 public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener, AutoCloseable {
+    private static final int MAX_RETRIES = 5;
+
     private final String bootstrapServers;
     private final String inputTopic;
     private final String outputTopic;
@@ -103,19 +106,21 @@ public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebal
 
     @Override
     public void run() {
+        int retries = 0;
         int processedRecords = 0;
         long remainingRecords = Long.MAX_VALUE;
+        // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
         int transactionTimeoutMs = 10_000;
         // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
         boolean readCommitted = true;
+
         try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
              KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
             // called first and once to fence zombies and abort any pending transaction
             producer.initTransactions();
-
             consumer.subscribe(singleton(inputTopic), this);
 
             Utils.printOut("Processing new records");
@@ -140,6 +145,7 @@ public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebal
                         // commit the transaction including offsets
                         producer.commitTransaction();
                         processedRecords += records.count();
+                        retries = 0;
                     }
                 } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
                          | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
@@ -151,18 +157,21 @@ public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebal
                     Utils.printOut("Invalid or no offset found, using latest");
                     consumer.seekToEnd(emptyList());
                     consumer.commitSync();
+                    retries = 0;
                 } catch (KafkaException e) {
-                    // abort the transaction and try to continue
-                    Utils.printOut("Aborting transaction: %s", e);
+                    // abort the transaction
+                    Utils.printOut("Aborting transaction: %s", e.getMessage());
                     producer.abortTransaction();
+                    retries = maybeRetry(retries, consumer);
                 }
+
                 remainingRecords = getRemainingRecords(consumer);
                 if (remainingRecords != Long.MAX_VALUE) {
                     Utils.printOut("Remaining records: %d", remainingRecords);
                 }
             }
         } catch (Throwable e) {
-            Utils.printOut("Unhandled exception");
+            Utils.printErr("Unhandled exception");
             e.printStackTrace();
         }
         Utils.printOut("Processed %d records", processedRecords);
@@ -215,6 +224,44 @@ public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebal
         }).sum();
     }
 
+    /**
+     * When we get a generic {@code KafkaException} while processing records, we retry up to {@code MAX_RETRIES} times.
+     * If we exceed this threshold, we log an error and move on to the next batch of records.
+     * In a real-world application you may want to send these records to a dead letter topic (DLT) for further processing.
+     *
+     * @param retries Current number of retries
+     * @param consumer Consumer instance
+     * @return Updated number of retries
+     */
+    private int maybeRetry(int retries, KafkaConsumer<Integer, String> consumer) {
+        if (retries < 0) {
+            Utils.printErr("The number of retries must not be negative");
+            shutdown();
+        }
+
+        if (retries < MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+            retries++;
+        } else {
+            // continue: skip records
+            // the consumer fetch position needs to be committed as if records were processed successfully
+            Utils.printErr("Skipping records after %d retries", MAX_RETRIES);
+            consumer.commitSync();
+            retries = 0;
+        }
+        return retries;
+    }
+
     @Override
     public void close() throws Exception {
         if (producer != null) {
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index 36a2583954c..d9d454c1559 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -89,7 +89,7 @@ public class Producer extends Thread {
                 sentRecords++;
             }
         } catch (Throwable e) {
-            Utils.printOut("Unhandled exception");
+            Utils.printErr("Unhandled exception");
             e.printStackTrace();
         }
         Utils.printOut("Sent %d records", sentRecords);
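
For reference, the retry-then-skip logic this patch adds can be exercised in
isolation. Below is a minimal standalone sketch, assuming only the standard
kafka-clients dependency; the BoundedRetryExample class name and the two
helper methods are hypothetical, not part of the patch, while the consumer
calls (committed, assignment, seek, seekToBeginning, commitSync) are the same
public API used by maybeRetry.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

public class BoundedRetryExample {
    private static final int MAX_RETRIES = 5;

    // rewind to the last committed offsets (or to the beginning of a
    // partition that has no committed offset) so the failed batch is
    // fetched again on the next poll
    static <K, V> void resetToLastCommitted(KafkaConsumer<K, V> consumer) {
        Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
        for (TopicPartition tp : consumer.assignment()) {
            OffsetAndMetadata offset = committed.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset.offset());
            } else {
                consumer.seekToBeginning(Collections.singleton(tp));
            }
        }
    }

    // called after a failed batch: retry up to MAX_RETRIES times, then skip
    // the batch by committing the current fetch position and reset the counter
    static <K, V> int onBatchFailure(int retries, KafkaConsumer<K, V> consumer) {
        if (retries < MAX_RETRIES) {
            resetToLastCommitted(consumer);
            return retries + 1;
        }
        consumer.commitSync(); // commit as if the batch had been processed
        return 0;
    }
}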
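The maybeRetry Javadoc suggests sending skipped records to a dead letter
topic (DLT) in a real-world application. One possible shape of that idea,
as a sketch only: the topic name, class, and wiring are assumptions for
illustration and are not part of this change.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DeadLetterForwarder {
    // hypothetical DLT name derived from the input topic
    private static final String DEAD_LETTER_TOPIC = "input-topic.DLT";

    // forward an unprocessable batch to the DLT instead of silently skipping it
    static void forward(KafkaProducer<Integer, String> producer,
                        ConsumerRecords<Integer, String> failedBatch) {
        for (ConsumerRecord<Integer, String> record : failedBatch) {
            producer.send(new ProducerRecord<>(DEAD_LETTER_TOPIC, record.key(), record.value()));
        }
        producer.flush(); // ensure delivery before committing consumer offsets
    }
}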