diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index fbe26e8a673..6e95e36020b 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -55,6 +55,7 @@ import org.apache.kafka.server.log.remote.quota.RLMQuotaManager; import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig; import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics; import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.CustomMetadataSizeLimitExceededException; import org.apache.kafka.server.log.remote.storage.LogSegmentData; import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index a76104c79ce..5de2ebb4667 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -19,7 +19,6 @@ package kafka.cluster import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.Optional import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList} -import kafka.common.UnexpectedAppendOffsetException import kafka.controller.{KafkaController, StateChangeLogger} import kafka.log._ import kafka.log.remote.RemoteLogManager @@ -47,7 +46,7 @@ import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, Lead import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey -import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} +import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, UnexpectedAppendOffsetException} import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.slf4j.event.Level diff --git a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala b/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala deleted file mode 100644 index e719a93006d..00000000000 --- a/core/src/main/scala/kafka/common/UnexpectedAppendOffsetException.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Indicates the follower or the future replica received records from the leader (or current - * replica) with first offset less than expected next offset. - * @param firstOffset The first offset of the records to append - * @param lastOffset The last offset of the records to append - */ -class UnexpectedAppendOffsetException(val message: String, - val firstOffset: Long, - val lastOffset: Long) extends RuntimeException(message) { -} diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index db3fe936312..925c9d057c4 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -17,7 +17,6 @@ package kafka.log -import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.remote.RemoteLogManager import kafka.utils._ import org.apache.kafka.common.errors._ @@ -35,12 +34,12 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.record.BrokerCompressionType -import org.apache.kafka.server.storage.log.FetchIsolation +import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException} import org.apache.kafka.server.util.Scheduler import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog} import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import java.io.{File, IOException} diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 0cb298eda1c..2ad21666dd0 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -18,7 +18,6 @@ package kafka.cluster import java.net.InetAddress import com.yammer.metrics.core.Metric -import kafka.common.UnexpectedAppendOffsetException import kafka.log._ import kafka.server._ import kafka.utils._ @@ -60,7 +59,7 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey -import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} +import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, UnexpectedAppendOffsetException} import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 819f4a2533a..4f188e25b1c 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -17,7 +17,6 @@ package kafka.log -import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.remote.RemoteLogManager import kafka.server.{DelayedRemoteListOffsets, KafkaConfig} import kafka.utils.TestUtils @@ -38,11 +37,11 @@ import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMe import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.purgatory.DelayedOperationPurgatory -import org.apache.kafka.server.storage.log.FetchIsolation +import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException} import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard} import org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import org.junit.jupiter.api.Assertions._ diff --git a/server-common/src/main/java/org/apache/kafka/server/storage/log/UnexpectedAppendOffsetException.java b/server-common/src/main/java/org/apache/kafka/server/storage/log/UnexpectedAppendOffsetException.java new file mode 100644 index 00000000000..652b6745ca0 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/storage/log/UnexpectedAppendOffsetException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.storage.log; + +public class UnexpectedAppendOffsetException extends RuntimeException { + + public final long firstOffset; + public final long lastOffset; + + /** + * Indicates the follower or the future replica received records from the leader (or current + * replica) with first offset less than expected next offset. + * @param firstOffset The first offset of the records to append + * @param lastOffset The last offset of the records to append + */ + public UnexpectedAppendOffsetException(String message, long firstOffset, long lastOffset) { + super(message); + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + } +} diff --git a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/CustomMetadataSizeLimitExceededException.java similarity index 86% rename from core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java rename to storage/src/main/java/org/apache/kafka/server/log/remote/storage/CustomMetadataSizeLimitExceededException.java index c893f3488de..98462d93b09 100644 --- a/core/src/main/java/kafka/log/remote/CustomMetadataSizeLimitExceededException.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/CustomMetadataSizeLimitExceededException.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.log.remote; +package org.apache.kafka.server.log.remote.storage; -class CustomMetadataSizeLimitExceededException extends Exception { +public class CustomMetadataSizeLimitExceededException extends Exception { } diff --git a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetsOutOfOrderException.java similarity index 69% rename from core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala rename to storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetsOutOfOrderException.java index f8daaa4a181..39f8d494979 100644 --- a/core/src/main/scala/kafka/common/OffsetsOutOfOrderException.scala +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetsOutOfOrderException.java @@ -1,11 +1,11 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with + * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * the License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -14,12 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package kafka.common +package org.apache.kafka.storage.internals.log; /** * Indicates the follower received records with non-monotonically increasing offsets */ -class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) { -} +public class OffsetsOutOfOrderException extends RuntimeException { + public OffsetsOutOfOrderException(String message) { + super(message); + } +}