diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 9cd0515ad35..237619bceea 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -474,9 +474,9 @@ public class RemoteLogManager implements Closeable { if (topicIdByPartitionMap.containsKey(tp)) { TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp); leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> { - LOGGER.info("Cancelling the copy RLM task for tpId: {}", tpId); + LOGGER.info("Cancelling the copy RLM task for partition: {}", tpId); task.cancel(); - LOGGER.info("Resetting remote copy lag metrics for tpId: {}", tpId); + LOGGER.info("Resetting remote copy lag metrics for partition: {}", tpId); ((RLMCopyTask) task.rlmTask).resetLagStats(); return null; }); @@ -501,17 +501,17 @@ public class RemoteLogManager implements Closeable { if (topicIdByPartitionMap.containsKey(tp)) { TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp); leaderCopyRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> { - LOGGER.info("Cancelling the copy RLM task for tpId: {}", tpId); + LOGGER.info("Cancelling the copy RLM task for partition: {}", tpId); task.cancel(); return null; }); leaderExpirationRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> { - LOGGER.info("Cancelling the expiration RLM task for tpId: {}", tpId); + LOGGER.info("Cancelling the expiration RLM task for partition: {}", tpId); task.cancel(); return null; }); followerRLMTasks.computeIfPresent(tpId, (topicIdPartition, task) -> { - LOGGER.info("Cancelling the follower RLM task for tpId: {}", tpId); + LOGGER.info("Cancelling the follower RLM task for partition: {}", tpId); task.cancel(); return null; }); @@ -790,8 +790,14 @@ public class RemoteLogManager implements Closeable { } public void run() { - if (isCancelled()) + if 
(isCancelled()) { + logger.debug("Skipping the current run for partition {} as it is cancelled", topicIdPartition); return; + } + if (!remoteLogMetadataManager.isReady(topicIdPartition)) { + logger.debug("Skipping the current run for partition {} as the remote-log metadata is not ready", topicIdPartition); + return; + } try { Optional unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition()); @@ -803,13 +809,13 @@ public class RemoteLogManager implements Closeable { execute(unifiedLogOptional.get()); } catch (InterruptedException ex) { if (!isCancelled()) { - logger.warn("Current thread for topic-partition-id {} is interrupted", topicIdPartition, ex); + logger.warn("Current thread for partition {} is interrupted", topicIdPartition, ex); } } catch (RetriableException ex) { - logger.debug("Encountered a retryable error while executing current task for topic-partition {}", topicIdPartition, ex); + logger.debug("Encountered a retryable error while executing current task for partition {}", topicIdPartition, ex); } catch (Exception ex) { if (!isCancelled()) { - logger.warn("Current task for topic-partition {} received error but it will be scheduled", topicIdPartition, ex); + logger.warn("Current task for partition {} received error but it will be scheduled", topicIdPartition, ex); } } } diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 7bd77ba1052..881da2b32c9 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -165,12 +165,16 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anySet; import static 
org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -259,6 +263,7 @@ public class RemoteLogManagerTest { return 0L; } }; + doReturn(true).when(remoteLogMetadataManager).isReady(any(TopicIdPartition.class)); } @AfterEach @@ -2144,11 +2149,6 @@ public class RemoteLogManagerTest { Set partitions = new HashSet<>(); partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true, true)); partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true, true)); - remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)), - Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds); - assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition)); - assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition)); - assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition)); when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition))) .thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator()); @@ -2159,6 +2159,12 @@ public class RemoteLogManagerTest { when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any())) .thenReturn(dummyFuture); + remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)), + Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds); + assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition)); + 
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition)); + assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition)); + remoteLogManager.stopPartitions(partitions, errorHandler); assertNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition)); assertNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition)); @@ -3643,6 +3649,36 @@ public class RemoteLogManagerTest { assertEquals(273, fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment); } + @Test + public void testRLMOpsWhenMetadataIsNotReady() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(2); + when(remoteLogMetadataManager.isReady(any(TopicIdPartition.class))) + .thenAnswer(ans -> { + latch.countDown(); + return false; + }); + remoteLogManager.startup(); + remoteLogManager.onLeadershipChange( + Collections.singleton(mockPartition(leaderTopicIdPartition)), + Collections.singleton(mockPartition(followerTopicIdPartition)), + topicIds + ); + assertNotNull(remoteLogManager.rlmCopyTask(leaderTopicIdPartition)); + assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition)); + assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition)); + + // Once the partitions are assigned to the broker as either leader or follower in RLM#onLeadershipChange, + // the `isReady` method should be called for each of the partitions. Otherwise, the test will fail. 
+ latch.await(5, TimeUnit.SECONDS); + verify(remoteLogMetadataManager).configure(anyMap()); + verify(remoteLogMetadataManager).onPartitionLeadershipChanges(anySet(), anySet()); + verify(remoteLogMetadataManager, atLeastOnce()).isReady(eq(leaderTopicIdPartition)); + verify(remoteLogMetadataManager, atLeastOnce()).isReady(eq(followerTopicIdPartition)); + verifyNoMoreInteractions(remoteLogMetadataManager); + verify(remoteStorageManager).configure(anyMap()); + verifyNoMoreInteractions(remoteStorageManager); + } + private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException { byte magic = RecordBatch.CURRENT_MAGIC_VALUE; Compression compression = Compression.NONE; diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java index 2280aa51132..efc37128ab2 100644 --- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java @@ -230,4 +230,14 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable { long offset) throws RemoteStorageException { return remoteLogSegmentMetadata(topicIdPartition, epoch, offset); } + + /** + * Returns whether the remote log metadata for the given partition is ready to serve. + * + * @param topicIdPartition the topic partition to check + * @return {@code true} if the partition is ready to serve remote storage operations. 
+ */ + default boolean isReady(TopicIdPartition topicIdPartition) { + return true; + } } \ No newline at end of file diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java index 1abcbbc20ce..5d5cba2ca11 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java @@ -111,6 +111,11 @@ public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetada return withClassLoader(() -> delegate.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset)); } + @Override + public boolean isReady(TopicIdPartition topicIdPartition) { + return withClassLoader(() -> delegate.isReady(topicIdPartition)); + } + @Override public void configure(Map configs) { withClassLoader(() -> { diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java index a5db1ea38ef..58d571630d2 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java @@ -396,6 +396,11 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana } } + @Override + public boolean isReady(TopicIdPartition topicIdPartition) { + return remotePartitionMetadataStore.isInitialized(topicIdPartition); + } + private void initializeResources() { log.info("Initializing topic-based RLMM resources"); final NewTopic remoteLogMetadataTopicRequest = 
createRemoteLogMetadataTopicRequest(); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java index 9937a9f37ae..8fc8efc7cd3 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java @@ -152,6 +152,9 @@ public class TopicBasedRemoteLogMetadataManagerTest { assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition)); assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition)); + assertFalse(topicBasedRlmm().isReady(newLeaderTopicIdPartition)); + assertFalse(topicBasedRlmm().isReady(newFollowerTopicIdPartition)); + topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(newLeaderTopicIdPartition), Collections.singleton(newFollowerTopicIdPartition)); @@ -166,6 +169,9 @@ public class TopicBasedRemoteLogMetadataManagerTest { verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata); assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext()); assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext()); + + assertTrue(topicBasedRlmm().isReady(newLeaderTopicIdPartition)); + assertTrue(topicBasedRlmm().isReady(newFollowerTopicIdPartition)); } @ClusterTest