diff --git a/build.gradle b/build.gradle index 9b1971feed9..2e35057165c 100644 --- a/build.gradle +++ b/build.gradle @@ -902,6 +902,7 @@ project(':server') { implementation project(':transaction-coordinator') implementation project(':raft') implementation project(':share-coordinator') + implementation project(':storage:storage-api') implementation libs.jacksonDatabind implementation libs.metrics implementation libs.slf4jApi @@ -913,6 +914,7 @@ project(':server') { testImplementation testLog4j2Libs testImplementation project(':test-common:test-common-internal-api') testImplementation project(':test-common:test-common-runtime') + testImplementation project(':storage:storage-api').sourceSets.test.output testRuntimeOnly runtimeTestLibs } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index ccd08ad75ad..8f9b983cb78 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -220,8 +220,6 @@ class BrokerServer( brokerTopicStats, logDirFailureChannel) - remoteLogManagerOpt = createRemoteLogManager() - lifecycleManager = new BrokerLifecycleManager(config, time, s"broker-${config.nodeId}-", @@ -280,6 +278,8 @@ class BrokerServer( withWildcardHostnamesResolved(). withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name))) + remoteLogManagerOpt = createRemoteLogManager(listenerInfo) + alterPartitionManager = AlterPartitionManager( config, scheduler = kafkaScheduler, @@ -471,23 +471,6 @@ class BrokerServer( socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, "RequestHandlerAvgIdlePercent") - // Start RemoteLogManager before initializing broker metadata publishers. - remoteLogManagerOpt.foreach { rlm => - val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName() - if (listenerName != null) { - val endpoint = listenerInfo.listeners().values().stream - .filter(e => - e.listenerName().isPresent && - ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName)) - ) - .findFirst() - .orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, - listenerName, "Should be set as a listener name within valid broker listener name list: " + listenerInfo.listeners().values())) - rlm.onEndPointCreated(endpoint) - } - rlm.startup() - } - metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler)) brokerMetadataPublisher = new BrokerMetadataPublisher(config, metadataCache, @@ -712,16 +695,31 @@ class BrokerServer( } } - protected def createRemoteLogManager(): Option[RemoteLogManager] = { - if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) { - Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, + protected def createRemoteLogManager(listenerInfo: ListenerInfo): Option[RemoteLogManager] = { + if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled) { + val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName() + val endpoint = if (listenerName != null) { + Some(listenerInfo.listeners().values().stream + .filter(e => + e.listenerName().isPresent && + ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName)) + ) + .findFirst() + .orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, + listenerName, "Should be set as a listener name within valid broker listener name list: " + listenerInfo.listeners().values()))) + } else { + None + } + + val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time, (tp: TopicPartition) => logManager.getLog(tp).toJava, (tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => { logManager.getLog(tp).foreach { log => log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset) } }, - brokerTopicStats, metrics)) + brokerTopicStats, metrics, endpoint.toJava) + Some(rlm) } else { None } diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 79d39f14578..3f73f8f731a 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -2236,7 +2236,8 @@ class UnifiedLogTest { _ => Optional.empty[UnifiedLog](), (_, _) => {}, brokerTopicStats, - new Metrics())) + new Metrics(), + Optional.empty)) remoteLogManager.setDelayedOperationPurgatory(purgatory) val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1, @@ -2333,7 +2334,8 @@ class UnifiedLogTest { _ => Optional.empty[UnifiedLog](), (_, _) => {}, brokerTopicStats, - new Metrics())) + new Metrics(), + Optional.empty)) remoteLogManager.setDelayedOperationPurgatory(purgatory) val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1, diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 08cbfee24eb..748d5eda4ea 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -3792,8 +3792,8 @@ class ReplicaManagerTest { _ => Optional.of(mockLog), (TopicPartition, Long) => {}, brokerTopicStats, - metrics) - remoteLogManager.startup() + metrics, + Optional.empty) val spyRLM = spy(remoteLogManager) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM)) @@ -3903,8 +3903,8 @@ class ReplicaManagerTest { _ => Optional.of(dummyLog), (TopicPartition, Long) => {}, brokerTopicStats, - metrics) - remoteLogManager.startup() + metrics, + Optional.empty) val spyRLM = spy(remoteLogManager) val timer = new MockTimer(time) diff --git a/server/src/test/java/org/apache/kafka/server/MonitorablePluginsIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/MonitorablePluginsIntegrationTest.java index 5e42d67b985..43b56e63298 100644 --- a/server/src/test/java/org/apache/kafka/server/MonitorablePluginsIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/MonitorablePluginsIntegrationTest.java @@ -27,43 +27,41 @@ import org.apache.kafka.common.test.api.ClusterConfigProperty; import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.Type; import org.apache.kafka.metadata.authorizer.StandardAuthorizer; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager; import java.util.LinkedHashMap; import java.util.Map; import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG; import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP; import static org.junit.jupiter.api.Assertions.assertEquals; public class MonitorablePluginsIntegrationTest { - private static int controllerId(Type type) { return type == Type.KRAFT ? 3000 : 0; } - private static Map expectedTags(String config, String clazz) { - return expectedTags(config, clazz, Map.of()); - } - - private static Map expectedTags(String config, String clazz, Map extraTags) { - Map tags = new LinkedHashMap<>(); - tags.put("config", config); - tags.put("class", clazz); - tags.putAll(extraTags); - return tags; - } - @ClusterTest( types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"), @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"), - @ClusterConfigProperty(key = REPLICA_SELECTOR_CLASS_CONFIG, value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableReplicaSelector") + @ClusterConfigProperty(key = REPLICA_SELECTOR_CLASS_CONFIG, value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableReplicaSelector"), + @ClusterConfigProperty(key = REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, value = "true"), + @ClusterConfigProperty(key = REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, + value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableNoOpRemoteLogMetadataManager"), + @ClusterConfigProperty(key = REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, + value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableNoOpRemoteStorageManager") } ) public void testMonitorableServerPlugins(ClusterInstance clusterInstance) { assertAuthorizerMetrics(clusterInstance); assertReplicaSelectorMetrics(clusterInstance); + assertRemoteLogManagerMetrics(clusterInstance); } private void assertAuthorizerMetrics(ClusterInstance clusterInstance) { @@ -78,6 +76,17 @@ public class MonitorablePluginsIntegrationTest { expectedTags(AUTHORIZER_CLASS_NAME_CONFIG, "StandardAuthorizer", Map.of("role", "controller"))); } + private void assertRemoteLogManagerMetrics(ClusterInstance clusterInstance) { + assertMetrics( + clusterInstance.brokers().get(0).metrics(), + MonitorableNoOpRemoteLogMetadataManager.METRICS_COUNT, + expectedTags(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, MonitorableNoOpRemoteLogMetadataManager.class.getSimpleName())); + assertMetrics( + clusterInstance.brokers().get(0).metrics(), + MonitorableNoOpRemoteStorageManager.METRICS_COUNT, + expectedTags(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, MonitorableNoOpRemoteStorageManager.class.getSimpleName())); + } + private void assertReplicaSelectorMetrics(ClusterInstance clusterInstance) { assertMetrics( clusterInstance.brokers().get(0).metrics(), @@ -98,6 +107,17 @@ public class MonitorablePluginsIntegrationTest { assertEquals(expected, found); } + public static class MonitorableNoOpRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager implements Monitorable { + + private static final int METRICS_COUNT = 1; + + @Override + public void withPluginMetrics(PluginMetrics metrics) { + MetricName name = metrics.metricName("name", "description", Map.of()); + metrics.addMetric(name, (Measurable) (config, now) -> 123); + } + } + public static class MonitorableReplicaSelector extends RackAwareReplicaSelector implements Monitorable { private static final int METRICS_COUNT = 1; @@ -108,4 +128,27 @@ public class MonitorablePluginsIntegrationTest { metrics.addMetric(name, (Measurable) (config, now) -> 123); } } + + public static class MonitorableNoOpRemoteStorageManager extends NoOpRemoteStorageManager implements Monitorable { + + private static final int METRICS_COUNT = 1; + + @Override + public void withPluginMetrics(PluginMetrics metrics) { + MetricName name = metrics.metricName("name", "description", Map.of()); + metrics.addMetric(name, (Measurable) (config, now) -> 123); + } + } + + private static Map expectedTags(String config, String clazz) { + return expectedTags(config, clazz, Map.of()); + } + + private static Map expectedTags(String config, String clazz, Map extraTags) { + Map tags = new LinkedHashMap<>(); + tags.put("config", config); + tags.put("class", clazz); + tags.putAll(extraTags); + return tags; + } } diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java index 2cc581dbfec..189e0a1713e 100644 --- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java @@ -47,6 +47,10 @@ import java.util.concurrent.CompletableFuture; * "cluster.id", "broker.id" and all other properties prefixed with the config: "remote.log.metadata.manager.impl.prefix" * (default value is "rlmm.config.") are passed when {@link #configure(Map)} is invoked on this instance. *

+ * + * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the manager to register metrics. + * The following tags are automatically added to all metrics registered: config set to + * remote.log.metadata.manager.class.name, and class set to the RemoteLogMetadataManager class name. */ public interface RemoteLogMetadataManager extends Configurable, Closeable { diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java index 03eeebcfa1d..3fd6a633b7d 100644 --- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java @@ -39,6 +39,10 @@ import java.util.Optional; *

* All properties prefixed with the config: "remote.log.storage.manager.impl.prefix" * (default value is "rsm.config.") are passed when {@link #configure(Map)} is invoked on this instance. + * + * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the manager to register metrics. + * The following tags are automatically added to all metrics registered: config set to + * remote.log.storage.manager.class.name, and class set to the RemoteStorageManager class name. */ public interface RemoteStorageManager extends Configurable, Closeable { diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index a5080c8eb65..a00fd1da614 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.internals.SecurityManagerCompatibility; import org.apache.kafka.common.message.FetchResponseData; import org.apache.kafka.common.metrics.Metrics; @@ -156,9 +157,9 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { private final BrokerTopicStats brokerTopicStats; private final Metrics metrics; - private final RemoteStorageManager remoteStorageManager; + private final Plugin remoteStorageManagerPlugin; - private final RemoteLogMetadataManager remoteLogMetadataManager; + private final Plugin remoteLogMetadataManagerPlugin; private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true); private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition(); @@ -190,7 +191,6 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { private Optional endpoint = Optional.empty(); private boolean closed = false; - private volatile boolean remoteLogManagerConfigured = false; private final Timer remoteReadTimer; private volatile DelayedOperationPurgatory delayedRemoteListOffsetsPurgatory; @@ -216,7 +216,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { Function> fetchLog, BiConsumer updateRemoteLogStartOffset, BrokerTopicStats brokerTopicStats, - Metrics metrics) throws IOException { + Metrics metrics, + Optional endpoint) throws IOException { this.rlmConfig = rlmConfig; this.brokerId = brokerId; this.logDir = logDir; @@ -226,9 +227,11 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { this.updateRemoteLogStartOffset = updateRemoteLogStartOffset; this.brokerTopicStats = brokerTopicStats; this.metrics = metrics; + this.endpoint = endpoint; + + remoteStorageManagerPlugin = configAndWrapRsmPlugin(createRemoteStorageManager()); + remoteLogMetadataManagerPlugin = configAndWrapRlmmPlugin(createRemoteLogMetadataManager()); - remoteStorageManager = createRemoteStorageManager(); - remoteLogMetadataManager = createRemoteLogMetadataManager(); rlmCopyQuotaManager = createRLMCopyQuotaManager(); rlmFetchQuotaManager = createRLMFetchQuotaManager(); @@ -237,7 +240,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { copyThrottleTimeSensor = new RLMQuotaMetrics(metrics, "remote-copy-throttle-time", RemoteLogManager.class.getSimpleName(), "The %s time in millis remote copies was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS).sensor(); - indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteStorageManager, logDir); + indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteStorageManagerPlugin.get(), logDir); delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs(); rlmCopyThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerCopierThreadPoolSize(), "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-%d"); @@ -360,10 +363,11 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { }); } - private void configureRSM() { + private Plugin configAndWrapRsmPlugin(RemoteStorageManager rsm) { final Map rsmProps = new HashMap<>(rlmConfig.remoteStorageManagerProps()); rsmProps.put(ServerConfigs.BROKER_ID_CONFIG, brokerId); - remoteStorageManager.configure(rsmProps); + rsm.configure(rsmProps); + return Plugin.wrapInstance(rsm, metrics, RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP); } RemoteLogMetadataManager createRemoteLogMetadataManager() { @@ -379,11 +383,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { }); } - public void onEndPointCreated(Endpoint endpoint) { - this.endpoint = Optional.of(endpoint); - } - - private void configureRLMM() { + private Plugin configAndWrapRlmmPlugin(RemoteLogMetadataManager rlmm) { final Map rlmmProps = new HashMap<>(); endpoint.ifPresent(e -> { rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers", e.host() + ":" + e.port()); @@ -396,23 +396,16 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { rlmmProps.put(LOG_DIR_CONFIG, logDir); rlmmProps.put("cluster.id", clusterId); - remoteLogMetadataManager.configure(rlmmProps); + rlmm.configure(rlmmProps); + return Plugin.wrapInstance(rlmm, metrics, RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP); } - public void startup() { - // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources - // in connecting to the brokers or remote storages. - configureRSM(); - configureRLMM(); - remoteLogManagerConfigured = true; - } - - private boolean isRemoteLogManagerConfigured() { - return this.remoteLogManagerConfigured; + RemoteLogMetadataManager remoteLogMetadataManager() { + return remoteLogMetadataManagerPlugin.get(); } public RemoteStorageManager storageManager() { - return remoteStorageManager; + return remoteStorageManagerPlugin.get(); } private Stream filterPartitions(Set partitions) { @@ -442,10 +435,6 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { Map topicIds) { LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); - if (rlmConfig.isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { - throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); - } - Map leaderPartitions = filterPartitions(partitionsBecomeLeader) .collect(Collectors.toMap(p -> new TopicIdPartition(topicIds.get(p.topicPartition().topic()), p.topicPartition()), p -> p.unifiedLog().isPresent() ? p.unifiedLog().get().config().remoteLogCopyDisable() : false)); @@ -461,7 +450,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { leaderPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp)); followerPartitions.forEach((tp, __) -> cacheTopicPartitionIds(tp)); - remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions.keySet(), followerPartitions.keySet()); + remoteLogMetadataManagerPlugin.get().onPartitionLeadershipChanges(leaderPartitions.keySet(), followerPartitions.keySet()); followerPartitions.forEach((tp, __) -> doHandleFollowerPartition(tp)); // If this node was the previous leader for the partition, then the RLMTask might be running in the @@ -545,13 +534,13 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { if (!pendingActionsPartitions.isEmpty()) { pendingActionsPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition())); - remoteLogMetadataManager.onStopPartitions(pendingActionsPartitions); + remoteLogMetadataManagerPlugin.get().onStopPartitions(pendingActionsPartitions); } } private void deleteRemoteLogPartition(TopicIdPartition partition) throws RemoteStorageException, ExecutionException, InterruptedException { List metadataList = new ArrayList<>(); - remoteLogMetadataManager.listRemoteLogSegments(partition).forEachRemaining(metadataList::add); + remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(partition).forEachRemaining(metadataList::add); List deleteSegmentStartedEvents = metadataList.stream() .map(metadata -> @@ -564,7 +553,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { Collection deletedSegmentIds = new ArrayList<>(); for (RemoteLogSegmentMetadata metadata: metadataList) { deletedSegmentIds.add(metadata.remoteLogSegmentId().id()); - remoteStorageManager.deleteLogSegmentData(metadata); + remoteStorageManagerPlugin.get().deleteLogSegmentData(metadata); } indexCache.removeAll(deletedSegmentIds); @@ -579,7 +568,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { private CompletableFuture publishEvents(List events) throws RemoteStorageException { List> result = new ArrayList<>(); for (RemoteLogSegmentMetadataUpdate event : events) { - result.add(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(event)); + result.add(remoteLogMetadataManagerPlugin.get().updateRemoteLogSegmentMetadata(event)); } return CompletableFuture.allOf(result.toArray(new CompletableFuture[0])); } @@ -591,7 +580,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { if (topicId == null) { throw new KafkaException("No topic id registered for topic partition: " + topicPartition); } - return remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset); + return remoteLogMetadataManagerPlugin.get().remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset); } /** @@ -611,7 +600,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { throw new KafkaException("No topic id registered for topic partition: " + topicPartition); } TopicIdPartition tpId = new TopicIdPartition(topicId, topicPartition); - return remoteLogMetadataManager.nextSegmentWithTxnIndex(tpId, epochForOffset, offset); + return remoteLogMetadataManagerPlugin.get().nextSegmentWithTxnIndex(tpId, epochForOffset, offset); } Optional lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset) @@ -621,7 +610,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { InputStream remoteSegInputStream = null; try { // Search forward for the position of the last offset that is greater than or equal to the startingOffset - remoteSegInputStream = remoteStorageManager.fetchLogSegment(rlsMetadata, startPos); + remoteSegInputStream = remoteStorageManagerPlugin.get().fetchLogSegment(rlsMetadata, startPos); RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); while (true) { @@ -711,7 +700,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { int epoch = maybeEpoch.getAsInt(); // KAFKA-15802: Add a new API for RLMM to choose how to implement the predicate. // currently, all segments are returned and then iterated, and filtered - Iterator iterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); + Iterator iterator = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition, epoch); while (iterator.hasNext()) { RemoteLogSegmentMetadata rlsMetadata = iterator.next(); if (rlsMetadata.maxTimestampMs() >= timestamp @@ -795,7 +784,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { logger.debug("Skipping the current run for partition {} as it is cancelled", topicIdPartition); return; } - if (!remoteLogMetadataManager.isReady(topicIdPartition)) { + if (!remoteLogMetadataManagerPlugin.get().isReady(topicIdPartition)) { logger.debug("Skipping the current run for partition {} as the remote-log metadata is not ready", topicIdPartition); return; } @@ -1005,7 +994,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { segment.largestTimestamp(), brokerId, time.milliseconds(), segment.log().sizeInBytes(), segmentLeaderEpochs, isTxnIdxEmpty); - remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get(); + remoteLogMetadataManagerPlugin.get().addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get(); ByteBuffer leaderEpochsIndex = epochEntriesAsByteBuffer(getLeaderEpochEntries(log, -1, nextSegmentBaseOffset)); LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.offsetIndex().file()), @@ -1016,7 +1005,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { Optional customMetadata; try { - customMetadata = remoteStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData); + customMetadata = remoteStorageManagerPlugin.get().copyLogSegmentData(copySegmentStartedRlsm, segmentData); } catch (RemoteStorageException e) { logger.info("Copy failed, cleaning segment {}", copySegmentStartedRlsm.remoteLogSegmentId()); try { @@ -1052,7 +1041,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { } } - remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get(); + remoteLogMetadataManagerPlugin.get().updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get(); brokerTopicStats.topicStats(log.topicPartition().topic()) .remoteCopyBytesRate().mark(copySegmentStartedRlsm.segmentSizeInBytes()); brokerTopicStats.allTopicsStats().remoteCopyBytesRate().mark(copySegmentStartedRlsm.segmentSizeInBytes()); @@ -1235,7 +1224,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { final UnifiedLog log = logOptional.get(); // Cleanup remote log segments and update the log start offset if applicable. - final Iterator segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition); + final Iterator segmentMetadataIter = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition); if (!segmentMetadataIter.hasNext()) { updateMetadataCountAndLogSizeWith(0, 0); logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition); @@ -1277,7 +1266,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { long sizeOfDeletableSegmentsBytes = 0L; while (canProcess && epochIterator.hasNext()) { Integer epoch = epochIterator.next(); - Iterator segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); + Iterator segmentsIterator = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition, epoch); while (canProcess && segmentsIterator.hasNext()) { if (isCancelled()) { logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed."); @@ -1370,7 +1359,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { while (epochsToClean.hasNext()) { int epoch = epochsToClean.next(); - Iterator segmentsToBeCleaned = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); + Iterator segmentsToBeCleaned = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition, epoch); while (segmentsToBeCleaned.hasNext()) { if (!isCancelled()) { RemoteLogSegmentMetadata nextSegmentMetadata = segmentsToBeCleaned.next(); @@ -1415,7 +1404,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { // log size may be computed for all the segments but not for segments with in the current // partition's leader epoch lineage. Better to revisit this API. // remoteLogSizeBytes += remoteLogMetadataManager.remoteLogSize(topicIdPartition, epochEntry.epoch); - Iterator segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch); + Iterator segmentsIterator = remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition, epoch); while (segmentsIterator.hasNext()) { RemoteLogSegmentMetadata segmentMetadata = segmentsIterator.next(); // Count only the size of segments in "COPY_SEGMENT_FINISHED" state because @@ -1472,7 +1461,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { String topic = segmentMetadata.topicIdPartition().topic(); // Publish delete segment started event. - remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + remoteLogMetadataManagerPlugin.get().updateRemoteLogSegmentMetadata( new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get(); @@ -1481,7 +1470,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { // Delete the segment in remote storage. try { - remoteStorageManager.deleteLogSegmentData(segmentMetadata); + remoteStorageManagerPlugin.get().deleteLogSegmentData(segmentMetadata); } catch (RemoteStorageException e) { brokerTopicStats.topicStats(topic).failedRemoteDeleteRequestRate().mark(); brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().mark(); @@ -1489,7 +1478,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { } // Publish delete segment finished event. - remoteLogMetadataManager.updateRemoteLogSegmentMetadata( + remoteLogMetadataManagerPlugin.get().updateRemoteLogSegmentMetadata( new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(), segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get(); LOGGER.debug("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId()); @@ -1685,7 +1674,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { remoteLogSegmentMetadata = rlsMetadataOptional.get(); // Search forward for the position of the last offset that is greater than or equal to the target offset startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); - remoteSegInputStream = remoteStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); + remoteSegInputStream = remoteStorageManagerPlugin.get().fetchLogSegment(remoteLogSegmentMetadata, startPos); RemoteLogInputStream remoteLogInputStream = getRemoteLogInputStream(remoteSegInputStream); enrichedRecordBatch = findFirstBatch(remoteLogInputStream, offset); if (enrichedRecordBatch.batch == null) { @@ -1905,7 +1894,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { while (offsetAndEpoch == null && maybeEpochEntry.isPresent()) { int epoch = maybeEpochEntry.get().epoch; Optional highestRemoteOffsetOpt = - remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch); + remoteLogMetadataManagerPlugin.get().highestOffsetForEpoch(topicIdPartition, epoch); if (highestRemoteOffsetOpt.isPresent()) { Map.Entry entry = leaderEpochCache.endOffsetFor(epoch, log.logEndOffset()); int requestedEpoch = entry.getKey(); @@ -1936,7 +1925,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { .orElseGet(OptionalInt::empty); while (logStartOffset.isEmpty() && earliestEpochOpt.isPresent()) { Iterator iterator = - remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, earliestEpochOpt.getAsInt()); + remoteLogMetadataManagerPlugin.get().listRemoteLogSegments(topicIdPartition, earliestEpochOpt.getAsInt()); if (iterator.hasNext()) { logStartOffset = Optional.of(iterator.next().startOffset()); } @@ -2034,8 +2023,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { leaderCopyRLMTasks.values().forEach(RLMTaskWithFuture::cancel); leaderExpirationRLMTasks.values().forEach(RLMTaskWithFuture::cancel); followerRLMTasks.values().forEach(RLMTaskWithFuture::cancel); - Utils.closeQuietly(remoteStorageManager, "RemoteStorageManager"); - Utils.closeQuietly(remoteLogMetadataManager, "RemoteLogMetadataManager"); + Utils.closeQuietly(remoteStorageManagerPlugin, "remoteStorageManagerPlugin"); + Utils.closeQuietly(remoteLogMetadataManagerPlugin, "remoteLogMetadataManagerPlugin"); Utils.closeQuietly(indexCache, "RemoteIndexCache"); rlmCopyThreadPool.close(); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index 0edbd542e64..6068edb8938 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -26,6 +26,8 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Monitorable; +import org.apache.kafka.common.metrics.PluginMetrics; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; @@ -139,6 +141,7 @@ import static org.apache.kafka.test.TestUtils.tempFile; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -214,6 +217,11 @@ public class RemoteLogManagerTest { private final MockScheduler scheduler = new MockScheduler(time); private final Properties brokerConfig = kafka.utils.TestUtils.createDummyBrokerConfig(); + private final String host = "localhost"; + private final int port = 1234; + private final String securityProtocol = "PLAINTEXT"; + private final Optional endPoint = Optional.of(new Endpoint(securityProtocol, SecurityProtocol.PLAINTEXT, host, port)); + @BeforeEach void setUp() throws Exception { checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1)); @@ -229,16 +237,20 @@ public class RemoteLogManagerTest { remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), - brokerTopicStats, metrics) { + brokerTopicStats, metrics, endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; } + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } + @Override public RLMQuotaManager createRLMCopyQuotaManager() { return rlmCopyQuotaManager; } + @Override public Duration quotaTimeout() { return Duration.ofMillis(100); } @@ -370,13 +382,6 @@ public class RemoteLogManagerTest { @Test void testRemoteLogMetadataManagerWithEndpointConfig() { - String host = "localhost"; - int port = 1234; - String securityProtocol = "PLAINTEXT"; - Endpoint endPoint = new Endpoint(securityProtocol, SecurityProtocol.PLAINTEXT, host, port); - remoteLogManager.onEndPointCreated(endPoint); - remoteLogManager.startup(); - ArgumentCaptor> capture = ArgumentCaptor.forClass(Map.class); verify(remoteLogMetadataManager, times(1)).configure(capture.capture()); assertEquals(host + ":" + port, capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers")); @@ -402,24 +407,20 @@ public class RemoteLogManagerTest { tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, - metrics) { + metrics, + endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; } + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } }) { - - String host = "localhost"; - int port = 1234; - String securityProtocol = "PLAINTEXT"; - Endpoint endpoint = new Endpoint(securityProtocol, SecurityProtocol.PLAINTEXT, host, port); - remoteLogManager.onEndPointCreated(endpoint); - remoteLogManager.startup(); - ArgumentCaptor> capture = ArgumentCaptor.forClass(Map.class); - verify(remoteLogMetadataManager, times(1)).configure(capture.capture()); + // One is called from setup, the other is from this function. + verify(remoteLogMetadataManager, times(2)).configure(capture.capture()); assertEquals(host + ":" + port, capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "bootstrap.servers")); // should be overridden as SSL assertEquals("SSL", capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol")); @@ -429,8 +430,7 @@ public class RemoteLogManagerTest { } @Test - void testStartup() { - remoteLogManager.startup(); + void testConfigure() { ArgumentCaptor> capture = ArgumentCaptor.forClass(Map.class); verify(remoteStorageManager, times(1)).configure(capture.capture()); assertEquals(brokerId, capture.getValue().get("broker.id")); @@ -765,17 +765,8 @@ public class RemoteLogManagerTest { assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count()); } - @Test - void testLeadershipChangesWithoutRemoteLogManagerConfiguring() { - assertThrows(KafkaException.class, () -> - remoteLogManager.onLeadershipChange( - Set.of(mockPartition(leaderTopicIdPartition)), Set.of(mockPartition(followerTopicIdPartition)), topicIds), - "RemoteLogManager is not configured when remote storage system is enabled"); - } - @Test void testRemoteLogManagerTasksAvgIdlePercentAndMetadataCountMetrics() throws Exception { - remoteLogManager.startup(); long oldSegmentStartOffset = 0L; long nextSegmentStartOffset = 150L; int segmentCount = 3; @@ -894,7 +885,6 @@ public class RemoteLogManagerTest { @Test void testRemoteLogTaskUpdateRemoteLogSegmentMetadataAfterLogDirChanged() throws Exception { - remoteLogManager.startup(); long oldSegmentStartOffset = 0L; long nextSegmentStartOffset = 150L; int segmentCount = 3; @@ -1009,7 +999,6 @@ public class RemoteLogManagerTest { @Test void testRemoteLogManagerRemoteMetrics() throws Exception { - remoteLogManager.startup(); long oldestSegmentStartOffset = 0L; long olderSegmentStartOffset = 75L; long nextSegmentStartOffset = 150L; @@ -1348,7 +1337,8 @@ public class RemoteLogManagerTest { new RemoteLogManager(config, brokerId, logDir, clusterId, time, t -> Optional.empty(), (topicPartition, offset) -> { }, - brokerTopicStats, metrics) { + brokerTopicStats, metrics, endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return rsmManager; } @@ -1373,7 +1363,6 @@ public class RemoteLogManagerTest { @Test void testTopicIdCacheUpdates() throws RemoteStorageException { - remoteLogManager.startup(); TopicPartitionLog mockLeaderPartition = mockPartition(leaderTopicIdPartition); TopicPartitionLog mockFollowerPartition = mockPartition(followerTopicIdPartition); @@ -1398,7 +1387,6 @@ public class RemoteLogManagerTest { @Test void testFetchRemoteLogSegmentMetadata() throws RemoteStorageException { - remoteLogManager.startup(); remoteLogManager.onLeadershipChange( Set.of(mockPartition(leaderTopicIdPartition)), Set.of(mockPartition(followerTopicIdPartition)), topicIds); remoteLogManager.fetchRemoteLogSegmentMetadata(leaderTopicIdPartition.topicPartition(), 10, 100L); @@ -1412,7 +1400,6 @@ public class RemoteLogManagerTest { @Test public void testFetchNextSegmentWithTxnIndex() throws RemoteStorageException { - remoteLogManager.startup(); remoteLogManager.onLeadershipChange( Set.of(mockPartition(leaderTopicIdPartition)), Set.of(mockPartition(followerTopicIdPartition)), topicIds); remoteLogManager.fetchNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(), 10, 100L); @@ -1445,7 +1432,6 @@ public class RemoteLogManagerTest { return Optional.of(metadata); }); - remoteLogManager.startup(); remoteLogManager.onLeadershipChange( Set.of(mockPartition(leaderTopicIdPartition)), Set.of(mockPartition(followerTopicIdPartition)), topicIds); @@ -1480,7 +1466,6 @@ public class RemoteLogManagerTest { return metadataOpt; }); - remoteLogManager.startup(); remoteLogManager.onLeadershipChange( Set.of(mockPartition(leaderTopicIdPartition)), Set.of(mockPartition(followerTopicIdPartition)), topicIds); @@ -1498,7 +1483,6 @@ public class RemoteLogManagerTest { @Test void testOnLeadershipChangeWillInvokeHandleLeaderOrFollowerPartitions() { - remoteLogManager.startup(); RemoteLogManager spyRemoteLogManager = spy(remoteLogManager); spyRemoteLogManager.onLeadershipChange( Set.of(), Set.of(mockPartition(followerTopicIdPartition)), topicIds); @@ -1523,7 +1507,6 @@ public class RemoteLogManagerTest { @Test void testFindOffsetByTimestamp() throws IOException, RemoteStorageException { - remoteLogManager.startup(); TopicPartition tp = leaderTopicIdPartition.topicPartition(); long ts = time.milliseconds(); @@ -1557,7 +1540,6 @@ public class RemoteLogManagerTest { @Test void testFindOffsetByTimestampWithInvalidEpochSegments() throws IOException, RemoteStorageException { - remoteLogManager.startup(); TopicPartition tp = leaderTopicIdPartition.topicPartition(); long ts = time.milliseconds(); @@ -1589,7 +1571,6 @@ public class RemoteLogManagerTest { @Test void testFindOffsetByTimestampWithSegmentNotReady() throws IOException, RemoteStorageException { - remoteLogManager.startup(); TopicPartition tp = leaderTopicIdPartition.topicPartition(); long ts = time.milliseconds(); @@ -1739,7 +1720,7 @@ public class RemoteLogManagerTest { remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, partition -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), - brokerTopicStats, metrics) { + brokerTopicStats, metrics, endPoint) { @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; @@ -1749,7 +1730,6 @@ public class RemoteLogManagerTest { return Optional.of(expectedRemoteResult); } }; - remoteLogManager.startup(); remoteLogManager.onLeadershipChange(Set.of(), Set.of(mockFollowerPartition), topicIds); // Read the offset from the remote storage, since the local-log starts from offset 50L and the message with `timestamp` does not exist in the local log @@ -1796,11 +1776,12 @@ public class RemoteLogManagerTest { try (MockedConstruction mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class)) { RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { - }, brokerTopicStats, metrics) { + }, brokerTopicStats, metrics, endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; } - + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } @@ -2106,7 +2087,6 @@ public class RemoteLogManagerTest { @Test public void testStopPartitionsWithoutDeletion() throws RemoteStorageException { - remoteLogManager.startup(); BiConsumer errorHandler = (topicPartition, throwable) -> fail("shouldn't be called"); Set partitions = new HashSet<>(); partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, false, false)); @@ -2128,7 +2108,6 @@ public class RemoteLogManagerTest { @Test public void testStopPartitionsWithDeletion() throws RemoteStorageException { - remoteLogManager.startup(); BiConsumer errorHandler = (topicPartition, ex) -> fail("shouldn't be called: " + ex); Set partitions = new HashSet<>(); @@ -2194,7 +2173,8 @@ public class RemoteLogManagerTest { try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, - brokerTopicStats, metrics) { + brokerTopicStats, metrics, endPoint) { + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } @@ -2219,7 +2199,8 @@ public class RemoteLogManagerTest { try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, - brokerTopicStats, metrics) { + brokerTopicStats, metrics, endPoint) { + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } @@ -2253,7 +2234,8 @@ public class RemoteLogManagerTest { try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), - brokerTopicStats, metrics) { + brokerTopicStats, metrics, endPoint) { + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } @@ -2307,16 +2289,20 @@ public class RemoteLogManagerTest { remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), - brokerTopicStats, metrics) { + brokerTopicStats, metrics, endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; } + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } + @Override public RLMQuotaManager createRLMCopyQuotaManager() { return rlmCopyQuotaManager; } + @Override public Duration quotaTimeout() { return Duration.ofMillis(100); } @@ -2849,7 +2835,6 @@ public class RemoteLogManagerTest { ); checkpoint.write(epochEntries); LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); - int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch; long localLogSegmentsSize = 512L; long retentionSize = ((long) segmentCount - deletableSegmentCount) * segmentSize + localLogSegmentsSize; @@ -2869,7 +2854,7 @@ public class RemoteLogManagerTest { List segmentMetadataList = listRemoteLogSegmentMetadata( leaderTopicIdPartition, segmentCount, recordsPerSegment, segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED); - verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch); + verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount); } @ParameterizedTest(name = "testDeleteLogSegmentDueToRetentionTimeBreach segmentCount={0} deletableSegmentCount={1}") @@ -2887,7 +2872,6 @@ public class RemoteLogManagerTest { ); checkpoint.write(epochEntries); LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler); - int currentLeaderEpoch = epochEntries.get(epochEntries.size() - 1).epoch; long localLogSegmentsSize = 512L; long retentionSize = -1L; @@ -2907,7 +2891,7 @@ public class RemoteLogManagerTest { List segmentMetadataList = listRemoteLogSegmentMetadataByTime( leaderTopicIdPartition, segmentCount, deletableSegmentCount, recordsPerSegment, segmentSize, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED); - verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch); + verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount); } private void verifyRemoteDeleteMetrics(long remoteDeleteLagBytes, long remoteDeleteLagSegments) { @@ -2926,8 +2910,7 @@ public class RemoteLogManagerTest { } private void verifyDeleteLogSegment(List segmentMetadataList, - int deletableSegmentCount, - int currentLeaderEpoch) + int deletableSegmentCount) throws RemoteStorageException, ExecutionException, InterruptedException { when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)) .thenReturn(segmentMetadataList.iterator()); @@ -2961,10 +2944,12 @@ public class RemoteLogManagerTest { try (RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> logStartOffset.set(offset), - brokerTopicStats, metrics) { + brokerTopicStats, metrics, endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; } + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } @@ -2999,8 +2984,8 @@ public class RemoteLogManagerTest { .thenAnswer(answer -> CompletableFuture.runAsync(() -> { })); task.cleanupExpiredRemoteLogSegments(); - - verifyNoMoreInteractions(remoteStorageManager); + // One is called from setup, the other is from this function. + verify(remoteStorageManager, times(2)).configure(any()); assertEquals(0L, logStartOffset.get()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); @@ -3127,10 +3112,13 @@ public class RemoteLogManagerTest { tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, - metrics) { + metrics, + endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return rsmManager; } + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } @@ -3204,23 +3192,25 @@ public class RemoteLogManagerTest { tp -> Optional.of(mockLog), (topicPartition, offset) -> { }, brokerTopicStats, - metrics) { + metrics, + endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return rsmManager; } + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } - - public Optional fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, - int epochForOffset, long offset) { + @Override + public Optional fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, int epochForOffset, long offset) { return Optional.of(segmentMetadata); } - + @Override int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { return 1; } - + @Override EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) { when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes); doNothing().when(firstBatch).writeTo(capture.capture()); @@ -3290,23 +3280,25 @@ public class RemoteLogManagerTest { (topicPartition, offset) -> { }, brokerTopicStats, - metrics) { + metrics, + endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return rsmManager; } - + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } - - public Optional fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, - int epochForOffset, long offset) { + @Override + public Optional fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, int epochForOffset, long offset) { return Optional.of(segmentMetadata); } + @Override public RemoteLogInputStream getRemoteLogInputStream(InputStream in) { return remoteLogInputStream; } - + @Override int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { return 1; } @@ -3436,7 +3428,6 @@ public class RemoteLogManagerTest { @Test public void testRLMShutdownDuringQuotaExceededScenario() throws Exception { - remoteLogManager.startup(); setupRLMTask(true); remoteLogManager.onLeadershipChange( Set.of(mockPartition(leaderTopicIdPartition)), Set.of(), topicIds); @@ -3597,7 +3588,6 @@ public class RemoteLogManagerTest { @Test public void testTierLagResetsToZeroOnBecomingFollower() { - remoteLogManager.startup(); remoteLogManager.onLeadershipChange( Set.of(mockPartition(leaderTopicIdPartition)), Set.of(), topicIds); RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask) remoteLogManager.rlmCopyTask(leaderTopicIdPartition); @@ -3640,18 +3630,20 @@ public class RemoteLogManagerTest { RemoteLogManager remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), - brokerTopicStats, metrics) { + brokerTopicStats, metrics, endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; } + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } + @Override int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { return 0; } }; - remoteLogManager.startup(); remoteLogManager.onLeadershipChange( Set.of(mockPartition(leaderTopicIdPartition)), Set.of(), topicIds); @@ -3683,16 +3675,20 @@ public class RemoteLogManagerTest { remoteLogManager = new RemoteLogManager(config, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), (topicPartition, offset) -> currentLogStartOffset.set(offset), - brokerTopicStats, metrics) { + brokerTopicStats, metrics, endPoint) { + @Override public RemoteStorageManager createRemoteStorageManager() { return remoteStorageManager; } + @Override public RemoteLogMetadataManager createRemoteLogMetadataManager() { return remoteLogMetadataManager; } + @Override public RLMQuotaManager createRLMCopyQuotaManager() { return rlmCopyQuotaManager; } + @Override public Duration quotaTimeout() { return Duration.ofMillis(100); } @@ -3708,7 +3704,6 @@ public class RemoteLogManagerTest { latch.countDown(); return false; }); - remoteLogManager.startup(); remoteLogManager.onLeadershipChange( Set.of(mockPartition(leaderTopicIdPartition)), Set.of(mockPartition(followerTopicIdPartition)), @@ -3730,6 +3725,34 @@ public class RemoteLogManagerTest { verifyNoMoreInteractions(remoteStorageManager); } + @Test + public void testMonitorableRemoteLogStorageManager() throws IOException { + Properties props = new Properties(); + props.putAll(brokerConfig); + appendRLMConfig(props); + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, MonitorableNoOpRemoteStorageManager.class.getName()); + props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, MonitorableNoOpRemoteLogMetadataManager.class.getName()); + config = configs(props); + + try (RemoteLogManager remoteLogManager = new RemoteLogManager( + config, + brokerId, + logDir, + clusterId, + time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> { }, + brokerTopicStats, + metrics, + endPoint)) { + assertInstanceOf(MonitorableNoOpRemoteStorageManager.class, remoteLogManager.storageManager()); + assertInstanceOf(MonitorableNoOpRemoteLogMetadataManager.class, remoteLogManager.remoteLogMetadataManager()); + + assertTrue(((MonitorableNoOpRemoteStorageManager) remoteLogManager.storageManager()).pluginMetrics); + assertTrue(((MonitorableNoOpRemoteLogMetadataManager) remoteLogManager.remoteLogMetadataManager()).pluginMetrics); + } + } + private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException { byte magic = RecordBatch.CURRENT_MAGIC_VALUE; Compression compression = Compression.NONE; @@ -3773,4 +3796,21 @@ public class RemoteLogManagerTest { props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal); } + public static class MonitorableNoOpRemoteStorageManager extends NoOpRemoteStorageManager implements Monitorable { + public boolean pluginMetrics = false; + + @Override + public void withPluginMetrics(PluginMetrics metrics) { + pluginMetrics = true; + } + } + + public static class MonitorableNoOpRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager implements Monitorable { + public boolean pluginMetrics = false; + + @Override + public void withPluginMetrics(PluginMetrics metrics) { + pluginMetrics = true; + } + } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReaderTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReaderTest.java index de6925b33c9..db083e79219 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReaderTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogOffsetReaderTest.java @@ -152,7 +152,8 @@ class RemoteLogOffsetReaderTest { tp -> Optional.empty(), (tp, logStartOffset) -> { }, new BrokerTopicStats(true), - new Metrics() + new Metrics(), + Optional.empty() ); }