KAFKA-3549: Close consumers instantiated in consumer tests

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1217 from granthenke/close-consumers
This commit is contained in:
Grant Henke 2016-04-14 22:02:19 -07:00 committed by Ewen Cheslack-Postava
parent 4fa456bc6e
commit 065ddf9019
5 changed files with 40 additions and 30 deletions

View File

@ -77,6 +77,8 @@ public class KafkaConsumerTest {
consumer.unsubscribe();
Assert.assertTrue(consumer.subscription().isEmpty());
Assert.assertTrue(consumer.assignment().isEmpty());
consumer.close();
}
@Test(expected = IllegalArgumentException.class)
@ -85,10 +87,13 @@ public class KafkaConsumerTest {
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
try {
consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
} finally {
consumer.close();
}
}
@Test
@ -129,6 +134,8 @@ public class KafkaConsumerTest {
consumer.unsubscribe();
Assert.assertTrue(consumer.paused().isEmpty());
consumer.close();
}
private KafkaConsumer<byte[], byte[]> newConsumer() {

View File

@ -13,7 +13,6 @@
package kafka.api
import java.util
import kafka.coordinator.GroupCoordinator
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.record.TimestampType
@ -92,6 +91,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumers += consumer0
val numRecords = 10000
sendRecords(numRecords)
@ -184,6 +184,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumers += consumer0
consumer0.subscribe(List(topic).asJava, listener)
// the initial subscription should cause a callback execution
@ -209,8 +211,6 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
// only expect one revocation since revoke is not invoked on initial membership
assertEquals(2, listener.callsToRevoked)
consumer0.close()
}
@Test
@ -219,20 +219,17 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumers += consumer0
try {
val listener = new TestConsumerReassignmentListener()
consumer0.subscribe(List(topic).asJava, listener)
val listener = new TestConsumerReassignmentListener()
consumer0.subscribe(List(topic).asJava, listener)
// the initial subscription should cause a callback execution
while (listener.callsToAssigned == 0)
consumer0.poll(50)
// the initial subscription should cause a callback execution
while (listener.callsToAssigned == 0)
consumer0.poll(50)
consumer0.subscribe(List[String]().asJava)
assertEquals(0, consumer0.assignment.size())
} finally {
consumer0.close()
}
consumer0.subscribe(List[String]().asJava)
assertEquals(0, consumer0.assignment.size())
}
@Test
@ -240,6 +237,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumers += consumer0
sendRecords(5)
consumer0.subscribe(List(topic).asJava)

View File

@ -48,6 +48,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumers += consumer0
consumer0.assign(List(tp).asJava)
consumeAndVerifyRecords(consumer0, numRecords = numRecords, startingOffset = 0,
@ -405,6 +407,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testFetchInvalidOffset() {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumers += consumer0
// produce one record
val totalRecords = 2
@ -426,8 +429,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertNotNull(outOfRangePartitions)
assertEquals(1, outOfRangePartitions.size)
assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
consumer0.close()
}
@Test
@ -435,6 +436,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val maxFetchBytes = 10 * 1024
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxFetchBytes.toString)
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumers += consumer0
// produce a record that is larger than the configured fetch size
val record = new ProducerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
@ -450,8 +452,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(1, oversizedPartitions.size)
// the oversized message is at offset 0
assertEquals(0L, oversizedPartitions.get(tp))
consumer0.close()
}
@Test
@ -460,6 +460,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group")
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName)
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumers += consumer0
// create two new topics, each having 2 partitions
val topic1 = "topic1"
@ -512,13 +513,13 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
// add one more consumer and validate re-assignment
addConsumersToGroupAndWaitForGroupAssignment(1, rrConsumers, consumerPollers, List(topic1, topic2), subscriptions)
addConsumersToGroupAndWaitForGroupAssignment(1, consumers, consumerPollers, List(topic1, topic2), subscriptions)
// done with pollers and consumers
for (poller <- consumerPollers)
poller.shutdown()
for (consumer <- rrConsumers)
for (consumer <- consumers)
consumer.unsubscribe()
}
@ -688,6 +689,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
producerProps.put("mock.interceptor.append", appendStr)
val testProducer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps, new ByteArraySerializer(), new ByteArraySerializer())
producers += testProducer
// producing records should succeed
testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key".getBytes, s"value will not be modified".getBytes))
@ -695,6 +697,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// create consumer with interceptor that has different key and value types from the consumer
this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
val testConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
consumers += testConsumer
testConsumer.assign(List(tp).asJava)
testConsumer.seek(tp, 0)
@ -702,9 +706,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val records = consumeRecords(testConsumer, 1)
val record = records.get(0)
assertEquals(s"value will not be modified", new String(record.value()))
testConsumer.close()
testProducer.close()
}
def testConsumeMessagesWithCreateTime() {
@ -762,12 +763,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// create one more consumer and add it to the group; we will timeout this consumer
val timeoutConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
val expandedConsumers = consumers ++ Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](timeoutConsumer)
// Close the consumer on test teardown, unless this test will manually
if(!closeConsumer)
consumers += timeoutConsumer
val timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List(topic, topic1))
val expandedPollers = consumerPollers ++ Buffer[ConsumerAssignmentPoller](timeoutPoller)
consumerPollers += timeoutPoller
// validate the initial assignment
validateGroupAssignment(expandedPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
// stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers
timeoutPoller.shutdown()
@ -859,6 +862,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
for (i <- 0 until consumerCount)
consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
consumers ++= consumerGroup
// create consumer pollers, wait for assignment and validate it
val consumerPollers = subscribeConsumersAndWaitForAssignment(consumerGroup, topicsToSubscribe, subscriptions)

View File

@ -253,6 +253,7 @@ public class SimpleBenchmark {
long endTime = System.currentTimeMillis();
consumer.close();
System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
}

View File

@ -278,7 +278,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
}
}
consumer.close();
System.out.println("-------------------");
System.out.println("Result Verification");