MINOR: Update Consumer and Producer JavaDocs for committing offsets (#18336)

The consumer/producer JavaDocs still contain instruction for naively
computing the offset to be committed.

This PR updates the JavaDocs with regard to the improvements of KIP-1094.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
Matthias J. Sax 2025-01-06 13:39:20 -08:00 committed by GitHub
parent c4840f5e93
commit 3918f37af1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 28 additions and 9 deletions

View File

@ -70,8 +70,12 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* <h3>Offsets and Consumer Position</h3>
* Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of
* a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer
* which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
* are actually two notions of position relevant to the user of the consumer:
* which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5.
* Note that offsets are not guaranteed to be consecutive (such as compacted topic or when records have been produced
* using transactions). For example, if the consumer did read a record with offset 4, but 5 is not an offset
* with a record, its position might advance to 6 (or higher) directly. Similarly, if the consumer's position is 5,
* but there is no record with offset 5, the consumer will return the record with the next higher offset.
* There are actually two notions of position relevant to the user of the consumer:
* <p>
* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
* out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
@ -266,8 +270,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* for (ConsumerRecord&lt;String, String&gt; record : partitionRecords) {
* System.out.println(record.offset() + &quot;: &quot; + record.value());
* }
* long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
* consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
* consumer.commitSync(Collections.singletonMap(partition, records.nextOffsets().get(partition)));
* }
* }
* } finally {
@ -276,7 +279,10 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* </pre>
*
* <b>Note: The committed offset should always be the offset of the next message that your application will read.</b>
* Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should add one to the offset of the last message processed.
* Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should use {@code nextRecordToBeProcessed.offset()}
* or if {@link ConsumerRecords} is exhausted already {@link ConsumerRecords#nextOffsets()} instead.
* You should also add the leader epoch as commit metadata, which can be obtained from
* {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets()}.
*
* <h4><a name="manualassignment">Manual Partition Assignment</a></h4>
*
@ -984,7 +990,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used. The committed offset should be the next message your application will consume,
* i.e. lastProcessedMessageOffset + 1. If automatic group management with {@link #subscribe(Collection)} is used,
* i.e. {@code nextRecordToBeProcessed.offset()} (or {@link ConsumerRecords#nextOffsets()}).
* You should also add the leader epoch as commit metadata, which can be obtained from
* {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets()}.
* If automatic group management with {@link #subscribe(Collection)} is used,
* then the committed offsets must belong to the currently auto-assigned partitions.
* <p>
* This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
@ -1033,7 +1042,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used. The committed offset should be the next message your application will consume,
* i.e. lastProcessedMessageOffset + 1. If automatic group management with {@link #subscribe(Collection)} is used,
* i.e. {@code nextRecordToBeProcessed.offset()} (or {@link ConsumerRecords#nextOffsets()}).
* You should also add the leader epoch as commit metadata, which can be obtained from
* {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets()}.
* If automatic group management with {@link #subscribe(Collection)} is used,
* then the committed offsets must belong to the currently auto-assigned partitions.
* <p>
* This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
@ -1117,7 +1129,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used. The committed offset should be the next message your application will consume,
* i.e. lastProcessedMessageOffset + 1. If automatic group management with {@link #subscribe(Collection)} is used,
* i.e. {@code nextRecordToBeProcessed.offset()} (or {@link ConsumerRecords#nextOffsets()}).
* You should also add the leader epoch as commit metadata, which can be obtained from
* {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets()}.
* If automatic group management with {@link #subscribe(Collection)} is used,
* then the committed offsets must belong to the currently auto-assigned partitions.
* <p>
* This is an asynchronous call and will not block. Any errors encountered are either passed to the callback

View File

@ -21,6 +21,8 @@ import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@ -677,7 +679,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* Sends a list of specified offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
* committed only if the transaction is committed successfully. The committed offset should
* be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
* be the next message your application will consume, i.e. {@code nextRecordToBeProcessed.offset()}
* (or {@link ConsumerRecords#nextOffsets()}). You should also add the leader epoch as commit metadata,
* which can be obtained from {@link ConsumerRecord#leaderEpoch()} or {@link ConsumerRecords#nextOffsets()}.
* <p>
* This method should be used when you need to batch consumed and produced messages
* together, typically in a consume-transform-produce pattern. Thus, the specified