KAFKA-19351: AsyncConsumer#commitAsync should copy the input offsets (#19855)

`AsyncConsumer#commitAsync` and `AsyncConsumer#commitSync` should copy
the input offsets.
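Why the copy matters: the async consumer hands the offsets map to a background application thread, so a caller that keeps mutating its own map can race with an in-flight commit. A minimal sketch of the hazardous caller pattern (topic name and offsets are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    class CommitReuseSketch {
        // The caller reuses one mutable map across commits. Without an internal
        // copy, the background commit machinery could observe these mutations.
        static void commitLoop(Consumer<String, String> consumer) {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            TopicPartition tp = new TopicPartition("t0", 0);

            offsets.put(tp, new OffsetAndMetadata(10L));
            consumer.commitAsync(offsets, null);          // commit may still be in flight...
            offsets.put(tp, new OffsetAndMetadata(20L));  // ...while the caller mutates the map
        }
    }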

Reviewers: Andrew Schofield <aschofield@confluent.io>
Lan Ding 2025-05-30 16:36:38 +08:00 committed by GitHub
parent a122ac9d51
commit 43f603cfb7
3 changed files with 51 additions and 9 deletions

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -1003,7 +1003,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
     *
-    * @param offsets A map of offsets by partition with associated metadata
+    * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it
+    *                is safe to mutate the map after returning.
     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
     *             or if there is an active group with the same <code>group.id</code> which is using group management. In such cases,
@@ -1054,7 +1055,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
     *
-    * @param offsets A map of offsets by partition with associated metadata
+    * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it
+    *                is safe to mutate the map after returning.
     * @param timeout The maximum amount of time to await completion of the offset commit
     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
@@ -1143,7 +1145,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     * offsets committed through this API are guaranteed to complete before a subsequent call to {@link #commitSync()}
     * (and variants) returns.
     *
-    * @param offsets A map of offsets by partition with associate metadata. This map will be copied internally, so it
+    * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it
     *                is safe to mutate the map after returning.
     * @param callback Callback to invoke when the commit completes
     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer is using the classic group protocol
@@ -1563,7 +1565,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     * @param timestampsToSearch the mapping from partition to the timestamp to look up.
     *
     * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
-    *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within
+    *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within
     *         the default timeout, and no corresponding message exists, the entry in the returned map will be {@code null}
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
@@ -1590,7 +1592,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     * @param timeout The maximum amount of time to await retrieval of the offsets
     *
     * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
-    *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within
+    *         than or equal to the target timestamp. If the timestamp and offset for a specific partition cannot be found within
     *         timeout, and no corresponding message exists, the entry in the returned map will be {@code null}
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
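With the documented contract above, it is safe to keep one offsets map and reuse it across commits. A short usage sketch, assuming an already-subscribed consumer and the usual consumer imports in scope (topic handling and timeout values are illustrative):

    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
        toCommit.put(new TopicPartition(record.topic(), record.partition()),
                     new OffsetAndMetadata(record.offset() + 1));  // commit the next offset to read
    }
    consumer.commitSync(toCommit, Duration.ofSeconds(5));
    toCommit.clear();  // safe: commitSync copied the map internally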

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

@@ -385,7 +385,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                setGroupAssignmentSnapshot(partitions);
            }
        };
-
+
    public AsyncKafkaConsumer(final ConsumerConfig config,
                              final Deserializer<K> keyDeserializer,
                              final Deserializer<V> valueDeserializer,
@@ -927,7 +927,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        commitAsync(Optional.of(offsets), callback);
+        commitAsync(Optional.of(new HashMap<>(offsets)), callback);
    }

    private void commitAsync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, OffsetCommitCallback callback) {
@@ -1599,12 +1599,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        commitSync(Optional.of(offsets), defaultApiTimeoutMs);
+        commitSync(Optional.of(new HashMap<>(offsets)), defaultApiTimeoutMs);
    }

    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
-        commitSync(Optional.of(offsets), timeout);
+        commitSync(Optional.of(new HashMap<>(offsets)), timeout);
    }

    private void commitSync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets, Duration timeout) {
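The copy is taken once at each public entry point, before the offsets are wrapped in a commit event, so the private overloads only ever see a snapshot the caller can no longer reach. A shallow copy suffices because OffsetAndMetadata is immutable. A sketch of the boundary pattern in isolation (submitCommitEvent is a hypothetical stand-in for the event plumbing):

    void commitAsyncBoundary(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Snapshot the entries; the values need no deep copy since they are immutable.
        Map<TopicPartition, OffsetAndMetadata> snapshot = new HashMap<>(offsets);
        submitCommitEvent(snapshot);  // the background thread sees only the snapshot
    }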

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java

@@ -357,6 +357,26 @@ public class AsyncKafkaConsumerTest {
        assertSame(exception.getClass(), callback.exception.getClass());
    }

+    @Test
+    public void testCommitAsyncShouldCopyOffsets() {
+        consumer = newConsumer();
+        TopicPartition tp = new TopicPartition("t0", 2);
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp, new OffsetAndMetadata(10L));
+        markOffsetsReadyForCommitEvent();
+        consumer.commitAsync(offsets, null);
+
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        assertTrue(commitEvent.offsets().isPresent());
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+
+        offsets.remove(tp);
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+    }
+
    private static Stream<Exception> commitExceptionSupplier() {
        return Stream.of(
            new KafkaException("Test exception"),
@@ -590,6 +610,26 @@ public class AsyncKafkaConsumerTest {
        assertDoesNotThrow(() -> consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)), Duration.ofMillis(100)));
    }

+    @Test
+    public void testCommitSyncShouldCopyOffsets() {
+        consumer = newConsumer();
+        TopicPartition tp = new TopicPartition("t0", 2);
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp, new OffsetAndMetadata(10L));
+        completeCommitSyncApplicationEventSuccessfully();
+        consumer.commitSync(offsets);
+
+        final ArgumentCaptor<SyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(SyncCommitEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final SyncCommitEvent commitEvent = commitEventCaptor.getValue();
+        assertTrue(commitEvent.offsets().isPresent());
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+
+        offsets.remove(tp);
+        assertTrue(commitEvent.offsets().get().containsKey(tp));
+    }
+
    private CompletableFuture<Void> setUpConsumerWithIncompleteAsyncCommit(TopicPartition tp) {
        time = new MockTime(1);
        consumer = newConsumer();
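Both tests exercise the same decoupling check: capture the commit event, mutate the caller's map, and assert that the event's snapshot is unaffected. A complementary assertion one could add (hypothetical, not part of this patch) would pin the committed value as well as the key:

    offsets.remove(tp);
    assertTrue(commitEvent.offsets().get().containsKey(tp));
    // OffsetAndMetadata implements equals, so the snapshot's value can be checked too:
    assertEquals(new OffsetAndMetadata(10L), commitEvent.offsets().get().get(tp));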