From 8716ba1ff18a6d969c44d73bbfb756c2046a9802 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 8 Jan 2021 09:46:29 +0100 Subject: [PATCH] MINOR; Add producer id in exceptions thrown by ProducerStateManager (#9827) Reviewers: Chia-Ping Tsai , Jason Gustafson --- .../main/scala/kafka/log/ProducerStateManager.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index b4fa26755f6..00f908bfd9c 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -201,8 +201,8 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = { if (producerEpoch < updatedEntry.producerEpoch) { - val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, which is " + - s"smaller than the last seen epoch ${updatedEntry.producerEpoch}" + val message = s"Epoch of producer $producerId at offset $offset in $topicPartition is $producerEpoch, " + + s"which is smaller than the last seen epoch ${updatedEntry.producerEpoch}" if (origin == AppendOrigin.Replication) { warn(message) @@ -219,8 +219,9 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, if (producerEpoch != updatedEntry.producerEpoch) { if (appendFirstSeq != 0) { if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) { - throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch at offset $offset in " + - s"partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number)") + throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch of producer $producerId " + + s"at offset $offset in partition $topicPartition: $producerEpoch (request epoch), $appendFirstSeq (seq. number), " + + s"${updatedEntry.producerEpoch} (current producer epoch)") } } } else { @@ -234,7 +235,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition, // If there is no current producer epoch (possibly because all producer records have been deleted due to // retention or the DeleteRecords API) accept writes with any sequence number if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) { - throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId at " + + throw new OutOfOrderSequenceException(s"Out of order sequence number for producer $producerId at " + s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " + s"$currentLastSeq (current end sequence number)") }