MINOR: Add system test for optimization upgrades (#5912)

This is a new system test testing for optimizing an existing topology. This test takes the following steps

1. Start a Kafka Streams application that uses a selectKey then performs 3 groupByKey() operations and 1 join creating four repartition topics
2. Verify all instances start and process data
3. Stop all instances and verify stopped
4. For each stopped instance update the config for TOPOLOGY_OPTIMIZATION to all then restart the instance and verify the instance has started successfully also verifying Kafka Streams reduced the number of repartition topics from 4 to 1
5. Verify that each instance is processing data from the aggregation, reduce, and join operation
Stop all instances and verify the shut down is complete.
6. For testing I ran two passes of the system test with 25 repeats for a total of 50 test runs.

All test runs passed

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Bill Bejeck 2018-11-27 16:07:34 -05:00 committed by Guozhang Wang
parent 55c77ebf01
commit dfd545485a
3 changed files with 332 additions and 0 deletions

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
public class StreamsOptimizedTest {
public static void main(final String[] args) throws Exception {
if (args.length < 1) {
System.err.println("StreamsOptimizedTest requires one argument (properties-file) but no provided: ");
}
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started StreamsOptimizedTest");
System.out.println("props=" + streamsProperties);
final String inputTopic = (String) Objects.requireNonNull(streamsProperties.remove("input.topic"));
final String aggregationTopic = (String) Objects.requireNonNull(streamsProperties.remove("aggregation.topic"));
final String reduceTopic = (String) Objects.requireNonNull(streamsProperties.remove("reduce.topic"));
final String joinTopic = (String) Objects.requireNonNull(streamsProperties.remove("join.topic"));
final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
final Initializer<Integer> initializer = () -> 0;
final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
final Reducer<String> reducer = (v1, v2) -> Integer.toString(Integer.parseInt(v1) + Integer.parseInt(v2));
final Function<String, String> keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));
final KStream<String, Long> countStream = mappedStream.groupByKey()
.count(Materialized.with(Serdes.String(),
Serdes.Long())).toStream();
mappedStream.groupByKey().aggregate(
initializer,
aggregator,
Materialized.with(Serdes.String(), Serdes.Integer()))
.toStream()
.peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
.to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
mappedStream.groupByKey()
.reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
.toStream()
.peek((k, v) -> System.out.println(String.format("REDUCED key=%s value=%s", k, v)))
.to(reduceTopic, Produced.with(Serdes.String(), Serdes.String()));
mappedStream.join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
JoinWindows.of(ofMillis(500)),
Joined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
.peek((k, v) -> System.out.println(String.format("JOINED key=%s value=%s", k, v)))
.to(joinTopic, Produced.with(Serdes.String(), Serdes.String()));
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsOptimizedTest");
config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.putAll(streamsProperties);
final Topology topology = builder.build(config);
final KafkaStreams streams = new KafkaStreams(topology, config);
streams.setStateListener((oldState, newState) -> {
if (oldState == State.REBALANCING && newState == State.RUNNING) {
final int repartitionTopicCount = getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern);
System.out.println(String.format("REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%d", repartitionTopicCount));
System.out.flush();
}
});
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("closing Kafka Streams instance");
System.out.flush();
streams.close(Duration.ofMillis(5000));
System.out.println("OPTIMIZE_TEST Streams Stopped");
System.out.flush();
}
});
}
private static int getCountOfRepartitionTopicsFound(final String topologyString,
final Pattern repartitionTopicPattern) {
final Matcher matcher = repartitionTopicPattern.matcher(topologyString);
final List<String> repartitionTopicsFound = new ArrayList<>();
while (matcher.find()) {
final String repartitionTopic = matcher.group();
System.out.println(String.format("REPARTITION TOPIC found -> %s", repartitionTopic));
repartitionTopicsFound.add(repartitionTopic);
}
return repartitionTopicsFound.size();
}
}

View File

