KAFKA-18839: Drop EAGER rebalancing support in Kafka Streams (#18988)

In 3.1 we deprecated the eager rebalancing protocol and marked it for
removal in a later release. We aim to officially drop support and remove
the protocol from Streams in 4.0.

The effect of this PR is that it will no longer be possible to perform a
live upgrade Kafka Streams directly to 4.0 from version 2.3 or below.
Users will have to go through a bridge release between 2.4 - 3.9
instead.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
A. Sophie Blee-Goldman 2025-02-25 19:05:03 -08:00 committed by GitHub
parent 4a8a0637e0
commit f20f299492
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 49 additions and 573 deletions

View File

@ -800,13 +800,11 @@ public class StreamsConfig extends AbstractConfig {
/** {@code upgrade.from} */
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
private static final String UPGRADE_FROM_DOC = "Allows upgrading in a backward compatible way. " +
"This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. " +
"When upgrading from 3.3 to a newer version it is not required to specify this config. Default is `null`. " +
"Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" +
UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" +
UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" +
UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\", \"" + UPGRADE_FROM_24 + "\", \"" +
private static final String UPGRADE_FROM_DOC = "Allows live upgrading (and downgrading in some cases -- see upgrade guide) in a backward compatible way. Default is `null`. " +
"Please refer to the Kafka Streams upgrade guide for instructions on how and when to use this config. " +
"Note that when upgrading from 3.5 to a newer version it is never required to specify this config, " +
"while upgrading live directly to 4.0+ from 2.3 or below is no longer supported even with this config. " +
"Accepted values are \"" + UPGRADE_FROM_24 + "\", \"" +
UPGRADE_FROM_25 + "\", \"" + UPGRADE_FROM_26 + "\", \"" + UPGRADE_FROM_27 + "\", \"" +
UPGRADE_FROM_28 + "\", \"" + UPGRADE_FROM_30 + "\", \"" + UPGRADE_FROM_31 + "\", \"" +
UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\", \"" + UPGRADE_FROM_34 + "\", \"" +

View File

@ -92,6 +92,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableSet;
import static java.util.Map.Entry.comparingByKey;
import static org.apache.kafka.common.utils.Utils.filterMap;
@ -216,11 +217,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private Queue<StreamsException> nonFatalExceptionsToHandle;
private Time time;
// since live upgrades from 2.3 (or earlier) to 4.0 or above are no longer supported, we can always
// start with the latest supported metadata version since version probing will take
// care of downgrading it if/when necessary
protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
private InternalTopicManager internalTopicManager;
private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
private RebalanceProtocol rebalanceProtocol;
private AssignmentListener assignmentListener;
private Supplier<Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>>
@ -242,7 +245,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
logPrefix = assignorConfiguration.logPrefix();
log = new LogContext(logPrefix).logger(getClass());
usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);
final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();
mainConsumerSupplier = () -> Objects.requireNonNull(referenceContainer.mainConsumer, "Main consumer was not specified");
@ -258,7 +260,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
userEndPoint = assignorConfiguration.userEndPoint();
internalTopicManager = assignorConfiguration.internalTopicManager();
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
customTaskAssignorSupplier = assignorConfiguration::customTaskAssignor;
legacyTaskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
@ -273,12 +274,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
@Override
public List<RebalanceProtocol> supportedProtocols() {
final List<RebalanceProtocol> supportedProtocols = new ArrayList<>();
supportedProtocols.add(RebalanceProtocol.EAGER);
if (rebalanceProtocol == RebalanceProtocol.COOPERATIVE) {
supportedProtocols.add(rebalanceProtocol);
}
return supportedProtocols;
return singletonList(RebalanceProtocol.COOPERATIVE);
}
@Override
@ -1669,10 +1665,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
this.internalTopicManager = internalTopicManager;
}
RebalanceProtocol rebalanceProtocol() {
return rebalanceProtocol;
}
protected String userEndPoint() {
return userEndPoint;
}

View File

