KAFKA-7140; Remove deprecated poll usages (#5319)

Reviewers: Matthias J. Sax <mjsax@apache.org>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
Viktor Somogyi 2018-08-11 07:51:17 +02:00 committed by Jason Gustafson
parent 79a2f892ca
commit 8a78d76466
15 changed files with 57 additions and 36 deletions

View File

@ -53,6 +53,7 @@ import org.apache.kafka.connect.util.SinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -441,7 +442,7 @@ class WorkerSinkTask extends WorkerTask {
}
private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
ConsumerRecords<byte[], byte[]> msgs = consumer.poll(Duration.ofMillis(timeoutMs));
// Exceptions raised from the task during a rebalance should be rethrown to stop the worker
if (rebalanceException != null) {

View File

@ -35,6 +35,7 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
@ -253,7 +254,7 @@ public class KafkaBasedLog<K, V> {
private void poll(long timeoutMs) {
try {
ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
for (ConsumerRecord<K, V> record : records)
consumedCallback.onCompletion(null, record);
} catch (WakeupException e) {

View File

@ -65,6 +65,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -180,8 +181,8 @@ public class ErrorHandlingTaskTest {
// bad json
ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record1));
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record2));
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
sinkTask.put(EasyMock.anyObject());
EasyMock.expectLastCall().times(2);

View File

@ -58,6 +58,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -458,7 +459,7 @@ public class WorkerSinkTaskTest {
sinkTask.open(partitions);
EasyMock.expectLastCall();
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@ -893,7 +894,7 @@ public class WorkerSinkTaskTest {
// Expect the next poll to discover and perform the rebalance, THEN complete the previous callback handler,
// and then return one record for TP1 and one for TP3.
final AtomicBoolean rebalanced = new AtomicBoolean();
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@ -1273,7 +1274,7 @@ public class WorkerSinkTaskTest {
sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
EasyMock.expectLastCall().andReturn(Collections.emptyMap());
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@ -1298,7 +1299,7 @@ public class WorkerSinkTaskTest {
sinkTask.open(partitions);
EasyMock.expectLastCall().andThrow(e);
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@ -1315,7 +1316,7 @@ public class WorkerSinkTaskTest {
sinkTask.open(partitions);
EasyMock.expectLastCall();
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
rebalanceListener.getValue().onPartitionsAssigned(partitions);
@ -1332,7 +1333,7 @@ public class WorkerSinkTaskTest {
private void expectConsumerWakeup() {
consumer.wakeup();
EasyMock.expectLastCall();
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andThrow(new WakeupException());
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andThrow(new WakeupException());
}
private void expectConsumerPoll(final int numMessages) {
@ -1340,7 +1341,7 @@ public class WorkerSinkTaskTest {
}
private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {

View File

@ -55,6 +55,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -525,7 +526,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
sinkTask.open(partitions);
EasyMock.expectLastCall();
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
rebalanceListener.getValue().onPartitionsAssigned(partitions);
@ -557,7 +558,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
// Stub out all the consumer stream/iterator responses, which we just want to verify occur,
// but don't care about the exact details here.
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andStubAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@ -595,7 +596,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
// Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
// returning empty data, we return one record. The expectation is that the data will be ignored by the
// response behavior specified using the return value of this method.
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@ -625,7 +626,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
final Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(TOPIC_PARTITION, startOffset);
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {

View File

@ -19,6 +19,7 @@ package kafka.tools
import java.io.PrintStream
import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.regex.Pattern
import java.util.{Collections, Locale, Properties, Random}
@ -388,7 +389,7 @@ object ConsoleConsumer extends Logging {
private[tools] class ConsumerWrapper(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String],
consumer: Consumer[Array[Byte], Array[Byte]], val timeoutMs: Long = Long.MaxValue) {
consumerInit()
var recordIter = consumer.poll(0).iterator
var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], Array[Byte]]]().iterator()
def consumerInit() {
(topic, partitionId, offset, whitelist) match {
@ -432,7 +433,7 @@ object ConsoleConsumer extends Logging {
def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
if (!recordIter.hasNext) {
recordIter = consumer.poll(timeoutMs).iterator
recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
if (!recordIter.hasNext)
throw new TimeoutException()
}

View File

@ -28,8 +28,8 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
import kafka.utils.{CommandLineUtils, ToolsUtils}
import java.util.{Collections, Properties, Random}
import java.text.SimpleDateFormat
import java.time.Duration
import com.typesafe.scalalogging.LazyLogging
@ -127,7 +127,7 @@ object ConsumerPerformance extends LazyLogging {
var currentTimeMillis = lastConsumedTime
while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
val records = consumer.poll(100).asScala
val records = consumer.poll(Duration.ofMillis(100)).asScala
currentTimeMillis = System.currentTimeMillis
if (records.nonEmpty)
lastConsumedTime = currentTimeMillis

View File

@ -18,11 +18,13 @@
package kafka.tools
import java.nio.charset.StandardCharsets
import java.util.{Arrays, Collections, Properties}
import java.time.Duration
import java.util.{Arrays, Properties}
import kafka.utils.Exit
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import scala.collection.JavaConverters._
@ -69,9 +71,7 @@ object EndToEndLatency {
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
consumer.subscribe(Collections.singletonList(topic))
val producerProps = loadProps
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
@ -82,16 +82,21 @@ object EndToEndLatency {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
// sends a dummy message to create the topic if it doesn't exist
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, Array[Byte]())).get()
def finalise() {
consumer.commitSync()
producer.close()
consumer.close()
}
//Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when
//a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
consumer.seekToEnd(Collections.emptyList())
consumer.poll(0)
val topicPartitions = consumer.partitionsFor(topic).asScala
.map(p => new TopicPartition(p.topic(), p.partition())).asJava
consumer.assign(topicPartitions)
consumer.seekToEnd(topicPartitions)
consumer.assignment().asScala.foreach(consumer.position)
var totalTime = 0.0
val latencies = new Array[Long](numMessages)
@ -103,7 +108,7 @@ object EndToEndLatency {
//Send message (of random bytes) synchronously then immediately poll for it
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
val recordIter = consumer.poll(timeout).iterator
val recordIter = consumer.poll(Duration.ofMillis(timeout)).iterator
val elapsed = System.nanoTime - begin

View File

@ -17,6 +17,7 @@
package kafka.tools
import java.time.Duration
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, TimeUnit}
@ -452,7 +453,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// uncommitted record since last poll. Using one second as poll's timeout ensures that
// offsetCommitIntervalMs, of value greater than 1 second, does not see delays in offset
// commit.
recordIter = consumer.poll(1000).iterator
recordIter = consumer.poll(Duration.ofSeconds(1)).iterator
if (!recordIter.hasNext)
throw new NoRecordsException
}

View File

@ -47,6 +47,7 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -57,6 +58,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
@ -313,10 +315,12 @@ public class StreamsResetter {
config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
client.subscribe(topicsToSubscribe);
client.poll(1);
Collection<TopicPartition> partitions = topicsToSubscribe.stream().map(client::partitionsFor)
.flatMap(Collection::stream)
.map(info -> new TopicPartition(info.topic(), info.partition()))
.collect(Collectors.toList());
client.assign(partitions);
final Set<TopicPartition> partitions = client.assignment();
final Set<TopicPartition> inputTopicPartitions = new HashSet<>();
final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
@ -47,7 +48,7 @@ public class Consumer extends ShutdownableThread {
@Override
public void doWork() {
consumer.subscribe(Collections.singletonList(this.topic));
ConsumerRecords<Integer, String> records = consumer.poll(1000);
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}

View File

@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@ -287,7 +288,7 @@ public class TransactionalMessageCopier {
try {
producer.beginTransaction();
while (messagesInCurrentTransaction < numMessagesForNextTransaction) {
ConsumerRecords<String, String> records = consumer.poll(200L);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
for (ConsumerRecord<String, String> record : records) {
producer.send(producerRecordFromConsumerRecord(outputTopic, record));
messagesInCurrentTransaction++;

View File

@ -47,6 +47,7 @@ import org.apache.kafka.common.utils.Utils;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -220,7 +221,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
consumer.subscribe(Collections.singletonList(topic), this);
while (!isFinished()) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records);
if (!useAutoCommit) {

View File

@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.kafka.trogdor.task.TaskWorker;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@ -135,7 +136,7 @@ public class ConsumeBenchWorker implements TaskWorker {
long startBatchMs = startTimeMs;
try {
while (messagesConsumed < spec.maxMessages()) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
if (records.isEmpty()) {
continue;
}

View File

@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -337,7 +338,7 @@ public class RoundTripWorker implements TaskWorker {
while (true) {
try {
pollInvoked++;
ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
for (Iterator<ConsumerRecord<byte[], byte[]>> iter = records.iterator(); iter.hasNext(); ) {
ConsumerRecord<byte[], byte[]> record = iter.next();
int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();