MINOR: Improve raft log4j messages a bit (#13553)

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
José Armando García Sancio 2023-04-14 10:05:22 -07:00 committed by GitHub
parent 20028e24cc
commit 1f1900b380
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 26 deletions

View File

@ -192,17 +192,26 @@ final class KafkaMetadataLog private (
} }
override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = { override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = {
offsetMetadata.metadata.asScala match { // This API returns the new high watermark, which may be different from the passed offset
case Some(segmentPosition: SegmentPosition) => log.updateHighWatermark( val logHighWatermark = offsetMetadata.metadata.asScala match {
new internals.log.LogOffsetMetadata( case Some(segmentPosition: SegmentPosition) =>
offsetMetadata.offset, log.updateHighWatermark(
segmentPosition.baseOffset, new internals.log.LogOffsetMetadata(
segmentPosition.relativePosition) offsetMetadata.offset,
) segmentPosition.baseOffset,
segmentPosition.relativePosition
)
)
case _ => case _ =>
// FIXME: This API returns the new high watermark, which may be different from the passed offset
log.updateHighWatermark(offsetMetadata.offset) log.updateHighWatermark(offsetMetadata.offset)
} }
// Temporary log message until we fix KAFKA-14825
if (logHighWatermark != offsetMetadata.offset) {
warn(
s"Log's high watermark ($logHighWatermark) is different from the local replica's high watermark ($offsetMetadata)"
)
}
} }
override def highWatermark: LogOffsetMetadata = { override def highWatermark: LogOffsetMetadata = {

View File

@ -1091,7 +1091,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}); });
long truncationOffset = log.truncateToEndOffset(divergingOffsetAndEpoch); long truncationOffset = log.truncateToEndOffset(divergingOffsetAndEpoch);
logger.info("Truncated to offset {} from Fetch response from leader {}", truncationOffset, quorum.leaderIdOrSentinel()); logger.info(
"Truncated to offset {} from Fetch response from leader {}",
truncationOffset,
quorum.leaderIdOrSentinel()
);
} else if (partitionResponse.snapshotId().epoch() >= 0 || } else if (partitionResponse.snapshotId().epoch() >= 0 ||
partitionResponse.snapshotId().endOffset() >= 0) { partitionResponse.snapshotId().endOffset() >= 0) {
// The leader is asking us to fetch a snapshot // The leader is asking us to fetch a snapshot
@ -1120,6 +1124,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
// since this snapshot is expected to reference offsets and epochs // since this snapshot is expected to reference offsets and epochs
// greater than the log end offset and high-watermark // greater than the log end offset and high-watermark
state.setFetchingSnapshot(log.storeSnapshot(snapshotId)); state.setFetchingSnapshot(log.storeSnapshot(snapshotId));
logger.info(
"Fetching snapshot {} from Fetch response from leader {}",
snapshotId,
quorum.leaderIdOrSentinel()
);
} }
} else { } else {
Records records = FetchResponse.recordsOrFail(partitionResponse); Records records = FetchResponse.recordsOrFail(partitionResponse);
@ -1407,6 +1416,14 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
state.setFetchingSnapshot(Optional.empty()); state.setFetchingSnapshot(Optional.empty());
if (log.truncateToLatestSnapshot()) { if (log.truncateToLatestSnapshot()) {
logger.info(
"Fully truncated the log at ({}, {}) after downloading snapshot {} from leader {}",
log.endOffset(),
log.lastFetchedEpoch(),
snapshot.snapshotId(),
quorum.leaderIdOrSentinel()
);
updateFollowerHighWatermark(state, OptionalLong.of(log.highWatermark().offset)); updateFollowerHighWatermark(state, OptionalLong.of(log.highWatermark().offset));
} else { } else {
throw new IllegalStateException( throw new IllegalStateException(

View File

@ -212,7 +212,7 @@ public class LeaderState<T> implements EpochState {
List<ReplicaState> followersByDescendingFetchOffset List<ReplicaState> followersByDescendingFetchOffset
) { ) {
if (oldHighWatermark.isPresent()) { if (oldHighWatermark.isPresent()) {
log.trace( log.debug(
"High watermark set to {} from {} based on indexOfHw {} and voters {}", "High watermark set to {} from {} based on indexOfHw {} and voters {}",
newHighWatermark, newHighWatermark,
oldHighWatermark.get(), oldHighWatermark.get(),

View File

@ -209,7 +209,7 @@ public class QuorumState {
); );
} }
transitionTo(initialState); durableTransitionTo(initialState);
} }
public Set<Integer> remoteVoters() { public Set<Integer> remoteVoters() {
@ -277,16 +277,17 @@ public class QuorumState {
// The Resigned state is a soft state which does not need to be persisted. // The Resigned state is a soft state which does not need to be persisted.
// A leader will always be re-initialized in this state. // A leader will always be re-initialized in this state.
int epoch = state.epoch(); int epoch = state.epoch();
this.state = new ResignedState( memoryTransitionTo(
time, new ResignedState(
localIdOrThrow(), time,
epoch, localIdOrThrow(),
voters, epoch,
randomElectionTimeoutMs(), voters,
preferredSuccessors, randomElectionTimeoutMs(),
logContext preferredSuccessors,
logContext
)
); );
log.info("Completed transition to {}", state);
} }
/** /**
@ -313,7 +314,7 @@ public class QuorumState {
electionTimeoutMs = randomElectionTimeoutMs(); electionTimeoutMs = randomElectionTimeoutMs();
} }
transitionTo(new UnattachedState( durableTransitionTo(new UnattachedState(
time, time,
epoch, epoch,
voters, voters,
@ -356,7 +357,7 @@ public class QuorumState {
// Note that we reset the election timeout after voting for a candidate because we // Note that we reset the election timeout after voting for a candidate because we
// know that the candidate has at least as good of a chance of getting elected as us // know that the candidate has at least as good of a chance of getting elected as us
transitionTo(new VotedState( durableTransitionTo(new VotedState(
time, time,
epoch, epoch,
candidateId, candidateId,
@ -392,7 +393,7 @@ public class QuorumState {
" and epoch=" + epoch + " from state " + state); " and epoch=" + epoch + " from state " + state);
} }
transitionTo(new FollowerState( durableTransitionTo(new FollowerState(
time, time,
epoch, epoch,
leaderId, leaderId,
@ -416,7 +417,7 @@ public class QuorumState {
int newEpoch = epoch() + 1; int newEpoch = epoch() + 1;
int electionTimeoutMs = randomElectionTimeoutMs(); int electionTimeoutMs = randomElectionTimeoutMs();
transitionTo(new CandidateState( durableTransitionTo(new CandidateState(
time, time,
localIdOrThrow(), localIdOrThrow(),
newEpoch, newEpoch,
@ -460,11 +461,11 @@ public class QuorumState {
accumulator, accumulator,
logContext logContext
); );
transitionTo(state); durableTransitionTo(state);
return state; return state;
} }
private void transitionTo(EpochState state) { private void durableTransitionTo(EpochState state) {
if (this.state != null) { if (this.state != null) {
try { try {
this.state.close(); this.state.close();
@ -475,6 +476,10 @@ public class QuorumState {
} }
this.store.writeElectionState(state.election()); this.store.writeElectionState(state.election());
memoryTransitionTo(state);
}
private void memoryTransitionTo(EpochState state) {
EpochState from = this.state; EpochState from = this.state;
this.state = state; this.state = state;
log.info("Completed transition to {} from {}", state, from); log.info("Completed transition to {} from {}", state, from);