From a619c6bb95200108cbeabd041242b8ff01a0c3dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Tue, 13 May 2025 20:06:38 -0400 Subject: [PATCH] MINOR; Remove cast for Records' slice (#19661) In Java return types are covariant. This means that method override can override the return type with a subclass. Reviewers: Jun Rao , Chia-Ping Tsai , Apoorv Mittal --- .../kafka/common/record/FileRecords.java | 80 ++++++++++++------- .../kafka/common/record/MemoryRecords.java | 2 +- .../apache/kafka/common/record/Records.java | 3 +- .../kafka/common/record/FileRecordsTest.java | 44 +++++----- .../common/record/MemoryRecordsTest.java | 51 +++++++----- .../scala/kafka/tools/DumpLogSegments.scala | 6 +- 6 files changed, 110 insertions(+), 76 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 6a42a52d2e0..4ec7db604bc 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -54,33 +54,52 @@ public class FileRecords extends AbstractRecords implements Closeable { * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible. * The constructor is visible for tests. */ - FileRecords(File file, - FileChannel channel, - int start, - int end, - boolean isSlice) throws IOException { + FileRecords( + File file, + FileChannel channel, + int end + ) throws IOException { + this.file = file; + this.channel = channel; + this.start = 0; + this.end = end; + this.isSlice = false; + + if (channel.size() > Integer.MAX_VALUE) { + throw new KafkaException( + "The size of segment " + file + " (" + channel.size() + + ") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE + ); + } + + int limit = Math.min((int) channel.size(), end); + this.size = new AtomicInteger(limit - start); + + // update the file position to the end of the file + channel.position(limit); + + batches = batchesFrom(start); + } + + /** + * Constructor for creating a slice. + * + * This overloaded constructor avoids having to declare a checked IO exception. + */ + private FileRecords( + File file, + FileChannel channel, + int start, + int end + ) { this.file = file; this.channel = channel; this.start = start; this.end = end; - this.isSlice = isSlice; - this.size = new AtomicInteger(); + this.isSlice = true; - if (isSlice) { - // don't check the file size if this is just a slice view - size.set(end - start); - } else { - if (channel.size() > Integer.MAX_VALUE) - throw new KafkaException("The size of segment " + file + " (" + channel.size() + - ") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE); - - int limit = Math.min((int) channel.size(), end); - size.set(limit - start); - - // if this is not a slice, update the file pointer to the end of the file - // set the file position to the last byte in the file - channel.position(limit); - } + // don't check the file size since this is just a slice view + this.size = new AtomicInteger(end - start); batches = batchesFrom(start); } @@ -121,10 +140,11 @@ public class FileRecords extends AbstractRecords implements Closeable { } @Override - public Records slice(int position, int size) throws IOException { + public FileRecords slice(int position, int size) { int availableBytes = availableBytes(position, size); int startPosition = this.start + position; - return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true); + + return new FileRecords(file, channel, startPosition, startPosition + availableBytes); } /** @@ -288,7 +308,7 @@ public class FileRecords extends AbstractRecords implements Closeable { */ public LogOffsetPosition searchForOffsetFromPosition(long targetOffset, int startingPosition) { FileChannelRecordBatch prevBatch = null; - // The following logic is intentionally designed to minimize memory usage by avoiding + // The following logic is intentionally designed to minimize memory usage by avoiding // unnecessary calls to lastOffset() for every batch. // Instead, we use baseOffset() comparisons when possible, and only check lastOffset() when absolutely necessary. for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) { @@ -296,14 +316,14 @@ public class FileRecords extends AbstractRecords implements Closeable { if (batch.baseOffset() == targetOffset) { return LogOffsetPosition.fromBatch(batch); } - + // If we find the first batch with baseOffset greater than targetOffset if (batch.baseOffset() > targetOffset) { // If the previous batch contains the target if (prevBatch != null && prevBatch.lastOffset() >= targetOffset) return LogOffsetPosition.fromBatch(prevBatch); else { - // If there's no previous batch or the previous batch doesn't contain the + // If there's no previous batch or the previous batch doesn't contain the // target, return the current batch return LogOffsetPosition.fromBatch(batch); } @@ -312,7 +332,7 @@ public class FileRecords extends AbstractRecords implements Closeable { } // Only one case would reach here: all batches have baseOffset less than targetOffset // Check if the last batch contains the target - if (prevBatch != null && prevBatch.lastOffset() >= targetOffset) + if (prevBatch != null && prevBatch.lastOffset() >= targetOffset) return LogOffsetPosition.fromBatch(prevBatch); return null; @@ -424,7 +444,7 @@ public class FileRecords extends AbstractRecords implements Closeable { boolean preallocate) throws IOException { FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate); int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE; - return new FileRecords(file, channel, 0, end, false); + return new FileRecords(file, channel, end); } public static FileRecords open(File file, @@ -475,7 +495,7 @@ public class FileRecords extends AbstractRecords implements Closeable { public final long offset; public final int position; public final int size; - + public static LogOffsetPosition fromBatch(FileChannelRecordBatch batch) { return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes()); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 1786f61d187..2e2b97dfe37 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -301,7 +301,7 @@ public class MemoryRecords extends AbstractRecords { } @Override - public Records slice(int position, int size) { + public MemoryRecords slice(int position, int size) { if (position < 0) throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this); if (position > buffer.limit()) diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java index 3d45762e815..017c49ba94c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Records.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.utils.AbstractIterator; -import java.io.IOException; import java.util.Iterator; import java.util.Optional; @@ -105,5 +104,5 @@ public interface Records extends TransferableRecords { * @param size The number of bytes after the start position to include * @return A sliced wrapper on this message set limited based on the given position and size */ - Records slice(int position, int size) throws IOException; + Records slice(int position, int size); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 75babcf95b0..04590ccbe7b 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -89,7 +89,7 @@ public class FileRecordsTest { FileChannel fileChannelMock = mock(FileChannel.class); when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE); - FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false); + FileRecords records = new FileRecords(fileMock, fileChannelMock, Integer.MAX_VALUE); assertThrows(IllegalArgumentException.class, () -> append(records, values)); } @@ -99,7 +99,7 @@ public class FileRecordsTest { FileChannel fileChannelMock = mock(FileChannel.class); when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L); - assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false)); + assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, Integer.MAX_VALUE)); } @Test @@ -198,9 +198,9 @@ public class FileRecordsTest { */ @Test public void testRead() throws IOException { - Records read = fileRecords.slice(0, fileRecords.sizeInBytes()); + FileRecords read = fileRecords.slice(0, fileRecords.sizeInBytes()); assertEquals(fileRecords.sizeInBytes(), read.sizeInBytes()); - TestUtils.checkEquals(fileRecords.batches(), ((FileRecords) read).batches()); + TestUtils.checkEquals(fileRecords.batches(), read.batches()); List items = batches(read); RecordBatch first = items.get(0); @@ -313,7 +313,7 @@ public class FileRecordsTest { when(channelMock.size()).thenReturn(42L); when(channelMock.position(42L)).thenReturn(null); - FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, Integer.MAX_VALUE); fileRecords.truncateTo(42); verify(channelMock, atLeastOnce()).size(); @@ -330,7 +330,7 @@ public class FileRecordsTest { when(channelMock.size()).thenReturn(42L); - FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, Integer.MAX_VALUE); try { fileRecords.truncateTo(43); @@ -352,7 +352,7 @@ public class FileRecordsTest { when(channelMock.size()).thenReturn(42L); when(channelMock.truncate(anyLong())).thenReturn(channelMock); - FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, Integer.MAX_VALUE); fileRecords.truncateTo(23); verify(channelMock, atLeastOnce()).size(); @@ -526,7 +526,7 @@ public class FileRecordsTest { * 1. If the target offset equals the base offset of the first batch * 2. If the target offset is less than the base offset of the first batch *

- * If the base offset of the first batch is equal to or greater than the target offset, it should return the + * If the base offset of the first batch is equal to or greater than the target offset, it should return the * position of the first batch and the lastOffset method should not be called. */ @ParameterizedTest @@ -537,7 +537,7 @@ public class FileRecordsTest { FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class); when(batch.baseOffset()).thenReturn(baseOffset); - FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100)); mockFileRecordBatches(fileRecords, batch); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0); @@ -557,7 +557,7 @@ public class FileRecordsTest { when(batch.baseOffset()).thenReturn(3L); when(batch.lastOffset()).thenReturn(5L); - FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100)); mockFileRecordBatches(fileRecords, batch); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0); @@ -581,7 +581,7 @@ public class FileRecordsTest { when(currentBatch.baseOffset()).thenReturn(15L); when(currentBatch.lastOffset()).thenReturn(20L); - FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100)); mockFileRecordBatches(fileRecords, prevBatch, currentBatch); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(20L, 0); @@ -605,13 +605,13 @@ public class FileRecordsTest { FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class); when(currentBatch.baseOffset()).thenReturn(15L); // >= targetOffset - FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100)); mockFileRecordBatches(fileRecords, prevBatch, currentBatch); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0); assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch), result); - // Because the target offset is in the current batch, we should call lastOffset + // Because the target offset is in the current batch, we should call lastOffset // on the previous batch verify(prevBatch, times(1)).lastOffset(); } @@ -629,13 +629,13 @@ public class FileRecordsTest { when(batch2.baseOffset()).thenReturn(8L); // < targetOffset when(batch2.lastOffset()).thenReturn(9L); // < targetOffset - FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100)); mockFileRecordBatches(fileRecords, batch1, batch2); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0); assertNull(result); - // Because the target offset is exceeded by the last offset of the batch2, + // Because the target offset is exceeded by the last offset of the batch2, // we should call lastOffset on the batch2 verify(batch1, never()).lastOffset(); verify(batch2, times(1)).lastOffset(); @@ -657,7 +657,7 @@ public class FileRecordsTest { when(batch2.baseOffset()).thenReturn(baseOffset); // < targetOffset or == targetOffset when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset - FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100)); mockFileRecordBatches(fileRecords, batch1, batch2); long targetOffset = 10L; @@ -670,7 +670,7 @@ public class FileRecordsTest { verify(batch1, never()).lastOffset(); verify(batch2, never()).lastOffset(); } else { - // Because the target offset is in the batch2, we should not call + // Because the target offset is in the batch2, we should not call // lastOffset on batch1 verify(batch1, never()).lastOffset(); verify(batch2, times(1)).lastOffset(); @@ -685,13 +685,13 @@ public class FileRecordsTest { File mockFile = mock(File.class); FileChannel mockChannel = mock(FileChannel.class); FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class); - when(batch1.baseOffset()).thenReturn(5L); - when(batch1.lastOffset()).thenReturn(10L); + when(batch1.baseOffset()).thenReturn(5L); + when(batch1.lastOffset()).thenReturn(10L); FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class); - when(batch2.baseOffset()).thenReturn(15L); - when(batch2.lastOffset()).thenReturn(20L); + when(batch2.baseOffset()).thenReturn(15L); + when(batch2.lastOffset()).thenReturn(20L); - FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false)); + FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100)); mockFileRecordBatches(fileRecords, batch1, batch2); FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(13L, 0); diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 6f98da21eed..7092928010b 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -35,7 +35,6 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -136,10 +135,6 @@ public class MemoryRecordsTest { ByteBuffer buffer = ByteBuffer.allocate(1024); int partitionLeaderEpoch = 998; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression, - TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false, false, - partitionLeaderEpoch, buffer.limit()); - SimpleRecord[] records = new SimpleRecord[] { new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), @@ -149,10 +144,30 @@ public class MemoryRecordsTest { new SimpleRecord(6L, (byte[]) null, null) }; - for (SimpleRecord record : records) - builder.append(record); + final MemoryRecords memoryRecords; + try (var builder = new MemoryRecordsBuilder( + buffer, + magic, + compression, + TimestampType.CREATE_TIME, + firstOffset, + logAppendTime, + pid, + epoch, + firstSequence, + false, + false, + partitionLeaderEpoch, + buffer.limit() + ) + ) { + for (SimpleRecord record : records) { + builder.append(record); + } + + memoryRecords = builder.build(); + } - MemoryRecords memoryRecords = builder.build(); for (int iteration = 0; iteration < 2; iteration++) { int total = 0; for (RecordBatch batch : memoryRecords.batches()) { @@ -1075,7 +1090,7 @@ public class MemoryRecordsTest { @ParameterizedTest @ArgumentsSource(MemoryRecordsArgumentsProvider.class) - public void testSlice(Args args) throws IOException { + public void testSlice(Args args) { // Create a MemoryRecords instance with multiple batches. Prior RecordBatch.MAGIC_VALUE_V2, // every append in a batch is a new batch. After RecordBatch.MAGIC_VALUE_V2, we can have multiple // batches in a single MemoryRecords instance. Though with compression, we can have multiple @@ -1087,10 +1102,10 @@ public class MemoryRecordsTest { MemoryRecords records = createMemoryRecords(args, recordsPerOffset); // Test slicing from start - Records sliced = records.slice(0, records.sizeInBytes()); + MemoryRecords sliced = records.slice(0, records.sizeInBytes()); assertEquals(records.sizeInBytes(), sliced.sizeInBytes()); - assertEquals(records.validBytes(), ((MemoryRecords) sliced).validBytes()); - TestUtils.checkEquals(records.batches(), ((MemoryRecords) sliced).batches()); + assertEquals(records.validBytes(), sliced.validBytes()); + TestUtils.checkEquals(records.batches(), sliced.batches()); List items = batches(records); // Test slicing first message. @@ -1098,19 +1113,19 @@ public class MemoryRecordsTest { sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes()); assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); - assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); + assertTrue(sliced.validBytes() <= sliced.sizeInBytes()); // Read from second message and size is past the end of the file. sliced = records.slice(first.sizeInBytes(), records.sizeInBytes()); assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); - assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); + assertTrue(sliced.validBytes() <= sliced.sizeInBytes()); // Read from second message and position + size overflows. sliced = records.slice(first.sizeInBytes(), Integer.MAX_VALUE); assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); - assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); + assertTrue(sliced.validBytes() <= sliced.sizeInBytes()); // Read a single message starting from second message. RecordBatch second = items.get(1); @@ -1131,14 +1146,14 @@ public class MemoryRecordsTest { .slice(first.sizeInBytes() - 1, records.sizeInBytes()); assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); - assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); + assertTrue(sliced.validBytes() <= sliced.sizeInBytes()); // Read from second message and position + size overflows on the already sliced view. sliced = records.slice(1, records.sizeInBytes() - 1) .slice(first.sizeInBytes() - 1, Integer.MAX_VALUE); assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes()); assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message"); - assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes()); + assertTrue(sliced.validBytes() <= sliced.sizeInBytes()); } @ParameterizedTest @@ -1170,7 +1185,7 @@ public class MemoryRecordsTest { */ @ParameterizedTest @ArgumentsSource(MemoryRecordsArgumentsProvider.class) - public void testSliceForAlreadySlicedMemoryRecords(Args args) throws IOException { + public void testSliceForAlreadySlicedMemoryRecords(Args args) { LinkedHashMap recordsPerOffset = new LinkedHashMap<>(); recordsPerOffset.put(args.firstOffset, 5); recordsPerOffset.put(args.firstOffset + 5L, 10); diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 85bc4e1a269..7967d6f4f49 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -277,7 +277,7 @@ object DumpLogSegments { println(s"Snapshot end offset: ${path.snapshotId.offset}, epoch: ${path.snapshotId.epoch}") } } - val fileRecords = FileRecords.open(file, false).slice(0, maxBytes).asInstanceOf[FileRecords] + val fileRecords = FileRecords.open(file, false).slice(0, maxBytes) try { var validBytes = 0L var lastOffset = -1L @@ -567,7 +567,7 @@ object DumpLogSegments { private class RemoteMetadataLogMessageParser extends MessageParser[String, String] { private val metadataRecordSerde = new RemoteLogMetadataSerde - + override def parse(record: Record): (Option[String], Option[String]) = { val output = try { val data = new Array[Byte](record.value.remaining) @@ -626,7 +626,7 @@ object DumpLogSegments { private val transactionLogOpt = parser.accepts("transaction-log-decoder", "If set, log data will be parsed as " + "transaction metadata from the __transaction_state topic.") private val clusterMetadataOpt = parser.accepts("cluster-metadata-decoder", "If set, log data will be parsed as cluster metadata records.") - private val remoteMetadataOpt = parser.accepts("remote-log-metadata-decoder", "If set, log data will be parsed as TopicBasedRemoteLogMetadataManager (RLMM) metadata records." + + private val remoteMetadataOpt = parser.accepts("remote-log-metadata-decoder", "If set, log data will be parsed as TopicBasedRemoteLogMetadataManager (RLMM) metadata records." + " Instead, the value-decoder-class option can be used if a custom RLMM implementation is configured.") private val shareStateOpt = parser.accepts("share-group-state-decoder", "If set, log data will be parsed as share group state data from the " + "__share_group_state topic.")