KAFKA-9439: add KafkaProducer API unit tests (#8174)

Add unit tests for KafkaProducer.close(), KafkaProducer.abortTransaction(), and KafkaProducer.flush() in the KafkaProducerTest.

Increases KafkaProducer test code coverage from 82% of methods and 82% of lines to 86% of methods and 87% of lines as of this merge.

Reviewers: Boyang Chen <boyang@confluent.io>
This commit is contained in:
Jeff Kim 2020-06-23 14:07:14 -07:00 committed by GitHub
parent 14137def71
commit a047a7c0eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 59 additions and 0 deletions

View File

@ -703,6 +703,40 @@ public class KafkaProducerTest {
producer.close();
}
@Test
public void closeWithNegativeTimestampShouldThrow() {
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
    // Parameterize the producer type (was a raw Producer) to avoid an unchecked-conversion warning.
    try (Producer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
        // close() must reject a negative timeout with IllegalArgumentException
        // rather than blocking or silently succeeding.
        assertThrows(IllegalArgumentException.class, () -> producer.close(Duration.ofMillis(-100)));
    }
}
@Test
public void testFlushCompleteSendOfInflightBatches() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
    Time time = new MockTime(1);
    MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
    ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
    MockClient client = new MockClient(time, metadata);
    client.updateMetadata(initialUpdateResponse);
    try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
            new StringSerializer(), metadata, client, null, time)) {
        // Program to the interface: declare the local as List rather than ArrayList.
        List<Future<RecordMetadata>> futureResponses = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            Future<RecordMetadata> response = producer.send(new ProducerRecord<>("topic", "value" + i));
            futureResponses.add(response);
        }
        // Before flush() none of the in-flight sends should have completed;
        // use assertFalse instead of assertTrue(!...) for a clearer failure message.
        futureResponses.forEach(res -> assertFalse(res.isDone()));
        producer.flush();
        // flush() blocks until every in-flight batch is sent, so all futures are now done.
        futureResponses.forEach(res -> assertTrue(res.isDone()));
    }
}
@Test
public void testMetricConfigRecordingLevel() {
Properties props = new Properties();
@ -816,6 +850,31 @@ public class KafkaProducerTest {
}
}
@Test
public void testAbortTransaction() {
    // Configure a transactional producer pointed at a mocked broker.
    Map<String, Object> producerConfigs = new HashMap<>();
    producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
    producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");

    Time mockTime = new MockTime(1);
    ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
    MockClient mockClient = new MockClient(mockTime, producerMetadata);
    mockClient.updateMetadata(TestUtils.metadataUpdateWith(1, singletonMap("topic", 1)));

    // Queue the broker responses the transactional flow consumes in order:
    // coordinator lookup, producer-id initialization, then the EndTxn for the abort.
    mockClient.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, host1));
    mockClient.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
    mockClient.prepareResponse(endTxnResponse(Errors.NONE));

    try (Producer<String, String> producer = new KafkaProducer<>(producerConfigs, new StringSerializer(),
            new StringSerializer(), producerMetadata, mockClient, null, mockTime)) {
        producer.initTransactions();
        producer.beginTransaction();
        // With a successful EndTxn response prepared, abortTransaction() should not throw.
        producer.abortTransaction();
    }
}
@Test
public void testSendTxnOffsetsWithGroupId() {
Map<String, Object> configs = new HashMap<>();