diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3a06e71335d..0e7d3d65b1f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1003,7 +1003,8 @@ public class KafkaConsumer implements Consumer { * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method. * - * @param offsets A map of offsets by partition with associated metadata + * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it + * is safe to mutate the map after returning. * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried. * This can only occur if you are using automatic group management with {@link #subscribe(Collection)}, * or if there is an active group with the same group.id which is using group management. In such cases, @@ -1054,7 +1055,8 @@ public class KafkaConsumer implements Consumer { * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method. * - * @param offsets A map of offsets by partition with associated metadata + * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it + * is safe to mutate the map after returning. * @param timeout The maximum amount of time to await completion of the offset commit * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried. * This can only occur if you are using automatic group management with {@link #subscribe(Collection)}, @@ -1143,7 +1145,7 @@ public class KafkaConsumer implements Consumer { * offsets committed through this API are guaranteed to complete before a subsequent call to {@link #commitSync()} * (and variants) returns. * - * @param offsets A map of offsets by partition with associate metadata. This map will be copied internally, so it + * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it * is safe to mutate the map after returning. * @param callback Callback to invoke when the commit completes * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol @@ -1563,7 +1565,7 @@ public class KafkaConsumer implements Consumer { * @param timestampsToSearch the mapping from partition to the timestamp to look up. * * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater - * than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within + * than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within * the default timeout, and no corresponding message exists, the entry in the returned map will be {@code null} * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details @@ -1590,7 +1592,7 @@ public class KafkaConsumer implements Consumer { * @param timeout The maximum amount of time to await retrieval of the offsets * * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater - * than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within + * than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within * timeout, and no corresponding message exists, the entry in the returned map will be {@code null} * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index e301b6855c6..29843c765c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -385,7 +385,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { setGroupAssignmentSnapshot(partitions); } }; - + public AsyncKafkaConsumer(final ConsumerConfig config, final Deserializer keyDeserializer, final Deserializer valueDeserializer, @@ -927,7 +927,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { @Override public void commitAsync(Map offsets, OffsetCommitCallback callback) { - commitAsync(Optional.of(offsets), callback); + commitAsync(Optional.of(new HashMap<>(offsets)), callback); } private void commitAsync(Optional> offsets, OffsetCommitCallback callback) { @@ -1599,12 +1599,12 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { @Override public void commitSync(Map offsets) { - commitSync(Optional.of(offsets), defaultApiTimeoutMs); + commitSync(Optional.of(new HashMap<>(offsets)), defaultApiTimeoutMs); } @Override public void commitSync(Map offsets, Duration timeout) { - commitSync(Optional.of(offsets), timeout); + commitSync(Optional.of(new HashMap<>(offsets)), timeout); } private void commitSync(Optional> offsets, Duration timeout) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index bb9482aa636..ec5b6b1f6f8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -357,6 +357,26 @@ public class AsyncKafkaConsumerTest { assertSame(exception.getClass(), callback.exception.getClass()); } + @Test + public void testCommitAsyncShouldCopyOffsets() { + consumer = newConsumer(); + + TopicPartition tp = new TopicPartition("t0", 2); + Map offsets = new HashMap<>(); + offsets.put(tp, new OffsetAndMetadata(10L)); + + markOffsetsReadyForCommitEvent(); + consumer.commitAsync(offsets, null); + + final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final AsyncCommitEvent commitEvent = commitEventCaptor.getValue(); + assertTrue(commitEvent.offsets().isPresent()); + assertTrue(commitEvent.offsets().get().containsKey(tp)); + offsets.remove(tp); + assertTrue(commitEvent.offsets().get().containsKey(tp)); + } + private static Stream commitExceptionSupplier() { return Stream.of( new KafkaException("Test exception"), @@ -590,6 +610,26 @@ public class AsyncKafkaConsumerTest { assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100))); } + @Test + public void testCommitSyncShouldCopyOffsets() { + consumer = newConsumer(); + + TopicPartition tp = new TopicPartition("t0", 2); + Map offsets = new HashMap<>(); + offsets.put(tp, new OffsetAndMetadata(10L)); + + completeCommitSyncApplicationEventSuccessfully(); + consumer.commitSync(offsets); + + final ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(SyncCommitEvent.class); + verify(applicationEventHandler).add(commitEventCaptor.capture()); + final SyncCommitEvent commitEvent = commitEventCaptor.getValue(); + assertTrue(commitEvent.offsets().isPresent()); + assertTrue(commitEvent.offsets().get().containsKey(tp)); + offsets.remove(tp); + assertTrue(commitEvent.offsets().get().containsKey(tp)); + } + private CompletableFuture setUpConsumerWithIncompleteAsyncCommit(TopicPartition tp) { time = new MockTime(1); consumer = newConsumer();