MINOR: Add additional tests for RemoteLogManager (#14311)

Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Nikhil Ramakrishnan 2023-09-21 14:06:22 +05:30 committed by GitHub
parent fcd382138e
commit 47a1a7c70e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 156 additions and 2 deletions

View File

@ -1314,7 +1314,8 @@ public class RemoteLogManager implements Closeable {
}
}
private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
// Visible for testing
int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
}
@ -1398,7 +1399,8 @@ public class RemoteLogManager implements Closeable {
: Optional.empty();
}
private RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
// Visible for testing
RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
RecordBatch nextBatch;
// Look for the batch which has the desired offset
// We will always have a batch in that segment as it is a non-compacted topic.

View File

@ -34,7 +34,10 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RemoteLogInputStream;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@ -58,15 +61,20 @@ import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoi
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.MockedConstruction;
@ -76,7 +84,9 @@ import scala.collection.JavaConverters;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
@ -89,6 +99,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
@ -121,6 +132,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@ -1731,6 +1743,146 @@ public class RemoteLogManagerTest {
return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
}
@Test
public void testReadForMissingFirstBatchInRemote() throws RemoteStorageException, IOException {
FileInputStream fileInputStream = mock(FileInputStream.class);
ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
.thenAnswer(a -> fileInputStream);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
int fetchOffset = 0;
int fetchMaxBytes = 10;
FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty()
);
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
brokerId,
logDir,
clusterId,
time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> { },
brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return rsmManager;
}
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
int epochForOffset, long offset) {
return Optional.of(segmentMetadata);
}
int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
return 1;
}
// This is the key scenario that we are testing here
RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
return null;
}
}) {
FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
assertEquals(fetchOffset, fetchDataInfo.fetchOffsetMetadata.messageOffset);
assertFalse(fetchDataInfo.firstEntryIncomplete);
assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records);
// FetchIsolation is TXN_COMMITTED
assertTrue(fetchDataInfo.abortedTransactions.isPresent());
assertTrue(fetchDataInfo.abortedTransactions.get().isEmpty());
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testReadForFirstBatchMoreThanMaxFetchBytes(boolean minOneMessage) throws RemoteStorageException, IOException {
FileInputStream fileInputStream = mock(FileInputStream.class);
ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
.thenAnswer(a -> fileInputStream);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
int fetchOffset = 0;
int fetchMaxBytes = 10;
int recordBatchSizeInBytes = fetchMaxBytes + 1;
RecordBatch firstBatch = mock(RecordBatch.class);
ArgumentCaptor<ByteBuffer> capture = ArgumentCaptor.forClass(ByteBuffer.class);
FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
Uuid.randomUuid(), fetchOffset, 0, fetchMaxBytes, Optional.empty()
);
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
0, minOneMessage, tp, partitionData, FetchIsolation.HIGH_WATERMARK, false
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
remoteLogManagerConfig,
brokerId,
logDir,
clusterId,
time,
tp -> Optional.of(mockLog),
(topicPartition, offset) -> { },
brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return rsmManager;
}
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
int epochForOffset, long offset) {
return Optional.of(segmentMetadata);
}
int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
return 1;
}
RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
doNothing().when(firstBatch).writeTo(capture.capture());
return firstBatch;
}
}) {
FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
// Common assertions
assertEquals(fetchOffset, fetchDataInfo.fetchOffsetMetadata.messageOffset);
assertFalse(fetchDataInfo.firstEntryIncomplete);
// FetchIsolation is HIGH_WATERMARK
assertEquals(Optional.empty(), fetchDataInfo.abortedTransactions);
if (minOneMessage) {
// Verify that the byte buffer has capacity equal to the size of the first batch
assertEquals(recordBatchSizeInBytes, capture.getValue().capacity());
} else {
// Verify that the first batch is never written to the buffer
verify(firstBatch, never()).writeTo(any(ByteBuffer.class));
assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records);
}
}
}
private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);