KAFKA-17265 Fix flaky MemoryRecordsBuilderTest#testBuffersDereferencedOnClose (#17092)

Reviewers: TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po Tseng 2024-09-05 19:47:16 +08:00 committed by GitHub
parent 887b947d8a
commit 190df07ace
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 10 additions and 30 deletions

View File

@ -41,7 +41,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -734,36 +733,17 @@ public class MemoryRecordsBuilderTest {
@ParameterizedTest
@ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class)
public void testBuffersDereferencedOnClose(Args args) {
Runtime runtime = Runtime.getRuntime();
int payloadLen = 1024 * 1024;
ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2);
byte[] key = new byte[0];
byte[] value = new byte[payloadLen];
new Random().nextBytes(value); // Use random payload so that compressed buffer is large
List<MemoryRecordsBuilder> builders = new ArrayList<>(100);
long startMem = 0;
long memUsed = 0;
int iterations = 0;
while (iterations++ < 100) {
buffer.rewind();
public void shouldThrowIllegalStateExceptionOnAppendWhenClosed(Args args) {
ByteBuffer buffer = allocateBuffer(128, args);
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, args.magic, args.compression,
TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false,
RecordBatch.NO_PARTITION_LEADER_EPOCH, 0);
builder.append(1L, key, value);
RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.build();
builders.add(builder);
System.gc();
memUsed = runtime.totalMemory() - runtime.freeMemory() - startMem;
// Ignore memory usage during initialization
if (iterations == 2)
startMem = memUsed;
else if (iterations > 2 && memUsed < (iterations - 2) * 1024L)
break;
}
assertTrue(iterations < 100, "Memory usage too high: " + memUsed);
assertEquals("Tried to append a record, but MemoryRecordsBuilder is closed for record appends",
assertThrows(IllegalStateException.class, () -> builder.append(0L, "a".getBytes(), "1".getBytes())).getMessage());
}
@ParameterizedTest