KAFKA-14523: Move kafka.log.remote classes to storage (#19474)

Pretty much a straightforward move of these classes. I just updated
`RemoteLogManagerTest` to not use `KafkaConfig`

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2025-04-17 11:05:14 +02:00 committed by GitHub
parent bb7d8eb2c8
commit c73d97de0c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 68 additions and 106 deletions

View File

@ -2242,6 +2242,7 @@ project(':storage') {
testImplementation project(':clients').sourceSets.test.output testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core') testImplementation project(':core')
testImplementation project(':core').sourceSets.test.output testImplementation project(':core').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.test.output
testImplementation project(':test-common:test-common-internal-api') testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-runtime') testImplementation project(':test-common:test-common-runtime')
testImplementation project(':test-common:test-common-util') testImplementation project(':test-common:test-common-util')

View File

@ -66,20 +66,6 @@
<allow class="com.fasterxml.jackson.annotation.JsonIgnoreProperties" /> <allow class="com.fasterxml.jackson.annotation.JsonIgnoreProperties" />
</subpackage> </subpackage>
<subpackage name="log.remote">
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.log.remote" />
<allow pkg="org.apache.kafka.server.log.remote.quota" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.storage.internals" />
<allow pkg="org.apache.kafka.storage.log.metrics" />
<allow pkg="kafka.log" />
<allow pkg="kafka.cluster" />
<allow pkg="kafka.server" />
<allow pkg="org.mockito" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="server"> <subpackage name="server">
<allow pkg="kafka" /> <allow pkg="kafka" />
<allow pkg="org.apache.kafka" /> <allow pkg="org.apache.kafka" />

View File

@ -29,6 +29,7 @@
<allow pkg="org.junit" /> <allow pkg="org.junit" />
<allow pkg="org.hamcrest" /> <allow pkg="org.hamcrest" />
<allow pkg="org.mockito" /> <allow pkg="org.mockito" />
<allow pkg="org.opentest4j" />
<allow pkg="java.security" /> <allow pkg="java.security" />
<allow pkg="javax.net.ssl" /> <allow pkg="javax.net.ssl" />
<allow pkg="javax.security" /> <allow pkg="javax.security" />
@ -74,8 +75,12 @@
</subpackage> </subpackage>
<subpackage name="storage"> <subpackage name="storage">
<allow pkg="com.yammer.metrics.core" /> <allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.common.test" /> <allow pkg="org.apache.kafka.common.test" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.purgatory" />
<allow pkg="org.apache.kafka.server.quota" />
<allow pkg="org.apache.kafka.server.storage.log" />
<allow pkg="org.apache.kafka.server.util" />
</subpackage> </subpackage>
</subpackage> </subpackage>
</subpackage> </subpackage>

View File

@ -37,15 +37,11 @@
<!-- core --> <!-- core -->
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition|SharePartitionManager).java"/> <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition|SharePartitionManager).java"/>
<suppress checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling|JavaNCSS" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="MethodLength" files="RemoteLogManager.java"/>
<suppress checks="MethodLength" files="RemoteLogManagerConfig.java"/>
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength" <suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/> files="(KafkaClusterTestKit).java"/>
<suppress checks="NPathComplexity" files="TestKitNodes.java"/> <suppress checks="NPathComplexity" files="TestKitNodes.java"/>
<suppress checks="JavaNCSS" <suppress checks="JavaNCSS"
files="(RemoteLogManagerTest|SharePartitionManagerTest|SharePartitionTest).java"/> files="(SharePartitionManagerTest|SharePartitionTest).java"/>
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/> <suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
<suppress checks="CyclomaticComplexity" files="SharePartition.java"/> <suppress checks="CyclomaticComplexity" files="SharePartition.java"/>
@ -364,7 +360,9 @@
<suppress checks="ParameterNumber" <suppress checks="ParameterNumber"
files="(LogAppendInfo|LogLoader|RemoteLogManagerConfig|UnifiedLog).java"/> files="(LogAppendInfo|LogLoader|RemoteLogManagerConfig|UnifiedLog).java"/>
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)" <suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"
files="(UnifiedLog).java"/> files="(UnifiedLog|RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="MethodLength" files="(RemoteLogManager|RemoteLogManagerConfig).java"/>
<suppress checks="JavaNCSS" files="RemoteLogManagerTest.java"/>
<!-- benchmarks --> <!-- benchmarks -->
<suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)" <suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)"

