KAFKA-19525: Refactor TopicBasedRLMM implementation to remove unused code (#20204)
CI / build (push) Waiting to run Details

- `startConsumerThread` was always true, so the variable was removed.
- Replaced the repetitive lock handling logic with
`withReadLockAndEnsureInitialized` to reduce duplication and improve
readability.
- Consolidated the logic in `initializeResources` and simplified method
arguments to better encapsulate configuration.
- Extracted common code and reduced the usage of global variables.
- Renamed variables to be more descriptive.

Tests:
- Covered by the existing unit tests (UTs), since this patch is a refactoring.

Reviewers: PoAn Yang <payang@apache.org>
This commit is contained in:
Kamal Chandraprakash 2025-07-23 12:19:13 +05:30 committed by GitHub
parent f1e9aa1c65
commit 16c079ed23
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 143 additions and 234 deletions

View File

@ -56,28 +56,25 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
/** /**
* This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}. * This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name
* {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
* This is used to publish and fetch {@link RemoteLogMetadata} for the registered user topic partitions with * This is used to publish and fetch {@link RemoteLogMetadata} for the registered user topic partitions with
* {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an instance of this class, and it subscribes * {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an instance of this class, and it subscribes
* to metadata updates for the registered user topic partitions. * to metadata updates for the registered user topic partitions.
*/ */
public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager { public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class); private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
private final Time time = Time.SYSTEM;
private volatile boolean configured = false; private final AtomicBoolean configured = new AtomicBoolean(false);
// It indicates whether the close process of this instance is started or not via #close() method.
// Using AtomicBoolean instead of volatile as it may encounter http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD
// if the field is read but not updated in a spin loop like in #initializeResources() method.
private final AtomicBoolean closing = new AtomicBoolean(false); private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean initialized = new AtomicBoolean(false); private final AtomicBoolean initialized = new AtomicBoolean(false);
private final Time time = Time.SYSTEM;
private final boolean startConsumerThread;
private Thread initializationThread; private Thread initializationThread;
private volatile ProducerManager producerManager; private volatile ProducerManager producerManager;
@ -85,112 +82,77 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
// This allows to gracefully close this instance using {@link #close()} method while there are some pending or new // This allows to gracefully close this instance using {@link #close()} method while there are some pending or new
// requests calling different methods which use the resources like producer/consumer managers. // requests calling different methods which use the resources like producer/consumer managers.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final RemotePartitionMetadataStore remotePartitionMetadataStore;
private RemotePartitionMetadataStore remotePartitionMetadataStore;
private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>()); private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
private volatile boolean initializationFailed; private volatile boolean initializationFailed = false;
private final Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier; private final Function<Integer, RemoteLogMetadataTopicPartitioner> partitionerFunction;
private final Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitionerFunction;
/**
* The default constructor delegates to the internal one, starting the consumer thread and
* supplying an instance of RemoteLogMetadataTopicPartitioner and RemotePartitionMetadataStore by default.
*/
public TopicBasedRemoteLogMetadataManager() { public TopicBasedRemoteLogMetadataManager() {
this(true, RemoteLogMetadataTopicPartitioner::new, RemotePartitionMetadataStore::new); this(RemoteLogMetadataTopicPartitioner::new, RemotePartitionMetadataStore::new);
}
TopicBasedRemoteLogMetadataManager(Function<Integer, RemoteLogMetadataTopicPartitioner> partitionerFunction,
Supplier<RemotePartitionMetadataStore> metadataStoreSupplier) {
this.partitionerFunction = partitionerFunction;
this.remotePartitionMetadataStore = metadataStoreSupplier.get();
} }
/** /**
* Used in tests to dynamically configure the instance. * Adds metadata for a remote log segment to the metadata store.
* The provided metadata must have the state {@code COPY_SEGMENT_STARTED}.
*
* @param remoteLogSegmentMetadata the metadata of the remote log segment to be added; must not be null
* @return a {@link CompletableFuture} that completes once the metadata has been published to the topic
* @throws RemoteStorageException if an error occurs while storing the metadata
* @throws IllegalArgumentException if the state of the provided metadata is not {@code COPY_SEGMENT_STARTED}
*/ */
TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitionerFunction, Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier) {
this.startConsumerThread = startConsumerThread;
this.remoteLogMetadataManagerSupplier = remoteLogMetadataManagerSupplier;
this.remoteLogMetadataTopicPartitionerFunction = remoteLogMetadataTopicPartitionerFunction;
}
@Override @Override
public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
throws RemoteStorageException { throws RemoteStorageException {
Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null"); Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
return withReadLockAndEnsureInitialized(() -> {
// This allows gracefully rejecting the requests while closing of this instance is in progress, which triggers
// closing the producer/consumer manager instances.
lock.readLock().lock();
try {
ensureInitializedAndNotClosed();
// This method is allowed only to add remote log segment with the initial state(which is RemoteLogSegmentState.COPY_SEGMENT_STARTED)
// but not to update the existing remote log segment metadata.
if (remoteLogSegmentMetadata.state() != RemoteLogSegmentState.COPY_SEGMENT_STARTED) { if (remoteLogSegmentMetadata.state() != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Given remoteLogSegmentMetadata should have state as " + RemoteLogSegmentState.COPY_SEGMENT_STARTED "Given remoteLogSegmentMetadata should have state as " + RemoteLogSegmentState.COPY_SEGMENT_STARTED
+ " but it contains state as: " + remoteLogSegmentMetadata.state()); + " but it contains state as: " + remoteLogSegmentMetadata.state());
} }
return storeRemoteLogMetadata(remoteLogSegmentMetadata);
// Publish the message to the topic. });
return storeRemoteLogMetadata(remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition(),
remoteLogSegmentMetadata);
} finally {
lock.readLock().unlock();
}
} }
@Override @Override
public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate)
throws RemoteStorageException { throws RemoteStorageException {
Objects.requireNonNull(segmentMetadataUpdate, "segmentMetadataUpdate can not be null"); Objects.requireNonNull(metadataUpdate, "metadataUpdate can not be null");
return withReadLockAndEnsureInitialized(() -> {
lock.readLock().lock(); if (metadataUpdate.state() == RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
try { throw new IllegalArgumentException("Given remoteLogSegmentMetadataUpdate should not have the state as: "
ensureInitializedAndNotClosed();
// Callers should use addRemoteLogSegmentMetadata to add RemoteLogSegmentMetadata with state as
// RemoteLogSegmentState.COPY_SEGMENT_STARTED.
if (segmentMetadataUpdate.state() == RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
throw new IllegalArgumentException("Given remoteLogSegmentMetadata should not have the state as: "
+ RemoteLogSegmentState.COPY_SEGMENT_STARTED); + RemoteLogSegmentState.COPY_SEGMENT_STARTED);
} }
return storeRemoteLogMetadata(metadataUpdate);
// Publish the message to the topic. });
return storeRemoteLogMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate);
} finally {
lock.readLock().unlock();
}
} }
@Override @Override
public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata deleteMetadata)
throws RemoteStorageException { throws RemoteStorageException {
Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null"); Objects.requireNonNull(deleteMetadata, "deleteMetadata can not be null");
return withReadLockAndEnsureInitialized(
lock.readLock().lock(); () -> storeRemoteLogMetadata(deleteMetadata));
try {
ensureInitializedAndNotClosed();
return storeRemoteLogMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata);
} finally {
lock.readLock().unlock();
}
} }
/** /**
* Returns {@link CompletableFuture} which will complete only after publishing of the given {@code remoteLogMetadata} into * Returns {@link CompletableFuture} which will complete only after publishing of the given {@code remoteLogMetadata} into
* the remote log metadata topic and the internal consumer is caught up until the produced record's offset. * the remote log metadata topic and the internal consumer is caught up until the produced record's offset.
* *
* @param topicIdPartition partition of the given remoteLogMetadata.
* @param remoteLogMetadata RemoteLogMetadata to be stored. * @param remoteLogMetadata RemoteLogMetadata to be stored.
* @return a future with acknowledge and potentially waiting also for consumer to catch up. * @return a future with acknowledge and potentially waiting also for consumer to catch up.
* This ensures cache is synchronized with backing topic. * This ensures cache is synchronized with backing topic.
* @throws RemoteStorageException if there are any storage errors occur. * @throws RemoteStorageException if there are any storage errors occur.
*/ */
private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdPartition, private CompletableFuture<Void> storeRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException {
RemoteLogMetadata remoteLogMetadata) log.debug("Storing the partition: {} metadata: {}", remoteLogMetadata.topicIdPartition(), remoteLogMetadata);
throws RemoteStorageException {
log.debug("Storing the partition: {} metadata: {}", topicIdPartition, remoteLogMetadata);
try { try {
// Publish the message to the metadata topic. // Publish the message to the metadata topic.
CompletableFuture<RecordMetadata> produceFuture = producerManager.publishMessage(remoteLogMetadata); CompletableFuture<RecordMetadata> produceFuture = producerManager.publishMessage(remoteLogMetadata);
@ -214,62 +176,33 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
@Override @Override
public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
int epochForOffset, int epochForOffset,
long offset) long offset) throws RemoteStorageException {
throws RemoteStorageException { return withReadLockAndEnsureInitialized(
lock.readLock().lock(); () -> remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset, epochForOffset));
try {
ensureInitializedAndNotClosed();
return remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, offset, epochForOffset);
} finally {
lock.readLock().unlock();
}
} }
@Override @Override
public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition, public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
int leaderEpoch) int leaderEpoch)
throws RemoteStorageException { throws RemoteStorageException {
lock.readLock().lock(); return withReadLockAndEnsureInitialized(
try { () -> remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch));
ensureInitializedAndNotClosed();
return remotePartitionMetadataStore.highestLogOffset(topicIdPartition, leaderEpoch);
} finally {
lock.readLock().unlock();
}
} }
@Override @Override
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
throws RemoteStorageException { throws RemoteStorageException {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
return withReadLockAndEnsureInitialized(
lock.readLock().lock(); () -> remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition));
try {
ensureInitializedAndNotClosed();
return remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition);
} finally {
lock.readLock().unlock();
}
} }
@Override @Override
public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
throws RemoteStorageException { throws RemoteStorageException {
Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null"); Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
return withReadLockAndEnsureInitialized(
lock.readLock().lock(); () -> remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch));
try {
ensureInitializedAndNotClosed();
return remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch);
} finally {
lock.readLock().unlock();
}
} }
@Override @Override
@ -277,17 +210,14 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
Set<TopicIdPartition> followerPartitions) { Set<TopicIdPartition> followerPartitions) {
Objects.requireNonNull(leaderPartitions, "leaderPartitions can not be null"); Objects.requireNonNull(leaderPartitions, "leaderPartitions can not be null");
Objects.requireNonNull(followerPartitions, "followerPartitions can not be null"); Objects.requireNonNull(followerPartitions, "followerPartitions can not be null");
log.info("Received leadership notifications with leader partitions {} and follower partitions {}", log.info("Received leadership notifications with leader partitions {} and follower partitions {}",
leaderPartitions, followerPartitions); leaderPartitions, followerPartitions);
lock.readLock().lock(); lock.readLock().lock();
try { try {
if (closing.get()) { if (closing.get()) {
throw new IllegalStateException("This instance is in closing state"); throw new IllegalStateException("This instance is in closing state");
} }
Set<TopicIdPartition> allPartitions = new HashSet<>(leaderPartitions);
HashSet<TopicIdPartition> allPartitions = new HashSet<>(leaderPartitions);
allPartitions.addAll(followerPartitions); allPartitions.addAll(followerPartitions);
if (!initialized.get()) { if (!initialized.get()) {
// If it is not yet initialized, then keep them as pending partitions and assign them // If it is not yet initialized, then keep them as pending partitions and assign them
@ -305,7 +235,6 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
for (TopicIdPartition partition : allPartitions) { for (TopicIdPartition partition : allPartitions) {
remotePartitionMetadataStore.maybeLoadPartition(partition); remotePartitionMetadataStore.maybeLoadPartition(partition);
} }
consumerManager.addAssignmentsForPartitions(allPartitions); consumerManager.addAssignmentsForPartitions(allPartitions);
} }
@ -316,7 +245,6 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
if (closing.get()) { if (closing.get()) {
throw new IllegalStateException("This instance is in closing state"); throw new IllegalStateException("This instance is in closing state");
} }
if (!initialized.get()) { if (!initialized.get()) {
// If it is not yet initialized, then remove them from the pending partitions if any. // If it is not yet initialized, then remove them from the pending partitions if any.
if (!pendingAssignPartitions.isEmpty()) { if (!pendingAssignPartitions.isEmpty()) {
@ -339,7 +267,8 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
// we reached a consensus that sequential iteration over files on the local file system is performant enough. // we reached a consensus that sequential iteration over files on the local file system is performant enough.
// Should this stop being the case, the remote log size could be calculated by incrementing/decrementing // Should this stop being the case, the remote log size could be calculated by incrementing/decrementing
// counters during API calls for a more performant implementation. // counters during API calls for a more performant implementation.
Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator = remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch); Iterator<RemoteLogSegmentMetadata> remoteLogSegmentMetadataIterator =
remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, leaderEpoch);
while (remoteLogSegmentMetadataIterator.hasNext()) { while (remoteLogSegmentMetadataIterator.hasNext()) {
RemoteLogSegmentMetadata remoteLogSegmentMetadata = remoteLogSegmentMetadataIterator.next(); RemoteLogSegmentMetadata remoteLogSegmentMetadata = remoteLogSegmentMetadataIterator.next();
remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes(); remoteLogSize += remoteLogSegmentMetadata.segmentSizeInBytes();
@ -348,40 +277,30 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
} }
@Override @Override
public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) throws RemoteStorageException { public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
lock.readLock().lock(); int epoch,
try { long offset) throws RemoteStorageException {
ensureInitializedAndNotClosed(); return withReadLockAndEnsureInitialized(
return remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset); () -> remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset));
} finally {
lock.readLock().unlock();
}
} }
@Override @Override
public void configure(Map<String, ?> configs) { public void configure(Map<String, ?> configs) {
Objects.requireNonNull(configs, "configs can not be null."); Objects.requireNonNull(configs, "configs can not be null.");
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
if (configured) { if (configured.compareAndSet(false, true)) {
log.info("Skipping configure as it is already configured."); TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
return;
}
log.info("Started configuring topic-based RLMM with configs: {}", configs);
rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
rlmTopicPartitioner = remoteLogMetadataTopicPartitionerFunction.apply(rlmmConfig.metadataTopicPartitionsCount());
remotePartitionMetadataStore = remoteLogMetadataManagerSupplier.get();
configured = true;
log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);
// Scheduling the initialization producer/consumer managers in a separate thread. Required resources may // Scheduling the initialization producer/consumer managers in a separate thread. Required resources may
// not yet be available now. This thread makes sure that it is retried at regular intervals until it is // not yet be available now. This thread makes sure that it is retried at regular intervals until it is
// successful. // successful.
initializationThread = KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources); initializationThread = KafkaThread.nonDaemon(
"RLMMInitializationThread", () -> initializeResources(rlmmConfig));
initializationThread.start(); initializationThread.start();
log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);
} else {
log.info("Skipping configure as it is already configured.");
}
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
} }
@ -392,93 +311,74 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
return remotePartitionMetadataStore.isInitialized(topicIdPartition); return remotePartitionMetadataStore.isInitialized(topicIdPartition);
} }
private void initializeResources() { private void handleRetry(long retryIntervalMs) {
log.info("Sleep for {} ms before retrying.", retryIntervalMs);
Utils.sleep(retryIntervalMs);
}
private void initializeResources(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) {
log.info("Initializing topic-based RLMM resources"); log.info("Initializing topic-based RLMM resources");
final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest(); int metadataTopicPartitionCount = rlmmConfig.metadataTopicPartitionsCount();
boolean topicCreated = false; long retryIntervalMs = rlmmConfig.initializationRetryIntervalMs();
long retryMaxTimeoutMs = rlmmConfig.initializationRetryMaxTimeoutMs();
RemoteLogMetadataTopicPartitioner partitioner = partitionerFunction.apply(metadataTopicPartitionCount);
NewTopic newTopic = newRemoteLogMetadataTopic(rlmmConfig);
boolean isTopicCreated = false;
long startTimeMs = time.milliseconds(); long startTimeMs = time.milliseconds();
Admin adminClient = null; try (Admin admin = Admin.create(rlmmConfig.commonProperties())) {
try { while (!(initialized.get() || closing.get() || initializationFailed)) {
adminClient = Admin.create(rlmmConfig.commonProperties()); if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) {
// Stop if it is already initialized or closing. log.error("Timed out to initialize the resources within {} ms.", retryMaxTimeoutMs);
while (!(initialized.get() || closing.get())) {
// If it is timed out then raise an error to exit.
if (time.milliseconds() - startTimeMs > rlmmConfig.initializationRetryMaxTimeoutMs()) {
log.error("Timed out in initializing the resources, retried to initialize the resource for {} ms.",
rlmmConfig.initializationRetryMaxTimeoutMs());
initializationFailed = true; initializationFailed = true;
return; break;
} }
isTopicCreated = isTopicCreated || createTopic(admin, newTopic);
if (!topicCreated) { if (!isTopicCreated) {
topicCreated = createTopic(adminClient, remoteLogMetadataTopicRequest); handleRetry(retryIntervalMs);
}
if (!topicCreated) {
// Sleep for INITIALIZATION_RETRY_INTERVAL_MS before trying to create the topic again.
log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
continue; continue;
} else { }
// If topic is already created, validate the existing topic partitions.
try { try {
String topicName = remoteLogMetadataTopicRequest.name(); if (!isPartitionsCountSameAsConfigured(admin, newTopic.name(), metadataTopicPartitionCount)) {
// If the existing topic partition size is not same as configured, mark initialization as failed and exit.
if (!isPartitionsCountSameAsConfigured(adminClient, topicName)) {
initializationFailed = true; initializationFailed = true;
break;
} }
} catch (Exception e) { } catch (Exception e) {
log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs()); handleRetry(retryIntervalMs);
Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
continue; continue;
} }
}
// Create producer and consumer managers. // Create producer and consumer managers.
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
producerManager = new ProducerManager(rlmmConfig, rlmTopicPartitioner); producerManager = new ProducerManager(rlmmConfig, partitioner);
consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, rlmTopicPartitioner, time); consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, partitioner, time);
if (startConsumerThread) {
consumerManager.startConsumerThread(); consumerManager.startConsumerThread();
} else {
log.info("RLMM Consumer task thread is not configured to be started.");
}
if (!pendingAssignPartitions.isEmpty()) { if (!pendingAssignPartitions.isEmpty()) {
assignPartitions(pendingAssignPartitions); assignPartitions(pendingAssignPartitions);
pendingAssignPartitions.clear(); pendingAssignPartitions.clear();
} }
initialized.set(true); initialized.set(true);
log.info("Initialized topic-based RLMM resources successfully"); log.info("Initialized topic-based RLMM resources successfully");
} catch (Exception e) { } catch (Exception e) {
log.error("Encountered error while initializing producer/consumer", e); log.error("Encountered error while initializing producer/consumer", e);
initializationFailed = true; initializationFailed = true;
return;
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
} }
} }
} catch (Exception e) { } catch (KafkaException e) {
log.error("Encountered error while initializing topic-based RLMM resources", e); log.error("Encountered error while initializing topic-based RLMM resources", e);
initializationFailed = true; initializationFailed = true;
} finally {
Utils.closeQuietly(adminClient, "AdminClient");
} }
} }
boolean doesTopicExist(Admin adminClient, String topic) throws ExecutionException, InterruptedException { boolean doesTopicExist(Admin admin, String topic) throws ExecutionException, InterruptedException {
try { try {
TopicDescription description = adminClient.describeTopics(Set.of(topic)) TopicDescription description = admin.describeTopics(Set.of(topic))
.topicNameValues() .topicNameValues()
.get(topic) .get(topic)
.get(); .get();
if (description != null) { log.info("Topic {} exists. TopicId: {}, numPartitions: {}", topic, description.topicId(),
log.info("Topic {} exists. TopicId: {}, numPartitions: {}, ", topic, description.partitions().size());
description.topicId(), description.partitions().size());
}
return true; return true;
} catch (ExecutionException | InterruptedException ex) { } catch (ExecutionException | InterruptedException ex) {
if (ex.getCause() instanceof UnknownTopicOrPartitionException) { if (ex.getCause() instanceof UnknownTopicOrPartitionException) {
@ -489,24 +389,25 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
} }
} }
private boolean isPartitionsCountSameAsConfigured(Admin adminClient, private boolean isPartitionsCountSameAsConfigured(Admin admin,
String topicName) throws InterruptedException, ExecutionException { String topicName,
int metadataTopicPartitionCount) throws InterruptedException, ExecutionException {
log.debug("Getting topic details to check for partition count and replication factor."); log.debug("Getting topic details to check for partition count and replication factor.");
TopicDescription topicDescription = adminClient.describeTopics(Set.of(topicName)) TopicDescription topicDescription = admin
.topicNameValues().get(topicName).get(); .describeTopics(Set.of(topicName))
int expectedPartitions = rlmmConfig.metadataTopicPartitionsCount(); .topicNameValues()
.get(topicName)
.get();
int topicPartitionsSize = topicDescription.partitions().size(); int topicPartitionsSize = topicDescription.partitions().size();
if (topicPartitionsSize != metadataTopicPartitionCount) {
if (topicPartitionsSize != expectedPartitions) { log.error("Existing topic partition count {} is not same as the expected partition count {}",
log.error("Existing topic partition count [{}] is not same as the expected partition count [{}]", topicPartitionsSize, metadataTopicPartitionCount);
topicPartitionsSize, expectedPartitions);
return false; return false;
} }
return true; return true;
} }
private NewTopic createRemoteLogMetadataTopicRequest() { private NewTopic newRemoteLogMetadataTopic(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) {
Map<String, String> topicConfigs = new HashMap<>(); Map<String, String> topicConfigs = new HashMap<>();
topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(rlmmConfig.metadataTopicRetentionMs())); topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(rlmmConfig.metadataTopicRetentionMs()));
topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
@ -520,13 +421,13 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
* @param newTopic topic to be created. * @param newTopic topic to be created.
* @return Returns true if the topic already exists, or it is created successfully. * @return Returns true if the topic already exists, or it is created successfully.
*/ */
private boolean createTopic(Admin adminClient, NewTopic newTopic) { private boolean createTopic(Admin admin, NewTopic newTopic) {
boolean doesTopicExist = false; boolean doesTopicExist = false;
String topic = newTopic.name(); String topic = newTopic.name();
try { try {
doesTopicExist = doesTopicExist(adminClient, topic); doesTopicExist = doesTopicExist(admin, topic);
if (!doesTopicExist) { if (!doesTopicExist) {
CreateTopicsResult result = adminClient.createTopics(Set.of(newTopic)); CreateTopicsResult result = admin.createTopics(Set.of(newTopic));
result.all().get(); result.all().get();
List<String> overriddenConfigs = result.config(topic).get() List<String> overriddenConfigs = result.config(topic).get()
.entries() .entries()
@ -543,7 +444,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
// This exception can still occur as multiple brokers may call create topics and one of them may become // This exception can still occur as multiple brokers may call create topics and one of them may become
// successful and other would throw TopicExistsException // successful and other would throw TopicExistsException
if (e.getCause() instanceof TopicExistsException) { if (e.getCause() instanceof TopicExistsException) {
log.info("Topic [{}] already exists", topic); log.info("Topic: {} already exists", topic);
doesTopicExist = true; doesTopicExist = true;
} else { } else {
log.error("Encountered error while querying or creating {} topic.", topic, e); log.error("Encountered error while querying or creating {} topic.", topic, e);
@ -583,11 +484,31 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
log.error("Initialization thread was interrupted while waiting to join on close.", e); log.error("Initialization thread was interrupted while waiting to join on close.", e);
} }
} }
Utils.closeQuietly(producerManager, "ProducerTask"); Utils.closeQuietly(producerManager, "ProducerTask");
Utils.closeQuietly(consumerManager, "RLMMConsumerManager"); Utils.closeQuietly(consumerManager, "RLMMConsumerManager");
Utils.closeQuietly(remotePartitionMetadataStore, "RemotePartitionMetadataStore"); Utils.closeQuietly(remotePartitionMetadataStore, "RemotePartitionMetadataStore");
log.info("Closed topic-based RLMM resources"); log.info("Closed topic-based RLMM resources");
} }
} }
/**
 * Runs the given action while holding the read lock.
 *
 * <p>The read lock is acquired first, then {@code ensureInitializedAndNotClosed()} is invoked,
 * and only after that check returns normally is {@code action} evaluated. The read lock is
 * released in all cases, whether the check or the action completes normally or throws.
 *
 * @param action the work to perform under the read lock.
 * @param <T>    the type of the value produced by {@code action}.
 * @return the value returned by {@code action}.
 * @throws RemoteStorageException if {@code action} throws it; it is propagated unchanged.
 */
private <T> T withReadLockAndEnsureInitialized(ThrowingSupplier<T, RemoteStorageException> action) throws RemoteStorageException {
    // Capture the read lock once so that the lock and unlock calls visibly target the same handle.
    final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
    // Acquire outside the try block: if acquisition itself fails there is nothing to release.
    readLock.lock();
    try {
        // The state check runs under the lock, before the action is evaluated.
        ensureInitializedAndNotClosed();
        final T result = action.get();
        return result;
    } finally {
        readLock.unlock();
    }
}
/**
 * A supplier of results whose {@link #get()} method is allowed to throw a checked exception.
 *
 * <p>Unlike {@link java.util.function.Supplier}, the exception type is part of the signature,
 * so a lambda or method reference that throws {@code E} can be used as an implementation.
 *
 * @param <T> the type of the result produced by this supplier.
 * @param <E> the type of exception that {@link #get()} may throw.
 */
@FunctionalInterface
public interface ThrowingSupplier<T, E extends Exception> {
    /**
     * Produces a result, or fails with an exception of type {@code E}.
     *
     * @return the produced result.
     * @throws E if the result cannot be produced.
     */
    T get() throws E;
}
} }

