mirror of https://github.com/apache/kafka.git
329 lines
11 KiB
Bash
Executable File
329 lines
11 KiB
Bash
Executable File
#!/bin/bash
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
readonly num_messages=400000
|
|
readonly message_size=400
|
|
readonly action_on_fail="proceed"
|
|
|
|
readonly test_start_time="$(date +%s)"
|
|
|
|
readonly base_dir=$(dirname $0)/..
|
|
|
|
info() {
|
|
echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
|
|
}
|
|
|
|
kill_child_processes() {
|
|
isTopmost=$1
|
|
curPid=$2
|
|
childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
|
|
for childPid in $childPids
|
|
do
|
|
kill_child_processes 0 $childPid
|
|
done
|
|
if [ $isTopmost -eq 0 ]; then
|
|
kill -15 $curPid 2> /dev/null
|
|
fi
|
|
}
|
|
|
|
cleanup() {
|
|
info "cleaning up"
|
|
|
|
pid_zk_source=
|
|
pid_zk_target=
|
|
pid_kafka_source1=
|
|
pid_kafka_source2=
|
|
pid_kafka_source3=
|
|
pid_kafka_target1=
|
|
pid_kafka_target2=
|
|
pid_producer=
|
|
|
|
rm -rf /tmp/zookeeper_source
|
|
rm -rf /tmp/zookeeper_target
|
|
|
|
rm -rf /tmp/kafka-source{1..3}-logs
|
|
# mkdir -p /tmp/kafka-source{1..3}-logs/test0{1..3}-0
|
|
# touch /tmp/kafka-source{1..3}-logs/test0{1..3}-0/00000000000000000000.kafka
|
|
|
|
rm -rf /tmp/kafka-target{1..2}-logs
|
|
}
|
|
|
|
begin_timer() {
|
|
t_begin=$(date +%s)
|
|
}
|
|
|
|
end_timer() {
|
|
t_end=$(date +%s)
|
|
}
|
|
|
|
start_zk() {
|
|
info "starting zookeepers"
|
|
$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source.properties 2>&1 > $base_dir/zookeeper_source.log &
|
|
pid_zk_source=$!
|
|
$base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
|
|
pid_zk_target=$!
|
|
}
|
|
|
|
start_source_servers() {
|
|
info "starting source cluster"
|
|
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source1.properties 2>&1 > $base_dir/kafka_source1.log &
|
|
pid_kafka_source1=$!
|
|
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source2.properties 2>&1 > $base_dir/kafka_source2.log &
|
|
pid_kafka_source2=$!
|
|
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source3.properties 2>&1 > $base_dir/kafka_source3.log &
|
|
pid_kafka_source3=$!
|
|
}
|
|
|
|
start_target_servers_for_whitelist_test() {
|
|
echo "starting mirror cluster"
|
|
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log &
|
|
pid_kafka_target1=$!
|
|
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/whitelisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log &
|
|
pid_kafka_target2=$!
|
|
}
|
|
|
|
start_target_servers_for_blacklist_test() {
|
|
echo "starting mirror cluster"
|
|
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target1.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target1.log &
|
|
pid_kafka_target1=$!
|
|
$base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target2.properties $base_dir/config/blacklisttest.consumer.properties $base_dir/config/mirror_producer.properties 2>&1 > $base_dir/kafka_target2.log &
|
|
pid_kafka_target2=$!
|
|
}
|
|
|
|
shutdown_servers() {
|
|
info "stopping producer"
|
|
if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi
|
|
|
|
info "shutting down target servers"
|
|
if [ "x${pid_kafka_target1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target1}; fi
|
|
if [ "x${pid_kafka_target2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target2}; fi
|
|
sleep 2
|
|
|
|
info "shutting down source servers"
|
|
if [ "x${pid_kafka_source1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source1}; fi
|
|
if [ "x${pid_kafka_source2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source2}; fi
|
|
if [ "x${pid_kafka_source3}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source3}; fi
|
|
|
|
info "shutting down zookeeper servers"
|
|
if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
|
|
if [ "x${pid_zk_source}" != "x" ]; then kill_child_processes 0 ${pid_zk_source}; fi
|
|
}
|
|
|
|
start_producer() {
|
|
topic=$1
|
|
info "start producing messages for topic $topic ..."
|
|
$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
|
|
pid_producer=$!
|
|
}
|
|
|
|
# In case the consumer does not consume, the test may exit prematurely (i.e.,
|
|
# shut down the kafka brokers, and ProducerPerformance will start throwing ugly
|
|
# exceptions. So, wait for the producer to finish before shutting down. If it
|
|
# takes too long, the user can just hit Ctrl-c which is trapped to kill child
|
|
# processes.
|
|
# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
|
|
wait_partition_done() {
|
|
n_tuples=$(($# / 3))
|
|
|
|
i=1
|
|
while (($#)); do
|
|
kafka_server[i]=$1
|
|
topic[i]=$2
|
|
partitionid[i]=$3
|
|
prev_offset[i]=0
|
|
info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
|
|
i=$((i+1))
|
|
shift 3
|
|
done
|
|
|
|
all_done=0
|
|
|
|
# set -x
|
|
while [[ $all_done != 1 ]]; do
|
|
sleep 4
|
|
i=$n_tuples
|
|
all_done=1
|
|
for ((i=1; i <= $n_tuples; i++)); do
|
|
cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
|
|
if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
|
|
all_done=0
|
|
prev_offset[i]=$cur_size
|
|
fi
|
|
done
|
|
done
|
|
|
|
}
|
|
|
|
cmp_logs() {
|
|
topic=$1
|
|
info "comparing source and target logs for topic $topic"
|
|
source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
|
|
source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
|
|
source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
|
|
target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
|
|
target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
|
|
if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
|
|
if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
|
|
expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size))
|
|
actual_size=$(($target_part0_size + $target_part1_size))
|
|
if [ "x$expected_size" != "x$actual_size" ]
|
|
then
|
|
info "source size: $expected_size target size: $actual_size"
|
|
return 1
|
|
else
|
|
return 0
|
|
fi
|
|
}
|
|
|
|
take_fail_snapshot() {
|
|
snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
|
|
mkdir $snapshot_dir
|
|
for dir in /tmp/zookeeper_source /tmp/zookeeper_target /tmp/kafka-source{1..3}-logs /tmp/kafka-target{1..2}-logs; do
|
|
if [ -d $dir ]; then
|
|
cp -r $dir $snapshot_dir
|
|
fi
|
|
done
|
|
}
|
|
|
|
# Usage: process_test_result <result> <action_on_fail>
|
|
# result: last test result
|
|
# action_on_fail: (exit|wait|proceed)
|
|
# ("wait" is useful if you want to troubleshoot using zookeeper)
|
|
process_test_result() {
|
|
result=$1
|
|
if [ $1 -eq 0 ]; then
|
|
info "test passed"
|
|
else
|
|
info "test failed"
|
|
case "$2" in
|
|
"wait") info "waiting: hit Ctrl-c to quit"
|
|
wait
|
|
;;
|
|
"exit") shutdown_servers
|
|
take_fail_snapshot
|
|
exit $result
|
|
;;
|
|
*) shutdown_servers
|
|
take_fail_snapshot
|
|
info "proceeding"
|
|
;;
|
|
esac
|
|
fi
|
|
}
|
|
|
|
test_whitelists() {
|
|
info "### Testing whitelists"
|
|
snapshot_prefix="whitelist-test"
|
|
|
|
cleanup
|
|
start_zk
|
|
start_source_servers
|
|
start_target_servers_for_whitelist_test
|
|
sleep 4
|
|
|
|
begin_timer
|
|
|
|
start_producer test01
|
|
info "waiting for producer to finish producing ..."
|
|
wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0
|
|
|
|
info "waiting for consumer to finish consuming ..."
|
|
wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0
|
|
|
|
end_timer
|
|
info "embedded consumer took $((t_end - t_begin)) seconds"
|
|
|
|
sleep 2
|
|
|
|
cmp_logs test01
|
|
result=$?
|
|
|
|
return $result
|
|
}
|
|
|
|
test_blacklists() {
|
|
info "### Testing blacklists"
|
|
snapshot_prefix="blacklist-test"
|
|
cleanup
|
|
start_zk
|
|
start_source_servers
|
|
start_target_servers_for_blacklist_test
|
|
sleep 4
|
|
|
|
start_producer test02
|
|
info "waiting for producer to finish producing test02 ..."
|
|
wait_partition_done kafka://localhost:9090 test02 0 kafka://localhost:9091 test02 0 kafka://localhost:9092 test02 0
|
|
|
|
# start_producer test03
|
|
# info "waiting for producer to finish producing test03 ..."
|
|
# wait_partition_done kafka://localhost:9090 test03 0 kafka://localhost:9091 test03 0 kafka://localhost:9092 test03 0
|
|
|
|
begin_timer
|
|
|
|
start_producer test01
|
|
info "waiting for producer to finish producing ..."
|
|
wait_partition_done kafka://localhost:9090 test01 0 kafka://localhost:9091 test01 0 kafka://localhost:9092 test01 0
|
|
|
|
info "waiting for consumer to finish consuming ..."
|
|
wait_partition_done kafka://localhost:9093 test01 0 kafka://localhost:9094 test01 0
|
|
|
|
end_timer
|
|
|
|
info "embedded consumer took $((t_end - t_begin)) seconds"
|
|
|
|
sleep 2
|
|
|
|
cmp_logs test02
|
|
result1=$?
|
|
# cmp_logs test03
|
|
# result2=$?
|
|
# if [[ "x$result1" == "x0" || "x$result2" == "x0" ]]; then
|
|
if [[ "x$result1" == "x0" ]]; then
|
|
result=1
|
|
else
|
|
cmp_logs test01
|
|
result=$?
|
|
fi
|
|
|
|
return $result
|
|
}
|
|
|
|
# main test begins
|
|
|
|
echo "Test-$test_start_time"
|
|
|
|
# Ctrl-c trap. Catches INT signal
|
|
trap "shutdown_servers; exit 0" INT
|
|
|
|
test_whitelists
|
|
result=$?
|
|
|
|
process_test_result $result $action_on_fail
|
|
|
|
shutdown_servers
|
|
|
|
sleep 2
|
|
|
|
test_blacklists
|
|
result=$?
|
|
|
|
process_test_result $result $action_on_fail
|
|
|
|
shutdown_servers
|
|
|
|
exit $result
|
|
|