MINOR; Remove cast for Records' slice (#19661)
CI / build (push) Waiting to run Details

In Java return types are covariant. This means that method override can
override the return type with a subclass.

Reviewers: Jun Rao <junrao@apache.org>, Chia-Ping Tsai
 <chia7712@apache.org>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
José Armando García Sancio 2025-05-13 20:06:38 -04:00 committed by GitHub
parent c60c83aaba
commit a619c6bb95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 110 additions and 76 deletions

View File

@ -54,33 +54,52 @@ public class FileRecords extends AbstractRecords implements Closeable {
* The {@code FileRecords.open} methods should be used instead of this constructor whenever possible. * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible.
* The constructor is visible for tests. * The constructor is visible for tests.
*/ */
FileRecords(File file, FileRecords(
FileChannel channel, File file,
int start, FileChannel channel,
int end, int end
boolean isSlice) throws IOException { ) throws IOException {
this.file = file;
this.channel = channel;
this.start = 0;
this.end = end;
this.isSlice = false;
if (channel.size() > Integer.MAX_VALUE) {
throw new KafkaException(
"The size of segment " + file + " (" + channel.size() +
") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE
);
}
int limit = Math.min((int) channel.size(), end);
this.size = new AtomicInteger(limit - start);
// update the file position to the end of the file
channel.position(limit);
batches = batchesFrom(start);
}
/**
* Constructor for creating a slice.
*
* This overloaded constructor avoids having to declare a checked IO exception.
*/
private FileRecords(
File file,
FileChannel channel,
int start,
int end
) {
this.file = file; this.file = file;
this.channel = channel; this.channel = channel;
this.start = start; this.start = start;
this.end = end; this.end = end;
this.isSlice = isSlice; this.isSlice = true;
this.size = new AtomicInteger();
if (isSlice) { // don't check the file size since this is just a slice view
// don't check the file size if this is just a slice view this.size = new AtomicInteger(end - start);
size.set(end - start);
} else {
if (channel.size() > Integer.MAX_VALUE)
throw new KafkaException("The size of segment " + file + " (" + channel.size() +
") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE);
int limit = Math.min((int) channel.size(), end);
size.set(limit - start);
// if this is not a slice, update the file pointer to the end of the file
// set the file position to the last byte in the file
channel.position(limit);
}
batches = batchesFrom(start); batches = batchesFrom(start);
} }
@ -121,10 +140,11 @@ public class FileRecords extends AbstractRecords implements Closeable {
} }
@Override @Override
public Records slice(int position, int size) throws IOException { public FileRecords slice(int position, int size) {
int availableBytes = availableBytes(position, size); int availableBytes = availableBytes(position, size);
int startPosition = this.start + position; int startPosition = this.start + position;
return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
return new FileRecords(file, channel, startPosition, startPosition + availableBytes);
} }
/** /**
@ -288,7 +308,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
*/ */
public LogOffsetPosition searchForOffsetFromPosition(long targetOffset, int startingPosition) { public LogOffsetPosition searchForOffsetFromPosition(long targetOffset, int startingPosition) {
FileChannelRecordBatch prevBatch = null; FileChannelRecordBatch prevBatch = null;
// The following logic is intentionally designed to minimize memory usage by avoiding // The following logic is intentionally designed to minimize memory usage by avoiding
// unnecessary calls to lastOffset() for every batch. // unnecessary calls to lastOffset() for every batch.
// Instead, we use baseOffset() comparisons when possible, and only check lastOffset() when absolutely necessary. // Instead, we use baseOffset() comparisons when possible, and only check lastOffset() when absolutely necessary.
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) { for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
@ -296,14 +316,14 @@ public class FileRecords extends AbstractRecords implements Closeable {
if (batch.baseOffset() == targetOffset) { if (batch.baseOffset() == targetOffset) {
return LogOffsetPosition.fromBatch(batch); return LogOffsetPosition.fromBatch(batch);
} }
// If we find the first batch with baseOffset greater than targetOffset // If we find the first batch with baseOffset greater than targetOffset
if (batch.baseOffset() > targetOffset) { if (batch.baseOffset() > targetOffset) {
// If the previous batch contains the target // If the previous batch contains the target
if (prevBatch != null && prevBatch.lastOffset() >= targetOffset) if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
return LogOffsetPosition.fromBatch(prevBatch); return LogOffsetPosition.fromBatch(prevBatch);
else { else {
// If there's no previous batch or the previous batch doesn't contain the // If there's no previous batch or the previous batch doesn't contain the
// target, return the current batch // target, return the current batch
return LogOffsetPosition.fromBatch(batch); return LogOffsetPosition.fromBatch(batch);
} }
@ -312,7 +332,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
} }
// Only one case would reach here: all batches have baseOffset less than targetOffset // Only one case would reach here: all batches have baseOffset less than targetOffset
// Check if the last batch contains the target // Check if the last batch contains the target
if (prevBatch != null && prevBatch.lastOffset() >= targetOffset) if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
return LogOffsetPosition.fromBatch(prevBatch); return LogOffsetPosition.fromBatch(prevBatch);
return null; return null;
@ -424,7 +444,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
boolean preallocate) throws IOException { boolean preallocate) throws IOException {
FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate); FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE; int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
return new FileRecords(file, channel, 0, end, false); return new FileRecords(file, channel, end);
} }
public static FileRecords open(File file, public static FileRecords open(File file,
@ -475,7 +495,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
public final long offset; public final long offset;
public final int position; public final int position;
public final int size; public final int size;
public static LogOffsetPosition fromBatch(FileChannelRecordBatch batch) { public static LogOffsetPosition fromBatch(FileChannelRecordBatch batch) {
return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes()); return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes());
} }

View File

@ -301,7 +301,7 @@ public class MemoryRecords extends AbstractRecords {
} }
@Override @Override
public Records slice(int position, int size) { public MemoryRecords slice(int position, int size) {
if (position < 0) if (position < 0)
throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this); throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this);
if (position > buffer.limit()) if (position > buffer.limit())