@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
@ -37,7 +36,6 @@ import java.util.Optional;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
public final class AssignorConfiguration {
private final String internalTaskAssignorClass;
@ -61,6 +59,8 @@ public final class AssignorConfiguration {
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());
validateUpgradeFrom();
{
final Object o = configs.get(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR);
if (o == null) {
@ -94,7 +94,9 @@ public final class AssignorConfiguration {
return referenceContainer;
}
public RebalanceProtocol rebalanceProtocol() {
// cooperative rebalancing was introduced in 2.4 and the old protocol (eager rebalancing) was removed
// in 4.0, meaning live upgrades from 2.3 or below to 4.0+ are no longer possible without a bridge release
public void validateUpgradeFrom() {
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (UpgradeFromValues.fromString(upgradeFrom)) {
@ -108,106 +110,20 @@ public final class AssignorConfiguration {
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
// ATTENTION: The following log messages is used for verification in system test
// streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance
// If you change it, please do also change the system test accordingly and
// verify whether the test passes.
log.info("Eager rebalancing protocol is enabled now for upgrade from {}.x", upgradeFrom);
log.warn("The eager rebalancing protocol is deprecated and will stop being supported in a future release." +
" Please be prepared to remove the 'upgrade.from' config soon.");
return RebalanceProtocol.EAGER;
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
case UPGRADE_FROM_27:
case UPGRADE_FROM_28:
case UPGRADE_FROM_30:
case UPGRADE_FROM_31:
case UPGRADE_FROM_32:
case UPGRADE_FROM_33:
case UPGRADE_FROM_34:
case UPGRADE_FROM_35:
case UPGRADE_FROM_36:
case UPGRADE_FROM_37:
case UPGRADE_FROM_38:
case UPGRADE_FROM_39:
// we need to add new version when new "upgrade.from" values become available
final String errMsg = String.format(
"The eager rebalancing protocol is no longer supported in 4.0 which means live upgrades from 2.3 or below are not possible."
+ " Please see the Streams upgrade guide for the bridge releases and recommended upgrade path. Got upgrade.from='%s'", upgradeFrom);
log.error(errMsg);
throw new ConfigException(errMsg);
// This config is for explicitly sending FK response to a requested partition
// and should not affect the rebalance protocol
break;
default:
throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
}
}
// ATTENTION: The following log messages is used for verification in system test
// streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance
// If you change it, please do also change the system test accordingly and
// verify whether the test passes.
log.info("Cooperative rebalancing protocol is enabled now");
return RebalanceProtocol.COOPERATIVE;
}
public String logPrefix() {
return logPrefix;
}
public int configuredMetadataVersion(final int priorVersion) {
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (UpgradeFromValues.fromString(upgradeFrom)) {
case UPGRADE_FROM_0100:
log.info(
"Downgrading metadata.version from {} to 1 for upgrade from 0.10.0.x.",
LATEST_SUPPORTED_VERSION
);
return 1;
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
log.info(
"Downgrading metadata.version from {} to 2 for upgrade from {}.x.",
LATEST_SUPPORTED_VERSION,
upgradeFrom
);
return 2;
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
// These configs are for cooperative rebalancing and should not affect the metadata version
break;
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
case UPGRADE_FROM_27:
case UPGRADE_FROM_28:
case UPGRADE_FROM_30:
case UPGRADE_FROM_31:
case UPGRADE_FROM_32:
case UPGRADE_FROM_33:
case UPGRADE_FROM_34:
case UPGRADE_FROM_35:
case UPGRADE_FROM_36:
case UPGRADE_FROM_37:
case UPGRADE_FROM_38:
case UPGRADE_FROM_39:
// we need to add new version when new "upgrade.from" values become available
// This config is for explicitly sending FK response to a requested partition
// and should not affect the metadata version
break;
default:
throw new IllegalArgumentException(
"Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom
);
}
}
return priorVersion;
}
public String userEndPoint() {
final String configuredUserEndpoint = streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (configuredUserEndpoint != null && !configuredUserEndpoint.isEmpty()) {

View File

@ -159,7 +159,6 @@ import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -371,23 +370,11 @@ public class StreamsPartitionAssignorTest {
@ParameterizedTest
@MethodSource("parameter")
public void shouldUseEagerRebalancingProtocol(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
createDefaultMockTaskManager();
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig);
assertEquals(1, partitionAssignor.supportedProtocols().size());
assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.EAGER));
assertFalse(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE));
}
@ParameterizedTest
@MethodSource("parameter")
public void shouldUseCooperativeRebalancingProtocol(final Map<String, Object> parameterizedConfig) {
public void shouldSupportOnlyCooperativeRebalancingProtocol(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
configureDefault(parameterizedConfig);
assertEquals(2, partitionAssignor.supportedProtocols().size());
assertEquals(1, partitionAssignor.supportedProtocols().size());
assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE));
}
@ -586,7 +573,7 @@ public class StreamsPartitionAssignorTest {
@ParameterizedTest
@MethodSource("parameter")
public void testEagerSubscription(final Map<String, Object> parameterizedConfig) {
public void shouldThrowOnEagerSubscription(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
@ -600,17 +587,10 @@ public class StreamsPartitionAssignorTest {
);
createMockTaskManager(prevTasks, standbyTasks);
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig);
assertThat(partitionAssignor.rebalanceProtocol(), equalTo(RebalanceProtocol.EAGER));
final Set<String> topics = Set.of("topic1", "topic2");
final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
Collections.sort(subscription.topics());
assertEquals(asList("topic1", "topic2"), subscription.topics());
final SubscriptionInfo info = getInfo(PID_1, prevTasks, standbyTasks, uniqueField);
assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
assertThrows(
ConfigException.class,
() -> configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig)
);
}
@ParameterizedTest
@ -2135,64 +2115,6 @@ public class StreamsPartitionAssignorTest {
assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(smallestVersion));
}
@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion1(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
createDefaultMockTaskManager();
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100), parameterizedConfig);
final Set<String> topics = Set.of("topic1");
final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1));
}
@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion2For0101(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101, parameterizedConfig);
}
@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion2For0102(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102, parameterizedConfig);
}
@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion2For0110(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110, parameterizedConfig);
}
@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion2For10(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10, parameterizedConfig);
}
@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion2For11(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11, parameterizedConfig);
}
private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue, final Map<String, Object> parameterizedConfig) {
createDefaultMockTaskManager();
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue), parameterizedConfig);
final Set<String> topics = Set.of("topic1");
final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2));
}
@ParameterizedTest
@MethodSource("parameter")
public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins(final Map<String, Object> parameterizedConfig) {

View File

@ -30,6 +30,7 @@ import java.util.Map;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
@ -54,30 +55,21 @@ public class AssignorConfigurationTest {
}
@Test
public void rebalanceProtocolShouldSupportAllUpgradeFromVersions() {
public void shouldSupportAllUpgradeFromVersionsFromCooperativeRebalancingOn() {
boolean beforeCooperative = true;
for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) {
config.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString());
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config);
if (upgradeFrom.toString().equals("2.4")) {
beforeCooperative = false;
}
try {
assignorConfiguration.rebalanceProtocol();
} catch (final Exception error) {
throw new AssertionError("Upgrade from " + upgradeFrom + " failed with " + error.getMessage() + "!");
config.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString());
if (beforeCooperative) {
assertThrows(ConfigException.class, () -> new AssignorConfiguration(config));
} else {
assertDoesNotThrow(() -> new AssignorConfiguration(config));
}
}
}
@Test
public void configuredMetadataVersionShouldSupportAllUpgradeFromVersions() {
for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) {
config.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom.toString());
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(config);
try {
assignorConfiguration.configuredMetadataVersion(0);
} catch (final Exception error) {
throw new AssertionError("Upgrade from " + upgradeFrom + " failed with " + error.getMessage() + "!");
}
}
}
}

View File

@ -1,136 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.kstream.ForeachAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class StreamsUpgradeToCooperativeRebalanceTest {
public static void main(final String[] args) throws Exception {
if (args.length < 1) {
System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires one argument (properties-file) but no args provided");
}
System.out.println("Args are " + Arrays.toString(args));
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest)");
System.out.println("props=" + streamsProperties);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
final String taskDelimiter = "#";
final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
final String upgradePhase = streamsProperties.getProperty("upgrade.phase", "");
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(sourceTopic)
.peek(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.printf("%sProcessed %d records so far%n", upgradePhase, recordCounter);
System.out.flush();
}
}
}
).to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING && oldState == State.REBALANCING) {
System.out.printf("%sSTREAMS in a RUNNING State%n", upgradePhase);
final Set<ThreadMetadata> allThreadMetadata = streams.metadataForLocalThreads();
final StringBuilder taskReportBuilder = new StringBuilder();
final List<String> activeTasks = new ArrayList<>();
final List<String> standbyTasks = new ArrayList<>();
for (final ThreadMetadata threadMetadata : allThreadMetadata) {
getTasks(threadMetadata.activeTasks(), activeTasks);
if (!threadMetadata.standbyTasks().isEmpty()) {
getTasks(threadMetadata.standbyTasks(), standbyTasks);
}
}
addTasksToBuilder(activeTasks, taskReportBuilder);
taskReportBuilder.append(taskDelimiter);
if (!standbyTasks.isEmpty()) {
addTasksToBuilder(standbyTasks, taskReportBuilder);
}
System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
}
if (newState == State.REBALANCING) {
System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
}
});
streams.start();
Exit.addShutdownHook("streams-shutdown-hook", () -> {
streams.close();
System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", upgradePhase);
System.out.flush();
});
}
private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
if (!tasks.isEmpty()) {
for (final String task : tasks) {
builder.append(task).append(",");
}
builder.setLength(builder.length() - 1);
}
}
private static void getTasks(final Set<TaskMetadata> taskMetadata,
final List<String> taskList) {
for (final TaskMetadata task : taskMetadata) {
final Set<TopicPartition> topicPartitions = task.topicPartitions();
for (final TopicPartition topicPartition : topicPartitions) {
taskList.add(topicPartition.toString());
}
}
}
}

