diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 27dcdad3ccb..779a9933fd7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -35,7 +35,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.Utils; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -61,8 +61,12 @@ public class ConsumerConfig extends AbstractConfig { // a list contains all the assignor names that only assign subscribed topics to consumer. Should be updated when new assignor added. // This is to help optimize ConsumerCoordinator#performAssignment method - public static final List ASSIGN_FROM_SUBSCRIBED_ASSIGNORS = - List.of(RANGE_ASSIGNOR_NAME, ROUNDROBIN_ASSIGNOR_NAME, STICKY_ASSIGNOR_NAME, COOPERATIVE_STICKY_ASSIGNOR_NAME); + public static final List ASSIGN_FROM_SUBSCRIBED_ASSIGNORS = List.of( + RANGE_ASSIGNOR_NAME, + ROUNDROBIN_ASSIGNOR_NAME, + STICKY_ASSIGNOR_NAME, + COOPERATIVE_STICKY_ASSIGNOR_NAME + ); /* * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS @@ -376,6 +380,22 @@ public class ConsumerConfig extends AbstractConfig { private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); + /** + * A list of configuration keys not supported for CLASSIC protocol. + */ + private static final List CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = Collections.singletonList( + GROUP_REMOTE_ASSIGNOR_CONFIG + ); + + /** + * A list of configuration keys not supported for CONSUMER protocol. + */ + private static final List CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS = List.of( + PARTITION_ASSIGNMENT_STRATEGY_CONFIG, + HEARTBEAT_INTERVAL_MS_CONFIG, + SESSION_TIMEOUT_MS_CONFIG + ); + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, @@ -409,7 +429,7 @@ public class ConsumerConfig extends AbstractConfig { HEARTBEAT_INTERVAL_MS_DOC) .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Type.LIST, - Arrays.asList(RangeAssignor.class, CooperativeStickyAssignor.class), + List.of(RangeAssignor.class, CooperativeStickyAssignor.class), new ConfigDef.NonNullValidator(), Importance.MEDIUM, PARTITION_ASSIGNMENT_STRATEGY_DOC) @@ -667,7 +687,7 @@ public class ConsumerConfig extends AbstractConfig { Map refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues); maybeOverrideClientId(refinedConfigs); maybeOverrideEnableAutoCommit(refinedConfigs); - checkGroupRemoteAssignor(); + checkUnsupportedConfigs(); return refinedConfigs; } @@ -714,9 +734,28 @@ public class ConsumerConfig extends AbstractConfig { } } - private void checkGroupRemoteAssignor() { - if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CLASSIC.name()) && getString(GROUP_REMOTE_ASSIGNOR_CONFIG) != null && !getString(GROUP_REMOTE_ASSIGNOR_CONFIG).isEmpty()) { - throw new ConfigException(GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name()); + private void checkUnsupportedConfigs() { + String groupProtocol = getString(GROUP_PROTOCOL_CONFIG); + if (GroupProtocol.CLASSIC.name().equalsIgnoreCase(groupProtocol)) { + checkUnsupportedConfigs(GroupProtocol.CLASSIC, CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS); + } else if (GroupProtocol.CONSUMER.name().equalsIgnoreCase(groupProtocol)) { + checkUnsupportedConfigs(GroupProtocol.CONSUMER, CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS); + } + } + + private void checkUnsupportedConfigs(GroupProtocol groupProtocol, List unsupportedConfigs) { + if (getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(groupProtocol.name())) { + List invalidConfigs = new ArrayList<>(); + unsupportedConfigs.forEach(configName -> { + Object config = originals().get(configName); + if (config != null && !Utils.isBlank(config.toString())) { + invalidConfigs.add(configName); + } + }); + if (!invalidConfigs.isEmpty()) { + throw new ConfigException(String.join(", ", invalidConfigs) + + " cannot be set when " + GROUP_PROTOCOL_CONFIG + "=" + groupProtocol.name()); + } } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java index cbe0bd6ef35..2fa5515fb40 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java @@ -237,4 +237,23 @@ public class ConsumerConfigTest { assertThrows(ConfigException.class, () -> new ConsumerConfig(configs)); } } + + @Test + public void testUnsupportedConfigsWithConsumerGroupProtocol() { + testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "RoundRobinAssignor"); + testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); + testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); + } + + private void testUnsupportedConfigsWithConsumerGroupProtocol(String configName, Object value) { + final Map configs = Map.of( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass, + ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name(), + configName, value + ); + ConfigException exception = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs)); + assertEquals(configName + " cannot be set when " + + ConsumerConfig.GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CONSUMER.name(), exception.getMessage()); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 50f28fb549e..e8a084ebd54 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -3065,7 +3065,10 @@ public class KafkaConsumerTest { configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, minBytes); configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name()); - configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs); + if (groupProtocol == GroupProtocol.CLASSIC) { + configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs); + configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); + } configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString()); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSize); @@ -3074,7 +3077,6 @@ public class KafkaConsumerTest { configs.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); configs.put(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs); configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs); - configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); configs.put(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, throwOnStableOffsetNotSupported); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass()); groupInstanceId.ifPresent(gi -> configs.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, gi)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 0f87fb9037c..a3c59956c5f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -1535,18 +1535,6 @@ public class AsyncKafkaConsumerTest { assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG)); } - @Test - public void testPartitionAssignmentStrategyUnusedInAsyncConsumer() { - final Properties props = requiredConsumerConfig(); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroup1"); - props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); - props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "CooperativeStickyAssignor"); - final ConsumerConfig config = new ConsumerConfig(props); - consumer = newConsumer(config); - - assertTrue(config.unused().contains(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)); - } - @Test public void testGroupIdNull() { final Properties props = requiredConsumerConfig(); diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index a7e7424191a..a8b67d9a275 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -17,7 +17,7 @@ package kafka.api import kafka.utils.TestInfoUtils -import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, GroupProtocol} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} import org.apache.kafka.common.header.Headers import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, PartitionInfo} @@ -85,8 +85,10 @@ abstract class BaseConsumerTest extends AbstractConsumerTest { @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testCoordinatorFailover(quorum: String, groupProtocol: String): Unit = { val listener = new TestConsumerReassignmentListener() - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001") - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5001") + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000") + } // Use higher poll timeout to avoid consumer leaving the group due to timeout this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "15000") val consumer = createConsumer() diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index df3352e0076..0c840e4bf64 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -2386,7 +2386,9 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) // Increase timeouts to avoid having a rebalance during the test newConsumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE.toString) - newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT.toString) + if (GroupProtocol.CLASSIC.name.equalsIgnoreCase(groupProtocol)) { + newConsumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT.toString) + } Using.resource(createConsumer(configOverrides = newConsumerConfig)) { consumer => consumer.subscribe(Collections.singletonList(testTopicName)) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala index 9280b6af475..c52228acbca 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala @@ -61,8 +61,10 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testMaxPollIntervalMs(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString) - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2000.toString) + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 2000.toString) + } val consumer = createConsumer() @@ -87,8 +89,10 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testMaxPollIntervalMsDelayInRevocation(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000.toString) - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString) + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString) + } this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString) val consumer = createConsumer() @@ -129,8 +133,10 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testMaxPollIntervalMsDelayInAssignment(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000.toString) - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString) + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000.toString) + } this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString) val consumer = createConsumer() @@ -154,7 +160,9 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testMaxPollIntervalMsShorterThanPollTimeout(quorum: String, groupProtocol: String): Unit = { this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000.toString) - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500.toString) + } val consumer = createConsumer() val listener = new TestConsumerReassignmentListener diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala index dd2b0d4cffd..2a3a2fcfdf7 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala @@ -211,8 +211,10 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testUnsubscribeTopic(quorum: String, groupProtocol: String): Unit = { - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") + } val consumer = createConsumer() val listener = new TestConsumerReassignmentListener() diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 91544011ea8..2469a482ab5 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -388,8 +388,10 @@ class PlaintextConsumerTest extends BaseConsumerTest { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testPauseStateNotPreservedByRebalance(quorum: String, groupProtocol: String): Unit = { - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") + if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) { + this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test + this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") + } val consumer = createConsumer() val producer = createProducer() diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java index cf9c31cdd4a..3af38c44806 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java @@ -337,8 +337,9 @@ public class DeleteConsumerGroupsTest { configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol); - configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); - + if (GroupProtocol.CLASSIC.name.equalsIgnoreCase(groupProtocol)) { + configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); + } configs.putAll(customConfigs); return configs; } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java index 4e8db3f963c..bd621e64d88 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java @@ -251,7 +251,9 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest { consumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); // Increase timeouts to avoid having a rebalance during the test consumerConfig.putIfAbsent(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); - consumerConfig.putIfAbsent(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT)); + if (groupProtocol == GroupProtocol.CLASSIC) { + consumerConfig.putIfAbsent(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT)); + } return new KafkaConsumer<>(consumerConfig); } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index 4efcd244e24..57ec0efbcb3 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -992,7 +992,6 @@ public class DescribeConsumerGroupTest { configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); - configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); configs.putAll(customConfigs); return configs; diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java index 6222e28654d..b16bf8fc0bf 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java @@ -511,7 +511,9 @@ public class ListConsumerGroupTest { configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol); - configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); + if (GroupProtocol.CLASSIC.name.equalsIgnoreCase(groupProtocol)) { + configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); + } configs.putAll(customConfigs); return configs; diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java index abfc25cfb0a..5aa73dae2de 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java @@ -800,9 +800,11 @@ public class ResetConsumerGroupOffsetTest { configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol.name); configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); configs.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); configs.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 1000); + if (GroupProtocol.CLASSIC == groupProtocol) { + configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); + } return configs; }