From 8a78d76466bacd8a2a3487cc84890d29c9bc4a3d Mon Sep 17 00:00:00 2001
From: Viktor Somogyi
Date: Sat, 11 Aug 2018 07:51:17 +0200
Subject: [PATCH] KAFKA-7140; Remove deprecated poll usages (#5319)

Reviewers: Matthias J. Sax, Jason Gustafson
---
 .../kafka/connect/runtime/WorkerSinkTask.java |  3 ++-
 .../kafka/connect/util/KafkaBasedLog.java     |  3 ++-
 .../runtime/ErrorHandlingTaskTest.java        |  5 +++--
 .../connect/runtime/WorkerSinkTaskTest.java   | 15 ++++++-------
 .../runtime/WorkerSinkTaskThreadedTest.java   |  9 ++++----
 .../scala/kafka/tools/ConsoleConsumer.scala   |  5 +++--
 .../kafka/tools/ConsumerPerformance.scala     |  4 ++--
 .../scala/kafka/tools/EndToEndLatency.scala   | 21 ++++++++++++-------
 .../main/scala/kafka/tools/MirrorMaker.scala  |  3 ++-
 .../scala/kafka/tools/StreamsResetter.java    | 10 ++++++---
 .../main/java/kafka/examples/Consumer.java    |  3 ++-
 .../tools/TransactionalMessageCopier.java     |  3 ++-
 .../kafka/tools/VerifiableConsumer.java       |  3 ++-
 .../trogdor/workload/ConsumeBenchWorker.java  |  3 ++-
 .../trogdor/workload/RoundTripWorker.java     |  3 ++-
 15 files changed, 57 insertions(+), 36 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 47f8529e2d1..692331ed13f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -53,6 +53,7 @@ import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -441,7 +442,7 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
-        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
+        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(Duration.ofMillis(timeoutMs));
 
         // Exceptions raised from the task during a rebalance should be rethrown to stop the worker
         if (rebalanceException != null) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index de1ceb3be10..ea9b4c621f9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -35,6 +35,7 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -253,7 +254,7 @@ public class KafkaBasedLog<K, V> {
 
     private void poll(long timeoutMs) {
         try {
-            ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
+            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
             for (ConsumerRecord<K, V> record : records)
                 consumedCallback.onCompletion(null, record);
         } catch (WakeupException e) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 1bf9c717068..6d92c34adef 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -65,6 +65,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -180,8 +181,8 @@ public class ErrorHandlingTaskTest {
         // bad json
         ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record1));
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record2));
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
 
         sinkTask.put(EasyMock.anyObject());
         EasyMock.expectLastCall().times(2);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 4a7c760fc74..33ab2ef06e0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -58,6 +58,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -458,7 +459,7 @@ public class WorkerSinkTaskTest {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -893,7 +894,7 @@ public class WorkerSinkTaskTest {
         // Expect the next poll to discover and perform the rebalance, THEN complete the previous callback handler,
         // and then return one record for TP1 and one for TP3.
         final AtomicBoolean rebalanced = new AtomicBoolean();
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -1273,7 +1274,7 @@ public class WorkerSinkTaskTest {
         sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
         EasyMock.expectLastCall().andReturn(Collections.emptyMap());
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -1298,7 +1299,7 @@ public class WorkerSinkTaskTest {
         sinkTask.open(partitions);
         EasyMock.expectLastCall().andThrow(e);
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -1315,7 +1316,7 @@ public class WorkerSinkTaskTest {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
             @Override
             public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                 rebalanceListener.getValue().onPartitionsAssigned(partitions);
@@ -1332,7 +1333,7 @@ public class WorkerSinkTaskTest {
     private void expectConsumerWakeup() {
         consumer.wakeup();
         EasyMock.expectLastCall();
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andThrow(new WakeupException());
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andThrow(new WakeupException());
     }
 
     private void expectConsumerPoll(final int numMessages) {
@@ -1340,7 +1341,7 @@ public class WorkerSinkTaskTest {
     }
 
     private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 73689d35710..d0089e92b6b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -55,6 +55,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -525,7 +526,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
             @Override
             public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                 rebalanceListener.getValue().onPartitionsAssigned(partitions);
@@ -557,7 +558,7 @@
     private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
         // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
         // but don't care about the exact details here.
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andStubAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -595,7 +596,7 @@
         // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
         // returning empty data, we return one record. The expectation is that the data will be ignored by the
         // response behavior specified using the return value of this method.
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -625,7 +626,7 @@
         final Map<TopicPartition, Long> offsets = new HashMap<>();
         offsets.put(TOPIC_PARTITION, startOffset);
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 7e2c5644a3b..365652a75b5 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -19,6 +19,7 @@ package kafka.tools
 
 import java.io.PrintStream
 import java.nio.charset.StandardCharsets
+import java.time.Duration
 import java.util.concurrent.CountDownLatch
 import java.util.regex.Pattern
 import java.util.{Collections, Locale, Properties, Random}
@@ -388,7 +389,7 @@
 private[tools] class ConsumerWrapper(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String],
                                      consumer: Consumer[Array[Byte], Array[Byte]], val timeoutMs: Long = Long.MaxValue) {
   consumerInit()
-  var recordIter = consumer.poll(0).iterator
+  var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], Array[Byte]]]().iterator()
 
   def consumerInit() {
     (topic, partitionId, offset, whitelist) match {
@@ -432,7 +433,7 @@
 
   def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
     if (!recordIter.hasNext) {
-      recordIter = consumer.poll(timeoutMs).iterator
+      recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
       if (!recordIter.hasNext)
         throw new TimeoutException()
     }
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 5af55a8d7f1..2e7b8ddf094 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -28,8 +28,8 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
 import kafka.utils.{CommandLineUtils, ToolsUtils}
 import java.util.{Collections, Properties, Random}
-
 import java.text.SimpleDateFormat
+import java.time.Duration
 
 import com.typesafe.scalalogging.LazyLogging
 
@@ -127,7 +127,7 @@
     var currentTimeMillis = lastConsumedTime
 
     while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
-      val records = consumer.poll(100).asScala
+      val records = consumer.poll(Duration.ofMillis(100)).asScala
       currentTimeMillis = System.currentTimeMillis
       if (records.nonEmpty)
         lastConsumedTime = currentTimeMillis
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 3beaf827f59..4849b1ed8c6 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -18,11 +18,13 @@ package kafka.tools
 
 import java.nio.charset.StandardCharsets
-import java.util.{Arrays, Collections, Properties}
+import java.time.Duration
+import java.util.{Arrays, Properties}
 
 import kafka.utils.Exit
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -69,9 +71,7 @@
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
-
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
-    consumer.subscribe(Collections.singletonList(topic))
 
     val producerProps = loadProps
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
@@ -82,16 +82,21 @@
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
 
+    // sends a dummy message to create the topic if it doesn't exist
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, Array[Byte]())).get()
+
     def finalise() {
       consumer.commitSync()
       producer.close()
       consumer.close()
     }
 
     //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when
     //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
-    consumer.seekToEnd(Collections.emptyList())
-    consumer.poll(0)
+
+    val topicPartitions = consumer.partitionsFor(topic).asScala
+      .map(p => new TopicPartition(p.topic(), p.partition())).asJava
+    consumer.assign(topicPartitions)
+    consumer.seekToEnd(topicPartitions)
+    consumer.assignment().asScala.foreach(consumer.position)
 
     var totalTime = 0.0
     val latencies = new Array[Long](numMessages)
@@ -103,7 +108,7 @@
 
       //Send message (of random bytes) synchronously then immediately poll for it
       producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
-      val recordIter = consumer.poll(timeout).iterator
+      val recordIter = consumer.poll(Duration.ofMillis(timeout)).iterator
 
       val elapsed = System.nanoTime - begin
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d7e09e4efdb..d55d96bd65b 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,6 +17,7 @@
 package kafka.tools
 
+import java.time.Duration
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
@@ -452,7 +453,7 @@
         // uncommitted record since last poll. Using one second as poll's timeout ensures that
         // offsetCommitIntervalMs, of value greater than 1 second, does not see delays in offset
         // commit.
-        recordIter = consumer.poll(1000).iterator
+        recordIter = consumer.poll(Duration.ofSeconds(1)).iterator
        if (!recordIter.hasNext)
          throw new NoRecordsException
      }
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 3c045c69eb2..09d3b9ea9a1 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -47,6 +47,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -57,6 +58,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
@@ -313,10 +315,12 @@
         config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
         try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-            client.subscribe(topicsToSubscribe);
-            client.poll(1);
+            Collection<TopicPartition> partitions = topicsToSubscribe.stream().map(client::partitionsFor)
+                .flatMap(Collection::stream)
+                .map(info -> new TopicPartition(info.topic(), info.partition()))
+                .collect(Collectors.toList());
+            client.assign(partitions);
 
-            final Set<TopicPartition> partitions = client.assignment();
             final Set<TopicPartition> inputTopicPartitions = new HashSet<>();
             final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index be062b309df..26d6e23a3f8 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
 
@@ -47,7 +48,7 @@ public class Consumer extends ShutdownableThread {
     @Override
     public void doWork() {
         consumer.subscribe(Collections.singletonList(this.topic));
-        ConsumerRecords<Integer, String> records = consumer.poll(1000);
+        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<Integer, String> record : records) {
             System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
         }
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 0d74645379e..27e7c7fda5d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -287,7 +288,7 @@ public class TransactionalMessageCopier {
             try {
                 producer.beginTransaction();
                 while (messagesInCurrentTransaction < numMessagesForNextTransaction) {
-                    ConsumerRecords<String, String> records = consumer.poll(200L);
+                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                     for (ConsumerRecord<String, String> record : records) {
                         producer.send(producerRecordFromConsumerRecord(outputTopic, record));
                         messagesInCurrentTransaction++;
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index cc09b233167..58f34718b8a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.utils.Utils;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -220,7 +221,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
             consumer.subscribe(Collections.singletonList(topic), this);
 
             while (!isFinished()) {
-                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                 Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records);
 
                 if (!useAutoCommit) {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index 1a852964070..c3a90e4da6a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.kafka.trogdor.task.TaskWorker;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
@@ -135,7 +136,7 @@ public class ConsumeBenchWorker implements TaskWorker {
             long startBatchMs = startTimeMs;
             try {
                 while (messagesConsumed < spec.maxMessages()) {
-                    ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
+                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
                     if (records.isEmpty()) {
                         continue;
                     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 570f6a11e34..669fafcc75e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -337,7 +338,7 @@ public class RoundTripWorker implements TaskWorker {
             while (true) {
                 try {
                     pollInvoked++;
-                    ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
+                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
                     for (Iterator<ConsumerRecord<byte[], byte[]>> iter = records.iterator(); iter.hasNext(); ) {
                         ConsumerRecord<byte[], byte[]> record = iter.next();
                         int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
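
Note on the pattern applied throughout this patch: every call site moves from the deprecated KafkaConsumer.poll(long) to the poll(Duration) overload introduced by KIP-266 in Kafka 2.0. The Duration variant bounds the total time the call may block, including metadata fetches, so the old idiom of calling poll(0) just to force partition assignment no longer works; that is why EndToEndLatency and StreamsResetter switch from subscribe() plus poll(0) to explicit partitionsFor() plus assign(). The following minimal standalone sketch illustrates the same migration outside the patch; the bootstrap address, group id, and topic name are placeholder values, not taken from this commit.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollMigrationExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-migration-demo");     // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test-topic"));     // placeholder topic

                // Deprecated form: consumer.poll(1000) could block indefinitely on
                // coordinator/metadata lookup, ignoring the timeout argument.
                // The KIP-266 replacement bounds the total time spent in poll():
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset());
                }
            }
        }
    }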