KAFKA-10511; Ensure monotonic start epoch/offset updates in `MockLog` (#9332)

The epoch caching logic in `MockLog` behaves slightly differently from that in `LeaderEpochFileCache`. The latter ensures that every new epoch/start offset entry added to the cache increases monotonically over the previous entries. This patch brings the behavior of `MockLog` in line.

It also simplifies the `assignEpochStartOffset` API in `ReplicatedLog`, renaming it to `initializeLeaderEpoch`. We always intend to use the log end offset, so this patch removes the start offset parameter.

Reviewers: Boyang Chen <boyang@confluent.io>
This commit is contained in:
Jason Gustafson 2020-09-28 17:16:55 -07:00 committed by GitHub
parent 17fa3d9296
commit dbe3e4a4cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 38 additions and 21 deletions

View File

@ -82,7 +82,6 @@ class KafkaMetadataLog(log: Log,
}
override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = {
// TODO: Does this handle empty log case (when epoch is None) as we expect?
val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch =>
new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch)
}
@ -107,8 +106,8 @@ class KafkaMetadataLog(log: Log,
log.truncateTo(offset)
}
override def assignEpochStartOffset(epoch: Int, startOffset: Long): Unit = {
log.maybeAssignEpochStartOffset(epoch, startOffset)
override def initializeLeaderEpoch(epoch: Int): Unit = {
log.maybeAssignEpochStartOffset(epoch, log.logEndOffset)
}
override def updateHighWatermark(offsetMetadata: LogOffsetMetadata): Unit = {

View File

@ -274,7 +274,7 @@ public class KafkaRaftClient implements RaftClient {
private void onBecomeLeader(long currentTimeMs) {
LeaderState state = quorum.leaderStateOrThrow();
log.assignEpochStartOffset(quorum.epoch(), log.endOffset().offset);
log.initializeLeaderEpoch(quorum.epoch());
// The high watermark can only be advanced once we have written a record
// from the new leader's epoch. Hence we write a control message immediately

View File

@ -78,9 +78,13 @@ public interface ReplicatedLog extends Closeable {
long startOffset();
/**
* Assign a start offset to a given epoch.
* Initialize a new leader epoch beginning at the current log end offset. This API is invoked
* after becoming a leader and ensures that we can always determine the end offset and epoch
* with {@link #endOffsetForEpoch(int)} for any previous epoch.
*
* @param epoch Epoch of the newly elected leader
*/
void assignEpochStartOffset(int epoch, long startOffset);
void initializeLeaderEpoch(int epoch);
/**
* Truncate the log to the given offset. All records with offsets greater than or equal to
@ -90,6 +94,13 @@ public interface ReplicatedLog extends Closeable {
*/
void truncateTo(long offset);
/**
* Update the high watermark and associated metadata (which is used to avoid
* index lookups when handling reads with {@link #read(long, Isolation)} with
* the {@link Isolation#COMMITTED} isolation level).
*
* @param offsetMetadata The offset and optional metadata
*/
void updateHighWatermark(LogOffsetMetadata offsetMetadata);
/**

View File

@ -310,10 +310,10 @@ public class MockLog implements ReplicatedLog {
}
@Override
public void assignEpochStartOffset(int epoch, long startOffset) {
if (startOffset != endOffset().offset)
throw new IllegalArgumentException(
"Can only assign epoch for the end offset " + endOffset().offset + ", but get offset " + startOffset);
public void initializeLeaderEpoch(int epoch) {
long startOffset = endOffset().offset;
epochStartOffsets.removeIf(epochStartOffset ->
epochStartOffset.startOffset >= startOffset || epochStartOffset.epoch >= epoch);
epochStartOffsets.add(new EpochStartOffset(epoch, startOffset));
}

View File

@ -141,15 +141,10 @@ public class MockLogTest {
@Test
public void testAssignEpochStartOffset() {
log.assignEpochStartOffset(2, 0);
log.initializeLeaderEpoch(2);
assertEquals(2, log.lastFetchedEpoch());
}
@Test
public void testAssignEpochStartOffsetNotEqualToEndOffset() {
assertThrows(IllegalArgumentException.class, () -> log.assignEpochStartOffset(2, 1));
}
@Test
public void testAppendAsLeader() {
// The record passed-in offsets are not going to affect the eventual offsets.
@ -370,6 +365,23 @@ public class MockLogTest {
Isolation.UNCOMMITTED));
}
/**
 * Checks how {@code endOffsetForEpoch} answers after successive calls to
 * {@code initializeLeaderEpoch} while the log end offset stays unchanged.
 *
 * The assertions pin the following behavior: once epoch 3 is initialized at
 * the same end offset at which epoch 2 was initialized, a lookup for epoch 2
 * no longer reports epoch 2 and instead resolves to epoch 1.
 */
@Test
public void testMonotonicEpochStartOffset() {
// NOTE(review): appendBatch is defined outside this view; presumably it appends
// 5 records under epoch 1. The next assertion confirms the end offset is 5.
appendBatch(5, 1);
assertEquals(5L, log.endOffset().offset);
// Start epoch 2 at the current end offset (5). Both epoch 1 and epoch 2 are
// expected to resolve to offset 5, each under its own epoch.
log.initializeLeaderEpoch(2);
assertEquals(Optional.of(new OffsetAndEpoch(5L, 1)), log.endOffsetForEpoch(1));
assertEquals(Optional.of(new OffsetAndEpoch(5L, 2)), log.endOffsetForEpoch(2));
// Initialize a new epoch at the same end offset. The epoch cache ensures
// that the start offset of each retained epoch increases monotonically.
log.initializeLeaderEpoch(3);
assertEquals(Optional.of(new OffsetAndEpoch(5L, 1)), log.endOffsetForEpoch(1));
// The lookup for epoch 2 is now expected to return epoch 1 rather than epoch 2.
assertEquals(Optional.of(new OffsetAndEpoch(5L, 1)), log.endOffsetForEpoch(2));
assertEquals(Optional.of(new OffsetAndEpoch(5L, 3)), log.endOffsetForEpoch(3));
}
private Optional<OffsetRange> readOffsets(long startOffset, Isolation isolation) {
Records records = log.read(startOffset, isolation).records;
long firstReadOffset = -1L;

View File

@ -712,11 +712,6 @@ public class RaftEventSimulationTest {
nodes.values().forEach(state -> {
state.store.writeElectionState(election);
if (election.hasLeader()) {
Optional<OffsetAndEpoch> endOffset = state.log.endOffsetForEpoch(election.epoch);
if (!endOffset.isPresent())
state.log.assignEpochStartOffset(election.epoch, state.log.endOffset().offset);
}
});
}