diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 7fdf4398b9a..5136d6886dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -154,13 +154,25 @@ public interface Consumer extends Closeable { /** * @see KafkaConsumer#committed(TopicPartition) */ + @Deprecated OffsetAndMetadata committed(TopicPartition partition); /** * @see KafkaConsumer#committed(TopicPartition, Duration) */ + @Deprecated OffsetAndMetadata committed(TopicPartition partition, final Duration timeout); + /** + * @see KafkaConsumer#committed(Set) + */ + Map committed(Set partitions); + + /** + * @see KafkaConsumer#committed(Set, Duration) + */ + Map committed(Set partitions, final Duration timeout); + /** * @see KafkaConsumer#metrics() */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index be3e176c2ee..7e023415dc3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1735,7 +1735,10 @@ public class KafkaConsumer implements Consumer { * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before * the timeout specified by {@code default.api.timeout.ms} expires. + * + * @deprecated since 2.4 Use {@link #committed(Set)} instead */ + @Deprecated @Override public OffsetAndMetadata committed(TopicPartition partition) { return committed(partition, Duration.ofMillis(defaultApiTimeoutMs)); @@ -1745,7 +1748,8 @@ public class KafkaConsumer implements Consumer { * Get the last committed offset for the given partition (whether the commit happened by this process or * another). This offset will be used as the position for the consumer in the event of a failure. *
<p>
- * This call will block to do a remote call to get the latest committed offsets from the server. + * This call will block until the position can be determined, an unrecoverable error is + * encountered (in which case it is thrown to the caller), or the timeout expires. * * @param partition The partition to check * @param timeout The maximum amount of time to await the current committed offset @@ -1760,21 +1764,85 @@ public class KafkaConsumer implements Consumer { * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before * expiration of the timeout + * + * @deprecated since 2.4 Use {@link #committed(Set, Duration)} instead */ + @Deprecated @Override public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) { + return committed(Collections.singleton(partition), timeout).get(partition); + } + + /** + * Get the last committed offsets for the given partitions (whether the commit happened by this process or + * another). The returned offsets will be used as the position for the consumer in the event of a failure. + *
<p>
+ * Partitions that do not have a committed offset will not be included in the returned map. + *
<p>
+ * If any of the requested partitions do not exist, an exception will be thrown. + *
<p>
+ * This call will do a remote call to get the latest committed offsets from the server, and will block until the + * committed offsets are gotten successfully, an unrecoverable error is encountered (in which case it is thrown to + * the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a + * {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller). + * + * @param partitions The partitions to check + * @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets + * would not be included in the returned result + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before + * the timeout specified by {@code default.api.timeout.ms} expires. + */ + @Override + public Map committed(final Set partitions) { + return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs)); + } + + /** + * Get the last committed offsets for the given partitions (whether the commit happened by this process or + * another). The returned offsets will be used as the position for the consumer in the event of a failure. + *
<p>
+ * Partitions that do not have a committed offset will not be included in the returned map. + *
<p>
+ * If any of the requested partitions do not exist, an exception will be thrown. + *
<p>
+ * This call will block to do a remote call to get the latest committed offsets from the server. + * + * @param partitions The partitions to check + * @param timeout The maximum amount of time to await the latest committed offsets + * @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets + * would not be included in the returned result + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before + * expiration of the timeout + */ + @Override + public Map committed(final Set partitions, final Duration timeout) { acquireAndEnsureOpen(); try { maybeThrowInvalidGroupIdException(); - Map offsets = coordinator.fetchCommittedOffsets( - Collections.singleton(partition), time.timer(timeout)); + Map offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout)); if (offsets == null) { throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " + - "committed offset for partition " + partition + " could be determined. Try tuning default.api.timeout.ms " + - "larger to relax the threshold."); + "committed offset for partitions " + partitions + " could be determined. 
Try tuning default.api.timeout.ms " + + "larger to relax the threshold."); } else { offsets.forEach(this::updateLastSeenEpochIfNewer); - return offsets.get(partition); + return offsets; } } finally { release(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 947746f3ce1..b20ee9783fe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -42,6 +42,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** @@ -290,18 +291,31 @@ public class MockConsumer implements Consumer { subscriptions.seek(partition, offsetAndMetadata.offset()); } + @Deprecated @Override - public synchronized OffsetAndMetadata committed(TopicPartition partition) { - ensureNotClosed(); - if (subscriptions.isAssigned(partition)) { - return committed.get(partition); - } - return new OffsetAndMetadata(0); + public synchronized OffsetAndMetadata committed(final TopicPartition partition) { + return committed(Collections.singleton(partition)).get(partition); + } + + @Deprecated + @Override + public OffsetAndMetadata committed(final TopicPartition partition, final Duration timeout) { + return committed(partition); } @Override - public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) { - return committed(partition); + public synchronized Map committed(final Set partitions) { + ensureNotClosed(); + + return partitions.stream() + .filter(committed::containsKey) + .collect(Collectors.toMap(tp -> tp, tp -> subscriptions.isAssigned(tp) ? 
+ committed.get(tp) : new OffsetAndMetadata(0))); + } + + @Override + public synchronized Map committed(final Set partitions, final Duration timeout) { + return committed(partitions); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 7892d6d1c92..a9c4d259aa4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -696,7 +696,7 @@ public class KafkaConsumerTest { // fetch offset for one topic client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator); - assertEquals(offset1, consumer.committed(tp0).offset()); + assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); consumer.assign(Arrays.asList(tp0, tp1)); @@ -704,12 +704,12 @@ public class KafkaConsumerTest { Map offsets = new HashMap<>(); offsets.put(tp0, offset1); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); - assertEquals(offset1, consumer.committed(tp0).offset()); + assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); offsets.remove(tp0); offsets.put(tp1, offset2); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); - assertEquals(offset2, consumer.committed(tp1).offset()); + assertEquals(offset2, consumer.committed(Collections.singleton(tp1)).get(tp1).offset()); consumer.close(Duration.ofMillis(0)); } @@ -1137,7 +1137,7 @@ public class KafkaConsumerTest { // fetch offset for one topic client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE), coordinator); - assertEquals(0, consumer.committed(tp0).offset()); + assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); // verify that assignment immediately changes assertTrue(consumer.assignment().equals(singleton(tp0))); @@ -1195,7 +1195,7 @@ public class KafkaConsumerTest { client.prepareResponseFrom( offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE), coordinator); - assertEquals(0, consumer.committed(tp0).offset()); + assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); // verify that assignment immediately changes assertTrue(consumer.assignment().equals(singleton(tp0))); @@ -1256,12 +1256,12 @@ public class KafkaConsumerTest { offsets.put(tp1, 0L); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); - assertEquals(0, consumer.committed(tp0).offset()); + assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); offsets.remove(tp0); offsets.put(tp1, 0L); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); - assertEquals(0, consumer.committed(tp1).offset()); + assertEquals(0, consumer.committed(Collections.singleton(tp1)).get(tp1).offset()); // fetch and verify consumer's position in the two partitions final Map offsetResponse = new HashMap<>(); @@ -1356,7 +1356,7 @@ public class KafkaConsumerTest { } try { - newConsumer((String) null).committed(tp0); + newConsumer((String) null).committed(Collections.singleton(tp0)).get(tp0); fail("Expected an InvalidGroupIdException"); } catch (InvalidGroupIdException e) { // OK, expected @@ -1383,7 +1383,7 @@ public class KafkaConsumerTest { consumer.assign(singleton(tp0)); try { - consumer.committed(tp0); + 
consumer.committed(Collections.singleton(tp0)).get(tp0); fail("Expected an InvalidGroupIdException"); } catch (InvalidGroupIdException e) { // OK, expected @@ -1636,7 +1636,7 @@ public class KafkaConsumerTest { @Test(expected = AuthenticationException.class) public void testCommittedAuthenticationFaiure() { final KafkaConsumer consumer = consumerWithPendingAuthenticationError(); - consumer.committed(tp0); + consumer.committed(Collections.singleton(tp0)).get(tp0); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index aad4d2973a0..5a012b2cf67 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -55,9 +55,10 @@ public class MockConsumerTest { assertEquals(rec1, iter.next()); assertEquals(rec2, iter.next()); assertFalse(iter.hasNext()); - assertEquals(2L, consumer.position(new TopicPartition("test", 0))); + final TopicPartition tp = new TopicPartition("test", 0); + assertEquals(2L, consumer.position(tp)); consumer.commitSync(); - assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset()); + assertEquals(2L, consumer.committed(Collections.singleton(tp)).get(tp).offset()); } @SuppressWarnings("deprecation") @@ -81,9 +82,10 @@ public class MockConsumerTest { assertEquals(rec1, iter.next()); assertEquals(rec2, iter.next()); assertFalse(iter.hasNext()); - assertEquals(2L, consumer.position(new TopicPartition("test", 0))); + final TopicPartition tp = new TopicPartition("test", 0); + assertEquals(2L, consumer.position(tp)); consumer.commitSync(); - assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset()); + assertEquals(2L, consumer.committed(Collections.singleton(tp)).get(tp).offset()); } @Test diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 209eac0f506..e355400fccc 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -108,7 +108,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { if (records.nonEmpty) { consumer.commitSync() - assertEquals(consumer.position(tp), consumer.committed(tp).offset) + assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset) if (consumer.position(tp) == numRecords) { consumer.seekToBeginning(Collections.emptyList()) @@ -153,7 +153,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { } else if (coin == 2) { info("Committing offset.") consumer.commitSync() - assertEquals(consumer.position(tp), consumer.committed(tp).offset) + assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset) } } } @@ -485,7 +485,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { consumer.poll(time.Duration.ofSeconds(3L)) assertTrue("Assignment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS)) if (committedRecords > 0) - assertEquals(committedRecords, consumer.committed(tp).offset) + assertEquals(committedRecords, consumer.committed(Set(tp).asJava).get(tp).offset) consumer.close() } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 
acb0d6b17ba..62b358e6fa2 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -278,8 +278,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { // now we should see the committed positions from another consumer val anotherConsumer = createConsumer() - assertEquals(300, anotherConsumer.committed(tp).offset) - assertEquals(500, anotherConsumer.committed(tp2).offset) + assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset) } @Test @@ -305,8 +305,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { // now we should see the committed positions from another consumer val anotherConsumer = createConsumer() - assertEquals(300, anotherConsumer.committed(tp).offset) - assertEquals(500, anotherConsumer.committed(tp2).offset) + assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset) } @Test @@ -480,17 +480,17 @@ class PlaintextConsumerTest extends BaseConsumerTest { // sync commit val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo") consumer.commitSync(Map((tp, syncMetadata)).asJava) - assertEquals(syncMetadata, consumer.committed(tp)) + assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp)) // async commit val asyncMetadata = new OffsetAndMetadata(10, "bar") sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata))) - assertEquals(asyncMetadata, consumer.committed(tp)) + assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp)) // handle null metadata val nullMetadata = new OffsetAndMetadata(5, null) consumer.commitSync(Map(tp -> nullMetadata).asJava) - assertEquals(nullMetadata, consumer.committed(tp)) + assertEquals(nullMetadata, consumer.committed(Set(tp).asJava).get(tp)) } @Test @@ -509,7 +509,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(None, callback.lastError) assertEquals(count, callback.successCount) - assertEquals(new OffsetAndMetadata(count), consumer.committed(tp)) + assertEquals(new OffsetAndMetadata(count), consumer.committed(Set(tp).asJava).get(tp)) } @Test @@ -623,7 +623,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { sendRecords(producer, numRecords = 5, tp) val consumer = createConsumer() - assertNull(consumer.committed(new TopicPartition(topic, 15))) + assertTrue(consumer.committed(Set(new TopicPartition(topic, 15)).asJava).isEmpty) // position() on a partition that we aren't subscribed to throws an exception intercept[IllegalStateException] { @@ -634,12 +634,12 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, consumer.position(tp)) consumer.commitSync() - assertEquals(0L, consumer.committed(tp).offset) + assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset) consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 0) assertEquals("After consuming 5 records, position should be 5", 5L, consumer.position(tp)) consumer.commitSync() - assertEquals("Committed offset should be returned", 5L, consumer.committed(tp).offset) + assertEquals("Committed offset should be returned", 5L, consumer.committed(Set(tp).asJava).get(tp).offset) sendRecords(producer, numRecords = 1, tp) @@ -1024,12 +1024,12 @@ class PlaintextConsumerTest extends BaseConsumerTest 
{ // commit sync and verify onCommit is called val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue testConsumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava) - assertEquals(2, testConsumer.committed(tp).offset) + assertEquals(2, testConsumer.committed(Set(tp).asJava).get(tp).offset) assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue) // commit async and verify onCommit is called sendAndAwaitAsyncCommit(testConsumer, Some(Map(tp -> new OffsetAndMetadata(5L)))) - assertEquals(5, testConsumer.committed(tp).offset) + assertEquals(5, testConsumer.committed(Set(tp).asJava).get(tp).offset) assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue) testConsumer.close() @@ -1076,8 +1076,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { rebalanceListener) // after rebalancing, we should have reset to the committed positions - assertEquals(10, testConsumer.committed(tp).offset) - assertEquals(20, testConsumer.committed(tp2).offset) + assertEquals(10, testConsumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(20, testConsumer.committed(Set(tp2).asJava).get(tp2).offset) assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance) // verify commits are intercepted on close @@ -1321,19 +1321,19 @@ class PlaintextConsumerTest extends BaseConsumerTest { val pos1 = consumer.position(tp) val pos2 = consumer.position(tp2) consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava) - assertEquals(3, consumer.committed(tp).offset) - assertNull(consumer.committed(tp2)) + assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset) + assertNull(consumer.committed(Set(tp2).asJava).get(tp2)) // Positions should not change assertEquals(pos1, consumer.position(tp)) assertEquals(pos2, consumer.position(tp2)) consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava) - assertEquals(3, consumer.committed(tp).offset) - assertEquals(5, consumer.committed(tp2).offset) + assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(5, consumer.committed(Set(tp2).asJava).get(tp2).offset) // Using async should pick up the committed changes after commit completes sendAndAwaitAsyncCommit(consumer, Some(Map(tp2 -> new OffsetAndMetadata(7L)))) - assertEquals(7, consumer.committed(tp2).offset) + assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } @Test @@ -1371,8 +1371,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { awaitAssignment(consumer, newAssignment) // after rebalancing, we should have reset to the committed positions - assertEquals(300, consumer.committed(tp).offset) - assertEquals(500, consumer.committed(tp2).offset) + assertEquals(300, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(500, consumer.committed(Set(tp2).asJava).get(tp2).offset) } @Test @@ -1808,7 +1808,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { } try { - consumer2.committed(tp) + consumer2.committed(Set(tp).asJava) fail("Expected committed offset fetch to fail due to null group id") } catch { case e: InvalidGroupIdException => // OK diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 254ccadc890..e1bf3db4ff9 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -389,7 +389,7 @@ class TransactionsTest extends KafkaServerTestHarness { val producer2 = transactionalProducers(1) producer2.initTransactions() - assertEquals(offsetAndMetadata, consumer.committed(tp)) + assertEquals(offsetAndMetadata, consumer.committed(Set(tp).asJava).get(tp)) } @Test diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala index fdadec41e37..3ee83e25eea 100644 --- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala @@ -27,7 +27,7 @@ import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AdminClientConfig import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{PartitionInfo, TopicPartition} import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.serialization.StringDeserializer import org.junit.{After, Before} @@ -70,14 +70,10 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { props.put("group.id", group) val consumer = new KafkaConsumer(props, new StringDeserializer, new StringDeserializer) try { - consumer.partitionsFor(topic).asScala.flatMap { partitionInfo => - val tp = new TopicPartition(partitionInfo.topic, partitionInfo.partition) - val committed = consumer.committed(tp) - if (committed == null) - None - else - Some(tp -> committed.offset) - }.toMap + val partitions: Set[TopicPartition] = consumer.partitionsFor(topic) + .asScala.toSet.map {partitionInfo : PartitionInfo => new TopicPartition(partitionInfo.topic, partitionInfo.partition)} + + consumer.committed(partitions.asJava).asScala.mapValues(_.offset()).toMap } finally { consumer.close() } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index d28bd99c553..c1951c010c5 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1432,11 +1432,12 @@ object TestUtils extends Logging { offsetsToCommit.toMap } - def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) = { + def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) { + val committed = consumer.committed(consumer.assignment).asScala.mapValues(_.offset) + consumer.assignment.asScala.foreach { topicPartition => - val offset = consumer.committed(topicPartition) - if (offset != null) - consumer.seek(topicPartition, offset.offset) + if (committed.contains(topicPartition)) + consumer.seek(topicPartition, committed(topicPartition)) else consumer.seekToBeginning(Collections.singletonList(topicPartition)) } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 798f0b089a0..03a002127b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import 
org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthorizationException; @@ -35,7 +34,9 @@ import org.slf4j.Logger; import java.io.IOException; import java.util.Collection; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public abstract class AbstractTask implements Task { @@ -250,16 +251,23 @@ public abstract class AbstractTask implements Task { return stateMgr.changelogPartitions(); } - long committedOffsetForPartition(final TopicPartition partition) { + Map committedOffsetForPartitions(final Set partitions) { try { - final OffsetAndMetadata metadata = consumer.committed(partition); - return metadata != null ? metadata.offset() : 0L; + final Map results = consumer.committed(partitions) + .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); + + // those do not have a committed offset would default to 0 + for (final TopicPartition tp : partitions) { + results.putIfAbsent(tp, 0L); + } + + return results; } catch (final AuthorizationException e) { - throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e); + throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partitions), e); } catch (final WakeupException e) { throw e; } catch (final KafkaException e) { - throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partition), e); + throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partitions), e); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 2e80c94e5f5..9f5f29fcdfe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -247,13 +247,12 @@ public class ProcessorStateManager implements StateManager { standbyRestoredOffsets.put(storePartition, lastOffset + 1); } - void putOffsetLimit(final TopicPartition partition, - final long limit) { - log.trace("Updating store offset limit for partition {} to {}", partition, limit); - offsetLimits.put(partition, limit); + void putOffsetLimits(final Map offsets) { + log.trace("Updating store offset limit with {}", offsets); + offsetLimits.putAll(offsets); } - long offsetLimit(final TopicPartition partition) { + private long offsetLimit(final TopicPartition partition) { final Long limit = offsetLimits.get(partition); return limit != null ? 
limit : Long.MAX_VALUE; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 17b6e665d46..071b7d667bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -37,10 +37,10 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; * A StandbyTask */ public class StandbyTask extends AbstractTask { - private Map checkpointedOffsets = new HashMap<>(); + private boolean updateOffsetLimits; private final Sensor closeTaskSensor; private final Map offsetLimits = new HashMap<>(); - private final Set updateableOffsetLimits = new HashSet<>(); + private Map checkpointedOffsets = new HashMap<>(); /** * Create {@link StandbyTask} with its assigned partitions @@ -69,10 +69,8 @@ public class StandbyTask extends AbstractTask { final Set changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values()); partitions.stream() .filter(tp -> changelogTopicNames.contains(tp.topic())) - .forEach(tp -> { - offsetLimits.put(tp, 0L); - updateableOffsetLimits.add(tp); - }); + .forEach(tp -> offsetLimits.put(tp, 0L)); + updateOffsetLimits = true; } @Override @@ -193,7 +191,7 @@ public class StandbyTask extends AbstractTask { // Check if we're unable to process records due to an offset limit (e.g. when our // partition is both a source and a changelog). If we're limited then try to refresh // the offset limit if possible. - if (record.offset() >= limit && updateableOffsetLimits.contains(partition)) { + if (record.offset() >= limit && updateOffsetLimits) { limit = updateOffsetLimits(partition); } @@ -222,18 +220,24 @@ public class StandbyTask extends AbstractTask { throw new IllegalArgumentException("Topic is not both a source and a changelog: " + partition); } - updateableOffsetLimits.remove(partition); + final Map newLimits = committedOffsetForPartitions(offsetLimits.keySet()); + + for (final Map.Entry newlimit : newLimits.entrySet()) { + final Long previousLimit = offsetLimits.get(newlimit.getKey()); + if (previousLimit != null && previousLimit > newlimit.getValue()) { + throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. " + + "New limit: " + newlimit.getValue() + ". Previous limit: " + previousLimit); + } - final long newLimit = committedOffsetForPartition(partition); - final long previousLimit = offsetLimits.put(partition, newLimit); - if (previousLimit > newLimit) { - throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. " + - "New limit: " + newLimit + ". 
Previous limit: " + previousLimit); } - return newLimit; + + offsetLimits.putAll(newLimits); + updateOffsetLimits = false; + + return offsetLimits.get(partition); } void allowUpdateOfOffsetLimit() { - updateableOffsetLimits.addAll(offsetLimits.keySet()); + updateOffsetLimits = true; } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 3eb1e559089..ccf522813f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -31,6 +31,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -249,13 +251,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator // partitions of topics that are both sources and changelogs and set the consumer committed // offset via stateMgr as there is not a more direct route. final Set changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values()); - partitions.stream() - .filter(tp -> changelogTopicNames.contains(tp.topic())) - .forEach(tp -> { - final long offset = committedOffsetForPartition(tp); - stateMgr.putOffsetLimit(tp, offset); - log.trace("Updating store offset limits {} for changelog {}", offset, tp); - }); + final Set sourcePartitionsAsChangelog = new HashSet<>(partitions) + .stream().filter(tp -> changelogTopicNames.contains(tp.topic())).collect(Collectors.toSet()); + final Map committedOffsets = committedOffsetForPartitions(sourcePartitionsAsChangelog); + stateMgr.putOffsetLimits(committedOffsets); + registerStateStores(); return changelogPartitions().isEmpty(); @@ -481,7 +481,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final long offset = entry.getValue() + 1; final long partitionTime = partitionTimes.get(partition); consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset, encodeTimestamp(partitionTime))); - stateMgr.putOffsetLimit(partition, offset); } try { @@ -735,25 +734,30 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator taskClosed = true; } - private void initializeCommittedTimestamp(final TopicPartition partition) { - final OffsetAndMetadata metadata = consumer.committed(partition); - - if (metadata != null) { - final long committedTimestamp = decodeTimestamp(metadata.metadata()); - partitionGroup.setPartitionTime(partition, committedTimestamp); - log.debug("A committed timestamp was detected: setting the partition time of partition {}" - + " to {} in stream task {}", partition, committedTimestamp, this); - } else { - log.debug("No committed timestamp was found in metadata for partition {}", partition); - } - } - /** * Retrieves formerly committed timestamps and updates the local queue's partition time. 
*/ public void initializeTaskTime() { - for (final TopicPartition partition : partitionGroup.partitions()) { - initializeCommittedTimestamp(partition); + final Map committed = consumer.committed(partitionGroup.partitions()); + + for (final Map.Entry entry : committed.entrySet()) { + final TopicPartition partition = entry.getKey(); + final OffsetAndMetadata metadata = entry.getValue(); + + if (metadata != null) { + final long committedTimestamp = decodeTimestamp(metadata.metadata()); + partitionGroup.setPartitionTime(partition, committedTimestamp); + log.debug("A committed timestamp was detected: setting the partition time of partition {}" + + " to {} in stream task {}", partition, committedTimestamp, this); + } else { + log.debug("No committed timestamp was found in metadata for partition {}", partition); + } + } + + final Set nonCommitted = new HashSet<>(partitionGroup.partitions()); + nonCommitted.removeAll(committed.keySet()); + for (final TopicPartition partition : nonCommitted) { + log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 033aeeefd7f..6f42fb26526 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -563,9 +563,9 @@ public class StandbyTaskTest { final Consumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override - public synchronized OffsetAndMetadata committed(final TopicPartition partition) { + public synchronized Map committed(final Set partitions) { committedCallCount.getAndIncrement(); - return super.committed(partition); + return super.committed(partitions); } }; @@ -596,9 +596,9 @@ public class StandbyTaskTest { final Consumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override - public synchronized OffsetAndMetadata committed(final TopicPartition partition) { + public synchronized Map committed(final Set partitions) { committedCallCount.getAndIncrement(); - return super.committed(partition); + return super.committed(partitions); } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index e90151f2822..a4f529d8795 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -657,7 +657,7 @@ public class StreamTaskTest { public void shouldRestorePartitionTimeAfterRestartWithEosDisabled() { createTaskWithProcessAndCommit(false); - assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(consumer.committed(partition1).metadata())); + assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(consumer.committed(Collections.singleton(partition1)).get(partition1).metadata())); // reset times here by creating a new task task = createStatelessTask(createConfig(false)); @@ -1585,7 +1585,7 @@ public class StreamTaskTest { private Consumer mockConsumerWithCommittedException(final RuntimeException toThrow) { return new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override - public OffsetAndMetadata committed(final TopicPartition partition) { + public Map committed(final Set 
partitions) { throw toThrow; } }; diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index a0ac1f188f2..cfbac1a4e4c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -195,13 +195,13 @@ public class TransactionalMessageCopier { } private static void resetToLastCommittedPositions(KafkaConsumer consumer) { - for (TopicPartition topicPartition : consumer.assignment()) { - OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition); + final Map committed = consumer.committed(consumer.assignment()); + committed.forEach((tp, offsetAndMetadata) -> { if (offsetAndMetadata != null) - consumer.seek(topicPartition, offsetAndMetadata.offset()); + consumer.seek(tp, offsetAndMetadata.offset()); else - consumer.seekToBeginning(singleton(topicPartition)); - } + consumer.seekToBeginning(singleton(tp)); + }); } private static long messagesRemaining(KafkaConsumer consumer, TopicPartition partition) {
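
Usage note: for callers migrating off the deprecated single-partition overloads, the sketch below shows the replacement pattern this diff establishes. It is a minimal example, not part of the patch; the broker address, topic, group id, and class name are hypothetical. The key behavioral points, per the new javadoc above, are that committed(Set, Duration) makes one blocking remote call for the whole set, and that partitions without a committed offset are simply absent from the returned map (so get() may return null) rather than being mapped to a placeholder.

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedMigrationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "example-group");           // assumption: hypothetical group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp0 = new TopicPartition("example-topic", 0); // hypothetical topic
            TopicPartition tp1 = new TopicPartition("example-topic", 1);
            Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(tp0, tp1));
            consumer.assign(partitions);

            // Deprecated since 2.4: one blocking remote fetch per partition,
            // returning null when the partition has no committed offset.
            //   OffsetAndMetadata single = consumer.committed(tp0);

            // Replacement: a single blocking fetch for the whole set. Partitions
            // with no committed offset are omitted from the result map.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(partitions, Duration.ofSeconds(30));

            for (TopicPartition tp : partitions) {
                OffsetAndMetadata meta = committed.get(tp);
                if (meta == null)
                    consumer.seekToBeginning(Collections.singletonList(tp)); // no commit yet
                else
                    consumer.seek(tp, meta.offset());
            }
        }
    }
}

This is the same shape as the updated resetToCommittedPositions helpers in TestUtils.scala and TransactionalMessageCopier in this diff: issue one committed(...) call over the full assignment, seek to the returned offsets, and fall back to seekToBeginning for partitions that have no committed offset.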