From fba01c42c80ba6983ab920b2005da24a00f7a657 Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Wed, 23 Jul 2025 22:07:26 -0700 Subject: [PATCH] KAFKA-17645 Enable warmup in producer performance test (KIP-1052) (#17340) In order to better analyze steady-state performance of Kafka, this PR enables a warmup in the Producer Performance test. The warmup duration is specified as a number of records that are a subset of the total numRecords. If warmup records is greater than 0, the warmup is represented by a second Stats object which holds warmup results. Once warmup records have been exhausted, the test switches to using the existing Stats object. At end of test, if warmup was enabled, the summary of the whole test (warump + steady state) is printed followed by the summary of the steady-state portion of the test. If no warmup is used, summary prints don't change from existing behavior. This contribution is an original work and is licensed to the Kafka project under the Apache license Testing strategy comprises new Java unit tests added to ProducerPerformanceTests.java. Reviewers: Kirk True , Federico Valeri , Chia-Ping Tsai --- .../kafka/tools/ProducerPerformance.java | 75 +++++++++++++++--- .../kafka/tools/ProducerPerformanceTest.java | 79 ++++++++++++++++++- 2 files changed, 139 insertions(+), 15 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index e90bbff8a62..d6ed1d0a4ef 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -76,7 +76,12 @@ public class ProducerPerformance { // not thread-safe, do not share with other threads SplittableRandom random = new SplittableRandom(0); ProducerRecord record; - stats = new Stats(config.numRecords, 5000); + + if (config.warmupRecords > 0) { + System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary."); + } + boolean isSteadyState = false; + stats = new Stats(config.numRecords, isSteadyState); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs); @@ -95,7 +100,11 @@ public class ProducerPerformance { record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); - cb = new PerfCallback(sendStartMs, payload.length, stats); + if ((isSteadyState = config.warmupRecords > 0) && i == config.warmupRecords) { + steadyStateStats = new Stats(config.numRecords - config.warmupRecords, isSteadyState); + stats.suppressPrinting(); + } + cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); producer.send(record, cb); currentTransactionSize++; @@ -117,6 +126,10 @@ public class ProducerPerformance { /* print final results */ stats.printTotal(); + /* print steady-state stats if relevant */ + if (steadyStateStats != null) { + steadyStateStats.printTotal(); + } } else { // Make sure all messages are sent before printing out the stats and the metrics // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py @@ -125,6 +138,10 @@ public class ProducerPerformance { /* print final results */ stats.printTotal(); + /* print steady-state stats if relevant */ + if (steadyStateStats != null) { + steadyStateStats.printTotal(); + } /* print out metrics */ ToolsUtils.printMetrics(producer.metrics()); @@ -147,8 +164,8 @@ public class ProducerPerformance { } Callback cb; - Stats stats; + Stats steadyStateStats; static byte[] generateRandomPayload(Integer recordSize, List payloadByteList, byte[] payload, SplittableRandom random, boolean payloadMonotonic, long recordValue) { @@ -164,7 +181,7 @@ public class ProducerPerformance { } return payload; } - + static Properties readProps(List producerProps, String producerConfig) throws IOException { Properties props = new Properties(); if (producerConfig != null) { @@ -331,6 +348,16 @@ public class ProducerPerformance { "--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " + "the default value will be 3000."); + parser.addArgument("--warmup-records") + .action(store()) + .required(false) + .type(Long.class) + .metavar("WARMUP-RECORDS") + .dest("warmupRecords") + .setDefault(0L) + .help("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " + + "An additional summary line will be printed describing the steady-state statistics. (default: 0)."); + return parser; } @@ -351,8 +378,10 @@ public class ProducerPerformance { private long windowTotalLatency; private long windowBytes; private long windowStart; + private final boolean isSteadyState; + private boolean suppressPrint; - public Stats(long numRecords, int reportingInterval) { + public Stats(long numRecords, boolean isSteadyState) { this.start = System.currentTimeMillis(); this.windowStart = System.currentTimeMillis(); this.iteration = 0; @@ -365,7 +394,9 @@ public class ProducerPerformance { this.windowTotalLatency = 0; this.windowBytes = 0; this.totalLatency = 0; - this.reportingInterval = reportingInterval; + this.reportingInterval = 5000; + this.isSteadyState = isSteadyState; + this.suppressPrint = false; } public void record(int latency, int bytes, long time) { @@ -383,9 +414,15 @@ public class ProducerPerformance { } /* maybe report the recent perf */ if (time - windowStart >= reportingInterval) { - printWindow(); + if (this.isSteadyState && count == windowCount) { + System.out.println("In steady state."); + } + if (!this.suppressPrint) { + printWindow(); + } newWindow(); } + this.iteration++; } public long totalCount() { @@ -433,8 +470,9 @@ public class ProducerPerformance { double recsPerSec = 1000.0 * count / (double) elapsed; double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0); int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); - System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", + System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", count, + this.isSteadyState ? " steady state" : "", recsPerSec, mbPerSec, totalLatency / (double) count, @@ -455,16 +493,22 @@ public class ProducerPerformance { } return values; } + + public void suppressPrinting() { + this.suppressPrint = true; + } } static final class PerfCallback implements Callback { private final long start; private final int bytes; private final Stats stats; + private final Stats steadyStateStats; - public PerfCallback(long start, int bytes, Stats stats) { + public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) { this.start = start; this.stats = stats; + this.steadyStateStats = steadyStateStats; this.bytes = bytes; } @@ -475,7 +519,9 @@ public class ProducerPerformance { // magically printed when the sending fails. if (exception == null) { this.stats.record(latency, bytes, now); - this.stats.iteration++; + if (steadyStateStats != null) { + this.steadyStateStats.record(latency, bytes, now); + } } if (exception != null) exception.printStackTrace(); @@ -484,7 +530,8 @@ public class ProducerPerformance { static final class ConfigPostProcessor { final String topicName; - final Long numRecords; + final long numRecords; + final long warmupRecords; final Integer recordSize; final double throughput; final boolean payloadMonotonic; @@ -498,6 +545,7 @@ public class ProducerPerformance { Namespace namespace = parser.parseArgs(args); this.topicName = namespace.getString("topic"); this.numRecords = namespace.getLong("numRecords"); + this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0); this.recordSize = namespace.getInt("recordSize"); this.throughput = namespace.getDouble("throughput"); this.payloadMonotonic = namespace.getBoolean("payloadMonotonic"); @@ -508,9 +556,12 @@ public class ProducerPerformance { String payloadFilePath = namespace.getString("payloadFile"); Long transactionDurationMsArg = namespace.getLong("transactionDurationMs"); String transactionIdArg = namespace.getString("transactionalId"); - if (numRecords != null && numRecords <= 0) { + if (numRecords <= 0) { throw new ArgumentParserException("--num-records should be greater than zero", parser); } + if (warmupRecords >= numRecords) { + throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser); + } if (recordSize != null && recordSize <= 0) { throw new ArgumentParserException("--record-size should be greater than zero", parser); } diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index 9117daabe33..0ec9c733164 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -381,16 +381,16 @@ public class ProducerPerformanceTest { @Test public void testStatsInitializationWithLargeNumRecords() { long numRecords = Long.MAX_VALUE; - assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 5000)); + assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, false)); } @Test public void testStatsCorrectness() throws Exception { ExecutorService singleThreaded = Executors.newSingleThreadExecutor(); final long numRecords = 1000000; - ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, 5000); + ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, false); for (long i = 0; i < numRecords; i++) { - final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats); + final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats, null); CompletableFuture.runAsync(() -> { callback.onCompletion(null, null); }, singleThreaded); @@ -567,4 +567,77 @@ public class ProducerPerformanceTest { assertTrue(configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG).toString() .startsWith(ProducerPerformance.DEFAULT_TRANSACTION_ID_PREFIX)); } + + @Test + public void testWarmupRecordsFractionalValue() throws Exception { + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "1.5", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser = ProducerPerformance.argParser(); + ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); + thrown.printStackTrace(); + } + + @Test + public void testWarmupRecordsString() throws Exception { + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "foo", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser = ProducerPerformance.argParser(); + ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); + thrown.printStackTrace(); + } + + @Test + public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException { + doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); + doAnswer(invocation -> { + producerPerformanceSpy.cb.onCompletion(null, null); + return null; + }).when(producerMock).send(any(), any()); + + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "2", + "--throughput", "1", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + producerPerformanceSpy.start(args); + + verify(producerMock, times(10)).send(any(), any()); + assertEquals(10, producerPerformanceSpy.stats.totalCount()); + assertEquals(10 - 2, producerPerformanceSpy.steadyStateStats.totalCount()); + verify(producerMock, times(1)).close(); + } + + @Test + public void testWarmupNegativeRecordsNormalTest() throws IOException { + doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); + doAnswer(invocation -> { + producerPerformanceSpy.cb.onCompletion(null, null); + return null; + }).when(producerMock).send(any(), any()); + + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "-1", + "--throughput", "1", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + producerPerformanceSpy.start(args); + + verify(producerMock, times(10)).send(any(), any()); + assertEquals(10, producerPerformanceSpy.stats.totalCount()); + verify(producerMock, times(1)).close(); + } }