View File

@ -18,7 +18,6 @@
package kafka.server; package kafka.server;
import kafka.cluster.Partition; import kafka.cluster.Partition;
import kafka.log.remote.RemoteLogManager;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
@ -29,6 +28,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.CheckpointFile; import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

View File

@ -22,7 +22,6 @@ import java.util.Optional
import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList} import java.util.concurrent.{CompletableFuture, CopyOnWriteArrayList}
import kafka.controller.StateChangeLogger import kafka.controller.StateChangeLogger
import kafka.log._ import kafka.log._
import kafka.log.remote.RemoteLogManager
import kafka.server._ import kafka.server._
import kafka.server.share.DelayedShareFetch import kafka.server.share.DelayedShareFetch
import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
@ -41,6 +40,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache} import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache}
import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.log.remote.TopicPartitionLog import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}

View File

@ -20,7 +20,6 @@ package kafka.server
import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter} import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWriter}
import kafka.coordinator.transaction.TransactionCoordinator import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.SocketServer import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager import kafka.raft.KafkaRaftManager
import kafka.server.metadata._ import kafka.server.metadata._
@ -48,7 +47,7 @@ import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition} import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs} import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics} import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpStatePersister, Persister, PersisterStateManager} import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpStatePersister, Persister, PersisterStateManager}

View File

@ -18,7 +18,6 @@
package kafka.server package kafka.server
import kafka.log.LogManager import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.SocketServer import kafka.network.SocketServer
import kafka.utils.Logging import kafka.utils.Logging
import org.apache.kafka.common.ClusterResource import org.apache.kafka.common.ClusterResource
@ -32,6 +31,7 @@ import org.apache.kafka.metadata.{BrokerState, MetadataCache}
import org.apache.kafka.security.CredentialProvider import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.NodeToControllerChannelManager import org.apache.kafka.server.common.NodeToControllerChannelManager
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector} import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
import org.apache.kafka.server.util.Scheduler import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.LogDirFailureChannel import org.apache.kafka.storage.internals.log.LogDirFailureChannel

View File

@ -20,7 +20,6 @@ import com.yammer.metrics.core.Meter
import kafka.cluster.{Partition, PartitionListener} import kafka.cluster.{Partition, PartitionListener}
import kafka.controller.StateChangeLogger import kafka.controller.StateChangeLogger
import kafka.log.LogManager import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.server.HostedPartition.Online import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported} import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
@ -54,6 +53,7 @@ import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition} import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
import org.apache.kafka.server.log.remote.TopicPartitionLog import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey} import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}

View File

@ -17,8 +17,6 @@
package kafka.log package kafka.log
import kafka.log.remote.RemoteLogManager
import java.io.File import java.io.File
import java.util.Properties import java.util.Properties
import kafka.utils.TestUtils import kafka.utils.TestUtils
@ -34,6 +32,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.storage.log.FetchIsolation import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG} import org.apache.kafka.storage.internals.log.LogConfig.{DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG, DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG}

View File

@ -17,7 +17,6 @@
package kafka.log package kafka.log
import kafka.log.remote.RemoteLogManager
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.compress.Compression
@ -36,7 +35,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.config.KRaftConfigs import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig} import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, DelayedRemoteListOffsets} import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, DelayedRemoteListOffsets}
import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException} import org.apache.kafka.server.storage.log.{FetchIsolation, UnexpectedAppendOffsetException}

View File

