diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index fa9e1bd48f8..bb8b63a0da8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -800,13 +800,11 @@ public class StreamsConfig extends AbstractConfig { /** {@code upgrade.from} */ @SuppressWarnings("WeakerAccess") public static final String UPGRADE_FROM_CONFIG = "upgrade.from"; - private static final String UPGRADE_FROM_DOC = "Allows upgrading in a backward compatible way. " + - "This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. " + - "When upgrading from 3.3 to a newer version it is not required to specify this config. Default is `null`. " + - "Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + - UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + - UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" + - UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\", \"" + UPGRADE_FROM_24 + "\", \"" + + private static final String UPGRADE_FROM_DOC = "Allows live upgrading (and downgrading in some cases -- see upgrade guide) in a backward compatible way. Default is `null`. " + + "Please refer to the Kafka Streams upgrade guide for instructions on how and when to use this config. " + + "Note that when upgrading from 3.5 to a newer version it is never required to specify this config, " + + "while upgrading live directly to 4.0+ from 2.3 or below is no longer supported even with this config. " + + "Accepted values are \"" + UPGRADE_FROM_24 + "\", \"" + UPGRADE_FROM_25 + "\", \"" + UPGRADE_FROM_26 + "\", \"" + UPGRADE_FROM_27 + "\", \"" + UPGRADE_FROM_28 + "\", \"" + UPGRADE_FROM_30 + "\", \"" + UPGRADE_FROM_31 + "\", \"" + UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\", \"" + UPGRADE_FROM_34 + "\", \"" + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 4e26b697e05..f458629cf1f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -92,6 +92,7 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import static java.util.Collections.singletonList; import static java.util.Collections.unmodifiableSet; import static java.util.Map.Entry.comparingByKey; import static org.apache.kafka.common.utils.Utils.filterMap; @@ -216,11 +217,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf private Queue nonFatalExceptionsToHandle; private Time time; + // since live upgrades from 2.3 (or earlier) to 4.0 or above are no longer supported, we can always + // start with the latest supported metadata version since version probing will take + // care of downgrading it if/when necessary protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION; private InternalTopicManager internalTopicManager; private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer; - private RebalanceProtocol rebalanceProtocol; private AssignmentListener assignmentListener; private Supplier> @@ -242,7 +245,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf logPrefix = assignorConfiguration.logPrefix(); log = new LogContext(logPrefix).logger(getClass()); - usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion); final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer(); mainConsumerSupplier = () -> Objects.requireNonNull(referenceContainer.mainConsumer, "Main consumer was not specified"); @@ -258,7 +260,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf userEndPoint = assignorConfiguration.userEndPoint(); internalTopicManager = assignorConfiguration.internalTopicManager(); copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer(); - rebalanceProtocol = assignorConfiguration.rebalanceProtocol(); customTaskAssignorSupplier = assignorConfiguration::customTaskAssignor; legacyTaskAssignorSupplier = assignorConfiguration::taskAssignor; assignmentListener = assignorConfiguration.assignmentListener(); @@ -273,12 +274,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf @Override public List supportedProtocols() { - final List supportedProtocols = new ArrayList<>(); - supportedProtocols.add(RebalanceProtocol.EAGER); - if (rebalanceProtocol == RebalanceProtocol.COOPERATIVE) { - supportedProtocols.add(rebalanceProtocol); - } - return supportedProtocols; + return singletonList(RebalanceProtocol.COOPERATIVE); } @Override @@ -1669,10 +1665,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf this.internalTopicManager = internalTopicManager; } - RebalanceProtocol rebalanceProtocol() { - return rebalanceProtocol; - } - protected String userEndPoint() { return userEndPoint; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index bc2324044da..b210e638eca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.LogContext; @@ -37,7 +36,6 @@ import java.util.Optional; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; import static org.apache.kafka.streams.StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS; -import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; public final class AssignorConfiguration { private final String internalTaskAssignorClass; @@ -61,6 +59,8 @@ public final class AssignorConfiguration { final LogContext logContext = new LogContext(logPrefix); log = logContext.logger(getClass()); + validateUpgradeFrom(); + { final Object o = configs.get(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR); if (o == null) { @@ -94,7 +94,9 @@ public final class AssignorConfiguration { return referenceContainer; } - public RebalanceProtocol rebalanceProtocol() { + // cooperative rebalancing was introduced in 2.4 and the old protocol (eager rebalancing) was removed + // in 4.0, meaning live upgrades from 2.3 or below to 4.0+ are no longer possible without a bridge release + public void validateUpgradeFrom() { final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG); if (upgradeFrom != null) { switch (UpgradeFromValues.fromString(upgradeFrom)) { @@ -108,106 +110,20 @@ public final class AssignorConfiguration { case UPGRADE_FROM_21: case UPGRADE_FROM_22: case UPGRADE_FROM_23: - // ATTENTION: The following log messages is used for verification in system test - // streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance - // If you change it, please do also change the system test accordingly and - // verify whether the test passes. - log.info("Eager rebalancing protocol is enabled now for upgrade from {}.x", upgradeFrom); - log.warn("The eager rebalancing protocol is deprecated and will stop being supported in a future release." + - " Please be prepared to remove the 'upgrade.from' config soon."); - return RebalanceProtocol.EAGER; - case UPGRADE_FROM_24: - case UPGRADE_FROM_25: - case UPGRADE_FROM_26: - case UPGRADE_FROM_27: - case UPGRADE_FROM_28: - case UPGRADE_FROM_30: - case UPGRADE_FROM_31: - case UPGRADE_FROM_32: - case UPGRADE_FROM_33: - case UPGRADE_FROM_34: - case UPGRADE_FROM_35: - case UPGRADE_FROM_36: - case UPGRADE_FROM_37: - case UPGRADE_FROM_38: - case UPGRADE_FROM_39: - // we need to add new version when new "upgrade.from" values become available + final String errMsg = String.format( + "The eager rebalancing protocol is no longer supported in 4.0 which means live upgrades from 2.3 or below are not possible." + + " Please see the Streams upgrade guide for the bridge releases and recommended upgrade path. Got upgrade.from='%s'", upgradeFrom); + log.error(errMsg); + throw new ConfigException(errMsg); - // This config is for explicitly sending FK response to a requested partition - // and should not affect the rebalance protocol - break; - default: - throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom); } } - // ATTENTION: The following log messages is used for verification in system test - // streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance - // If you change it, please do also change the system test accordingly and - // verify whether the test passes. - log.info("Cooperative rebalancing protocol is enabled now"); - return RebalanceProtocol.COOPERATIVE; } public String logPrefix() { return logPrefix; } - public int configuredMetadataVersion(final int priorVersion) { - final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG); - if (upgradeFrom != null) { - switch (UpgradeFromValues.fromString(upgradeFrom)) { - case UPGRADE_FROM_0100: - log.info( - "Downgrading metadata.version from {} to 1 for upgrade from 0.10.0.x.", - LATEST_SUPPORTED_VERSION - ); - return 1; - case UPGRADE_FROM_0101: - case UPGRADE_FROM_0102: - case UPGRADE_FROM_0110: - case UPGRADE_FROM_10: - case UPGRADE_FROM_11: - log.info( - "Downgrading metadata.version from {} to 2 for upgrade from {}.x.", - LATEST_SUPPORTED_VERSION, - upgradeFrom - ); - return 2; - case UPGRADE_FROM_20: - case UPGRADE_FROM_21: - case UPGRADE_FROM_22: - case UPGRADE_FROM_23: - // These configs are for cooperative rebalancing and should not affect the metadata version - break; - case UPGRADE_FROM_24: - case UPGRADE_FROM_25: - case UPGRADE_FROM_26: - case UPGRADE_FROM_27: - case UPGRADE_FROM_28: - case UPGRADE_FROM_30: - case UPGRADE_FROM_31: - case UPGRADE_FROM_32: - case UPGRADE_FROM_33: - case UPGRADE_FROM_34: - case UPGRADE_FROM_35: - case UPGRADE_FROM_36: - case UPGRADE_FROM_37: - case UPGRADE_FROM_38: - case UPGRADE_FROM_39: - // we need to add new version when new "upgrade.from" values become available - - // This config is for explicitly sending FK response to a requested partition - // and should not affect the metadata version - break; - default: - throw new IllegalArgumentException( - "Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom - ); - } - } - return priorVersion; - } - public String userEndPoint() { final String configuredUserEndpoint = streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG); if (configuredUserEndpoint != null && !configuredUserEndpoint.isEmpty()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 592326fae87..29fa204a579 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -159,7 +159,6 @@ import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -371,23 +370,11 @@ public class StreamsPartitionAssignorTest { @ParameterizedTest @MethodSource("parameter") - public void shouldUseEagerRebalancingProtocol(final Map parameterizedConfig) { - setUp(parameterizedConfig, false); - createDefaultMockTaskManager(); - configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig); - - assertEquals(1, partitionAssignor.supportedProtocols().size()); - assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.EAGER)); - assertFalse(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE)); - } - - @ParameterizedTest - @MethodSource("parameter") - public void shouldUseCooperativeRebalancingProtocol(final Map parameterizedConfig) { + public void shouldSupportOnlyCooperativeRebalancingProtocol(final Map parameterizedConfig) { setUp(parameterizedConfig, false); configureDefault(parameterizedConfig); - assertEquals(2, partitionAssignor.supportedProtocols().size()); + assertEquals(1, partitionAssignor.supportedProtocols().size()); assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE)); } @@ -586,7 +573,7 @@ public class StreamsPartitionAssignorTest { @ParameterizedTest @MethodSource("parameter") - public void testEagerSubscription(final Map parameterizedConfig) { + public void shouldThrowOnEagerSubscription(final Map parameterizedConfig) { setUp(parameterizedConfig, false); builder.addSource(null, "source1", null, null, null, "topic1"); builder.addSource(null, "source2", null, null, null, "topic2"); @@ -600,17 +587,10 @@ public class StreamsPartitionAssignorTest { ); createMockTaskManager(prevTasks, standbyTasks); - configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig); - assertThat(partitionAssignor.rebalanceProtocol(), equalTo(RebalanceProtocol.EAGER)); - - final Set topics = Set.of("topic1", "topic2"); - final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); - - Collections.sort(subscription.topics()); - assertEquals(asList("topic1", "topic2"), subscription.topics()); - - final SubscriptionInfo info = getInfo(PID_1, prevTasks, standbyTasks, uniqueField); - assertEquals(info, SubscriptionInfo.decode(subscription.userData())); + assertThrows( + ConfigException.class, + () -> configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig) + ); } @ParameterizedTest @@ -2135,64 +2115,6 @@ public class StreamsPartitionAssignorTest { assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(smallestVersion)); } - @ParameterizedTest - @MethodSource("parameter") - public void shouldDownGradeSubscriptionToVersion1(final Map parameterizedConfig) { - setUp(parameterizedConfig, false); - createDefaultMockTaskManager(); - configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100), parameterizedConfig); - - final Set topics = Set.of("topic1"); - final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); - - assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1)); - } - - @ParameterizedTest - @MethodSource("parameter") - public void shouldDownGradeSubscriptionToVersion2For0101(final Map parameterizedConfig) { - setUp(parameterizedConfig, false); - shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101, parameterizedConfig); - } - - @ParameterizedTest - @MethodSource("parameter") - public void shouldDownGradeSubscriptionToVersion2For0102(final Map parameterizedConfig) { - setUp(parameterizedConfig, false); - shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102, parameterizedConfig); - } - - @ParameterizedTest - @MethodSource("parameter") - public void shouldDownGradeSubscriptionToVersion2For0110(final Map parameterizedConfig) { - setUp(parameterizedConfig, false); - shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110, parameterizedConfig); - } - - @ParameterizedTest - @MethodSource("parameter") - public void shouldDownGradeSubscriptionToVersion2For10(final Map parameterizedConfig) { - setUp(parameterizedConfig, false); - shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10, parameterizedConfig); - } - - @ParameterizedTest - @MethodSource("parameter") - public void shouldDownGradeSubscriptionToVersion2For11(final Map parameterizedConfig) { - setUp(parameterizedConfig, false); - shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11, parameterizedConfig); - } - - private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue, final Map parameterizedConfig) { - createDefaultMockTaskManager(); - configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue), parameterizedConfig); - - final Set topics = Set.of("topic1"); - final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics)); - - assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2)); - } - @ParameterizedTest @MethodSource("parameter") public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins(final Map parameterizedConfig) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java index e6ac8aa50ad..261f718f09b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java @@ -30,6 +30,7 @@ import java.util.Map; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; @@ -54,30 +55,21 @@ public class AssignorConfigurationTest { } @Test - public void rebalanceProtocolShouldSupportAllUpgradeFromVersions() { + public void shouldSupportAllUpgradeFromVersionsFromCooperativeRebalancingOn() { + boolean beforeCooperative = true; for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) { - config.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString()); - final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config); + if (upgradeFrom.toString().equals("2.4")) { + beforeCooperative = false; + } - try { - assignorConfiguration.rebalanceProtocol(); - } catch (final Exception error) { - throw new AssertionError("Upgrade from " + upgradeFrom + " failed with " + error.getMessage() + "!"); + config.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString()); + + if (beforeCooperative) { + assertThrows(ConfigException.class, () -> new AssignorConfiguration(config)); + } else { + assertDoesNotThrow(() -> new AssignorConfiguration(config)); } } } - @Test - public void configuredMetadataVersionShouldSupportAllUpgradeFromVersions() { - for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) { - config.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString()); - final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config); - - try { - assignorConfiguration.configuredMetadataVersion(0); - } catch (final Exception error) { - throw new AssertionError("Upgrade from " + upgradeFrom + " failed with " + error.getMessage() + "!"); - } - } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java deleted file mode 100644 index 0a7bbe14f5c..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.tests; - -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Exit; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KafkaStreams.State; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TaskMetadata; -import org.apache.kafka.streams.ThreadMetadata; -import org.apache.kafka.streams.kstream.ForeachAction; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -public class StreamsUpgradeToCooperativeRebalanceTest { - - - public static void main(final String[] args) throws Exception { - if (args.length < 1) { - System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires one argument (properties-file) but no args provided"); - } - System.out.println("Args are " + Arrays.toString(args)); - final String propFileName = args[0]; - final Properties streamsProperties = Utils.loadProps(propFileName); - - final Properties config = new Properties(); - System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest)"); - System.out.println("props=" + streamsProperties); - - config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade"); - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); - config.putAll(streamsProperties); - - final String sourceTopic = streamsProperties.getProperty("source.topic", "source"); - final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink"); - final String taskDelimiter = "#"; - final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100")); - final String upgradePhase = streamsProperties.getProperty("upgrade.phase", ""); - - final StreamsBuilder builder = new StreamsBuilder(); - - builder.stream(sourceTopic) - .peek(new ForeachAction() { - int recordCounter = 0; - - @Override - public void apply(final String key, final String value) { - if (recordCounter++ % reportInterval == 0) { - System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter); - System.out.flush(); - } - } - } - ).to(sinkTopic); - - final KafkaStreams streams = new KafkaStreams(builder.build(), config); - - streams.setStateListener((newState, oldState) -> { - if (newState == State.RUNNING && oldState == State.REBALANCING) { - System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase); - final Set allThreadMetadata = streams.metadataForLocalThreads(); - final StringBuilder taskReportBuilder = new StringBuilder(); - final List activeTasks = new ArrayList<>(); - final List standbyTasks = new ArrayList<>(); - for (final ThreadMetadata threadMetadata : allThreadMetadata) { - getTasks(threadMetadata.activeTasks(), activeTasks); - if (!threadMetadata.standbyTasks().isEmpty()) { - getTasks(threadMetadata.standbyTasks(), standbyTasks); - } - } - addTasksToBuilder(activeTasks, taskReportBuilder); - taskReportBuilder.append(taskDelimiter); - if (!standbyTasks.isEmpty()) { - addTasksToBuilder(standbyTasks, taskReportBuilder); - } - System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder); - } - - if (newState == State.REBALANCING) { - System.out.printf("%sStarting a REBALANCE%n", upgradePhase); - } - }); - - - streams.start(); - - Exit.addShutdownHook("streams-shutdown-hook", () -> { - streams.close(); - System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase); - System.out.flush(); - }); - } - - private static void addTasksToBuilder(final List tasks, final StringBuilder builder) { - if (!tasks.isEmpty()) { - for (final String task : tasks) { - builder.append(task).append(","); - } - builder.setLength(builder.length() - 1); - } - } - - private static void getTasks(final Set taskMetadata, - final List taskList) { - for (final TaskMetadata task : taskMetadata) { - final Set topicPartitions = task.topicPartitions(); - for (final TopicPartition topicPartition : topicPartitions) { - taskList.add(topicPartition.toString()); - } - } - } -} diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py index 70d30b35012..1ae56ac0d95 100644 --- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py @@ -23,19 +23,18 @@ from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmo from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_VERSION, KafkaVersion -smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), - str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), - str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), - str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), - str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), - str(LATEST_3_8), str(LATEST_3_9)] + +smoke_test_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), + str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), + str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), + str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), + str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9)] class StreamsUpgradeTest(Test): """ - Test upgrading Kafka Streams (all version combination) - If metadata was changes, upgrade is more difficult - Metadata version was bumped in 0.10.1.0 and - subsequently bumped in 2.0.0 + Test upgrading Kafka Streams (all possible version combination) + Directly upgrading from 2.3 or below is no longer supported as + of version 4.0 """ def __init__(self, test_context): diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py deleted file mode 100644 index a478f11f340..00000000000 --- a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py +++ /dev/null @@ -1,206 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time -from ducktape.mark import matrix -from ducktape.mark.resource import cluster -from ducktape.tests.test import Test -from kafkatest.services.kafka import KafkaService, quorum -from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3 -from kafkatest.services.streams import CooperativeRebalanceUpgradeService -from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running - - -class StreamsCooperativeRebalanceUpgradeTest(Test): - """ - Test of a rolling upgrade from eager rebalance to - cooperative rebalance - """ - - source_topic = "source" - sink_topic = "sink" - task_delimiter = "#" - report_interval = "1000" - processing_message = "Processed [0-9]* records so far" - stopped_message = "COOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED" - running_state_msg = "STREAMS in a RUNNING State" - cooperative_turned_off_msg = "Eager rebalancing protocol is enabled now for upgrade from %s" - cooperative_enabled_msg = "Cooperative rebalancing protocol is enabled now" - first_bounce_phase = "first_bounce_phase-" - second_bounce_phase = "second_bounce_phase-" - - # !!CAUTION!!: THIS LIST OF VERSIONS IS FIXED, NO VERSIONS MUST BE ADDED - streams_eager_rebalance_upgrade_versions = [str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)] - - def __init__(self, test_context): - super(StreamsCooperativeRebalanceUpgradeTest, self).__init__(test_context) - self.topics = { - self.source_topic: {'partitions': 9}, - self.sink_topic: {'partitions': 9} - } - - self.kafka = KafkaService(self.test_context, num_nodes=3, - zk=None, topics=self.topics, - controller_num_nodes_override=1) - - self.producer = VerifiableProducer(self.test_context, - 1, - self.kafka, - self.source_topic, - throughput=1000, - acks=1) - - @cluster(num_nodes=8) - @matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions, metadata_quorum=[quorum.combined_kraft]) - def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version, metadata_quorum): - self.kafka.start() - - processor1 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka) - processor2 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka) - processor3 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka) - - processors = [processor1, processor2, processor3] - - # produce records continually during the test - self.producer.start() - - # start all processors without upgrade_from config; normal operations mode - self.logger.info("Starting all streams clients in normal running mode") - for processor in processors: - processor.set_version(upgrade_from_version) - self.set_props(processor) - processor.CLEAN_NODE_ENABLED = False - # can't use state as older version don't have state listener - # so just verify up and running - verify_running(processor, self.processing_message) - - # all running rebalancing has ceased - for processor in processors: - self.verify_processing(processor, self.processing_message) - - # first rolling bounce with "upgrade.from" config set - previous_phase = "" - self.maybe_upgrade_rolling_bounce_and_verify(processors, - previous_phase, - self.first_bounce_phase, - upgrade_from_version) - - # All nodes processing, rebalancing has ceased - for processor in processors: - self.verify_processing(processor, self.first_bounce_phase + self.processing_message) - - # second rolling bounce without "upgrade.from" config - self.maybe_upgrade_rolling_bounce_and_verify(processors, - self.first_bounce_phase, - self.second_bounce_phase) - - # All nodes processing, rebalancing has ceased - for processor in processors: - self.verify_processing(processor, self.second_bounce_phase + self.processing_message) - - # now verify tasks are unique - for processor in processors: - self.get_tasks_for_processor(processor) - self.logger.info("Active tasks %s" % processor.active_tasks) - - overlapping_tasks = processor1.active_tasks.intersection(processor2.active_tasks) - assert len(overlapping_tasks) == int(0), \ - "Final task assignments are not unique %s %s" % (processor1.active_tasks, processor2.active_tasks) - - overlapping_tasks = processor1.active_tasks.intersection(processor3.active_tasks) - assert len(overlapping_tasks) == int(0), \ - "Final task assignments are not unique %s %s" % (processor1.active_tasks, processor3.active_tasks) - - overlapping_tasks = processor2.active_tasks.intersection(processor3.active_tasks) - assert len(overlapping_tasks) == int(0), \ - "Final task assignments are not unique %s %s" % (processor2.active_tasks, processor3.active_tasks) - - # test done close all down - stop_processors(processors, self.second_bounce_phase + self.stopped_message) - - self.producer.stop() - self.kafka.stop() - - def maybe_upgrade_rolling_bounce_and_verify(self, - processors, - previous_phase, - current_phase, - upgrade_from_version=None): - for processor in processors: - # stop the processor in prep for setting "update.from" or removing "update.from" - verify_stopped(processor, previous_phase + self.stopped_message) - # upgrade to version with cooperative rebalance - processor.set_version("") - processor.set_upgrade_phase(current_phase) - - if upgrade_from_version is not None: - # need to remove minor version numbers for check of valid upgrade from numbers - upgrade_version = upgrade_from_version[:upgrade_from_version.rfind('.')] - rebalance_mode_msg = self.cooperative_turned_off_msg % upgrade_version - else: - upgrade_version = None - rebalance_mode_msg = self.cooperative_enabled_msg - - self.set_props(processor, upgrade_version) - node = processor.node - with node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor: - with node.account.monitor_log(processor.LOG_FILE) as log_monitor: - processor.start() - # verify correct rebalance mode either turned off for upgrade or enabled after upgrade - log_monitor.wait_until(rebalance_mode_msg, - timeout_sec=60, - err_msg="Never saw '%s' message " % rebalance_mode_msg + str(processor.node.account)) - - # verify rebalanced into a running state - rebalance_msg = current_phase + self.running_state_msg - stdout_monitor.wait_until(rebalance_msg, - timeout_sec=60, - err_msg="Never saw '%s' message " % rebalance_msg + str( - processor.node.account)) - - # verify processing - verify_processing_msg = current_phase + self.processing_message - stdout_monitor.wait_until(verify_processing_msg, - timeout_sec=60, - err_msg="Never saw '%s' message " % verify_processing_msg + str( - processor.node.account)) - - def verify_processing(self, processor, pattern): - self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % pattern) - with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: - monitor.wait_until(pattern, - timeout_sec=60, - err_msg="Never saw processing of %s " % pattern + str(processor.node.account)) - - def get_tasks_for_processor(self, processor): - retries = 0 - while retries < 5: - found_tasks = list(processor.node.account.ssh_capture("grep TASK-ASSIGNMENTS %s | tail -n 1" % processor.STDOUT_FILE, allow_fail=True)) - self.logger.info("Returned %s from assigned task check" % found_tasks) - if len(found_tasks) > 0: - task_string = str(found_tasks[0]).strip() - self.logger.info("Converted %s from assigned task check" % task_string) - processor.set_tasks(task_string) - return - retries += 1 - time.sleep(1) - return - - def set_props(self, processor, upgrade_from=None): - processor.SOURCE_TOPIC = self.source_topic - processor.SINK_TOPIC = self.sink_topic - processor.REPORT_INTERVAL = self.report_interval - processor.UPGRADE_FROM = upgrade_from diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 10ec025f194..f44680f2b43 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -33,8 +33,7 @@ broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), st str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(DEV_BRANCH)] -metadata_2_versions = [str(LATEST_0_11), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), - str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), +metadata_2_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)] # upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 (unreleased) and 3.4.0 # -> https://issues.apache.org/jira/browse/KAFKA-14646