View File

@ -44,7 +44,6 @@ public class RemoteLogMetadataManagerTestUtils {
public static class Builder { public static class Builder {
private String bootstrapServers; private String bootstrapServers;
private boolean startConsumerThread;
private Map<String, Object> overrideRemoteLogMetadataManagerProps = Map.of(); private Map<String, Object> overrideRemoteLogMetadataManagerProps = Map.of();
private Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStore = RemotePartitionMetadataStore::new; private Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStore = RemotePartitionMetadataStore::new;
private Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new; private Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new;
@ -57,11 +56,6 @@ public class RemoteLogMetadataManagerTestUtils {
return this; return this;
} }
public Builder startConsumerThread(boolean startConsumerThread) {
this.startConsumerThread = startConsumerThread;
return this;
}
public Builder remotePartitionMetadataStore(Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStore) { public Builder remotePartitionMetadataStore(Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStore) {
this.remotePartitionMetadataStore = remotePartitionMetadataStore; this.remotePartitionMetadataStore = remotePartitionMetadataStore;
return this; return this;
@ -81,8 +75,7 @@ public class RemoteLogMetadataManagerTestUtils {
Objects.requireNonNull(bootstrapServers); Objects.requireNonNull(bootstrapServers);
String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(); String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager =
new TopicBasedRemoteLogMetadataManager(startConsumerThread, new TopicBasedRemoteLogMetadataManager(remoteLogMetadataTopicPartitioner, remotePartitionMetadataStore);
remoteLogMetadataTopicPartitioner, remotePartitionMetadataStore);
// Initialize TopicBasedRemoteLogMetadataManager. // Initialize TopicBasedRemoteLogMetadataManager.
Map<String, Object> configs = new HashMap<>(); Map<String, Object> configs = new HashMap<>();

View File

@ -69,7 +69,6 @@ public class RemoteLogSegmentLifecycleTest {
private RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() { private RemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() {
return RemoteLogMetadataManagerTestUtils.builder() return RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers()) .bootstrapServers(clusterInstance.bootstrapServers())
.startConsumerThread(true)
.remotePartitionMetadataStore(() -> spyRemotePartitionMetadataStore) .remotePartitionMetadataStore(() -> spyRemotePartitionMetadataStore)
.build(); .build();
} }

View File

@ -98,7 +98,6 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder() try (TopicBasedRemoteLogMetadataManager remoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers()) .bootstrapServers(clusterInstance.bootstrapServers())
.startConsumerThread(true)
.remoteLogMetadataTopicPartitioner(numMetadataTopicPartitions -> new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions) { .remoteLogMetadataTopicPartitioner(numMetadataTopicPartitions -> new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions) {
@Override @Override
public int metadataPartition(TopicIdPartition topicIdPartition) { public int metadataPartition(TopicIdPartition topicIdPartition) {

View File

@ -48,7 +48,6 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
private TopicBasedRemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() { private TopicBasedRemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() {
return RemoteLogMetadataManagerTestUtils.builder() return RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers()) .bootstrapServers(clusterInstance.bootstrapServers())
.startConsumerThread(true)
.remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner::new) .remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner::new)
.overrideRemoteLogMetadataManagerProps(Map.of(LOG_DIR, logDir)) .overrideRemoteLogMetadataManagerProps(Map.of(LOG_DIR, logDir))
.build(); .build();

View File

@ -72,7 +72,6 @@ public class TopicBasedRemoteLogMetadataManagerTest {
if (remoteLogMetadataManager == null) if (remoteLogMetadataManager == null)
remoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder() remoteLogMetadataManager = RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers()) .bootstrapServers(clusterInstance.bootstrapServers())
.startConsumerThread(true)
.remotePartitionMetadataStore(() -> spyRemotePartitionMetadataEventHandler) .remotePartitionMetadataStore(() -> spyRemotePartitionMetadataEventHandler)
.build(); .build();
return remoteLogMetadataManager; return remoteLogMetadataManager;

View File

@ -57,7 +57,6 @@ public class RemoteLogMetadataManagerTest {
private TopicBasedRemoteLogMetadataManager topicBasedRlmm() { private TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
return RemoteLogMetadataManagerTestUtils.builder() return RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers()) .bootstrapServers(clusterInstance.bootstrapServers())
.startConsumerThread(true)
.remotePartitionMetadataStore(RemotePartitionMetadataStore::new) .remotePartitionMetadataStore(RemotePartitionMetadataStore::new)
.build(); .build();
} }