@ -22,7 +22,6 @@ import java.util.{Optional, Properties, Map => JMap}
import java.util.concurrent.{CompletionStage, TimeUnit} import java.util.concurrent.{CompletionStage, TimeUnit}
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import kafka.log.LogManager import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.{Endpoint, Reconfigurable} import org.apache.kafka.common.{Endpoint, Reconfigurable}
@ -37,7 +36,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.DynamicThreadPool import org.apache.kafka.server.DynamicThreadPool
import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs} import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, ProducerStateManagerConfig} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, ProducerStateManagerConfig}

View File

@ -18,7 +18,6 @@ package kafka.server
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.integration.KafkaServerTestHarness import kafka.integration.KafkaServerTestHarness
import kafka.log.remote.RemoteLogManager
import kafka.utils.TestUtils.random import kafka.utils.TestUtils.random
import kafka.utils._ import kafka.utils._
import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.CommonClientConfigs
@ -36,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupConfig
import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs} import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
import org.apache.kafka.server.log.remote.TopicPartitionLog import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog} import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog}
import org.apache.kafka.test.TestUtils.assertFutureThrows import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._

View File

@ -21,7 +21,6 @@ import com.yammer.metrics.core.{Gauge, Meter, Timer}
import kafka.cluster.PartitionTest.MockPartitionListener import kafka.cluster.PartitionTest.MockPartitionListener
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.log.LogManager import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA} import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package kafka.log.remote; package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
@ -50,18 +50,7 @@ import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemot
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager; import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig; import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics; import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.CustomMetadataSizeLimitExceededException;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.purgatory.DelayedRemoteListOffsets; import org.apache.kafka.server.purgatory.DelayedRemoteListOffsets;
@ -194,7 +183,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
// topic ids that are received on leadership changes, this map is cleared on stop partitions // topic ids that are received on leadership changes, this map is cleared on stop partitions
private final ConcurrentMap<TopicPartition, Uuid> topicIdByPartitionMap = new ConcurrentHashMap<>(); private final ConcurrentMap<TopicPartition, Uuid> topicIdByPartitionMap = new ConcurrentHashMap<>();
private final String clusterId; private final String clusterId;
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass()); // For compatibility, metrics are defined to be under the `kafka.log.remote.RemoteLogManager` class
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteLogManager");
// The endpoint for remote log metadata manager to connect to // The endpoint for remote log metadata manager to connect to
private Optional<Endpoint> endpoint = Optional.empty(); private Optional<Endpoint> endpoint = Optional.empty();

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package kafka.log.remote; package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.FileRecords;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package kafka.log.remote; package org.apache.kafka.server.log.remote.storage;
import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager; import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;

View File

