KAFKA-19146 Merge OffsetAndEpoch from raft to server-common (#19475)

1. remove org.apache.kafka.raft.OffsetAndEpoch
2. rewrite org.apache.kafka.server.common.OffsetAndEpoch as a Java record
3. rename OffsetAndEpoch#leaderEpoch to OffsetAndEpoch#epoch (a usage sketch follows the commit message below)

Reviewers: PoAn Yang <payang@apache.org>, Xuan-Zhang Gong <gongxuanzhangmelt@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
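
For illustration, here is a minimal usage sketch of the consolidated class after this commit. Only the OffsetAndEpoch API itself (offset(), epoch(), compareTo) comes from the diff below; the demo class name and the literal values are made up for the example.

import org.apache.kafka.server.common.OffsetAndEpoch;

public class OffsetAndEpochDemo {
    public static void main(String[] args) {
        // The surviving class lives in server-common and is a record,
        // so the offset() and epoch() accessors are compiler-generated.
        OffsetAndEpoch older = new OffsetAndEpoch(100L, 5);
        OffsetAndEpoch newer = new OffsetAndEpoch(50L, 6);

        // Call sites that previously used leaderEpoch() now call epoch().
        System.out.println(older.offset()); // 100
        System.out.println(older.epoch());  // 5

        // compareTo orders by epoch first, then by offset within the same epoch.
        System.out.println(older.compareTo(newer) < 0); // true: epoch 5 < epoch 6
    }
}
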
YuChia Ma 2025-05-02 03:12:52 +08:00 committed by GitHub
parent 2022b4c480
commit 979f49f967
56 changed files with 67 additions and 105 deletions


@@ -96,7 +96,7 @@ public class TierStateMachine {
 PartitionFetchState currentFetchState,
 PartitionData fetchPartitionData) throws Exception {
 OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
-int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+int epoch = epochAndLeaderLocalStartOffset.epoch();
 long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
 long offsetToFetch;


@@ -1730,7 +1730,7 @@ class Partition(val topicPartition: TopicPartition,
 case Some(epochAndOffset) => new EpochEndOffset()
 .setPartition(partitionId)
 .setErrorCode(Errors.NONE.code)
-.setLeaderEpoch(epochAndOffset.leaderEpoch)
+.setLeaderEpoch(epochAndOffset.epoch())
 .setEndOffset(epochAndOffset.offset)
 case None => new EpochEndOffset()
 .setPartition(partitionId)


@@ -28,7 +28,8 @@ import org.apache.kafka.common.errors.CorruptRecordException
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
-import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
+import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
+import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.config.ServerLogConfigs
 import org.apache.kafka.server.storage.log.FetchIsolation
 import org.apache.kafka.server.util.Scheduler
@@ -139,14 +140,14 @@ final class KafkaMetadataLog private (
 (log.endOffsetForEpoch(epoch).toScala, earliestSnapshotId().toScala) match {
 case (Some(offsetAndEpoch), Some(snapshotId)) if (
 offsetAndEpoch.offset == snapshotId.offset &&
-offsetAndEpoch.leaderEpoch == epoch) =>
+offsetAndEpoch.epoch() == epoch) =>
 // The epoch is smaller than the smallest epoch on the log. Override the diverging
 // epoch to the oldest snapshot which should be the snapshot at the log start offset
 new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch)
 case (Some(offsetAndEpoch), _) =>
-new OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch)
+new OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.epoch())
 case (None, _) =>
 new OffsetAndEpoch(endOffset.offset, lastFetchedEpoch)


@@ -619,7 +619,7 @@ abstract class AbstractFetcherThread(name: String,
 if (endOffsetForEpochOpt.isPresent) {
 val offsetAndEpoch = endOffsetForEpochOpt.get
 val followerEndOffset = offsetAndEpoch.offset
-val followerEpoch = offsetAndEpoch.leaderEpoch
+val followerEpoch = offsetAndEpoch.epoch()
 if (followerEpoch != leaderEpochOffset.leaderEpoch) {
 // the follower does not know about the epoch that leader replied with
 // we truncate to the end offset of the largest epoch that is smaller than the


@@ -27,7 +27,7 @@ import org.apache.kafka.common.record.ArbitraryMemoryRecords
 import org.apache.kafka.common.record.InvalidMemoryRecordsProvider
 import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, QuorumConfig, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
+import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo, LogOffsetMetadata, MetadataLogConfig, QuorumConfig, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
 import org.apache.kafka.raft.internals.BatchBuilder
 import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
@@ -40,10 +40,10 @@ import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ArgumentsSource
 import net.jqwik.api.AfterFailureMode
 import net.jqwik.api.ForAll
 import net.jqwik.api.Property
+import org.apache.kafka.server.common.OffsetAndEpoch
 import java.io.File
 import java.nio.ByteBuffer


@@ -86,7 +86,7 @@ class OffsetsForLeaderEpochTest {
 //Then
 assertEquals(
-Seq(newOffsetForLeaderTopicResult(tp, Errors.NONE, offsetAndEpoch.leaderEpoch, offsetAndEpoch.offset)),
+Seq(newOffsetForLeaderTopicResult(tp, Errors.NONE, offsetAndEpoch.epoch(), offsetAndEpoch.offset)),
 response)
 }


@@ -43,8 +43,8 @@ import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnap
 import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.metadata.MetadataRecordSerde
-import org.apache.kafka.raft.{KafkaRaftClient, MetadataLogConfig, OffsetAndEpoch, VoterSetTest}
-import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion}
+import org.apache.kafka.raft.{KafkaRaftClient, MetadataLogConfig, VoterSetTest}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, OffsetAndEpoch}
 import org.apache.kafka.server.config.ServerLogConfigs
 import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, RemoteLogSegmentState, RemotePartitionDeleteMetadata, RemotePartitionDeleteState}


@@ -24,8 +24,8 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
 import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.Snapshots;
 import org.apache.kafka.timeline.SnapshotRegistry;


@@ -111,12 +111,12 @@ import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.BatchReader;
 import org.apache.kafka.raft.LeaderAndEpoch;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClient;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.fault.FaultHandlerException;
 import org.apache.kafka.server.policy.AlterConfigPolicy;


@@ -20,7 +20,7 @@ package org.apache.kafka.image;
 import org.apache.kafka.image.node.MetadataImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.util.Objects;


@@ -17,7 +17,7 @@
 package org.apache.kafka.image;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.Snapshots;
 import java.util.Objects;


@@ -23,8 +23,8 @@ import org.apache.kafka.common.metadata.EndTransactionRecord;
 import org.apache.kafka.common.metadata.NoOpRecord;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.timeline.TrackingSnapshotRegistry;
 import org.junit.jupiter.api.Test;


@@ -102,12 +102,12 @@ import org.apache.kafka.metadata.util.BatchFileWriter;
 import org.apache.kafka.metalog.LocalLogManager;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.common.TopicIdPartition;
 import org.apache.kafka.snapshot.FileRawSnapshotReader;
 import org.apache.kafka.snapshot.Snapshots;


@@ -17,8 +17,8 @@
 package org.apache.kafka.image;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.SnapshotWriter;
 import java.util.ArrayList;


@@ -36,9 +36,9 @@ import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.BatchReader;
 import org.apache.kafka.raft.ControlRecord;
 import org.apache.kafka.raft.LeaderAndEpoch;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.test.TestUtils;


@@ -21,10 +21,10 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.image.FakeSnapshotWriter;
 import org.apache.kafka.image.MetadataImageTest;
 import org.apache.kafka.raft.LeaderAndEpoch;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClient;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.SnapshotWriter;
 import org.junit.jupiter.api.Test;


@@ -27,13 +27,13 @@ import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.LeaderAndEpoch;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClient;
 import org.apache.kafka.raft.errors.BufferAllocationException;
 import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.raft.internals.MemoryBatchReader;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.MockRawSnapshotReader;
 import org.apache.kafka.snapshot.MockRawSnapshotWriter;
 import org.apache.kafka.snapshot.RawSnapshotReader;


@@ -86,6 +86,7 @@ import org.apache.kafka.raft.internals.RequestSendResult;
 import org.apache.kafka.raft.internals.ThresholdPurgatory;
 import org.apache.kafka.raft.internals.UpdateVoterHandler;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.snapshot.NotifyingRawSnapshotWriter;
 import org.apache.kafka.snapshot.RawSnapshotReader;


@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.raft;
-
-public record OffsetAndEpoch(long offset, int epoch) implements Comparable<OffsetAndEpoch> {
-    @Override
-    public int compareTo(OffsetAndEpoch o) {
-        if (epoch == o.epoch)
-            return Long.compare(offset, o.offset);
-
-        return Integer.compare(epoch, o.epoch);
-    }
-}


@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
 import org.apache.kafka.raft.internals.KafkaRaftMetrics;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.slf4j.Logger;


@@ -19,6 +19,7 @@ package org.apache.kafka.raft;
 import org.apache.kafka.raft.errors.BufferAllocationException;
 import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.SnapshotWriter;


@@ -42,6 +42,7 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.net.InetSocketAddress;
 import java.util.Collection;


@@ -19,6 +19,7 @@ package org.apache.kafka.raft;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotWriter;


@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.raft;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.util.Objects;
 public final class ValidOffsetAndEpoch {


@@ -26,9 +26,9 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.raft.LogOffsetMetadata;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.QuorumState;
 import org.apache.kafka.raft.ReplicaKey;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.util.List;
 import java.util.OptionalLong;


@@ -19,7 +19,7 @@ package org.apache.kafka.snapshot;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.UnalignedRecords;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.io.IOException;
 import java.io.UncheckedIOException;


@@ -19,7 +19,7 @@ package org.apache.kafka.snapshot;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.UnalignedMemoryRecords;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.io.IOException;
 import java.io.UncheckedIOException;


@@ -18,7 +18,7 @@ package org.apache.kafka.snapshot;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.UnalignedMemoryRecords;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.util.function.Consumer;


@@ -18,7 +18,7 @@ package org.apache.kafka.snapshot;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.UnalignedRecords;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 /**
  * Interface for reading snapshots as a sequence of records.


@@ -19,7 +19,7 @@ package org.apache.kafka.snapshot;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.UnalignedMemoryRecords;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 /**
  * Interface for writing snapshot as a sequence of records.


@@ -21,8 +21,8 @@ import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.internals.RecordsIterator;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import java.util.NoSuchElementException;


@@ -27,11 +27,11 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.VoterSet;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import java.util.List;


@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.snapshot;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.nio.file.Path;


@@ -17,7 +17,7 @@
 package org.apache.kafka.snapshot;
 import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.util.Iterator;


@@ -17,7 +17,7 @@
 package org.apache.kafka.snapshot;
 import org.apache.kafka.common.message.SnapshotFooterRecord;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.util.List;


@@ -19,8 +19,8 @@ package org.apache.kafka.snapshot;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.KafkaRaftClient;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.internals.IdentitySerde;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;


@@ -50,6 +50,7 @@ import org.apache.kafka.common.requests.VoteResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;


@@ -26,6 +26,7 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.UnalignedMemoryRecords;
 import org.apache.kafka.common.requests.FetchSnapshotRequest;
 import org.apache.kafka.raft.internals.StringSerde;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotWriter;
 import org.apache.kafka.snapshot.RecordsSnapshotWriter;


@@ -30,6 +30,7 @@ import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.MockRawSnapshotReader;
 import org.apache.kafka.snapshot.MockRawSnapshotWriter;
 import org.apache.kafka.snapshot.RawSnapshotReader;


@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotWriter;


@@ -26,6 +26,7 @@ import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
 import org.apache.kafka.raft.internals.KafkaRaftMetrics;
 import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;


@@ -67,6 +67,7 @@ import org.apache.kafka.raft.internals.BatchBuilder;
 import org.apache.kafka.raft.internals.StringSerde;
 import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.snapshot.RecordsSnapshotWriter;
 import org.apache.kafka.snapshot.SnapshotReader;


@@ -51,6 +51,7 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import com.fasterxml.jackson.databind.JsonNode;


@@ -19,6 +19,7 @@ package org.apache.kafka.raft;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.SnapshotWriter;


@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.raft;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertEquals;


@@ -26,10 +26,10 @@ import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.raft.ExternalKRaftMetrics;
 import org.apache.kafka.raft.MockLog;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.VoterSet;
 import org.apache.kafka.raft.VoterSetTest;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.snapshot.RecordsSnapshotWriter;


@@ -24,13 +24,13 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.raft.LogOffsetMetadata;
 import org.apache.kafka.raft.MockQuorumStateStore;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.QuorumState;
 import org.apache.kafka.raft.ReplicaKey;
 import org.apache.kafka.raft.VoterSet;
 import org.apache.kafka.raft.VoterSetTest;
 import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.params.ParameterizedTest;


@@ -40,10 +40,10 @@ import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.ControlRecord;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.VoterSet;
 import org.apache.kafka.raft.VoterSetTest;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.snapshot.MockRawSnapshotWriter;
 import org.apache.kafka.snapshot.RecordsSnapshotWriter;


@@ -25,7 +25,7 @@ import org.apache.kafka.common.record.UnalignedFileRecords;
 import org.apache.kafka.common.record.UnalignedMemoryRecords;
 import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;


@@ -20,7 +20,7 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.UnalignedMemoryRecords;
 import org.apache.kafka.common.record.UnalignedRecords;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.nio.ByteBuffer;


@@ -19,7 +19,7 @@ package org.apache.kafka.snapshot;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.UnalignedMemoryRecords;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import java.nio.ByteBuffer;
 import java.util.function.Consumer;


@@ -18,7 +18,7 @@ package org.apache.kafka.snapshot;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.UnalignedMemoryRecords;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.junit.jupiter.api.Test;


@@ -24,11 +24,11 @@ import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.VoterSet;
 import org.apache.kafka.raft.VoterSetTest;
 import org.apache.kafka.raft.internals.StringSerde;
 import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.junit.jupiter.api.Test;


@@ -25,9 +25,9 @@ import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.ControlRecord;
-import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClientTestContext;
 import org.apache.kafka.raft.internals.StringSerde;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.junit.jupiter.api.Test;


@@ -17,7 +17,7 @@
 package org.apache.kafka.snapshot;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;


@@ -16,40 +16,11 @@
  */
 package org.apache.kafka.server.common;
 
-public class OffsetAndEpoch {
-    private final long offset;
-    private final int leaderEpoch;
-
-    public OffsetAndEpoch(long offset, int leaderEpoch) {
-        this.offset = offset;
-        this.leaderEpoch = leaderEpoch;
-    }
-
-    public long offset() {
-        return offset;
-    }
-
-    public int leaderEpoch() {
-        return leaderEpoch;
-    }
-
+public record OffsetAndEpoch(long offset, int epoch) implements Comparable<OffsetAndEpoch> {
     @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        OffsetAndEpoch that = (OffsetAndEpoch) o;
-        return offset == that.offset && leaderEpoch == that.leaderEpoch;
+    public int compareTo(OffsetAndEpoch o) {
+        if (epoch == o.epoch)
+            return Long.compare(offset, o.offset);
+        return Integer.compare(epoch, o.epoch);
     }
-
-    @Override
-    public int hashCode() {
-        int result = leaderEpoch;
-        result = 31 * result + Long.hashCode(offset);
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "(offset=" + offset + ", leaderEpoch=" + leaderEpoch + ")";
-    }
-}
+}
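
The record declaration above also gives OffsetAndEpoch compiler-generated equals, hashCode, and toString based on the (offset, epoch) components (a general property of Java records, not something shown in the diff), which is why the hand-written implementations could simply be deleted; only the custom compareTo ordering, epoch first and then offset, still needs explicit code. The generated toString prints OffsetAndEpoch[offset=..., epoch=...] rather than the old "(offset=..., leaderEpoch=...)" form.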