KAFKA-17324: Set config group.protocol to classic in Streams (#16878)

Streams is not compatible with the new consumer rebalance protocol proposed in KIP-848. Thus, Streams should set/override config group.protocol to classic at startup to ensure that the classic protocol is used.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
TengYao Chi 2024-08-26 20:01:56 +08:00 committed by GitHub
parent 6bc0e1dcba
commit 4886414aa0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 26 additions and 1 deletions

View File

@ -823,7 +823,7 @@ public class StreamsConfig extends AbstractConfig {
private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG};
private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS =
new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS =
@ -1225,6 +1225,7 @@ public class StreamsConfig extends AbstractConfig {
tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
tempConsumerDefaultOverrides.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic");
CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}

View File

@ -470,6 +470,22 @@ public class StreamsConfigTest {
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
@Test
public void shouldResetToDefaultIfConsumerGroupProtocolIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_PROTOCOL_CONFIG), "consumer");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs("a", "b", threadIdx);
assertEquals("classic", consumerConfigs.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
}
@Test
public void shouldResetToDefaultIfRestoreConsumerGroupProtocolIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_PROTOCOL_CONFIG), "consumer");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs(clientId);
assertEquals("classic", consumerConfigs.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
}
@Test
public void testGetRestoreConsumerConfigsWithRestoreConsumerOverriddenPrefix() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");
@ -518,6 +534,14 @@ public class StreamsConfigTest {
assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
@Test
public void shouldResetToDefaultIfGlobalConsumerGroupProtocolIsOverridden() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_PROTOCOL_CONFIG), "consumer");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getGlobalConsumerConfigs(clientId);
assertEquals("classic", consumerConfigs.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
}
@Test
public void testGetGlobalConsumerConfigsWithGlobalConsumerOverriddenPrefix() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "5");