diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index e90bbff8a62..d6ed1d0a4ef 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -76,7 +76,12 @@ public class ProducerPerformance { // not thread-safe, do not share with other threads SplittableRandom random = new SplittableRandom(0); ProducerRecord record; - stats = new Stats(config.numRecords, 5000); + + if (config.warmupRecords > 0) { + System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary."); + } + boolean isSteadyState = false; + stats = new Stats(config.numRecords, isSteadyState); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs); @@ -95,7 +100,11 @@ public class ProducerPerformance { record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); - cb = new PerfCallback(sendStartMs, payload.length, stats); + if ((isSteadyState = config.warmupRecords > 0) && i == config.warmupRecords) { + steadyStateStats = new Stats(config.numRecords - config.warmupRecords, isSteadyState); + stats.suppressPrinting(); + } + cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); producer.send(record, cb); currentTransactionSize++; @@ -117,6 +126,10 @@ public class ProducerPerformance { /* print final results */ stats.printTotal(); + /* print steady-state stats if relevant */ + if (steadyStateStats != null) { + steadyStateStats.printTotal(); + } } else { // Make sure all messages are sent before printing out the stats and the metrics // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py @@ -125,6 +138,10 @@ public class ProducerPerformance { /* print final results */ stats.printTotal(); + /* print steady-state stats if relevant */ + if (steadyStateStats != null) { + steadyStateStats.printTotal(); + } /* print out metrics */ ToolsUtils.printMetrics(producer.metrics()); @@ -147,8 +164,8 @@ public class ProducerPerformance { } Callback cb; - Stats stats; + Stats steadyStateStats; static byte[] generateRandomPayload(Integer recordSize, List payloadByteList, byte[] payload, SplittableRandom random, boolean payloadMonotonic, long recordValue) { @@ -164,7 +181,7 @@ public class ProducerPerformance { } return payload; } - + static Properties readProps(List producerProps, String producerConfig) throws IOException { Properties props = new Properties(); if (producerConfig != null) { @@ -331,6 +348,16 @@ public class ProducerPerformance { "--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " + "the default value will be 3000."); + parser.addArgument("--warmup-records") + .action(store()) + .required(false) + .type(Long.class) + .metavar("WARMUP-RECORDS") + .dest("warmupRecords") + .setDefault(0L) + .help("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " + + "An additional summary line will be printed describing the steady-state statistics. (default: 0)."); + return parser; } @@ -351,8 +378,10 @@ public class ProducerPerformance { private long windowTotalLatency; private long windowBytes; private long windowStart; + private final boolean isSteadyState; + private boolean suppressPrint; - public Stats(long numRecords, int reportingInterval) { + public Stats(long numRecords, boolean isSteadyState) { this.start = System.currentTimeMillis(); this.windowStart = System.currentTimeMillis(); this.iteration = 0; @@ -365,7 +394,9 @@ public class ProducerPerformance { this.windowTotalLatency = 0; this.windowBytes = 0; this.totalLatency = 0; - this.reportingInterval = reportingInterval; + this.reportingInterval = 5000; + this.isSteadyState = isSteadyState; + this.suppressPrint = false; } public void record(int latency, int bytes, long time) { @@ -383,9 +414,15 @@ public class ProducerPerformance { } /* maybe report the recent perf */ if (time - windowStart >= reportingInterval) { - printWindow(); + if (this.isSteadyState && count == windowCount) { + System.out.println("In steady state."); + } + if (!this.suppressPrint) { + printWindow(); + } newWindow(); } + this.iteration++; } public long totalCount() { @@ -433,8 +470,9 @@ public class ProducerPerformance { double recsPerSec = 1000.0 * count / (double) elapsed; double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0); int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); - System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", + System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", count, + this.isSteadyState ? " steady state" : "", recsPerSec, mbPerSec, totalLatency / (double) count, @@ -455,16 +493,22 @@ public class ProducerPerformance { } return values; } + + public void suppressPrinting() { + this.suppressPrint = true; + } } static final class PerfCallback implements Callback { private final long start; private final int bytes; private final Stats stats; + private final Stats steadyStateStats; - public PerfCallback(long start, int bytes, Stats stats) { + public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) { this.start = start; this.stats = stats; + this.steadyStateStats = steadyStateStats; this.bytes = bytes; } @@ -475,7 +519,9 @@ public class ProducerPerformance { // magically printed when the sending fails. if (exception == null) { this.stats.record(latency, bytes, now); - this.stats.iteration++; + if (steadyStateStats != null) { + this.steadyStateStats.record(latency, bytes, now); + } } if (exception != null) exception.printStackTrace(); @@ -484,7 +530,8 @@ public class ProducerPerformance { static final class ConfigPostProcessor { final String topicName; - final Long numRecords; + final long numRecords; + final long warmupRecords; final Integer recordSize; final double throughput; final boolean payloadMonotonic; @@ -498,6 +545,7 @@ public class ProducerPerformance { Namespace namespace = parser.parseArgs(args); this.topicName = namespace.getString("topic"); this.numRecords = namespace.getLong("numRecords"); + this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0); this.recordSize = namespace.getInt("recordSize"); this.throughput = namespace.getDouble("throughput"); this.payloadMonotonic = namespace.getBoolean("payloadMonotonic"); @@ -508,9 +556,12 @@ public class ProducerPerformance { String payloadFilePath = namespace.getString("payloadFile"); Long transactionDurationMsArg = namespace.getLong("transactionDurationMs"); String transactionIdArg = namespace.getString("transactionalId"); - if (numRecords != null && numRecords <= 0) { + if (numRecords <= 0) { throw new ArgumentParserException("--num-records should be greater than zero", parser); } + if (warmupRecords >= numRecords) { + throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser); + } if (recordSize != null && recordSize <= 0) { throw new ArgumentParserException("--record-size should be greater than zero", parser); } diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index 9117daabe33..0ec9c733164 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -381,16 +381,16 @@ public class ProducerPerformanceTest { @Test public void testStatsInitializationWithLargeNumRecords() { long numRecords = Long.MAX_VALUE; - assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 5000)); + assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, false)); } @Test public void testStatsCorrectness() throws Exception { ExecutorService singleThreaded = Executors.newSingleThreadExecutor(); final long numRecords = 1000000; - ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, 5000); + ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, false); for (long i = 0; i < numRecords; i++) { - final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats); + final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats, null); CompletableFuture.runAsync(() -> { callback.onCompletion(null, null); }, singleThreaded); @@ -567,4 +567,77 @@ public class ProducerPerformanceTest { assertTrue(configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG).toString() .startsWith(ProducerPerformance.DEFAULT_TRANSACTION_ID_PREFIX)); } + + @Test + public void testWarmupRecordsFractionalValue() throws Exception { + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "1.5", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser = ProducerPerformance.argParser(); + ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); + thrown.printStackTrace(); + } + + @Test + public void testWarmupRecordsString() throws Exception { + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "foo", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser = ProducerPerformance.argParser(); + ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); + thrown.printStackTrace(); + } + + @Test + public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException { + doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); + doAnswer(invocation -> { + producerPerformanceSpy.cb.onCompletion(null, null); + return null; + }).when(producerMock).send(any(), any()); + + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "2", + "--throughput", "1", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + producerPerformanceSpy.start(args); + + verify(producerMock, times(10)).send(any(), any()); + assertEquals(10, producerPerformanceSpy.stats.totalCount()); + assertEquals(10 - 2, producerPerformanceSpy.steadyStateStats.totalCount()); + verify(producerMock, times(1)).close(); + } + + @Test + public void testWarmupNegativeRecordsNormalTest() throws IOException { + doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); + doAnswer(invocation -> { + producerPerformanceSpy.cb.onCompletion(null, null); + return null; + }).when(producerMock).send(any(), any()); + + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "-1", + "--throughput", "1", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + producerPerformanceSpy.start(args); + + verify(producerMock, times(10)).send(any(), any()); + assertEquals(10, producerPerformanceSpy.stats.totalCount()); + verify(producerMock, times(1)).close(); + } }