View File

@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.AbstractIterator;
import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Optional; import java.util.Optional;
@ -105,5 +104,5 @@ public interface Records extends TransferableRecords {
* @param size The number of bytes after the start position to include * @param size The number of bytes after the start position to include
* @return A sliced wrapper on this message set limited based on the given position and size * @return A sliced wrapper on this message set limited based on the given position and size
*/ */
Records slice(int position, int size) throws IOException; Records slice(int position, int size);
} }

View File

@ -89,7 +89,7 @@ public class FileRecordsTest {
FileChannel fileChannelMock = mock(FileChannel.class); FileChannel fileChannelMock = mock(FileChannel.class);
when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE); when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE);
FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false); FileRecords records = new FileRecords(fileMock, fileChannelMock, Integer.MAX_VALUE);
assertThrows(IllegalArgumentException.class, () -> append(records, values)); assertThrows(IllegalArgumentException.class, () -> append(records, values));
} }
@ -99,7 +99,7 @@ public class FileRecordsTest {
FileChannel fileChannelMock = mock(FileChannel.class); FileChannel fileChannelMock = mock(FileChannel.class);
when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L); when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L);
assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false)); assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, Integer.MAX_VALUE));
} }
@Test @Test
@ -198,9 +198,9 @@ public class FileRecordsTest {
*/ */
@Test @Test
public void testRead() throws IOException { public void testRead() throws IOException {
Records read = fileRecords.slice(0, fileRecords.sizeInBytes()); FileRecords read = fileRecords.slice(0, fileRecords.sizeInBytes());
assertEquals(fileRecords.sizeInBytes(), read.sizeInBytes()); assertEquals(fileRecords.sizeInBytes(), read.sizeInBytes());
TestUtils.checkEquals(fileRecords.batches(), ((FileRecords) read).batches()); TestUtils.checkEquals(fileRecords.batches(), read.batches());
List<RecordBatch> items = batches(read); List<RecordBatch> items = batches(read);
RecordBatch first = items.get(0); RecordBatch first = items.get(0);
@ -313,7 +313,7 @@ public class FileRecordsTest {
when(channelMock.size()).thenReturn(42L); when(channelMock.size()).thenReturn(42L);
when(channelMock.position(42L)).thenReturn(null); when(channelMock.position(42L)).thenReturn(null);
FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); FileRecords fileRecords = new FileRecords(tempFile(), channelMock, Integer.MAX_VALUE);
fileRecords.truncateTo(42); fileRecords.truncateTo(42);
verify(channelMock, atLeastOnce()).size(); verify(channelMock, atLeastOnce()).size();
@ -330,7 +330,7 @@ public class FileRecordsTest {
when(channelMock.size()).thenReturn(42L); when(channelMock.size()).thenReturn(42L);
FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); FileRecords fileRecords = new FileRecords(tempFile(), channelMock, Integer.MAX_VALUE);
try { try {
fileRecords.truncateTo(43); fileRecords.truncateTo(43);
@ -352,7 +352,7 @@ public class FileRecordsTest {
when(channelMock.size()).thenReturn(42L); when(channelMock.size()).thenReturn(42L);
when(channelMock.truncate(anyLong())).thenReturn(channelMock); when(channelMock.truncate(anyLong())).thenReturn(channelMock);
FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); FileRecords fileRecords = new FileRecords(tempFile(), channelMock, Integer.MAX_VALUE);
fileRecords.truncateTo(23); fileRecords.truncateTo(23);
verify(channelMock, atLeastOnce()).size(); verify(channelMock, atLeastOnce()).size();
@ -526,7 +526,7 @@ public class FileRecordsTest {
* 1. If the target offset equals the base offset of the first batch * 1. If the target offset equals the base offset of the first batch
* 2. If the target offset is less than the base offset of the first batch * 2. If the target offset is less than the base offset of the first batch
* <p> * <p>
* If the base offset of the first batch is equal to or greater than the target offset, it should return the * If the base offset of the first batch is equal to or greater than the target offset, it should return the
* position of the first batch and the lastOffset method should not be called. * position of the first batch and the lastOffset method should not be called.
*/ */
@ParameterizedTest @ParameterizedTest
@ -537,7 +537,7 @@ public class FileRecordsTest {
FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class); FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch.baseOffset()).thenReturn(baseOffset); when(batch.baseOffset()).thenReturn(baseOffset);
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
mockFileRecordBatches(fileRecords, batch); mockFileRecordBatches(fileRecords, batch);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0);
@ -557,7 +557,7 @@ public class FileRecordsTest {
when(batch.baseOffset()).thenReturn(3L); when(batch.baseOffset()).thenReturn(3L);
when(batch.lastOffset()).thenReturn(5L); when(batch.lastOffset()).thenReturn(5L);
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
mockFileRecordBatches(fileRecords, batch); mockFileRecordBatches(fileRecords, batch);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0);
@ -581,7 +581,7 @@ public class FileRecordsTest {
when(currentBatch.baseOffset()).thenReturn(15L); when(currentBatch.baseOffset()).thenReturn(15L);
when(currentBatch.lastOffset()).thenReturn(20L); when(currentBatch.lastOffset()).thenReturn(20L);
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
mockFileRecordBatches(fileRecords, prevBatch, currentBatch); mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(20L, 0); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(20L, 0);
@ -605,13 +605,13 @@ public class FileRecordsTest {
FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class); FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(currentBatch.baseOffset()).thenReturn(15L); // >= targetOffset when(currentBatch.baseOffset()).thenReturn(15L); // >= targetOffset
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
mockFileRecordBatches(fileRecords, prevBatch, currentBatch); mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0);
assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch), result); assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch), result);
// Because the target offset is in the current batch, we should call lastOffset // Because the target offset is in the current batch, we should call lastOffset
// on the previous batch // on the previous batch
verify(prevBatch, times(1)).lastOffset(); verify(prevBatch, times(1)).lastOffset();
} }
@ -629,13 +629,13 @@ public class FileRecordsTest {
when(batch2.baseOffset()).thenReturn(8L); // < targetOffset when(batch2.baseOffset()).thenReturn(8L); // < targetOffset
when(batch2.lastOffset()).thenReturn(9L); // < targetOffset when(batch2.lastOffset()).thenReturn(9L); // < targetOffset
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
mockFileRecordBatches(fileRecords, batch1, batch2); mockFileRecordBatches(fileRecords, batch1, batch2);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0);
assertNull(result); assertNull(result);
// Because the target offset is exceeded by the last offset of the batch2, // Because the target offset is exceeded by the last offset of the batch2,
// we should call lastOffset on the batch2 // we should call lastOffset on the batch2
verify(batch1, never()).lastOffset(); verify(batch1, never()).lastOffset();
verify(batch2, times(1)).lastOffset(); verify(batch2, times(1)).lastOffset();
@ -657,7 +657,7 @@ public class FileRecordsTest {
when(batch2.baseOffset()).thenReturn(baseOffset); // < targetOffset or == targetOffset when(batch2.baseOffset()).thenReturn(baseOffset); // < targetOffset or == targetOffset
when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
mockFileRecordBatches(fileRecords, batch1, batch2); mockFileRecordBatches(fileRecords, batch1, batch2);
long targetOffset = 10L; long targetOffset = 10L;
@ -670,7 +670,7 @@ public class FileRecordsTest {
verify(batch1, never()).lastOffset(); verify(batch1, never()).lastOffset();
verify(batch2, never()).lastOffset(); verify(batch2, never()).lastOffset();
} else { } else {
// Because the target offset is in the batch2, we should not call // Because the target offset is in the batch2, we should not call
// lastOffset on batch1 // lastOffset on batch1
verify(batch1, never()).lastOffset(); verify(batch1, never()).lastOffset();
verify(batch2, times(1)).lastOffset(); verify(batch2, times(1)).lastOffset();
@ -685,13 +685,13 @@ public class FileRecordsTest {
File mockFile = mock(File.class); File mockFile = mock(File.class);
FileChannel mockChannel = mock(FileChannel.class); FileChannel mockChannel = mock(FileChannel.class);
FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class); FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch1.baseOffset()).thenReturn(5L); when(batch1.baseOffset()).thenReturn(5L);
when(batch1.lastOffset()).thenReturn(10L); when(batch1.lastOffset()).thenReturn(10L);
FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class); FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch2.baseOffset()).thenReturn(15L); when(batch2.baseOffset()).thenReturn(15L);
when(batch2.lastOffset()).thenReturn(20L); when(batch2.lastOffset()).thenReturn(20L);
FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
mockFileRecordBatches(fileRecords, batch1, batch2); mockFileRecordBatches(fileRecords, batch1, batch2);
FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(13L, 0); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(13L, 0);

