log.truncateTo needs to handle targetOffset smaller than the lowest offset in the log ; patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-463

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1386641 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2012-09-17 14:52:18 +00:00
parent fb0f176131
commit c8f7587072
5 changed files with 149 additions and 98 deletions

View File

@ -91,7 +91,7 @@ object Log {
/** /**
* A segment file in the log directory. Each log semgment consists of an open message set, a start offset and a size * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size
*/ */
class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long, time: Time) extends Range { class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long, time: Time) extends Range {
var firstAppendTime: Option[Long] = None var firstAppendTime: Option[Long] = None
@ -345,20 +345,23 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag
roll() roll()
} }
private def rollSegment(newOffset: Long) {
val newFile = new File(dir, nameFromOffset(newOffset))
if (newFile.exists) {
warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
newFile.delete()
}
debug("Rolling log '" + name + "' to " + newFile.getName())
segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset, time))
}
/** /**
* Create a new segment and make it active * Create a new segment and make it active
*/ */
def roll() { def roll() {
lock synchronized { lock synchronized {
flush flush
val newOffset = logEndOffset rollSegment(logEndOffset)
val newFile = new File(dir, nameFromOffset(newOffset))
if (newFile.exists) {
warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
newFile.delete()
}
debug("Rolling log '" + name + "' to " + newFile.getName())
segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset, time))
} }
} }
@ -432,19 +435,11 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag
def truncateAndStartWithNewOffset(newOffset: Long) { def truncateAndStartWithNewOffset(newOffset: Long) {
lock synchronized { lock synchronized {
val deletedSegments = segments.trunc(segments.view.size) val deletedSegments = segments.trunc(segments.view.size)
val newFile = new File(dir, Log.nameFromOffset(newOffset)) rollSegment(newOffset)
debug("Truncate and start log '" + name + "' to " + newFile.getName())
segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset, time))
deleteSegments(deletedSegments) deleteSegments(deletedSegments)
} }
} }
def deleteWholeLog():Unit = {
deleteSegments(segments.contents.get())
Utils.rm(dir)
}
/* Attempts to delete all provided segments from a log and returns how many it was able to */ /* Attempts to delete all provided segments from a log and returns how many it was able to */
def deleteSegments(segments: Seq[LogSegment]): Int = { def deleteSegments(segments: Seq[LogSegment]): Int = {
var total = 0 var total = 0
@ -461,26 +456,34 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag
} }
def truncateTo(targetOffset: Long) { def truncateTo(targetOffset: Long) {
// find the log segment that has this hw lock synchronized {
val segmentToBeTruncated = segments.view.find( val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
segment => targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset) val viewSize = segments.view.size
segmentToBeTruncated match {
case Some(segment) =>
val truncatedSegmentIndex = segments.view.indexOf(segment)
segments.truncLast(truncatedSegmentIndex)
segment.truncateTo(targetOffset)
info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
case None =>
if(targetOffset > segments.view.last.absoluteEndOffset)
error("Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
}
val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
if(segmentsToBeDeleted.size < segments.view.size) {
val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted) val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
/* We should not hit this error because segments.view is locked in markedDeletedWhile() */
if(numSegmentsDeleted != segmentsToBeDeleted.size) if(numSegmentsDeleted != segmentsToBeDeleted.size)
error("Failed to delete some segments during log recovery") error("Failed to delete some segments during log recovery during truncateTo(" + targetOffset +")")
if (numSegmentsDeleted == viewSize) {
segments.trunc(segments.view.size)
rollSegment(targetOffset)
} else {
// find the log segment that has this hw
val segmentToBeTruncated =
segments.view.find(segment => targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
segmentToBeTruncated match {
case Some(segment) =>
val truncatedSegmentIndex = segments.view.indexOf(segment)
segments.truncLast(truncatedSegmentIndex)
segment.truncateTo(targetOffset)
info("Truncated log segment %s to target offset %d".format(segment.file.getAbsolutePath, targetOffset))
case None =>
if(targetOffset > segments.view.last.absoluteEndOffset)
error("Target offset %d cannot be greater than the last message offset %d in the log %s".
format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
}
}
} }
} }

View File

@ -40,15 +40,12 @@ private[log] class SegmentList[T](seq: Seq[T])(implicit m: ClassManifest[T]) {
* Append the given items to the end of the list * Append the given items to the end of the list
*/ */
def append(ts: T*)(implicit m: ClassManifest[T]) { def append(ts: T*)(implicit m: ClassManifest[T]) {
while(true){ val curr = contents.get()
val curr = contents.get() val updated = new Array[T](curr.length + ts.length)
val updated = new Array[T](curr.length + ts.length) Array.copy(curr, 0, updated, 0, curr.length)
Array.copy(curr, 0, updated, 0, curr.length) for(i <- 0 until ts.length)
for(i <- 0 until ts.length) updated(curr.length + i) = ts(i)
updated(curr.length + i) = ts(i) contents.set(updated)
if(contents.compareAndSet(curr, updated))
return
}
} }
@ -59,39 +56,33 @@ private[log] class SegmentList[T](seq: Seq[T])(implicit m: ClassManifest[T]) {
if(newStart < 0) if(newStart < 0)
throw new KafkaException("Starting index must be positive."); throw new KafkaException("Starting index must be positive.");
var deleted: Array[T] = null var deleted: Array[T] = null
var done = false val curr = contents.get()
while(!done) { if (curr.length > 0) {
val curr = contents.get()
val newLength = max(curr.length - newStart, 0) val newLength = max(curr.length - newStart, 0)
val updated = new Array[T](newLength) val updated = new Array[T](newLength)
Array.copy(curr, min(newStart, curr.length - 1), updated, 0, newLength) Array.copy(curr, min(newStart, curr.length - 1), updated, 0, newLength)
if(contents.compareAndSet(curr, updated)) { contents.set(updated)
deleted = new Array[T](newStart) deleted = new Array[T](newStart)
Array.copy(curr, 0, deleted, 0, curr.length - newLength) Array.copy(curr, 0, deleted, 0, curr.length - newLength)
done = true
}
} }
deleted deleted
} }
/** /**
* Delete the items from position newEnd until end of list * Delete the items from position (newEnd + 1) until end of list
*/ */
def truncLast(newEnd: Int): Seq[T] = { def truncLast(newEnd: Int): Seq[T] = {
if(newEnd >= contents.get().size-1) if (newEnd < 0 || newEnd > contents.get().length-1)
throw new KafkaException("End index must be segment list size - 1"); throw new KafkaException("End index must be positive and less than segment list size.");
var deleted: Array[T] = null var deleted: Array[T] = null
var done = false val curr = contents.get()
while(!done) { if (curr.length > 0) {
val curr = contents.get()
val newLength = newEnd + 1 val newLength = newEnd + 1
val updated = new Array[T](newLength) val updated = new Array[T](newLength)
Array.copy(curr, 0, updated, 0, newLength) Array.copy(curr, 0, updated, 0, newLength)
if(contents.compareAndSet(curr, updated)) { contents.set(updated)
deleted = new Array[T](curr.length - newLength) deleted = new Array[T](curr.length - newLength)
Array.copy(curr, newEnd + 1, deleted, 0, curr.length - newLength) Array.copy(curr, min(newEnd + 1, curr.length - 1), deleted, 0, curr.length - newLength)
done = true
}
} }
deleted deleted
} }

View File

@ -196,7 +196,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
} }
def truncateTo(targetSize: Long) = { def truncateTo(targetSize: Long) = {
if(targetSize >= sizeInBytes()) if(targetSize > sizeInBytes())
throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) + throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
" size of this log segment is only %d bytes".format(sizeInBytes())) " size of this log segment is only %d bytes".format(sizeInBytes()))
channel.truncate(targetSize) channel.truncate(targetSize)

