diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java index 927cc47efde..91011b1d9c2 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java @@ -56,28 +56,25 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.function.Supplier; /** - * This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}. + * This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name + * {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}. * This is used to publish and fetch {@link RemoteLogMetadata} for the registered user topic partitions with * {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an instance of this class, and it subscribes * to metadata updates for the registered user topic partitions. */ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager { private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class); + private final Time time = Time.SYSTEM; - private volatile boolean configured = false; - - // It indicates whether the close process of this instance is started or not via #close() method. - // Using AtomicBoolean instead of volatile as it may encounter http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD - // if the field is read but not updated in a spin loop like in #initializeResources() method. + private final AtomicBoolean configured = new AtomicBoolean(false); private final AtomicBoolean closing = new AtomicBoolean(false); private final AtomicBoolean initialized = new AtomicBoolean(false); - private final Time time = Time.SYSTEM; - private final boolean startConsumerThread; private Thread initializationThread; private volatile ProducerManager producerManager; @@ -85,112 +82,77 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana // This allows to gracefully close this instance using {@link #close()} method while there are some pending or new // requests calling different methods which use the resources like producer/consumer managers. - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - private RemotePartitionMetadataStore remotePartitionMetadataStore; - private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig; - private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final RemotePartitionMetadataStore remotePartitionMetadataStore; private final Set pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>()); - private volatile boolean initializationFailed; - private final Supplier remoteLogMetadataManagerSupplier; - private final Function remoteLogMetadataTopicPartitionerFunction; + private volatile boolean initializationFailed = false; + private final Function partitionerFunction; - /** - * The default constructor delegates to the internal one, starting the consumer thread and - * supplying an instance of RemoteLogMetadataTopicPartitioner and RemotePartitionMetadataStore by default. - */ public TopicBasedRemoteLogMetadataManager() { - this(true, RemoteLogMetadataTopicPartitioner::new, RemotePartitionMetadataStore::new); + this(RemoteLogMetadataTopicPartitioner::new, RemotePartitionMetadataStore::new); + } + + TopicBasedRemoteLogMetadataManager(Function partitionerFunction, + Supplier metadataStoreSupplier) { + this.partitionerFunction = partitionerFunction; + this.remotePartitionMetadataStore = metadataStoreSupplier.get(); } /** - * Used in tests to dynamically configure the instance. + * Adds metadata for a remote log segment to the metadata store. + * The provided metadata must have the state {@code COPY_SEGMENT_STARTED}. + * + * @param remoteLogSegmentMetadata the metadata of the remote log segment to be added; must not be null + * @return a {@link CompletableFuture} that completes once the metadata has been published to the topic + * @throws RemoteStorageException if an error occurs while storing the metadata + * @throws IllegalArgumentException if the state of the provided metadata is not {@code COPY_SEGMENT_STARTED} */ - TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Function remoteLogMetadataTopicPartitionerFunction, Supplier remoteLogMetadataManagerSupplier) { - this.startConsumerThread = startConsumerThread; - this.remoteLogMetadataManagerSupplier = remoteLogMetadataManagerSupplier; - this.remoteLogMetadataTopicPartitionerFunction = remoteLogMetadataTopicPartitionerFunction; - } - @Override public CompletableFuture addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException { Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null"); - - // This allows gracefully rejecting the requests while closing of this instance is in progress, which triggers - // closing the producer/consumer manager instances. - lock.readLock().lock(); - try { - ensureInitializedAndNotClosed(); - - // This method is allowed only to add remote log segment with the initial state(which is RemoteLogSegmentState.COPY_SEGMENT_STARTED) - // but not to update the existing remote log segment metadata. + return withReadLockAndEnsureInitialized(() -> { if (remoteLogSegmentMetadata.state() != RemoteLogSegmentState.COPY_SEGMENT_STARTED) { throw new IllegalArgumentException( "Given remoteLogSegmentMetadata should have state as " + RemoteLogSegmentState.COPY_SEGMENT_STARTED + " but it contains state as: " + remoteLogSegmentMetadata.state()); } - - // Publish the message to the topic. - return storeRemoteLogMetadata(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition(), - remoteLogSegmentMetadata); - } finally { - lock.readLock().unlock(); - } + return storeRemoteLogMetadata(remoteLogSegmentMetadata); + }); } @Override - public CompletableFuture updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) + public CompletableFuture updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate) throws RemoteStorageException { - Objects.requireNonNull(segmentMetadataUpdate, "segmentMetadataUpdate can not be null"); - - lock.readLock().lock(); - try { - ensureInitializedAndNotClosed(); - - // Callers should use addRemoteLogSegmentMetadata to add RemoteLogSegmentMetadata with state as - // RemoteLogSegmentState.COPY_SEGMENT_STARTED. - if (segmentMetadataUpdate.state() == RemoteLogSegmentState.COPY_SEGMENT_STARTED) { - throw new IllegalArgumentException("Given remoteLogSegmentMetadata should not have the state as: " - + RemoteLogSegmentState.COPY_SEGMENT_STARTED); + Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be null"); + return withReadLockAndEnsureInitialized(() -> { + if (metadataUpdate.state() == RemoteLogSegmentState.COPY_SEGMENT_STARTED) { + throw new IllegalArgumentException("Given remoteLogSegmentMetadataUpdate should not have the state as: " + + RemoteLogSegmentState.COPY_SEGMENT_STARTED); } - - // Publish the message to the topic. - return storeRemoteLogMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate); - } finally { - lock.readLock().unlock(); - } + return storeRemoteLogMetadata(metadataUpdate); + }); } @Override - public CompletableFuture putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) + public CompletableFuture putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata deleteMetadata) throws RemoteStorageException { - Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null"); - - lock.readLock().lock(); - try { - ensureInitializedAndNotClosed(); - - return storeRemoteLogMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata); - } finally { - lock.readLock().unlock(); - } + Objects.requireNonNull(deleteMetadata, "deleteMetadata can not be null"); + return withReadLockAndEnsureInitialized( + () -> storeRemoteLogMetadata(deleteMetadata)); } /** * Returns {@link CompletableFuture} which will complete only after publishing of the given {@code remoteLogMetadata} into * the remote log metadata topic and the internal consumer is caught up until the produced record's offset. * - * @param topicIdPartition partition of the given remoteLogMetadata. - * @param remoteLogMetadata RemoteLogMetadata to be stored. + * @param remoteLogMetadata RemoteLogMetadata to be stored. * @return a future with acknowledge and potentially waiting also for consumer to catch up. * This ensures cache is synchronized with backing topic. * @throws RemoteStorageException if there are any storage errors occur. */ - private CompletableFuture storeRemoteLogMetadata(TopicIdPartition topicIdPartition, - RemoteLogMetadata remoteLogMetadata) - throws RemoteStorageException { - log.debug("Storing the partition: {} metadata: {}", topicIdPartition, remoteLogMetadata); + private CompletableFuture storeRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException { + log.debug("Storing the partition: {} metadata: {}", remoteLogMetadata.topicIdPartition(), remoteLogMetadata); try { // Publish the message to the metadata topic. CompletableFuture produceFuture = producerManager.publishMessage(remoteLogMetadata); @@ -214,62 +176,33 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana @Override public Optional remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, int epochForOffset, - long offset) - throws RemoteStorageException { - lock.readLock().lock(); - try { - ensureInitializedAndNotClosed(); - - return remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset, epochForOffset); - } finally { - lock.readLock().unlock(); - } + long offset) throws RemoteStorageException { + return withReadLockAndEnsureInitialized( + () -> remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset, epochForOffset)); } @Override public Optional highestOffsetForEpoch(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException { - lock.readLock().lock(); - try { - - ensureInitializedAndNotClosed(); - - return remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch); - } finally { - lock.readLock().unlock(); - } - + return withReadLockAndEnsureInitialized( + () -> remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch)); } @Override public Iterator listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException { Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); - - lock.readLock().lock(); - try { - ensureInitializedAndNotClosed(); - - return remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition); - } finally { - lock.readLock().unlock(); - } + return withReadLockAndEnsureInitialized( + () -> remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition)); } @Override public Iterator listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException { Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); - - lock.readLock().lock(); - try { - ensureInitializedAndNotClosed(); - - return remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch); - } finally { - lock.readLock().unlock(); - } + return withReadLockAndEnsureInitialized( + () -> remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch)); } @Override @@ -277,17 +210,14 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana Set followerPartitions) { Objects.requireNonNull(leaderPartitions, "leaderPartitions can not be null"); Objects.requireNonNull(followerPartitions, "followerPartitions can not be null"); - log.info("Received leadership notifications with leader partitions {} and follower partitions {}", leaderPartitions, followerPartitions); - lock.readLock().lock(); try { if (closing.get()) { throw new IllegalStateException("This instance is in closing state"); } - - HashSet allPartitions = new HashSet<>(leaderPartitions); + Set allPartitions = new HashSet<>(leaderPartitions); allPartitions.addAll(followerPartitions); if (!initialized.get()) { // If it is not yet initialized, then keep them as pending partitions and assign them @@ -305,7 +235,6 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana for (TopicIdPartition partition : allPartitions) { remotePartitionMetadataStore.maybeLoadPartition(partition); } - consumerManager.addAssignmentsForPartitions(allPartitions); } @@ -316,7 +245,6 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana if (closing.get()) { throw new IllegalStateException("This instance is in closing state"); } - if (!initialized.get()) { // If it is not yet initialized, then remove them from the pending partitions if any. if (!pendingAssignPartitions.isEmpty()) { @@ -339,7 +267,8 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana // we reached a consensus that sequential iteration over files on the local file system is performant enough. // Should this stop being the case, the remote log size could be calculated by incrementing/decrementing // counters during API calls for a more performant implementation. - Iterator remoteLogSegmentMetadataIterator = remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch); + Iterator remoteLogSegmentMetadataIterator = + remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch); while (remoteLogSegmentMetadataIterator.hasNext()) { RemoteLogSegmentMetadata remoteLogSegmentMetadata = remoteLogSegmentMetadataIterator.next(); remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes(); @@ -348,40 +277,30 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana } @Override - public Optional nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) throws RemoteStorageException { - lock.readLock().lock(); - try { - ensureInitializedAndNotClosed(); - return remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset); - } finally { - lock.readLock().unlock(); - } + public Optional nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, + int epoch, + long offset) throws RemoteStorageException { + return withReadLockAndEnsureInitialized( + () -> remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset)); } @Override public void configure(Map configs) { Objects.requireNonNull(configs, "configs can not be null."); - lock.writeLock().lock(); try { - if (configured) { + if (configured.compareAndSet(false, true)) { + TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs); + // Scheduling the initialization producer/consumer managers in a separate thread. Required resources may + // not yet be available now. This thread makes sure that it is retried at regular intervals until it is + // successful. + initializationThread = KafkaThread.nonDaemon( + "RLMMInitializationThread", () -> initializeResources(rlmmConfig)); + initializationThread.start(); + log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig); + } else { log.info("Skipping configure as it is already configured."); - return; } - - log.info("Started configuring topic-based RLMM with configs: {}", configs); - - rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs); - rlmTopicPartitioner = remoteLogMetadataTopicPartitionerFunction.apply(rlmmConfig.metadataTopicPartitionsCount()); - remotePartitionMetadataStore = remoteLogMetadataManagerSupplier.get(); - configured = true; - log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig); - - // Scheduling the initialization producer/consumer managers in a separate thread. Required resources may - // not yet be available now. This thread makes sure that it is retried at regular intervals until it is - // successful. - initializationThread = KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources); - initializationThread.start(); } finally { lock.writeLock().unlock(); } @@ -392,93 +311,74 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana return remotePartitionMetadataStore.isInitialized(topicIdPartition); } - private void initializeResources() { + private void handleRetry(long retryIntervalMs) { + log.info("Sleep for {} ms before retrying.", retryIntervalMs); + Utils.sleep(retryIntervalMs); + } + + private void initializeResources(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) { log.info("Initializing topic-based RLMM resources"); - final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest(); - boolean topicCreated = false; + int metadataTopicPartitionCount = rlmmConfig.metadataTopicPartitionsCount(); + long retryIntervalMs = rlmmConfig.initializationRetryIntervalMs(); + long retryMaxTimeoutMs = rlmmConfig.initializationRetryMaxTimeoutMs(); + RemoteLogMetadataTopicPartitioner partitioner = partitionerFunction.apply(metadataTopicPartitionCount); + NewTopic newTopic = newRemoteLogMetadataTopic(rlmmConfig); + boolean isTopicCreated = false; long startTimeMs = time.milliseconds(); - Admin adminClient = null; - try { - adminClient = Admin.create(rlmmConfig.commonProperties()); - // Stop if it is already initialized or closing. - while (!(initialized.get() || closing.get())) { - - // If it is timed out then raise an error to exit. - if (time.milliseconds() - startTimeMs > rlmmConfig.initializationRetryMaxTimeoutMs()) { - log.error("Timed out in initializing the resources, retried to initialize the resource for {} ms.", - rlmmConfig.initializationRetryMaxTimeoutMs()); + try (Admin admin = Admin.create(rlmmConfig.commonProperties())) { + while (!(initialized.get() || closing.get() || initializationFailed)) { + if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) { + log.error("Timed out to initialize the resources within {} ms.", retryMaxTimeoutMs); initializationFailed = true; - return; + break; } - - if (!topicCreated) { - topicCreated = createTopic(adminClient, remoteLogMetadataTopicRequest); - } - - if (!topicCreated) { - // Sleep for INITIALIZATION_RETRY_INTERVAL_MS before trying to create the topic again. - log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs()); - Utils.sleep(rlmmConfig.initializationRetryIntervalMs()); + isTopicCreated = isTopicCreated || createTopic(admin, newTopic); + if (!isTopicCreated) { + handleRetry(retryIntervalMs); + continue; + } + try { + if (!isPartitionsCountSameAsConfigured(admin, newTopic.name(), metadataTopicPartitionCount)) { + initializationFailed = true; + break; + } + } catch (Exception e) { + handleRetry(retryIntervalMs); continue; - } else { - // If topic is already created, validate the existing topic partitions. - try { - String topicName = remoteLogMetadataTopicRequest.name(); - // If the existing topic partition size is not same as configured, mark initialization as failed and exit. - if (!isPartitionsCountSameAsConfigured(adminClient, topicName)) { - initializationFailed = true; - } - } catch (Exception e) { - log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs()); - Utils.sleep(rlmmConfig.initializationRetryIntervalMs()); - continue; - } } - // Create producer and consumer managers. lock.writeLock().lock(); try { - producerManager = new ProducerManager(rlmmConfig, rlmTopicPartitioner); - consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, rlmTopicPartitioner, time); - if (startConsumerThread) { - consumerManager.startConsumerThread(); - } else { - log.info("RLMM Consumer task thread is not configured to be started."); - } - + producerManager = new ProducerManager(rlmmConfig, partitioner); + consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, partitioner, time); + consumerManager.startConsumerThread(); if (!pendingAssignPartitions.isEmpty()) { assignPartitions(pendingAssignPartitions); pendingAssignPartitions.clear(); } - initialized.set(true); log.info("Initialized topic-based RLMM resources successfully"); } catch (Exception e) { log.error("Encountered error while initializing producer/consumer", e); initializationFailed = true; - return; } finally { lock.writeLock().unlock(); } } - } catch (Exception e) { + } catch (KafkaException e) { log.error("Encountered error while initializing topic-based RLMM resources", e); initializationFailed = true; - } finally { - Utils.closeQuietly(adminClient, "AdminClient"); } } - boolean doesTopicExist(Admin adminClient, String topic) throws ExecutionException, InterruptedException { + boolean doesTopicExist(Admin admin, String topic) throws ExecutionException, InterruptedException { try { - TopicDescription description = adminClient.describeTopics(Set.of(topic)) + TopicDescription description = admin.describeTopics(Set.of(topic)) .topicNameValues() .get(topic) .get(); - if (description != null) { - log.info("Topic {} exists. TopicId: {}, numPartitions: {}, ", topic, - description.topicId(), description.partitions().size()); - } + log.info("Topic {} exists. TopicId: {}, numPartitions: {}", topic, description.topicId(), + description.partitions().size()); return true; } catch (ExecutionException | InterruptedException ex) { if (ex.getCause() instanceof UnknownTopicOrPartitionException) { @@ -489,24 +389,25 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana } } - private boolean isPartitionsCountSameAsConfigured(Admin adminClient, - String topicName) throws InterruptedException, ExecutionException { + private boolean isPartitionsCountSameAsConfigured(Admin admin, + String topicName, + int metadataTopicPartitionCount) throws InterruptedException, ExecutionException { log.debug("Getting topic details to check for partition count and replication factor."); - TopicDescription topicDescription = adminClient.describeTopics(Set.of(topicName)) - .topicNameValues().get(topicName).get(); - int expectedPartitions = rlmmConfig.metadataTopicPartitionsCount(); + TopicDescription topicDescription = admin + .describeTopics(Set.of(topicName)) + .topicNameValues() + .get(topicName) + .get(); int topicPartitionsSize = topicDescription.partitions().size(); - - if (topicPartitionsSize != expectedPartitions) { - log.error("Existing topic partition count [{}] is not same as the expected partition count [{}]", - topicPartitionsSize, expectedPartitions); + if (topicPartitionsSize != metadataTopicPartitionCount) { + log.error("Existing topic partition count {} is not same as the expected partition count {}", + topicPartitionsSize, metadataTopicPartitionCount); return false; } - return true; } - private NewTopic createRemoteLogMetadataTopicRequest() { + private NewTopic newRemoteLogMetadataTopic(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) { Map topicConfigs = new HashMap<>(); topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(rlmmConfig.metadataTopicRetentionMs())); topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); @@ -520,13 +421,13 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana * @param newTopic topic to be created. * @return Returns true if the topic already exists, or it is created successfully. */ - private boolean createTopic(Admin adminClient, NewTopic newTopic) { + private boolean createTopic(Admin admin, NewTopic newTopic) { boolean doesTopicExist = false; String topic = newTopic.name(); try { - doesTopicExist = doesTopicExist(adminClient, topic); + doesTopicExist = doesTopicExist(admin, topic); if (!doesTopicExist) { - CreateTopicsResult result = adminClient.createTopics(Set.of(newTopic)); + CreateTopicsResult result = admin.createTopics(Set.of(newTopic)); result.all().get(); List overriddenConfigs = result.config(topic).get() .entries() @@ -543,7 +444,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana // This exception can still occur as multiple brokers may call create topics and one of them may become // successful and other would throw TopicExistsException if (e.getCause() instanceof TopicExistsException) { - log.info("Topic [{}] already exists", topic); + log.info("Topic: {} already exists", topic); doesTopicExist = true; } else { log.error("Encountered error while querying or creating {} topic.", topic, e); @@ -583,11 +484,31 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana log.error("Initialization thread was interrupted while waiting to join on close.", e); } } - Utils.closeQuietly(producerManager, "ProducerTask"); Utils.closeQuietly(consumerManager, "RLMMConsumerManager"); Utils.closeQuietly(remotePartitionMetadataStore, "RemotePartitionMetadataStore"); log.info("Closed topic-based RLMM resources"); } } + + private T withReadLockAndEnsureInitialized(ThrowingSupplier action) throws RemoteStorageException { + lock.readLock().lock(); + try { + ensureInitializedAndNotClosed(); + return action.get(); + } finally { + lock.readLock().unlock(); + } + } + + @FunctionalInterface + public interface ThrowingSupplier { + /** + * Supplies a result, potentially throwing an exception. + * + * @return the supplied result. + * @throws E an exception that may be thrown during execution. + */ + T get() throws E; + } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java index 9c79fad397d..149ebb177d9 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java @@ -44,7 +44,6 @@ public class RemoteLogMetadataManagerTestUtils { public static class Builder { private String bootstrapServers; - private boolean startConsumerThread; private Map overrideRemoteLogMetadataManagerProps = Map.of(); private Supplier remotePartitionMetadataStore = RemotePartitionMetadataStore::new; private Function remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new; @@ -57,11 +56,6 @@ public class RemoteLogMetadataManagerTestUtils { return this; } - public Builder startConsumerThread(boolean startConsumerThread) { - this.startConsumerThread = startConsumerThread; - return this; - } - public Builder remotePartitionMetadataStore(Supplier remotePartitionMetadataStore) { this.remotePartitionMetadataStore = remotePartitionMetadataStore; return this; @@ -81,8 +75,7 @@ public class RemoteLogMetadataManagerTestUtils { Objects.requireNonNull(bootstrapServers); String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(); TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = - new TopicBasedRemoteLogMetadataManager(startConsumerThread, - remoteLogMetadataTopicPartitioner, remotePartitionMetadataStore); + new TopicBasedRemoteLogMetadataManager(remoteLogMetadataTopicPartitioner, remotePartitionMetadataStore); // Initialize TopicBasedRemoteLogMetadataManager. Map configs = new HashMap<>(); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java index 1e91625e2bd..6dfd948dd09 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java @@ -69,7 +69,6 @@ public class RemoteLogSegmentLifecycleTest { private RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() { return RemoteLogMetadataManagerTestUtils.builder() .bootstrapServers(clusterInstance.bootstrapServers()) - .startConsumerThread(true) .remotePartitionMetadataStore(() -> spyRemotePartitionMetadataStore) .build(); } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java index c867b68635f..62befa0f6e3 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java @@ -98,7 +98,6 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest { try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder() .bootstrapServers(clusterInstance.bootstrapServers()) - .startConsumerThread(true) .remoteLogMetadataTopicPartitioner(numMetadataTopicPartitions -> new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions) { @Override public int metadataPartition(TopicIdPartition topicIdPartition) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java index 908c9ef014c..a407882e42a 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java @@ -48,7 +48,6 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest { private TopicBasedRemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() { return RemoteLogMetadataManagerTestUtils.builder() .bootstrapServers(clusterInstance.bootstrapServers()) - .startConsumerThread(true) .remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner::new) .overrideRemoteLogMetadataManagerProps(Map.of(LOG_DIR, logDir)) .build(); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java index 4e40b7c7018..9fe8572cb4f 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java @@ -72,7 +72,6 @@ public class TopicBasedRemoteLogMetadataManagerTest { if (remoteLogMetadataManager == null) remoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder() .bootstrapServers(clusterInstance.bootstrapServers()) - .startConsumerThread(true) .remotePartitionMetadataStore(() -> spyRemotePartitionMetadataEventHandler) .build(); return remoteLogMetadataManager; diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java index ce471a24999..8875d8d6a93 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java @@ -57,7 +57,6 @@ public class RemoteLogMetadataManagerTest { private TopicBasedRemoteLogMetadataManager topicBasedRlmm() { return RemoteLogMetadataManagerTestUtils.builder() .bootstrapServers(clusterInstance.bootstrapServers()) - .startConsumerThread(true) .remotePartitionMetadataStore(RemotePartitionMetadataStore::new) .build(); }