From 793dcee541b192115f632efb7c4d28e09f897315 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Tue, 10 Jun 2025 11:13:34 +0200 Subject: [PATCH] KAFKA-19376: Throw an error message if any unsupported feature is used with KIP-1071 (#19908) We should be mindful of ours users and let them know early if they are using an unsupported feature in 4.1. Unsupported features: - Regular expressions - Warm-up replicas (high availability assignor) - Static membership - Standby replicas enabled through local config - Named topologies (already checked) - Non-default kafka-client supplier Reviewers: Bill Bejeck , Chia-Ping Tsai --- .../apache/kafka/streams/KafkaStreams.java | 24 ++++++- .../apache/kafka/streams/StreamsConfig.java | 23 ++++++- .../processor/internals/StreamThread.java | 2 - .../kafka/streams/KafkaStreamsTest.java | 37 ++++++++++ .../kafka/streams/StreamsConfigTest.java | 67 +++++++++++++++++++ 5 files changed, 148 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 3219c9c8995..b3723ef447d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -60,6 +60,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ClientUtils; +import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; @@ -971,13 +972,16 @@ public class KafkaStreams implements AutoCloseable { this.log = logContext.logger(getClass()); topologyMetadata.setLog(logContext); - // use client id instead of thread client id since this admin client may be shared among threads this.clientSupplier = clientSupplier; - adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId))); log.info("Kafka Streams version: {}", ClientMetrics.version()); log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId()); + throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol(); + + // use client id instead of thread client id since this admin client may be shared among threads + adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.adminClientId(clientId))); + metrics = createMetrics(applicationConfigs, time, clientId); final StreamsClientMetricsDelegatingReporter reporter = new StreamsClientMetricsDelegatingReporter(adminClient, clientId); metrics.addReporter(reporter); @@ -1047,6 +1051,22 @@ public class KafkaStreams implements AutoCloseable { rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs); } + private void throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol() { + if (applicationConfigs.isStreamsProtocolEnabled()) { + log.info("Streams rebalance protocol enabled"); + if (topologyMetadata.hasNamedTopologies()) { + throw new UnsupportedOperationException("Named topologies are not supported with the STREAMS protocol."); + } + if (topologyMetadata.usesPatternSubscription()) { + throw new UnsupportedOperationException("Pattern subscriptions are not supported with the STREAMS protocol."); + } + if (!(clientSupplier instanceof DefaultKafkaClientSupplier)) { + log.warn("A non-default kafka client supplier was supplied. Note that supplying a custom main consumer" + + " is not supported with the STREAMS protocol."); + } + } + } + private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final int threadIdx) { final StreamThread streamThread = StreamThread.create( topologyMetadata, diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index c8fa251b6be..944446b4bec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1525,10 +1525,27 @@ public class StreamsConfig extends AbstractConfig { } verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG)); verifyClientTelemetryConfigs(); + verifyStreamsProtocolCompatibility(doLog); + } - if (doLog && getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT))) { + private void verifyStreamsProtocolCompatibility(final boolean doLog) { + if (doLog && isStreamsProtocolEnabled()) { log.warn("The streams rebalance protocol is still in development and should not be used in production. " + "Please set group.protocol=classic (default) in all production use cases."); + final Map mainConsumerConfigs = getMainConsumerConfigs("dummy", "dummy", -1); + final String instanceId = (String) mainConsumerConfigs.get(CommonClientConfigs.GROUP_INSTANCE_ID_CONFIG); + if (instanceId != null && !instanceId.isEmpty()) { + throw new ConfigException("Streams rebalance protocol does not support static membership. " + + "Please set group.protocol=classic or remove group.instance.id from the configuration."); + } + if (getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG) != 0) { + log.warn("Warmup replicas are not supported yet with the streams protocol and will be ignored. " + + "If you want to use warmup replicas, please set group.protocol=classic."); + } + if (getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG) != 0) { + log.warn("Standby replicas are configured broker-side in the streams group protocol and will be ignored. " + + "Please use the admin client or kafka-configs.sh to set the streams groups's standby replicas."); + } } } @@ -2125,6 +2142,10 @@ public class StreamsConfig extends AbstractConfig { return getConfiguredInstance(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, ProcessingExceptionHandler.class); } + protected boolean isStreamsProtocolEnabled() { + return getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name()); + } + /** * Override any client properties in the original configs with overrides * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index c0cdff2421b..86e12bf3f65 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -539,8 +539,6 @@ public class StreamThread extends Thread implements ProcessingThread { if (topologyMetadata.hasNamedTopologies()) { throw new IllegalStateException("Named topologies and the STREAMS protocol cannot be used at the same time."); } - log.info("Streams rebalance protocol enabled"); - final Optional streamsRebalanceData = Optional.of( initStreamsRebalanceData( processId, diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 499498224bf..d417b5be2ed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -63,6 +63,7 @@ import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; +import org.apache.logging.log4j.Level; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -1840,6 +1841,42 @@ public class KafkaStreamsTest { assertThat(didAssertGlobalThread.get(), equalTo(true)); } + @Test + public void shouldThrowIfPatternSubscriptionUsedWithStreamsProtocol() { + final Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2018"); + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + // Simulate pattern subscription + final Topology topology = new Topology(); + topology.addSource("source", java.util.regex.Pattern.compile("topic-.*")); + + final UnsupportedOperationException ex = assertThrows( + UnsupportedOperationException.class, + () -> new KafkaStreams(topology, props) + ); + assert ex.getMessage().contains("Pattern subscriptions are not supported with the STREAMS protocol"); + } + + @Test + public void shouldLogWarningIfNonDefaultClientSupplierUsedWithStreamsProtocol() { + final Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2018"); + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + final Topology topology = new Topology(); + topology.addSource("source", "topic"); + + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class)) { + appender.setClassLogger(KafkaStreams.class, Level.WARN); + try (@SuppressWarnings("unused") final KafkaStreams ignored = new KafkaStreams(topology, new StreamsConfig(props), new MockClientSupplier())) { + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("A non-default kafka client supplier was supplied. " + + "Note that supplying a custom main consumer is not supported with the STREAMS protocol."))); + } + } + } + private Topology getStatefulTopology(final String inputTopic, final String outputTopic, final String globalTopicName, diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index f1ee8df371d..6505cde08ed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -48,6 +48,8 @@ import org.apache.logging.log4j.Level; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.lang.reflect.Field; @@ -1600,6 +1602,7 @@ public class StreamsConfigTest { @Test public void shouldSetGroupProtocolToClassicByDefault() { assertTrue(GroupProtocol.CLASSIC.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG))); + assertFalse(streamsConfig.isStreamsProtocolEnabled()); } @Test @@ -1607,6 +1610,7 @@ public class StreamsConfigTest { props.put(GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name()); streamsConfig = new StreamsConfig(props); assertTrue(GroupProtocol.CLASSIC.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG))); + assertFalse(streamsConfig.isStreamsProtocolEnabled()); } @Test @@ -1614,6 +1618,69 @@ public class StreamsConfigTest { props.put(GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name()); streamsConfig = new StreamsConfig(props); assertTrue(GroupProtocol.STREAMS.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG))); + assertTrue(streamsConfig.isStreamsProtocolEnabled()); + } + + @Test + public void shouldLogWarningWhenStreamsProtocolIsUsed() { + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.WARN); + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + + new StreamsConfig(props); + + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("The streams rebalance protocol is still in development and should " + + "not be used in production. Please set group.protocol=classic (default) in all production use cases."))); + } + } + + @Test + public void shouldLogWarningWhenWarmupReplicasSetWithStreamsProtocol() { + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.WARN); + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1); + + new StreamsConfig(props); + + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("Warmup replicas are not supported yet with the streams protocol and " + + "will be ignored. If you want to use warmup replicas, please set group.protocol=classic."))); + } + } + + @Test + public void shouldLogWarningWhenStandbyReplicasSetWithStreamsProtocol() { + try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) { + appender.setClassLogger(StreamsConfig.class, Level.WARN); + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + + new StreamsConfig(props); + + assertTrue(appender.getMessages().stream() + .anyMatch(msg -> msg.contains("Standby replicas are configured broker-side in the streams group " + + "protocol and will be ignored. Please use the admin client or kafka-configs.sh to set the streams " + + "groups's standby replicas."))); + } + } + + @ParameterizedTest + @ValueSource(strings = {"", StreamsConfig.CONSUMER_PREFIX, StreamsConfig.MAIN_CONSUMER_PREFIX}) + public void shouldThrowConfigExceptionWhenStreamsProtocolUsedWithStaticMembership(final String prefix) { + final Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); + props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams"); + props.put(prefix + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "static-member-1"); + + final ConfigException exception = assertThrows( + ConfigException.class, + () -> new StreamsConfig(props) + ); + assertTrue(exception.getMessage().contains("Streams rebalance protocol does not support static membership. " + + "Please set group.protocol=classic or remove group.instance.id from the configuration.")); } static class MisconfiguredSerde implements Serde {