KAFKA-373 Fix trunk broker failure test to work with mirror maker; patched by John Fung; reviewed by Joel Koshy
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1358407 13f79535-47bb-0310-9956-ffa450edef68
parent 1af23bb263
commit 489b09c120
@@ -1,23 +1,34 @@
This script performs broker failure tests with the following
setup in a single local machine:
** Please note that the following commands should be executed
   after downloading the kafka source code to build all the
   required binaries:
   1. <kafka install dir>/ $ ./sbt update
   2. <kafka install dir>/ $ ./sbt package

1. A cluster of Kafka source brokers
2. A cluster of Kafka mirror brokers with embedded consumers in
   point-to-point mode
3. An independent ConsoleConsumer in publish/subscribe mode to
Now you are ready to follow the steps below.

This script performs broker failure tests in an environment with
Mirrored Source & Target clusters in a single machine:

1. Start a cluster of Kafka source brokers
2. Start a cluster of Kafka target brokers
3. Start one or more Mirror Maker to create mirroring between
   source and target clusters
4. A producer produces batches of messages to the SOURCE brokers
   in the background
5. The Kafka SOURCE, TARGET brokers and Mirror Maker will be
   terminated in a round-robin fashion and wait for the consumer
   to catch up.
6. Repeat step 5 as many times as specified in the script
7. An independent ConsoleConsumer in publish/subscribe mode to
   consume messages from the SOURCE brokers cluster
4. An independent ConsoleConsumer in publish/subscribe mode to
   consume messages from the MIRROR brokers cluster
5. A producer produces batches of messages to the SOURCE brokers
6. One of the Kafka SOURCE or MIRROR brokers in the cluster will
   be randomly terminated and wait for the consumer to catch up.
7. Repeat Step 4 & 5 as many times as specified in the script
8. An independent ConsoleConsumer in publish/subscribe mode to
   consume messages from the TARGET brokers cluster

Expected results:
==================
There should not be any discrepancies by comparing the unique
message checksums from the source ConsoleConsumer and the
mirror ConsoleConsumer.
target ConsoleConsumer.

Notes:
==================
@@ -26,17 +37,36 @@ The number of Kafka SOURCE brokers can be increased as follows:
2. Make sure that there is a corresponding number of prop files:
   $base_dir/config/server_source{1..4}.properties

The number of Kafka MIRROR brokers can be increased as follows:
The number of Kafka TARGET brokers can be increased as follows:
1. Update the value of $num_kafka_target_server in this script
2. Make sure that there is a corresponding number of prop files:
   $base_dir/config/server_target{1..3}.properties

Quick Start:
==================
Execute this script as follows:
<kafka home>/system_test/broker_failure $ bin/run-test.sh
In the directory <kafka home>/system_test/broker_failure,
execute this script as follows:
$ bin/run-test.sh -n <num of iterations> -s <servers to bounce>

num of iterations - the number of iterations that the test runs

servers to bounce - the servers to be bounced in a round-robin fashion.

Values to be entered:
1 - source broker
2 - mirror maker
3 - target broker

Example:
* To bounce only mirror maker and target broker
  in turns, enter the value 23.
* To bounce only mirror maker, enter the value 2.
* To run the test without bouncing, enter 0.

At the end of the test, the received message checksums in both
SOURCE & TARGET will be compared. If all checksums are matched,
the test is PASSED. Otherwise, the test is FAILED.

In the event of failure, by default the brokers and zookeepers
remain running to make it easier to debug the issue - hit Ctrl-C
to shut them down.
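For illustration only, a hypothetical invocation from that directory (the flag
values here are chosen arbitrarily) could be:

    $ cd <kafka home>/system_test/broker_failure
    $ bin/run-test.sh -n 3 -s 23    # 3 iterations, bouncing mirror maker and target broker in turns

The round-robin bounce described in steps 5-6 above can be sketched in bash as
follows; this is not the actual run-test.sh logic, and the helper functions are
stand-in stubs:

    #!/bin/bash
    # Stub helpers standing in for the real start/stop logic in run-test.sh.
    stop_server()  { echo "stopping $1"; }
    start_server() { echo "starting $1"; }
    wait_for_consumer_to_catch_up() { echo "waiting for consumer to catch up"; sleep 1; }

    svrs_to_bounce="23"    # digits: 1=source broker, 2=mirror maker, 3=target broker
    num_iterations=4

    for ((iter=1; iter<=num_iterations; iter++)); do
        idx=$(( (iter - 1) % ${#svrs_to_bounce} ))     # pick the next digit, round-robin
        case "${svrs_to_bounce:${idx}:1}" in
            1) stop_server "source broker"; start_server "source broker" ;;
            2) stop_server "mirror maker";  start_server "mirror maker"  ;;
            3) stop_server "target broker"; start_server "target broker" ;;
        esac
        wait_for_consumer_to_catch_up
    done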
File diff suppressed because it is too large
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

log4j.rootLogger=INFO, stdout, kafkaAppender
log4j.rootLogger=INFO, stdout

# ====================================
# messages going to kafkaAppender

@@ -27,7 +27,7 @@ log4j.logger.org.apache.zookeeper=INFO, kafkaAppender
# ====================================
# (comment out this line to redirect ZK-related messages to kafkaAppender
# to allow reading both Kafka and ZK debugging messages in a single file)
#log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender
log4j.logger.org.apache.zookeeper=INFO, zookeeperAppender

# ====================================
# stdout

@@ -73,6 +73,7 @@ log4j.additivity.org.apache.zookeeper=false

log4j.logger.kafka.consumer=DEBUG
log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG
log4j.logger.kafka.tools.ConsumerOffsetChecker=DEBUG

# to print message checksum from ProducerPerformance
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
@@ -15,7 +15,8 @@
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
broker.list=0:localhost:9081
#broker.list=0:localhost:9081
zk.connect=localhost:2182

# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000

@@ -15,7 +15,8 @@
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
broker.list=0:localhost:9082
#broker.list=0:localhost:9082
zk.connect=localhost:2182

# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000

@@ -15,7 +15,8 @@
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
broker.list=0:localhost:9083
#broker.list=0:localhost:9083
zk.connect=localhost:2182

# timeout in ms for connecting to zookeeper
zk.connectiontimeout.ms=1000000
@@ -75,7 +75,7 @@ log.default.flush.interval.ms=1000
log.default.flush.scheduler.interval.ms=1000

# set sendBufferSize
send.buffer.size=10000
send.buffer.size=500000

# set receiveBufferSize
receive.buffer.size=10000
receive.buffer.size=500000
@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576

# the maximum size of a log segment
log.file.size=536870912
log.file.size=10000000

# the interval between running cleanup on the logs
log.cleanup.interval.mins=1

@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576

# the maximum size of a log segment
log.file.size=536870912
log.file.size=10000000

# the interval between running cleanup on the logs
log.cleanup.interval.mins=1

@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576

# the maximum size of a log segment
log.file.size=536870912
log.file.size=10000000

# the interval between running cleanup on the logs
log.cleanup.interval.mins=1

@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576

# the maximum size of a log segment
log.file.size=536870912
log.file.size=10000000

# the interval between running cleanup on the logs
log.cleanup.interval.mins=1

@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576

# the maximum size of a log segment
log.file.size=536870912
log.file.size=10000000

# the interval between running cleanup on the logs
log.cleanup.interval.mins=1

@@ -41,7 +41,7 @@ socket.send.buffer=1048576
socket.receive.buffer=1048576

# the maximum size of a log segment
log.file.size=536870912
log.file.size=10000000

# the interval between running cleanup on the logs
log.cleanup.interval.mins=1
@@ -0,0 +1,168 @@
#!/bin/bash

# =========================================
# info - print messages with timestamp
# =========================================
info() {
    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
}

# =========================================
# info_no_newline - print messages with
# timestamp without newline
# =========================================
info_no_newline() {
    echo -e -n "$(date +"%Y-%m-%d %H:%M:%S") $*"
}

# =========================================
# get_random_range - return a random number
# between the lower & upper bounds
# usage:
#   get_random_range $lower $upper
#   random_no=$?
# =========================================
get_random_range() {
    lo=$1
    up=$2
    range=$(($up - $lo + 1))

    return $(($(($RANDOM % range)) + $lo))
}
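# Illustrative note (not part of the original script): because get_random_range
# hands its result back through the shell exit status, callers read it from $?,
# and the value must stay within the 0-255 range an exit status can carry, e.g.:
#   get_random_range 1 3
#   svr_idx=$?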

# =========================================
# kill_child_processes - terminate a
# process and its child processes
# =========================================
kill_child_processes() {
    isTopmost=$1
    curPid=$2
    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')

    for childPid in $childPids
    do
        kill_child_processes 0 $childPid
    done
    if [ $isTopmost -eq 0 ]; then
        kill -15 $curPid 2> /dev/null
    fi
}
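# Illustrative usage (not part of the original script): the first argument
# controls whether the top-level pid itself is killed.
#   kill_child_processes 0 $pid   # terminate $pid and all of its descendants
#   kill_child_processes 1 $pid   # terminate only the descendants of $pid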

# =========================================================================
# generate_kafka_properties_files -
# 1. it takes the following arguments and generates server_{1..n}.properties
#    for the total no. of kafka brokers as specified in "num_server"; the
#    resulting properties files will be located at:
#      <kafka home>/system_test/<test suite>/config
# 2. the default values in the generated properties files will be copied
#    from the settings in config/server.properties while the brokerid and
#    server port will be incremented accordingly
# 3. to generate properties files with non-default values such as
#    "socket.send.buffer=2097152", simply add the property with its new value
#    to the array variable kafka_properties_to_replace as shown below
# =========================================================================
generate_kafka_properties_files() {

    test_suite_full_path=$1    # eg. <kafka home>/system_test/single_host_multi_brokers
    num_server=$2              # total no. of brokers in the cluster
    brokerid_to_start=$3       # this should be '0' in most cases
    kafka_port_to_start=$4     # if 9091 is used, the rest would be 9092, 9093, ...

    this_config_dir=${test_suite_full_path}/config

    # info "test suite full path : $test_suite_full_path"
    # info "broker id to start   : $brokerid_to_start"
    # info "kafka port to start  : $kafka_port_to_start"
    # info "num of server        : $num_server"
    # info "config dir           : $this_config_dir"

    # =============================================
    # array to keep kafka properties statements
    # from the file 'server.properties' that need
    # to be changed from their default values
    # =============================================
    # kafka_properties_to_replace    # DO NOT uncomment this line !!

    # =============================================
    # Uncomment the following kafka properties
    # array elements as needed to change the default
    # values. Other kafka properties can be added
    # in a similar fashion.
    # =============================================
    # kafka_properties_to_replace[1]="socket.send.buffer=2097152"
    # kafka_properties_to_replace[2]="socket.receive.buffer=2097152"
    # kafka_properties_to_replace[3]="num.partitions=3"
    # kafka_properties_to_replace[4]="max.socket.request.bytes=10485760"

    server_properties=`cat ${this_config_dir}/server.properties`

    for ((i=1; i<=$num_server; i++))
    do
        # ======================
        # update misc properties
        # ======================
        for ((j=1; j<=${#kafka_properties_to_replace[@]}; j++))
        do
            keyword_to_replace=`echo ${kafka_properties_to_replace[${j}]} | awk -F '=' '{print $1}'`
            string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
            # info "string to be replaced : [$string_to_be_replaced]"
            # info "string to replace     : [${kafka_properties_to_replace[${j}]}]"

            echo "${server_properties}" | \
                sed -e "s/${string_to_be_replaced}/${kafka_properties_to_replace[${j}]}/g" \
                >${this_config_dir}/server_${i}.properties

            server_properties=`cat ${this_config_dir}/server_${i}.properties`
        done

        # ======================
        # update brokerid
        # ======================
        keyword_to_replace="brokerid="
        string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
        brokerid_idx=$(( $brokerid_to_start + $i - 1 ))
        string_to_replace="${keyword_to_replace}${brokerid_idx}"
        # info "string to be replaced : [${string_to_be_replaced}]"
        # info "string to replace     : [${string_to_replace}]"

        echo "${server_properties}" | \
            sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
            >${this_config_dir}/server_${i}.properties

        server_properties=`cat ${this_config_dir}/server_${i}.properties`

        # ======================
        # update kafka_port
        # ======================
        keyword_to_replace="port="
        string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
        port_idx=$(( $kafka_port_to_start + $i - 1 ))
        string_to_replace="${keyword_to_replace}${port_idx}"
        # info "string to be replaced : [${string_to_be_replaced}]"
        # info "string to replace     : [${string_to_replace}]"

        echo "${server_properties}" | \
            sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
            >${this_config_dir}/server_${i}.properties

        server_properties=`cat ${this_config_dir}/server_${i}.properties`

        # ======================
        # update kafka_log dir
        # ======================
        keyword_to_replace="log.dir="
        string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
        string_to_be_replaced=${string_to_be_replaced//\//\\\/}
        string_to_replace="${keyword_to_replace}\/tmp\/kafka_server_${i}_logs"
        # info "string to be replaced : [${string_to_be_replaced}]"
        # info "string to replace     : [${string_to_replace}]"

        echo "${server_properties}" | \
            sed -e "s/${string_to_be_replaced}/${string_to_replace}/g" \
            >${this_config_dir}/server_${i}.properties

        server_properties=`cat ${this_config_dir}/server_${i}.properties`

    done
}
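# Illustrative example (not part of the original script): a call such as
#   generate_kafka_properties_files ${kafka_home}/system_test/single_host_multi_brokers 3 0 9091
# would write config/server_{1..3}.properties under that test suite directory,
# with brokerid 0,1,2 and ports 9091,9092,9093 ("kafka_home" here is a
# hypothetical variable standing for the kafka install dir).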