diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index ae6015ec18a..e90bbff8a62 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -42,6 +42,7 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Properties; +import java.util.Scanner; import java.util.SplittableRandom; import static net.sourceforge.argparse4j.impl.Arguments.store; @@ -194,13 +195,17 @@ public class ProducerPerformance { throw new IllegalArgumentException("File does not exist or empty file provided."); } - String[] payloadList = Files.readString(path).split(payloadDelimiter); - - System.out.println("Number of messages read: " + payloadList.length); - - for (String payload : payloadList) { - payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8)); + try (Scanner payLoadScanner = new Scanner(path, StandardCharsets.UTF_8)) { + //setting the delimiter while parsing the file, avoids loading entire data in memory before split + payLoadScanner.useDelimiter(payloadDelimiter); + while (payLoadScanner.hasNext()) { + byte[] payloadBytes = payLoadScanner.next().getBytes(StandardCharsets.UTF_8); + payloadByteList.add(payloadBytes); + } } + + System.out.println("Number of messages read: " + payloadByteList.size()); + } return payloadByteList; } diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index 10b790bf914..9117daabe33 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -104,6 +104,81 @@ public class ProducerPerformanceTest { Utils.delete(producerConfig); } + @Test + public void testReadPayloadFileWithAlternateDelimiters() throws Exception { + List payloadByteList; + + payloadByteList = generateListFromFileUsingDelimiter("Hello~~Kafka", "~~"); + assertEquals(2, payloadByteList.size()); + assertEquals("Hello", new String(payloadByteList.get(0))); + assertEquals("Kafka", new String(payloadByteList.get(1))); + + payloadByteList = generateListFromFileUsingDelimiter("Hello,Kafka,", ","); + assertEquals(2, payloadByteList.size()); + assertEquals("Hello", new String(payloadByteList.get(0))); + assertEquals("Kafka", new String(payloadByteList.get(1))); + + payloadByteList = generateListFromFileUsingDelimiter("Hello\t\tKafka", "\t"); + assertEquals(3, payloadByteList.size()); + assertEquals("Hello", new String(payloadByteList.get(0))); + assertEquals("Kafka", new String(payloadByteList.get(2))); + + payloadByteList = generateListFromFileUsingDelimiter("Hello\n\nKafka\n", "\n"); + assertEquals(3, payloadByteList.size()); + assertEquals("Hello", new String(payloadByteList.get(0))); + assertEquals("Kafka", new String(payloadByteList.get(2))); + + payloadByteList = generateListFromFileUsingDelimiter("Hello::Kafka::World", "\\s*::\\s*"); + assertEquals(3, payloadByteList.size()); + assertEquals("Hello", new String(payloadByteList.get(0))); + assertEquals("Kafka", new String(payloadByteList.get(1))); + + } + + @Test + public void testCompareStringSplitWithScannerDelimiter() throws Exception { + + String contents = "Hello~~Kafka"; + String payloadDelimiter = "~~"; + compareList(generateListFromFileUsingDelimiter(contents, payloadDelimiter), contents.split(payloadDelimiter)); + + contents = "Hello,Kafka,"; + payloadDelimiter = ","; + compareList(generateListFromFileUsingDelimiter(contents, payloadDelimiter), contents.split(payloadDelimiter)); + + contents = "Hello\t\tKafka"; + payloadDelimiter = "\t"; + compareList(generateListFromFileUsingDelimiter(contents, payloadDelimiter), contents.split(payloadDelimiter)); + + contents = "Hello\n\nKafka\n"; + payloadDelimiter = "\n"; + compareList(generateListFromFileUsingDelimiter(contents, payloadDelimiter), contents.split(payloadDelimiter)); + + contents = "Hello::Kafka::World"; + payloadDelimiter = "\\s*::\\s*"; + compareList(generateListFromFileUsingDelimiter(contents, payloadDelimiter), contents.split(payloadDelimiter)); + + } + + private void compareList(List payloadByteList, String[] payloadByteListFromSplit) { + assertEquals(payloadByteListFromSplit.length, payloadByteList.size()); + for (int i = 0; i < payloadByteListFromSplit.length; i++) { + assertEquals(payloadByteListFromSplit[i], new String(payloadByteList.get(i))); + } + } + + private List generateListFromFileUsingDelimiter(String fileContent, String payloadDelimiter) throws Exception { + File payloadFile = null; + List payloadByteList; + try { + payloadFile = createTempFile(fileContent); + payloadByteList = ProducerPerformance.readPayloadFile(payloadFile.getAbsolutePath(), payloadDelimiter); + } finally { + Utils.delete(payloadFile); + } + return payloadByteList; + } + @Test public void testNumberOfCallsForSendAndClose() throws IOException { doReturn(null).when(producerMock).send(any(), any());