KAFKA-2274: verifiable consumer and integration testing

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang, Geoff Anderson

Closes #465 from hachikuji/KAFKA-2274
Authored by Jason Gustafson on 2015-11-09 18:38:22 -08:00; committed by Guozhang Wang
parent c9264b4c89
commit bce664b42a
11 changed files with 1059 additions and 12 deletions


@@ -0,0 +1,20 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer "$@"


@@ -108,6 +108,7 @@
</subpackage>
<subpackage name="tools">
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="com.fasterxml.jackson" />


@@ -20,6 +20,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A container that holds the list {@link ConsumerRecord} per partition for a
@@ -27,8 +28,7 @@ import java.util.Map;
* partition returned by a {@link Consumer#poll(long)} operation.
*/
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
public static final ConsumerRecords<Object, Object> EMPTY =
new ConsumerRecords<Object, Object>(Collections.EMPTY_MAP);
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP);
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
@@ -41,12 +41,12 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
*
* @param partition The partition to get records for
*/
public Iterable<ConsumerRecord<K, V>> records(TopicPartition partition) {
public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
List<ConsumerRecord<K, V>> recs = this.records.get(partition);
if (recs == null)
return Collections.emptyList();
else
return recs;
return Collections.unmodifiableList(recs);
}
/**
@@ -55,17 +55,25 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
public Iterable<ConsumerRecord<K, V>> records(String topic) {
if (topic == null)
throw new IllegalArgumentException("Topic must be non-null.");
List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
List<List<ConsumerRecord<K, V>>> recs = new ArrayList<>();
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
if (entry.getKey().topic().equals(topic))
recs.add(entry.getValue());
}
return new ConcatenatedIterable<K, V>(recs);
return new ConcatenatedIterable<>(recs);
}
/**
* Get the partitions which have records contained in this record set.
* @return the set of partitions with data in this record set (may be empty if no data was returned)
*/
public Set<TopicPartition> partitions() {
return Collections.unmodifiableSet(records.keySet());
}
@Override
public Iterator<ConsumerRecord<K, V>> iterator() {
return new ConcatenatedIterable<K, V>(records.values()).iterator();
return new ConcatenatedIterable<>(records.values()).iterator();
}
/**

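For context on the ConsumerRecords changes above, here is a minimal, hand-written sketch (not part of the patch) of how the new partitions() accessor and the List-returning records(TopicPartition) might be used together to commit per-partition offsets; the broker address, topic, group id, and class name are placeholders.

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class PartitionedConsumeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "example-group");              // placeholder group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // partitions() exposes only the partitions that returned data in this poll
            for (TopicPartition tp : records.partitions()) {
                // records(tp) now returns an unmodifiable List rather than a bare Iterable
                List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // commit the position after the last record seen for this partition
                consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}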

@@ -596,7 +596,6 @@ public class Fetcher<K, V> {
} catch (RuntimeException e) {
throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e);
}
}
private static class PartitionRecords<K, V> {


@@ -273,6 +273,7 @@ class GroupCoordinator(val brokerId: Int,
// if this is the leader, then we can attempt to persist state and transition to stable
if (memberId == group.leaderId) {
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
// fill any missing members with an empty assignment
val missing = group.allMembers -- groupAssignment.keySet


@@ -14,3 +14,4 @@
# limitations under the License.
from kafka import KafkaService
from util import TopicPartition


@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])


@@ -0,0 +1,222 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
from kafkatest.services.kafka.version import TRUNK
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.kafka import TopicPartition
from collections import namedtuple
import json
import os
import subprocess
import time
import signal
class VerifiableConsumer(BackgroundThreadService):
PERSISTENT_ROOT = "/mnt/verifiable_consumer"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stdout")
STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.stderr")
LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
LOG_FILE = os.path.join(LOG_DIR, "verifiable_consumer.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_consumer.properties")
logs = {
"verifiable_consumer_stdout": {
"path": STDOUT_CAPTURE,
"collect_default": False},
"verifiable_consumer_stderr": {
"path": STDERR_CAPTURE,
"collect_default": False},
"verifiable_consumer_log": {
"path": LOG_FILE,
"collect_default": True}
}
def __init__(self, context, num_nodes, kafka, topic, group_id,
max_messages=-1, session_timeout=30000, version=TRUNK):
super(VerifiableConsumer, self).__init__(context, num_nodes)
self.log_level = "TRACE"
self.kafka = kafka
self.topic = topic
self.group_id = group_id
self.max_messages = max_messages
self.session_timeout = session_timeout
self.assignment = {}
self.joined = set()
self.total_records = 0
self.consumed_positions = {}
self.committed_offsets = {}
self.revoked_count = 0
self.assigned_count = 0
for node in self.nodes:
node.version = version
self.prop_file = ""
self.security_config = kafka.security_config.client_config(self.prop_file)
self.prop_file += str(self.security_config)
def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
# Create and upload log properties
log_config = self.render('tools_log4j.properties', log_file=VerifiableConsumer.LOG_FILE)
node.account.create_file(VerifiableConsumer.LOG4J_CONFIG, log_config)
# Create and upload config file
self.logger.info("verifiable_consumer.properties:")
self.logger.info(self.prop_file)
node.account.create_file(VerifiableConsumer.CONFIG_FILE, self.prop_file)
self.security_config.setup_node(node)
cmd = self.start_cmd(node)
self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))
for line in node.account.ssh_capture(cmd):
event = self.try_parse_json(line.strip())
if event is not None:
with self.lock:
name = event["name"]
if name == "shutdown_complete":
self._handle_shutdown_complete(node)
if name == "offsets_committed":
self._handle_offsets_committed(node, event)
elif name == "records_consumed":
self._handle_records_consumed(node, event)
elif name == "partitions_revoked":
self._handle_partitions_revoked(node, event)
elif name == "partitions_assigned":
self._handle_partitions_assigned(node, event)
def _handle_shutdown_complete(self, node):
if node in self.joined:
self.joined.remove(node)
def _handle_offsets_committed(self, node, event):
if event["success"]:
for offset_commit in event["offsets"]:
topic = offset_commit["topic"]
partition = offset_commit["partition"]
tp = TopicPartition(topic, partition)
self.committed_offsets[tp] = offset_commit["offset"]
def _handle_records_consumed(self, node, event):
for topic_partition in event["partitions"]:
topic = topic_partition["topic"]
partition = topic_partition["partition"]
tp = TopicPartition(topic, partition)
self.consumed_positions[tp] = topic_partition["maxOffset"] + 1
self.total_records += event["count"]
def _handle_partitions_revoked(self, node, event):
self.revoked_count += 1
self.assignment[node] = []
if node in self.joined:
self.joined.remove(node)
def _handle_partitions_assigned(self, node, event):
self.assigned_count += 1
self.joined.add(node)
assignment = []
for topic_partition in event["partitions"]:
topic = topic_partition["topic"]
partition = topic_partition["partition"]
assignment.append(TopicPartition(topic, partition))
self.assignment[node] = assignment
def start_cmd(self, node):
cmd = ""
cmd += "export LOG_DIR=%s;" % VerifiableConsumer.LOG_DIR
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableConsumer.LOG4J_CONFIG
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer" \
" --group-id %s --topic %s --broker-list %s --session-timeout %s" % \
(self.group_id, self.topic, self.kafka.bootstrap_servers(), self.session_timeout)
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE
cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE, VerifiableConsumer.STDOUT_CAPTURE)
print(cmd)
return cmd
def pids(self, node):
try:
cmd = "jps | grep -i VerifiableConsumer | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
return []
def try_parse_json(self, string):
"""Try to parse a string as json. Return None if not parseable."""
try:
return json.loads(string)
except ValueError:
self.logger.debug("Could not parse as json: %s" % str(string))
return None
def kill_node(self, node, clean_shutdown=True, allow_fail=False):
if clean_shutdown:
sig = signal.SIGTERM
else:
sig = signal.SIGKILL
for pid in self.pids(node):
node.account.signal(pid, sig, allow_fail)
if not clean_shutdown:
self._handle_shutdown_complete(node)
def stop_node(self, node, clean_shutdown=True, allow_fail=False):
self.kill_node(node, clean_shutdown, allow_fail)
if self.worker_threads is None:
return
# block until the corresponding thread exits
if len(self.worker_threads) >= self.idx(node):
# Need to guard this because stop is preemptively called before the worker threads are added and started
self.worker_threads[self.idx(node) - 1].join()
def clean_node(self, node):
self.kill_node(node, clean_shutdown=False)
node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
self.security_config.clean_node(node)
def current_assignment(self):
with self.lock:
return self.assignment
def position(self, tp):
with self.lock:
return self.consumed_positions[tp]
def owner(self, tp):
with self.lock:
for node, assignment in self.assignment.iteritems():
if tp in assignment:
return node
return None
def committed(self, tp):
with self.lock:
return self.committed_offsets[tp]


@@ -21,6 +21,7 @@ from kafkatest.services.security.security_config import SecurityConfig
import json
import os
import signal
import subprocess
import time
@@ -128,9 +129,17 @@ class VerifiableProducer(BackgroundThreadService):
cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
return cmd
def kill_node(self, node, clean_shutdown=True, allow_fail=False):
if clean_shutdown:
sig = signal.SIGTERM
else:
sig = signal.SIGKILL
for pid in self.pids(node):
node.account.signal(pid, sig, allow_fail)
def pids(self, node):
try:
cmd = "ps ax | grep -i VerifiableProducer | grep java | grep -v grep | awk '{print $1}'"
cmd = "jps | grep -i VerifiableProducer | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
@@ -160,7 +169,7 @@ class VerifiableProducer(BackgroundThreadService):
return len(self.not_acked_values)
def stop_node(self, node):
node.account.kill_process("VerifiableProducer", allow_fail=False)
self.kill_node(node, clean_shutdown=False, allow_fail=False)
if self.worker_threads is None:
return
@@ -170,7 +179,7 @@ class VerifiableProducer(BackgroundThreadService):
self.worker_threads[self.idx(node) - 1].join()
def clean_node(self, node):
node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
self.kill_node(node, clean_shutdown=False, allow_fail=False)
node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
self.security_config.clean_node(node)


@@ -0,0 +1,157 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.kafka import TopicPartition
def partitions_for(topic, num_partitions):
partitions = set()
for i in range(num_partitions):
partitions.add(TopicPartition(topic=topic, partition=i))
return partitions
class VerifiableConsumerTest(KafkaTest):
STOPIC = "simple_topic"
TOPIC = "test_topic"
NUM_PARTITIONS = 3
PARTITIONS = partitions_for(TOPIC, NUM_PARTITIONS)
GROUP_ID = "test_group_id"
def __init__(self, test_context):
super(VerifiableConsumerTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
self.TOPIC : { 'partitions': self.NUM_PARTITIONS, 'replication-factor': 1 },
self.STOPIC : { 'partitions': 1, 'replication-factor': 2 }
})
self.num_producers = 1
self.num_consumers = 2
self.session_timeout = 10000
def min_cluster_size(self):
"""Override this since we're adding services outside of the constructor"""
return super(VerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers
def _partitions(self, assignment):
partitions = []
for parts in assignment.itervalues():
partitions += parts
return partitions
def _valid_assignment(self, assignment):
partitions = self._partitions(assignment)
return len(partitions) == self.NUM_PARTITIONS and set(partitions) == self.PARTITIONS
def _setup_consumer(self, topic):
return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
topic, self.GROUP_ID, session_timeout=self.session_timeout)
def _setup_producer(self, topic, max_messages=-1):
return VerifiableProducer(self.test_context, self.num_producers,
self.kafka, topic, max_messages=max_messages)
def _await_all_members(self, consumer):
# Wait until all members have joined the group
wait_until(lambda: len(consumer.joined) == self.num_consumers, timeout_sec=20,
err_msg="Consumers failed to join in a reasonable amount of time")
def test_consumer_failure(self):
partition = TopicPartition(self.STOPIC, 0)
consumer = self._setup_consumer(self.STOPIC)
producer = self._setup_producer(self.STOPIC)
consumer.start()
self._await_all_members(consumer)
partition_owner = consumer.owner(partition)
assert partition_owner is not None
# startup the producer and ensure that some records have been written
producer.start()
wait_until(lambda: producer.num_acked > 1000, timeout_sec=20,
err_msg="Producer failed waiting for messages to be written")
# stop the partition owner and await its shutdown
consumer.kill_node(partition_owner, clean_shutdown=True)
wait_until(lambda: len(consumer.joined) == 1, timeout_sec=20,
err_msg="Timed out waiting for consumer to close")
# ensure that the remaining consumer does some work after rebalancing
current_total_records = consumer.total_records
wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20,
err_msg="Timed out waiting for additional records to be consumed after first consumer failed")
# if the total records consumed matches the current position,
# we haven't seen any duplicates
assert consumer.position(partition) == consumer.total_records
assert consumer.committed(partition) <= consumer.total_records
def test_broker_failure(self):
partition = TopicPartition(self.STOPIC, 0)
consumer = self._setup_consumer(self.STOPIC)
producer = self._setup_producer(self.STOPIC)
producer.start()
consumer.start()
self._await_all_members(consumer)
# shutdown one of the brokers
self.kafka.signal_node(self.kafka.nodes[0])
# ensure that the remaining consumer does some work after broker failure
current_total_records = consumer.total_records
wait_until(lambda: consumer.total_records > current_total_records + 1000, timeout_sec=20,
err_msg="Timed out waiting for additional records to be consumed after first consumer failed")
# if the total records consumed matches the current position,
# we haven't seen any duplicates
assert consumer.position(partition) == consumer.total_records
assert consumer.committed(partition) <= consumer.total_records
def test_simple_consume(self):
total_records = 1000
consumer = self._setup_consumer(self.STOPIC)
producer = self._setup_producer(self.STOPIC, max_messages=total_records)
partition = TopicPartition(self.STOPIC, 0)
consumer.start()
self._await_all_members(consumer)
producer.start()
wait_until(lambda: producer.num_acked == total_records, timeout_sec=20,
err_msg="Producer failed waiting for messages to be written")
wait_until(lambda: consumer.committed(partition) == total_records, timeout_sec=10,
err_msg="Consumer failed to read all expected messages")
assert consumer.position(partition) == total_records
def test_valid_assignment(self):
consumer = self._setup_consumer(self.TOPIC)
consumer.start()
self._await_all_members(consumer)
assert self._valid_assignment(consumer.current_assignment())


@@ -0,0 +1,611 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Utils;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
/**
* Command line consumer designed for system testing. It outputs consumer events to STDOUT as JSON
* formatted objects. The "name" field in each JSON event identifies the event type. The following
* events are currently supported:
*
* <ul>
* <li>partitions_revoked: outputs the partitions revoked through {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}.
* See {@link org.apache.kafka.tools.VerifiableConsumer.PartitionsRevoked}</li>
* <li>partitions_assigned: outputs the partitions assigned through {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}
* See {@link org.apache.kafka.tools.VerifiableConsumer.PartitionsAssigned}.</li>
* <li>records_consumed: contains a summary of records consumed in a single call to {@link KafkaConsumer#poll(long)}.
* See {@link org.apache.kafka.tools.VerifiableConsumer.RecordsConsumed}.</li>
* <li>record_data: contains the key, value, and offset of an individual consumed record (only included if verbose
* output is enabled). See {@link org.apache.kafka.tools.VerifiableConsumer.RecordData}.</li>
* <li>offsets_committed: The result of every offset commit (only included if auto-commit is not enabled).
* See {@link org.apache.kafka.tools.VerifiableConsumer.OffsetsCommitted}</li>
* <li>shutdown_complete: emitted after the consumer returns from {@link KafkaConsumer#close()}.
* See {@link org.apache.kafka.tools.VerifiableConsumer.ShutdownComplete}.</li>
* </ul>
*/
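// The JSON lines below are illustrative examples of the events described above (hand-written for
// this documentation rather than captured from a run; the topic name, counts, and offsets are
// placeholders, and Jackson's field ordering may differ):
//
//   {"name":"partitions_assigned","class":"org.apache.kafka.tools.VerifiableConsumer",
//    "partitions":[{"topic":"test_topic","partition":0}]}
//   {"name":"records_consumed","class":"org.apache.kafka.tools.VerifiableConsumer","count":500,
//    "partitions":[{"topic":"test_topic","partition":0,"count":500,"minOffset":0,"maxOffset":499}]}
//   {"name":"offsets_committed","class":"org.apache.kafka.tools.VerifiableConsumer",
//    "offsets":[{"topic":"test_topic","partition":0,"offset":500}],"success":true}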
public class VerifiableConsumer implements Closeable, OffsetCommitCallback, ConsumerRebalanceListener {
private final ObjectMapper mapper = new ObjectMapper();
private final PrintStream out;
private final KafkaConsumer<String, String> consumer;
private final String topic;
private final boolean useAutoCommit;
private final boolean useAsyncCommit;
private final boolean verbose;
private final int maxMessages;
private int consumedMessages = 0;
private CountDownLatch shutdownLatch = new CountDownLatch(1);
public VerifiableConsumer(KafkaConsumer<String, String> consumer,
PrintStream out,
String topic,
int maxMessages,
boolean useAutoCommit,
boolean useAsyncCommit,
boolean verbose) {
this.consumer = consumer;
this.out = out;
this.topic = topic;
this.maxMessages = maxMessages;
this.useAutoCommit = useAutoCommit;
this.useAsyncCommit = useAsyncCommit;
this.verbose = verbose;
addKafkaSerializerModule();
}
private void addKafkaSerializerModule() {
SimpleModule kafka = new SimpleModule();
kafka.addSerializer(TopicPartition.class, new JsonSerializer<TopicPartition>() {
@Override
public void serialize(TopicPartition tp, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeStartObject();
gen.writeObjectField("topic", tp.topic());
gen.writeObjectField("partition", tp.partition());
gen.writeEndObject();
}
});
mapper.registerModule(kafka);
}
private boolean hasMessageLimit() {
return maxMessages >= 0;
}
private boolean isFinished() {
return hasMessageLimit() && consumedMessages >= maxMessages;
}
private Map<TopicPartition, OffsetAndMetadata> onRecordsReceived(ConsumerRecords<String, String> records) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
List<RecordSetSummary> summaries = new ArrayList<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
if (hasMessageLimit() && consumedMessages + partitionRecords.size() > maxMessages)
partitionRecords = partitionRecords.subList(0, maxMessages - consumedMessages);
if (partitionRecords.isEmpty())
continue;
long minOffset = partitionRecords.get(0).offset();
long maxOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsets.put(tp, new OffsetAndMetadata(maxOffset + 1));
summaries.add(new RecordSetSummary(tp.topic(), tp.partition(),
partitionRecords.size(), minOffset, maxOffset));
if (verbose) {
for (ConsumerRecord<String, String> record : partitionRecords)
printJson(new RecordData(record));
}
consumedMessages += partitionRecords.size();
if (isFinished())
break;
}
printJson(new RecordsConsumed(records.count(), summaries));
return offsets;
}
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
List<CommitData> committedOffsets = new ArrayList<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : offsets.entrySet()) {
TopicPartition tp = offsetEntry.getKey();
committedOffsets.add(new CommitData(tp.topic(), tp.partition(), offsetEntry.getValue().offset()));
}
boolean success = true;
String error = null;
if (exception != null) {
success = false;
error = exception.getMessage();
}
printJson(new OffsetsCommitted(committedOffsets, error, success));
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
printJson(new PartitionsAssigned(partitions));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
printJson(new PartitionsRevoked(partitions));
}
private void printJson(Object data) {
try {
out.println(mapper.writeValueAsString(data));
} catch (JsonProcessingException e) {
out.println("Bad data can't be written as json: " + e.getMessage());
}
}
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
try {
consumer.commitSync(offsets);
onComplete(offsets, null);
} catch (WakeupException e) {
// we only call wakeup() once to close the consumer, so this recursion should be safe
commitSync(offsets);
throw e;
} catch (Exception e) {
onComplete(offsets, e);
}
}
public void run() {
try {
consumer.subscribe(Arrays.asList(topic), this);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records);
if (!useAutoCommit) {
if (useAsyncCommit)
consumer.commitAsync(offsets, this);
else
commitSync(offsets);
}
}
} catch (WakeupException e) {
// ignore, we are closing
} finally {
consumer.close();
printJson(new ShutdownComplete());
shutdownLatch.countDown();
}
}
public void close() {
boolean interrupted = false;
try {
consumer.wakeup();
while (true) {
try {
shutdownLatch.await();
return;
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
private static abstract class ConsumerEvent {
@JsonProperty
public abstract String name();
@JsonProperty("class")
public String clazz() {
return VerifiableConsumer.class.getName();
}
}
private static class ShutdownComplete extends ConsumerEvent {
@Override
public String name() {
return "shutdown_complete";
}
}
private static class PartitionsRevoked extends ConsumerEvent {
private final Collection<TopicPartition> partitions;
public PartitionsRevoked(Collection<TopicPartition> partitions) {
this.partitions = partitions;
}
@JsonProperty
public Collection<TopicPartition> partitions() {
return partitions;
}
@Override
public String name() {
return "partitions_revoked";
}
}
private static class PartitionsAssigned extends ConsumerEvent {
private final Collection<TopicPartition> partitions;
public PartitionsAssigned(Collection<TopicPartition> partitions) {
this.partitions = partitions;
}
@JsonProperty
public Collection<TopicPartition> partitions() {
return partitions;
}
@Override
public String name() {
return "partitions_assigned";
}
}
public static class RecordsConsumed extends ConsumerEvent {
private final long count;
private final List<RecordSetSummary> partitionSummaries;
public RecordsConsumed(long count, List<RecordSetSummary> partitionSummaries) {
this.count = count;
this.partitionSummaries = partitionSummaries;
}
@Override
public String name() {
return "records_consumed";
}
@JsonProperty
public long count() {
return count;
}
@JsonProperty
public List<RecordSetSummary> partitions() {
return partitionSummaries;
}
}
public static class RecordData extends ConsumerEvent {
private final ConsumerRecord<String, String> record;
public RecordData(ConsumerRecord<String, String> record) {
this.record = record;
}
@Override
public String name() {
return "record_data";
}
@JsonProperty
public String topic() {
return record.topic();
}
@JsonProperty
public int partition() {
return record.partition();
}
@JsonProperty
public String key() {
return record.key();
}
@JsonProperty
public String value() {
return record.value();
}
@JsonProperty
public long offset() {
return record.offset();
}
}
private static class PartitionData {
private final String topic;
private final int partition;
public PartitionData(String topic, int partition) {
this.topic = topic;
this.partition = partition;
}
@JsonProperty
public String topic() {
return topic;
}
@JsonProperty
public int partition() {
return partition;
}
}
private static class OffsetsCommitted extends ConsumerEvent {
private final List<CommitData> offsets;
private final String error;
private final boolean success;
public OffsetsCommitted(List<CommitData> offsets, String error, boolean success) {
this.offsets = offsets;
this.error = error;
this.success = success;
}
@Override
public String name() {
return "offsets_committed";
}
@JsonProperty
public List<CommitData> offsets() {
return offsets;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String error() {
return error;
}
@JsonProperty
public boolean success() {
return success;
}
}
private static class CommitData extends PartitionData {
private final long offset;
public CommitData(String topic, int partition, long offset) {
super(topic, partition);
this.offset = offset;
}
@JsonProperty
public long offset() {
return offset;
}
}
private static class RecordSetSummary extends PartitionData {
private final long count;
private final long minOffset;
private final long maxOffset;
public RecordSetSummary(String topic, int partition, long count, long minOffset, long maxOffset) {
super(topic, partition);
this.count = count;
this.minOffset = minOffset;
this.maxOffset = maxOffset;
}
@JsonProperty
public long count() {
return count;
}
@JsonProperty
public long minOffset() {
return minOffset;
}
@JsonProperty
public long maxOffset() {
return maxOffset;
}
}
private static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("verifiable-consumer")
.defaultHelp(true)
.description("This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT.");
parser.addArgument("--broker-list")
.action(store())
.required(true)
.type(String.class)
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.dest("brokerList")
.help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
parser.addArgument("--topic")
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("Consumes messages from this topic.");
parser.addArgument("--group-id")
.action(store())
.required(true)
.type(String.class)
.metavar("GROUP_ID")
.dest("groupId")
.help("The groupId shared among members of the consumer group");
parser.addArgument("--max-messages")
.action(store())
.required(false)
.type(Integer.class)
.setDefault(-1)
.metavar("MAX-MESSAGES")
.dest("maxMessages")
.help("Consume this many messages. If -1 (the default), the consumer will consume until the process is killed externally");
parser.addArgument("--session-timeout")
.action(store())
.required(false)
.setDefault(30000)
.type(Integer.class)
.metavar("TIMEOUT_MS")
.dest("sessionTimeout")
.help("Set the consumer's session timeout");
parser.addArgument("--verbose")
.action(storeTrue())
.type(Boolean.class)
.metavar("VERBOSE")
.help("Enable to log individual consumed records");
parser.addArgument("--enable-autocommit")
.action(storeTrue())
.type(Boolean.class)
.metavar("ENABLE-AUTOCOMMIT")
.dest("useAutoCommit")
.help("Enable offset auto-commit on consumer");
parser.addArgument("--reset-policy")
.action(store())
.required(false)
.setDefault("earliest")
.type(String.class)
.dest("resetPolicy")
.help("Set reset policy (must be either 'earliest', 'latest', or 'none'");
parser.addArgument("--consumer.config")
.action(store())
.required(false)
.type(String.class)
.metavar("CONFIG_FILE")
.help("Consumer config properties file (config options shared with command line parameters will be overridden).");
return parser;
}
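// Illustrative invocation via kafka-run-class.sh, mirroring the command assembled by the
// ducktape VerifiableConsumer service above (the broker address, topic, and group id are
// placeholders):
//
//   bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableConsumer \
//       --broker-list localhost:9092 --topic test_topic --group-id test_group_id \
//       --session-timeout 30000 --max-messages 1000 --enable-autocommit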
public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
Namespace res = parser.parseArgs(args);
String topic = res.getString("topic");
boolean useAutoCommit = res.getBoolean("useAutoCommit");
int maxMessages = res.getInt("maxMessages");
boolean verbose = res.getBoolean("verbose");
String configFile = res.getString("consumer.config");
Properties consumerProps = new Properties();
if (configFile != null) {
try {
consumerProps.putAll(Utils.loadProps(configFile));
} catch (IOException e) {
throw new ArgumentParserException(e.getMessage(), parser);
}
}
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, res.getString("groupId"));
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));
StringDeserializer deserializer = new StringDeserializer();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer);
return new VerifiableConsumer(
consumer,
System.out,
topic,
maxMessages,
useAutoCommit,
false,
verbose);
}
public static void main(String[] args) {
ArgumentParser parser = argParser();
if (args.length == 0) {
parser.printHelp();
System.exit(0);
}
try {
final VerifiableConsumer consumer = createFromArgs(parser, args);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
consumer.close();
}
});
consumer.run();
} catch (ArgumentParserException e) {
parser.handleError(e);
System.exit(1);
}
}
}