View File

@ -35,7 +35,6 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.ArgumentsSource;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -136,10 +135,6 @@ public class MemoryRecordsTest {
ByteBuffer buffer = ByteBuffer.allocate(1024); ByteBuffer buffer = ByteBuffer.allocate(1024);
int partitionLeaderEpoch = 998; int partitionLeaderEpoch = 998;
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression,
TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false, false,
partitionLeaderEpoch, buffer.limit());
SimpleRecord[] records = new SimpleRecord[] { SimpleRecord[] records = new SimpleRecord[] {
new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
@ -149,10 +144,30 @@ public class MemoryRecordsTest {
new SimpleRecord(6L, (byte[]) null, null) new SimpleRecord(6L, (byte[]) null, null)
}; };
for (SimpleRecord record : records) final MemoryRecords memoryRecords;
builder.append(record); try (var builder = new MemoryRecordsBuilder(
buffer,
magic,
compression,
TimestampType.CREATE_TIME,
firstOffset,
logAppendTime,
pid,
epoch,
firstSequence,
false,
false,
partitionLeaderEpoch,
buffer.limit()
)
) {
for (SimpleRecord record : records) {
builder.append(record);
}
memoryRecords = builder.build();
}
MemoryRecords memoryRecords = builder.build();
for (int iteration = 0; iteration < 2; iteration++) { for (int iteration = 0; iteration < 2; iteration++) {
int total = 0; int total = 0;
for (RecordBatch batch : memoryRecords.batches()) { for (RecordBatch batch : memoryRecords.batches()) {
@ -1075,7 +1090,7 @@ public class MemoryRecordsTest {
@ParameterizedTest @ParameterizedTest
@ArgumentsSource(MemoryRecordsArgumentsProvider.class) @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
public void testSlice(Args args) throws IOException { public void testSlice(Args args) {
// Create a MemoryRecords instance with multiple batches. Prior RecordBatch.MAGIC_VALUE_V2, // Create a MemoryRecords instance with multiple batches. Prior RecordBatch.MAGIC_VALUE_V2,
// every append in a batch is a new batch. After RecordBatch.MAGIC_VALUE_V2, we can have multiple // every append in a batch is a new batch. After RecordBatch.MAGIC_VALUE_V2, we can have multiple
// batches in a single MemoryRecords instance. Though with compression, we can have multiple // batches in a single MemoryRecords instance. Though with compression, we can have multiple
@ -1087,10 +1102,10 @@ public class MemoryRecordsTest {
MemoryRecords records = createMemoryRecords(args, recordsPerOffset); MemoryRecords records = createMemoryRecords(args, recordsPerOffset);
// Test slicing from start // Test slicing from start
Records sliced = records.slice(0, records.sizeInBytes()); MemoryRecords sliced = records.slice(0, records.sizeInBytes());
assertEquals(records.sizeInBytes(), sliced.sizeInBytes()); assertEquals(records.sizeInBytes(), sliced.sizeInBytes());
assertEquals(records.validBytes(), ((MemoryRecords) sliced).validBytes()); assertEquals(records.validBytes(), sliced.validBytes());
TestUtils.checkEquals(records.batches(), ((MemoryRecords) sliced).batches()); TestUtils.checkEquals(records.batches(), sliced.batches());
List<RecordBatch> items = batches(records); List<RecordBatch> items = batches(records);
// Test slicing first message. // Test slicing first message.
@ -1098,19 +1113,19 @@ public class MemoryRecordsTest {
sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes()); sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes());
assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
// Read from second message and size is past the end of the file. // Read from second message and size is past the end of the file.
sliced = records.slice(first.sizeInBytes(), records.sizeInBytes()); sliced = records.slice(first.sizeInBytes(), records.sizeInBytes());
assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
// Read from second message and position + size overflows. // Read from second message and position + size overflows.
sliced = records.slice(first.sizeInBytes(), Integer.MAX_VALUE); sliced = records.slice(first.sizeInBytes(), Integer.MAX_VALUE);
assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
// Read a single message starting from second message. // Read a single message starting from second message.
RecordBatch second = items.get(1); RecordBatch second = items.get(1);
@ -1131,14 +1146,14 @@ public class MemoryRecordsTest {
.slice(first.sizeInBytes() - 1, records.sizeInBytes()); .slice(first.sizeInBytes() - 1, records.sizeInBytes());
assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
// Read from second message and position + size overflows on the already sliced view. // Read from second message and position + size overflows on the already sliced view.
sliced = records.slice(1, records.sizeInBytes() - 1) sliced = records.slice(1, records.sizeInBytes() - 1)
.slice(first.sizeInBytes() - 1, Integer.MAX_VALUE); .slice(first.sizeInBytes() - 1, Integer.MAX_VALUE);
assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
} }
@ParameterizedTest @ParameterizedTest
@ -1170,7 +1185,7 @@ public class MemoryRecordsTest {
*/ */
@ParameterizedTest @ParameterizedTest
@ArgumentsSource(MemoryRecordsArgumentsProvider.class) @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
public void testSliceForAlreadySlicedMemoryRecords(Args args) throws IOException { public void testSliceForAlreadySlicedMemoryRecords(Args args) {
LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>(); LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
recordsPerOffset.put(args.firstOffset, 5); recordsPerOffset.put(args.firstOffset, 5);
recordsPerOffset.put(args.firstOffset + 5L, 10); recordsPerOffset.put(args.firstOffset + 5L, 10);

View File

@ -277,7 +277,7 @@ object DumpLogSegments {
println(s"Snapshot end offset: ${path.snapshotId.offset}, epoch: ${path.snapshotId.epoch}") println(s"Snapshot end offset: ${path.snapshotId.offset}, epoch: ${path.snapshotId.epoch}")
} }
} }
val fileRecords = FileRecords.open(file, false).slice(0, maxBytes).asInstanceOf[FileRecords] val fileRecords = FileRecords.open(file, false).slice(0, maxBytes)
try { try {
var validBytes = 0L var validBytes = 0L
var lastOffset = -1L var lastOffset = -1L
@ -567,7 +567,7 @@ object DumpLogSegments {
private class RemoteMetadataLogMessageParser extends MessageParser[String, String] { private class RemoteMetadataLogMessageParser extends MessageParser[String, String] {
private val metadataRecordSerde = new RemoteLogMetadataSerde private val metadataRecordSerde = new RemoteLogMetadataSerde
override def parse(record: Record): (Option[String], Option[String]) = { override def parse(record: Record): (Option[String], Option[String]) = {
val output = try { val output = try {
val data = new Array[Byte](record.value.remaining) val data = new Array[Byte](record.value.remaining)
@ -626,7 +626,7 @@ object DumpLogSegments {
private val transactionLogOpt = parser.accepts("transaction-log-decoder", "If set, log data will be parsed as " + private val transactionLogOpt = parser.accepts("transaction-log-decoder", "If set, log data will be parsed as " +
"transaction metadata from the __transaction_state topic.") "transaction metadata from the __transaction_state topic.")
private val clusterMetadataOpt = parser.accepts("cluster-metadata-decoder", "If set, log data will be parsed as cluster metadata records.") private val clusterMetadataOpt = parser.accepts("cluster-metadata-decoder", "If set, log data will be parsed as cluster metadata records.")
private val remoteMetadataOpt = parser.accepts("remote-log-metadata-decoder", "If set, log data will be parsed as TopicBasedRemoteLogMetadataManager (RLMM) metadata records." + private val remoteMetadataOpt = parser.accepts("remote-log-metadata-decoder", "If set, log data will be parsed as TopicBasedRemoteLogMetadataManager (RLMM) metadata records." +
" Instead, the value-decoder-class option can be used if a custom RLMM implementation is configured.") " Instead, the value-decoder-class option can be used if a custom RLMM implementation is configured.")
private val shareStateOpt = parser.accepts("share-group-state-decoder", "If set, log data will be parsed as share group state data from the " + private val shareStateOpt = parser.accepts("share-group-state-decoder", "If set, log data will be parsed as share group state data from the " +
"__share_group_state topic.") "__share_group_state topic.")