MINOR: Add retry mechanism to EOS example (#15561)

In the initial EOS example, retry logic was implemented within the resetToLastCommittedPositions method. During refactoring, this logic was removed because, with unbounded retries, a poison pill prevented the example from reaching the final phase of consuming from the output topic.

In this change, I suggest adding it back, but with a retry limit defined by MAX_RETRIES. Once this limit is reached, the problematic batch is logged and skipped, allowing the processor to move on and process the remaining records. If some records are skipped, the example will still hit the hard timeout (2 minutes), but only after consuming all processed records.
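
For illustration, here is a minimal sketch of where the retry counter lives in the read-process-write loop. It is not part of the diff below: the class name, the offsetsToCommit() helper, and the poll timeout are assumptions, maybeRetry() is the method this change adds, and handling of fatal exceptions (fenced producer, authorization errors, etc.) is omitted.

import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch only: shows how the retry counter is reset on success
// and updated by maybeRetry() when a transaction has to be aborted.
public abstract class RetryLoopSketch {

    void process(KafkaConsumer<Integer, String> consumer,
                 KafkaProducer<Integer, String> producer,
                 String outputTopic) {
        int retries = 0;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                for (ConsumerRecord<Integer, String> record : records) {
                    producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
                }
                // add consumed offsets to the transaction, then commit records and offsets atomically
                producer.sendOffsetsToTransaction(offsetsToCommit(records), consumer.groupMetadata());
                producer.commitTransaction();
                retries = 0; // a successful batch resets the counter
            } catch (KafkaException e) {
                producer.abortTransaction();
                // rewind and retry, or skip the batch once MAX_RETRIES is exceeded
                retries = maybeRetry(retries, consumer);
            }
        }
    }

    // hypothetical helper: last consumed offset + 1 for each partition in the batch
    abstract Map<TopicPartition, OffsetAndMetadata> offsetsToCommit(ConsumerRecords<Integer, String> records);

    // the method added by this change (see ExactlyOnceMessageProcessor below)
    abstract int maybeRetry(int retries, KafkaConsumer<Integer, String> consumer);
}

Resetting the counter after every committed batch keeps MAX_RETRIES scoped to a single problematic batch rather than to the whole run.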

Reviewers: Luke Chen <showuon@gmail.com>
Author: Federico Valeri, 2024-03-27 09:31:27 +01:00 (committed by GitHub)
Parent: 9326476065
Commit: 4cb20379c7
3 changed files with 53 additions and 6 deletions

Consumer.java

@@ -109,7 +109,7 @@ public class Consumer extends Thread implements ConsumerRebalanceListener {
}
}
} catch (Throwable e) {
Utils.printOut("Unhandled exception");
Utils.printErr("Unhandled exception");
e.printStackTrace();
}
Utils.printOut("Fetched %d records", numRecords - remainingRecords);

ExactlyOnceMessageProcessor.java

@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -49,6 +50,8 @@ import static java.util.Collections.singleton;
* This class implements a read-process-write application.
*/
public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener, AutoCloseable {
private static final int MAX_RETRIES = 5;
private final String bootstrapServers;
private final String inputTopic;
private final String outputTopic;
@@ -103,19 +106,21 @@ public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebal
@Override
public void run() {
int retries = 0;
int processedRecords = 0;
long remainingRecords = Long.MAX_VALUE;
// it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
int transactionTimeoutMs = 10_000;
// consumer must be in read_committed mode, which means it won't be able to read uncommitted data
boolean readCommitted = true;
try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
"processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
// called first and once to fence zombies and abort any pending transaction
producer.initTransactions();
consumer.subscribe(singleton(inputTopic), this);
Utils.printOut("Processing new records");
@@ -140,6 +145,7 @@ public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebal
// commit the transaction including offsets
producer.commitTransaction();
processedRecords += records.count();
retries = 0;
}
} catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
| FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
@@ -151,18 +157,21 @@ public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebal
Utils.printOut("Invalid or no offset found, using latest");
consumer.seekToEnd(emptyList());
consumer.commitSync();
retries = 0;
} catch (KafkaException e) {
// abort the transaction and try to continue
Utils.printOut("Aborting transaction: %s", e);
// abort the transaction
Utils.printOut("Aborting transaction: %s", e.getMessage());
producer.abortTransaction();
retries = maybeRetry(retries, consumer);
}
remainingRecords = getRemainingRecords(consumer);
if (remainingRecords != Long.MAX_VALUE) {
Utils.printOut("Remaining records: %d", remainingRecords);
}
}
} catch (Throwable e) {
Utils.printOut("Unhandled exception");
Utils.printErr("Unhandled exception");
e.printStackTrace();
}
Utils.printOut("Processed %d records", processedRecords);
@@ -215,6 +224,44 @@ public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebal
}).sum();
}
/**
* When we get a generic {@code KafkaException} while processing records, we retry up to {@code MAX_RETRIES} times.
* If we exceed this threshold, we log an error and move on to the next batch of records.
* In a real-world application you may want to send these records to a dead letter topic (DLT) for further processing.
*
* @param retries Current number of retries
* @param consumer Consumer instance
* @return Updated number of retries
*/
private int maybeRetry(int retries, KafkaConsumer<Integer, String> consumer) {
if (retries < 0) {
Utils.printErr("The number of retries must be greater than zero");
shutdown();
}
if (retries < MAX_RETRIES) {
// retry: reset fetch offset
// the consumer fetch position needs to be restored to the committed offset before the transaction started
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
consumer.assignment().forEach(tp -> {
OffsetAndMetadata offsetAndMetadata = committed.get(tp);
if (offsetAndMetadata != null) {
consumer.seek(tp, offsetAndMetadata.offset());
} else {
consumer.seekToBeginning(Collections.singleton(tp));
}
});
retries++;
} else {
// continue: skip records
// the consumer fetch position needs to be committed as if records were processed successfully
Utils.printErr("Skipping records after %d retries", MAX_RETRIES);
consumer.commitSync();
retries = 0;
}
return retries;
}
@Override
public void close() throws Exception {
if (producer != null) {

Producer.java

@@ -89,7 +89,7 @@ public class Producer extends Thread {
sentRecords++;
}
} catch (Throwable e) {
Utils.printOut("Unhandled exception");
Utils.printErr("Unhandled exception");
e.printStackTrace();
}
Utils.printOut("Sent %d records", sentRecords);