MINOR; Add producer id in exceptions thrown by ProducerStateManager (#9827)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
David Jacot 2021-01-08 09:46:29 +01:00 committed by GitHub
parent e5733f3ade
commit 8716ba1ff1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 6 additions and 5 deletions

View File

@ -201,8 +201,8 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
if (producerEpoch < updatedEntry.producerEpoch) {
val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, which is " +
s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"
val message = s"Epoch of producer $producerId at offset $offset in $topicPartition is $producerEpoch, " +
s"which is smaller than the last seen epoch ${updatedEntry.producerEpoch}"
if (origin == AppendOrigin.Replication) {
warn(message)
@ -219,8 +219,9 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
if (producerEpoch != updatedEntry.producerEpoch) {
if (appendFirstSeq != 0) {
if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch at offset $offset in " +
s"partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number)")
throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch of producer $producerId " +
s"at offset $offset in partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number), " +
s"${updatedEntry.producerEpoch} (current producer epoch)")
}
}
} else {
@ -234,7 +235,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
// If there is no current producer epoch (possibly because all producer records have been deleted due to
// retention or the DeleteRecords API) accept writes with any sequence number
if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId at " +
throw new OutOfOrderSequenceException(s"Out of order sequence number for producer $producerId at " +
s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " +
s"$currentLastSeq (current end sequence number)")
}