diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 6774f235114..cf4f2b05514 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -94,7 +94,7 @@ public class ProducerPerformance { } Random random = new Random(0); ProducerRecord record; - Stats stats = new Stats(numRecords, 5000); + stats = new Stats(numRecords, 5000); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs); @@ -113,7 +113,7 @@ public class ProducerPerformance { record = new ProducerRecord<>(topicName, payload); long sendStartMs = System.currentTimeMillis(); - Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats); + cb = stats.nextCompletion(sendStartMs, payload.length); producer.send(record, cb); currentTransactionSize++; @@ -164,6 +164,10 @@ public class ProducerPerformance { return new KafkaProducer<>(props); } + Callback cb; + + Stats stats; + static byte[] generateRandomPayload(Integer recordSize, List payloadByteList, byte[] payload, Random random) { if (!payloadByteList.isEmpty()) { @@ -383,9 +387,16 @@ public class ProducerPerformance { } } - public Callback nextCompletion(long start, int bytes, Stats stats) { - Callback cb = new PerfCallback(this.iteration, start, bytes, stats); - this.iteration++; + public long totalCount() { + return this.count; + } + + public long currentWindowCount() { + return this.windowCount; + } + + public Callback nextCompletion(long start, int bytes) { + Callback cb = new PerfCallback(this.iteration, start, bytes, this); return cb; } @@ -454,7 +465,12 @@ public class ProducerPerformance { public void onCompletion(RecordMetadata metadata, Exception exception) { long now = System.currentTimeMillis(); int latency = (int) (now - start); - this.stats.record(iteration, latency, bytes, now); + // It will only be counted when the sending is successful, otherwise the number of sent records may be + // magically printed when the sending fails. + if (exception == null) { + this.stats.record(iteration, latency, bytes, now); + this.stats.iteration++; + } if (exception != null) exception.printStackTrace(); } diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index b8adf5d8eb2..aa97e269bb5 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.tools; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.errors.AuthorizationException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -42,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -104,6 +106,49 @@ public class ProducerPerformanceTest { verify(producerMock, times(1)).close(); } + @Test + public void testNumberOfSuccessfulSendAndClose() throws IOException { + doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); + doAnswer(invocation -> { + producerPerformanceSpy.cb.onCompletion(null, null); + return null; + }).when(producerMock).send(any(), any()); + + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--throughput", "1", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + producerPerformanceSpy.start(args); + + verify(producerMock, times(10)).send(any(), any()); + assertEquals(10, producerPerformanceSpy.stats.totalCount()); + verify(producerMock, times(1)).close(); + } + + @Test + public void testNumberOfFailedSendAndClose() throws IOException { + doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); + doAnswer(invocation -> { + producerPerformanceSpy.cb.onCompletion(null, new AuthorizationException("not authorized.")); + return null; + }).when(producerMock).send(any(), any()); + + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--throughput", "1", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + producerPerformanceSpy.start(args); + + verify(producerMock, times(10)).send(any(), any()); + assertEquals(0, producerPerformanceSpy.stats.currentWindowCount()); + assertEquals(0, producerPerformanceSpy.stats.totalCount()); + verify(producerMock, times(1)).close(); + } + @Test public void testUnexpectedArg() { String[] args = new String[] {