mirror of https://github.com/apache/kafka.git
MINOR: Fixed ProducerPerformance still counting successful sending when sending failed (#13348)
Reviewers: Luke Chen <showuon@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
d01662fb0f
commit
8e4c0d0b04
|
@ -94,7 +94,7 @@ public class ProducerPerformance {
|
|||
}
|
||||
Random random = new Random(0);
|
||||
ProducerRecord<byte[], byte[]> record;
|
||||
Stats stats = new Stats(numRecords, 5000);
|
||||
stats = new Stats(numRecords, 5000);
|
||||
long startMs = System.currentTimeMillis();
|
||||
|
||||
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
|
||||
|
@ -113,7 +113,7 @@ public class ProducerPerformance {
|
|||
record = new ProducerRecord<>(topicName, payload);
|
||||
|
||||
long sendStartMs = System.currentTimeMillis();
|
||||
Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
|
||||
cb = stats.nextCompletion(sendStartMs, payload.length);
|
||||
producer.send(record, cb);
|
||||
|
||||
currentTransactionSize++;
|
||||
|
@ -164,6 +164,10 @@ public class ProducerPerformance {
|
|||
return new KafkaProducer<>(props);
|
||||
}
|
||||
|
||||
Callback cb;
|
||||
|
||||
Stats stats;
|
||||
|
||||
static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
|
||||
Random random) {
|
||||
if (!payloadByteList.isEmpty()) {
|
||||
|
@ -383,9 +387,16 @@ public class ProducerPerformance {
|
|||
}
|
||||
}
|
||||
|
||||
public Callback nextCompletion(long start, int bytes, Stats stats) {
|
||||
Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
|
||||
this.iteration++;
|
||||
public long totalCount() {
|
||||
return this.count;
|
||||
}
|
||||
|
||||
public long currentWindowCount() {
|
||||
return this.windowCount;
|
||||
}
|
||||
|
||||
public Callback nextCompletion(long start, int bytes) {
|
||||
Callback cb = new PerfCallback(this.iteration, start, bytes, this);
|
||||
return cb;
|
||||
}
|
||||
|
||||
|
@ -454,7 +465,12 @@ public class ProducerPerformance {
|
|||
public void onCompletion(RecordMetadata metadata, Exception exception) {
|
||||
long now = System.currentTimeMillis();
|
||||
int latency = (int) (now - start);
|
||||
this.stats.record(iteration, latency, bytes, now);
|
||||
// It will only be counted when the sending is successful, otherwise the number of sent records may be
|
||||
// magically printed when the sending fails.
|
||||
if (exception == null) {
|
||||
this.stats.record(iteration, latency, bytes, now);
|
||||
this.stats.iteration++;
|
||||
}
|
||||
if (exception != null)
|
||||
exception.printStackTrace();
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.kafka.tools;
|
||||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.common.errors.AuthorizationException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
|
@ -42,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
@ -104,6 +106,49 @@ public class ProducerPerformanceTest {
|
|||
verify(producerMock, times(1)).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNumberOfSuccessfulSendAndClose() throws IOException {
|
||||
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
|
||||
doAnswer(invocation -> {
|
||||
producerPerformanceSpy.cb.onCompletion(null, null);
|
||||
return null;
|
||||
}).when(producerMock).send(any(), any());
|
||||
|
||||
String[] args = new String[] {
|
||||
"--topic", "Hello-Kafka",
|
||||
"--num-records", "10",
|
||||
"--throughput", "1",
|
||||
"--record-size", "100",
|
||||
"--producer-props", "bootstrap.servers=localhost:9000"};
|
||||
producerPerformanceSpy.start(args);
|
||||
|
||||
verify(producerMock, times(10)).send(any(), any());
|
||||
assertEquals(10, producerPerformanceSpy.stats.totalCount());
|
||||
verify(producerMock, times(1)).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNumberOfFailedSendAndClose() throws IOException {
|
||||
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
|
||||
doAnswer(invocation -> {
|
||||
producerPerformanceSpy.cb.onCompletion(null, new AuthorizationException("not authorized."));
|
||||
return null;
|
||||
}).when(producerMock).send(any(), any());
|
||||
|
||||
String[] args = new String[] {
|
||||
"--topic", "Hello-Kafka",
|
||||
"--num-records", "10",
|
||||
"--throughput", "1",
|
||||
"--record-size", "100",
|
||||
"--producer-props", "bootstrap.servers=localhost:9000"};
|
||||
producerPerformanceSpy.start(args);
|
||||
|
||||
verify(producerMock, times(10)).send(any(), any());
|
||||
assertEquals(0, producerPerformanceSpy.stats.currentWindowCount());
|
||||
assertEquals(0, producerPerformanceSpy.stats.totalCount());
|
||||
verify(producerMock, times(1)).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnexpectedArg() {
|
||||
String[] args = new String[] {
|
||||
|
|
Loading…
Reference in New Issue