diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 29843c765c3..6f1f8c8bc64 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -954,7 +954,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } private CompletableFuture> commit(final CommitEvent commitEvent) { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); offsetCommitCallbackInvoker.executeCallbacks(); if (commitEvent.offsets().isPresent() && commitEvent.offsets().get().isEmpty()) { @@ -1083,7 +1083,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { acquireAndEnsureOpen(); long start = time.nanoseconds(); try { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); if (partitions.isEmpty()) { return Collections.emptyMap(); } @@ -1107,7 +1107,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } } - private void maybeThrowInvalidGroupIdException() { + private void throwIfGroupIdNotDefined() { if (groupMetadata.get().isEmpty()) { throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " + "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration."); @@ -1346,7 +1346,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { public ConsumerGroupMetadata groupMetadata() { acquireAndEnsureOpen(); try { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); return groupMetadata.get().get(); } finally { release(); @@ -2028,7 +2028,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private void subscribeInternal(Pattern pattern, Optional listener) { acquireAndEnsureOpen(); try { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); if (pattern == null || pattern.toString().isEmpty()) throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty")); @@ -2052,7 +2052,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { Optional listener) { acquireAndEnsureOpen(); try { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); throwIfSubscriptionPatternIsInvalid(pattern); log.info("Subscribing to regular expression {}", pattern); applicationEventHandler.addAndGet(new TopicRe2JPatternSubscriptionChangeEvent( @@ -2076,7 +2076,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private void subscribeInternal(Collection topics, Optional listener) { acquireAndEnsureOpen(); try { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); if (topics == null) throw new IllegalArgumentException("Topic collection to subscribe to cannot be null"); if (topics.isEmpty()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 3a50ff037ab..0e4119b9e33 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -477,7 +477,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { private void subscribeInternal(Collection topics, Optional listener) { acquireAndEnsureOpen(); try { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); if (topics == null) throw new IllegalArgumentException("Topic collection to subscribe to cannot be null"); if (topics.isEmpty()) { @@ -558,7 +558,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { * configured at-least one partition assignment strategy */ private void subscribeInternal(Pattern pattern, Optional listener) { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); if (pattern == null || pattern.toString().isEmpty()) throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty")); @@ -742,7 +742,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { acquireAndEnsureOpen(); long commitStart = time.nanoseconds(); try { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); offsets.forEach(this::updateLastSeenEpochIfNewer); if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) { throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " + @@ -768,7 +768,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { public void commitAsync(final Map offsets, OffsetCommitCallback callback) { acquireAndEnsureOpen(); try { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); log.debug("Committing offsets: {}", offsets); offsets.forEach(this::updateLastSeenEpochIfNewer); coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback); @@ -889,7 +889,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { acquireAndEnsureOpen(); long start = time.nanoseconds(); try { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); final Map offsets; offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout)); if (offsets == null) { @@ -1078,7 +1078,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { public ConsumerGroupMetadata groupMetadata() { acquireAndEnsureOpen(); try { - maybeThrowInvalidGroupIdException(); + throwIfGroupIdNotDefined(); return coordinator.groupMetadata(); } finally { release(); @@ -1272,7 +1272,7 @@ public class ClassicKafkaConsumer implements ConsumerDelegate { ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property"); } - private void maybeThrowInvalidGroupIdException() { + private void throwIfGroupIdNotDefined() { if (groupId.isEmpty()) throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " + "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");