mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-3330; Truncate log cleaner offset checkpoint if the log is truncated
becketqin Can you take a look? Author: Dong Lin <lindong28@gmail.com> Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com> Closes #1009 from lindong28/KAFKA-3330
This commit is contained in:
		
							parent
							
								
									61281f5c53
								
							
						
					
					
						commit
						579d473ce9
					
				| 
						 | 
				
			
			@ -133,6 +133,13 @@ class LogCleaner(val config: CleanerConfig,
 | 
			
		|||
    cleanerManager.updateCheckpoints(dataDir, update=None)
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Truncate cleaner offset checkpoint for the given partition if its checkpointed offset is larger than the given offset
 | 
			
		||||
   */
 | 
			
		||||
  def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) {
 | 
			
		||||
    cleanerManager.maybeTruncateCheckpoint(dataDir, topicAndPartition, offset)
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   *  Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
 | 
			
		||||
   *  This call blocks until the cleaning of the partition is aborted and paused.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -210,6 +210,18 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
 | 
			
		|||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) {
 | 
			
		||||
    inLock(lock) {
 | 
			
		||||
      if (logs.get(topicAndPartition).config.compact) {
 | 
			
		||||
        val checkpoint = checkpoints(dataDir)
 | 
			
		||||
        val existing = checkpoint.read()
 | 
			
		||||
 | 
			
		||||
        if (existing.getOrElse(topicAndPartition, 0L) > offset)
 | 
			
		||||
          checkpoint.write(existing + (topicAndPartition -> offset))
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
 | 
			
		||||
   */
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -286,10 +286,12 @@ class LogManager(val logDirs: Array[File],
 | 
			
		|||
        if (needToStopCleaner && cleaner != null)
 | 
			
		||||
          cleaner.abortAndPauseCleaning(topicAndPartition)
 | 
			
		||||
        log.truncateTo(truncateOffset)
 | 
			
		||||
        if (needToStopCleaner && cleaner != null)
 | 
			
		||||
        if (needToStopCleaner && cleaner != null) {
 | 
			
		||||
          cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicAndPartition, log.activeSegment.baseOffset)
 | 
			
		||||
          cleaner.resumeCleaning(topicAndPartition)
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    checkpointRecoveryPointOffsets()
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -305,9 +307,11 @@ class LogManager(val logDirs: Array[File],
 | 
			
		|||
      if (cleaner != null)
 | 
			
		||||
        cleaner.abortAndPauseCleaning(topicAndPartition)
 | 
			
		||||
      log.truncateFullyAndStartAt(newOffset)
 | 
			
		||||
      if (cleaner != null)
 | 
			
		||||
      if (cleaner != null) {
 | 
			
		||||
        cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicAndPartition, log.activeSegment.baseOffset)
 | 
			
		||||
        cleaner.resumeCleaning(topicAndPartition)
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    checkpointRecoveryPointOffsets()
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue