KAFKA-8496: System test for KIP-429 upgrades and compatibility (#7529)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Bill Bejeck 2019-10-17 01:29:33 -04:00 committed by Guozhang Wang
parent 78f5da914e
commit b62f2a1123
15 changed files with 1589 additions and 2 deletions

View File

@ -147,13 +147,13 @@ public final class AssignorConfiguration {
case StreamsConfig.UPGRADE_FROM_21:
case StreamsConfig.UPGRADE_FROM_22:
case StreamsConfig.UPGRADE_FROM_23:
log.info("Turning off cooperative rebalancing for upgrade from {}.x", upgradeFrom);
log.info("Eager rebalancing enabled now for upgrade from {}.x", upgradeFrom);
return RebalanceProtocol.EAGER;
default:
throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
}
}
log.info("Cooperative rebalancing enabled now");
return RebalanceProtocol.COOPERATIVE;
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 1) {
System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires one argument (properties-file) but no args provided");
}
System.out.println("Args are " + Arrays.toString(args));
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest)");
System.out.println("props=" + streamsProperties);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
final String taskDelimiter = "#";
final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
final String upgradePhase = streamsProperties.getProperty("upgrade.phase", "");
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(sourceTopic)
.peek(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
System.out.flush();
}
}
}
).to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING && oldState == State.REBALANCING) {
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
final StringBuilder taskReportBuilder = new StringBuilder();
final List<String> activeTasks = new ArrayList<>();
final List<String> standbyTasks = new ArrayList<>();
for (final ThreadMetadata threadMetadata : allThreadMetadata) {
getTasks(threadMetadata.activeTasks(), activeTasks);
if (!threadMetadata.standbyTasks().isEmpty()) {
getTasks(threadMetadata.standbyTasks(), standbyTasks);
}
}
addTasksToBuilder(activeTasks, taskReportBuilder);
taskReportBuilder.append(taskDelimiter);
if (!standbyTasks.isEmpty()) {
addTasksToBuilder(standbyTasks, taskReportBuilder);
}
System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
}
if (newState == State.REBALANCING) {
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
}
});
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
}
private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
if (!tasks.isEmpty()) {
for (final String task : tasks) {
builder.append(task).append(",");
}
builder.setLength(builder.length() - 1);
}
}
private static void getTasks(final Set<TaskMetadata> taskMetadata,
final List<String> taskList) {
for (final TaskMetadata task : taskMetadata) {
final Set<TopicPartition> topicPartitions = task.topicPartitions();
for (final TopicPartition topicPartition : topicPartitions) {
taskList.add(topicPartition.toString());
}
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 3) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : "")
+ (args.length > 1 ? args[1] : ""));
}
final String zookeeper = args[1];
final String propFileName = args.length > 2 ? args[2] : null;
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v0.10.0)");
System.out.println("zookeeper=" + zookeeper);
System.out.println("props=" + config);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = config.getProperty("source.topic", "source");
final String sinkTopic = config.getProperty("sink.topic", "sink");
final int reportInterval = Integer.parseInt(config.getProperty("report.interval", "100"));
final String upgradePhase = config.getProperty("upgrade.phase", "");
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> upgradeStream = builder.stream(sourceTopic);
upgradeStream.foreach(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
System.out.flush();
}
}
}
);
upgradeStream.to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 3) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, zookeeper-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : "")
+ (args.length > 1 ? args[1] : ""));
}
final String zookeeper = args[1];
final String propFileName = args.length > 2 ? args[2] : null;
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v0.10.1)");
System.out.println("zookeeper=" + zookeeper);
System.out.println("props=" + config);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = config.getProperty("source.topic", "source");
final String sinkTopic = config.getProperty("sink.topic", "sink");
final int reportInterval = Integer.parseInt(config.getProperty("report.interval", "100"));
final String upgradePhase = config.getProperty("upgrade.phase", "");
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> upgradeStream = builder.stream(sourceTopic);
upgradeStream.foreach(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
System.out.flush();
}
}
}
);
upgradeStream.to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : ""));
}
final String propFileName = args[1];
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v0.10.2)");
System.out.println("props=" + config);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = config.getProperty("source.topic", "source");
final String sinkTopic = config.getProperty("sink.topic", "sink");
final int reportInterval = Integer.parseInt(config.getProperty("report.interval", "100"));
final String upgradePhase = config.getProperty("upgrade.phase", "");
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> upgradeStream = builder.stream(sourceTopic);
upgradeStream.foreach(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
System.out.flush();
}
}
}
);
upgradeStream.to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeTest requires three argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] + " " : ""));
}
final String kafka = args[0];
final String propFileName = args[1];
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v0.11.0)");
System.out.println("kafka=" + kafka);
System.out.println("props=" + config);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = config.getProperty("source.topic", "source");
final String sinkTopic = config.getProperty("sink.topic", "sink");
final int reportInterval = Integer.parseInt(config.getProperty("report.interval", "100"));
final String upgradePhase = config.getProperty("upgrade.phase", "");
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> upgradeStream = builder.stream(sourceTopic);
upgradeStream.foreach(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
System.out.flush();
}
}
}
);
upgradeStream.to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
System.out.println("Args are " + Arrays.toString(args));
final String propFileName = args[1];
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v1.0)");
System.out.println("props=" + streamsProperties);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
final String upgradePhase = streamsProperties.getProperty("upgrade.phase", "");
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(sourceTopic)
.peek(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
System.out.flush();
}
}
}
).to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING && oldState == State.REBALANCING) {
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
final StringBuilder taskReportBuilder = new StringBuilder();
final List<String> activeTasks = new ArrayList<>();
final List<String> standbyTasks = new ArrayList<>();
for (final ThreadMetadata threadMetadata : allThreadMetadata) {
getTasks(threadMetadata.activeTasks(), activeTasks);
if (!threadMetadata.standbyTasks().isEmpty()) {
getTasks(threadMetadata.standbyTasks(), standbyTasks);
}
}
addTasksToBuilder(activeTasks, taskReportBuilder);
taskReportBuilder.append(taskDelimiter);
if (!standbyTasks.isEmpty()) {
addTasksToBuilder(standbyTasks, taskReportBuilder);
}
System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
}
if (newState == State.REBALANCING) {
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
}
});
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
}
private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
if (!tasks.isEmpty()) {
for (final String task : tasks) {
builder.append(task).append(",");
}
builder.setLength(builder.length() - 1);
}
}
private static void getTasks(final Set<TaskMetadata> taskMetadata,
final List<String> taskList) {
for (final TaskMetadata task : taskMetadata) {
final Set<TopicPartition> topicPartitions = task.topicPartitions();
for (final TopicPartition topicPartition : topicPartitions) {
taskList.add(topicPartition.toString());
}
}
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
System.out.println("Args are " + Arrays.toString(args));
final String propFileName = args[1];
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v1.1)");
System.out.println("props=" + streamsProperties);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
final String upgradePhase = streamsProperties.getProperty("upgrade.phase", "");
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(sourceTopic)
.peek(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
System.out.flush();
}
}
}
).to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING && oldState == State.REBALANCING) {
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
final StringBuilder taskReportBuilder = new StringBuilder();
final List<String> activeTasks = new ArrayList<>();
final List<String> standbyTasks = new ArrayList<>();
for (final ThreadMetadata threadMetadata : allThreadMetadata) {
getTasks(threadMetadata.activeTasks(), activeTasks);
if (!threadMetadata.standbyTasks().isEmpty()) {
getTasks(threadMetadata.standbyTasks(), standbyTasks);
}
}
addTasksToBuilder(activeTasks, taskReportBuilder);
taskReportBuilder.append(taskDelimiter);
if (!standbyTasks.isEmpty()) {
addTasksToBuilder(standbyTasks, taskReportBuilder);
}
System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
}
if (newState == State.REBALANCING) {
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
}
});
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
}
private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
if (!tasks.isEmpty()) {
for (final String task : tasks) {
builder.append(task).append(",");
}
builder.setLength(builder.length() - 1);
}
}
private static void getTasks(final Set<TaskMetadata> taskMetadata,
final List<String> taskList) {
for (final TaskMetadata task : taskMetadata) {
final Set<TopicPartition> topicPartitions = task.topicPartitions();
for (final TopicPartition topicPartition : topicPartitions) {
taskList.add(topicPartition.toString());
}
}
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
System.out.println("Args are " + Arrays.toString(args));
final String propFileName = args[1];
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v2.0)");
System.out.println("props=" + streamsProperties);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
final String upgradePhase = streamsProperties.getProperty("upgrade.phase", "");
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(sourceTopic)
.peek(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
System.out.flush();
}
}
}
).to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING && oldState == State.REBALANCING) {
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
final StringBuilder taskReportBuilder = new StringBuilder();
final List<String> activeTasks = new ArrayList<>();
final List<String> standbyTasks = new ArrayList<>();
for (final ThreadMetadata threadMetadata : allThreadMetadata) {
getTasks(threadMetadata.activeTasks(), activeTasks);
if (!threadMetadata.standbyTasks().isEmpty()) {
getTasks(threadMetadata.standbyTasks(), standbyTasks);
}
}
addTasksToBuilder(activeTasks, taskReportBuilder);
taskReportBuilder.append(taskDelimiter);
if (!standbyTasks.isEmpty()) {
addTasksToBuilder(standbyTasks, taskReportBuilder);
}
System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
}
if (newState == State.REBALANCING) {
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
}
});
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
}
private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
if (!tasks.isEmpty()) {
for (final String task : tasks) {
builder.append(task).append(",");
}
builder.setLength(builder.length() - 1);
}
}
private static void getTasks(final Set<TaskMetadata> taskMetadata,
final List<String> taskList) {
for (final TaskMetadata task : taskMetadata) {
final Set<TopicPartition> topicPartitions = task.topicPartitions();
for (final TopicPartition topicPartition : topicPartitions) {
taskList.add(topicPartition.toString());
}
}
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
System.out.println("Args are " + Arrays.toString(args));
final String propFileName = args[1];
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v2.2)");
System.out.println("props=" + streamsProperties);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
final String upgradePhase = streamsProperties.getProperty("upgrade.phase", "");
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(sourceTopic)
.peek(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
System.out.flush();
}
}
}
).to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING && oldState == State.REBALANCING) {
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
final StringBuilder taskReportBuilder = new StringBuilder();
final List<String> activeTasks = new ArrayList<>();
final List<String> standbyTasks = new ArrayList<>();
for (final ThreadMetadata threadMetadata : allThreadMetadata) {
getTasks(threadMetadata.activeTasks(), activeTasks);
if (!threadMetadata.standbyTasks().isEmpty()) {
getTasks(threadMetadata.standbyTasks(), standbyTasks);
}
}
addTasksToBuilder(activeTasks, taskReportBuilder);
taskReportBuilder.append(taskDelimiter);
if (!standbyTasks.isEmpty()) {
addTasksToBuilder(standbyTasks, taskReportBuilder);
}
System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
}
if (newState == State.REBALANCING) {
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
}
});
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
}
private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
if (!tasks.isEmpty()) {
for (final String task : tasks) {
builder.append(task).append(",");
}
builder.setLength(builder.length() - 1);
}
}
private static void getTasks(final Set<TaskMetadata> taskMetadata,
final List<String> taskList) {
for (final TaskMetadata task : taskMetadata) {
final Set<TopicPartition> topicPartitions = task.topicPartitions();
for (final TopicPartition topicPartition : topicPartitions) {
taskList.add(topicPartition.toString());
}
}
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
System.out.println("Args are " + Arrays.toString(args));
final String propFileName = args[1];
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v2.2)");
System.out.println("props=" + streamsProperties);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
final String upgradePhase = streamsProperties.getProperty("upgrade.phase", "");
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(sourceTopic)
.peek(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("%sProcessed %d records so far", upgradePhase, recordCounter));
System.out.flush();
}
}
}
).to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING && oldState == State.REBALANCING) {
System.out.println(String.format("%sSTREAMS in a RUNNING State", upgradePhase));
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
final StringBuilder taskReportBuilder = new StringBuilder();
final List<String> activeTasks = new ArrayList<>();
final List<String> standbyTasks = new ArrayList<>();
for (final ThreadMetadata threadMetadata : allThreadMetadata) {
getTasks(threadMetadata.activeTasks(), activeTasks);
if (!threadMetadata.standbyTasks().isEmpty()) {
getTasks(threadMetadata.standbyTasks(), standbyTasks);
}
}
addTasksToBuilder(activeTasks, taskReportBuilder);
taskReportBuilder.append(taskDelimiter);
if (!standbyTasks.isEmpty()) {
addTasksToBuilder(standbyTasks, taskReportBuilder);
}
System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
}
if (newState == State.REBALANCING) {
System.out.println(String.format("%sStarting a REBALANCE", upgradePhase));
}
});
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase));
System.out.flush();
}));
}
private static void addTasksToBuilder(final List<String> tasks, final StringBuilder builder) {
if (!tasks.isEmpty()) {
for (final String task : tasks) {
builder.append(task).append(",");
}
builder.setLength(builder.length() - 1);
}
}
private static void getTasks(final Set<TaskMetadata> taskMetadata,
final List<String> taskList) {
for (final TaskMetadata task : taskMetadata) {
final Set<TopicPartition> topicPartitions = task.topicPartitions();
for (final TopicPartition topicPartition : topicPartitions) {
taskList.add(topicPartition.toString());
}
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
public class StreamsUpgradeToCooperativeRebalanceTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 2) {
System.err.println("StreamsUpgradeToCooperativeRebalanceTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
+ (args.length > 0 ? args[0] : ""));
}
System.out.println("Args are " + Arrays.toString(args));
final String propFileName = args[1];
final Properties streamsProperties = Utils.loadProps(propFileName);
final Properties config = new Properties();
System.out.println("StreamsTest instance started (StreamsUpgradeToCooperativeRebalanceTest v2.3)");
System.out.println("props=" + streamsProperties);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cooperative-rebalance-upgrade");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final String sourceTopic = streamsProperties.getProperty("source.topic", "source");
final String sinkTopic = streamsProperties.getProperty("sink.topic", "sink");
final String threadDelimiter = streamsProperties.getProperty("thread.delimiter", "&");
final String taskDelimiter = streamsProperties.getProperty("task.delimiter", "#");
final int reportInterval = Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(sourceTopic)
.peek(new ForeachAction<String, String>() {
int recordCounter = 0;
@Override
public void apply(final String key, final String value) {
if (recordCounter++ % reportInterval == 0) {
System.out.println(String.format("Processed %d records so far", recordCounter));
System.out.flush();
}
}
}
).to(sinkTopic);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING && oldState == State.REBALANCING) {
System.out.println("STREAMS in a RUNNING State");
final Set<ThreadMetadata> allThreadMetadata = streams.localThreadsMetadata();
final StringBuilder taskReportBuilder = new StringBuilder();
for (final ThreadMetadata threadMetadata : allThreadMetadata) {
buildTaskAssignmentReport(taskReportBuilder, threadMetadata.activeTasks(), "ACTIVE-TASKS:");
if (!threadMetadata.standbyTasks().isEmpty()) {
taskReportBuilder.append(taskDelimiter);
buildTaskAssignmentReport(taskReportBuilder, threadMetadata.standbyTasks(), "STANDBY-TASKS:");
}
taskReportBuilder.append(threadDelimiter);
}
taskReportBuilder.setLength(taskReportBuilder.length() - 1);
System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
}
if (newState == State.REBALANCING) {
System.out.println("Starting a REBALANCE");
}
});
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println("COOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED");
System.out.flush();
}));
}
private static void buildTaskAssignmentReport(final StringBuilder taskReportBuilder,
final Set<TaskMetadata> taskMetadata,
final String taskType) {
taskReportBuilder.append(taskType);
for (final TaskMetadata task : taskMetadata) {
final Set<TopicPartition> topicPartitions = task.topicPartitions();
for (final TopicPartition topicPartition : topicPartitions) {
taskReportBuilder.append(topicPartition.toString()).append(",");
}
}
taskReportBuilder.setLength(taskReportBuilder.length() - 1);
}
}

View File

@ -536,6 +536,7 @@ class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
cfg = KafkaConfig(**properties)
return cfg.render()
class StaticMemberTestService(StreamsTestBaseService):
def __init__(self, test_context, kafka, group_instance_id, num_threads):
super(StaticMemberTestService, self).__init__(test_context,
@ -556,3 +557,87 @@ class StaticMemberTestService(StreamsTestBaseService):
cfg = KafkaConfig(**properties)
return cfg.render()
class CooperativeRebalanceUpgradeService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
super(CooperativeRebalanceUpgradeService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsUpgradeToCooperativeRebalanceTest",
"")
self.UPGRADE_FROM = None
# these properties will be overridden in test
self.SOURCE_TOPIC = None
self.SINK_TOPIC = None
self.TASK_DELIMITER = "#"
self.REPORT_INTERVAL = None
self.standby_tasks = None
self.active_tasks = None
self.upgrade_phase = None
def set_tasks(self, task_string):
label = "TASK-ASSIGNMENTS:"
task_string_substr = task_string[len(label):]
all_tasks = task_string_substr.split(self.TASK_DELIMITER)
self.active_tasks = set(all_tasks[0].split(","))
if len(all_tasks) > 1:
self.standby_tasks = set(all_tasks[1].split(","))
def set_version(self, kafka_streams_version):
self.KAFKA_STREAMS_VERSION = kafka_streams_version
def set_upgrade_phase(self, upgrade_phase):
self.upgrade_phase = upgrade_phase
def start_cmd(self, node):
args = self.args.copy()
if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2),
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)]:
args['kafka'] = self.kafka.bootstrap_servers()
else:
args['kafka'] = ""
if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
args['zk'] = self.kafka.zk.connect_setting()
else:
args['zk'] = ""
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j'] = self.LOG4J_CONFIG_FILE
args['version'] = self.KAFKA_STREAMS_VERSION
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
" %(kafka_run_class)s %(streams_class_name)s %(kafka)s %(zk)s %(config_file)s " \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
self.logger.info("Executing: " + cmd)
return cmd
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
if self.UPGRADE_FROM is not None:
properties['upgrade.from'] = self.UPGRADE_FROM
else:
try:
del properties['upgrade.from']
except KeyError:
self.logger.info("Key 'upgrade.from' not there, better safe than sorry")
if self.upgrade_phase is not None:
properties['upgrade.phase'] = self.upgrade_phase
properties['source.topic'] = self.SOURCE_TOPIC
properties['sink.topic'] = self.SINK_TOPIC
properties['task.delimiter'] = self.TASK_DELIMITER
properties['report.interval'] = self.REPORT_INTERVAL
cfg = KafkaConfig(**properties)
return cfg.render()

View File

@ -0,0 +1,209 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from ducktape.mark import matrix
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, DEV_BRANCH, DEV_VERSION, KafkaVersion
from kafkatest.services.streams import CooperativeRebalanceUpgradeService
from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running
class StreamsCooperativeRebalanceUpgradeTest(Test):
"""
Test of a rolling upgrade from eager rebalance to
cooperative rebalance
"""
source_topic = "source"
sink_topic = "sink"
task_delimiter = "#"
report_interval = "1000"
processing_message = "Processed [0-9]* records so far"
stopped_message = "COOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED"
running_state_msg = "STREAMS in a RUNNING State"
cooperative_turned_off_msg = "Eager rebalancing enabled now for upgrade from %s"
cooperative_enabled_msg = "Cooperative rebalancing enabled now"
first_bounce_phase = "first_bounce_phase-"
second_bounce_phase = "second_bounce_phase-"
streams_eager_rebalance_upgrade_versions = [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0),
str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2),
str(LATEST_2_3)]
def __init__(self, test_context):
super(StreamsCooperativeRebalanceUpgradeTest, self).__init__(test_context)
self.topics = {
self.source_topic: {'partitions': 9},
self.sink_topic: {'partitions': 9}
}
self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zookeeper, topics=self.topics)
self.producer = VerifiableProducer(self.test_context,
1,
self.kafka,
self.source_topic,
throughput=1000,
acks=1)
@matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions)
def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version):
self.zookeeper.start()
self.kafka.start()
processor1 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka)
processor2 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka)
processor3 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka)
processors = [processor1, processor2, processor3]
# produce records continually during the test
self.producer.start()
# start all processors without upgrade_from config; normal operations mode
self.logger.info("Starting all streams clients in normal running mode")
for processor in processors:
processor.set_version(upgrade_from_version)
self.set_props(processor)
processor.CLEAN_NODE_ENABLED = False
# can't use state as older version don't have state listener
# so just verify up and running
verify_running(processor, self.processing_message)
# all running rebalancing has ceased
for processor in processors:
self.verify_processing(processor, self.processing_message)
# first rolling bounce with "upgrade.from" config set
previous_phase = ""
self.maybe_upgrade_rolling_bounce_and_verify(processors,
previous_phase,
self.first_bounce_phase,
upgrade_from_version)
# All nodes processing, rebalancing has ceased
for processor in processors:
self.verify_processing(processor, self.first_bounce_phase + self.processing_message)
# second rolling bounce without "upgrade.from" config
self.maybe_upgrade_rolling_bounce_and_verify(processors,
self.first_bounce_phase,
self.second_bounce_phase)
# All nodes processing, rebalancing has ceased
for processor in processors:
self.verify_processing(processor, self.second_bounce_phase + self.processing_message)
# now verify tasks are unique
for processor in processors:
self.get_tasks_for_processor(processor)
self.logger.info("Active tasks %s" % processor.active_tasks)
overlapping_tasks = processor1.active_tasks.intersection(processor2.active_tasks)
assert len(overlapping_tasks) == int(0), \
"Final task assignments are not unique %s %s" % (processor1.active_tasks, processor2.active_tasks)
overlapping_tasks = processor1.active_tasks.intersection(processor3.active_tasks)
assert len(overlapping_tasks) == int(0), \
"Final task assignments are not unique %s %s" % (processor1.active_tasks, processor3.active_tasks)
overlapping_tasks = processor2.active_tasks.intersection(processor3.active_tasks)
assert len(overlapping_tasks) == int(0), \
"Final task assignments are not unique %s %s" % (processor2.active_tasks, processor3.active_tasks)
# test done close all down
stop_processors(processors, self.second_bounce_phase + self.stopped_message)
self.producer.stop()
self.kafka.stop()
self.zookeeper.stop()
def maybe_upgrade_rolling_bounce_and_verify(self,
processors,
previous_phase,
current_phase,
upgrade_from_version=None):
for processor in processors:
# stop the processor in prep for setting "update.from" or removing "update.from"
verify_stopped(processor, previous_phase + self.stopped_message)
# upgrade to version with cooperative rebalance
processor.set_version("")
processor.set_upgrade_phase(current_phase)
if upgrade_from_version is not None:
# need to remove minor version numbers for check of valid upgrade from numbers
upgrade_version = upgrade_from_version[:upgrade_from_version.rfind('.')]
rebalance_mode_msg = self.cooperative_turned_off_msg % upgrade_version
else:
upgrade_version = None
rebalance_mode_msg = self.cooperative_enabled_msg
self.set_props(processor, upgrade_version)
node = processor.node
with node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
processor.start()
# verify correct rebalance mode either turned off for upgrade or enabled after upgrade
log_monitor.wait_until(rebalance_mode_msg,
timeout_sec=60,
err_msg="Never saw '%s' message " % rebalance_mode_msg + str(processor.node.account))
# verify rebalanced into a running state
rebalance_msg = current_phase + self.running_state_msg
stdout_monitor.wait_until(rebalance_msg,
timeout_sec=60,
err_msg="Never saw '%s' message " % rebalance_msg + str(
processor.node.account))
# verify processing
verify_processing_msg = current_phase + self.processing_message
stdout_monitor.wait_until(verify_processing_msg,
timeout_sec=60,
err_msg="Never saw '%s' message " % verify_processing_msg + str(
processor.node.account))
def verify_processing(self, processor, pattern):
self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % pattern)
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
monitor.wait_until(pattern,
timeout_sec=60,
err_msg="Never saw processing of %s " % pattern + str(processor.node.account))
def get_tasks_for_processor(self, processor):
retries = 0
while retries < 5:
found_tasks = list(processor.node.account.ssh_capture("grep TASK-ASSIGNMENTS %s | tail -n 1" % processor.STDOUT_FILE, allow_fail=True))
self.logger.info("Returned %s from assigned task check" % found_tasks)
if len(found_tasks) > 0:
task_string = str(found_tasks[0]).strip()
self.logger.info("Converted %s from assigned task check" % task_string)
processor.set_tasks(task_string)
return
retries += 1
time.sleep(1)
return
def set_props(self, processor, upgrade_from=None):
processor.SOURCE_TOPIC = self.source_topic
processor.SINK_TOPIC = self.sink_topic
processor.REPORT_INTERVAL = self.report_interval
processor.UPGRADE_FROM = upgrade_from

View File

@ -129,4 +129,5 @@ LATEST_2_2 = V_2_2_1
# 2.3.x versions
V_2_3_0 = KafkaVersion("2.3.0")
V_2_3_1 = KafkaVersion("2.3.1")
LATEST_2_3 = V_2_3_0