KAFKA-14812:ProducerPerformance still counting successful sending in console when sending failed (#13404)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
hudeqi 2023-03-21 16:59:18 +08:00 committed by GitHub
parent ccda370e95
commit aef004edee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 107 additions and 13 deletions

View File

@ -94,7 +94,7 @@ public class ProducerPerformance {
} }
Random random = new Random(0); Random random = new Random(0);
ProducerRecord<byte[], byte[]> record; ProducerRecord<byte[], byte[]> record;
Stats stats = new Stats(numRecords, 5000); stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis(); long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs); ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
@ -113,7 +113,7 @@ public class ProducerPerformance {
record = new ProducerRecord<>(topicName, payload); record = new ProducerRecord<>(topicName, payload);
long sendStartMs = System.currentTimeMillis(); long sendStartMs = System.currentTimeMillis();
Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats); cb = new PerfCallback(sendStartMs, payload.length, stats);
producer.send(record, cb); producer.send(record, cb);
currentTransactionSize++; currentTransactionSize++;
@ -164,6 +164,10 @@ public class ProducerPerformance {
return new KafkaProducer<>(props); return new KafkaProducer<>(props);
} }
Callback cb;
Stats stats;
static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload, static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
Random random) { Random random) {
if (!payloadByteList.isEmpty()) { if (!payloadByteList.isEmpty()) {
@ -363,7 +367,7 @@ public class ProducerPerformance {
this.reportingInterval = reportingInterval; this.reportingInterval = reportingInterval;
} }
public void record(long iter, int latency, int bytes, long time) { public void record(int latency, int bytes, long time) {
this.count++; this.count++;
this.bytes += bytes; this.bytes += bytes;
this.totalLatency += latency; this.totalLatency += latency;
@ -372,7 +376,7 @@ public class ProducerPerformance {
this.windowBytes += bytes; this.windowBytes += bytes;
this.windowTotalLatency += latency; this.windowTotalLatency += latency;
this.windowMaxLatency = Math.max(windowMaxLatency, latency); this.windowMaxLatency = Math.max(windowMaxLatency, latency);
if (iter % this.sampling == 0) { if (this.iteration % this.sampling == 0) {
this.latencies[index] = latency; this.latencies[index] = latency;
this.index++; this.index++;
} }
@ -383,10 +387,24 @@ public class ProducerPerformance {
} }
} }
public Callback nextCompletion(long start, int bytes, Stats stats) { public long totalCount() {
Callback cb = new PerfCallback(this.iteration, start, bytes, stats); return this.count;
this.iteration++; }
return cb;
public long currentWindowCount() {
return this.windowCount;
}
public long iteration() {
return this.iteration;
}
public long bytes() {
return this.bytes;
}
public int index() {
return this.index;
} }
public void printWindow() { public void printWindow() {
@ -438,23 +456,26 @@ public class ProducerPerformance {
} }
} }
private static final class PerfCallback implements Callback { static final class PerfCallback implements Callback {
private final long start; private final long start;
private final long iteration;
private final int bytes; private final int bytes;
private final Stats stats; private final Stats stats;
public PerfCallback(long iter, long start, int bytes, Stats stats) { public PerfCallback(long start, int bytes, Stats stats) {
this.start = start; this.start = start;
this.stats = stats; this.stats = stats;
this.iteration = iter;
this.bytes = bytes; this.bytes = bytes;
} }
public void onCompletion(RecordMetadata metadata, Exception exception) { public void onCompletion(RecordMetadata metadata, Exception exception) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
int latency = (int) (now - start); int latency = (int) (now - start);
this.stats.record(iteration, latency, bytes, now); // It will only be counted when the sending is successful, otherwise the number of sent records may be
// magically printed when the sending fails.
if (exception == null) {
this.stats.record(latency, bytes, now);
this.stats.iteration++;
}
if (exception != null) if (exception != null)
exception.printStackTrace(); exception.printStackTrace();
} }

View File

@ -16,7 +16,9 @@
*/ */
package org.apache.kafka.tools; package org.apache.kafka.tools;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.AuthorizationException;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -35,13 +37,19 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -104,6 +112,49 @@ public class ProducerPerformanceTest {
verify(producerMock, times(1)).close(); verify(producerMock, times(1)).close();
} }
@Test
public void testNumberOfSuccessfulSendAndClose() throws IOException {
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
doAnswer(invocation -> {
producerPerformanceSpy.cb.onCompletion(null, null);
return null;
}).when(producerMock).send(any(), any());
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--throughput", "1",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
producerPerformanceSpy.start(args);
verify(producerMock, times(10)).send(any(), any());
assertEquals(10, producerPerformanceSpy.stats.totalCount());
verify(producerMock, times(1)).close();
}
@Test
public void testNumberOfFailedSendAndClose() throws IOException {
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
doAnswer(invocation -> {
producerPerformanceSpy.cb.onCompletion(null, new AuthorizationException("not authorized."));
return null;
}).when(producerMock).send(any(), any());
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--throughput", "1",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
producerPerformanceSpy.start(args);
verify(producerMock, times(10)).send(any(), any());
assertEquals(0, producerPerformanceSpy.stats.currentWindowCount());
assertEquals(0, producerPerformanceSpy.stats.totalCount());
verify(producerMock, times(1)).close();
}
@Test @Test
public void testUnexpectedArg() { public void testUnexpectedArg() {
String[] args = new String[] { String[] args = new String[] {
@ -181,4 +232,26 @@ public class ProducerPerformanceTest {
long numRecords = Long.MAX_VALUE; long numRecords = Long.MAX_VALUE;
assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 5000)); assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 5000));
} }
@Test
public void testStatsCorrectness() throws Exception {
ExecutorService singleThreaded = Executors.newSingleThreadExecutor();
final long numRecords = 1000000;
ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, 5000);
for (long i = 0; i < numRecords; i++) {
final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats);
CompletableFuture.runAsync(() -> {
callback.onCompletion(null, null);
}, singleThreaded);
}
singleThreaded.shutdown();
final boolean success = singleThreaded.awaitTermination(60, TimeUnit.SECONDS);
assertTrue(success, "should have terminated");
assertEquals(numRecords, stats.totalCount());
assertEquals(numRecords, stats.iteration());
assertEquals(500000, stats.index());
assertEquals(1000000 * 100, stats.bytes());
}
} }