mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-9617 Replica Fetcher can mark partition as failed when max.message.bytes is changed (#8659)
Skip to check the size of record if the record is already accepted by leader. Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
		
							parent
							
								
									62fa8fc9a9
								
							
						
					
					
						commit
						78e18b575c
					
				|  | @ -1010,7 +1010,7 @@ class Log(@volatile private var _dir: File, | |||
|                      leaderEpoch: Int, | ||||
|                      origin: AppendOrigin = AppendOrigin.Client, | ||||
|                      interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = { | ||||
|     append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch) | ||||
|     append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch, ignoreRecordSize = false) | ||||
|   } | ||||
| 
 | ||||
|   /** | ||||
|  | @ -1025,7 +1025,9 @@ class Log(@volatile private var _dir: File, | |||
|       origin = AppendOrigin.Replication, | ||||
|       interBrokerProtocolVersion = ApiVersion.latestVersion, | ||||
|       assignOffsets = false, | ||||
|       leaderEpoch = -1) | ||||
|       leaderEpoch = -1, | ||||
|       // disable to check the validation of record size since the record is already accepted by leader. | ||||
|       ignoreRecordSize = true) | ||||
|   } | ||||
| 
 | ||||
|   /** | ||||
|  | @ -1039,6 +1041,7 @@ class Log(@volatile private var _dir: File, | |||
|    * @param interBrokerProtocolVersion Inter-broker message protocol version | ||||
|    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given | ||||
|    * @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader | ||||
|    * @param ignoreRecordSize true to skip validation of record size. | ||||
|    * @throws KafkaStorageException If the append fails due to an I/O error. | ||||
|    * @throws OffsetsOutOfOrderException If out of order offsets found in 'records' | ||||
|    * @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset | ||||
|  | @ -1048,9 +1051,10 @@ class Log(@volatile private var _dir: File, | |||
|                      origin: AppendOrigin, | ||||
|                      interBrokerProtocolVersion: ApiVersion, | ||||
|                      assignOffsets: Boolean, | ||||
|                      leaderEpoch: Int): LogAppendInfo = { | ||||
|                      leaderEpoch: Int, | ||||
|                      ignoreRecordSize: Boolean): LogAppendInfo = { | ||||
|     maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") { | ||||
|       val appendInfo = analyzeAndValidateRecords(records, origin) | ||||
|       val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize) | ||||
| 
 | ||||
|       // return if we have no valid messages or if this is a duplicate of the last appended entry | ||||
|       if (appendInfo.shallowCount == 0) | ||||
|  | @ -1097,7 +1101,7 @@ class Log(@volatile private var _dir: File, | |||
| 
 | ||||
|           // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message | ||||
|           // format conversion) | ||||
|           if (validateAndOffsetAssignResult.messageSizeMaybeChanged) { | ||||
|           if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) { | ||||
|             for (batch <- validRecords.batches.asScala) { | ||||
|               if (batch.sizeInBytes > config.maxMessageSize) { | ||||
|                 // we record the original message set size instead of the trimmed size | ||||
|  | @ -1321,7 +1325,7 @@ class Log(@volatile private var _dir: File, | |||
|    * Validate the following: | ||||
|    * <ol> | ||||
|    * <li> each message matches its CRC | ||||
|    * <li> each message size is valid | ||||
|    * <li> each message size is valid (if ignoreRecordSize is false) | ||||
|    * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other. | ||||
|    * </ol> | ||||
|    * | ||||
|  | @ -1335,7 +1339,9 @@ class Log(@volatile private var _dir: File, | |||
|    * <li> Whether any compression codec is used (if many are used, then the last one is given) | ||||
|    * </ol> | ||||
|    */ | ||||
|   private def analyzeAndValidateRecords(records: MemoryRecords, origin: AppendOrigin): LogAppendInfo = { | ||||
|   private def analyzeAndValidateRecords(records: MemoryRecords, | ||||
|                                         origin: AppendOrigin, | ||||
|                                         ignoreRecordSize: Boolean): LogAppendInfo = { | ||||
|     var shallowMessageCount = 0 | ||||
|     var validBytesCount = 0 | ||||
|     var firstOffset: Option[Long] = None | ||||
|  | @ -1375,7 +1381,7 @@ class Log(@volatile private var _dir: File, | |||
| 
 | ||||
|       // Check if the message sizes are valid. | ||||
|       val batchSize = batch.sizeInBytes | ||||
|       if (batchSize > config.maxMessageSize) { | ||||
|       if (!ignoreRecordSize && batchSize > config.maxMessageSize) { | ||||
|         brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) | ||||
|         brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) | ||||
|         throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " + | ||||
|  |  | |||
|  | @ -2078,6 +2078,22 @@ class LogTest { | |||
|       case _: RecordTooLargeException => // this is good | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   def testMessageSizeCheckInAppendAsFollower(): Unit = { | ||||
|     val first = MemoryRecords.withRecords(0, CompressionType.NONE, 0, | ||||
|       new SimpleRecord("You".getBytes), new SimpleRecord("bethe".getBytes)) | ||||
|     val second = MemoryRecords.withRecords(5, CompressionType.NONE, 0, | ||||
|       new SimpleRecord("change (I need more bytes)... blah blah blah.".getBytes), | ||||
|       new SimpleRecord("More padding boo hoo".getBytes)) | ||||
| 
 | ||||
|     val log = createLog(logDir, LogTest.createLogConfig(maxMessageBytes = second.sizeInBytes - 1)) | ||||
| 
 | ||||
|     log.appendAsFollower(first) | ||||
|     // the second record is larger then limit but appendAsFollower does not validate the size. | ||||
|     log.appendAsFollower(second) | ||||
|   } | ||||
| 
 | ||||
|   /** | ||||
|    * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly. | ||||
|    */ | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue