KAFKA-13710: bring the InvalidTimestampException back for record error (#11853)

Reviewers: Guozhang Wang <guozhang@confluent.io>, Ricardo Brasil <anribrasil@gmail.com>
This commit is contained in:
Luke Chen 2022-03-08 14:28:16 +08:00 committed by GitHub
parent 539f006e65
commit 1848f049e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 9 deletions

View File

@ -22,7 +22,7 @@ import kafka.common.{LongRef, RecordValidationException}
import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.Logging
import org.apache.kafka.common.errors.{CorruptRecordException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.errors.{CorruptRecordException, InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record.{AbstractRecords, CompressionType, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
import org.apache.kafka.common.InvalidRecordException
import org.apache.kafka.common.TopicPartition
@ -571,9 +571,14 @@ private[log] object LogValidator extends Logging {
private def processRecordErrors(recordErrors: Seq[ApiRecordError]): Unit = {
if (recordErrors.nonEmpty) {
val errors = recordErrors.map(_.recordError)
throw new RecordValidationException(new InvalidRecordException(
"One or more records have been rejected due to " + errors.size + " record errors " +
"in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors)
if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) {
throw new RecordValidationException(new InvalidTimestampException(
"One or more records have been rejected due to invalid timestamp"), errors)
} else {
throw new RecordValidationException(new InvalidRecordException(
"One or more records have been rejected due to " + errors.size + " record errors " +
"in total, and only showing the first three errors at most: " + errors.asJava.subList(0, math.min(errors.size, 3))), errors)
}
}
}

View File

@ -25,7 +25,7 @@ import kafka.message._
import kafka.metrics.KafkaYammerMetrics
import kafka.server.{BrokerTopicStats, RequestLocal}
import kafka.utils.TestUtils.meterCount
import org.apache.kafka.common.errors.{UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{InvalidRecordException, TopicPartition}
@ -1352,7 +1352,7 @@ class LogValidatorTest {
requestLocal = RequestLocal.withThreadConfinedCaching)
)
assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
assertTrue(e.recordErrors.nonEmpty)
assertEquals(e.recordErrors.size, 3)
}
@ -1397,9 +1397,8 @@ class LogValidatorTest {
RecordBatch.MAGIC_VALUE_V0, CompressionType.GZIP, CompressionType.GZIP)
)
// if there is a mix of both regular InvalidRecordException and InvalidTimestampException,
// InvalidTimestampException is no longer takes precedence. The type of invalidException
// is unified as InvalidRecordException
assertTrue(e.invalidException.isInstanceOf[InvalidRecordException])
// InvalidTimestampException takes precedence
assertTrue(e.invalidException.isInstanceOf[InvalidTimestampException])
assertTrue(e.recordErrors.nonEmpty)
assertEquals(6, e.recordErrors.size)
}