From c73d97de0cc768b0293ca520f2980ba52685bb6a Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Thu, 17 Apr 2025 11:05:14 +0200 Subject: [PATCH] KAFKA-14523: Move kafka.log.remote classes to storage (#19474) Pretty much a straight forward move of these classes. I just updated `RemoteLogManagerTest` to not use `KafkaConfig` Reviewers: Chia-Ping Tsai --- build.gradle | 1 + checkstyle/import-control-core.xml | 14 --- checkstyle/import-control-storage.xml | 7 +- checkstyle/suppressions.xml | 10 +-- .../java/kafka/server/TierStateMachine.java | 2 +- .../main/scala/kafka/cluster/Partition.scala | 2 +- .../scala/kafka/server/BrokerServer.scala | 3 +- .../main/scala/kafka/server/KafkaBroker.scala | 2 +- .../scala/kafka/server/ReplicaManager.scala | 2 +- .../scala/unit/kafka/log/LogTestUtils.scala | 3 +- .../scala/unit/kafka/log/UnifiedLogTest.scala | 3 +- .../server/DynamicBrokerConfigTest.scala | 3 +- .../server/DynamicConfigChangeTest.scala | 2 +- .../kafka/server/ReplicaManagerTest.scala | 1 - .../log/remote/storage}/RemoteLogManager.java | 16 +--- .../storage}/RemoteLogOffsetReader.java | 2 +- .../log/remote/storage}/RemoteLogReader.java | 2 +- .../remote/storage}/RemoteLogManagerTest.java | 87 +++++++++---------- .../storage}/RemoteLogOffsetReaderTest.java | 7 +- .../remote/storage}/RemoteLogReaderTest.java | 3 +- .../storage/TieredStorageTestHarness.java | 2 +- 21 files changed, 68 insertions(+), 106 deletions(-) rename {core/src/main/java/kafka/log/remote => storage/src/main/java/org/apache/kafka/server/log/remote/storage}/RemoteLogManager.java (99%) rename {core/src/main/java/kafka/log/remote => storage/src/main/java/org/apache/kafka/server/log/remote/storage}/RemoteLogOffsetReader.java (98%) rename {core/src/main/java/kafka/log/remote => storage/src/main/java/org/apache/kafka/server/log/remote/storage}/RemoteLogReader.java (98%) rename {core/src/test/java/kafka/log/remote => storage/src/test/java/org/apache/kafka/server/log/remote/storage}/RemoteLogManagerTest.java (98%) rename {core/src/test/java/kafka/log/remote => storage/src/test/java/org/apache/kafka/server/log/remote/storage}/RemoteLogOffsetReaderTest.java (97%) rename {core/src/test/java/kafka/log/remote => storage/src/test/java/org/apache/kafka/server/log/remote/storage}/RemoteLogReaderTest.java (98%) diff --git a/build.gradle b/build.gradle index 2320291e32a..024c428b9fc 100644 --- a/build.gradle +++ b/build.gradle @@ -2242,6 +2242,7 @@ project(':storage') { testImplementation project(':clients').sourceSets.test.output testImplementation project(':core') testImplementation project(':core').sourceSets.test.output + testImplementation project(':storage:storage-api').sourceSets.test.output testImplementation project(':test-common:test-common-internal-api') testImplementation project(':test-common:test-common-runtime') testImplementation project(':test-common:test-common-util') diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index f226ec10693..ea93b8d96ad 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -66,20 +66,6 @@ - - - - - - - - - - - - - - diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml index e3bee65b6eb..639cb6dc1d0 100644 --- a/checkstyle/import-control-storage.xml +++ b/checkstyle/import-control-storage.xml @@ -29,6 +29,7 @@ + @@ -74,8 +75,12 @@ - + + + + + diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 429d699e7a1..4e0f338af5d 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -37,15 +37,11 @@ - - - - + files="(SharePartitionManagerTest|SharePartitionTest).java"/> @@ -364,7 +360,9 @@ + files="(UnifiedLog|RemoteLogManager|RemoteLogManagerTest).java"/> + + JMap} import java.util.concurrent.{CompletionStage, TimeUnit} import java.util.concurrent.atomic.AtomicReference import kafka.log.LogManager -import kafka.log.remote.RemoteLogManager import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.utils.TestUtils import org.apache.kafka.common.{Endpoint, Reconfigurable} @@ -37,7 +36,7 @@ import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.server.DynamicThreadPool import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} -import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig +import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig} import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs} import org.apache.kafka.server.util.KafkaScheduler import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, ProducerStateManagerConfig} diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 4d93a24dc67..28c8d694f9d 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -18,7 +18,6 @@ package kafka.server import kafka.cluster.Partition import kafka.integration.KafkaServerTestHarness -import kafka.log.remote.RemoteLogManager import kafka.utils.TestUtils.random import kafka.utils._ import org.apache.kafka.clients.CommonClientConfigs @@ -36,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs} import org.apache.kafka.server.log.remote.TopicPartitionLog +import org.apache.kafka.server.log.remote.storage.RemoteLogManager import org.apache.kafka.storage.internals.log.{LogConfig, UnifiedLog} import org.apache.kafka.test.TestUtils.assertFutureThrows import org.junit.jupiter.api.Assertions._ diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index cde36a3ae7c..08cbfee24eb 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -21,7 +21,6 @@ import com.yammer.metrics.core.{Gauge, Meter, Timer} import kafka.cluster.PartitionTest.MockPartitionListener import kafka.cluster.Partition import kafka.log.LogManager -import kafka.log.remote.RemoteLogManager import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA} diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java similarity index 99% rename from core/src/main/java/kafka/log/remote/RemoteLogManager.java rename to storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 6c8416c0983..a5080c8eb65 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.log.remote; +package org.apache.kafka.server.log.remote.storage; import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.KafkaException; @@ -50,18 +50,7 @@ import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemot import org.apache.kafka.server.log.remote.quota.RLMQuotaManager; import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig; import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics; -import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; -import org.apache.kafka.server.log.remote.storage.CustomMetadataSizeLimitExceededException; -import org.apache.kafka.server.log.remote.storage.LogSegmentData; -import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; -import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; -import org.apache.kafka.server.log.remote.storage.RemoteStorageException; -import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; import org.apache.kafka.server.purgatory.DelayedRemoteListOffsets; @@ -194,7 +183,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { // topic ids that are received on leadership changes, this map is cleared on stop partitions private final ConcurrentMap topicIdByPartitionMap = new ConcurrentHashMap<>(); private final String clusterId; - private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass()); + // For compatibility, metrics are defined to be under the `kafka.log.remote.RemoteLogManager` class + private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log.remote", "RemoteLogManager"); // The endpoint for remote log metadata manager to connect to private Optional endpoint = Optional.empty(); diff --git a/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReader.java similarity index 98% rename from core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java rename to storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReader.java index 37b7e971f7b..cf08b76bfbb 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogOffsetReader.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReader.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.log.remote; +package org.apache.kafka.server.log.remote.storage; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.FileRecords; diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java similarity index 98% rename from core/src/main/java/kafka/log/remote/RemoteLogReader.java rename to storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java index 369417fa846..a23ee7207ae 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogReader.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.log.remote; +package org.apache.kafka.server.log.remote.storage; import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.server.log.remote.quota.RLMQuotaManager; diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java similarity index 98% rename from core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java rename to storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index edce4e5a44e..0edbd542e64 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -14,9 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.log.remote; - -import kafka.server.KafkaConfig; +package org.apache.kafka.server.log.remote.storage; import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.KafkaException; @@ -24,6 +22,7 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; @@ -42,19 +41,7 @@ import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.log.remote.TopicPartitionLog; import org.apache.kafka.server.log.remote.quota.RLMQuotaManager; import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig; -import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; -import org.apache.kafka.server.log.remote.storage.LogSegmentData; -import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager; -import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager; -import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; -import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; -import org.apache.kafka.server.log.remote.storage.RemoteStorageException; -import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.metrics.KafkaYammerMetrics; @@ -131,12 +118,12 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; -import static kafka.log.remote.RemoteLogManager.isRemoteSegmentWithinLeaderEpochs; import static org.apache.kafka.common.record.TimestampType.CREATE_TIME; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManager.isRemoteSegmentWithinLeaderEpochs; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS; @@ -204,7 +191,7 @@ public class RemoteLogManagerTest { private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class); private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class); private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class); - private KafkaConfig config; + private RemoteLogManagerConfig config; private BrokerTopicStats brokerTopicStats = null; private final Metrics metrics = new Metrics(time); @@ -236,10 +223,10 @@ public class RemoteLogManagerTest { props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "100"); appendRLMConfig(props); - config = KafkaConfig.fromProps(props); - brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled()); + config = configs(props); + brokerTopicStats = new BrokerTopicStats(config.isRemoteStorageSystemEnabled()); - remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats, metrics) { @@ -263,6 +250,10 @@ public class RemoteLogManagerTest { doReturn(true).when(remoteLogMetadataManager).isReady(any(TopicIdPartition.class)); } + private RemoteLogManagerConfig configs(Properties props) { + return new RemoteLogManagerConfig(new AbstractConfig(RemoteLogManagerConfig.configDef(), props)); + } + @AfterEach void tearDown() { if (remoteLogManager != null) { @@ -353,9 +344,9 @@ public class RemoteLogManagerTest { props.put(configPrefix + key, "world"); props.put("remote.log.metadata.y", "z"); appendRLMConfig(props); - KafkaConfig config = KafkaConfig.fromProps(props); + RemoteLogManagerConfig config = configs(props); - Map metadataMangerConfig = config.remoteLogManagerConfig().remoteLogMetadataManagerProps(); + Map metadataMangerConfig = config.remoteLogMetadataManagerProps(); assertEquals(props.get(configPrefix + key), metadataMangerConfig.get(key)); assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y")); } @@ -370,9 +361,9 @@ public class RemoteLogManagerTest { props.put(configPrefix + key, "world"); props.put("remote.storage.manager.y", "z"); appendRLMConfig(props); - KafkaConfig config = KafkaConfig.fromProps(props); + RemoteLogManagerConfig config = configs(props); - Map remoteStorageManagerConfig = config.remoteLogManagerConfig().remoteStorageManagerProps(); + Map remoteStorageManagerConfig = config.remoteStorageManagerProps(); assertEquals(props.get(configPrefix + key), remoteStorageManagerConfig.get(key)); assertFalse(remoteStorageManagerConfig.containsKey("remote.storage.manager.y")); } @@ -401,9 +392,9 @@ public class RemoteLogManagerTest { // override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix" props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL"); appendRLMConfig(props); - KafkaConfig config = KafkaConfig.fromProps(props); + RemoteLogManagerConfig config = configs(props); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config.remoteLogManagerConfig(), + config, brokerId, logDir, clusterId, @@ -1354,7 +1345,7 @@ public class RemoteLogManagerTest { void testGetClassLoaderAwareRemoteStorageManager() throws Exception { ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); try (RemoteLogManager remoteLogManager = - new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + new RemoteLogManager(config, brokerId, logDir, clusterId, time, t -> Optional.empty(), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -1745,7 +1736,7 @@ public class RemoteLogManagerTest { }); when(mockLog.logEndOffset()).thenReturn(300L); - remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, partition -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats, metrics) { @@ -1803,7 +1794,7 @@ public class RemoteLogManagerTest { @Test public void testRemoveMetricsOnClose() throws IOException { try (MockedConstruction mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class)) { - RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, + RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { public RemoteStorageManager createRemoteStorageManager() { @@ -2200,7 +2191,7 @@ public class RemoteLogManagerTest { else return Collections.emptyIterator(); }); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -2225,7 +2216,7 @@ public class RemoteLogManagerTest { when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt())) .thenReturn(Collections.emptyIterator()); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, metrics) { @@ -2259,7 +2250,7 @@ public class RemoteLogManagerTest { }); AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2313,7 +2304,7 @@ public class RemoteLogManagerTest { } }; - remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats, metrics) { @@ -2967,7 +2958,7 @@ public class RemoteLogManagerTest { @Test public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException { AtomicLong logStartOffset = new AtomicLong(0); - try (RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), brokerTopicStats, metrics) { @@ -3128,7 +3119,7 @@ public class RemoteLogManagerTest { ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config.remoteLogManagerConfig(), + config, brokerId, logDir, clusterId, @@ -3205,7 +3196,7 @@ public class RemoteLogManagerTest { ); try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config.remoteLogManagerConfig(), + config, brokerId, logDir, clusterId, @@ -3290,7 +3281,7 @@ public class RemoteLogManagerTest { try (RemoteLogManager remoteLogManager = new RemoteLogManager( - config.remoteLogManagerConfig(), + config, brokerId, logDir, clusterId, @@ -3337,8 +3328,8 @@ public class RemoteLogManagerTest { Properties defaultProps = new Properties(); defaultProps.putAll(brokerConfig); appendRLMConfig(defaultProps); - KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); - RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig()); + RemoteLogManagerConfig defaultRlmConfig = configs(defaultProps); + RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); @@ -3349,9 +3340,9 @@ public class RemoteLogManagerTest { customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); appendRLMConfig(customProps); - KafkaConfig config = KafkaConfig.fromProps(customProps); + RemoteLogManagerConfig config = configs(customProps); - RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config.remoteLogManagerConfig()); + RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(config); assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples()); assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds()); @@ -3362,9 +3353,9 @@ public class RemoteLogManagerTest { Properties defaultProps = new Properties(); defaultProps.putAll(brokerConfig); appendRLMConfig(defaultProps); - KafkaConfig defaultRlmConfig = KafkaConfig.fromProps(defaultProps); + RemoteLogManagerConfig defaultRlmConfig = configs(defaultProps); - RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig.remoteLogManagerConfig()); + RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); @@ -3375,8 +3366,8 @@ public class RemoteLogManagerTest { customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1); appendRLMConfig(customProps); - KafkaConfig rlmConfig = KafkaConfig.fromProps(customProps); - RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig.remoteLogManagerConfig()); + RemoteLogManagerConfig rlmConfig = configs(customProps); + RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig); assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond()); assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples()); assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds()); @@ -3646,7 +3637,7 @@ public class RemoteLogManagerTest { when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) .thenReturn(fileInputStream); - RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats, metrics) { @@ -3687,9 +3678,9 @@ public class RemoteLogManagerTest { props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "30000"); appendRLMConfig(props); - config = KafkaConfig.fromProps(props); + config = configs(props); - remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time, + remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), brokerTopicStats, metrics) { diff --git a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReaderTest.java similarity index 97% rename from core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java rename to storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReaderTest.java index 5f063c3f1ac..de6925b33c9 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReaderTest.java @@ -14,16 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.log.remote; - -import kafka.utils.TestUtils; +package org.apache.kafka.server.log.remote.storage; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; -import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; @@ -31,6 +27,7 @@ import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder; import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java similarity index 98% rename from core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java rename to storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java index 53905e6d114..6c7026a52d5 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java @@ -14,14 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.log.remote; +package org.apache.kafka.server.log.remote.storage; import kafka.utils.TestUtils; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.Records; import org.apache.kafka.server.log.remote.quota.RLMQuotaManager; -import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.RemoteLogReadResult; diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java index ef211c0ceea..a539321f6ac 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java @@ -17,7 +17,6 @@ package org.apache.kafka.tiered.storage; import kafka.api.IntegrationTestHarness; -import kafka.log.remote.RemoteLogManager; import kafka.server.KafkaBroker; import org.apache.kafka.common.replica.ReplicaSelector; @@ -26,6 +25,7 @@ import org.apache.kafka.server.config.ReplicationConfigs; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.RemoteLogManager; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;