mirror of https://github.com/apache/kafka.git
KAFKA-19474 Move WARN log on log truncation below HWM (#20106)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
#5608 introduced a regression where the check for `targetOffset < log.highWatermark` to emit a `WARN` log was made incorrectly after truncating the log. This change moves the check for `targetOffset < log.highWatermark` to `UnifiedLog#truncateTo` and ensures we emit a `WARN` log on truncation below the replica's HWM by both the `ReplicaFetcherThread` and `ReplicaAlterLogDirsThread` Reviewers: Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
d86ba7f54a
commit
36b9bb94f1
|
@ -164,14 +164,9 @@ class ReplicaFetcherThread(name: String,
|
||||||
*/
|
*/
|
||||||
override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
|
override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
|
||||||
val partition = replicaMgr.getPartitionOrException(tp)
|
val partition = replicaMgr.getPartitionOrException(tp)
|
||||||
val log = partition.localLogOrException
|
|
||||||
|
|
||||||
partition.truncateTo(offsetTruncationState.offset, isFuture = false)
|
partition.truncateTo(offsetTruncationState.offset, isFuture = false)
|
||||||
|
|
||||||
if (offsetTruncationState.offset < log.highWatermark)
|
|
||||||
warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark " +
|
|
||||||
s"${log.highWatermark}")
|
|
||||||
|
|
||||||
// mark the future replica for truncation only when we do last truncation
|
// mark the future replica for truncation only when we do last truncation
|
||||||
if (offsetTruncationState.truncationCompleted)
|
if (offsetTruncationState.truncationCompleted)
|
||||||
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp,
|
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp,
|
||||||
|
|
|
@ -2238,6 +2238,12 @@ public class UnifiedLog implements AutoCloseable {
|
||||||
if (targetOffset < 0) {
|
if (targetOffset < 0) {
|
||||||
throw new IllegalArgumentException("Cannot truncate partition " + topicPartition() + " to a negative offset (" + targetOffset + ").");
|
throw new IllegalArgumentException("Cannot truncate partition " + topicPartition() + " to a negative offset (" + targetOffset + ").");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long hwm = highWatermark();
|
||||||
|
if (targetOffset < hwm) {
|
||||||
|
logger.warn("Truncating {}{} to offset {} below high watermark {}", isFuture() ? "future " : "", topicPartition(), targetOffset, hwm);
|
||||||
|
}
|
||||||
|
|
||||||
if (targetOffset >= localLog.logEndOffset()) {
|
if (targetOffset >= localLog.logEndOffset()) {
|
||||||
logger.info("Truncating to {} has no effect as the largest offset in the log is {}", targetOffset, localLog.logEndOffset() - 1);
|
logger.info("Truncating to {} has no effect as the largest offset in the log is {}", targetOffset, localLog.logEndOffset() - 1);
|
||||||
// Always truncate epoch cache since we may have a conflicting epoch entry at the
|
// Always truncate epoch cache since we may have a conflicting epoch entry at the
|
||||||
|
|
Loading…
Reference in New Issue