KAFKA-18795 Remove `Records#downConvert` (#18897)

Since we no longer convert records to the old format for fetch requests, this code is no longer used in production.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
xijiu 2025-02-22 02:29:58 +08:00 committed by GitHub
parent c580874fc2
commit 118818a7ca
7 changed files with 0 additions and 484 deletions
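
For context, here is a minimal sketch of the call pattern this commit deletes, pieced together from the removed code below. It assumes a clients artifact from before this change; the `DownConvertExample` class name is illustrative only.

```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.ConvertedRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;

public class DownConvertExample {
    public static void main(String[] args) {
        // Write one small v2 batch into a buffer.
        ByteBuffer buffer = ByteBuffer.allocate(512);
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
                TimestampType.CREATE_TIME, 0L);
        builder.append(10L, "key".getBytes(), "value".getBytes());
        builder.close();
        buffer.flip();

        // The API being removed: rewrite every batch in the older v1 message format.
        ConvertedRecords<MemoryRecords> converted = MemoryRecords.readableRecords(buffer)
                .downConvert(RecordBatch.MAGIC_VALUE_V1, 0L, Time.SYSTEM);

        System.out.println("records converted: "
                + converted.recordConversionStats().numRecordsConverted());
        System.out.println("converted size in bytes: " + converted.records().sizeInBytes());
    }
}
```

After this commit the snippet no longer compiles: `Records#downConvert`, `ConvertedRecords`, and `RecordsUtil` are gone, since fetch responses are no longer rewritten into older message formats.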

ConvertedRecords.java (file deleted)

@@ -1,36 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
public class ConvertedRecords<T extends Records> {
private final T records;
private final RecordValidationStats recordValidationStats;
public ConvertedRecords(T records, RecordValidationStats recordValidationStats) {
this.records = records;
this.recordValidationStats = recordValidationStats;
}
public T records() {
return records;
}
public RecordValidationStats recordConversionStats() {
return recordValidationStats;
}
}

FileRecords.java

@@ -20,7 +20,6 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import java.io.Closeable;
@@ -275,23 +274,6 @@ public class FileRecords extends AbstractRecords implements Closeable {
return originalSize - targetSize;
}
-@Override
-public ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time) {
-ConvertedRecords<MemoryRecords> convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
-if (convertedRecords.recordConversionStats().numRecordsConverted() == 0) {
-// This indicates that the message is too large, which means that the buffer is not large
-// enough to hold a full record batch. We just return all the bytes in this instance.
-// Even though the record batch does not have the right format version, we expect old clients
-// to raise an error to the user after reading the record batch size and seeing that there
-// are not enough available bytes in the response to read it fully. Note that this is
-// only possible prior to KIP-74, after which the broker was changed to always return at least
-// one full record batch, even if it requires exceeding the max fetch size requested by the client.
-return new ConvertedRecords<>(this, RecordValidationStats.EMPTY);
-} else {
-return convertedRecords;
-}
-}
@Override
public int writeTo(TransferableChannel destChannel, int offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;

MemoryRecords.java

@@ -30,7 +30,6 @@ import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -111,11 +110,6 @@ public class MemoryRecords extends AbstractRecords {
return bytes;
}
-@Override
-public ConvertedRecords<MemoryRecords> downConvert(byte toMagic, long firstOffset, Time time) {
-return RecordsUtil.downConvert(batches(), toMagic, firstOffset, time);
-}
@Override
public AbstractIterator<MutableRecordBatch> batchIterator() {
return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));

Records.java

@@ -17,7 +17,6 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.Time;
import java.util.Iterator;
import java.util.Optional;
@@ -85,17 +84,6 @@ public interface Records extends TransferableRecords {
*/
boolean hasMatchingMagic(byte magic);
-/**
-* Convert all batches in this buffer to the format passed as a parameter. Note that this requires
-* deep iteration since all of the deep records must also be converted to the desired format.
-* @param toMagic The magic value to convert to
-* @param firstOffset The starting offset for returned records. This only impacts some cases. See
-* {@link RecordsUtil#downConvert(Iterable, byte, long, Time)} for an explanation.
-* @param time instance used for reporting stats
-* @return A ConvertedRecords instance which may or may not contain the same instance in its records field.
-*/
-ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time);
/**
* Get an iterator over the records in this log. Note that this generally requires decompression,
* and should therefore be used with care.

RecordsUtil.java (file deleted)

@@ -1,141 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class RecordsUtil {
/**
* Down convert batches to the provided message format version. The first offset parameter is only relevant in the
* conversion from uncompressed v2 or higher to v1 or lower. The reason is that uncompressed records in v0 and v1
* are not batched (put another way, each batch always has 1 record).
*
* If a client requests records in v1 format starting from the middle of an uncompressed batch in v2 format, we
* need to drop records from the batch during the conversion. Some versions of librdkafka rely on this for
* correctness.
*
* The temporaryMemoryBytes computation assumes that the batches are not loaded into the heap
* (via classes like FileChannelRecordBatch) before this method is called. This is the case in the broker (we
* only load records into the heap when down converting), but it's not for the producer. However, down converting
* in the producer is very uncommon and the extra complexity to handle that case is not worth it.
*/
protected static ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic,
long firstOffset, Time time) {
// maintain the batch along with the decompressed records to avoid the need to decompress again
List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
int totalSizeEstimate = 0;
long startNanos = time.nanoseconds();
for (RecordBatch batch : batches) {
if (toMagic < RecordBatch.MAGIC_VALUE_V2) {
if (batch.isControlBatch())
continue;
if (batch.compressionType() == CompressionType.ZSTD)
throw new UnsupportedCompressionTypeException("Down-conversion of zstandard-compressed batches " +
"is not supported");
}
if (batch.magic() <= toMagic) {
totalSizeEstimate += batch.sizeInBytes();
recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
} else {
List<Record> records = new ArrayList<>();
for (Record record : batch) {
// See the method javadoc for an explanation
if (toMagic > RecordBatch.MAGIC_VALUE_V1 || batch.isCompressed() || record.offset() >= firstOffset)
records.add(record);
}
if (records.isEmpty())
continue;
final long baseOffset;
if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && toMagic >= RecordBatch.MAGIC_VALUE_V2)
baseOffset = batch.baseOffset();
else
baseOffset = records.get(0).offset();
totalSizeEstimate += AbstractRecords.estimateSizeInBytes(toMagic, baseOffset, batch.compressionType(), records);
recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, records, baseOffset));
}
}
ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
long temporaryMemoryBytes = 0;
int numRecordsConverted = 0;
for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
if (recordBatchAndRecords.batch.magic() <= toMagic) {
buffer = Utils.ensureCapacity(buffer, buffer.position() + recordBatchAndRecords.batch.sizeInBytes());
recordBatchAndRecords.batch.writeTo(buffer);
} else {
MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
buffer = builder.buffer();
temporaryMemoryBytes += builder.uncompressedBytesWritten();
numRecordsConverted += builder.numRecords();
}
}
buffer.flip();
RecordValidationStats stats = new RecordValidationStats(temporaryMemoryBytes, numRecordsConverted,
time.nanoseconds() - startNanos);
return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
}
/**
* Return a buffer containing the converted record batches. The returned buffer may not be the same as the received
* one (e.g. it may require expansion).
*/
private static MemoryRecordsBuilder convertRecordBatch(byte magic, ByteBuffer buffer, RecordBatchAndRecords recordBatchAndRecords) {
RecordBatch batch = recordBatchAndRecords.batch;
final TimestampType timestampType = batch.timestampType();
long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, Compression.of(batch.compressionType()).build(),
timestampType, recordBatchAndRecords.baseOffset, logAppendTime);
for (Record record : recordBatchAndRecords.records) {
// Down-convert this record. Ignore headers when down-converting to V0 and V1 since they are not supported
if (magic > RecordBatch.MAGIC_VALUE_V1)
builder.append(record);
else
builder.appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value());
}
builder.close();
return builder;
}
private static class RecordBatchAndRecords {
private final RecordBatch batch;
private final List<Record> records;
private final Long baseOffset;
private RecordBatchAndRecords(RecordBatch batch, List<Record> records, Long baseOffset) {
this.batch = batch;
this.records = records;
this.baseOffset = baseOffset;
}
}
}

FileRecordsTest.java

@@ -18,12 +18,9 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.compress.GzipCompression;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.network.TransferableChannel;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -40,18 +37,14 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
-import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import static java.util.Arrays.asList;
-import static org.apache.kafka.common.utils.Utils.utf8;
import static org.apache.kafka.test.TestUtils.tempFile;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -74,12 +67,10 @@ public class FileRecordsTest {
"ijkl".getBytes()
};
private FileRecords fileRecords;
-private Time time;
@BeforeEach
public void setup() throws IOException {
this.fileRecords = createFileRecords(values);
-this.time = new MockTime();
}
@AfterEach
@@ -416,17 +407,6 @@
assertEquals(oldPosition, tempReopen.length());
}
-@Test
-public void testFormatConversionWithPartialMessage() throws IOException {
-RecordBatch batch = batches(fileRecords).get(1);
-int start = fileRecords.searchForOffsetWithSize(1, 0).position;
-int size = batch.sizeInBytes();
-FileRecords slice = fileRecords.slice(start, size - 1);
-Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0, time).records();
-assertTrue(batches(messageV0).isEmpty(), "No message should be there");
-assertEquals(size - 1, messageV0.sizeInBytes(), "There should be " + (size - 1) + " bytes");
-}
@Test
public void testSearchForTimestamp() throws IOException {
for (RecordVersion version : RecordVersion.values()) {
@@ -509,39 +489,6 @@
fileRecords.append(builder.build());
}
-@Test
-public void testDownconversionAfterMessageFormatDowngrade() throws IOException {
-// random bytes
-Random random = new Random();
-byte[] bytes = new byte[3000];
-random.nextBytes(bytes);
-// records
-GzipCompression compression = Compression.gzip().build();
-List<Long> offsets = asList(0L, 1L);
-List<Byte> magic = asList(RecordBatch.MAGIC_VALUE_V2, RecordBatch.MAGIC_VALUE_V1); // downgrade message format from v2 to v1
-List<SimpleRecord> records = asList(
-new SimpleRecord(1L, "k1".getBytes(), bytes),
-new SimpleRecord(2L, "k2".getBytes(), bytes));
-byte toMagic = 1;
-// create MemoryRecords
-ByteBuffer buffer = ByteBuffer.allocate(8000);
-for (int i = 0; i < records.size(); i++) {
-MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic.get(i), compression, TimestampType.CREATE_TIME, 0L);
-builder.appendWithOffset(offsets.get(i), records.get(i));
-builder.close();
-}
-buffer.flip();
-// create FileRecords, down-convert and verify
-try (FileRecords fileRecords = FileRecords.open(tempFile())) {
-fileRecords.append(MemoryRecords.readableRecords(buffer));
-fileRecords.flush();
-downConvertAndVerifyRecords(records, offsets, fileRecords, compression, toMagic, 0L, time);
-}
-}
@Test
public void testConversion() throws IOException {
doTestConversion(Compression.NONE, RecordBatch.MAGIC_VALUE_V0);
@@ -614,7 +561,6 @@
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(MemoryRecords.readableRecords(buffer));
fileRecords.flush();
-downConvertAndVerifyRecords(records, offsets, fileRecords, compression, toMagic, 0L, time);
if (toMagic <= RecordBatch.MAGIC_VALUE_V1 && compression.type() == CompressionType.NONE) {
long firstOffset;
@@ -627,77 +573,10 @@
int index = filteredOffsets.indexOf(firstOffset) - 1;
filteredRecords.remove(index);
filteredOffsets.remove(index);
-downConvertAndVerifyRecords(filteredRecords, filteredOffsets, fileRecords, compression, toMagic, firstOffset, time);
-} else {
-// firstOffset doesn't have any effect in this case
-downConvertAndVerifyRecords(records, offsets, fileRecords, compression, toMagic, 10L, time);
}
}
}
-private void downConvertAndVerifyRecords(List<SimpleRecord> initialRecords,
-List<Long> initialOffsets,
-FileRecords fileRecords,
-Compression compression,
-byte toMagic,
-long firstOffset,
-Time time) {
-long minBatchSize = Long.MAX_VALUE;
-long maxBatchSize = Long.MIN_VALUE;
-for (RecordBatch batch : fileRecords.batches()) {
-minBatchSize = Math.min(minBatchSize, batch.sizeInBytes());
-maxBatchSize = Math.max(maxBatchSize, batch.sizeInBytes());
-}
-// Test the normal down-conversion path
-List<Records> convertedRecords = new ArrayList<>();
-convertedRecords.add(fileRecords.downConvert(toMagic, firstOffset, time).records());
-verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compression, toMagic);
-convertedRecords.clear();
-}
-private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
-List<Long> initialOffsets,
-List<Records> convertedRecordsList,
-Compression compression,
-byte magicByte) {
-int i = 0;
-for (Records convertedRecords : convertedRecordsList) {
-for (RecordBatch batch : convertedRecords.batches()) {
-assertTrue(batch.magic() <= magicByte, "Magic byte should be lower than or equal to " + magicByte);
-if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
-assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
-else
-assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
-assertEquals(compression.type(), batch.compressionType(), "Compression type should not be affected by conversion");
-for (Record record : batch) {
-assertTrue(record.hasMagic(batch.magic()), "Inner record should have magic " + magicByte);
-assertEquals(initialOffsets.get(i).longValue(), record.offset(), "Offset should not change");
-assertEquals(utf8(initialRecords.get(i).key()), utf8(record.key()), "Key should not change");
-assertEquals(utf8(initialRecords.get(i).value()), utf8(record.value()), "Value should not change");
-assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
-if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
-assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());
-assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
-assertTrue(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
-} else if (batch.magic() == RecordBatch.MAGIC_VALUE_V1) {
-assertEquals(initialRecords.get(i).timestamp(), record.timestamp(), "Timestamp should not change");
-assertTrue(record.hasTimestampType(TimestampType.CREATE_TIME));
-assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
-} else {
-assertEquals(initialRecords.get(i).timestamp(), record.timestamp(), "Timestamp should not change");
-assertFalse(record.hasTimestampType(TimestampType.CREATE_TIME));
-assertFalse(record.hasTimestampType(TimestampType.NO_TIMESTAMP_TYPE));
-assertArrayEquals(initialRecords.get(i).headers(), record.headers(), "Headers should not change");
-}
-i += 1;
-}
-}
-}
-assertEquals(initialOffsets.size(), i);
-}
private static List<RecordBatch> batches(Records buffer) {
return TestUtils.toList(buffer.batches());
}

MemoryRecordsBuilderTest.java

@@ -17,7 +17,6 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.utils.BufferSupplier;
@@ -33,7 +32,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
-import org.junit.jupiter.params.provider.EnumSource;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -50,7 +48,6 @@ import static java.util.Arrays.asList;
import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
-import static org.apache.kafka.common.utils.Utils.utf8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -535,153 +532,6 @@
"b".getBytes(), null));
}
-@ParameterizedTest
-@EnumSource(CompressionType.class)
-public void convertV2ToV1UsingMixedCreateAndLogAppendTime(CompressionType compressionType) {
-ByteBuffer buffer = ByteBuffer.allocate(512);
-Compression compression = Compression.of(compressionType).build();
-MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
-compression, TimestampType.LOG_APPEND_TIME, 0L);
-builder.append(10L, "1".getBytes(), "a".getBytes());
-builder.close();
-int sizeExcludingTxnMarkers = buffer.position();
-MemoryRecords.writeEndTransactionalMarker(buffer, 1L, System.currentTimeMillis(), 0, 15L, (short) 0,
-new EndTransactionMarker(ControlRecordType.ABORT, 0));
-int position = buffer.position();
-builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compression,
-TimestampType.CREATE_TIME, 1L);
-builder.append(12L, "2".getBytes(), "b".getBytes());
-builder.append(13L, "3".getBytes(), "c".getBytes());
-builder.close();
-sizeExcludingTxnMarkers += buffer.position() - position;
-MemoryRecords.writeEndTransactionalMarker(buffer, 14L, System.currentTimeMillis(), 0, 1L, (short) 0,
-new EndTransactionMarker(ControlRecordType.COMMIT, 0));
-buffer.flip();
-Supplier<ConvertedRecords<MemoryRecords>> convertedRecordsSupplier = () ->
-MemoryRecords.readableRecords(buffer).downConvert(MAGIC_VALUE_V1, 0, time);
-if (compression.type() != CompressionType.ZSTD) {
-ConvertedRecords<MemoryRecords> convertedRecords = convertedRecordsSupplier.get();
-MemoryRecords records = convertedRecords.records();
-// Transactional markers are skipped when down converting to V1, so exclude them from size
-verifyRecordsProcessingStats(compression, convertedRecords.recordConversionStats(),
-3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers);
-List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
-if (compression.type() != CompressionType.NONE) {
-assertEquals(2, batches.size());
-assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
-assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
-} else {
-assertEquals(3, batches.size());
-assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
-assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
-assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType());
-}
-List<Record> logRecords = Utils.toList(records.records().iterator());
-assertEquals(3, logRecords.size());
-assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
-assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
-assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
-} else {
-Exception e = assertThrows(UnsupportedCompressionTypeException.class, convertedRecordsSupplier::get);
-assertEquals("Down-conversion of zstandard-compressed batches is not supported", e.getMessage());
-}
-}
-@ParameterizedTest
-@EnumSource(CompressionType.class)
-public void convertToV1WithMixedV0AndV2Data(CompressionType compressionType) {
-ByteBuffer buffer = ByteBuffer.allocate(512);
-Compression compression = Compression.of(compressionType).build();
-Supplier<MemoryRecordsBuilder> supplier = () -> MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0,
-compression, TimestampType.NO_TIMESTAMP_TYPE, 0L);
-if (compressionType == CompressionType.ZSTD) {
-assertThrows(IllegalArgumentException.class, supplier::get);
-} else {
-MemoryRecordsBuilder builder = supplier.get();
-builder.append(RecordBatch.NO_TIMESTAMP, "1".getBytes(), "a".getBytes());
-builder.close();
-builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compression,
-TimestampType.CREATE_TIME, 1L);
-builder.append(11L, "2".getBytes(), "b".getBytes());
-builder.append(12L, "3".getBytes(), "c".getBytes());
-builder.close();
-buffer.flip();
-ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
-.downConvert(MAGIC_VALUE_V1, 0, time);
-MemoryRecords records = convertedRecords.records();
-verifyRecordsProcessingStats(compression, convertedRecords.recordConversionStats(), 3, 2,
-records.sizeInBytes(), buffer.limit());
-List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
-if (compressionType != CompressionType.NONE) {
-assertEquals(2, batches.size());
-assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
-assertEquals(0, batches.get(0).baseOffset());
-assertEquals(MAGIC_VALUE_V1, batches.get(1).magic());
-assertEquals(1, batches.get(1).baseOffset());
-} else {
-assertEquals(3, batches.size());
-assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
-assertEquals(0, batches.get(0).baseOffset());
-assertEquals(MAGIC_VALUE_V1, batches.get(1).magic());
-assertEquals(1, batches.get(1).baseOffset());
-assertEquals(MAGIC_VALUE_V1, batches.get(2).magic());
-assertEquals(2, batches.get(2).baseOffset());
-}
-List<Record> logRecords = Utils.toList(records.records().iterator());
-assertEquals("1", utf8(logRecords.get(0).key()));
-assertEquals("2", utf8(logRecords.get(1).key()));
-assertEquals("3", utf8(logRecords.get(2).key()));
-convertedRecords = MemoryRecords.readableRecords(buffer).downConvert(MAGIC_VALUE_V1, 2L, time);
-records = convertedRecords.records();
-batches = Utils.toList(records.batches().iterator());
-logRecords = Utils.toList(records.records().iterator());
-if (compressionType != CompressionType.NONE) {
-assertEquals(2, batches.size());
-assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
-assertEquals(0, batches.get(0).baseOffset());
-assertEquals(MAGIC_VALUE_V1, batches.get(1).magic());
-assertEquals(1, batches.get(1).baseOffset());
-assertEquals("1", utf8(logRecords.get(0).key()));
-assertEquals("2", utf8(logRecords.get(1).key()));
-assertEquals("3", utf8(logRecords.get(2).key()));
-verifyRecordsProcessingStats(compression, convertedRecords.recordConversionStats(), 3, 2,
-records.sizeInBytes(), buffer.limit());
-} else {
-assertEquals(2, batches.size());
-assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
-assertEquals(0, batches.get(0).baseOffset());
-assertEquals(MAGIC_VALUE_V1, batches.get(1).magic());
-assertEquals(2, batches.get(1).baseOffset());
-assertEquals("1", utf8(logRecords.get(0).key()));
-assertEquals("3", utf8(logRecords.get(1).key()));
-verifyRecordsProcessingStats(compression, convertedRecords.recordConversionStats(), 3, 1,
-records.sizeInBytes(), buffer.limit());
-}
-}
-}
@ParameterizedTest
@ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
public void shouldThrowIllegalStateExceptionOnBuildWhenAborted(Args args) {