KAFKA-6774; Improve the default group id behavior in KafkaConsumer (KIP-289) (#5877)

Improve the default group id behavior by:
* changing the default consumer group to null, where no offset commit or fetch, or group management operations are allowed
* deprecating the use of empty (`""`) consumer group on the client

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Vahid Hashemian 2018-11-16 00:58:56 -08:00 committed by Jason Gustafson
parent 1a4d44f206
commit c3e7d6252c
8 changed files with 318 additions and 74 deletions

View File

@ -277,7 +277,7 @@ public class ConsumerConfig extends AbstractConfig {
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
Importance.MEDIUM,
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
.define(GROUP_ID_CONFIG, Type.STRING, null, Importance.HIGH, GROUP_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
Type.INT,
10000,

View File

@ -37,6 +37,8 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
@ -557,6 +559,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Logger log;
private final String clientId;
private String groupId;
private final ConsumerCoordinator coordinator;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
@ -654,18 +657,23 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
@SuppressWarnings("unchecked")
private KafkaConsumer(ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
try {
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientId.isEmpty())
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
this.groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
this.log = logContext.logger(getClass());
boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided
if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
enableAutoCommit = false;
else if (enableAutoCommit)
throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
} else if (groupId.isEmpty())
log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
log.debug("Initializing the Kafka consumer");
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
@ -678,8 +686,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
MetricsReporter.class, Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
@ -691,16 +698,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
ConsumerInterceptor.class);
this.interceptors = new ConsumerInterceptors<>(interceptorList);
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.keyDeserializer.configure(config.originals(), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keyDeserializer = keyDeserializer;
}
if (valueDeserializer == null) {
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.valueDeserializer.configure(config.originals(), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
@ -710,17 +715,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
true, false, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
this.metadata.bootstrap(addresses, time.milliseconds());
String metricGrpPrefix = "consumer";
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer");
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
IsolationLevel isolationLevel = IsolationLevel.valueOf(
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry.fetcherMetrics);
int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
NetworkClient netClient = new NetworkClient(
@ -755,24 +757,26 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
int maxPollIntervalMs = config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
int sessionTimeoutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
this.coordinator = new ConsumerCoordinator(logContext,
this.client,
groupId,
maxPollIntervalMs,
sessionTimeoutMs,
new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
assignors,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
retryBackoffMs,
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
// no coordinator will be constructed for the default (null) group id
this.coordinator = groupId == null ? null :
new ConsumerCoordinator(logContext,
this.client,
groupId,
maxPollIntervalMs,
sessionTimeoutMs,
new Heartbeat(time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs, retryBackoffMs),
assignors,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
retryBackoffMs,
enableAutoCommit,
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
this.interceptors,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),
config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));
this.fetcher = new Fetcher<>(
logContext,
this.client,
@ -795,11 +799,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
log.debug("Kafka consumer initialized");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
// call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
close(0, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
@ -822,7 +824,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
long retryBackoffMs,
long requestTimeoutMs,
int defaultApiTimeoutMs,
List<PartitionAssignor> assignors) {
List<PartitionAssignor> assignors,
String groupId) {
this.log = logContext.logger(getClass());
this.clientId = clientId;
this.coordinator = coordinator;
@ -839,6 +842,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.assignors = assignors;
this.groupId = groupId;
}
/**
@ -911,9 +915,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquireAndEnsureOpen();
try {
if (topics == null) {
maybeThrowInvalidGroupIdException();
if (topics == null)
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
} else if (topics.isEmpty()) {
if (topics.isEmpty()) {
// treat subscribing to empty topic list as the same as unsubscribing
this.unsubscribe();
} else {
@ -980,6 +985,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
maybeThrowInvalidGroupIdException();
if (pattern == null)
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
@ -1026,7 +1032,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
if (this.coordinator != null)
this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);
log.info("Unsubscribed all topics or patterns and assigned partitions");
} finally {
@ -1073,7 +1080,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// make sure the offsets of topic partitions the consumer is unsubscribing from
// are committed since there will be no following rebalance
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
if (coordinator != null)
this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(new HashSet<>(partitions));
@ -1211,7 +1219,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Visible for testing
*/
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
if (!coordinator.poll(timer)) {
if (coordinator != null && !coordinator.poll(timer)) {
return false;
}
@ -1219,7 +1227,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
long pollTimeout = Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
// if data is available already, return it immediately
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
@ -1249,7 +1258,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.rejoinNeededOrPending()) {
if (coordinator != null && coordinator.rejoinNeededOrPending()) {
return Collections.emptyMap();
}
@ -1324,6 +1333,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void commitSync(Duration timeout) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
"committing the current consumed offsets");
@ -1406,6 +1416,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
"committing offsets " + offsets);
@ -1475,6 +1486,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
log.debug("Committing offsets: {}", offsets);
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
} finally {
@ -1686,6 +1698,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
Collections.singleton(partition), time.timer(timeout));
if (offsets == null) {
@ -2163,7 +2176,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// coordinator lookup if there are partitions which have missing positions, so
// a consumer with manually assigned partitions can avoid a coordinator dependence
// by always ensuring that assigned partitions have an initial position.
if (!coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
// If there are partitions still needing a position and a reset policy is defined,
// request reset using the default policy. If no reset strategy is defined and there
@ -2216,6 +2229,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
}
private void maybeThrowInvalidGroupIdException() {
if (groupId == null)
throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
}
// Visible for testing
String getClientId() {
return clientId;

View File

@ -121,7 +121,7 @@ public abstract class AbstractCoordinator implements Closeable {
private Generation generation = Generation.NO_GENERATION;
private RequestFuture<Void> findCoordinatorFuture = null;
/**
* Initialize the coordination manager.
*/
@ -139,7 +139,8 @@ public abstract class AbstractCoordinator implements Closeable {
this.log = logContext.logger(AbstractCoordinator.class);
this.client = client;
this.time = time;
this.groupId = groupId;
this.groupId = Objects.requireNonNull(groupId,
"Expected a non-null group id for coordinator construction");
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.sessionTimeoutMs = sessionTimeoutMs;
this.leaveGroupOnClose = leaveGroupOnClose;

View File

@ -35,6 +35,8 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
@ -138,6 +140,8 @@ public class KafkaConsumerTest {
// a concurrent heartbeat request
private final int autoCommitIntervalMs = 500;
private final String groupId = "mock-group";
@Rule
public ExpectedException expectedException = ExpectedException.none();
@ -203,7 +207,7 @@ public class KafkaConsumerTest {
@Test
public void testSubscription() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
consumer.subscribe(singletonList(topic));
assertEquals(singleton(topic), consumer.subscription());
@ -226,21 +230,21 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testSubscriptionOnNullTopicCollection() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.subscribe((List<String>) null);
}
}
@Test(expected = IllegalArgumentException.class)
public void testSubscriptionOnNullTopic() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.subscribe(singletonList((String) null));
}
}
@Test(expected = IllegalArgumentException.class)
public void testSubscriptionOnEmptyTopic() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
String emptyTopic = " ";
consumer.subscribe(singletonList(emptyTopic));
}
@ -248,7 +252,7 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testSubscriptionOnNullPattern() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.subscribe((Pattern) null);
}
}
@ -258,6 +262,7 @@ public class KafkaConsumerTest {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
consumer.subscribe(singletonList(topic));
}
@ -265,7 +270,7 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testSeekNegative() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
consumer.assign(singleton(new TopicPartition("nonExistTopic", 0)));
consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
}
@ -273,14 +278,14 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testAssignOnNullTopicPartition() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
consumer.assign(null);
}
}
@Test
public void testAssignOnEmptyTopicPartition() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.assign(Collections.<TopicPartition>emptyList());
assertTrue(consumer.subscription().isEmpty());
assertTrue(consumer.assignment().isEmpty());
@ -289,14 +294,14 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testAssignOnNullTopicInPartition() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
consumer.assign(singleton(new TopicPartition(null, 0)));
}
}
@Test(expected = IllegalArgumentException.class)
public void testAssignOnEmptyTopicInPartition() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
consumer.assign(singleton(new TopicPartition(" ", 0)));
}
}
@ -328,7 +333,7 @@ public class KafkaConsumerTest {
@Test
public void testPause() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
consumer.assign(singletonList(tp0));
assertEquals(singleton(tp0), consumer.assignment());
@ -346,11 +351,19 @@ public class KafkaConsumerTest {
consumer.close();
}
private KafkaConsumer<byte[], byte[]> newConsumer() {
private KafkaConsumer<byte[], byte[]> newConsumer(String groupId) {
return newConsumer(groupId, Optional.empty());
}
private KafkaConsumer<byte[], byte[]> newConsumer(String groupId, Optional<Boolean> enableAutoCommit) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my.consumer");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
if (groupId != null)
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
if (enableAutoCommit.isPresent())
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit.get().toString());
return newConsumer(props);
}
@ -554,7 +567,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
OffsetResetStrategy.NONE, true);
OffsetResetStrategy.NONE, true, groupId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@ -577,7 +590,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
OffsetResetStrategy.NONE, true);
OffsetResetStrategy.NONE, true, groupId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@ -601,7 +614,7 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
OffsetResetStrategy.LATEST, true);
OffsetResetStrategy.LATEST, true, groupId);
consumer.assign(singletonList(tp0));
client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
@ -1210,14 +1223,14 @@ public class KafkaConsumerTest {
@Test(expected = IllegalStateException.class)
public void testPollWithNoSubscription() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) {
consumer.poll(Duration.ZERO);
}
}
@Test(expected = IllegalStateException.class)
public void testPollWithEmptySubscription() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.subscribe(Collections.<String>emptyList());
consumer.poll(Duration.ZERO);
}
@ -1225,7 +1238,7 @@ public class KafkaConsumerTest {
@Test(expected = IllegalStateException.class)
public void testPollWithEmptyUserAssignment() {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) {
consumer.assign(Collections.<TopicPartition>emptySet());
consumer.poll(Duration.ZERO);
}
@ -1265,12 +1278,77 @@ public class KafkaConsumerTest {
@Test
public void closeShouldBeIdempotent() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
consumer.close();
consumer.close();
consumer.close();
}
@Test
public void testOperationsBySubscribingConsumerWithDefaultGroupId() {
try {
newConsumer(null, Optional.of(Boolean.TRUE));
fail("Expected an InvalidConfigurationException");
} catch (KafkaException e) {
assertEquals(InvalidConfigurationException.class, e.getCause().getClass());
}
try {
newConsumer((String) null).subscribe(Collections.singleton(topic));
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
try {
newConsumer((String) null).committed(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
try {
newConsumer((String) null).commitAsync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
try {
newConsumer((String) null).commitSync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
}
@Test
public void testOperationsByAssigningConsumerWithDefaultGroupId() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null);
consumer.assign(singleton(tp0));
try {
consumer.committed(tp0);
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
try {
consumer.commitAsync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
try {
consumer.commitSync();
fail("Expected an InvalidGroupIdException");
} catch (InvalidGroupIdException e) {
// OK, expected
}
}
@Test
public void testMetricConfigRecordingLevel() {
Properties props = new Properties();
@ -1681,13 +1759,20 @@ public class KafkaConsumerTest {
Metadata metadata,
PartitionAssignor assignor,
boolean autoCommitEnabled) {
return newConsumer(time, client, metadata, assignor, OffsetResetStrategy.EARLIEST, autoCommitEnabled);
return newConsumer(time, client, metadata, assignor, OffsetResetStrategy.EARLIEST, autoCommitEnabled, groupId);
}
private KafkaConsumer<String, String> newConsumerNoAutoCommit(Time time,
KafkaClient client,
Metadata metadata) {
return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.EARLIEST, false);
return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.EARLIEST, false, groupId);
}
private KafkaConsumer<String, String> newConsumer(Time time,
KafkaClient client,
Metadata metadata,
String groupId) {
return newConsumer(time, client, metadata, new RangeAssignor(), OffsetResetStrategy.LATEST, true, groupId);
}
private KafkaConsumer<String, String> newConsumer(Time time,
@ -1695,9 +1780,9 @@ public class KafkaConsumerTest {
Metadata metadata,
PartitionAssignor assignor,
OffsetResetStrategy resetStrategy,
boolean autoCommitEnabled) {
boolean autoCommitEnabled,
String groupId) {
String clientId = "mock-consumer";
String groupId = "mock-group";
String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
int requestTimeoutMs = 30000;
@ -1782,7 +1867,8 @@ public class KafkaConsumerTest {
retryBackoffMs,
requestTimeoutMs,
defaultApiTimeoutMs,
assignors);
assignors,
groupId);
}
private static class FetchInfo {

View File

@ -84,6 +84,7 @@ public class ClientAuthenticationFailureTest {
public void testConsumerWithInvalidCredentials() {
Map<String, Object> props = new HashMap<>(saslClientConfigs);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "");
StringDeserializer deserializer = new StringDeserializer();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer)) {

View File

@ -113,10 +113,12 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
def createConsumer[K, V](keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
valueDeserializer: Deserializer[V] = new ByteArrayDeserializer,
configOverrides: Properties = new Properties): KafkaConsumer[K, V] = {
configOverrides: Properties = new Properties,
configsToRemove: List[String] = List()): KafkaConsumer[K, V] = {
val props = new Properties
props ++= consumerConfig
props ++= configOverrides
configsToRemove.foreach(props.remove(_))
val consumer = new KafkaConsumer[K, V](props, keyDeserializer, valueDeserializer)
consumers += consumer
consumer

View File

@ -14,6 +14,7 @@ package kafka.api
import java.time.Duration
import java.util
import java.util.Arrays.asList
import java.util.regex.Pattern
import java.util.{Collections, Locale, Optional, Properties}
@ -23,7 +24,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{MetricName, TopicPartition}
import org.apache.kafka.common.errors.InvalidTopicException
import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException}
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.serialization._
@ -1814,4 +1815,129 @@ class PlaintextConsumerTest extends BaseConsumerTest {
s"The current assignment is ${consumer.assignment()}")
}
@Test
def testConsumingWithNullGroupId(): Unit = {
val topic = "test_topic"
val partition = 0;
val tp = new TopicPartition(topic, partition)
createTopic(topic, 1, 1)
TestUtils.waitUntilTrue(() => {
this.zkClient.topicExists(topic)
}, "Failed to create topic")
val producer = createProducer()
producer.send(new ProducerRecord(topic, partition, "k1".getBytes, "v1".getBytes)).get()
producer.send(new ProducerRecord(topic, partition, "k2".getBytes, "v2".getBytes)).get()
producer.send(new ProducerRecord(topic, partition, "k3".getBytes, "v3".getBytes)).get()
producer.close()
// consumer 1 uses the default group id and consumes from earliest offset
val consumer1Config = new Properties(consumerConfig)
consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
val consumer1 = createConsumer(
configOverrides = consumer1Config,
configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
// consumer 2 uses the default group id and consumes from latest offset
val consumer2Config = new Properties(consumerConfig)
consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
val consumer2 = createConsumer(
configOverrides = consumer2Config,
configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
// consumer 3 uses the default group id and starts from an explicit offset
val consumer3Config = new Properties(consumerConfig)
consumer3Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer3")
val consumer3 = createConsumer(
configOverrides = consumer3Config,
configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
consumer1.assign(asList(tp))
consumer2.assign(asList(tp))
consumer3.assign(asList(tp))
consumer3.seek(tp, 1)
val numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count()
try {
consumer1.commitSync()
fail("Expected offset commit to fail due to null group id")
} catch {
case e: InvalidGroupIdException => // OK
}
try {
consumer2.committed(tp)
fail("Expected committed offset fetch to fail due to null group id")
} catch {
case e: InvalidGroupIdException => // OK
}
val numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count()
val numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count()
consumer1.unsubscribe()
consumer2.unsubscribe()
consumer3.unsubscribe()
consumer1.close()
consumer2.close()
consumer3.close()
assertEquals("Expected consumer1 to consume from earliest offset", 3, numRecords1)
assertEquals("Expected consumer2 to consume from latest offset", 0, numRecords2)
assertEquals("Expected consumer3 to consume from offset 1", 2, numRecords3)
}
@Test
def testConsumingWithEmptyGroupId(): Unit = {
val topic = "test_topic"
val partition = 0;
val tp = new TopicPartition(topic, partition)
createTopic(topic, 1, 1)
TestUtils.waitUntilTrue(() => {
this.zkClient.topicExists(topic)
}, "Failed to create topic")
val producer = createProducer()
producer.send(new ProducerRecord(topic, partition, "k1".getBytes, "v1".getBytes)).get()
producer.send(new ProducerRecord(topic, partition, "k2".getBytes, "v2".getBytes)).get()
producer.close()
// consumer 1 uses the empty group id
val consumer1Config = new Properties(consumerConfig)
consumer1Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1")
consumer1Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
val consumer1 = createConsumer(configOverrides = consumer1Config)
// consumer 2 uses the empty group id and consumes from latest offset if there is no committed offset
val consumer2Config = new Properties(consumerConfig)
consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
consumer2Config.put(ConsumerConfig.GROUP_ID_CONFIG, "")
consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2")
consumer2Config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")
val consumer2 = createConsumer(configOverrides = consumer2Config)
consumer1.assign(asList(tp))
consumer2.assign(asList(tp))
val records1 = consumer1.poll(Duration.ofMillis(5000))
consumer1.commitSync()
val records2 = consumer2.poll(Duration.ofMillis(5000))
consumer2.commitSync()
consumer1.close()
consumer2.close()
assertTrue("Expected consumer1 to consume one message from offset 0",
records1.count() == 1 && records1.records(tp).asScala.head.offset == 0)
assertTrue("Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1",
records2.count() == 1 && records2.records(tp).asScala.head.offset == 1)
}
}

View File

@ -19,6 +19,15 @@
<script id="upgrade-template" type="text/x-handlebars-template">
<h4><a id="upgrade_2_2_0" href="#upgrade_2_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x to 2.2.0</a></h4>
<h5><a id="upgrade_220_notable" href="#upgrade_220_notable">Notable changes in 2.2.0</a></h5>
<ul>
<li>The default consumer group id has been changed from the empty string (<code>""</code>) to <code>null</code>. Consumers who use the new default group id will not be able to subscribe to topics,
and fetch or commit offsets. The empty string as consumer group id is deprecated but will be supported until a future major release. Old clients that rely on the empty string group id will now
have to explicitly provide it as part of their consumer config. For more information see
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer">KIP-289</a>.</li>
</ul>
<h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0</a></h4>
<p><b>Note that 2.1.x contains a change to the internal schema used to store consumer offsets. Once the upgrade is