diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 2f5c983517e..0df48fe8d46 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -741,6 +741,7 @@ public class KafkaProducer implements Producer { public void abortTransaction() throws ProducerFencedException { throwIfNoTransactionManager(); throwIfProducerClosed(); + log.info("Aborting incomplete transaction"); TransactionalRequestResult result = transactionManager.beginAbort(); sender.wakeup(); result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 5f1a1f45c4a..e3ce1d12a5f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -478,10 +478,12 @@ public class TransactionManager { return; } + log.info("Transiting to abortable error state due to {}", exception.toString()); transitionTo(State.ABORTABLE_ERROR, exception); } synchronized void transitionToFatalError(RuntimeException exception) { + log.info("Transiting to fatal error state due to {}", exception.toString()); transitionTo(State.FATAL_ERROR, exception); if (pendingResult != null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 124e6f64b20..180aae297f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -251,7 +251,7 @@ public class RecordCollectorImpl implements RecordCollector { */ @Override public void close() { - log.debug("Closing record collector"); + log.info("Closing record collector"); if (eosEnabled) { streamsProducer.abortTransaction();