mirror of https://github.com/apache/kafka.git
system test to validate consistency of replicas; patched by John Fung; reviewed by Jun Rao; KAFKA-341
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1350316 13f79535-47bb-0310-9956-ffa450edef68
parent 83c82a3ecc
commit 965edbbcd8
@@ -0,0 +1,168 @@
#!/bin/bash

# =========================================
# info - print messages with timestamp
# =========================================
info() {
    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
}

# =========================================
# info_no_newline - print messages with
# timestamp but without a trailing newline
# =========================================
info_no_newline() {
    echo -e -n "$(date +"%Y-%m-%d %H:%M:%S") $*"
}

# =========================================
# get_random_range - return a random number
# between the lower & upper bounds
# usage:
#   get_random_range $lower $upper
#   random_no=$?
# =========================================
get_random_range() {
    lo=$1
    up=$2
    range=$(($up - $lo + 1))

    return $(($(($RANDOM % range)) + $lo))
}
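
# Note: the result is returned via the exit status, which is an 8-bit
# value, so the bounds must keep the result within 0-255.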

# =========================================
# kill_child_processes - terminate a
# process and its child processes
# =========================================
kill_child_processes() {
    isTopmost=$1
    curPid=$2
    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')

    for childPid in $childPids
    do
        kill_child_processes 0 $childPid
    done
    if [ $isTopmost -eq 0 ]; then
        kill -15 $curPid 2> /dev/null
    fi
}
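
# Usage note: "kill_child_processes 0 <pid>" sends SIGTERM to <pid> and
# all of its descendants; passing 1 as the first argument spares the
# top-level process and terminates only its descendants.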

# =========================================================================
# generate_kafka_properties_files -
# 1. takes the following arguments and generates server_{1..n}.properties
#    for the total no. of kafka brokers specified in "num_server"; the
#    resulting properties files will be located at:
#      <kafka home>/system_test/<test suite>/config
# 2. the default values in the generated properties files are copied
#    from the settings in config/server.properties, while the brokerid
#    and server port are incremented accordingly
# 3. to generate properties files with non-default values such as
#    "socket.send.buffer=2097152", simply add the property with its new
#    value to the array variable kafka_properties_to_replace as shown below
# =========================================================================
generate_kafka_properties_files() {

    test_suite_full_path=$1   # eg. <kafka home>/system_test/single_host_multi_brokers
    num_server=$2             # total no. of brokers in the cluster
    brokerid_to_start=$3      # this should be '0' in most cases
    kafka_port_to_start=$4    # if 9091 is used, the rest would be 9092, 9093, ...

    this_config_dir=${test_suite_full_path}/config

    # info "test suite full path : $test_suite_full_path"
    # info "broker id to start   : $brokerid_to_start"
    # info "kafka port to start  : $kafka_port_to_start"
    # info "num of server        : $num_server"
    # info "config dir           : $this_config_dir"

    # =============================================
    # array to keep kafka properties statements
    # from the file 'server.properties' that need
    # to be changed from their default values
    # =============================================
    # kafka_properties_to_replace    # DO NOT uncomment this line !!

    # =============================================
    # Uncomment the following kafka properties
    # array elements as needed to change the
    # default values. Other kafka properties can
    # be added in a similar fashion.
    # =============================================
    # kafka_properties_to_replace[1]="socket.send.buffer=2097152"
    # kafka_properties_to_replace[2]="socket.receive.buffer=2097152"
    # kafka_properties_to_replace[3]="num.partitions=3"
    # kafka_properties_to_replace[4]="max.socket.request.bytes=10485760"

    server_properties=`cat ${this_config_dir}/server.properties`

    for ((i=1; i<=$num_server; i++))
    do
        # ======================
        # update misc properties
        # ======================
        for ((j=1; j<=${#kafka_properties_to_replace[@]}; j++))
        do
            keyword_to_replace=`echo ${kafka_properties_to_replace[${j}]} | awk -F '=' '{print $1}'`
            string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
            # info "string to be replaced : [$string_to_be_replaced]"
            # info "string to replace     : [${kafka_properties_to_replace[${j}]}]"

            echo "${server_properties}" | \
                sed -e "s/${string_to_be_replaced}/${kafka_properties_to_replace[${j}]}/g" \
                >${this_config_dir}/server_${i}.properties

            server_properties=`cat ${this_config_dir}/server_${i}.properties`
        done

        # ======================
        # update brokerid
        # ======================
        keyword_to_replace="brokerid="
        string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
        brokerid_idx=$(( $brokerid_to_start + $i - 1 ))
        string_to_replace="${keyword_to_replace}${brokerid_idx}"
        # info "string to be replaced : [${string_to_be_replaced}]"
        # info "string to replace     : [${string_to_replace}]"

        echo "${server_properties}" | \
            sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
            >${this_config_dir}/server_${i}.properties

        server_properties=`cat ${this_config_dir}/server_${i}.properties`

        # ======================
        # update kafka_port
        # ======================
        keyword_to_replace="port="
        string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
        port_idx=$(( $kafka_port_to_start + $i - 1 ))
        string_to_replace="${keyword_to_replace}${port_idx}"
        # info "string to be replaced : [${string_to_be_replaced}]"
        # info "string to replace     : [${string_to_replace}]"

        echo "${server_properties}" | \
            sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
            >${this_config_dir}/server_${i}.properties

        server_properties=`cat ${this_config_dir}/server_${i}.properties`

        # ======================
        # update kafka log dir
        # ======================
        keyword_to_replace="log.dir="
        string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
        string_to_be_replaced=${string_to_be_replaced//\//\\\/}    # escape '/' for use in sed
        string_to_replace="${keyword_to_replace}\/tmp\/kafka_server_${i}_logs"
        # info "string to be replaced : [${string_to_be_replaced}]"
        # info "string to replace     : [${string_to_replace}]"

        echo "${server_properties}" | \
            sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
            >${this_config_dir}/server_${i}.properties

        server_properties=`cat ${this_config_dir}/server_${i}.properties`

    done
}
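
# Example (mirroring the call made from run-test.sh): invoking
#   generate_kafka_properties_files <test suite full path> 3 0 9091
# produces server_1.properties .. server_3.properties with brokerid
# 0..2, ports 9091..9093 and per-broker log.dir values.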
@@ -0,0 +1,39 @@
==================
Test Descriptions
==================
This test suite performs a Kafka system test of the replication feature as follows:
1. Start the Kafka cluster
2. Create a topic
3. Find the leader
4. Stop the leader found in Step 3
5. Send n messages
6. Consume the messages
7. Restart the leader stopped in Step 4
8. Go to Step 3 for all servers in the cluster
9. Validate the test results

==================
Quick Start
==================
1. Modify the values of "num_kafka_server" & "replica_factor" as needed.
2. Execute the test as:
   <kafka home>/system_test/single_host_multi_brokers $ bin/run-test.sh

==================
Expected Results
==================
The following items should match (see the manual spot check sketched below):
1. The checksums of the data files in all replicas
2. The sizes of the data files in all replicas
3. The no. of messages produced and consumed
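
As a manual spot check (not part of the test suite), the replica data
files can be compared by hand, assuming the generated data dirs
/tmp/kafka_server_{1..3}_logs and the default test topic "mytest":

    for d in /tmp/kafka_server_{1..3}_logs/mytest-0; do
        f=$(ls $d | head -1)
        cksum $d/$f    # prints checksum, size and name of the first data file
    done

Every line printed should report the same checksum and size.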

==================
Notes
==================
1. There is no need to copy and paste the config/server.properties
   file to match the no. of brokers in the Kafka cluster.
2. The required no. of server properties files will be automatically
   generated according to the value of "num_kafka_server".
3. The default values in the generated properties files will be
   copied from the settings in config/server.properties, while the
   brokerid and server port will be incremented accordingly.
@@ -0,0 +1,67 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
    echo "USAGE: $0 classname [opts]"
    exit 1
fi

base_dir=$(dirname $0)/..
kafka_inst_dir=${base_dir}/../..

# build up the CLASSPATH from the scala boot, core, lib and perf jars
for file in $kafka_inst_dir/project/boot/scala-2.8.0/lib/*.jar;
do
    CLASSPATH=$CLASSPATH:$file
done

for file in $kafka_inst_dir/core/target/scala_2.8.0/*.jar;
do
    CLASSPATH=$CLASSPATH:$file
done

for file in $kafka_inst_dir/core/lib/*.jar;
do
    CLASSPATH=$CLASSPATH:$file
done

for file in $kafka_inst_dir/perf/target/scala_2.8.0/kafka*.jar;
do
    CLASSPATH=$CLASSPATH:$file
done

for file in $kafka_inst_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
do
    if [ ${file##*/} != "sbt-launch.jar" ]; then
        CLASSPATH=$CLASSPATH:$file
    fi
done

if [ -z "$KAFKA_JMX_OPTS" ]; then
    KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
if [ -z "$KAFKA_OPTS" ]; then
    KAFKA_OPTS="-Xmx512M -server -Dlog4j.configuration=file:$base_dir/config/log4j.properties"
fi
if [ $JMX_PORT ]; then
    KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
fi
if [ -z "$JAVA_HOME" ]; then
    JAVA="java"
else
    JAVA="$JAVA_HOME/bin/java"
fi
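
# Example invocation (matching how run-test.sh uses this wrapper):
#   bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer --zookeeper localhost:2181 --topic mytest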

$JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH "$@"
@@ -0,0 +1,429 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# ==================================================================
# run-test.sh
# ==================================================================

# ====================================
# Do not change the following
# (keep this section at the beginning
# of this script)
# ====================================
readonly system_test_root=$(dirname $0)/../..        # path of <kafka install>/system_test
readonly common_dir=${system_test_root}/common       # common util scripts for system_test
source ${common_dir}/util.sh                         # include the util script

readonly base_dir=$(dirname $0)/..                   # root of this test suite
readonly base_dir_full_path=$(readlink -f $base_dir) # full path of the root of this test suite
readonly config_dir=${base_dir}/config

readonly test_start_time="$(date +%s)"               # time starting the test

# ====================================
# Change the following as needed
# ====================================
readonly num_kafka_server=3            # the same no. of property files such as
                                       # server_{1..n}.properties will be automatically generated
readonly replica_factor=3              # should be less than or equal to "num_kafka_server"
readonly my_brokerid_to_start=0        # this should be '0' for now
readonly my_server_port_to_start=9091  # if using this default, the ports to be used will be 9091, 9092, ...
readonly producer_msg_batch_size=200   # no. of messages in a producer batch
readonly consumer_timeout_ms=10000     # elapsed time for the consumer to time out and exit

# ====================================
# No need to change the following
# configurations in most cases
# ====================================
readonly test_topic=mytest
readonly max_wait_for_consumer_complete=30
readonly zk_prop_pathname=${config_dir}/zookeeper.properties
readonly zk_log4j_log_pathname=${base_dir}/zookeeper.log

readonly producer_prop_pathname=${config_dir}/producer.properties
readonly consumer_prop_pathname=${config_dir}/consumer.properties

readonly producer_perf_log_pathname=${base_dir}/producer_perf_output.log
readonly producer_perf_crc_log_pathname=${base_dir}/producer_perf_crc.log
readonly producer_perf_crc_sorted_log_pathname=${base_dir}/producer_perf_crc_sorted.log
readonly producer_perf_crc_sorted_uniq_log_pathname=${base_dir}/producer_perf_crc_sorted_uniq.log

readonly console_consumer_log_pathname=${base_dir}/console_consumer.log
readonly console_consumer_crc_log_pathname=${base_dir}/console_consumer_crc.log
readonly console_consumer_crc_sorted_log_pathname=${base_dir}/console_consumer_crc_sorted.log
readonly console_consumer_crc_sorted_uniq_log_pathname=${base_dir}/console_consumer_crc_sorted_uniq.log

readonly this_test_stderr_output_log_pathname=${base_dir}/this_test_stderr_output.log

# ====================================
# arrays for kafka broker properties
# ====================================
kafka_data_log_dirs=
kafka_log4j_log_pathnames=
kafka_prop_pathnames=
kafka_brokerids=
kafka_sock_ports=
#kafka_first_data_file_sizes=
#kafka_first_data_file_checksums=

# ====================================
# Misc
# ====================================
zk_port=
zk_data_log_dir=
pid_zk=
kafka_pids=
test_failure_counter=0

initialize() {
    info "initializing ..."

    zk_port=`grep clientPort ${zk_prop_pathname} | awk -F '=' '{print $2}'`
    zk_data_log_dir=`grep dataDir ${zk_prop_pathname} | awk -F '=' '{print $2}'`

    for ((i=1; i<=$num_kafka_server; i++))
    do
        kafka_log4j_log_pathnames[${i}]=$base_dir/kafka_server_${i}.log
        kafka_prop_pathnames[${i}]=${config_dir}/server_${i}.properties

        kafka_data_log_dirs[${i}]=`grep ^log.dir ${kafka_prop_pathnames[${i}]} | awk -F '=' '{print $2}'`
        kafka_brokerids[${i}]=`grep ^brokerid= ${kafka_prop_pathnames[${i}]} | awk -F '=' '{print $2}'`
        kafka_sock_ports[${i}]=`grep ^port= ${kafka_prop_pathnames[${i}]} | awk -F '=' '{print $2}'`

        info "kafka $i data dir  : ${kafka_data_log_dirs[$i]}"
        info "kafka $i log4j log : ${kafka_log4j_log_pathnames[$i]}"
        info "kafka $i prop file : ${kafka_prop_pathnames[$i]}"
        info "kafka $i brokerid  : ${kafka_brokerids[$i]}"
        info "kafka $i socket    : ${kafka_sock_ports[$i]}"
    done

    info "zookeeper port     : $zk_port"
    info "zookeeper data dir : $zk_data_log_dir"
    echo
}

cleanup() {
    info "cleaning up kafka server log/data dir"
    for ((i=1; i<=$num_kafka_server; i++))
    do
        rm -rf ${kafka_data_log_dirs[$i]}
        rm -f ${kafka_log4j_log_pathnames[$i]}
    done

    rm -rf $zk_data_log_dir
    rm -f $zk_log4j_log_pathname
    rm -f $this_test_stderr_output_log_pathname

    rm -f $producer_perf_log_pathname
    rm -f $producer_perf_crc_log_pathname
    rm -f $producer_perf_crc_sorted_log_pathname
    rm -f $producer_perf_crc_sorted_uniq_log_pathname

    rm -f $console_consumer_log_pathname
    rm -f $console_consumer_crc_log_pathname
    rm -f $console_consumer_crc_sorted_log_pathname
    rm -f $console_consumer_crc_sorted_uniq_log_pathname
}

get_leader_brokerid() {
    log_line=`grep -i -h 'is leader' ${base_dir}/kafka_server_*.log | sort | tail -1`
    info "found the log line: $log_line"
    broker_id=`echo $log_line | awk -F ' ' '{print $5}'`

    return $broker_id
}
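
# Note: the broker id is handed back through the exit status (so it must
# stay below 256) and is parsed positionally (field 5) from the
# "... is leader ..." log line, which is brittle if the log format changes.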

start_zk() {
    info "starting zookeeper"
    $base_dir/../../bin/zookeeper-server-start.sh $zk_prop_pathname \
        > ${zk_log4j_log_pathname} 2>&1 &
    pid_zk=$!
}

stop_server() {
    s_idx=$1

    info "stopping server: $s_idx"

    if [ "x${kafka_pids[${s_idx}]}" != "x" ]; then
        kill_child_processes 0 ${kafka_pids[${s_idx}]};
    fi

    kafka_pids[${s_idx}]=
}

start_server() {
    s_idx=$1

    info "starting kafka server"
    $base_dir/bin/kafka-run-class.sh kafka.Kafka ${kafka_prop_pathnames[$s_idx]} \
        >> ${kafka_log4j_log_pathnames[$s_idx]} 2>&1 &
    kafka_pids[${s_idx}]=$!
    info " -> kafka_pids[$s_idx]: ${kafka_pids[$s_idx]}"
}

start_servers_cluster() {
    info "starting cluster"

    for ((i=1; i<=$num_kafka_server; i++))
    do
        start_server $i
    done
}

start_producer_perf() {
    this_topic=$1
    zk_conn_str=$2
    no_msg_to_produce=$3

    info "starting producer performance"

    ${base_dir}/bin/kafka-run-class.sh kafka.perf.ProducerPerformance \
        --brokerinfo "zk.connect=${zk_conn_str}" \
        --topic ${this_topic} \
        --messages $no_msg_to_produce \
        --vary-message-size \
        --message-size 100 \
        --threads 1 \
        --async \
        >> $producer_perf_log_pathname 2>&1
}

start_console_consumer() {
    this_consumer_topic=$1
    this_zk_conn_str=$2

    info "starting console consumer"
    $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \
        --zookeeper $this_zk_conn_str \
        --topic $this_consumer_topic \
        --formatter 'kafka.consumer.ConsoleConsumer$ChecksumMessageFormatter' \
        --consumer-timeout-ms $consumer_timeout_ms \
        >> $console_consumer_log_pathname 2>&1 &
}
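
# The ChecksumMessageFormatter prints one checksum line per consumed
# message; validate_results greps these lines from the consumer log and
# compares them with the checksums logged by ProducerPerformance.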

shutdown_servers() {

    info "shutting down servers"
    for ((i=1; i<=$num_kafka_server; i++))
    do
        if [ "x${kafka_pids[$i]}" != "x" ]; then
            kill_child_processes 0 ${kafka_pids[$i]};
        fi
    done

    info "shutting down zookeeper servers"
    if [ "x${pid_zk}" != "x" ]; then kill_child_processes 0 ${pid_zk}; fi
}

force_shutdown_producer() {
    info "force shutdown producer"
    ps auxw | grep ProducerPerformance | grep -v grep | awk '{print $2}' | xargs kill -9
}

force_shutdown_consumer() {
    info "force shutdown consumer"
    ps auxw | grep ConsoleConsumer | grep -v grep | awk '{print $2}' | xargs kill -9
}

create_topic() {
    this_topic_to_create=$1
    this_zk_conn_str=$2
    this_replica_factor=$3

    info "creating topic [$this_topic_to_create] on [$this_zk_conn_str]"
    $base_dir/../../bin/kafka-create-topic.sh --topic $this_topic_to_create \
        --zookeeper $this_zk_conn_str --replica $this_replica_factor
}

validate_results() {

    echo
    info "========================================================"
    info "VALIDATING TEST RESULTS"
    info "========================================================"

    # get the checksums and sizes of the replica data files
    for ((i=1; i<=$num_kafka_server; i++))
    do
        first_data_file_dir=${kafka_data_log_dirs[$i]}/${test_topic}-0
        first_data_file=`ls ${first_data_file_dir} | head -1`
        first_data_file_pathname=${first_data_file_dir}/$first_data_file
        kafka_first_data_file_sizes[$i]=`stat -c%s ${first_data_file_pathname}`
        kafka_first_data_file_checksums[$i]=`cksum ${first_data_file_pathname} | awk '{print $1}'`
        info "## broker[$i] data file: ${first_data_file_pathname} : [${kafka_first_data_file_sizes[$i]}]"
        info "## ==> crc ${kafka_first_data_file_checksums[$i]}"
    done

    # get the checksums from messages produced and consumed
    grep checksum $console_consumer_log_pathname | tr -d ' ' | awk -F ':' '{print $2}' > $console_consumer_crc_log_pathname
    grep checksum $producer_perf_log_pathname | tr ' ' '\n' | grep checksum | awk -F ':' '{print $2}' > $producer_perf_crc_log_pathname

    sort $console_consumer_crc_log_pathname > $console_consumer_crc_sorted_log_pathname
    sort $producer_perf_crc_log_pathname > $producer_perf_crc_sorted_log_pathname

    sort -u $console_consumer_crc_sorted_log_pathname > $console_consumer_crc_sorted_uniq_log_pathname
    sort -u $producer_perf_crc_sorted_log_pathname > $producer_perf_crc_sorted_uniq_log_pathname

    msg_count_from_console_consumer=`cat $console_consumer_crc_log_pathname | wc -l | tr -d ' '`
    uniq_msg_count_from_console_consumer=`cat $console_consumer_crc_sorted_uniq_log_pathname | wc -l | tr -d ' '`

    msg_count_from_producer_perf=`cat $producer_perf_crc_log_pathname | wc -l | tr -d ' '`
    uniq_msg_count_from_producer_perf=`cat $producer_perf_crc_sorted_uniq_log_pathname | wc -l | tr -d ' '`

    # report the findings
    echo
    info "## no. of messages published         : $msg_count_from_producer_perf"
    info "## producer unique msg published     : $uniq_msg_count_from_producer_perf"
    info "## console consumer msg rec'd        : $msg_count_from_console_consumer"
    info "## console consumer unique msg rec'd : $uniq_msg_count_from_console_consumer"
    echo

    validation_start_unix_ts=`date +%s`
    curr_unix_ts=`date +%s`
    size_unmatched_idx=1
    while [[ $(( $curr_unix_ts - $validation_start_unix_ts )) -le $max_wait_for_consumer_complete && $size_unmatched_idx -gt 0 ]]
    do
        info "wait 5s (up to ${max_wait_for_consumer_complete}s) and check replicas data sizes"
        sleep 5

        # re-read the current sizes and checksums of the first data files
        # so that this loop can actually observe the replicas catching up
        for ((i=1; i<=$num_kafka_server; i++))
        do
            first_data_file_dir=${kafka_data_log_dirs[$i]}/${test_topic}-0
            first_data_file=`ls ${first_data_file_dir} | head -1`
            first_data_file_pathname=${first_data_file_dir}/$first_data_file
            kafka_first_data_file_sizes[$i]=`stat -c%s ${first_data_file_pathname}`
            kafka_first_data_file_checksums[$i]=`cksum ${first_data_file_pathname} | awk '{print $1}'`
        done

        first_element_value=${kafka_first_data_file_sizes[1]}
        for ((i=2; i<=${#kafka_first_data_file_sizes[@]}; i++))
        do
            if [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
                size_unmatched_idx=1
                break
            else
                size_unmatched_idx=0
            fi
        done

        curr_unix_ts=`date +%s`
    done

    # validate that the sizes of all replicas match
    first_element_value=${kafka_first_data_file_sizes[1]}
    for ((i=2; i<=${#kafka_first_data_file_sizes[@]}; i++))
    do
        if [ $first_element_value -ne ${kafka_first_data_file_sizes[$i]} ]; then
            info "## FAILURE: Unmatched size found"
            test_failure_counter=$(( $test_failure_counter + 1 ))
        fi
    done

    # validate that the checksums of all replicas match
    first_element_value=${kafka_first_data_file_checksums[1]}
    for ((i=2; i<=${#kafka_first_data_file_checksums[@]}; i++))
    do
        if [ $first_element_value -ne ${kafka_first_data_file_checksums[$i]} ]; then
            info "## FAILURE: Unmatched checksum found"
            test_failure_counter=$(( $test_failure_counter + 1 ))
        fi
    done

    # validate that there is no data loss
    if [ $uniq_msg_count_from_producer_perf -ne $uniq_msg_count_from_console_consumer ]; then
        info "## FAILURE: unique message counts from producer and consumer do not match"
        test_failure_counter=$(( $test_failure_counter + 1 ))
    fi

    # report PASSED or FAILED
    info "========================================================"
    if [ $test_failure_counter -eq 0 ]; then
        info "## Test PASSED"
    else
        info "## Test FAILED"
    fi
    info "========================================================"
}

start_test() {
    echo
    info "======================================="
    info "####  Kafka Replicas System Test  ####"
    info "======================================="
    echo

    # Ctrl-c trap. Catches INT signal
    trap "force_shutdown_producer; force_shutdown_consumer; shutdown_servers; exit 0" INT

    generate_kafka_properties_files $base_dir_full_path $num_kafka_server $my_brokerid_to_start $my_server_port_to_start

    initialize

    cleanup
    sleep 2

    start_zk
    sleep 2

    start_servers_cluster
    sleep 2

    create_topic $test_topic localhost:$zk_port $replica_factor 2> $this_test_stderr_output_log_pathname

    info "sleeping for 5s"
    sleep 5
    echo

    for ((i=1; i<=$num_kafka_server; i++))
    do
        info "kafka server [$i] - reading leader"
        get_leader_brokerid
        ldr_bkr_id=$?
        info "leader broker id: $ldr_bkr_id"

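        # broker ids start at 0 while the kafka_* arrays are 1-indexed,
        # hence the +1 below to map the leader's broker id to its server index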
        svr_idx=$(($ldr_bkr_id + 1))

        # ==========================================================
        # If KAFKA-350 is fixed, uncomment the following 3 lines to
        # STOP the server for the failure test
        # ==========================================================
        #stop_server $svr_idx
        #info "sleeping for 10s"
        #sleep 10

        start_console_consumer $test_topic localhost:$zk_port
        info "sleeping for 5s"
        sleep 5

        start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size
        info "sleeping for 15s"
        sleep 15
        echo

        # ==========================================================
        # If KAFKA-350 is fixed, uncomment the following 3 lines to
        # START the server for the failure test
        # ==========================================================
        #start_server $svr_idx
        #info "sleeping for 30s"
        #sleep 30
    done

    validate_results
    echo

    shutdown_servers
    echo
}

# =================================================
# Main Test
# =================================================

start_test
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.consumer.ConsumerConfig for more details

# zk connection string
# comma-separated host:port pairs, each corresponding to a zk
# server, e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zk.connect=127.0.0.1:2181

# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000

# consumer group id
groupid=test-consumer-group

# consumer timeout
#consumer.timeout.ms=5000
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

#log4j.appender.fileAppender=org.apache.log4j.FileAppender
#log4j.appender.fileAppender.File=kafka-request.log
#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n

# Turn on all our debugging info
#log4j.logger.kafka=INFO
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG

# to print message checksums from ProducerPerformance
log4j.logger.kafka.perf=DEBUG
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
@@ -0,0 +1,80 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details

############################# Producer Basics #############################

# need to set either broker.list or zk.connect

# configure brokers statically
# format: brokerid1:host1:port1,brokerid2:host2:port2 ...
#broker.list=0:localhost:9092

# discover brokers from ZK
zk.connect=localhost:2181

# zookeeper session timeout; default is 6000
#zk.sessiontimeout.ms=

# the max time that the client waits to establish a connection to zookeeper; default is 6000
#zk.connectiontimeout.ms=

# name of the partitioner class for partitioning events; the default partitioner spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: 0: no compression, 1: gzip
compression.codec=0

# message encoder
serializer.class=kafka.serializer.StringEncoder

# allow topic-level compression
#compressed.topics=

# max message size; messages larger than this size are discarded; default is 1000000
#max.message.size=


############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.time=

# the maximum size of the blocking queue for buffering on the producer
#queue.size=

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueueTimeout.ms=

# the number of messages batched at the producer
#batch.size=

# the callback handler for one or multiple events
#callback.handler=

# properties required to initialize the callback handler
#callback.handler.props=

# the handler for events
#event.handler=

# properties required to initialize the event handler
#event.handler.props=
@@ -0,0 +1,118 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
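#
# NOTE: run-test.sh uses this file as a template: generate_kafka_properties_files
# (in util.sh) copies it to server_{1..n}.properties, rewriting the brokerid,
# port and log.dir values for each broker.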

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
brokerid=0

# Hostname the broker will advertise to consumers. If not set, kafka will use the value
# returned from InetAddress.getLocalHost(). If there are multiple interfaces,
# getLocalHost may not be what you want.
#hostname=


############################# Socket Server Settings #############################

# The port the socket server listens on
port=9091

# The number of threads handling network requests
network.threads=2

# The number of threads doing disk I/O
io.threads=2

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
max.socket.request.bytes=104857600


############################# Log Basics #############################

# The directory under which to store log files
log.dir=/tmp/kafka_server_1_logs

# The number of logical partitions per topic per server. More partitions allow greater parallelism
# for consumption, but also mean more files.
num.partitions=1

# Overrides for the default given by num.partitions on a per-topic basis
#topic.partition.count.map=topic1:3, topic2:4

############################# Log Flush Policy #############################

# The following configurations control the flush of data to disk. This is the most
# important performance knob in kafka.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
#    3. Throughput: The flush is generally the most expensive operation.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval=10000

# The maximum amount of time a message can sit in a log before we force a flush
log.default.flush.interval.ms=1000

# Per-topic overrides for log.default.flush.interval.ms
#topic.flush.intervals.ms=topic1:1000, topic2:3000

# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
log.default.flush.scheduler.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.size.
#log.retention.size=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.file.size=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.cleanup.interval.mins=1

############################# Zookeeper #############################

# Enable connecting to zookeeper
enable.zookeeper=true

# Zk connection string (see zk docs for details).
# This is a comma-separated list of host:port pairs, each corresponding to a zk
# server, e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zk.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored
dataDir=/tmp/zookeeper_source
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0