@ -424,6 +424,32 @@ class StreamsStandbyTaskService(StreamsTestBaseService):
configs)
class StreamsOptimizedUpgradeTestService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsOptimizedUpgradeTestService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.tests.StreamsOptimizedTest",
"")
self.OPTIMIZED_CONFIG = 'none'
self.INPUT_TOPIC = None
self.AGGREGATION_TOPIC = None
self.REDUCE_TOPIC = None
self.JOIN_TOPIC = None
def prop_file(self):
properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
properties['topology.optimization'] = self.OPTIMIZED_CONFIG
properties['input.topic'] = self.INPUT_TOPIC
properties['aggregation.topic'] = self.AGGREGATION_TOPIC
properties['reduce.topic'] = self.REDUCE_TOPIC
properties['join.topic'] = self.JOIN_TOPIC
cfg = KafkaConfig(**properties)
return cfg.render()
class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
def __init__(self, test_context, kafka):
super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,

View File

@ -0,0 +1,150 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
class StreamsOptimizedTest(Test):
"""
Test doing upgrades of a Kafka Streams application
that is un-optimized initially then optimized
"""
input_topic = 'inputTopic'
aggregation_topic = 'aggregationTopic'
reduce_topic = 'reduceTopic'
join_topic = 'joinTopic'
operation_pattern = 'AGGREGATED\|REDUCED\|JOINED'
def __init__(self, test_context):
super(StreamsOptimizedTest, self).__init__(test_context)
self.topics = {
self.input_topic: {'partitions': 6},
self.aggregation_topic: {'partitions': 6},
self.reduce_topic: {'partitions': 6},
self.join_topic: {'partitions': 6}
}
self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zookeeper, topics=self.topics)
self.producer = VerifiableProducer(self.test_context,
1,
self.kafka,
self.input_topic,
throughput=1000,
acks=1)
def test_upgrade_optimized_topology(self):
self.zookeeper.start()
self.kafka.start()
processor1 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
processor2 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
processor3 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
processors = [processor1, processor2, processor3]
# produce records continually during the test
self.producer.start()
# start all processors unoptimized
for processor in processors:
self.set_topics(processor)
processor.CLEAN_NODE_ENABLED = False
self.verify_running_repartition_topic_count(processor, 4)
self.verify_processing(processors, verify_individual_operations=False)
self.stop_processors(processors)
# start again with topology optimized
for processor in processors:
processor.OPTIMIZED_CONFIG = 'all'
self.verify_running_repartition_topic_count(processor, 1)
self.verify_processing(processors, verify_individual_operations=True)
self.stop_processors(processors)
self.producer.stop()
self.kafka.stop()
self.zookeeper.stop()
@staticmethod
def verify_running_repartition_topic_count(processor, repartition_topic_count):
node = processor.node
with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
processor.start()
monitor.wait_until('REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%s' % repartition_topic_count,
timeout_sec=60,
err_msg="Never saw 'REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%s' message "
% repartition_topic_count + str(processor.node.account))
@staticmethod
def verify_stopped(processor):
node = processor.node
with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
processor.stop()
monitor.wait_until('OPTIMIZE_TEST Streams Stopped',
timeout_sec=60,
err_msg="'OPTIMIZE_TEST Streams Stopped' message" + str(processor.node.account))
def verify_processing(self, processors, verify_individual_operations):
for processor in processors:
if not self.all_source_subtopology_tasks(processor):
if verify_individual_operations:
for operation in self.operation_pattern.split('\|'):
self.do_verify(processor, operation)
else:
self.do_verify(processor, self.operation_pattern)
else:
self.logger.info("Skipping processor %s with all source tasks" % processor.node.account)
def do_verify(self, processor, pattern):
self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % pattern)
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
monitor.wait_until(pattern,
timeout_sec=60,
err_msg="Never saw processing of %s " % pattern + str(processor.node.account))
def all_source_subtopology_tasks(self, processor):
retries = 0
while retries < 5:
found = list(processor.node.account.ssh_capture("sed -n 's/.*current active tasks: \[\(\(0_[0-9], \)\{3\}0_[0-9]\)\].*/\1/p' %s" % processor.LOG_FILE, allow_fail=True))
self.logger.info("Returned %s from assigned task check" % found)
if len(found) > 0:
return True
retries += 1
time.sleep(1)
return False
def stop_processors(self, processors):
for processor in processors:
self.verify_stopped(processor)
def set_topics(self, processor):
processor.INPUT_TOPIC = self.input_topic
processor.AGGREGATION_TOPIC = self.aggregation_topic
processor.REDUCE_TOPIC = self.reduce_topic
processor.JOIN_TOPIC = self.join_topic