diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 795a762a494..9cd5766ea3e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -277,7 +277,7 @@ public class ConsumerConfig extends AbstractConfig {
                                         ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
                                 Importance.MEDIUM,
                                 CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
-                        .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
+                        .define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC)
                         .define(SESSION_TIMEOUT_MS_CONFIG,
                                 Type.INT,
                                 10000,
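Note (illustration, not part of the patch): with the default changed from "" to null, a consumer built without an explicit group.id no longer joins the empty group. A minimal sketch of the construction-time behavior this change enables (the broker address is hypothetical; the enforcement logic appears in the KafkaConsumer constructor in the next file): an explicit enable.auto.commit=true is now rejected, while an unset enable.auto.commit is silently downgraded to false.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NullGroupIdConstructionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // group.id deliberately not set: it now defaults to null instead of ""

        // Explicitly enabling auto-commit without a group id fails at construction time;
        // the constructor wraps the InvalidConfigurationException in a KafkaException.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // not reached
        } catch (KafkaException e) {
            System.out.println("Construction failed as expected: " + e.getCause());
        }

        // If enable.auto.commit is simply absent, it is overwritten to false instead.
        props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            System.out.println("Constructed a consumer with the default (null) group id");
        }
    }
}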
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3a756721fd8..5c673a58c10 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -557,6 +559,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     private final Logger log;
     private final String clientId;
+    private String groupId;
    private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
@@ -654,18 +657,23 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     @SuppressWarnings("unchecked")
-    private KafkaConsumer(ConsumerConfig config,
-                          Deserializer<K> keyDeserializer,
-                          Deserializer<V> valueDeserializer) {
+    private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
         try {
             String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
             if (clientId.isEmpty())
                 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
             this.clientId = clientId;
-            String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
-
+            this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
             LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
             this.log = logContext.logger(getClass());
+            boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+            if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided
+                if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+                    enableAutoCommit = false;
+                else if (enableAutoCommit)
+                    throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
+            } else if (groupId.isEmpty())
+                log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
 
             log.debug("Initializing the Kafka consumer");
             this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -678,8 +686,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                     .tags(metricsTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
-                    MetricsReporter.class,
-                    Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
+                    MetricsReporter.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
@@ -691,16 +698,14 @@
                     ConsumerInterceptor.class);
             this.interceptors = new ConsumerInterceptors<>(interceptorList);
             if (keyDeserializer == null) {
-                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                        Deserializer.class);
+                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
                 this.keyDeserializer.configure(config.originals(), true);
             } else {
                 config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                 this.keyDeserializer = keyDeserializer;
             }
             if (valueDeserializer == null) {
-                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                        Deserializer.class);
+                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
                 this.valueDeserializer.configure(config.originals(), false);
             } else {
                 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
@@ -710,17 +715,14 @@
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                     true, false, clusterResourceListeners);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
-                    config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
-                    config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
+                    config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
             this.metadata.bootstrap(addresses, time.milliseconds());
             String metricGrpPrefix = "consumer";
             ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
-
             IsolationLevel isolationLevel = IsolationLevel.valueOf(
                     config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
             Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
-
             int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
 
             NetworkClient netClient = new NetworkClient(
@@ -755,24 +757,26 @@
             int maxPollIntervalMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
             int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
 
-            this.coordinator = new ConsumerCoordinator(logContext,
-                    this.client,
-                    groupId,
-                    maxPollIntervalMs,
-                    sessionTimeoutMs,
-                    new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
-                    assignors,
-                    this.metadata,
-                    this.subscriptions,
-                    metrics,
-                    metricGrpPrefix,
-                    this.time,
-                    retryBackoffMs,
-                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
-                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                    this.interceptors,
-                    config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
-                    config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
+            // no coordinator will be constructed for the default (null) group id
+            this.coordinator = groupId == null ? null :
+                new ConsumerCoordinator(logContext,
+                        this.client,
+                        groupId,
+                        maxPollIntervalMs,
+                        sessionTimeoutMs,
+                        new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
+                        assignors,
+                        this.metadata,
+                        this.subscriptions,
+                        metrics,
+                        metricGrpPrefix,
+                        this.time,
+                        retryBackoffMs,
+                        enableAutoCommit,
+                        config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+                        this.interceptors,
+                        config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
+                        config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
             this.fetcher = new Fetcher<>(
                     logContext,
                     this.client,
@@ -795,11 +799,9 @@
 
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
-
             log.debug("Kafka consumer initialized");
         } catch (Throwable t) {
-            // call close methods if internal objects are already constructed
-            // this is to prevent resource leak. see KAFKA-2121
+            // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
             close(0, true);
             // now propagate the exception
             throw new KafkaException("Failed to construct kafka consumer", t);
@@ -822,7 +824,8 @@
                   long retryBackoffMs,
                   long requestTimeoutMs,
                   int defaultApiTimeoutMs,
-                  List<PartitionAssignor> assignors) {
+                  List<PartitionAssignor> assignors,
+                  String groupId) {
         this.log = logContext.logger(getClass());
         this.clientId = clientId;
         this.coordinator = coordinator;
@@ -839,6 +842,7 @@
         this.requestTimeoutMs = requestTimeoutMs;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.assignors = assignors;
+        this.groupId = groupId;
     }
 
     /**
@@ -911,9 +915,10 @@
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
         acquireAndEnsureOpen();
         try {
-            if (topics == null) {
+            maybeThrowInvalidGroupIdException();
+            if (topics == null)
                 throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
-            } else if (topics.isEmpty()) {
+            if (topics.isEmpty()) {
                 // treat subscribing to empty topic list as the same as unsubscribing
                 this.unsubscribe();
             } else {
@@ -980,6 +985,7 @@
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        maybeThrowInvalidGroupIdException();
         if (pattern == null)
             throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
 
@@ -1026,7 +1032,8 @@
         try {
             fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
             this.subscriptions.unsubscribe();
-            this.coordinator.maybeLeaveGroup();
+            if (this.coordinator != null)
+                this.coordinator.maybeLeaveGroup();
             this.metadata.needMetadataForAllTopics(false);
             log.info("Unsubscribed all topics or patterns and assigned partitions");
         } finally {
@@ -1073,7 +1080,8 @@
 
                 // make sure the offsets of topic partitions the consumer is unsubscribing from
                 // are committed since there will be no following rebalance
-                this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+                if (coordinator != null)
+                    this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
 
                 log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
                 this.subscriptions.assignFromUser(new HashSet<>(partitions));
@@ -1211,7 +1219,7 @@
      * Visible for testing
      */
     boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
-        if (!coordinator.poll(timer)) {
+        if (coordinator != null && !coordinator.poll(timer)) {
             return false;
         }
 
@@ -1219,7 +1227,8 @@
     }
 
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
-        long pollTimeout = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
+        long pollTimeout = coordinator == null ? timer.remainingMs() :
+                Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
 
         // if data is available already, return it immediately
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
@@ -1249,7 +1258,7 @@
 
         // after the long poll, we should check whether the group needs to rebalance
         // prior to returning data so that the group can stabilize faster
-        if (coordinator.rejoinNeededOrPending()) {
+        if (coordinator != null && coordinator.rejoinNeededOrPending()) {
             return Collections.emptyMap();
         }
 
@@ -1324,6 +1333,7 @@
     public void commitSync(Duration timeout) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), time.timer(timeout))) {
                 throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
                         "committing the current consumed offsets");
@@ -1406,6 +1416,7 @@
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
                 throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
                         "committing offsets " + offsets);
@@ -1475,6 +1486,7 @@
     public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             log.debug("Committing offsets: {}", offsets);
             coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
         } finally {
@@ -1686,6 +1698,7 @@
     public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
         acquireAndEnsureOpen();
         try {
+            maybeThrowInvalidGroupIdException();
             Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
                     Collections.singleton(partition), time.timer(timeout));
             if (offsets == null) {
@@ -2163,7 +2176,7 @@
         // coordinator lookup if there are partitions which have missing positions, so
         // a consumer with manually assigned partitions can avoid a coordinator dependence
        // by always ensuring that assigned partitions have an initial position.
-        if (!coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
+        if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
 
         // If there are partitions still needing a position and a reset policy is defined,
         // request reset using the default policy. If no reset strategy is defined and there
@@ -2216,6 +2229,12 @@
                 ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
     }
 
+    private void maybeThrowInvalidGroupIdException() {
+        if (groupId == null)
+            throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
+                    "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
+    }
+
     // Visible for testing
     String getClientId() {
         return clientId;
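Note (illustration, not part of the patch): taken together, the hunks above make the consumer usable as a standalone reader. A sketch, with a hypothetical broker and topic, of what still works with the default (null) group id and what now throws InvalidGroupIdException:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StandaloneConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // no group.id: the consumer is constructed without a ConsumerCoordinator

        TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical topic
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp)); // manual assignment still works
            consumer.seek(tp, 0L);                          // as does explicit seeking
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5)))
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());

            // Group management and offset commit APIs are rejected up front,
            // before any network round trip:
            try {
                consumer.commitSync();
            } catch (InvalidGroupIdException e) {
                System.out.println("commitSync rejected: " + e.getMessage());
            }
        }
    }
}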
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d9830877ba7..335e0f21f9f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -121,7 +121,7 @@ public abstract class AbstractCoordinator implements Closeable {
 
     private Generation generation = Generation.NO_GENERATION;
     private RequestFuture<FindCoordinatorResponse> findCoordinatorFuture = null;
-
+
     /**
      * Initialize the coordination manager.
      */
@@ -139,7 +139,8 @@
         this.log = logContext.logger(AbstractCoordinator.class);
         this.client = client;
         this.time = time;
-        this.groupId = groupId;
+        this.groupId = Objects.requireNonNull(groupId,
+            "Expected a non-null group id for coordinator construction");
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.leaveGroupOnClose = leaveGroupOnClose;
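Note (illustration, simplified names): the patch establishes one invariant — the consumer's coordinator is null exactly when the group id is null. The coordinator itself fails fast on a null group id, while every coordinator use inside the consumer is null-guarded. A minimal sketch of that design, not the real classes:

import java.util.Objects;

public class NullCoordinatorInvariantSketch {

    static class Coordinator {
        final String groupId;

        Coordinator(String groupId) {
            // mirrors AbstractCoordinator: a coordinator must always have a group id
            this.groupId = Objects.requireNonNull(groupId,
                "Expected a non-null group id for coordinator construction");
        }

        void maybeLeaveGroup() { /* no-op in this sketch */ }
    }

    static class ConsumerSketch {
        final Coordinator coordinator; // null if and only if groupId == null

        ConsumerSketch(String groupId) {
            this.coordinator = groupId == null ? null : new Coordinator(groupId);
        }

        void unsubscribe() {
            if (coordinator != null) // every coordinator use is guarded, as in the hunks above
                coordinator.maybeLeaveGroup();
        }
    }

    public static void main(String[] args) {
        new ConsumerSketch(null).unsubscribe();    // no coordinator, guard skips the call
        new ConsumerSketch("group").unsubscribe(); // coordinator constructed with a real id
    }
}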
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 987bad22219..3657077d033 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -35,6 +35,8 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -138,6 +140,8 @@
     // a concurrent heartbeat request
     private final int autoCommitIntervalMs = 500;
 
+    private final String groupId = "mock-group";
+
     @Rule
     public ExpectedException expectedException = ExpectedException.none();
 
@@ -203,7 +207,7 @@
 
     @Test
     public void testSubscription() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
 
         consumer.subscribe(singletonList(topic));
         assertEquals(singleton(topic), consumer.subscription());
@@ -226,21 +230,21 @@
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullTopicCollection() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe((List<String>) null);
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullTopic() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe(singletonList((String) null));
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnEmptyTopic() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             String emptyTopic = " ";
             consumer.subscribe(singletonList(emptyTopic));
         }
@@ -248,7 +252,7 @@
 
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullPattern() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe((Pattern) null);
         }
     }
@@ -258,6 +262,7 @@
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "");
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
             consumer.subscribe(singletonList(topic));
         }
@@ -265,7 +270,7 @@
 
     @Test(expected = IllegalArgumentException.class)
     public void testSeekNegative() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
             consumer.assign(singleton(new TopicPartition("nonExistTopic", 0)));
             consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
         }
@@ -273,14 +278,14 @@
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
             consumer.assign(null);
         }
     }
 
     @Test
     public void testAssignOnEmptyTopicPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.assign(Collections.emptyList());
             assertTrue(consumer.subscription().isEmpty());
             assertTrue(consumer.assignment().isEmpty());
@@ -289,14 +294,14 @@
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnNullTopicInPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
             consumer.assign(singleton(new TopicPartition(null, 0)));
         }
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testAssignOnEmptyTopicInPartition() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
             consumer.assign(singleton(new TopicPartition(" ", 0)));
         }
     }
@@ -328,7 +333,7 @@
 
     @Test
     public void testPause() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
 
         consumer.assign(singletonList(tp0));
         assertEquals(singleton(tp0), consumer.assignment());
@@ -346,11 +351,19 @@
         consumer.close();
     }
 
-    private KafkaConsumer<byte[], byte[]> newConsumer() {
+    private KafkaConsumer<byte[], byte[]> newConsumer(String groupId) {
+        return newConsumer(groupId, Optional.empty());
+    }
+
+    private KafkaConsumer<byte[], byte[]> newConsumer(String groupId, Optional<Boolean> enableAutoCommit) {
         Properties props = new Properties();
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer");
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        if (groupId != null)
+            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        if (enableAutoCommit.isPresent())
+            props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit.get().toString());
         return newConsumer(props);
     }
 
@@ -554,7 +567,7 @@
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                OffsetResetStrategy.NONE, true);
+                OffsetResetStrategy.NONE, true, groupId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -577,7 +590,7 @@
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                OffsetResetStrategy.NONE, true);
+                OffsetResetStrategy.NONE, true, groupId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -601,7 +614,7 @@
         PartitionAssignor assignor = new RoundRobinAssignor();
 
         KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
-                OffsetResetStrategy.LATEST, true);
+                OffsetResetStrategy.LATEST, true, groupId);
         consumer.assign(singletonList(tp0));
 
         client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@@ -1210,14 +1223,14 @@
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithNoSubscription() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
            consumer.poll(Duration.ZERO);
         }
     }
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptySubscription() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.subscribe(Collections.emptyList());
             consumer.poll(Duration.ZERO);
         }
@@ -1225,7 +1238,7 @@
 
     @Test(expected = IllegalStateException.class)
     public void testPollWithEmptyUserAssignment() {
-        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
+        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
             consumer.assign(Collections.emptySet());
             consumer.poll(Duration.ZERO);
         }
@@ -1265,12 +1278,77 @@
 
     @Test
     public void closeShouldBeIdempotent() {
-        KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
         consumer.close();
         consumer.close();
         consumer.close();
     }
 
+    @Test
+    public void testOperationsBySubscribingConsumerWithDefaultGroupId() {
+        try {
+            newConsumer(null, Optional.of(Boolean.TRUE));
+            fail("Expected an InvalidConfigurationException");
+        } catch (KafkaException e) {
+            assertEquals(InvalidConfigurationException.class, e.getCause().getClass());
+        }
+
+        try {
+            newConsumer((String) null).subscribe(Collections.singleton(topic));
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            newConsumer((String) null).committed(tp0);
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            newConsumer((String) null).commitAsync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            newConsumer((String) null).commitSync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+    }
+
+    @Test
+    public void testOperationsByAssigningConsumerWithDefaultGroupId() {
+        KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
+        consumer.assign(singleton(tp0));
+
+        try {
+            consumer.committed(tp0);
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            consumer.commitAsync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+
+        try {
+            consumer.commitSync();
+            fail("Expected an InvalidGroupIdException");
+        } catch (InvalidGroupIdException e) {
+            // OK, expected
+        }
+    }
+
     @Test
     public void testMetricConfigRecordingLevel() {
         Properties props = new Properties();
@@ -1681,13 +1759,20 @@
                                                       Metadata metadata,
                                                       PartitionAssignor assignor,
                                                       boolean autoCommitEnabled) {
-        return newConsumer(time, client, metadata, assignor, OffsetResetStrategy.EARLIEST, autoCommitEnabled);
+        return newConsumer(time, client, metadata, assignor, OffsetResetStrategy.EARLIEST, autoCommitEnabled, groupId);
     }
 
     private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time,
                                                                   KafkaClient client,
                                                                   Metadata metadata) {
-        return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.EARLIEST, false);
+        return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.EARLIEST, false, groupId);
+    }
+
+    private KafkaConsumer<String, String> newConsumer(Time time,
+                                                      KafkaClient client,
+                                                      Metadata metadata,
+                                                      String groupId) {
+        return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.LATEST, true, groupId);
     }
 
     private KafkaConsumer<String, String> newConsumer(Time time,
@@ -1695,9 +1780,9 @@
                                                       Metadata metadata,
                                                       PartitionAssignor assignor,
                                                       OffsetResetStrategy resetStrategy,
-                                                      boolean autoCommitEnabled) {
+                                                      boolean autoCommitEnabled,
+                                                      String groupId) {
         String clientId = "mock-consumer";
-        String groupId = "mock-group";
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
         int requestTimeoutMs = 30000;
@@ -1782,7 +1867,8 @@
                 retryBackoffMs,
                 requestTimeoutMs,
                 defaultApiTimeoutMs,
-                assignors);
+                assignors,
+                groupId);
     }
 
     private static class FetchInfo {
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 938fe948b1f..61606ab8e89 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -84,6 +84,7 @@ public class ClientAuthenticationFailureTest {
     public void testConsumerWithInvalidCredentials() {
         Map<String, Object> props = new HashMap<>(saslClientConfigs);
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "");
         StringDeserializer deserializer = new StringDeserializer();
 
         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer)) {
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 0e2797a5de1..5a200055742 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -113,10 +113,12 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   def createConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
                            valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
-                           configOverrides: Properties = new Properties): KafkaConsumer[K, V] = {
+                           configOverrides: Properties = new Properties,
+                           configsToRemove: List[String] = List()): KafkaConsumer[K, V] = {
     val props = new Properties
     props ++= consumerConfig
     props ++= configOverrides
+    configsToRemove.foreach(props.remove(_))
     val consumer = new KafkaConsumer[K, V](props, keyDeserializer, valueDeserializer)
     consumers += consumer
     consumer
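Note (illustration, not part of the patch): the harness grows configsToRemove because a Properties override can only replace a key, never unset it, and setting group.id to "" is not equivalent to leaving it out — the empty group id is still a (deprecated) real group, while an absent group.id now means null and disables the group APIs. A Java rendering of the same idea, with hypothetical names:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class GroupIdRemovalSketch {
    // Strip selected keys from a shared base config before handing it to the
    // consumer, so group.id genuinely falls back to the null default.
    static KafkaConsumer<byte[], byte[]> consumerWithoutGroupId(Properties baseConfig) {
        Properties props = new Properties();
        props.putAll(baseConfig);
        props.remove(ConsumerConfig.GROUP_ID_CONFIG); // overriding with "" would NOT be equivalent
        return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        base.put(ConsumerConfig.GROUP_ID_CONFIG, "shared-test-group");       // hypothetical inherited default
        try (KafkaConsumer<byte[], byte[]> consumer = consumerWithoutGroupId(base)) {
            System.out.println("Constructed consumer with the default (null) group id");
        }
    }
}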
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c06a796040e..c11fc12d16d 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,6 +14,7 @@ package kafka.api
 
 import java.time.Duration
 import java.util
+import java.util.Arrays.asList
 import java.util.regex.Pattern
 import java.util.{Collections, Locale, Optional, Properties}
 
@@ -23,7 +24,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException}
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.record.{CompressionType, TimestampType}
 import org.apache.kafka.common.serialization._
@@ -1814,4 +1815,129 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       s"The current assignment is ${consumer.assignment()}")
   }
 
+  @Test
+  def testConsumingWithNullGroupId(): Unit = {
+    val topic = "test_topic"
+    val partition = 0
+    val tp = new TopicPartition(topic, partition)
+    createTopic(topic, 1, 1)
+
+    TestUtils.waitUntilTrue(() => {
+      this.zkClient.topicExists(topic)
+    }, "Failed to create topic")
+
+    val producer = createProducer()
+    producer.send(new ProducerRecord(topic, partition, "k1".getBytes, "v1".getBytes)).get()
+    producer.send(new ProducerRecord(topic, partition, "k2".getBytes, "v2".getBytes)).get()
+    producer.send(new ProducerRecord(topic, partition, "k3".getBytes, "v3".getBytes)).get()
+    producer.close()
+
+    // consumer 1 uses the default group id and consumes from earliest offset
+    val consumer1Config = new Properties(consumerConfig)
+    consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
+    val consumer1 = createConsumer(
+      configOverrides = consumer1Config,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+    // consumer 2 uses the default group id and consumes from latest offset
+    val consumer2Config = new Properties(consumerConfig)
+    consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
+    val consumer2 = createConsumer(
+      configOverrides = consumer2Config,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+    // consumer 3 uses the default group id and starts from an explicit offset
+    val consumer3Config = new Properties(consumerConfig)
+    consumer3Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer3")
+    val consumer3 = createConsumer(
+      configOverrides = consumer3Config,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+
+    consumer1.assign(asList(tp))
+    consumer2.assign(asList(tp))
+    consumer3.assign(asList(tp))
+    consumer3.seek(tp, 1)
+
+    val numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count()
+
+    try {
+      consumer1.commitSync()
+      fail("Expected offset commit to fail due to null group id")
+    } catch {
+      case e: InvalidGroupIdException => // OK
+    }
+
+    try {
+      consumer2.committed(tp)
+      fail("Expected committed offset fetch to fail due to null group id")
+    } catch {
+      case e: InvalidGroupIdException => // OK
+    }
+
+    val numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count()
+    val numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count()
+
+    consumer1.unsubscribe()
+    consumer2.unsubscribe()
+    consumer3.unsubscribe()
+
+    consumer1.close()
+    consumer2.close()
+    consumer3.close()
+
+    assertEquals("Expected consumer1 to consume from earliest offset", 3, numRecords1)
+    assertEquals("Expected consumer2 to consume from latest offset", 0, numRecords2)
+    assertEquals("Expected consumer3 to consume from offset 1", 2, numRecords3)
+  }
+
+  @Test
+  def testConsumingWithEmptyGroupId(): Unit = {
+    val topic = "test_topic"
+    val partition = 0
+    val tp = new TopicPartition(topic, partition)
+    createTopic(topic, 1, 1)
+
+    TestUtils.waitUntilTrue(() => {
+      this.zkClient.topicExists(topic)
+    }, "Failed to create topic")
+
+    val producer = createProducer()
+    producer.send(new ProducerRecord(topic, partition, "k1".getBytes, "v1".getBytes)).get()
+    producer.send(new ProducerRecord(topic, partition, "k2".getBytes, "v2".getBytes)).get()
+    producer.close()
+
+    // consumer 1 uses the empty group id
+    val consumer1Config = new Properties(consumerConfig)
+    consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
+    consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
+    consumer1Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
+    val consumer1 = createConsumer(configOverrides = consumer1Config)
+
+    // consumer 2 uses the empty group id and consumes from latest offset if there is no committed offset
+    val consumer2Config = new Properties(consumerConfig)
+    consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    consumer2Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
+    consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
+    consumer2Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
+    val consumer2 = createConsumer(configOverrides = consumer2Config)
+
+    consumer1.assign(asList(tp))
+    consumer2.assign(asList(tp))
+
+    val records1 = consumer1.poll(Duration.ofMillis(5000))
+    consumer1.commitSync()
+
+    val records2 = consumer2.poll(Duration.ofMillis(5000))
+    consumer2.commitSync()
+
+    consumer1.close()
+    consumer2.close()
+
+    assertTrue("Expected consumer1 to consume one message from offset 0",
+      records1.count() == 1 && records1.records(tp).asScala.head.offset == 0)
+    assertTrue("Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1",
+      records2.count() == 1 && records2.records(tp).asScala.head.offset == 1)
+  }
 }
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 33d9964113a..154547b10a0 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,15 @@
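Note (illustration, not part of the patch): a sketch of the practical migration choices for applications picking up this change, with hypothetical names:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class GroupIdMigrationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker

        // Option 1 (recommended): name the group explicitly; offset commits and
        // subscribe()-based group management keep working as before.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-app-group"); // hypothetical name

        // Option 2 (deprecated): opt back into the old implicit behavior. The empty
        // group id still works but now logs a deprecation warning on construction.
        // props.put(ConsumerConfig.GROUP_ID_CONFIG, "");

        // Option 3: leave group.id unset to run standalone; assign()/seek()/poll()
        // remain available, while group management and offset commits throw
        // InvalidGroupIdException.
    }
}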