View File

@ -23,19 +23,18 @@ from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmo
from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_VERSION, KafkaVersion
smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4),
str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7),
str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1),
str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4),
str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7),
str(LATEST_3_8), str(LATEST_3_9)]
smoke_test_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6),
str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0),
str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3),
str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6),
str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9)]
class StreamsUpgradeTest(Test):
"""
Test upgrading Kafka Streams (all version combination)
If metadata was changes, upgrade is more difficult
Metadata version was bumped in 0.10.1.0 and
subsequently bumped in 2.0.0
Test upgrading Kafka Streams (all possible version combination)
Directly upgrading from 2.3 or below is no longer supported as
of version 4.0
"""
def __init__(self, test_context):

View File

@ -1,206 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3
from kafkatest.services.streams import CooperativeRebalanceUpgradeService
from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running
class StreamsCooperativeRebalanceUpgradeTest(Test):
"""
Test of a rolling upgrade from eager rebalance to
cooperative rebalance
"""
source_topic = "source"
sink_topic = "sink"
task_delimiter = "#"
report_interval = "1000"
processing_message = "Processed [0-9]* records so far"
stopped_message = "COOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED"
running_state_msg = "STREAMS in a RUNNING State"
cooperative_turned_off_msg = "Eager rebalancing protocol is enabled now for upgrade from %s"
cooperative_enabled_msg = "Cooperative rebalancing protocol is enabled now"
first_bounce_phase = "first_bounce_phase-"
second_bounce_phase = "second_bounce_phase-"
# !!CAUTION!!: THIS LIST OF VERSIONS IS FIXED, NO VERSIONS MUST BE ADDED
streams_eager_rebalance_upgrade_versions = [str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)]
def __init__(self, test_context):
super(StreamsCooperativeRebalanceUpgradeTest, self).__init__(test_context)
self.topics = {
self.source_topic: {'partitions': 9},
self.sink_topic: {'partitions': 9}
}
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=None, topics=self.topics,
controller_num_nodes_override=1)
self.producer = VerifiableProducer(self.test_context,
1,
self.kafka,
self.source_topic,
throughput=1000,
acks=1)
@cluster(num_nodes=8)
@matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions, metadata_quorum=[quorum.combined_kraft])
def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version, metadata_quorum):
self.kafka.start()
processor1 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka)
processor2 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka)
processor3 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka)
processors = [processor1, processor2, processor3]
# produce records continually during the test
self.producer.start()
# start all processors without upgrade_from config; normal operations mode
self.logger.info("Starting all streams clients in normal running mode")
for processor in processors:
processor.set_version(upgrade_from_version)
self.set_props(processor)
processor.CLEAN_NODE_ENABLED = False
# can't use state as older version don't have state listener
# so just verify up and running
verify_running(processor, self.processing_message)
# all running rebalancing has ceased
for processor in processors:
self.verify_processing(processor, self.processing_message)
# first rolling bounce with "upgrade.from" config set
previous_phase = ""
self.maybe_upgrade_rolling_bounce_and_verify(processors,
previous_phase,
self.first_bounce_phase,
upgrade_from_version)
# All nodes processing, rebalancing has ceased
for processor in processors:
self.verify_processing(processor, self.first_bounce_phase + self.processing_message)
# second rolling bounce without "upgrade.from" config
self.maybe_upgrade_rolling_bounce_and_verify(processors,
self.first_bounce_phase,
self.second_bounce_phase)
# All nodes processing, rebalancing has ceased
for processor in processors:
self.verify_processing(processor, self.second_bounce_phase + self.processing_message)
# now verify tasks are unique
for processor in processors:
self.get_tasks_for_processor(processor)
self.logger.info("Active tasks %s" % processor.active_tasks)
overlapping_tasks = processor1.active_tasks.intersection(processor2.active_tasks)
assert len(overlapping_tasks) == int(0), \
"Final task assignments are not unique %s %s" % (processor1.active_tasks, processor2.active_tasks)
overlapping_tasks = processor1.active_tasks.intersection(processor3.active_tasks)
assert len(overlapping_tasks) == int(0), \
"Final task assignments are not unique %s %s" % (processor1.active_tasks, processor3.active_tasks)
overlapping_tasks = processor2.active_tasks.intersection(processor3.active_tasks)
assert len(overlapping_tasks) == int(0), \
"Final task assignments are not unique %s %s" % (processor2.active_tasks, processor3.active_tasks)
# test done close all down
stop_processors(processors, self.second_bounce_phase + self.stopped_message)
self.producer.stop()
self.kafka.stop()
def maybe_upgrade_rolling_bounce_and_verify(self,
processors,
previous_phase,
current_phase,
upgrade_from_version=None):
for processor in processors:
# stop the processor in prep for setting "update.from" or removing "update.from"
verify_stopped(processor, previous_phase + self.stopped_message)
# upgrade to version with cooperative rebalance
processor.set_version("")
processor.set_upgrade_phase(current_phase)
if upgrade_from_version is not None:
# need to remove minor version numbers for check of valid upgrade from numbers
upgrade_version = upgrade_from_version[:upgrade_from_version.rfind('.')]
rebalance_mode_msg = self.cooperative_turned_off_msg % upgrade_version
else:
upgrade_version = None
rebalance_mode_msg = self.cooperative_enabled_msg
self.set_props(processor, upgrade_version)
node = processor.node
with node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
processor.start()
# verify correct rebalance mode either turned off for upgrade or enabled after upgrade
log_monitor.wait_until(rebalance_mode_msg,
timeout_sec=60,
err_msg="Never saw '%s' message " % rebalance_mode_msg + str(processor.node.account))
# verify rebalanced into a running state
rebalance_msg = current_phase + self.running_state_msg
stdout_monitor.wait_until(rebalance_msg,
timeout_sec=60,
err_msg="Never saw '%s' message " % rebalance_msg + str(
processor.node.account))
# verify processing
verify_processing_msg = current_phase + self.processing_message
stdout_monitor.wait_until(verify_processing_msg,
timeout_sec=60,
err_msg="Never saw '%s' message " % verify_processing_msg + str(
processor.node.account))
def verify_processing(self, processor, pattern):
self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % pattern)
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
monitor.wait_until(pattern,
timeout_sec=60,
err_msg="Never saw processing of %s " % pattern + str(processor.node.account))
def get_tasks_for_processor(self, processor):
retries = 0
while retries < 5:
found_tasks = list(processor.node.account.ssh_capture("grep TASK-ASSIGNMENTS %s | tail -n 1" % processor.STDOUT_FILE, allow_fail=True))
self.logger.info("Returned %s from assigned task check" % found_tasks)
if len(found_tasks) > 0:
task_string = str(found_tasks[0]).strip()
self.logger.info("Converted %s from assigned task check" % task_string)
processor.set_tasks(task_string)
return
retries += 1
time.sleep(1)
return
def set_props(self, processor, upgrade_from=None):
processor.SOURCE_TOPIC = self.source_topic
processor.SINK_TOPIC = self.sink_topic
processor.REPORT_INTERVAL = self.report_interval
processor.UPGRADE_FROM = upgrade_from

View File

@ -33,8 +33,7 @@ broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), st
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6),
str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(DEV_BRANCH)]
metadata_2_versions = [str(LATEST_0_11), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0),
str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
metadata_2_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
# upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 (unreleased) and 3.4.0
# -> https://issues.apache.org/jira/browse/KAFKA-14646