KAFKA-17980: Introduce `isReady` API in RemoteLogMetadataManager (#17737)

- The isReady API in RemoteLogMetadataManager (RLMM) is used to denote whether the partition metadata is ready for remote storage operations. The plugin implementors can use this API to denote the partition status while bootstrapping the RLMM.
- Using this API, we are gracefully starting the remote log components. The segment copy, delete, and other operations that hits remote storage will be invoked once the metadata is ready for a given partition.
- See KIP-1105 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1105%3A+Make+remote+log+manager+thread-pool+configs+dynamic) for more details.

Reviewers: Federico Valeri <fvaleri@redhat.com>, Satish Duggana <satishd@apache.org>
This commit is contained in:
Kamal Chandraprakash 2024-11-12 22:47:48 +05:30 committed by GitHub
parent 6cf4081540
commit 5914013219
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 82 additions and 14 deletions

View File

@ -474,9 +474,9 @@ public class RemoteLogManager implements Closeable {
if (topicIdByPartitionMap.containsKey(tp)) {
TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the copy RLM task for tpId: {}", tpId);
LOGGER.info("Cancelling the copy RLM task for partition: {}", tpId);
task.cancel();
LOGGER.info("Resetting remote copy lag metrics for tpId: {}", tpId);
LOGGER.info("Resetting remote copy lag metrics for partition: {}", tpId);
((RLMCopyTask) task.rlmTask).resetLagStats();
return null;
});
@ -501,17 +501,17 @@ public class RemoteLogManager implements Closeable {
if (topicIdByPartitionMap.containsKey(tp)) {
TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the copy RLM task for tpId: {}", tpId);
LOGGER.info("Cancelling the copy RLM task for partition: {}", tpId);
task.cancel();
return null;
});
leaderExpirationRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the expiration RLM task for tpId: {}", tpId);
LOGGER.info("Cancelling the expiration RLM task for partition: {}", tpId);
task.cancel();
return null;
});
followerRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> {
LOGGER.info("Cancelling the follower RLM task for tpId: {}", tpId);
LOGGER.info("Cancelling the follower RLM task for partition: {}", tpId);
task.cancel();
return null;
});
@ -790,8 +790,14 @@ public class RemoteLogManager implements Closeable {
}
public void run() {
if (isCancelled())
if (isCancelled()) {
logger.debug("Skipping the current run for partition {} as it is cancelled", topicIdPartition);
return;
}
if (!remoteLogMetadataManager.isReady(topicIdPartition)) {
logger.debug("Skipping the current run for partition {} as the remote-log metadata is not ready", topicIdPartition);
return;
}
try {
Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition());
@ -803,13 +809,13 @@ public class RemoteLogManager implements Closeable {
execute(unifiedLogOptional.get());
} catch (InterruptedException ex) {
if (!isCancelled()) {
logger.warn("Current thread for topic-partition-id {} is interrupted", topicIdPartition, ex);
logger.warn("Current thread for partition {} is interrupted", topicIdPartition, ex);
}
} catch (RetriableException ex) {
logger.debug("Encountered a retryable error while executing current task for topic-partition {}", topicIdPartition, ex);
logger.debug("Encountered a retryable error while executing current task for partition {}", topicIdPartition, ex);
} catch (Exception ex) {
if (!isCancelled()) {
logger.warn("Current task for topic-partition {} received error but it will be scheduled", topicIdPartition, ex);
logger.warn("Current task for partition {} received error but it will be scheduled", topicIdPartition, ex);
}
}
}

View File

@ -165,12 +165,16 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@ -259,6 +263,7 @@ public class RemoteLogManagerTest {
return 0L;
}
};
doReturn(true).when(remoteLogMetadataManager).isReady(any(TopicIdPartition.class));
}
@AfterEach
@ -2144,11 +2149,6 @@ public class RemoteLogManagerTest {
Set<StopPartition> partitions = new HashSet<>();
partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true));
partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true, true));
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
@ -2159,6 +2159,12 @@ public class RemoteLogManagerTest {
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))
.thenReturn(dummyFuture);
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
remoteLogManager.stopPartitions(partitions, errorHandler);
assertNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
assertNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
@ -3643,6 +3649,36 @@ public class RemoteLogManagerTest {
assertEquals(273, fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
}
@Test
public void testRLMOpsWhenMetadataIsNotReady() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
when(remoteLogMetadataManager.isReady(any(TopicIdPartition.class)))
.thenAnswer(ans -> {
latch.countDown();
return false;
});
remoteLogManager.startup();
remoteLogManager.onLeadershipChange(
Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)),
topicIds
);
assertNotNull(remoteLogManager.rlmCopyTask(leaderTopicIdPartition));
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
// Once the partitions are assigned to the broker either as leader (or) follower in RLM#onLeadershipChange,
// then it should have called the `isReady` method for each of the partitions. Otherwise, the test will fail.
latch.await(5, TimeUnit.SECONDS);
verify(remoteLogMetadataManager).configure(anyMap());
verify(remoteLogMetadataManager).onPartitionLeadershipChanges(anySet(), anySet());
verify(remoteLogMetadataManager, atLeastOnce()).isReady(eq(leaderTopicIdPartition));
verify(remoteLogMetadataManager, atLeastOnce()).isReady(eq(followerTopicIdPartition));
verifyNoMoreInteractions(remoteLogMetadataManager);
verify(remoteStorageManager).configure(anyMap());
verifyNoMoreInteractions(remoteStorageManager);
}
private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException {
byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
Compression compression = Compression.NONE;

View File

@ -230,4 +230,14 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable {
long offset) throws RemoteStorageException {
return remoteLogSegmentMetadata(topicIdPartition, epoch, offset);
}
/**
* Denotes whether the partition metadata is ready to serve.
*
* @param topicIdPartition topic partition
* @return True if the partition is ready to serve for remote storage operations.
*/
default boolean isReady(TopicIdPartition topicIdPartition) {
return true;
}
}

View File

@ -111,6 +111,11 @@ public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetada
return withClassLoader(() -> delegate.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset));
}
@Override
public boolean isReady(TopicIdPartition topicIdPartition) {
return withClassLoader(() -> delegate.isReady(topicIdPartition));
}
@Override
public void configure(Map<String, ?> configs) {
withClassLoader(() -> {

View File

@ -396,6 +396,11 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
}
}
@Override
public boolean isReady(TopicIdPartition topicIdPartition) {
return remotePartitionMetadataStore.isInitialized(topicIdPartition);
}
private void initializeResources() {
log.info("Initializing topic-based RLMM resources");
final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest();

View File

@ -152,6 +152,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition));
assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition));
assertFalse(topicBasedRlmm().isReady(newLeaderTopicIdPartition));
assertFalse(topicBasedRlmm().isReady(newFollowerTopicIdPartition));
topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition),
Collections.singleton(newFollowerTopicIdPartition));
@ -166,6 +169,9 @@ public class TopicBasedRemoteLogMetadataManagerTest {
verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
assertTrue(topicBasedRlmm().isReady(newLeaderTopicIdPartition));
assertTrue(topicBasedRlmm().isReady(newFollowerTopicIdPartition));
}
@ClusterTest