diff --git a/core/src/main/java/kafka/server/TierStateMachine.java b/core/src/main/java/kafka/server/TierStateMachine.java index 902bc62cbb9..1e851f2f4fb 100644 --- a/core/src/main/java/kafka/server/TierStateMachine.java +++ b/core/src/main/java/kafka/server/TierStateMachine.java @@ -96,7 +96,7 @@ public class TierStateMachine { PartitionFetchState currentFetchState, PartitionData fetchPartitionData) throws Exception { OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch()); - int epoch = epochAndLeaderLocalStartOffset.leaderEpoch(); + int epoch = epochAndLeaderLocalStartOffset.epoch(); long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset(); long offsetToFetch; diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1e19a37cb2a..930c08d607d 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -1730,7 +1730,7 @@ class Partition(val topicPartition: TopicPartition, case Some(epochAndOffset) => new EpochEndOffset() .setPartition(partitionId) .setErrorCode(Errors.NONE.code) - .setLeaderEpoch(epochAndOffset.leaderEpoch) + .setLeaderEpoch(epochAndOffset.epoch()) .setEndOffset(epochAndOffset.offset) case None => new EpochEndOffset() .setPartition(partitionId) diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 01f69b374bc..fdb822210a7 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala +++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -28,7 +28,8 @@ import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.record.{MemoryRecords, Records} import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid} -import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch} +import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch} +import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.util.Scheduler @@ -139,14 +140,14 @@ final class KafkaMetadataLog private ( (log.endOffsetForEpoch(epoch).toScala, earliestSnapshotId().toScala) match { case (Some(offsetAndEpoch), Some(snapshotId)) if ( offsetAndEpoch.offset == snapshotId.offset && - offsetAndEpoch.leaderEpoch == epoch) => + offsetAndEpoch.epoch() == epoch) => // The epoch is smaller than the smallest epoch on the log. Override the diverging // epoch to the oldest snapshot which should be the snapshot at the log start offset new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch) case (Some(offsetAndEpoch), _) => - new OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch) + new OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.epoch()) case (None, _) => new OffsetAndEpoch(endOffset.offset, lastFetchedEpoch) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index a65464706c2..06cc94b5979 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -619,7 +619,7 @@ abstract class AbstractFetcherThread(name: String, if (endOffsetForEpochOpt.isPresent) { val offsetAndEpoch = endOffsetForEpochOpt.get val followerEndOffset = offsetAndEpoch.offset - val followerEpoch = offsetAndEpoch.leaderEpoch + val followerEpoch = offsetAndEpoch.epoch() if (followerEpoch != leaderEpochOffset.leaderEpoch) { // the follower does not know about the epoch that leader replied with // we truncate to the end offset of the largest epoch that is smaller than the diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala index 1d3b3493cce..3b7aa0decef 100644 --- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala +++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.record.ArbitraryMemoryRecords import org.apache.kafka.common.record.InvalidMemoryRecordsProvider import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, QuorumConfig, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch} +import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, MetadataLogConfig, QuorumConfig, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch} import org.apache.kafka.raft.internals.BatchBuilder import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs} @@ -40,10 +40,10 @@ import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ArgumentsSource - import net.jqwik.api.AfterFailureMode import net.jqwik.api.ForAll import net.jqwik.api.Property +import org.apache.kafka.server.common.OffsetAndEpoch import java.io.File import java.nio.ByteBuffer diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala index 1e297fc33c9..3abea688468 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala @@ -86,7 +86,7 @@ class OffsetsForLeaderEpochTest { //Then assertEquals( - Seq(newOffsetForLeaderTopicResult(tp, Errors.NONE, offsetAndEpoch.leaderEpoch, offsetAndEpoch.offset)), + Seq(newOffsetForLeaderTopicResult(tp, Errors.NONE, offsetAndEpoch.epoch(), offsetAndEpoch.offset)), response) } diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala index 5b3e9abe111..4320e469016 100644 --- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala +++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala @@ -43,8 +43,8 @@ import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnap import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue} import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.MetadataRecordSerde -import org.apache.kafka.raft.{KafkaRaftClient, MetadataLogConfig, OffsetAndEpoch, VoterSetTest} -import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion} +import org.apache.kafka.raft.{KafkaRaftClient, MetadataLogConfig, VoterSetTest} +import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, OffsetAndEpoch} import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, RemoteLogSegmentState, RemotePartitionDeleteMetadata, RemotePartitionDeleteState} diff --git a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java index 5c496877747..9fad1d0d88b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java @@ -24,8 +24,8 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.controller.metrics.QuorumControllerMetrics; import org.apache.kafka.raft.Batch; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.Snapshots; import org.apache.kafka.timeline.SnapshotRegistry; diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index b03501eb7bb..ded837ec6a3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -111,12 +111,12 @@ import org.apache.kafka.queue.KafkaEventQueue; import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.BatchReader; import org.apache.kafka.raft.LeaderAndEpoch; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.RaftClient; import org.apache.kafka.server.authorizer.AclCreateResult; import org.apache.kafka.server.authorizer.AclDeleteResult; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.fault.FaultHandler; import org.apache.kafka.server.fault.FaultHandlerException; import org.apache.kafka.server.policy.AlterConfigPolicy; diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java index fceca8f564e..fc14eb1c5b1 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java @@ -20,7 +20,7 @@ package org.apache.kafka.image; import org.apache.kafka.image.node.MetadataImageNode; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.util.Objects; diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java b/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java index 9e0414d8c82..e9cef3e70e6 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataProvenance.java @@ -17,7 +17,7 @@ package org.apache.kafka.image; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.Snapshots; import java.util.Objects; diff --git a/metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java index 57f0df436c1..aebdbca1730 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/OffsetControlManagerTest.java @@ -23,8 +23,8 @@ import org.apache.kafka.common.metadata.EndTransactionRecord; import org.apache.kafka.common.metadata.NoOpRecord; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.raft.Batch; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.timeline.TrackingSnapshotRegistry; import org.junit.jupiter.api.Test; diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index 39444fb7389..4a2e8232425 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -102,12 +102,12 @@ import org.apache.kafka.metadata.util.BatchFileWriter; import org.apache.kafka.metalog.LocalLogManager; import org.apache.kafka.metalog.LocalLogManagerTestEnv; import org.apache.kafka.raft.Batch; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.TopicIdPartition; import org.apache.kafka.snapshot.FileRawSnapshotReader; import org.apache.kafka.snapshot.Snapshots; diff --git a/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java b/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java index 43c999c04ab..a524214bf04 100644 --- a/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java +++ b/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java @@ -17,8 +17,8 @@ package org.apache.kafka.image; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.SnapshotWriter; import java.util.ArrayList; diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java index 8c4526de562..beaeefdc38c 100644 --- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java @@ -36,9 +36,9 @@ import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.BatchReader; import org.apache.kafka.raft.ControlRecord; import org.apache.kafka.raft.LeaderAndEpoch; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.fault.MockFaultHandler; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.test.TestUtils; diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java index 9647c4b031f..cf123078eae 100644 --- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java @@ -21,10 +21,10 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.image.FakeSnapshotWriter; import org.apache.kafka.image.MetadataImageTest; import org.apache.kafka.raft.LeaderAndEpoch; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.RaftClient; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.SnapshotWriter; import org.junit.jupiter.api.Test; diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index 0d5eb8dba07..37116f209e6 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -27,13 +27,13 @@ import org.apache.kafka.queue.EventQueue; import org.apache.kafka.queue.KafkaEventQueue; import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.LeaderAndEpoch; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.RaftClient; import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; import org.apache.kafka.raft.internals.MemoryBatchReader; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.MockRawSnapshotReader; import org.apache.kafka.snapshot.MockRawSnapshotWriter; import org.apache.kafka.snapshot.RawSnapshotReader; diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index b88b8cb11a2..16a8e5a7d2e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -86,6 +86,7 @@ import org.apache.kafka.raft.internals.RequestSendResult; import org.apache.kafka.raft.internals.ThresholdPurgatory; import org.apache.kafka.raft.internals.UpdateVoterHandler; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.NotifyingRawSnapshotWriter; import org.apache.kafka.snapshot.RawSnapshotReader; diff --git a/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java deleted file mode 100644 index cba6108fd5d..00000000000 --- a/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.raft; - -public record OffsetAndEpoch(long offset, int epoch) implements Comparable { - - @Override - public int compareTo(OffsetAndEpoch o) { - if (epoch == o.epoch) - return Long.compare(offset, o.offset); - return Integer.compare(epoch, o.epoch); - } -} diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index d0450a9b047..d4b81bd3ce7 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine; import org.apache.kafka.raft.internals.KafkaRaftMetrics; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.slf4j.Logger; diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java index 90131ce4575..273a2ec56c5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java @@ -19,6 +19,7 @@ package org.apache.kafka.raft; import org.apache.kafka.raft.errors.BufferAllocationException; import org.apache.kafka.raft.errors.NotLeaderException; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.SnapshotWriter; diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index 80ad5134370..caa087378c5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.net.InetSocketAddress; import java.util.Collection; diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java index 8f5ba31a45d..0bbd351ff4d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java +++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java @@ -19,6 +19,7 @@ package org.apache.kafka.raft; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.record.Records; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.RawSnapshotReader; import org.apache.kafka.snapshot.RawSnapshotWriter; diff --git a/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java index 1de7bffb9ab..96baf3b9636 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java +++ b/raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.server.common.OffsetAndEpoch; + import java.util.Objects; public final class ValidOffsetAndEpoch { diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java index 124c1a307fc..87c5b217d8e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java @@ -26,9 +26,9 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.WindowedSum; import org.apache.kafka.raft.LogOffsetMetadata; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.QuorumState; import org.apache.kafka.raft.ReplicaKey; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.util.List; import java.util.OptionalLong; diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java index 7d2955dbecf..03b41a3446c 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java @@ -19,7 +19,7 @@ package org.apache.kafka.snapshot; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.UnalignedRecords; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.io.IOException; import java.io.UncheckedIOException; diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java index 1f4af694981..6dc9e1c8c79 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java @@ -19,7 +19,7 @@ package org.apache.kafka.snapshot; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.UnalignedMemoryRecords; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.io.IOException; import java.io.UncheckedIOException; diff --git a/raft/src/main/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriter.java index 76b27f45c82..0eaa848278d 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriter.java @@ -18,7 +18,7 @@ package org.apache.kafka.snapshot; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.UnalignedMemoryRecords; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.util.function.Consumer; diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java index e303a5889f1..5c6d53adc22 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotReader.java @@ -18,7 +18,7 @@ package org.apache.kafka.snapshot; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.UnalignedRecords; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; /** * Interface for reading snapshots as a sequence of records. diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java index 07d8271e953..c11972c15be 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java @@ -19,7 +19,7 @@ package org.apache.kafka.snapshot; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.UnalignedMemoryRecords; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; /** * Interface for writing snapshot as a sequence of records. diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java index fc815621d83..00a2d925b5b 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java @@ -21,8 +21,8 @@ import org.apache.kafka.common.message.SnapshotHeaderRecord; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.raft.Batch; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.internals.RecordsIterator; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.serialization.RecordSerde; import java.util.NoSuchElementException; diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java index 2162fc673c0..09f88306638 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java @@ -27,11 +27,11 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.VoterSet; import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.serialization.RecordSerde; import java.util.List; diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotPath.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotPath.java index 58ca000595b..f013fc79712 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotPath.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotPath.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.snapshot; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.nio.file.Path; diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java index ffc0cf41b06..3a77d01f035 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java @@ -17,7 +17,7 @@ package org.apache.kafka.snapshot; import org.apache.kafka.raft.Batch; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.util.Iterator; diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java index 117bcdda336..2ccb1d758f4 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java @@ -17,7 +17,7 @@ package org.apache.kafka.snapshot; import org.apache.kafka.common.message.SnapshotFooterRecord; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.util.List; diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java index 716c6e00f16..6947b7d5d0a 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java @@ -19,8 +19,8 @@ package org.apache.kafka.snapshot; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.KafkaRaftClient; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.internals.IdentitySerde; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java index f68b4b6f061..e56b9c94b49 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java @@ -50,6 +50,7 @@ import org.apache.kafka.common.requests.VoteResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index a0d45a349ee..bc4be592a20 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.UnalignedMemoryRecords; import org.apache.kafka.common.requests.FetchSnapshotRequest; import org.apache.kafka.raft.internals.StringSerde; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.RawSnapshotReader; import org.apache.kafka.snapshot.RawSnapshotWriter; import org.apache.kafka.snapshot.RecordsSnapshotWriter; diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java index 209accdf7a2..6f0968202b7 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.MockRawSnapshotReader; import org.apache.kafka.snapshot.MockRawSnapshotWriter; import org.apache.kafka.snapshot.RawSnapshotReader; diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java index 3abfe5a1b12..21d253a375c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.RawSnapshotReader; import org.apache.kafka.snapshot.RawSnapshotWriter; diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 8376000f4ad..76d942d2bd9 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine; import org.apache.kafka.raft.internals.KafkaRaftMetrics; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index c104af0ce76..9f0a5084326 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -67,6 +67,7 @@ import org.apache.kafka.raft.internals.BatchBuilder; import org.apache.kafka.raft.internals.StringSerde; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotWriter; import org.apache.kafka.snapshot.SnapshotReader; diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java index e440be1dcb9..89faf338721 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java @@ -51,6 +51,7 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.common.OffsetAndEpoch; import com.fasterxml.jackson.databind.JsonNode; diff --git a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java index ac4da6d1486..d877a7ec308 100644 --- a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java +++ b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java @@ -19,6 +19,7 @@ package org.apache.kafka.raft; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.SnapshotWriter; diff --git a/raft/src/test/java/org/apache/kafka/raft/ValidOffsetAndEpochTest.java b/raft/src/test/java/org/apache/kafka/raft/ValidOffsetAndEpochTest.java index 4c5463fef0c..05b044ad8c5 100644 --- a/raft/src/test/java/org/apache/kafka/raft/ValidOffsetAndEpochTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/ValidOffsetAndEpochTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.server.common.OffsetAndEpoch; + import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java index bab2f6af0e1..2f7f9f2f508 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java @@ -26,10 +26,10 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.raft.ExternalKRaftMetrics; import org.apache.kafka.raft.MockLog; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.VoterSet; import org.apache.kafka.raft.VoterSetTest; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.RecordsSnapshotWriter; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index f531e6f6fe7..c6a53742f7c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -24,13 +24,13 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.raft.LogOffsetMetadata; import org.apache.kafka.raft.MockQuorumStateStore; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.QuorumState; import org.apache.kafka.raft.ReplicaKey; import org.apache.kafka.raft.VoterSet; import org.apache.kafka.raft.VoterSetTest; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java index 8fd8925ae54..5b71b2ce263 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java @@ -40,10 +40,10 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.ControlRecord; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.VoterSet; import org.apache.kafka.raft.VoterSetTest; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.serialization.RecordSerde; import org.apache.kafka.snapshot.MockRawSnapshotWriter; import org.apache.kafka.snapshot.RecordsSnapshotWriter; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java b/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java index edd812e8dd6..990e7e9720a 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.record.UnalignedFileRecords; import org.apache.kafka.common.record.UnalignedMemoryRecords; import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotReader.java b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotReader.java index b1c84c3d459..fb5df264ca7 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotReader.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotReader.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.UnalignedMemoryRecords; import org.apache.kafka.common.record.UnalignedRecords; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.nio.ByteBuffer; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java index edc33ee4e29..3b99178a1fd 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java @@ -19,7 +19,7 @@ package org.apache.kafka.snapshot; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.UnalignedMemoryRecords; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import java.nio.ByteBuffer; import java.util.function.Consumer; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java b/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java index 50c0fa0408b..1cb4a092881 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java @@ -18,7 +18,7 @@ package org.apache.kafka.snapshot; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.UnalignedMemoryRecords; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.junit.jupiter.api.Test; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java b/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java index 3934b9f96f2..69e0cabd1ea 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java @@ -24,11 +24,11 @@ import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.raft.Batch; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.VoterSet; import org.apache.kafka.raft.VoterSetTest; import org.apache.kafka.raft.internals.StringSerde; import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.serialization.RecordSerde; import org.junit.jupiter.api.Test; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java index f8350222d5e..96d7aa286ea 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java @@ -25,9 +25,9 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier; import org.apache.kafka.raft.Batch; import org.apache.kafka.raft.ControlRecord; -import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.RaftClientTestContext; import org.apache.kafka.raft.internals.StringSerde; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.junit.jupiter.api.Test; diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java index 5c96b044b42..36c537ddb5f 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.snapshot; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java b/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java index a5953ae70bc..a5f8c007a6f 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/OffsetAndEpoch.java @@ -16,40 +16,11 @@ */ package org.apache.kafka.server.common; -public class OffsetAndEpoch { - private final long offset; - private final int leaderEpoch; - - public OffsetAndEpoch(long offset, int leaderEpoch) { - this.offset = offset; - this.leaderEpoch = leaderEpoch; - } - - public long offset() { - return offset; - } - - public int leaderEpoch() { - return leaderEpoch; - } - +public record OffsetAndEpoch(long offset, int epoch) implements Comparable { @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - OffsetAndEpoch that = (OffsetAndEpoch) o; - return offset == that.offset && leaderEpoch == that.leaderEpoch; + public int compareTo(OffsetAndEpoch o) { + if (epoch == o.epoch) + return Long.compare(offset, o.offset); + return Integer.compare(epoch, o.epoch); } - - @Override - public int hashCode() { - int result = leaderEpoch; - result = 31 * result + Long.hashCode(offset); - return result; - } - - @Override - public String toString() { - return "(offset=" + offset + ", leaderEpoch=" + leaderEpoch + ")"; - } -} +} \ No newline at end of file