KAFKA-18115; Fix for loading big files while performing load tests (#18391)
CI / build (push) Waiting to run Details

When performing perf tests, we can specify a payload using the
"--payloadFile" flag. This file is utilized during the load/performance
testing process. This causes the entire file to get loaded into a String
and split using the delimiter. However, if the file is large, it may
result in  NegativeArraySizeException error.

Moving the file loading logic to Scanner which doesn't have this issue.

Reviewers: José Armando García Sancio <jsancio@apache.org>, Ken Huang
 <s7133700@gmail.com>, Zhe Guang <zheguang.zhao@alumni.brown.edu>
This commit is contained in:
Manoj 2025-05-09 02:38:36 +05:30 committed by GitHub
parent 99ecd5ca08
commit b5c468fd7c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 86 additions and 6 deletions

View File

@ -42,6 +42,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.Scanner;
import java.util.SplittableRandom; import java.util.SplittableRandom;
import static net.sourceforge.argparse4j.impl.Arguments.store; import static net.sourceforge.argparse4j.impl.Arguments.store;
@ -194,14 +195,18 @@ public class ProducerPerformance {
throw new IllegalArgumentException("File does not exist or empty file provided."); throw new IllegalArgumentException("File does not exist or empty file provided.");
} }
String[] payloadList = Files.readString(path).split(payloadDelimiter); try (Scanner payLoadScanner = new Scanner(path, StandardCharsets.UTF_8)) {
//setting the delimiter while parsing the file, avoids loading entire data in memory before split
System.out.println("Number of messages read: " + payloadList.length); payLoadScanner.useDelimiter(payloadDelimiter);
while (payLoadScanner.hasNext()) {
for (String payload : payloadList) { byte[] payloadBytes = payLoadScanner.next().getBytes(StandardCharsets.UTF_8);
payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8)); payloadByteList.add(payloadBytes);
} }
} }
System.out.println("Number of messages read: " + payloadByteList.size());
}
return payloadByteList; return payloadByteList;
} }

View File

@ -104,6 +104,81 @@ public class ProducerPerformanceTest {
Utils.delete(producerConfig); Utils.delete(producerConfig);
} }
@Test
public void testReadPayloadFileWithAlternateDelimiters() throws Exception {
List<byte[]> payloadByteList;
payloadByteList = generateListFromFileUsingDelimiter("Hello~~Kafka", "~~");
assertEquals(2, payloadByteList.size());
assertEquals("Hello", new String(payloadByteList.get(0)));
assertEquals("Kafka", new String(payloadByteList.get(1)));
payloadByteList = generateListFromFileUsingDelimiter("Hello,Kafka,", ",");
assertEquals(2, payloadByteList.size());
assertEquals("Hello", new String(payloadByteList.get(0)));
assertEquals("Kafka", new String(payloadByteList.get(1)));
payloadByteList = generateListFromFileUsingDelimiter("Hello\t\tKafka", "\t");
assertEquals(3, payloadByteList.size());
assertEquals("Hello", new String(payloadByteList.get(0)));
assertEquals("Kafka", new String(payloadByteList.get(2)));
payloadByteList = generateListFromFileUsingDelimiter("Hello\n\nKafka\n", "\n");
assertEquals(3, payloadByteList.size());
assertEquals("Hello", new String(payloadByteList.get(0)));
assertEquals("Kafka", new String(payloadByteList.get(2)));
payloadByteList = generateListFromFileUsingDelimiter("Hello::Kafka::World", "\\s*::\\s*");
assertEquals(3, payloadByteList.size());
assertEquals("Hello", new String(payloadByteList.get(0)));
assertEquals("Kafka", new String(payloadByteList.get(1)));
}
@Test
public void testCompareStringSplitWithScannerDelimiter() throws Exception {
String contents = "Hello~~Kafka";
String payloadDelimiter = "~~";
compareList(generateListFromFileUsingDelimiter(contents, payloadDelimiter), contents.split(payloadDelimiter));
contents = "Hello,Kafka,";
payloadDelimiter = ",";
compareList(generateListFromFileUsingDelimiter(contents, payloadDelimiter), contents.split(payloadDelimiter));
contents = "Hello\t\tKafka";
payloadDelimiter = "\t";
compareList(generateListFromFileUsingDelimiter(contents, payloadDelimiter), contents.split(payloadDelimiter));
contents = "Hello\n\nKafka\n";
payloadDelimiter = "\n";
compareList(generateListFromFileUsingDelimiter(contents, payloadDelimiter), contents.split(payloadDelimiter));
contents = "Hello::Kafka::World";
payloadDelimiter = "\\s*::\\s*";
compareList(generateListFromFileUsingDelimiter(contents, payloadDelimiter), contents.split(payloadDelimiter));
}
private void compareList(List<byte[]> payloadByteList, String[] payloadByteListFromSplit) {
assertEquals(payloadByteListFromSplit.length, payloadByteList.size());
for (int i = 0; i < payloadByteListFromSplit.length; i++) {
assertEquals(payloadByteListFromSplit[i], new String(payloadByteList.get(i)));
}
}
private List<byte[]> generateListFromFileUsingDelimiter(String fileContent, String payloadDelimiter) throws Exception {
File payloadFile = null;
List<byte[]> payloadByteList;
try {
payloadFile = createTempFile(fileContent);
payloadByteList = ProducerPerformance.readPayloadFile(payloadFile.getAbsolutePath(), payloadDelimiter);
} finally {
Utils.delete(payloadFile);
}
return payloadByteList;
}
@Test @Test
public void testNumberOfCallsForSendAndClose() throws IOException { public void testNumberOfCallsForSendAndClose() throws IOException {
doReturn(null).when(producerMock).send(any(), any()); doReturn(null).when(producerMock).send(any(), any());