MINOR: improve StreamsProducer error handling (#20058)
CI / build (push) Waiting to run Details

StreamProducer may timeout in sendOffsetsToTransaction() or
commitTransaction() call. To distinguish both cases, we should make both
calls in individual try-catch blocks.

Reviewers: Bill Bejeck<bbejeck@apache.org>
This commit is contained in:
Matthias J. Sax 2025-06-30 15:03:35 -07:00 committed by GitHub
parent 64aebb5621
commit c8f83592b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 18 additions and 2 deletions

View File

@ -247,6 +247,22 @@ public class StreamsProducer {
maybeBeginTransaction();
try {
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
} catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException | InvalidPidMappingException error) {
throw new TaskMigratedException(
formatException("Producer got fenced trying to add offsets to a transaction"),
error
);
} catch (final TimeoutException timeoutException) {
// re-throw to trigger `task.timeout.ms`
throw timeoutException;
} catch (final KafkaException error) {
throw new StreamsException(
formatException("Error encountered trying to add offsets to a transaction"),
error
);
}
try {
producer.commitTransaction();
transactionInFlight = false;
} catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException | InvalidPidMappingException error) {

View File

@ -684,7 +684,7 @@ public class StreamsProducerTest {
assertThat(thrown.getCause(), is(eosMockProducer.sendOffsetsToTransactionException));
assertThat(
thrown.getMessage(),
is("Producer got fenced trying to commit a transaction [test];" +
is("Producer got fenced trying to add offsets to a transaction [test];" +
" it means all tasks belonging to this thread should be migrated.")
);
}
@ -703,7 +703,7 @@ public class StreamsProducerTest {
assertThat(thrown.getCause(), is(eosMockProducer.sendOffsetsToTransactionException));
assertThat(
thrown.getMessage(),
is("Error encountered trying to commit a transaction [test]")
is("Error encountered trying to add offsets to a transaction [test]")
);
}