mirror of https://github.com/apache/kafka.git
KAFKA-4099; Fix the potential frequent log rolling
Author: Jiangjie Qin <becket.qin@gmail.com> Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com> Closes #1809 from becketqin/KAFKA-4099
This commit is contained in:
parent
8f3462552f
commit
af9fc503de
|
@ -401,7 +401,8 @@ class Log(val dir: File,
|
|||
}
|
||||
|
||||
// maybe roll the log if this segment is full
|
||||
val segment = maybeRoll(validMessages.sizeInBytes)
|
||||
val segment = maybeRoll(messagesSize = validMessages.sizeInBytes,
|
||||
maxTimestampInMessages = appendInfo.maxTimestamp)
|
||||
|
||||
// now append to the log
|
||||
segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp,
|
||||
|
@ -736,6 +737,7 @@ class Log(val dir: File,
|
|||
* Roll the log over to a new empty log segment if necessary.
|
||||
*
|
||||
* @param messagesSize The messages set size in bytes
|
||||
* @param maxTimestampInMessages The maximum timestamp in the messages.
|
||||
* logSegment will be rolled if one of the following conditions met
|
||||
* <ol>
|
||||
* <li> The logSegment is full
|
||||
|
@ -745,16 +747,17 @@ class Log(val dir: File,
|
|||
* </ol>
|
||||
* @return The currently active segment after (perhaps) rolling to a new segment
|
||||
*/
|
||||
private def maybeRoll(messagesSize: Int): LogSegment = {
|
||||
private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long): LogSegment = {
|
||||
val segment = activeSegment
|
||||
val reachedRollMs = segment.timeWaitedForRoll(time.milliseconds) > config.segmentMs - segment.rollJitterMs
|
||||
val now = time.milliseconds
|
||||
val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
|
||||
if (segment.size > config.segmentSize - messagesSize ||
|
||||
(segment.size > 0 && reachedRollMs) ||
|
||||
segment.index.isFull || segment.timeIndex.isFull) {
|
||||
debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " +
|
||||
s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " +
|
||||
s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
|
||||
s"inactive_time_ms = ${segment.timeWaitedForRoll(time.milliseconds)}/${config.segmentMs - segment.rollJitterMs}).")
|
||||
s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).")
|
||||
roll()
|
||||
} else {
|
||||
segment
|
||||
|
|
|
@ -339,20 +339,25 @@ class LogSegment(val log: FileMessageSet,
|
|||
}
|
||||
|
||||
/**
|
||||
* The time this segment has waited to be rolled. If the first message in the segment does not have a timestamp,
|
||||
* the time is based on the create time of the segment. Otherwise the time is based on the timestamp of that message.
|
||||
* The time this segment has waited to be rolled.
|
||||
* If the first message has a timestamp we use the message timestamp to determine when to roll a segment. A segment
|
||||
* is rolled if the difference between the new message's timestamp and the first message's timestamp exceeds the
|
||||
* segment rolling time.
|
||||
* If the first message does not have a timestamp, we use the wall clock time to determine when to roll a segment. A
|
||||
* segment is rolled if the difference between the current wall clock time and the segment create time exceeds the
|
||||
* segment rolling time.
|
||||
*/
|
||||
def timeWaitedForRoll(now: Long) : Long= {
|
||||
def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
|
||||
// Load the timestamp of the first message into memory
|
||||
if (!rollingBasedTimestamp.isDefined) {
|
||||
if (rollingBasedTimestamp.isEmpty) {
|
||||
val iter = log.iterator
|
||||
if (iter.hasNext)
|
||||
rollingBasedTimestamp = Some(iter.next.message.timestamp)
|
||||
else
|
||||
// If the log is empty, we return time elapsed since the segment is created.
|
||||
return now - created
|
||||
}
|
||||
now - {if (rollingBasedTimestamp.get >= 0) rollingBasedTimestamp.get else created}
|
||||
rollingBasedTimestamp match {
|
||||
case Some(t) if t >= 0 => messageTimestamp - t
|
||||
case _ => now - created
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -75,6 +75,7 @@ class LogTest extends JUnitSuite {
|
|||
scheduler = time.scheduler,
|
||||
time = time)
|
||||
assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
|
||||
// Test the segment rolling behavior when messages do not have a timestamp.
|
||||
time.sleep(log.config.segmentMs + 1)
|
||||
log.append(set)
|
||||
assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments)
|
||||
|
@ -88,19 +89,25 @@ class LogTest extends JUnitSuite {
|
|||
assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
|
||||
}
|
||||
|
||||
time.sleep(log.config.segmentMs + 1)
|
||||
// Append a message with a timestamp to a segment whose first message does not have a timestamp.
|
||||
val setWithTimestamp =
|
||||
TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1)
|
||||
log.append(setWithTimestamp)
|
||||
assertEquals("Segment should not have been rolled out because the log rolling should be based on wall clock.", 4, log.numberOfSegments)
|
||||
|
||||
// Test the segment rolling behavior when messages have timestamps.
|
||||
time.sleep(log.config.segmentMs + 1)
|
||||
log.append(setWithTimestamp)
|
||||
assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments)
|
||||
|
||||
// move the wall clock beyond log rolling time
|
||||
time.sleep(log.config.segmentMs + 1)
|
||||
log.append(set)
|
||||
assertEquals("Log should not roll because the roll should depend on the index of the first time index entry.", 5, log.numberOfSegments)
|
||||
log.append(setWithTimestamp)
|
||||
assertEquals("Log should not roll because the roll should depend on timestamp of the first message.", 5, log.numberOfSegments)
|
||||
|
||||
time.sleep(log.config.segmentMs + 1)
|
||||
log.append(set)
|
||||
assertEquals("Log should roll because the time since the timestamp of first time index entry has expired.", 6, log.numberOfSegments)
|
||||
val setWithExpiredTimestamp = TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds)
|
||||
log.append(setWithExpiredTimestamp)
|
||||
assertEquals("Log should roll because the timestamp in the message should make the log segment expire.", 6, log.numberOfSegments)
|
||||
|
||||
val numSegments = log.numberOfSegments
|
||||
time.sleep(log.config.segmentMs + 1)
|
||||
|
|
|
@ -26,7 +26,7 @@ However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking c
|
|||
<h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a></h5>
|
||||
<ul>
|
||||
<li> The log retention time is no longer based on last modified time of the log segments. Instead it will be based on the largest timestamp of the messages in a log segment.</li>
|
||||
<li> The log rolling time is no longer depending on log segment create time. Instead it is now based on the timestamp of the first message in a log segment. i.e. if the timestamp of the first message in the segment is T, the log will be rolled out at T + log.roll.ms </li>
|
||||
<li> The log rolling time is no longer depending on log segment create time. Instead it is now based on the timestamp in the messages. More specifically, if the timestamp of the first message in the segment is T, the log will be rolled out when a new message has a timestamp greater than or equal to T + log.roll.ms </li>
|
||||
<li> The open file handlers of 0.10.0 will increase by ~33% because of the addition of time index files for each segment.</li>
|
||||
<li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes to avoid potential frequent log rolling. </li>
|
||||
<li> Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>
|
||||
|
|
Loading…
Reference in New Issue