KAFKA-19376: Throw an error message if any unsupported feature is used with KIP-1071 (#19908)

We should be mindful of ours users and let them know early if they are
using an unsupported feature in 4.1.

Unsupported features:

- Regular expressions
- Warm-up replicas (high availability assignor)
- Static membership
- Standby replicas enabled through local config
- Named topologies (already checked)
- Non-default kafka-client supplier

Reviewers: Bill Bejeck <bbejeck@apache.org>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Lucas Brutschy 2025-06-10 11:13:34 +02:00 committed by GitHub
parent 08aa469af7
commit 793dcee541
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 148 additions and 5 deletions

View File

@ -60,6 +60,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
@ -971,13 +972,16 @@ public class KafkaStreams implements AutoCloseable {
this.log = logContext.logger(getClass());
topologyMetadata.setLog(logContext);
// use client id instead of thread client id since this admin client may be shared among threads
this.clientSupplier = clientSupplier;
adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));
log.info("Kafka Streams version: {}", ClientMetrics.version());
log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol();
// use client id instead of thread client id since this admin client may be shared among threads
adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId)));
metrics = createMetrics(applicationConfigs, time, clientId);
final StreamsClientMetricsDelegatingReporter reporter = new StreamsClientMetricsDelegatingReporter(adminClient, clientId);
metrics.addReporter(reporter);
@ -1047,6 +1051,22 @@ public class KafkaStreams implements AutoCloseable {
rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs);
}
private void throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol() {
if (applicationConfigs.isStreamsProtocolEnabled()) {
log.info("Streams rebalance protocol enabled");
if (topologyMetadata.hasNamedTopologies()) {
throw new UnsupportedOperationException("Named topologies are not supported with the STREAMS protocol.");
}
if (topologyMetadata.usesPatternSubscription()) {
throw new UnsupportedOperationException("Pattern subscriptions are not supported with the STREAMS protocol.");
}
if (!(clientSupplier instanceof DefaultKafkaClientSupplier)) {
log.warn("A non-default kafka client supplier was supplied. Note that supplying a custom main consumer" +
" is not supported with the STREAMS protocol.");
}
}
}
private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final int threadIdx) {
final StreamThread streamThread = StreamThread.create(
topologyMetadata,

View File

@ -1525,10 +1525,27 @@ public class StreamsConfig extends AbstractConfig {
}
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
verifyClientTelemetryConfigs();
verifyStreamsProtocolCompatibility(doLog);
}
if (doLog && getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT))) {
private void verifyStreamsProtocolCompatibility(final boolean doLog) {
if (doLog && isStreamsProtocolEnabled()) {
log.warn("The streams rebalance protocol is still in development and should not be used in production. "
+ "Please set group.protocol=classic (default) in all production use cases.");
final Map<String, Object> mainConsumerConfigs = getMainConsumerConfigs("dummy", "dummy", -1);
final String instanceId = (String) mainConsumerConfigs.get(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG);
if (instanceId != null && !instanceId.isEmpty()) {
throw new ConfigException("Streams rebalance protocol does not support static membership. "
+ "Please set group.protocol=classic or remove group.instance.id from the configuration.");
}
if (getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG) != 0) {
log.warn("Warmup replicas are not supported yet with the streams protocol and will be ignored. "
+ "If you want to use warmup replicas, please set group.protocol=classic.");
}
if (getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG) != 0) {
log.warn("Standby replicas are configured broker-side in the streams group protocol and will be ignored. "
+ "Please use the admin client or kafka-configs.sh to set the streams groups's standby replicas.");
}
}
}
@ -2125,6 +2142,10 @@ public class StreamsConfig extends AbstractConfig {
return getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class);
}
protected boolean isStreamsProtocolEnabled() {
return getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name());
}
/**
* Override any client properties in the original configs with overrides
*

View File

@ -539,8 +539,6 @@ public class StreamThread extends Thread implements ProcessingThread {
if (topologyMetadata.hasNamedTopologies()) {
throw new IllegalStateException("Named topologies and the STREAMS protocol cannot be used at the same time.");
}
log.info("Streams rebalance protocol enabled");
final Optional<StreamsRebalanceData> streamsRebalanceData = Optional.of(
initStreamsRebalanceData(
processId,

View File

@ -63,6 +63,7 @@ import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -1840,6 +1841,42 @@ public class KafkaStreamsTest {
assertThat(didAssertGlobalThread.get(), equalTo(true));
}
@Test
public void shouldThrowIfPatternSubscriptionUsedWithStreamsProtocol() {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2018");
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
// Simulate pattern subscription
final Topology topology = new Topology();
topology.addSource("source", java.util.regex.Pattern.compile("topic-.*"));
final UnsupportedOperationException ex = assertThrows(
UnsupportedOperationException.class,
() -> new KafkaStreams(topology, props)
);
assert ex.getMessage().contains("Pattern subscriptions are not supported with the STREAMS protocol");
}
@Test
public void shouldLogWarningIfNonDefaultClientSupplierUsedWithStreamsProtocol() {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2018");
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
final Topology topology = new Topology();
topology.addSource("source", "topic");
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class)) {
appender.setClassLogger(KafkaStreams.class, Level.WARN);
try (@SuppressWarnings("unused") final KafkaStreams ignored = new KafkaStreams(topology, new StreamsConfig(props), new MockClientSupplier())) {
assertTrue(appender.getMessages().stream()
.anyMatch(msg -> msg.contains("A non-default kafka client supplier was supplied. " +
"Note that supplying a custom main consumer is not supported with the STREAMS protocol.")));
}
}
}
private Topology getStatefulTopology(final String inputTopic,
final String outputTopic,
final String globalTopicName,

View File

@ -48,6 +48,8 @@ import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.lang.reflect.Field;
@ -1600,6 +1602,7 @@ public class StreamsConfigTest {
@Test
public void shouldSetGroupProtocolToClassicByDefault() {
assertTrue(GroupProtocol.CLASSIC.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG)));
assertFalse(streamsConfig.isStreamsProtocolEnabled());
}
@Test
@ -1607,6 +1610,7 @@ public class StreamsConfigTest {
props.put(GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
streamsConfig = new StreamsConfig(props);
assertTrue(GroupProtocol.CLASSIC.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG)));
assertFalse(streamsConfig.isStreamsProtocolEnabled());
}
@Test
@ -1614,6 +1618,69 @@ public class StreamsConfigTest {
props.put(GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
streamsConfig = new StreamsConfig(props);
assertTrue(GroupProtocol.STREAMS.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG)));
assertTrue(streamsConfig.isStreamsProtocolEnabled());
}
@Test
public void shouldLogWarningWhenStreamsProtocolIsUsed() {
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLogger(StreamsConfig.class, Level.WARN);
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
new StreamsConfig(props);
assertTrue(appender.getMessages().stream()
.anyMatch(msg -> msg.contains("The streams rebalance protocol is still in development and should " +
"not be used in production. Please set group.protocol=classic (default) in all production use cases.")));
}
}
@Test
public void shouldLogWarningWhenWarmupReplicasSetWithStreamsProtocol() {
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLogger(StreamsConfig.class, Level.WARN);
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1);
new StreamsConfig(props);
assertTrue(appender.getMessages().stream()
.anyMatch(msg -> msg.contains("Warmup replicas are not supported yet with the streams protocol and " +
"will be ignored. If you want to use warmup replicas, please set group.protocol=classic.")));
}
}
@Test
public void shouldLogWarningWhenStandbyReplicasSetWithStreamsProtocol() {
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
appender.setClassLogger(StreamsConfig.class, Level.WARN);
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
new StreamsConfig(props);
assertTrue(appender.getMessages().stream()
.anyMatch(msg -> msg.contains("Standby replicas are configured broker-side in the streams group " +
"protocol and will be ignored. Please use the admin client or kafka-configs.sh to set the streams " +
"groups's standby replicas.")));
}
}
@ParameterizedTest
@ValueSource(strings = {"", StreamsConfig.CONSUMER_PREFIX, StreamsConfig.MAIN_CONSUMER_PREFIX})
public void shouldThrowConfigExceptionWhenStreamsProtocolUsedWithStaticMembership(final String prefix) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
props.put(prefix + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "static-member-1");
final ConfigException exception = assertThrows(
ConfigException.class,
() -> new StreamsConfig(props)
);
assertTrue(exception.getMessage().contains("Streams rebalance protocol does not support static membership. " +
"Please set group.protocol=classic or remove group.instance.id from the configuration."));
}
static class MisconfiguredSerde implements Serde<Object> {