mirror of https://github.com/apache/kafka.git
While reading through the code, I found the method name to be somewhat ambiguous and not fully descriptive of its purpose. So I renamed the method to make its purpose clearer and more self-explanatory. If there was another reason for the original naming, I’d be happy to hear about it. Reviewers: Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
parent
3479ce793b
commit
c4a769bc8b
|
@ -954,7 +954,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commit(final CommitEvent commitEvent) {
|
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commit(final CommitEvent commitEvent) {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
offsetCommitCallbackInvoker.executeCallbacks();
|
offsetCommitCallbackInvoker.executeCallbacks();
|
||||||
|
|
||||||
if (commitEvent.offsets().isPresent() && commitEvent.offsets().get().isEmpty()) {
|
if (commitEvent.offsets().isPresent() && commitEvent.offsets().get().isEmpty()) {
|
||||||
|
@ -1083,7 +1083,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
acquireAndEnsureOpen();
|
acquireAndEnsureOpen();
|
||||||
long start = time.nanoseconds();
|
long start = time.nanoseconds();
|
||||||
try {
|
try {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
if (partitions.isEmpty()) {
|
if (partitions.isEmpty()) {
|
||||||
return Collections.emptyMap();
|
return Collections.emptyMap();
|
||||||
}
|
}
|
||||||
|
@ -1107,7 +1107,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void maybeThrowInvalidGroupIdException() {
|
private void throwIfGroupIdNotDefined() {
|
||||||
if (groupMetadata.get().isEmpty()) {
|
if (groupMetadata.get().isEmpty()) {
|
||||||
throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
|
throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
|
||||||
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
|
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
|
||||||
|
@ -1346,7 +1346,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
public ConsumerGroupMetadata groupMetadata() {
|
public ConsumerGroupMetadata groupMetadata() {
|
||||||
acquireAndEnsureOpen();
|
acquireAndEnsureOpen();
|
||||||
try {
|
try {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
return groupMetadata.get().get();
|
return groupMetadata.get().get();
|
||||||
} finally {
|
} finally {
|
||||||
release();
|
release();
|
||||||
|
@ -2028,7 +2028,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
|
private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
|
||||||
acquireAndEnsureOpen();
|
acquireAndEnsureOpen();
|
||||||
try {
|
try {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
if (pattern == null || pattern.toString().isEmpty())
|
if (pattern == null || pattern.toString().isEmpty())
|
||||||
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
|
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
|
||||||
"null" : "empty"));
|
"null" : "empty"));
|
||||||
|
@ -2052,7 +2052,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
Optional<ConsumerRebalanceListener> listener) {
|
Optional<ConsumerRebalanceListener> listener) {
|
||||||
acquireAndEnsureOpen();
|
acquireAndEnsureOpen();
|
||||||
try {
|
try {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
throwIfSubscriptionPatternIsInvalid(pattern);
|
throwIfSubscriptionPatternIsInvalid(pattern);
|
||||||
log.info("Subscribing to regular expression {}", pattern);
|
log.info("Subscribing to regular expression {}", pattern);
|
||||||
applicationEventHandler.addAndGet(new TopicRe2JPatternSubscriptionChangeEvent(
|
applicationEventHandler.addAndGet(new TopicRe2JPatternSubscriptionChangeEvent(
|
||||||
|
@ -2076,7 +2076,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
|
private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
|
||||||
acquireAndEnsureOpen();
|
acquireAndEnsureOpen();
|
||||||
try {
|
try {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
if (topics == null)
|
if (topics == null)
|
||||||
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
|
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
|
||||||
if (topics.isEmpty()) {
|
if (topics.isEmpty()) {
|
||||||
|
|
|
@ -477,7 +477,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
|
private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
|
||||||
acquireAndEnsureOpen();
|
acquireAndEnsureOpen();
|
||||||
try {
|
try {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
if (topics == null)
|
if (topics == null)
|
||||||
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
|
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
|
||||||
if (topics.isEmpty()) {
|
if (topics.isEmpty()) {
|
||||||
|
@ -558,7 +558,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
* configured at-least one partition assignment strategy
|
* configured at-least one partition assignment strategy
|
||||||
*/
|
*/
|
||||||
private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
|
private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
if (pattern == null || pattern.toString().isEmpty())
|
if (pattern == null || pattern.toString().isEmpty())
|
||||||
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
|
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
|
||||||
"null" : "empty"));
|
"null" : "empty"));
|
||||||
|
@ -742,7 +742,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
acquireAndEnsureOpen();
|
acquireAndEnsureOpen();
|
||||||
long commitStart = time.nanoseconds();
|
long commitStart = time.nanoseconds();
|
||||||
try {
|
try {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
offsets.forEach(this::updateLastSeenEpochIfNewer);
|
offsets.forEach(this::updateLastSeenEpochIfNewer);
|
||||||
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
|
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
|
||||||
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
|
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
|
||||||
|
@ -768,7 +768,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
|
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
|
||||||
acquireAndEnsureOpen();
|
acquireAndEnsureOpen();
|
||||||
try {
|
try {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
log.debug("Committing offsets: {}", offsets);
|
log.debug("Committing offsets: {}", offsets);
|
||||||
offsets.forEach(this::updateLastSeenEpochIfNewer);
|
offsets.forEach(this::updateLastSeenEpochIfNewer);
|
||||||
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
|
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
|
||||||
|
@ -889,7 +889,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
acquireAndEnsureOpen();
|
acquireAndEnsureOpen();
|
||||||
long start = time.nanoseconds();
|
long start = time.nanoseconds();
|
||||||
try {
|
try {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
final Map<TopicPartition, OffsetAndMetadata> offsets;
|
final Map<TopicPartition, OffsetAndMetadata> offsets;
|
||||||
offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
|
offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
|
||||||
if (offsets == null) {
|
if (offsets == null) {
|
||||||
|
@ -1078,7 +1078,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
public ConsumerGroupMetadata groupMetadata() {
|
public ConsumerGroupMetadata groupMetadata() {
|
||||||
acquireAndEnsureOpen();
|
acquireAndEnsureOpen();
|
||||||
try {
|
try {
|
||||||
maybeThrowInvalidGroupIdException();
|
throwIfGroupIdNotDefined();
|
||||||
return coordinator.groupMetadata();
|
return coordinator.groupMetadata();
|
||||||
} finally {
|
} finally {
|
||||||
release();
|
release();
|
||||||
|
@ -1272,7 +1272,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
|
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void maybeThrowInvalidGroupIdException() {
|
private void throwIfGroupIdNotDefined() {
|
||||||
if (groupId.isEmpty())
|
if (groupId.isEmpty())
|
||||||
throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
|
throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
|
||||||
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
|
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
|
||||||
|
|
Loading…
Reference in New Issue