KAFKA-18144 Move the storage exceptions out of the core module (#18021)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2024-12-07 15:43:00 +01:00 committed by GitHub
parent 42f74a1c3a
commit 4630628701
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 54 additions and 49 deletions

View File

@ -55,6 +55,7 @@ import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig; import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics; import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.CustomMetadataSizeLimitExceededException;
import org.apache.kafka.server.log.remote.storage.LogSegmentData; import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;

View File

@ -19,7 +19,6 @@ package kafka.cluster
import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.locks.ReentrantReadWriteLock
import java.util.Optional import java.util.Optional
import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList} import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList}
import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.{KafkaController, StateChangeLogger} import kafka.controller.{KafkaController, StateChangeLogger}
import kafka.log._ import kafka.log._
import kafka.log.remote.RemoteLogManager import kafka.log.remote.RemoteLogManager
@ -47,7 +46,7 @@ import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, Lead
import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, UnexpectedAppendOffsetException}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.slf4j.event.Level import org.slf4j.event.Level

View File

@ -1,29 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
/**
* Indicates the follower or the future replica received records from the leader (or current
* replica) with first offset less than expected next offset.
* @param firstOffset The first offset of the records to append
* @param lastOffset The last offset of the records to append
*/
class UnexpectedAppendOffsetException(val message: String,
val firstOffset: Long,
val lastOffset: Long) extends RuntimeException(message) {
}

View File

@ -17,7 +17,6 @@
package kafka.log package kafka.log
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager import kafka.log.remote.RemoteLogManager
import kafka.utils._ import kafka.utils._
import org.apache.kafka.common.errors._ import org.apache.kafka.common.errors._
@ -35,12 +34,12 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException}
import org.apache.kafka.server.util.Scheduler import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult import org.apache.kafka.storage.internals.log.LocalLog.SplitSegmentResult
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog} import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, BatchMetadata, CompletedTxn, FetchDataInfo, LastRecord, LeaderHwChange, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogValidator, OffsetsOutOfOrderException, ProducerAppendInfo, ProducerStateManager, ProducerStateManagerConfig, RollParams, SegmentDeletionReason, VerificationGuard, UnifiedLog => JUnifiedLog}
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import java.io.{File, IOException} import java.io.{File, IOException}

View File

@ -18,7 +18,6 @@ package kafka.cluster
import java.net.InetAddress import java.net.InetAddress
import com.yammer.metrics.core.Metric import com.yammer.metrics.core.Metric
import kafka.common.UnexpectedAppendOffsetException
import kafka.log._ import kafka.log._
import kafka.server._ import kafka.server._
import kafka.utils._ import kafka.utils._
@ -60,7 +59,7 @@ import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, UnexpectedAppendOffsetException}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache

View File

@ -17,7 +17,6 @@
package kafka.log package kafka.log
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager import kafka.log.remote.RemoteLogManager
import kafka.server.{DelayedRemoteListOffsets, KafkaConfig} import kafka.server.{DelayedRemoteListOffsets, KafkaConfig}
import kafka.utils.TestUtils import kafka.utils.TestUtils
@ -38,11 +37,11 @@ import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMe
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard} import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler import org.apache.kafka.storage.internals.utils.Throttler
import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats} import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics, BrokerTopicStats}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.storage.log;
public class UnexpectedAppendOffsetException extends RuntimeException {
public final long firstOffset;
public final long lastOffset;
/**
* Indicates the follower or the future replica received records from the leader (or current
* replica) with first offset less than expected next offset.
* @param firstOffset The first offset of the records to append
* @param lastOffset The last offset of the records to append
*/
public UnexpectedAppendOffsetException(String message, long firstOffset, long lastOffset) {
super(message);
this.firstOffset = firstOffset;
this.lastOffset = lastOffset;
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package kafka.log.remote; package org.apache.kafka.server.log.remote.storage;
class CustomMetadataSizeLimitExceededException extends Exception { public class CustomMetadataSizeLimitExceededException extends Exception {
} }

View File

@ -1,11 +1,11 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.kafka.storage.internals.log;
package kafka.common
/** /**
* Indicates the follower received records with non-monotonically increasing offsets * Indicates the follower received records with non-monotonically increasing offsets
*/ */
class OffsetsOutOfOrderException(message: String) extends RuntimeException(message) { public class OffsetsOutOfOrderException extends RuntimeException {
}
public OffsetsOutOfOrderException(String message) {
super(message);
}
}