View File

@ -279,6 +279,56 @@ class LogTest extends JUnitSuite {
assert(ret, "Second message set should throw MessageSizeTooLargeException.") assert(ret, "Second message set should throw MessageSizeTooLargeException.")
} }
@Test
def testTruncateTo() {
val set = TestUtils.singleMessageSet("test".getBytes())
val setSize = set.sizeInBytes
val msgPerSeg = 10
val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
// create a log
val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, false, time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg) {
log.append(set)
}
assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
val lastOffset = log.logEndOffset
val size = log.size
log.truncateTo(log.logEndOffset) // keep the entire log
assertEquals("Should not change offset", lastOffset, log.logEndOffset)
assertEquals("Should not change log size", size, log.size)
log.truncateTo(log.logEndOffset + 1) // try to truncate beyond lastOffset
assertEquals("Should not change offset but should log error", lastOffset, log.logEndOffset)
assertEquals("Should not change log size", size, log.size)
log.truncateTo(log.logEndOffset - 10) // truncate somewhere in between
assertEquals("Should change offset", lastOffset, log.logEndOffset + 10)
assertEquals("Should change log size", size, log.size + 10)
log.truncateTo(log.logEndOffset - log.size) // truncate the entire log
assertEquals("Should change offset", log.logEndOffset, lastOffset - size)
assertEquals("Should change log size", log.size, 0)
for (i<- 1 to msgPerSeg) {
log.append(set)
}
assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
assertEquals("Should be back to original size", log.size, size)
log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1)*setSize)
assertEquals("Should change offset", log.logEndOffset, lastOffset - (msgPerSeg - 1)*setSize)
assertEquals("Should change log size", log.size, 0)
for (i<- 1 to msgPerSeg) {
log.append(set)
}
assertEquals("Should be ahead of to original offset", log.logEndOffset, lastOffset + setSize)
assertEquals("log size should be same as before", size, log.size)
log.truncateTo(log.logEndOffset - log.size - setSize) // truncate before first start offset in the log
assertEquals("Should change offset", log.logEndOffset, lastOffset - size)
assertEquals("Should change log size", log.size, 0)
}
def assertContains(ranges: Array[Range], offset: Long) = { def assertContains(ranges: Array[Range], offset: Long) = {
Log.findRange(ranges, offset) match { Log.findRange(ranges, offset) match {
case Some(range) => case Some(range) =>

View File

@ -31,22 +31,46 @@ class SegmentListTest extends JUnitSuite {
val view = sl.view val view = sl.view
assertEquals(list, view.iterator.toList) assertEquals(list, view.iterator.toList)
sl.append(5) sl.append(5)
assertEquals("Appending to both should result in list that are still equals", assertEquals("Appending to both should result in lists that are still equals",
list ::: List(5), sl.view.iterator.toList) list ::: List(5), sl.view.iterator.toList)
assertEquals("But the prior view should still equal the original list", list, view.iterator.toList) assertEquals("But the prior view should still equal the original list", list, view.iterator.toList)
} }
@Test @Test
def testTrunc() { def testTrunc() {
val hd = List(1,2,3) {
val tail = List(4,5,6) val hd = List(1,2,3)
val sl = new SegmentList(hd ::: tail) val tail = List(4,5,6)
val view = sl.view val sl = new SegmentList(hd ::: tail)
assertEquals(hd ::: tail, view.iterator.toList) val view = sl.view
val deleted = sl.trunc(3) assertEquals(hd ::: tail, view.iterator.toList)
assertEquals(tail, sl.view.iterator.toList) val deleted = sl.trunc(3)
assertEquals(hd, deleted.iterator.toList) assertEquals(tail, sl.view.iterator.toList)
assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList) assertEquals(hd, deleted.iterator.toList)
assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
}
{
val hd = List(1,2,3,4,5)
val tail = List(6)
val sl = new SegmentList(hd ::: tail)
val view = sl.view
assertEquals(hd ::: tail, view.iterator.toList)
try {
sl.trunc(-1)
fail("Attempt to truncate with illegal index should fail")
} catch {
case e: KafkaException => // this is ok
}
val deleted = sl.truncLast(4)
assertEquals(hd, sl.view.iterator.toList)
assertEquals(tail, deleted.iterator.toList)
assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
}
{
val sl = new SegmentList(List(1, 2))
sl.trunc(3)
assertEquals(0, sl.view.length)
}
} }
@Test @Test
@ -62,7 +86,6 @@ class SegmentListTest extends JUnitSuite {
assertEquals(tail, deleted.iterator.toList) assertEquals(tail, deleted.iterator.toList)
assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList) assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
} }
{ {
val hd = List(1,2,3,4,5) val hd = List(1,2,3,4,5)
val tail = List(6) val tail = List(6)
@ -71,10 +94,14 @@ class SegmentListTest extends JUnitSuite {
assertEquals(hd ::: tail, view.iterator.toList) assertEquals(hd ::: tail, view.iterator.toList)
try { try {
sl.truncLast(6) sl.truncLast(6)
sl.truncLast(5) fail("Attempt to truncate with illegal index should fail")
} catch {
case e: KafkaException => // this is ok
}
try {
sl.truncLast(-1) sl.truncLast(-1)
fail("Attempt to truncate with illegal index should fail") fail("Attempt to truncate with illegal index should fail")
}catch { } catch {
case e: KafkaException => // this is ok case e: KafkaException => // this is ok
} }
val deleted = sl.truncLast(4) val deleted = sl.truncLast(4)
@ -82,25 +109,5 @@ class SegmentListTest extends JUnitSuite {
assertEquals(tail, deleted.iterator.toList) assertEquals(tail, deleted.iterator.toList)
assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList) assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
} }
{
val hd = List(1)
val tail = List(2,3,4,5,6)
val sl = new SegmentList(hd ::: tail)
val view = sl.view
assertEquals(hd ::: tail, view.iterator.toList)
val deleted = sl.truncLast(0)
assertEquals(hd, sl.view.iterator.toList)
assertEquals(tail, deleted.iterator.toList)
assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
}
} }
@Test
def testTruncBeyondList() {
val sl = new SegmentList(List(1, 2))
sl.trunc(3)
assertEquals(0, sl.view.length)
}
} }