mirror of https://github.com/apache/kafka.git
MINOR: simplify producer TX abort error handling (#18486)
Reviewers: Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@responsive.dev>
This commit is contained in:
parent
32dbbe6a1f
commit
f54cfff1dc
|
@ -462,17 +462,10 @@ public class Sender implements Runnable {
|
|||
return true;
|
||||
}
|
||||
|
||||
if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
|
||||
if (accumulator.hasIncomplete()) {
|
||||
// Attempt to get the last error that caused this abort.
|
||||
RuntimeException exception = transactionManager.lastError();
|
||||
// If there was no error, but we are still aborting,
|
||||
// then this is most likely a case where there was no fatal error.
|
||||
if (exception == null) {
|
||||
exception = new TransactionAbortedException();
|
||||
}
|
||||
accumulator.abortUndrainedBatches(exception);
|
||||
}
|
||||
if (transactionManager.hasAbortableError()) {
|
||||
accumulator.abortUndrainedBatches(transactionManager.lastError());
|
||||
} else if (transactionManager.isAborting()) {
|
||||
accumulator.abortUndrainedBatches(new TransactionAbortedException());
|
||||
}
|
||||
|
||||
TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());
|
||||
|
|
Loading…
Reference in New Issue