@ -14,9 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package kafka.log.remote; package org.apache.kafka.server.log.remote.storage;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
@ -24,6 +22,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
@ -42,19 +41,7 @@ import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.log.remote.TopicPartitionLog; import org.apache.kafka.server.log.remote.TopicPartitionLog;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager; import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig; import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.metrics.KafkaYammerMetrics;
@ -131,12 +118,12 @@ import java.util.function.BiConsumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static kafka.log.remote.RemoteLogManager.isRemoteSegmentWithinLeaderEpochs;
import static org.apache.kafka.common.record.TimestampType.CREATE_TIME; import static org.apache.kafka.common.record.TimestampType.CREATE_TIME;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManager.isRemoteSegmentWithinLeaderEpochs;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS;
@ -204,7 +191,7 @@ public class RemoteLogManagerTest {
private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class); private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class);
private KafkaConfig config; private RemoteLogManagerConfig config;
private BrokerTopicStats brokerTopicStats = null; private BrokerTopicStats brokerTopicStats = null;
private final Metrics metrics = new Metrics(time); private final Metrics metrics = new Metrics(time);
@ -236,10 +223,10 @@ public class RemoteLogManagerTest {
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100");
appendRLMConfig(props); appendRLMConfig(props);
config = KafkaConfig.fromProps(props); config = configs(props);
brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled()); brokerTopicStats = new BrokerTopicStats(config.isRemoteStorageSystemEnabled());
remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog), tp -> Optional.of(mockLog),
(topicPartition, offset) -> currentLogStartOffset.set(offset), (topicPartition, offset) -> currentLogStartOffset.set(offset),
brokerTopicStats, metrics) { brokerTopicStats, metrics) {
@ -263,6 +250,10 @@ public class RemoteLogManagerTest {
doReturn(true).when(remoteLogMetadataManager).isReady(any(TopicIdPartition.class)); doReturn(true).when(remoteLogMetadataManager).isReady(any(TopicIdPartition.class));
} }
// Builds a RemoteLogManagerConfig straight from the given Properties by parsing them
// through AbstractConfig with the RLM ConfigDef. This replaces the previous
// KafkaConfig.fromProps(props).remoteLogManagerConfig() path, removing the test's
// dependency on core's KafkaConfig (see commit message: KAFKA-14523 class move).
private RemoteLogManagerConfig configs(Properties props) {
return new RemoteLogManagerConfig(new AbstractConfig(RemoteLogManagerConfig.configDef(), props));
}
@AfterEach @AfterEach
void tearDown() { void tearDown() {
if (remoteLogManager != null) { if (remoteLogManager != null) {
@ -353,9 +344,9 @@ public class RemoteLogManagerTest {
props.put(configPrefix + key, "world"); props.put(configPrefix + key, "world");
props.put("remote.log.metadata.y", "z"); props.put("remote.log.metadata.y", "z");
appendRLMConfig(props); appendRLMConfig(props);
KafkaConfig config = KafkaConfig.fromProps(props); RemoteLogManagerConfig config = configs(props);
Map<String, Object> metadataMangerConfig = config.remoteLogManagerConfig().remoteLogMetadataManagerProps(); Map<String, Object> metadataMangerConfig = config.remoteLogMetadataManagerProps();
assertEquals(props.get(configPrefix + key), metadataMangerConfig.get(key)); assertEquals(props.get(configPrefix + key), metadataMangerConfig.get(key));
assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y")); assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y"));
} }
@ -370,9 +361,9 @@ public class RemoteLogManagerTest {
props.put(configPrefix + key, "world"); props.put(configPrefix + key, "world");
props.put("remote.storage.manager.y", "z"); props.put("remote.storage.manager.y", "z");
appendRLMConfig(props); appendRLMConfig(props);
KafkaConfig config = KafkaConfig.fromProps(props); RemoteLogManagerConfig config = configs(props);
Map<String, Object> remoteStorageManagerConfig = config.remoteLogManagerConfig().remoteStorageManagerProps(); Map<String, Object> remoteStorageManagerConfig = config.remoteStorageManagerProps();
assertEquals(props.get(configPrefix + key), remoteStorageManagerConfig.get(key)); assertEquals(props.get(configPrefix + key), remoteStorageManagerConfig.get(key));
assertFalse(remoteStorageManagerConfig.containsKey("remote.storage.manager.y")); assertFalse(remoteStorageManagerConfig.containsKey("remote.storage.manager.y"));
} }
@ -401,9 +392,9 @@ public class RemoteLogManagerTest {
// override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix" // override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix"
props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL"); props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL");
appendRLMConfig(props); appendRLMConfig(props);
KafkaConfig config = KafkaConfig.fromProps(props); RemoteLogManagerConfig config = configs(props);
try (RemoteLogManager remoteLogManager = new RemoteLogManager( try (RemoteLogManager remoteLogManager = new RemoteLogManager(
config.remoteLogManagerConfig(), config,
brokerId, brokerId,
logDir, logDir,
clusterId, clusterId,
@ -1354,7 +1345,7 @@ public class RemoteLogManagerTest {
void testGetClassLoaderAwareRemoteStorageManager() throws Exception { void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
try (RemoteLogManager remoteLogManager = try (RemoteLogManager remoteLogManager =
new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, new RemoteLogManager(config, brokerId, logDir, clusterId, time,
t -> Optional.empty(), t -> Optional.empty(),
(topicPartition, offset) -> { }, (topicPartition, offset) -> { },
brokerTopicStats, metrics) { brokerTopicStats, metrics) {
@ -1745,7 +1736,7 @@ public class RemoteLogManagerTest {
}); });
when(mockLog.logEndOffset()).thenReturn(300L); when(mockLog.logEndOffset()).thenReturn(300L);
remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
partition -> Optional.of(mockLog), partition -> Optional.of(mockLog),
(topicPartition, offset) -> currentLogStartOffset.set(offset), (topicPartition, offset) -> currentLogStartOffset.set(offset),
brokerTopicStats, metrics) { brokerTopicStats, metrics) {
@ -1803,7 +1794,7 @@ public class RemoteLogManagerTest {
@Test @Test
public void testRemoveMetricsOnClose() throws IOException { public void testRemoveMetricsOnClose() throws IOException {
try (MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class)) { try (MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class)) {
RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId,
time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { time, tp -> Optional.of(mockLog), (topicPartition, offset) -> {
}, brokerTopicStats, metrics) { }, brokerTopicStats, metrics) {
public RemoteStorageManager createRemoteStorageManager() { public RemoteStorageManager createRemoteStorageManager() {
@ -2200,7 +2191,7 @@ public class RemoteLogManagerTest {
else else
return Collections.emptyIterator(); return Collections.emptyIterator();
}); });
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog), tp -> Optional.of(mockLog),
(topicPartition, offset) -> { }, (topicPartition, offset) -> { },
brokerTopicStats, metrics) { brokerTopicStats, metrics) {
@ -2225,7 +2216,7 @@ public class RemoteLogManagerTest {
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt())) when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt()))
.thenReturn(Collections.emptyIterator()); .thenReturn(Collections.emptyIterator());
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog), tp -> Optional.of(mockLog),
(topicPartition, offset) -> { }, (topicPartition, offset) -> { },
brokerTopicStats, metrics) { brokerTopicStats, metrics) {
@ -2259,7 +2250,7 @@ public class RemoteLogManagerTest {
}); });
AtomicLong logStartOffset = new AtomicLong(0); AtomicLong logStartOffset = new AtomicLong(0);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog), tp -> Optional.of(mockLog),
(topicPartition, offset) -> logStartOffset.set(offset), (topicPartition, offset) -> logStartOffset.set(offset),
brokerTopicStats, metrics) { brokerTopicStats, metrics) {
@ -2313,7 +2304,7 @@ public class RemoteLogManagerTest {
} }
}; };
remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog), tp -> Optional.of(mockLog),
(topicPartition, offset) -> currentLogStartOffset.set(offset), (topicPartition, offset) -> currentLogStartOffset.set(offset),
brokerTopicStats, metrics) { brokerTopicStats, metrics) {
@ -2967,7 +2958,7 @@ public class RemoteLogManagerTest {
@Test @Test
public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException { public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException {
AtomicLong logStartOffset = new AtomicLong(0); AtomicLong logStartOffset = new AtomicLong(0);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog), tp -> Optional.of(mockLog),
(topicPartition, offset) -> logStartOffset.set(offset), (topicPartition, offset) -> logStartOffset.set(offset),
brokerTopicStats, metrics) { brokerTopicStats, metrics) {
@ -3128,7 +3119,7 @@ public class RemoteLogManagerTest {
); );
try (RemoteLogManager remoteLogManager = new RemoteLogManager( try (RemoteLogManager remoteLogManager = new RemoteLogManager(
config.remoteLogManagerConfig(), config,
brokerId, brokerId,
logDir, logDir,
clusterId, clusterId,
@ -3205,7 +3196,7 @@ public class RemoteLogManagerTest {
); );
try (RemoteLogManager remoteLogManager = new RemoteLogManager( try (RemoteLogManager remoteLogManager = new RemoteLogManager(
config.remoteLogManagerConfig(), config,
brokerId, brokerId,
logDir, logDir,
clusterId, clusterId,
@ -3290,7 +3281,7 @@ public class RemoteLogManagerTest {
try (RemoteLogManager remoteLogManager = new RemoteLogManager( try (RemoteLogManager remoteLogManager = new RemoteLogManager(
config.remoteLogManagerConfig(), config,
brokerId, brokerId,
logDir, logDir,
clusterId, clusterId,
@ -3337,8 +3328,8 @@ public class RemoteLogManagerTest {
Properties defaultProps = new Properties(); Properties defaultProps = new Properties();
defaultProps.putAll(brokerConfig); defaultProps.putAll(brokerConfig);
appendRLMConfig(defaultProps); appendRLMConfig(defaultProps);
KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); RemoteLogManagerConfig defaultRlmConfig = configs(defaultProps);
RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig()); RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds());
@ -3349,9 +3340,9 @@ public class RemoteLogManagerTest {
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
appendRLMConfig(customProps); appendRLMConfig(customProps);
KafkaConfig config = KafkaConfig.fromProps(customProps); RemoteLogManagerConfig config = configs(customProps);
RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config.remoteLogManagerConfig()); RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config);
assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
@ -3362,9 +3353,9 @@ public class RemoteLogManagerTest {
Properties defaultProps = new Properties(); Properties defaultProps = new Properties();
defaultProps.putAll(brokerConfig); defaultProps.putAll(brokerConfig);
appendRLMConfig(defaultProps); appendRLMConfig(defaultProps);
KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); RemoteLogManagerConfig defaultRlmConfig = configs(defaultProps);
RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig()); RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples());
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds());
@ -3375,8 +3366,8 @@ public class RemoteLogManagerTest {
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31);
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
appendRLMConfig(customProps); appendRLMConfig(customProps);
KafkaConfig rlmConfig = KafkaConfig.fromProps(customProps); RemoteLogManagerConfig rlmConfig = configs(customProps);
RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig.remoteLogManagerConfig()); RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples()); assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds()); assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
@ -3646,7 +3637,7 @@ public class RemoteLogManagerTest {
when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
.thenReturn(fileInputStream); .thenReturn(fileInputStream);
RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog), tp -> Optional.of(mockLog),
(topicPartition, offset) -> currentLogStartOffset.set(offset), (topicPartition, offset) -> currentLogStartOffset.set(offset),
brokerTopicStats, metrics) { brokerTopicStats, metrics) {
@ -3687,9 +3678,9 @@ public class RemoteLogManagerTest {
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "30000"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "30000");
appendRLMConfig(props); appendRLMConfig(props);
config = KafkaConfig.fromProps(props); config = configs(props);
remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time,
tp -> Optional.of(mockLog), tp -> Optional.of(mockLog),
(topicPartition, offset) -> currentLogStartOffset.set(offset), (topicPartition, offset) -> currentLogStartOffset.set(offset),
brokerTopicStats, metrics) { brokerTopicStats, metrics) {

View File

@ -14,16 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package kafka.log.remote; package org.apache.kafka.server.log.remote.storage;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.util.MockTime; import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
@ -31,6 +27,7 @@ import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;

View File

@ -14,14 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package kafka.log.remote; package org.apache.kafka.server.log.remote.storage;
import kafka.utils.TestUtils; import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.Records;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager; import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult; import org.apache.kafka.storage.internals.log.RemoteLogReadResult;

View File

@ -17,7 +17,6 @@
package org.apache.kafka.tiered.storage; package org.apache.kafka.tiered.storage;
import kafka.api.IntegrationTestHarness; import kafka.api.IntegrationTestHarness;
import kafka.log.remote.RemoteLogManager;
import kafka.server.KafkaBroker; import kafka.server.KafkaBroker;
import org.apache.kafka.common.replica.ReplicaSelector; import org.apache.kafka.common.replica.ReplicaSelector;
@ -26,6 +25,7 @@ import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;