KAFKA-3584; Fix synchronization issue between deleteOldSegments() and delete() methods

This PR is to fix synchronization issue between deleteOldSegments() and delete() method calls. log.deleteOldSegments() call throws NullPointerException after log.delete() method call.

cc ijuma junrao

Author: Manikumar reddy O <manikumar.reddy@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1367 from omkreddy/KAFKA-3584
This commit is contained in:
Manikumar reddy O 2016-05-13 12:57:09 +01:00 committed by Ismael Juma
parent fb421dbcfe
commit 1c4b943f2d
2 changed files with 39 additions and 9 deletions

View File

@ -563,21 +563,23 @@ class Log(val dir: File,
* @return The number of segments deleted * @return The number of segments deleted
*/ */
def deleteOldSegments(predicate: LogSegment => Boolean): Int = { def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
// find any segments that match the user-supplied predicate UNLESS it is the final segment lock synchronized {
// and it is empty (since we would just end up re-creating it //find any segments that match the user-supplied predicate UNLESS it is the final segment
val lastSegment = activeSegment //and it is empty (since we would just end up re-creating it)
val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0)) val lastEntry = segments.lastEntry
val numToDelete = deletable.size val deletable =
if(numToDelete > 0) { if (lastEntry == null) Seq.empty
lock synchronized { else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
val numToDelete = deletable.size
if (numToDelete > 0) {
// we must always have at least one segment, so if we are going to delete all the segments, create a new one first // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
if(segments.size == numToDelete) if (segments.size == numToDelete)
roll() roll()
// remove the segments for lookups // remove the segments for lookups
deletable.foreach(deleteSegment(_)) deletable.foreach(deleteSegment(_))
} }
numToDelete
} }
numToDelete
} }
/** /**

View File

@ -930,4 +930,32 @@ class LogTest extends JUnitSuite {
def topicPartitionName(topic: String, partition: String): String = def topicPartitionName(topic: String, partition: String): String =
File.separator + topic + "-" + partition File.separator + topic + "-" + partition
@Test
def testDeleteOldSegmentsMethod() {
val set = TestUtils.singleMessageSet("test".getBytes)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
val config = LogConfig(logProps)
val log = new Log(logDir,
config,
recoveryPoint = 0L,
time.scheduler,
time)
// append some messages to create some segments
for (i <- 0 until 100)
log.append(set)
log.deleteOldSegments(_ => true)
assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
// append some messages to create some segments
for (i <- 0 until 100)
log.append(set)
log.delete()
assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
assertEquals("The number of deleted segments shoud be zero.", 0, log.deleteOldSegments(_ => true))
}
} }