diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java index 1048b5a2ecf..c5fbdd788bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java @@ -247,6 +247,22 @@ public class StreamsProducer { maybeBeginTransaction(); try { producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata); + } catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException | InvalidPidMappingException error) { + throw new TaskMigratedException( + formatException("Producer got fenced trying to add offsets to a transaction"), + error + ); + } catch (final TimeoutException timeoutException) { + // re-throw to trigger `task.timeout.ms` + throw timeoutException; + } catch (final KafkaException error) { + throw new StreamsException( + formatException("Error encountered trying to add offsets to a transaction"), + error + ); + } + + try { producer.commitTransaction(); transactionInFlight = false; } catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException | InvalidPidMappingException error) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java index 1c084fa63e2..2628ca5b99f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java @@ -684,7 +684,7 @@ public class StreamsProducerTest { assertThat(thrown.getCause(), is(eosMockProducer.sendOffsetsToTransactionException)); assertThat( thrown.getMessage(), - is("Producer got fenced trying to commit a transaction [test];" + + is("Producer got fenced trying to add offsets to a transaction [test];" + " it means all tasks belonging to this thread should be migrated.") ); } @@ -703,7 +703,7 @@ public class StreamsProducerTest { assertThat(thrown.getCause(), is(eosMockProducer.sendOffsetsToTransactionException)); assertThat( thrown.getMessage(), - is("Error encountered trying to commit a transaction [test]") + is("Error encountered trying to add offsets to a transaction [test]") ); }