From 47a1a7c70ed3f53583c33b8a00348e6e5a800d28 Mon Sep 17 00:00:00 2001 From: Nikhil Ramakrishnan Date: Thu, 21 Sep 2023 14:06:22 +0530 Subject: [PATCH] MINOR: Add additional tests for RemoteLogManager (#14311) Reviewers: Divij Vaidya --- .../kafka/log/remote/RemoteLogManager.java | 6 +- .../log/remote/RemoteLogManagerTest.java | 152 ++++++++++++++++++ 2 files changed, 156 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index d8f2144b3e3..5d1d7dc5ec7 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -1314,7 +1314,8 @@ public class RemoteLogManager implements Closeable { } } - private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + // Visible for testing + int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { return indexCache.lookupOffset(remoteLogSegmentMetadata, offset); } @@ -1398,7 +1399,8 @@ public class RemoteLogManager implements Closeable { : Optional.empty(); } - private RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { + // Visible for testing + RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { RecordBatch nextBatch; // Look for the batch which has the desired offset // We will always have a batch in that segment as it is a non-compacted topic. diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index bb66994b273..5c0578d359f 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -34,7 +34,10 @@ import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RemoteLogInputStream; import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -58,15 +61,20 @@ import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoi import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.log.EpochEntry; +import org.apache.kafka.storage.internals.log.FetchDataInfo; +import org.apache.kafka.storage.internals.log.FetchIsolation; import org.apache.kafka.storage.internals.log.LazyIndex; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.internals.log.OffsetIndex; import org.apache.kafka.storage.internals.log.ProducerStateManager; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import org.apache.kafka.storage.internals.log.TimeIndex; import org.apache.kafka.storage.internals.log.TransactionIndex; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.MockedConstruction; @@ -76,7 +84,9 @@ import scala.collection.JavaConverters; import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; @@ -89,6 +99,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; +import java.util.OptionalInt; import java.util.Properties; import java.util.Set; import java.util.TreeMap; @@ -121,6 +132,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -1731,6 +1743,146 @@ public class RemoteLogManagerTest { return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } + @Test + public void testReadForMissingFirstBatchInRemote() throws RemoteStorageException, IOException { + FileInputStream fileInputStream = mock(FileInputStream.class); + ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); + RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); + LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); + when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) + .thenAnswer(a -> fileInputStream); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + int fetchOffset = 0; + int fetchMaxBytes = 10; + + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( + Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty() + ); + + RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo( + 0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false + ); + + try (RemoteLogManager remoteLogManager = new RemoteLogManager( + remoteLogManagerConfig, + brokerId, + logDir, + clusterId, + time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> { }, + brokerTopicStats) { + public RemoteStorageManager createRemoteStorageManager() { + return rsmManager; + } + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public Optional fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, + int epochForOffset, long offset) { + return Optional.of(segmentMetadata); + } + + int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return 1; + } + + // This is the key scenario that we are testing here + RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { + return null; + } + }) { + FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo); + assertEquals(fetchOffset, fetchDataInfo.fetchOffsetMetadata.messageOffset); + assertFalse(fetchDataInfo.firstEntryIncomplete); + assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records); + // FetchIsolation is TXN_COMMITTED + assertTrue(fetchDataInfo.abortedTransactions.isPresent()); + assertTrue(fetchDataInfo.abortedTransactions.get().isEmpty()); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testReadForFirstBatchMoreThanMaxFetchBytes(boolean minOneMessage) throws RemoteStorageException, IOException { + FileInputStream fileInputStream = mock(FileInputStream.class); + ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); + RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); + LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); + when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) + .thenAnswer(a -> fileInputStream); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + int fetchOffset = 0; + int fetchMaxBytes = 10; + int recordBatchSizeInBytes = fetchMaxBytes + 1; + RecordBatch firstBatch = mock(RecordBatch.class); + ArgumentCaptor capture = ArgumentCaptor.forClass(ByteBuffer.class); + + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( + Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty() + ); + + RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo( + 0, minOneMessage, tp, partitionData, FetchIsolation.HIGH_WATERMARK, false + ); + + try (RemoteLogManager remoteLogManager = new RemoteLogManager( + remoteLogManagerConfig, + brokerId, + logDir, + clusterId, + time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> { }, + brokerTopicStats) { + public RemoteStorageManager createRemoteStorageManager() { + return rsmManager; + } + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public Optional fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, + int epochForOffset, long offset) { + return Optional.of(segmentMetadata); + } + + int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return 1; + } + + RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { + when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes); + doNothing().when(firstBatch).writeTo(capture.capture()); + return firstBatch; + } + }) { + FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo); + // Common assertions + assertEquals(fetchOffset, fetchDataInfo.fetchOffsetMetadata.messageOffset); + assertFalse(fetchDataInfo.firstEntryIncomplete); + // FetchIsolation is HIGH_WATERMARK + assertEquals(Optional.empty(), fetchDataInfo.abortedTransactions); + + + if (minOneMessage) { + // Verify that the byte buffer has capacity equal to the size of the first batch + assertEquals(recordBatchSizeInBytes, capture.getValue().capacity()); + } else { + // Verify that the first batch is never written to the buffer + verify(firstBatch, never()).writeTo(any(ByteBuffer.class)); + assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records); + } + } + } + private Partition mockPartition(TopicIdPartition topicIdPartition) { TopicPartition tp = topicIdPartition.topicPartition(); Partition partition = mock(Partition.class);