MINOR: improve JavaDocs for consumer CloseOptions (#19546)

Reviewers: TengYao Chi <frankvicky@apache.org>, PoAn Yang <payang@apache.org>, Lianet Magrans <lmagrans@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Matthias J. Sax 2025-04-29 16:38:16 -07:00 committed by GitHub
parent 988fa3f272
commit 3bb15c5dee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 47 additions and 28 deletions

View File

@ -277,20 +277,18 @@ public interface Consumer<K, V> extends Closeable {
void close(); void close();
/** /**
* This method has been deprecated since Kafka 4.0 and should use {@link Consumer#close(CloseOptions)} instead.
*
* @see KafkaConsumer#close(Duration) * @see KafkaConsumer#close(Duration)
*/ */
@Deprecated @Deprecated
void close(Duration timeout); void close(Duration timeout);
/**
* @see KafkaConsumer#wakeup()
*/
void wakeup();
/** /**
* @see KafkaConsumer#close(CloseOptions) * @see KafkaConsumer#close(CloseOptions)
*/ */
void close(final CloseOptions option); void close(final CloseOptions option);
/**
* @see KafkaConsumer#wakeup()
*/
void wakeup();
} }

View File

@ -1761,9 +1761,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
} }
/** /**
* Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. * Close the consumer with {@link CloseOptions.GroupMembershipOperation#DEFAULT default leave group behavior},
* waiting for up to the default timeout of 30 seconds for any needed cleanup.
* If auto-commit is enabled, this will commit the current offsets if possible within the default * If auto-commit is enabled, this will commit the current offsets if possible within the default
* timeout. See {@link #close(Duration)} for details. Note that {@link #wakeup()} * timeout. See {@link #close(CloseOptions)} for details. Note that {@link #wakeup()}
* cannot be used to interrupt close. * cannot be used to interrupt close.
* *
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted
@ -1776,10 +1777,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
} }
/** /**
* Tries to close the consumer cleanly within the specified timeout. This method waits up to * This method has been deprecated since Kafka 4.1 and should use {@link KafkaConsumer#close(CloseOptions)} instead.
* {@code timeout} for the consumer to complete pending commits and leave the group. * <p>
* Close the consumer with {@link CloseOptions.GroupMembershipOperation#DEFAULT default leave group behavior}
* cleanly within the specified timeout. This method waits up to
* {@code timeout} for the consumer to complete pending commits and maybe leave the group (if the member is dynamic).
* If auto-commit is enabled, this will commit the current offsets if possible within the * If auto-commit is enabled, this will commit the current offsets if possible within the
* timeout. If the consumer is unable to complete offset commits and gracefully leave the group * timeout. If the consumer is unable to complete offset commits and to gracefully leave the group (if applicable)
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
* used to interrupt close. * used to interrupt close.
* <p> * <p>
@ -1797,12 +1801,40 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws InterruptException If the thread is interrupted before or while this function is called * @throws InterruptException If the thread is interrupted before or while this function is called
* @throws org.apache.kafka.common.KafkaException for any other error during close * @throws org.apache.kafka.common.KafkaException for any other error during close
*/ */
@Deprecated(since = "4.1")
@Override @Override
@SuppressWarnings("deprecation")
public void close(Duration timeout) { public void close(Duration timeout) {
delegate.close(timeout); delegate.close(timeout);
} }
/**
* Close the consumer cleanly. {@link CloseOptions} allows to specify a timeout and a
* {@link CloseOptions.GroupMembershipOperation leave group behavior}.
* If no timeout is specified, the default timeout of 30 seconds is used.
* If no leave group behavior is specified, the {@link CloseOptions.GroupMembershipOperation#DEFAULT default
* leave group behavior} is used.
* <p>
* This method waits up to the timeout for the consumer to complete pending commits and maybe leave the group,
* depending on the specified leave group behavior.
* If auto-commit is enabled, this will commit the current offsets if possible within the
* timeout. If the consumer is unable to complete offset commits and to gracefully leave the group (if applicable)
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
* used to interrupt close.
* <p>
* The actual maximum wait time is bounded by the {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} setting, which
* only applies to operations performed with the broker (coordinator-related requests and
* fetch sessions). Even if a larger timeout is specified, the consumer will not wait longer than
* {@link ConsumerConfig#REQUEST_TIMEOUT_MS_CONFIG} for these requests to complete during the close operation.
* Note that the execution time of callbacks (such as {@link OffsetCommitCallback} and
* {@link ConsumerRebalanceListener}) does not consume time from the close timeout.
*
* @param option see {@link CloseOptions}; cannot be {@code null}
*/
@Override
public void close(CloseOptions option) {
delegate.close(option);
}
/** /**
* Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll. * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
* The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}. * The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}.
@ -1813,17 +1845,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
delegate.wakeup(); delegate.wakeup();
} }
/**
* This method allows the caller to specify shutdown behavior using the {@link CloseOptions} class.
* If {@code null} is provided, the default behavior will be applied, equivalent to providing a new {@link CloseOptions} instance.
*
* @param option see {@link CloseOptions}
*/
@Override
public void close(CloseOptions option) {
delegate.close(option);
}
// Functions below are for testing only // Functions below are for testing only
String clientId() { String clientId() {
return delegate.clientId(); return delegate.clientId();

View File

@ -575,8 +575,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)); close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
} }
@Deprecated
@Override @Override
@SuppressWarnings("deprecation")
public synchronized void close(Duration timeout) { public synchronized void close(Duration timeout) {
this.closed = true; this.closed = true;
} }

View File

@ -1365,8 +1365,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS))); close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
} }
@Deprecated
@Override @Override
@SuppressWarnings("deprecation")
public void close(Duration timeout) { public void close(Duration timeout) {
close(CloseOptions.timeout(timeout)); close(CloseOptions.timeout(timeout));
} }

View File

@ -1109,8 +1109,8 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS))); close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
} }
@Deprecated
@Override @Override
@SuppressWarnings("deprecation")
public void close(Duration timeout) { public void close(Duration timeout) {
close(CloseOptions.timeout(timeout)); close(CloseOptions.timeout(timeout));
} }

View File

@ -138,7 +138,7 @@ public class Fetcher<K, V> extends AbstractFetch {
// here. // here.
log.debug("All requests couldn't be sent in the specific timeout period {}ms. " + log.debug("All requests couldn't be sent in the specific timeout period {}ms. " +
"This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for " + "This may result in unnecessary fetch sessions at the broker. Consider increasing the timeout passed for " +
"KafkaConsumer.close(Duration timeout)", timer.timeoutMs()); "KafkaConsumer.close(...)", timer.timeoutMs());
} }
} }