MINOR: Improve javadoc for share consumer (#19533)

Small improvements to share consumer javadoc.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Andrew Schofield 2025-04-22 15:54:05 +01:00 committed by GitHub
parent 22c5794bc3
commit e78e106221
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 20 additions and 16 deletions

View File

@ -51,8 +51,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
/**
* A client that consumes records from a Kafka cluster using a share group.
* <p>
* <em>This is an early access feature under development which is introduced by KIP-932.
* It is not suitable for production use until it is fully implemented and released.</em>
* <em>This is a preview feature introduced by KIP-932. It is not yet recommended for production use.</em>
*
* <h3>Cross-Version Compatibility</h3>
* This client can communicate with brokers that are a version that supports share groups. You will receive an
@ -100,7 +99,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a
* time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default,
* the lock duration is 30 seconds, but it can also be controlled using the group {@code group.share.record.lock.duration.ms}
* configuration parameter. The idea is that the lock is automatically released once the lock duration has elapsed, and
* configuration property. The idea is that the lock is automatically released once the lock duration has elapsed, and
* then the record is available to be given to another consumer. The consumer which holds the lock can deal with it in
* the following ways:
* <ul>
@ -116,8 +115,8 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically
* releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
* <p>
* The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the
* consumer {@code share.acknowledgement.mode} property.
* The consumer can choose to use implicit or explicit acknowledgement of the records it processes by using the
* consumer {@code share.acknowledgement.mode} configuration property.
* <p>
* If the application sets the property to "implicit" or does not set it at all, then the consumer is using
* <em>implicit acknowledgement</em>. In this mode, the application acknowledges delivery by:
@ -129,7 +128,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* the delivered records as processed successfully and commits the acknowledgements to Kafka.</li>
* <li>Calling {@link #close()} which releases any acquired records without acknowledgement.</li>
* </ul>
* If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgment</em>.
* If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgement</em>.
* The application must acknowledge all records returned from {@link #poll(Duration)} using
* {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next call to {@link #poll(Duration)}.
* If the application calls {@link #poll(Duration)} without having acknowledged all records, an
@ -162,6 +161,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
* props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
*
* KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
* consumer.subscribe(Arrays.asList(&quot;foo&quot;));
* while (true) {
@ -181,6 +181,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
* props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
*
* KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
* consumer.subscribe(Arrays.asList(&quot;foo&quot;));
* while (true) {
@ -203,6 +204,7 @@ import static org.apache.kafka.common.utils.Utils.propsToMap;
* props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
* props.setProperty(&quot;share.acknowledgement.mode&quot;, &quot;explicit&quot;);
*
* KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
* consumer.subscribe(Arrays.asList(&quot;foo&quot;));
* while (true) {
@ -443,7 +445,7 @@ public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
}
/**
* Fetch data for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
* Deliver records for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
* subscribed to any topics before polling for data.
*
* <p>
@ -452,13 +454,14 @@ public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
*
* @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
*
* @return map of topic to records since the last fetch for the subscribed list of topics
* @return map of topic to records
*
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException if caller lacks Read access to any of the subscribed
* topics or to the share group. See the exception for more details
* @throws IllegalArgumentException if the timeout value is negative
* @throws IllegalStateException if the consumer is not subscribed to any topics
* @throws IllegalStateException if the consumer is not subscribed to any topics, or it is using
* explicit acknowledgement and has not acknowledged all records previously delivered
* @throws ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
* @throws InvalidTopicException if the current subscription contains any invalid
* topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
@ -475,11 +478,12 @@ public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
* Acknowledge successful delivery of a record returned on the last {@link #poll(Duration)} call.
* The acknowledgement is committed on the next {@link #commitSync()}, {@link #commitAsync()} or
* {@link #poll(Duration)} call.
* <p>This method can only be used if the consumer is using <b>explicit acknowledgement</b>.
*
* @param record The record to acknowledge
*
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
* used implicit acknowledgement
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer is not using
* explicit acknowledgement
*/
@Override
public void acknowledge(ConsumerRecord<K, V> record) {
@ -489,14 +493,14 @@ public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
/**
* Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
* it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
* {@link #commitAsync()} or {@link #poll(Duration)} call. By using this method, the consumer is using
* <b>explicit acknowledgement</b>.
* {@link #commitAsync()} or {@link #poll(Duration)} call.
* <p>This method can only be used if the consumer is using <b>explicit acknowledgement</b>.
*
* @param record The record to acknowledge
* @param type The acknowledgement type which indicates whether it was processed successfully
*
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
* used implicit acknowledgement
* @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer is not using
* explicit acknowledgement
*/
@Override
public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
@ -585,7 +589,7 @@ public class KafkaShareConsumer<K, V> implements ShareConsumer<K, V> {
* client to complete the request.
* <p>
* Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
* configuration option.
* configuration property.
*
* @param timeout The maximum time to wait for consumer client to determine its client instance ID.
* The value must be non-negative. Specifying a timeout of zero means do not