From d0ff8697189bda65b5381a95cccf6c4428b08637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Wed, 19 Oct 2022 12:07:07 -0700 Subject: [PATCH] MINOR; Add accessor methods to OffsetAndEpoch (#12770) Accessor are preferred over fields because they compose better with Java's lambda syntax. Reviewers: Jason Gustafson --- .../org/apache/kafka/image/MetadataDelta.java | 4 +- .../apache/kafka/image/MetadataImageTest.java | 4 +- .../image/writer/RaftSnapshotWriterTest.java | 4 +- .../kafka/metadata/RecordTestUtils.java | 4 +- .../apache/kafka/metalog/LocalLogManager.java | 6 +- .../apache/kafka/raft/KafkaRaftClient.java | 30 +-- .../org/apache/kafka/raft/OffsetAndEpoch.java | 12 +- .../org/apache/kafka/raft/QuorumState.java | 6 +- .../org/apache/kafka/raft/ReplicatedLog.java | 16 +- .../raft/internals/KafkaRaftMetrics.java | 4 +- .../kafka/snapshot/RecordsSnapshotReader.java | 4 +- .../kafka/snapshot/RecordsSnapshotWriter.java | 8 +- .../org/apache/kafka/snapshot/Snapshots.java | 2 +- .../raft/KafkaRaftClientSnapshotTest.java | 206 +++++++++--------- .../java/org/apache/kafka/raft/MockLog.java | 32 +-- .../org/apache/kafka/raft/MockLogTest.java | 30 +-- .../kafka/raft/RaftClientTestContext.java | 4 +- .../kafka/raft/RaftEventSimulationTest.java | 16 +- .../snapshot/SnapshotWriterReaderTest.java | 14 +- 19 files changed, 207 insertions(+), 199 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java index d12885ce7bc..a82216769d0 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java @@ -71,8 +71,8 @@ public final class MetadataDelta { public MetadataDelta(MetadataImage image) { this.image = image; - this.highestOffset = image.highestOffsetAndEpoch().offset; - this.highestEpoch = image.highestOffsetAndEpoch().epoch; + this.highestOffset = image.highestOffsetAndEpoch().offset(); + this.highestEpoch = image.highestOffsetAndEpoch().epoch(); } public MetadataImage image() { diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java index e471ae499e0..290214369cc 100644 --- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java @@ -91,8 +91,8 @@ public class MetadataImageTest { image.write(writer, new ImageWriterOptions.Builder().build()); MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); RecordTestUtils.replayAll(delta, - image.highestOffsetAndEpoch().offset, - image.highestOffsetAndEpoch().epoch, + image.highestOffsetAndEpoch().offset(), + image.highestOffsetAndEpoch().epoch(), writer.records()); MetadataImage nextImage = delta.apply(); assertEquals(image, nextImage); diff --git a/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java b/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java index 3a8643c9a6f..4cdacfe7e1e 100644 --- a/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java @@ -48,12 +48,12 @@ public class RaftSnapshotWriterTest { @Override public long lastContainedLogOffset() { - return snapshotId().offset; + return snapshotId().offset(); } @Override public int lastContainedLogEpoch() { - return snapshotId().epoch; + return snapshotId().epoch(); } @Override diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java index e1c8d4b6c83..7fd2a29d5b1 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java @@ -60,8 +60,8 @@ public class RecordTestUtils { if (target instanceof MetadataDelta) { MetadataDelta delta = (MetadataDelta) target; replayAll(delta, - delta.image().highestOffsetAndEpoch().offset, - delta.image().highestOffsetAndEpoch().epoch, + delta.image().highestOffsetAndEpoch().offset(), + delta.image().highestOffsetAndEpoch().epoch(), recordsAndVersions); return; } diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index 6457ede9ba2..6822ea72eb9 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -198,7 +198,7 @@ public final class LocalLogManager implements RaftClient, public SharedLogData(Optional snapshot) { if (snapshot.isPresent()) { RawSnapshotReader initialSnapshot = snapshot.get(); - prevOffset = initialSnapshot.snapshotId().offset - 1; + prevOffset = initialSnapshot.snapshotId().offset() - 1; snapshots.put(prevOffset, initialSnapshot); } else { prevOffset = -1; @@ -305,14 +305,14 @@ public final class LocalLogManager implements RaftClient, * Stores a new snapshot and notifies all threads waiting for a snapshot. */ synchronized void addSnapshot(RawSnapshotReader newSnapshot) { - if (newSnapshot.snapshotId().offset - 1 > prevOffset) { + if (newSnapshot.snapshotId().offset() - 1 > prevOffset) { log.error( "Ignored attempt to add a snapshot {} that is greater than the latest offset {}", newSnapshot, prevOffset ); } else { - snapshots.put(newSnapshot.snapshotId().offset - 1, newSnapshot); + snapshots.put(newSnapshot.snapshotId().offset() - 1, newSnapshot); this.notifyAll(); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index e73f998d0ec..e08213a0fef 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -262,7 +262,7 @@ public class KafkaRaftClient implements RaftClient { OptionalLong highWatermarkOpt ) { highWatermarkOpt.ifPresent(highWatermark -> { - long newHighWatermark = Math.min(endOffset().offset, highWatermark); + long newHighWatermark = Math.min(endOffset().offset(), highWatermark); if (state.updateHighWatermark(OptionalLong.of(newHighWatermark))) { logger.debug("Follower high watermark updated to {}", newHighWatermark); log.updateHighWatermark(new LogOffsetMetadata(newHighWatermark)); @@ -884,13 +884,13 @@ public class KafkaRaftClient implements RaftClient { switch (validOffsetAndEpoch.kind()) { case DIVERGING: partitionData.divergingEpoch() - .setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch) - .setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset); + .setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch()) + .setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset()); break; case SNAPSHOT: partitionData.snapshotId() - .setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch) - .setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset); + .setEpoch(validOffsetAndEpoch.offsetAndEpoch().epoch()) + .setEndOffset(validOffsetAndEpoch.offsetAndEpoch().offset()); break; default: } @@ -1081,9 +1081,9 @@ public class KafkaRaftClient implements RaftClient { divergingEpoch.endOffset(), divergingEpoch.epoch()); state.highWatermark().ifPresent(highWatermark -> { - if (divergingOffsetAndEpoch.offset < highWatermark.offset) { + if (divergingOffsetAndEpoch.offset() < highWatermark.offset) { throw new KafkaException("The leader requested truncation to offset " + - divergingOffsetAndEpoch.offset + ", which is below the current high watermark" + + divergingOffsetAndEpoch.offset() + ", which is below the current high watermark" + " " + highWatermark); } }); @@ -1288,8 +1288,8 @@ public class KafkaRaftClient implements RaftClient { responsePartitionSnapshot -> { addQuorumLeader(responsePartitionSnapshot) .snapshotId() - .setEndOffset(snapshotId.offset) - .setEpoch(snapshotId.epoch); + .setEndOffset(snapshotId.offset()) + .setEpoch(snapshotId.epoch()); return responsePartitionSnapshot .setSize(snapshotSize) @@ -1771,8 +1771,8 @@ public class KafkaRaftClient implements RaftClient { clusterId, quorum.epoch(), quorum.localIdOrThrow(), - endOffset.epoch, - endOffset.offset + endOffset.epoch(), + endOffset.offset() ); } @@ -1805,8 +1805,8 @@ public class KafkaRaftClient implements RaftClient { private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch snapshotId, long snapshotSize) { FetchSnapshotRequestData.SnapshotId requestSnapshotId = new FetchSnapshotRequestData.SnapshotId() - .setEpoch(snapshotId.epoch) - .setEndOffset(snapshotId.offset); + .setEpoch(snapshotId.epoch()) + .setEndOffset(snapshotId.offset()); FetchSnapshotRequestData request = FetchSnapshotRequest.singleton( clusterId, @@ -1852,7 +1852,7 @@ public class KafkaRaftClient implements RaftClient { LogAppendInfo info = appendAsLeader(batch.data); OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch); CompletableFuture future = appendPurgatory.await( - offsetAndEpoch.offset + 1, Integer.MAX_VALUE); + offsetAndEpoch.offset() + 1, Integer.MAX_VALUE); future.whenComplete((commitTimeMs, exception) -> { if (exception != null) { @@ -2494,7 +2494,7 @@ public class KafkaRaftClient implements RaftClient { */ private void fireHandleSnapshot(SnapshotReader reader) { synchronized (this) { - nextOffset = reader.snapshotId().offset; + nextOffset = reader.snapshotId().offset(); lastSent = null; } diff --git a/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java index a4b98d757c1..32e3daee8cf 100644 --- a/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java +++ b/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java @@ -17,14 +17,22 @@ package org.apache.kafka.raft; public class OffsetAndEpoch implements Comparable { - public final long offset; - public final int epoch; + private final long offset; + private final int epoch; public OffsetAndEpoch(long offset, int epoch) { this.offset = offset; this.epoch = epoch; } + public long offset() { + return offset; + } + + public int epoch() { + return epoch; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index 23447a91907..55a27c97c8c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -139,13 +139,13 @@ public class QuorumState { throw new IllegalStateException("Initialized quorum state " + election + " with a voted candidate, which indicates this node was previously " + " a voter, but the local id " + localIdDescription); - } else if (election.epoch < logEndOffsetAndEpoch.epoch) { + } else if (election.epoch < logEndOffsetAndEpoch.epoch()) { log.warn("Epoch from quorum-state file is {}, which is " + "smaller than last written epoch {} in the log", - election.epoch, logEndOffsetAndEpoch.epoch); + election.epoch, logEndOffsetAndEpoch.epoch()); initialState = new UnattachedState( time, - logEndOffsetAndEpoch.epoch, + logEndOffsetAndEpoch.epoch(), voters, Optional.empty(), randomElectionTimeoutMs(), diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java index b71de32b755..7e1bd02c1b5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java +++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java @@ -88,8 +88,8 @@ public interface ReplicatedLog extends AutoCloseable { Optional earliestSnapshotId = earliestSnapshotId(); if (earliestSnapshotId.isPresent() && ((offset < startOffset()) || - (offset == startOffset() && epoch != earliestSnapshotId.get().epoch) || - (epoch < earliestSnapshotId.get().epoch)) + (offset == startOffset() && epoch != earliestSnapshotId.get().epoch()) || + (epoch < earliestSnapshotId.get().epoch())) ) { /* Send a snapshot if the leader has a snapshot at the log start offset and * 1. the fetch offset is less than the log start offset or @@ -108,7 +108,7 @@ public interface ReplicatedLog extends AutoCloseable { } else { OffsetAndEpoch endOffsetAndEpoch = endOffsetForEpoch(epoch); - if (endOffsetAndEpoch.epoch != epoch || endOffsetAndEpoch.offset < offset) { + if (endOffsetAndEpoch.epoch() != epoch || endOffsetAndEpoch.offset() < offset) { return ValidOffsetAndEpoch.diverging(endOffsetAndEpoch); } else { return ValidOffsetAndEpoch.valid(new OffsetAndEpoch(offset, epoch)); @@ -217,15 +217,15 @@ public interface ReplicatedLog extends AutoCloseable { */ default long truncateToEndOffset(OffsetAndEpoch endOffset) { final long truncationOffset; - int leaderEpoch = endOffset.epoch; + int leaderEpoch = endOffset.epoch(); if (leaderEpoch == 0) { - truncationOffset = Math.min(endOffset.offset, endOffset().offset); + truncationOffset = Math.min(endOffset.offset(), endOffset().offset); } else { OffsetAndEpoch localEndOffset = endOffsetForEpoch(leaderEpoch); - if (localEndOffset.epoch == leaderEpoch) { - truncationOffset = Math.min(localEndOffset.offset, endOffset.offset); + if (localEndOffset.epoch() == leaderEpoch) { + truncationOffset = Math.min(localEndOffset.offset(), endOffset.offset()); } else { - truncationOffset = localEndOffset.offset; + truncationOffset = localEndOffset.offset(); } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java index 96eb87f3def..b6e03381106 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java @@ -102,10 +102,10 @@ public class KafkaRaftMetrics implements AutoCloseable { metrics.addMetric(this.highWatermarkMetricName, (mConfig, currentTimeMs) -> state.highWatermark().map(hw -> hw.offset).orElse(-1L)); this.logEndOffsetMetricName = metrics.metricName("log-end-offset", metricGroupName, "The current raft log end offset."); - metrics.addMetric(this.logEndOffsetMetricName, (mConfig, currentTimeMs) -> logEndOffset.offset); + metrics.addMetric(this.logEndOffsetMetricName, (mConfig, currentTimeMs) -> logEndOffset.offset()); this.logEndEpochMetricName = metrics.metricName("log-end-epoch", metricGroupName, "The current raft log end epoch."); - metrics.addMetric(this.logEndEpochMetricName, (mConfig, currentTimeMs) -> logEndOffset.epoch); + metrics.addMetric(this.logEndEpochMetricName, (mConfig, currentTimeMs) -> logEndOffset.epoch()); this.numUnknownVoterConnectionsMetricName = metrics.metricName("number-unknown-voter-connections", metricGroupName, "Number of unknown voters whose connection information is not cached; would never be larger than quorum-size."); diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java index 92b695146c3..71380038ad7 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java @@ -49,12 +49,12 @@ public final class RecordsSnapshotReader implements SnapshotReader { @Override public long lastContainedLogOffset() { - return snapshotId.offset - 1; + return snapshotId.offset() - 1; } @Override public int lastContainedLogEpoch() { - return snapshotId.epoch; + return snapshotId.epoch(); } @Override diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java index 1b0e2402a27..859a0259445 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java @@ -52,7 +52,7 @@ final public class RecordsSnapshotWriter implements SnapshotWriter { this.lastContainedLogTimestamp = lastContainedLogTimestamp; this.accumulator = new BatchAccumulator<>( - snapshot.snapshotId().epoch, + snapshot.snapshotId().epoch(), 0, Integer.MAX_VALUE, maxBatchSize, @@ -140,12 +140,12 @@ final public class RecordsSnapshotWriter implements SnapshotWriter { @Override public long lastContainedLogOffset() { - return snapshot.snapshotId().offset - 1; + return snapshot.snapshotId().offset() - 1; } @Override public int lastContainedLogEpoch() { - return snapshot.snapshotId().epoch; + return snapshot.snapshotId().epoch(); } @Override @@ -164,7 +164,7 @@ final public class RecordsSnapshotWriter implements SnapshotWriter { throw new IllegalStateException(message); } - accumulator.append(snapshot.snapshotId().epoch, records); + accumulator.append(snapshot.snapshotId().epoch(), records); if (accumulator.needsDrain(time.milliseconds())) { appendBatches(accumulator.drain()); diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java index a41f6485fa2..e7a57ae183b 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java @@ -53,7 +53,7 @@ public final class Snapshots { } static String filenameFromSnapshotId(OffsetAndEpoch snapshotId) { - return String.format("%s-%s", OFFSET_FORMATTER.format(snapshotId.offset), EPOCH_FORMATTER.format(snapshotId.epoch)); + return String.format("%s-%s", OFFSET_FORMATTER.format(snapshotId.offset()), EPOCH_FORMATTER.format(snapshotId.epoch())); } static Path moveRename(Path source, OffsetAndEpoch snapshotId) { diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 754c2e2c55e..ea8677b1df4 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -60,8 +60,8 @@ final public class KafkaRaftClientSnapshotTest { OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch, Arrays.asList("a", "b", "c")) - .appendToLog(snapshotId.epoch, Arrays.asList("d", "e", "f")) + .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c")) + .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f")) .withEmptySnapshot(snapshotId) .deleteBeforeSnapshot(snapshotId) .build(); @@ -92,8 +92,8 @@ final public class KafkaRaftClientSnapshotTest { OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch, Arrays.asList("a", "b", "c")) - .appendToLog(snapshotId.epoch, Arrays.asList("d", "e", "f")) + .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c")) + .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f")) .withEmptySnapshot(snapshotId) .deleteBeforeSnapshot(snapshotId) .withElectedLeader(epoch, leaderId) @@ -103,7 +103,7 @@ final public class KafkaRaftClientSnapshotTest { long localLogEndOffset = context.log.endOffset().offset; context.pollUntilRequest(); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch); + context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch()); context.deliverResponse( fetchRequest.correlationId, fetchRequest.destinationId(), @@ -111,7 +111,7 @@ final public class KafkaRaftClientSnapshotTest { ); context.pollUntilRequest(); - context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch); + context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch()); // Check that listener was notified of the new snapshot try (SnapshotReader snapshot = context.listener.drainHandledSnapshot().get()) { @@ -129,8 +129,8 @@ final public class KafkaRaftClientSnapshotTest { OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch, Arrays.asList("a", "b", "c")) - .appendToLog(snapshotId.epoch, Arrays.asList("d", "e", "f")) + .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c")) + .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f")) .withEmptySnapshot(snapshotId) .deleteBeforeSnapshot(snapshotId) .withElectedLeader(epoch, leaderId) @@ -140,7 +140,7 @@ final public class KafkaRaftClientSnapshotTest { long localLogEndOffset = context.log.endOffset().offset; context.pollUntilRequest(); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch); + context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch()); context.deliverResponse( fetchRequest.correlationId, fetchRequest.destinationId(), @@ -148,7 +148,7 @@ final public class KafkaRaftClientSnapshotTest { ); context.pollUntilRequest(); - context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch); + context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch()); RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId)); context.client.register(secondListener); @@ -169,9 +169,9 @@ final public class KafkaRaftClientSnapshotTest { OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch, Arrays.asList("a", "b", "c")) - .appendToLog(snapshotId.epoch, Arrays.asList("d", "e", "f")) - .appendToLog(snapshotId.epoch, Arrays.asList("g", "h", "i")) + .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c")) + .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f")) + .appendToLog(snapshotId.epoch(), Arrays.asList("g", "h", "i")) .withEmptySnapshot(snapshotId) .deleteBeforeSnapshot(snapshotId) .build(); @@ -197,7 +197,7 @@ final public class KafkaRaftClientSnapshotTest { // Generate a new snapshot OffsetAndEpoch secondSnapshotId = new OffsetAndEpoch(localLogEndOffset, epoch); - try (SnapshotWriter snapshot = context.client.createSnapshot(secondSnapshotId.offset - 1, secondSnapshotId.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(secondSnapshotId.offset() - 1, secondSnapshotId.epoch(), 0).get()) { assertEquals(secondSnapshotId, snapshot.snapshotId()); snapshot.freeze(); } @@ -243,7 +243,7 @@ final public class KafkaRaftClientSnapshotTest { context.advanceLocalLeaderHighWatermarkToLogEndOffset(); OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, epoch); - try (SnapshotWriter snapshot = context.client.createSnapshot(snapshotId.offset - 1, snapshotId.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(snapshotId.offset() - 1, snapshotId.epoch(), 0).get()) { assertEquals(snapshotId, snapshot.snapshotId()); snapshot.freeze(); } @@ -257,8 +257,8 @@ final public class KafkaRaftClientSnapshotTest { assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode())); assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch()); assertEquals(localId, partitionResponse.currentLeader().leaderId()); - assertEquals(snapshotId.epoch, partitionResponse.snapshotId().epoch()); - assertEquals(snapshotId.offset, partitionResponse.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), partitionResponse.snapshotId().epoch()); + assertEquals(snapshotId.offset(), partitionResponse.snapshotId().endOffset()); } @Test @@ -270,20 +270,20 @@ final public class KafkaRaftClientSnapshotTest { OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch(3, 2); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(oldestSnapshotId.epoch, Arrays.asList("a", "b", "c")) - .appendToLog(oldestSnapshotId.epoch, Arrays.asList("d", "e", "f")) + .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("a", "b", "c")) + .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("d", "e", "f")) .withAppendLingerMs(1) .build(); context.becomeLeader(); int epoch = context.currentEpoch(); - assertEquals(oldestSnapshotId.epoch + 1, epoch); + assertEquals(oldestSnapshotId.epoch() + 1, epoch); // Advance the highWatermark context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create a snapshot at the high watermark - try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset() - 1, oldestSnapshotId.epoch(), 0).get()) { assertEquals(oldestSnapshotId, snapshot.snapshotId()); snapshot.freeze(); } @@ -294,7 +294,7 @@ final public class KafkaRaftClientSnapshotTest { context.client.poll(); // It is an invalid request to send an last fetched epoch greater than the current epoch - context.deliverRequest(context.fetchRequest(epoch, otherNodeId, oldestSnapshotId.offset + 1, epoch + 1, 0)); + context.deliverRequest(context.fetchRequest(epoch, otherNodeId, oldestSnapshotId.offset() + 1, epoch + 1, 0)); context.pollUntilResponse(); context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); } @@ -309,20 +309,20 @@ final public class KafkaRaftClientSnapshotTest { OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch(3, 2); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(oldestSnapshotId.epoch, Arrays.asList("a", "b", "c")) - .appendToLog(oldestSnapshotId.epoch + 2, Arrays.asList("d", "e", "f")) + .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("a", "b", "c")) + .appendToLog(oldestSnapshotId.epoch() + 2, Arrays.asList("d", "e", "f")) .withAppendLingerMs(1) .build(); context.becomeLeader(); int epoch = context.currentEpoch(); - assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch); + assertEquals(oldestSnapshotId.epoch() + 2 + 1, epoch); // Advance the highWatermark context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create a snapshot at the high watermark - try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset() - 1, oldestSnapshotId.epoch(), 0).get()) { assertEquals(oldestSnapshotId, snapshot.snapshotId()); snapshot.freeze(); } @@ -330,15 +330,15 @@ final public class KafkaRaftClientSnapshotTest { // This should truncate to the old snapshot context.deliverRequest( - context.fetchRequest(epoch, otherNodeId, oldestSnapshotId.offset + 1, oldestSnapshotId.epoch + 1, 0) + context.fetchRequest(epoch, otherNodeId, oldestSnapshotId.offset() + 1, oldestSnapshotId.epoch() + 1, 0) ); context.pollUntilResponse(); FetchResponseData.PartitionData partitionResponse = context.assertSentFetchPartitionResponse(); assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode())); assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch()); assertEquals(localId, partitionResponse.currentLeader().leaderId()); - assertEquals(oldestSnapshotId.epoch, partitionResponse.divergingEpoch().epoch()); - assertEquals(oldestSnapshotId.offset, partitionResponse.divergingEpoch().endOffset()); + assertEquals(oldestSnapshotId.epoch(), partitionResponse.divergingEpoch().epoch()); + assertEquals(oldestSnapshotId.offset(), partitionResponse.divergingEpoch().endOffset()); } @Test @@ -351,21 +351,21 @@ final public class KafkaRaftClientSnapshotTest { OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch(3, 2); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(oldestSnapshotId.epoch, Arrays.asList("a", "b", "c")) - .appendToLog(oldestSnapshotId.epoch, Arrays.asList("d", "e", "f")) - .appendToLog(oldestSnapshotId.epoch + 2, Arrays.asList("g", "h", "i")) + .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("a", "b", "c")) + .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("d", "e", "f")) + .appendToLog(oldestSnapshotId.epoch() + 2, Arrays.asList("g", "h", "i")) .withAppendLingerMs(1) .build(); context.becomeLeader(); int epoch = context.currentEpoch(); - assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch); + assertEquals(oldestSnapshotId.epoch() + 2 + 1, epoch); // Advance the highWatermark context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create a snapshot at the high watermark - try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset() - 1, oldestSnapshotId.epoch(), 0).get()) { assertEquals(oldestSnapshotId, snapshot.snapshotId()); snapshot.freeze(); } @@ -373,7 +373,7 @@ final public class KafkaRaftClientSnapshotTest { // Send fetch request at log start offset with valid last fetched epoch context.deliverRequest( - context.fetchRequest(epoch, otherNodeId, oldestSnapshotId.offset, oldestSnapshotId.epoch, 0) + context.fetchRequest(epoch, otherNodeId, oldestSnapshotId.offset(), oldestSnapshotId.epoch(), 0) ); context.pollUntilResponse(); context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); @@ -389,21 +389,21 @@ final public class KafkaRaftClientSnapshotTest { OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch(3, 2); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(oldestSnapshotId.epoch, Arrays.asList("a", "b", "c")) - .appendToLog(oldestSnapshotId.epoch, Arrays.asList("d", "e", "f")) - .appendToLog(oldestSnapshotId.epoch + 2, Arrays.asList("g", "h", "i")) + .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("a", "b", "c")) + .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("d", "e", "f")) + .appendToLog(oldestSnapshotId.epoch() + 2, Arrays.asList("g", "h", "i")) .withAppendLingerMs(1) .build(); context.becomeLeader(); int epoch = context.currentEpoch(); - assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch); + assertEquals(oldestSnapshotId.epoch() + 2 + 1, epoch); // Advance the highWatermark context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create a snapshot at the high watermark - try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset() - 1, oldestSnapshotId.epoch(), 0).get()) { assertEquals(oldestSnapshotId, snapshot.snapshotId()); snapshot.freeze(); } @@ -412,15 +412,15 @@ final public class KafkaRaftClientSnapshotTest { // Send fetch with log start offset and invalid last fetched epoch context.deliverRequest( - context.fetchRequest(epoch, otherNodeId, oldestSnapshotId.offset, oldestSnapshotId.epoch + 1, 0) + context.fetchRequest(epoch, otherNodeId, oldestSnapshotId.offset(), oldestSnapshotId.epoch() + 1, 0) ); context.pollUntilResponse(); FetchResponseData.PartitionData partitionResponse = context.assertSentFetchPartitionResponse(); assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode())); assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch()); assertEquals(localId, partitionResponse.currentLeader().leaderId()); - assertEquals(oldestSnapshotId.epoch, partitionResponse.snapshotId().epoch()); - assertEquals(oldestSnapshotId.offset, partitionResponse.snapshotId().endOffset()); + assertEquals(oldestSnapshotId.epoch(), partitionResponse.snapshotId().epoch()); + assertEquals(oldestSnapshotId.offset(), partitionResponse.snapshotId().endOffset()); } @Test @@ -433,21 +433,21 @@ final public class KafkaRaftClientSnapshotTest { OffsetAndEpoch oldestSnapshotId = new OffsetAndEpoch(3, 2); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(oldestSnapshotId.epoch, Arrays.asList("a", "b", "c")) - .appendToLog(oldestSnapshotId.epoch, Arrays.asList("d", "e", "f")) - .appendToLog(oldestSnapshotId.epoch + 2, Arrays.asList("g", "h", "i")) + .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("a", "b", "c")) + .appendToLog(oldestSnapshotId.epoch(), Arrays.asList("d", "e", "f")) + .appendToLog(oldestSnapshotId.epoch() + 2, Arrays.asList("g", "h", "i")) .withAppendLingerMs(1) .build(); context.becomeLeader(); int epoch = context.currentEpoch(); - assertEquals(oldestSnapshotId.epoch + 2 + 1, epoch); + assertEquals(oldestSnapshotId.epoch() + 2 + 1, epoch); // Advance the highWatermark context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create a snapshot at the high watermark - try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset - 1, oldestSnapshotId.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(oldestSnapshotId.offset() - 1, oldestSnapshotId.epoch(), 0).get()) { assertEquals(oldestSnapshotId, snapshot.snapshotId()); snapshot.freeze(); } @@ -459,7 +459,7 @@ final public class KafkaRaftClientSnapshotTest { epoch, otherNodeId, context.log.endOffset().offset, - oldestSnapshotId.epoch - 1, + oldestSnapshotId.epoch() - 1, 0 ) ); @@ -468,8 +468,8 @@ final public class KafkaRaftClientSnapshotTest { assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode())); assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch()); assertEquals(localId, partitionResponse.currentLeader().leaderId()); - assertEquals(oldestSnapshotId.epoch, partitionResponse.snapshotId().epoch()); - assertEquals(oldestSnapshotId.offset, partitionResponse.snapshotId().endOffset()); + assertEquals(oldestSnapshotId.epoch(), partitionResponse.snapshotId().epoch()); + assertEquals(oldestSnapshotId.offset(), partitionResponse.snapshotId().endOffset()); } @Test @@ -529,7 +529,7 @@ final public class KafkaRaftClientSnapshotTest { List records = Arrays.asList("foo", "bar"); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch, Arrays.asList("a")) + .appendToLog(snapshotId.epoch(), Arrays.asList("a")) .build(); context.becomeLeader(); @@ -537,7 +537,7 @@ final public class KafkaRaftClientSnapshotTest { context.advanceLocalLeaderHighWatermarkToLogEndOffset(); - try (SnapshotWriter snapshot = context.client.createSnapshot(snapshotId.offset - 1, snapshotId.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(snapshotId.offset() - 1, snapshotId.epoch(), 0).get()) { assertEquals(snapshotId, snapshot.snapshotId()); snapshot.append(records); snapshot.freeze(); @@ -578,7 +578,7 @@ final public class KafkaRaftClientSnapshotTest { List records = Arrays.asList("foo", "bar"); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch, records) + .appendToLog(snapshotId.epoch(), records) .build(); context.becomeLeader(); @@ -586,7 +586,7 @@ final public class KafkaRaftClientSnapshotTest { context.advanceLocalLeaderHighWatermarkToLogEndOffset(); - try (SnapshotWriter snapshot = context.client.createSnapshot(snapshotId.offset - 1, snapshotId.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(snapshotId.offset() - 1, snapshotId.epoch(), 0).get()) { assertEquals(snapshotId, snapshot.snapshotId()); snapshot.append(records); snapshot.freeze(); @@ -687,7 +687,7 @@ final public class KafkaRaftClientSnapshotTest { List records = Arrays.asList("foo", "bar"); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .appendToLog(snapshotId.epoch, Arrays.asList("a")) + .appendToLog(snapshotId.epoch(), Arrays.asList("a")) .build(); context.becomeLeader(); @@ -695,7 +695,7 @@ final public class KafkaRaftClientSnapshotTest { context.advanceLocalLeaderHighWatermarkToLogEndOffset(); - try (SnapshotWriter snapshot = context.client.createSnapshot(snapshotId.offset - 1, snapshotId.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(snapshotId.offset() - 1, snapshotId.epoch(), 0).get()) { assertEquals(snapshotId, snapshot.snapshotId()); snapshot.append(records); snapshot.freeze(); @@ -882,8 +882,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(0, request.position()); List records = Arrays.asList("foo", "bar"); @@ -909,7 +909,7 @@ final public class KafkaRaftClientSnapshotTest { context.pollUntilRequest(); fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset, snapshotId.epoch); + context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset(), snapshotId.epoch()); // Check that the snapshot was written to the log RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); @@ -953,8 +953,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(0, request.position()); List records = Arrays.asList("foo", "bar"); @@ -989,8 +989,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(sendingBuffer.limit(), request.position()); sendingBuffer = memorySnapshot.buffer().slice(); @@ -1012,7 +1012,7 @@ final public class KafkaRaftClientSnapshotTest { context.pollUntilRequest(); fetchRequest = context.assertSentFetchRequest(); - context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset, snapshotId.epoch); + context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset(), snapshotId.epoch()); // Check that the snapshot was written to the log RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get(); @@ -1056,8 +1056,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(0, request.position()); // Reply with a snapshot not found error @@ -1114,8 +1114,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(0, request.position()); // Reply with new leader response @@ -1171,8 +1171,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(0, request.position()); // Reply with new leader epoch @@ -1228,8 +1228,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(0, request.position()); // Reply with unknown leader epoch @@ -1260,8 +1260,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(0, request.position()); } @@ -1295,8 +1295,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(0, request.position()); // Reply with an invalid snapshot id endOffset @@ -1314,7 +1314,7 @@ final public class KafkaRaftClientSnapshotTest { responsePartitionSnapshot .snapshotId() .setEndOffset(-1) - .setEpoch(snapshotId.epoch); + .setEpoch(snapshotId.epoch()); return responsePartitionSnapshot; } @@ -1342,8 +1342,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(0, request.position()); // Reply with an invalid snapshot id epoch @@ -1360,7 +1360,7 @@ final public class KafkaRaftClientSnapshotTest { responsePartitionSnapshot .snapshotId() - .setEndOffset(snapshotId.offset) + .setEndOffset(snapshotId.offset()) .setEpoch(-1); return responsePartitionSnapshot; @@ -1406,8 +1406,8 @@ final public class KafkaRaftClientSnapshotTest { localId, Integer.MAX_VALUE ).get(); - assertEquals(snapshotId.offset, request.snapshotId().endOffset()); - assertEquals(snapshotId.epoch, request.snapshotId().epoch()); + assertEquals(snapshotId.offset(), request.snapshotId().endOffset()); + assertEquals(snapshotId.epoch(), request.snapshotId().epoch()); assertEquals(0, request.position()); // Sleeping for fetch timeout should transition to candidate @@ -1432,8 +1432,8 @@ final public class KafkaRaftClientSnapshotTest { responsePartitionSnapshot .snapshotId() - .setEndOffset(snapshotId.offset) - .setEpoch(snapshotId.epoch); + .setEndOffset(snapshotId.offset()) + .setEpoch(snapshotId.epoch()); return responsePartitionSnapshot; } @@ -1532,7 +1532,7 @@ final public class KafkaRaftClientSnapshotTest { // When leader creating snapshot: // 1.1 high watermark cannot be empty assertEquals(OptionalLong.empty(), context.client.highWatermark()); - assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId1.offset, invalidSnapshotId1.epoch, 0)); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId1.offset(), invalidSnapshotId1.epoch(), 0)); // 1.2 high watermark must larger than or equal to the snapshotId's endOffset context.advanceLocalLeaderHighWatermarkToLogEndOffset(); @@ -1544,17 +1544,17 @@ final public class KafkaRaftClientSnapshotTest { assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong() + newRecords.size()); OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch); - assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId2.offset, invalidSnapshotId2.epoch, 0)); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId2.offset(), invalidSnapshotId2.epoch(), 0)); // 2 the quorum epoch must larger than or equal to the snapshotId's epoch OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() - 2, currentEpoch + 1); - assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId3.offset, invalidSnapshotId3.epoch, 0)); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId3.offset(), invalidSnapshotId3.epoch(), 0)); // 3 the snapshotId should be validated against endOffsetForEpoch OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(epoch); - assertEquals(epoch, endOffsetForEpoch.epoch); - OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset + 1, epoch); - assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset, invalidSnapshotId4.epoch, 0)); + assertEquals(epoch, endOffsetForEpoch.epoch()); + OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset() + 1, epoch); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset(), invalidSnapshotId4.epoch(), 0)); } @Test @@ -1574,7 +1574,7 @@ final public class KafkaRaftClientSnapshotTest { // 1) The high watermark cannot be empty assertEquals(OptionalLong.empty(), context.client.highWatermark()); OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(0, 0); - assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId1.offset, invalidSnapshotId1.epoch, 0)); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId1.offset(), invalidSnapshotId1.epoch(), 0)); // Poll for our first fetch request context.pollUntilRequest(); @@ -1592,11 +1592,11 @@ final public class KafkaRaftClientSnapshotTest { // 2) The high watermark must be larger than or equal to the snapshotId's endOffset int currentEpoch = context.currentEpoch(); OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch); - assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId2.offset, invalidSnapshotId2.epoch, 0)); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId2.offset(), invalidSnapshotId2.epoch(), 0)); // 3) The quorum epoch must be larger than or equal to the snapshotId's epoch OffsetAndEpoch invalidSnapshotId3 = new OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1); - assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId3.offset, invalidSnapshotId3.epoch, 0)); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId3.offset(), invalidSnapshotId3.epoch(), 0)); // The high watermark advances to be larger than log.endOffsetForEpoch(3), to test the case 3 context.pollUntilRequest(); @@ -1613,9 +1613,9 @@ final public class KafkaRaftClientSnapshotTest { // 4) The snapshotId should be validated against endOffsetForEpoch OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(3); - assertEquals(3, endOffsetForEpoch.epoch); - OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset + 1, epoch); - assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset, invalidSnapshotId4.epoch, 0)); + assertEquals(3, endOffsetForEpoch.epoch()); + OffsetAndEpoch invalidSnapshotId4 = new OffsetAndEpoch(endOffsetForEpoch.offset() + 1, epoch); + assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId4.offset(), invalidSnapshotId4.epoch(), 0)); } private static FetchSnapshotRequestData fetchSnapshotRequest( @@ -1637,8 +1637,8 @@ final public class KafkaRaftClientSnapshotTest { long position ) { FetchSnapshotRequestData.SnapshotId snapshotId = new FetchSnapshotRequestData.SnapshotId() - .setEndOffset(offsetAndEpoch.offset) - .setEpoch(offsetAndEpoch.epoch); + .setEndOffset(offsetAndEpoch.offset()) + .setEpoch(offsetAndEpoch.epoch()); FetchSnapshotRequestData request = FetchSnapshotRequest.singleton( clusterId, @@ -1671,8 +1671,8 @@ final public class KafkaRaftClientSnapshotTest { .setLeaderId(leaderId); partitionSnapshot.snapshotId() - .setEndOffset(snapshotId.offset) - .setEpoch(snapshotId.epoch); + .setEndOffset(snapshotId.offset()) + .setEpoch(snapshotId.epoch()); return partitionSnapshot .setSize(size) @@ -1698,8 +1698,8 @@ final public class KafkaRaftClientSnapshotTest { .setLeaderId(leaderId); partitionData.snapshotId() - .setEpoch(snapshotId.epoch) - .setEndOffset(snapshotId.offset); + .setEpoch(snapshotId.epoch()) + .setEndOffset(snapshotId.offset()); }); } diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java index c4b4c102b83..85aaa598183 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java @@ -92,14 +92,14 @@ public class MockLog implements ReplicatedLog { public boolean truncateToLatestSnapshot() { AtomicBoolean truncated = new AtomicBoolean(false); latestSnapshotId().ifPresent(snapshotId -> { - if (snapshotId.epoch > logLastFetchedEpoch().orElse(0) || - (snapshotId.epoch == logLastFetchedEpoch().orElse(0) && - snapshotId.offset > endOffset().offset)) { + if (snapshotId.epoch() > logLastFetchedEpoch().orElse(0) || + (snapshotId.epoch() == logLastFetchedEpoch().orElse(0) && + snapshotId.offset() > endOffset().offset)) { batches.clear(); epochStartOffsets.clear(); snapshots.headMap(snapshotId, false).clear(); - updateHighWatermark(new LogOffsetMetadata(snapshotId.offset)); + updateHighWatermark(new LogOffsetMetadata(snapshotId.offset())); flush(false); truncated.set(true); @@ -188,7 +188,7 @@ public class MockLog implements ReplicatedLog { @Override public int lastFetchedEpoch() { - return logLastFetchedEpoch().orElseGet(() -> latestSnapshotId().map(id -> id.epoch).orElse(0)); + return logLastFetchedEpoch().orElseGet(() -> latestSnapshotId().map(OffsetAndEpoch::epoch).orElse(0)); } @Override @@ -201,7 +201,7 @@ public class MockLog implements ReplicatedLog { } private OffsetAndEpoch lastOffsetAndEpochFiltered(Predicate predicate) { - int epochLowerBound = earliestSnapshotId().map(id -> id.epoch).orElse(0); + int epochLowerBound = earliestSnapshotId().map(OffsetAndEpoch::epoch).orElse(0); for (EpochStartOffset epochStartOffset : epochStartOffsets) { if (!predicate.test(epochStartOffset)) { return new OffsetAndEpoch(epochStartOffset.startOffset, epochLowerBound); @@ -230,7 +230,7 @@ public class MockLog implements ReplicatedLog { .map(entry -> entry.offset + 1) .orElse( latestSnapshotId() - .map(id -> id.offset) + .map(OffsetAndEpoch::offset) .orElse(0L) ); return new LogOffsetMetadata(nextOffset, Optional.of(new MockOffsetMetadata(nextId))); @@ -242,7 +242,7 @@ public class MockLog implements ReplicatedLog { .map(entry -> entry.offset) .orElse( earliestSnapshotId() - .map(id -> id.offset) + .map(OffsetAndEpoch::offset) .orElse(0L) ); } @@ -436,7 +436,7 @@ public class MockLog implements ReplicatedLog { @Override public Optional createNewSnapshot(OffsetAndEpoch snapshotId) { - if (snapshotId.offset < startOffset()) { + if (snapshotId.offset() < startOffset()) { logger.info( "Cannot create a snapshot with an id ({}) less than the log start offset ({})", snapshotId, @@ -447,7 +447,7 @@ public class MockLog implements ReplicatedLog { } long highWatermarkOffset = highWatermark().offset; - if (snapshotId.offset > highWatermarkOffset) { + if (snapshotId.offset() > highWatermarkOffset) { throw new IllegalArgumentException( String.format( "Cannot create a snapshot with an id (%s) greater than the high-watermark (%s)", @@ -457,7 +457,7 @@ public class MockLog implements ReplicatedLog { ); } - ValidOffsetAndEpoch validOffsetAndEpoch = validateOffsetAndEpoch(snapshotId.offset, snapshotId.epoch); + ValidOffsetAndEpoch validOffsetAndEpoch = validateOffsetAndEpoch(snapshotId.offset(), snapshotId.epoch()); if (validOffsetAndEpoch.kind() != ValidOffsetAndEpoch.Kind.VALID) { throw new IllegalArgumentException( String.format( @@ -511,7 +511,7 @@ public class MockLog implements ReplicatedLog { @Override public boolean deleteBeforeSnapshot(OffsetAndEpoch snapshotId) { - if (startOffset() > snapshotId.offset) { + if (startOffset() > snapshotId.offset()) { throw new OffsetOutOfRangeException( String.format( "New log start (%s) is less than the curent log start offset (%s)", @@ -520,7 +520,7 @@ public class MockLog implements ReplicatedLog { ) ); } - if (highWatermark.offset < snapshotId.offset) { + if (highWatermark.offset < snapshotId.offset()) { throw new OffsetOutOfRangeException( String.format( "New log start (%s) is greater than the high watermark (%s)", @@ -534,11 +534,11 @@ public class MockLog implements ReplicatedLog { if (snapshots.containsKey(snapshotId)) { snapshots.headMap(snapshotId, false).clear(); - batches.removeIf(entry -> entry.lastOffset() < snapshotId.offset); + batches.removeIf(entry -> entry.lastOffset() < snapshotId.offset()); AtomicReference> last = new AtomicReference<>(Optional.empty()); epochStartOffsets.removeIf(epochStartOffset -> { - if (epochStartOffset.startOffset <= snapshotId.offset) { + if (epochStartOffset.startOffset <= snapshotId.offset()) { last.set(Optional.of(epochStartOffset)); return true; } @@ -549,7 +549,7 @@ public class MockLog implements ReplicatedLog { last.get().ifPresent(epochStartOffset -> { epochStartOffsets.add( 0, - new EpochStartOffset(epochStartOffset.epoch, snapshotId.offset) + new EpochStartOffset(epochStartOffset.epoch, snapshotId.offset()) ); }); diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java index 88ee3a629f0..d70c051275d 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java @@ -509,7 +509,7 @@ public class MockLogTest { } assertTrue(log.deleteBeforeSnapshot(snapshotId)); - assertEquals(snapshotId.offset, log.startOffset()); + assertEquals(snapshotId.offset(), log.startOffset()); assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, epoch))); } @@ -528,7 +528,7 @@ public class MockLogTest { } assertTrue(log.deleteBeforeSnapshot(snapshotId)); - assertEquals(snapshotId.offset, log.startOffset()); + assertEquals(snapshotId.offset(), log.startOffset()); assertThrows( IllegalArgumentException.class, @@ -578,7 +578,7 @@ public class MockLogTest { } assertTrue(log.deleteBeforeSnapshot(snapshotId)); - assertEquals(snapshotId.offset, log.startOffset()); + assertEquals(snapshotId.offset(), log.startOffset()); assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId)); } @@ -665,10 +665,10 @@ public class MockLogTest { } assertTrue(log.truncateToLatestSnapshot()); - assertEquals(sameEpochSnapshotId.offset, log.startOffset()); - assertEquals(sameEpochSnapshotId.epoch, log.lastFetchedEpoch()); - assertEquals(sameEpochSnapshotId.offset, log.endOffset().offset); - assertEquals(sameEpochSnapshotId.offset, log.highWatermark().offset); + assertEquals(sameEpochSnapshotId.offset(), log.startOffset()); + assertEquals(sameEpochSnapshotId.epoch(), log.lastFetchedEpoch()); + assertEquals(sameEpochSnapshotId.offset(), log.endOffset().offset); + assertEquals(sameEpochSnapshotId.offset(), log.highWatermark().offset); OffsetAndEpoch greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch + 1); @@ -679,10 +679,10 @@ public class MockLogTest { } assertTrue(log.truncateToLatestSnapshot()); - assertEquals(greaterEpochSnapshotId.offset, log.startOffset()); - assertEquals(greaterEpochSnapshotId.epoch, log.lastFetchedEpoch()); - assertEquals(greaterEpochSnapshotId.offset, log.endOffset().offset); - assertEquals(greaterEpochSnapshotId.offset, log.highWatermark().offset); + assertEquals(greaterEpochSnapshotId.offset(), log.startOffset()); + assertEquals(greaterEpochSnapshotId.epoch(), log.lastFetchedEpoch()); + assertEquals(greaterEpochSnapshotId.offset(), log.endOffset().offset); + assertEquals(greaterEpochSnapshotId.offset(), log.highWatermark().offset); } @Test @@ -716,7 +716,7 @@ public class MockLogTest { OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch); appendBatch(numberOfRecords, epoch); - log.updateHighWatermark(new LogOffsetMetadata(sameEpochSnapshotId.offset)); + log.updateHighWatermark(new LogOffsetMetadata(sameEpochSnapshotId.offset())); try (RawSnapshotWriter snapshot = log.createNewSnapshot(sameEpochSnapshotId).get()) { snapshot.freeze(); @@ -740,15 +740,15 @@ public class MockLogTest { OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch(numberOfRecords, epoch); appendBatch(numberOfRecords, epoch); - log.updateHighWatermark(new LogOffsetMetadata(sameEpochSnapshotId.offset)); + log.updateHighWatermark(new LogOffsetMetadata(sameEpochSnapshotId.offset())); try (RawSnapshotWriter snapshot = log.createNewSnapshot(sameEpochSnapshotId).get()) { snapshot.freeze(); } OffsetAndEpoch greaterEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch + 1); - appendBatch(numberOfRecords, greaterEpochSnapshotId.epoch); - log.updateHighWatermark(new LogOffsetMetadata(greaterEpochSnapshotId.offset)); + appendBatch(numberOfRecords, greaterEpochSnapshotId.epoch()); + log.updateHighWatermark(new LogOffsetMetadata(greaterEpochSnapshotId.offset())); try (RawSnapshotWriter snapshot = log.createNewSnapshot(greaterEpochSnapshotId).get()) { snapshot.freeze(); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 3af4ba75dfd..c3fac2ebe8f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -196,8 +196,8 @@ public final class RaftClientTestContext { } Builder deleteBeforeSnapshot(OffsetAndEpoch snapshotId) throws IOException { - if (snapshotId.offset > log.highWatermark().offset) { - log.updateHighWatermark(new LogOffsetMetadata(snapshotId.offset)); + if (snapshotId.offset() > log.highWatermark().offset) { + log.updateHighWatermark(new LogOffsetMetadata(snapshotId.offset())); } log.deleteBeforeSnapshot(snapshotId); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index a6117a33ca0..b12cd163d71 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -1024,16 +1024,16 @@ public class RaftEventSimulationTest { log.earliestSnapshotId().ifPresent(earliestSnapshotId -> { long logStartOffset = log.startOffset(); ValidOffsetAndEpoch validateOffsetAndEpoch = log.validateOffsetAndEpoch( - earliestSnapshotId.offset, - earliestSnapshotId.epoch + earliestSnapshotId.offset(), + earliestSnapshotId.epoch() ); assertTrue( - logStartOffset <= earliestSnapshotId.offset, + logStartOffset <= earliestSnapshotId.offset(), () -> String.format( "invalid log start offset (%s) and snapshotId offset (%s): nodeId = %s", logStartOffset, - earliestSnapshotId.offset, + earliestSnapshotId.offset(), nodeId ) ); @@ -1046,7 +1046,7 @@ public class RaftEventSimulationTest { if (logStartOffset > 0) { assertEquals( logStartOffset, - earliestSnapshotId.offset, + earliestSnapshotId.offset(), () -> String.format("mising snapshot at log start offset: nodeId = %s", nodeId) ); } @@ -1108,8 +1108,8 @@ public class RaftEventSimulationTest { AtomicLong startOffset = new AtomicLong(0); log.earliestSnapshotId().ifPresent(snapshotId -> { - assertTrue(snapshotId.offset <= highWatermark.getAsLong()); - startOffset.set(snapshotId.offset); + assertTrue(snapshotId.offset() <= highWatermark.getAsLong()); + startOffset.set(snapshotId.offset()); try (SnapshotReader snapshot = RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE, true)) { @@ -1120,7 +1120,7 @@ public class RaftEventSimulationTest { assertEquals(1, batch.records().size()); // The snapshotId offset is an "end offset" - long offset = snapshotId.offset - 1; + long offset = snapshotId.offset() - 1; int sequence = batch.records().get(0); committedSequenceNumbers.putIfAbsent(offset, sequence); diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java index 3b5a795ed48..c0fd2dc3f97 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java @@ -64,7 +64,7 @@ final public class SnapshotWriterReaderTest { context.advanceLocalLeaderHighWatermarkToLogEndOffset(); // Create an empty snapshot and freeze it immediately - try (SnapshotWriter snapshot = context.client.createSnapshot(id.offset - 1, id.epoch, magicTimestamp).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(id.offset() - 1, id.epoch(), magicTimestamp).get()) { assertEquals(id, snapshot.snapshotId()); snapshot.freeze(); } @@ -88,7 +88,7 @@ final public class SnapshotWriterReaderTest { RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters); for (List batch : expected) { - contextBuilder.appendToLog(id.epoch, batch); + contextBuilder.appendToLog(id.epoch(), batch); } RaftClientTestContext context = contextBuilder.build(); @@ -97,7 +97,7 @@ final public class SnapshotWriterReaderTest { context.advanceLocalLeaderHighWatermarkToLogEndOffset(); - try (SnapshotWriter snapshot = context.client.createSnapshot(id.offset - 1, id.epoch, magicTimestamp).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(id.offset() - 1, id.epoch(), magicTimestamp).get()) { assertEquals(id, snapshot.snapshotId()); expected.forEach(batch -> assertDoesNotThrow(() -> snapshot.append(batch))); snapshot.freeze(); @@ -120,7 +120,7 @@ final public class SnapshotWriterReaderTest { RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters); for (List batch : expected) { - contextBuilder.appendToLog(id.epoch, batch); + contextBuilder.appendToLog(id.epoch(), batch); } RaftClientTestContext context = contextBuilder.build(); @@ -129,7 +129,7 @@ final public class SnapshotWriterReaderTest { context.advanceLocalLeaderHighWatermarkToLogEndOffset(); - try (SnapshotWriter snapshot = context.client.createSnapshot(id.offset - 1, id.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(id.offset() - 1, id.epoch(), 0).get()) { assertEquals(id, snapshot.snapshotId()); expected.forEach(batch -> { assertDoesNotThrow(() -> snapshot.append(batch)); @@ -148,7 +148,7 @@ final public class SnapshotWriterReaderTest { RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters); for (List batch : expected) { - contextBuilder.appendToLog(id.epoch, batch); + contextBuilder.appendToLog(id.epoch(), batch); } RaftClientTestContext context = contextBuilder.build(); @@ -157,7 +157,7 @@ final public class SnapshotWriterReaderTest { context.advanceLocalLeaderHighWatermarkToLogEndOffset(); - try (SnapshotWriter snapshot = context.client.createSnapshot(id.offset - 1, id.epoch, 0).get()) { + try (SnapshotWriter snapshot = context.client.createSnapshot(id.offset() - 1, id.epoch(), 0).get()) { assertEquals(id, snapshot.snapshotId()); expected.forEach(batch -> { assertDoesNotThrow(() -> snapshot.append(batch));