mirror of https://github.com/apache/kafka.git
KAFKA-3020; Ensure CheckStyle runs on all Java code
- Adds CheckStyle to core and examples modules - Fixes any existing CheckStyle issues Author: Grant Henke <granthenke@gmail.com> Reviewers: Ewen Cheslack-Postava <ewen@confluent.io> Closes #703 from granthenke/checkstyle-core
This commit is contained in:
parent
a0d21407cb
commit
64b746bd8b
15
build.gradle
15
build.gradle
|
@ -259,6 +259,7 @@ project(':core') {
|
|||
println "Building project 'core' with Scala version $resolvedScalaVersion"
|
||||
|
||||
apply plugin: 'scala'
|
||||
apply plugin: 'checkstyle'
|
||||
archivesBaseName = "kafka_${baseScalaVersion}"
|
||||
|
||||
dependencies {
|
||||
|
@ -334,8 +335,6 @@ project(':core') {
|
|||
into 'site-docs'
|
||||
}
|
||||
|
||||
|
||||
|
||||
tasks.create(name: "releaseTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
|
||||
into "kafka_${baseScalaVersion}-${version}"
|
||||
compression = Compression.GZIP
|
||||
|
@ -390,15 +389,27 @@ project(':core') {
|
|||
}
|
||||
into "$buildDir/dependant-testlibs"
|
||||
}
|
||||
|
||||
checkstyle {
|
||||
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
|
||||
configProperties = [importControlFile: "$rootDir/checkstyle/import-control-core.xml"]
|
||||
}
|
||||
test.dependsOn('checkstyleMain', 'checkstyleTest')
|
||||
}
|
||||
|
||||
project(':examples') {
|
||||
apply plugin: 'checkstyle'
|
||||
archivesBaseName = "kafka-examples"
|
||||
|
||||
dependencies {
|
||||
compile project(':core')
|
||||
}
|
||||
|
||||
checkstyle {
|
||||
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
|
||||
configProperties = [importControlFile: "$rootDir/checkstyle/import-control-core.xml"]
|
||||
}
|
||||
test.dependsOn('checkstyleMain', 'checkstyleTest')
|
||||
}
|
||||
|
||||
project(':clients') {
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
<!DOCTYPE import-control PUBLIC
|
||||
"-//Puppy Crawl//DTD Import Control 1.1//EN"
|
||||
"http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
|
||||
<!--
|
||||
// Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
// contributor license agreements. See the NOTICE file distributed with
|
||||
// this work for additional information regarding copyright ownership.
|
||||
// The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance with
|
||||
// the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
-->
|
||||
|
||||
<import-control pkg="kafka">
|
||||
|
||||
<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
|
||||
|
||||
<!-- common library dependencies -->
|
||||
<allow pkg="java" />
|
||||
<allow pkg="scala" />
|
||||
<allow pkg="javax.management" />
|
||||
<allow pkg="org.slf4j" />
|
||||
<allow pkg="org.junit" />
|
||||
<allow pkg="org.easymock" />
|
||||
<allow pkg="java.security" />
|
||||
<allow pkg="javax.net.ssl" />
|
||||
<allow pkg="javax.security" />
|
||||
|
||||
<allow pkg="kafka.common" />
|
||||
<allow pkg="kafka.utils" />
|
||||
<allow pkg="kafka.serializer" />
|
||||
<allow pkg="org.apache.kafka.common" />
|
||||
|
||||
<subpackage name="javaapi">
|
||||
<subpackage name="consumer">
|
||||
<allow pkg="kafka.consumer" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="message">
|
||||
<allow pkg="kafka.message" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="producer">
|
||||
<allow pkg="kafka.producer" />
|
||||
</subpackage>
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="tools">
|
||||
<allow pkg="kafka.javaapi" />
|
||||
<allow pkg="kafka.producer" />
|
||||
<allow pkg="kafka.consumer" />
|
||||
<allow pkg="joptsimple" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="examples">
|
||||
<allow pkg="org.apache.kafka.clients" />
|
||||
<allow pkg="kafka.api" />
|
||||
<allow pkg="kafka.javaapi" />
|
||||
<allow pkg="kafka.message" />
|
||||
</subpackage>
|
||||
|
||||
</import-control>
|
|
@ -17,72 +17,72 @@
|
|||
|
||||
package kafka.javaapi.consumer;
|
||||
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import kafka.common.OffsetAndMetadata;
|
||||
import kafka.common.TopicAndPartition;
|
||||
import kafka.consumer.KafkaStream;
|
||||
import kafka.consumer.TopicFilter;
|
||||
import kafka.serializer.Decoder;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public interface ConsumerConnector {
|
||||
/**
|
||||
* Create a list of MessageStreams of type T for each topic.
|
||||
*
|
||||
* @param topicCountMap a map of (topic, #streams) pair
|
||||
* @param keyDecoder a decoder that decodes the message key
|
||||
* @param valueDecoder a decoder that decodes the message itself
|
||||
* @return a map of (topic, list of KafkaStream) pairs.
|
||||
* The number of items in the list is #streams. Each stream supports
|
||||
* an iterator over message/metadata pairs.
|
||||
*/
|
||||
public <K,V> Map<String, List<KafkaStream<K,V>>>
|
||||
createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
|
||||
|
||||
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
|
||||
/**
|
||||
* Create a list of MessageStreams of type T for each topic.
|
||||
*
|
||||
* @param topicCountMap a map of (topic, #streams) pair
|
||||
* @param keyDecoder a decoder that decodes the message key
|
||||
* @param valueDecoder a decoder that decodes the message itself
|
||||
* @return a map of (topic, list of KafkaStream) pairs.
|
||||
* The number of items in the list is #streams. Each stream supports
|
||||
* an iterator over message/metadata pairs.
|
||||
*/
|
||||
public <K, V> Map<String, List<KafkaStream<K, V>>>
|
||||
createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
|
||||
|
||||
/**
|
||||
* Create a list of MessageAndTopicStreams containing messages of type T.
|
||||
*
|
||||
* @param topicFilter a TopicFilter that specifies which topics to
|
||||
* subscribe to (encapsulates a whitelist or a blacklist).
|
||||
* @param numStreams the number of message streams to return.
|
||||
* @param keyDecoder a decoder that decodes the message key
|
||||
* @param valueDecoder a decoder that decodes the message itself
|
||||
* @return a list of KafkaStream. Each stream supports an
|
||||
* iterator over its MessageAndMetadata elements.
|
||||
*/
|
||||
public <K,V> List<KafkaStream<K,V>>
|
||||
createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
|
||||
|
||||
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
|
||||
|
||||
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
|
||||
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
|
||||
|
||||
/**
|
||||
* Commit the offsets of all broker partitions connected by this connector.
|
||||
*/
|
||||
public void commitOffsets();
|
||||
public void commitOffsets(boolean retryOnFailure);
|
||||
/**
|
||||
* Create a list of MessageAndTopicStreams containing messages of type T.
|
||||
*
|
||||
* @param topicFilter a TopicFilter that specifies which topics to
|
||||
* subscribe to (encapsulates a whitelist or a blacklist).
|
||||
* @param numStreams the number of message streams to return.
|
||||
* @param keyDecoder a decoder that decodes the message key
|
||||
* @param valueDecoder a decoder that decodes the message itself
|
||||
* @return a list of KafkaStream. Each stream supports an
|
||||
* iterator over its MessageAndMetadata elements.
|
||||
*/
|
||||
public <K, V> List<KafkaStream<K, V>>
|
||||
createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
|
||||
|
||||
/**
|
||||
* Commit offsets using the provided offsets map
|
||||
*
|
||||
* @param offsetsToCommit a map containing the offset to commit for each partition.
|
||||
* @param retryOnFailure enable retries on the offset commit if it fails.
|
||||
*/
|
||||
public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> offsetsToCommit, boolean retryOnFailure);
|
||||
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
|
||||
|
||||
/**
|
||||
* Wire in a consumer rebalance listener to be executed when consumer rebalance occurs.
|
||||
* @param listener The consumer rebalance listener to wire in
|
||||
*/
|
||||
public void setConsumerRebalanceListener(ConsumerRebalanceListener listener);
|
||||
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
|
||||
|
||||
/**
|
||||
* Shut down the connector
|
||||
*/
|
||||
public void shutdown();
|
||||
/**
|
||||
* Commit the offsets of all broker partitions connected by this connector.
|
||||
*/
|
||||
public void commitOffsets();
|
||||
|
||||
public void commitOffsets(boolean retryOnFailure);
|
||||
|
||||
/**
|
||||
* Commit offsets using the provided offsets map
|
||||
*
|
||||
* @param offsetsToCommit a map containing the offset to commit for each partition.
|
||||
* @param retryOnFailure enable retries on the offset commit if it fails.
|
||||
*/
|
||||
public void commitOffsets(Map<TopicAndPartition, OffsetAndMetadata> offsetsToCommit, boolean retryOnFailure);
|
||||
|
||||
/**
|
||||
* Wire in a consumer rebalance listener to be executed when consumer rebalance occurs.
|
||||
* @param listener The consumer rebalance listener to wire in
|
||||
*/
|
||||
public void setConsumerRebalanceListener(ConsumerRebalanceListener listener);
|
||||
|
||||
/**
|
||||
* Shut down the connector
|
||||
*/
|
||||
public void shutdown();
|
||||
}
|
||||
|
|
|
@ -17,10 +17,15 @@
|
|||
|
||||
package kafka.tools;
|
||||
|
||||
import joptsimple.*;
|
||||
import joptsimple.ArgumentAcceptingOptionSpec;
|
||||
import joptsimple.OptionParser;
|
||||
import joptsimple.OptionSet;
|
||||
import joptsimple.OptionSpec;
|
||||
import joptsimple.OptionSpecBuilder;
|
||||
import kafka.javaapi.producer.Producer;
|
||||
import kafka.producer.KeyedMessage;
|
||||
import kafka.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
|
@ -38,8 +43,6 @@ import java.util.concurrent.ArrayBlockingQueue;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
|
||||
/**
|
||||
* This is a kafka 0.7 to 0.8 online migration tool used for migrating data from 0.7 to 0.8 cluster. Internally,
|
||||
|
@ -58,429 +61,427 @@ import org.apache.kafka.common.utils.Utils;
|
|||
* the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code.
|
||||
*/
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public class KafkaMigrationTool
|
||||
{
|
||||
private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
|
||||
private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
|
||||
private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
|
||||
private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
|
||||
private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator";
|
||||
private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector";
|
||||
private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata";
|
||||
private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message";
|
||||
private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist";
|
||||
private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter";
|
||||
private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist";
|
||||
public class KafkaMigrationTool {
|
||||
private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
|
||||
private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
|
||||
private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
|
||||
private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
|
||||
private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator";
|
||||
private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector";
|
||||
private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata";
|
||||
private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message";
|
||||
private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist";
|
||||
private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter";
|
||||
private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist";
|
||||
|
||||
private static Class<?> KafkaStaticConsumer_07 = null;
|
||||
private static Class<?> ConsumerConfig_07 = null;
|
||||
private static Class<?> ConsumerConnector_07 = null;
|
||||
private static Class<?> KafkaStream_07 = null;
|
||||
private static Class<?> TopicFilter_07 = null;
|
||||
private static Class<?> WhiteList_07 = null;
|
||||
private static Class<?> BlackList_07 = null;
|
||||
private static Class<?> KafkaConsumerIteratorClass_07 = null;
|
||||
private static Class<?> KafkaMessageAndMetatDataClass_07 = null;
|
||||
private static Class<?> KafkaMessageClass_07 = null;
|
||||
private static Class<?> kafkaStaticConsumer07 = null;
|
||||
private static Class<?> consumerConfig07 = null;
|
||||
private static Class<?> consumerConnector07 = null;
|
||||
private static Class<?> kafkaStream07 = null;
|
||||
private static Class<?> topicFilter07 = null;
|
||||
private static Class<?> whiteList07 = null;
|
||||
private static Class<?> blackList07 = null;
|
||||
private static Class<?> kafkaConsumerIteratorClass07 = null;
|
||||
private static Class<?> kafkaMessageAndMetaDataClass07 = null;
|
||||
private static Class<?> kafkaMessageClass07 = null;
|
||||
|
||||
public static void main(String[] args) throws InterruptedException, IOException {
|
||||
OptionParser parser = new OptionParser();
|
||||
ArgumentAcceptingOptionSpec<String> consumerConfigOpt
|
||||
= parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You man specify multiple of these.")
|
||||
.withRequiredArg()
|
||||
.describedAs("config file")
|
||||
.ofType(String.class);
|
||||
public static void main(String[] args) throws InterruptedException, IOException {
|
||||
OptionParser parser = new OptionParser();
|
||||
ArgumentAcceptingOptionSpec<String> consumerConfigOpt
|
||||
= parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. " + "You man specify multiple of these.")
|
||||
.withRequiredArg()
|
||||
.describedAs("config file")
|
||||
.ofType(String.class);
|
||||
|
||||
ArgumentAcceptingOptionSpec<String> producerConfigOpt
|
||||
= parser.accepts("producer.config", "Producer config.")
|
||||
.withRequiredArg()
|
||||
.describedAs("config file")
|
||||
.ofType(String.class);
|
||||
ArgumentAcceptingOptionSpec<String> producerConfigOpt
|
||||
= parser.accepts("producer.config", "Producer config.")
|
||||
.withRequiredArg()
|
||||
.describedAs("config file")
|
||||
.ofType(String.class);
|
||||
|
||||
ArgumentAcceptingOptionSpec<Integer> numProducersOpt
|
||||
= parser.accepts("num.producers", "Number of producer instances")
|
||||
.withRequiredArg()
|
||||
.describedAs("Number of producers")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(1);
|
||||
ArgumentAcceptingOptionSpec<Integer> numProducersOpt
|
||||
= parser.accepts("num.producers", "Number of producer instances")
|
||||
.withRequiredArg()
|
||||
.describedAs("Number of producers")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(1);
|
||||
|
||||
ArgumentAcceptingOptionSpec<String> zkClient01JarOpt
|
||||
= parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
|
||||
.withRequiredArg()
|
||||
.describedAs("zkClient 0.1 jar file required by Kafka 0.7")
|
||||
.ofType(String.class);
|
||||
ArgumentAcceptingOptionSpec<String> zkClient01JarOpt
|
||||
= parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file")
|
||||
.withRequiredArg()
|
||||
.describedAs("zkClient 0.1 jar file required by Kafka 0.7")
|
||||
.ofType(String.class);
|
||||
|
||||
ArgumentAcceptingOptionSpec<String> kafka07JarOpt
|
||||
= parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
|
||||
.withRequiredArg()
|
||||
.describedAs("kafka 0.7 jar")
|
||||
.ofType(String.class);
|
||||
ArgumentAcceptingOptionSpec<String> kafka07JarOpt
|
||||
= parser.accepts("kafka.07.jar", "Kafka 0.7 jar file")
|
||||
.withRequiredArg()
|
||||
.describedAs("kafka 0.7 jar")
|
||||
.ofType(String.class);
|
||||
|
||||
ArgumentAcceptingOptionSpec<Integer> numStreamsOpt
|
||||
= parser.accepts("num.streams", "Number of consumer streams")
|
||||
.withRequiredArg()
|
||||
.describedAs("Number of consumer threads")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(1);
|
||||
ArgumentAcceptingOptionSpec<Integer> numStreamsOpt
|
||||
= parser.accepts("num.streams", "Number of consumer streams")
|
||||
.withRequiredArg()
|
||||
.describedAs("Number of consumer threads")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(1);
|
||||
|
||||
ArgumentAcceptingOptionSpec<String> whitelistOpt
|
||||
= parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
|
||||
.withRequiredArg()
|
||||
.describedAs("Java regex (String)")
|
||||
.ofType(String.class);
|
||||
ArgumentAcceptingOptionSpec<String> whitelistOpt
|
||||
= parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
|
||||
.withRequiredArg()
|
||||
.describedAs("Java regex (String)")
|
||||
.ofType(String.class);
|
||||
|
||||
ArgumentAcceptingOptionSpec<String> blacklistOpt
|
||||
= parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
|
||||
.withRequiredArg()
|
||||
.describedAs("Java regex (String)")
|
||||
.ofType(String.class);
|
||||
ArgumentAcceptingOptionSpec<String> blacklistOpt
|
||||
= parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
|
||||
.withRequiredArg()
|
||||
.describedAs("Java regex (String)")
|
||||
.ofType(String.class);
|
||||
|
||||
ArgumentAcceptingOptionSpec<Integer> queueSizeOpt
|
||||
= parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
|
||||
.withRequiredArg()
|
||||
.describedAs("Queue size in terms of number of messages")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(10000);
|
||||
ArgumentAcceptingOptionSpec<Integer> queueSizeOpt
|
||||
= parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
|
||||
.withRequiredArg()
|
||||
.describedAs("Queue size in terms of number of messages")
|
||||
.ofType(Integer.class)
|
||||
.defaultsTo(10000);
|
||||
|
||||
OptionSpecBuilder helpOpt
|
||||
= parser.accepts("help", "Print this message.");
|
||||
OptionSpecBuilder helpOpt
|
||||
= parser.accepts("help", "Print this message.");
|
||||
|
||||
OptionSet options = parser.parse(args);
|
||||
OptionSet options = parser.parse(args);
|
||||
|
||||
if (options.has(helpOpt)) {
|
||||
parser.printHelpOn(System.out);
|
||||
System.exit(0);
|
||||
if (options.has(helpOpt)) {
|
||||
parser.printHelpOn(System.out);
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
|
||||
int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
|
||||
int blackListCount = options.has(blacklistOpt) ? 1 : 0;
|
||||
if (whiteListCount + blackListCount != 1) {
|
||||
System.err.println("Exactly one of whitelist or blacklist is required.");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
String kafkaJarFile07 = options.valueOf(kafka07JarOpt);
|
||||
String zkClientJarFile = options.valueOf(zkClient01JarOpt);
|
||||
String consumerConfigFile07 = options.valueOf(consumerConfigOpt);
|
||||
int numConsumers = options.valueOf(numStreamsOpt);
|
||||
String producerConfigFile08 = options.valueOf(producerConfigOpt);
|
||||
int numProducers = options.valueOf(numProducersOpt);
|
||||
final List<MigrationThread> migrationThreads = new ArrayList<MigrationThread>(numConsumers);
|
||||
final List<ProducerThread> producerThreads = new ArrayList<ProducerThread>(numProducers);
|
||||
|
||||
try {
|
||||
File kafkaJar07 = new File(kafkaJarFile07);
|
||||
File zkClientJar = new File(zkClientJarFile);
|
||||
ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[]{
|
||||
kafkaJar07.toURI().toURL(),
|
||||
zkClientJar.toURI().toURL()
|
||||
});
|
||||
|
||||
/** Construct the 07 consumer config **/
|
||||
consumerConfig07 = c1.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
|
||||
kafkaStaticConsumer07 = c1.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
|
||||
consumerConnector07 = c1.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
|
||||
kafkaStream07 = c1.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
|
||||
topicFilter07 = c1.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
|
||||
whiteList07 = c1.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
|
||||
blackList07 = c1.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
|
||||
kafkaMessageClass07 = c1.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
|
||||
kafkaConsumerIteratorClass07 = c1.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
|
||||
kafkaMessageAndMetaDataClass07 = c1.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);
|
||||
|
||||
Constructor consumerConfigConstructor07 = consumerConfig07.getConstructor(Properties.class);
|
||||
Properties kafkaConsumerProperties07 = new Properties();
|
||||
kafkaConsumerProperties07.load(new FileInputStream(consumerConfigFile07));
|
||||
/** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
|
||||
if (kafkaConsumerProperties07.getProperty("shallow.iterator.enable", "").equals("true")) {
|
||||
log.warn("Shallow iterator should not be used in the migration tool");
|
||||
kafkaConsumerProperties07.setProperty("shallow.iterator.enable", "false");
|
||||
}
|
||||
Object consumerConfig07 = consumerConfigConstructor07.newInstance(kafkaConsumerProperties07);
|
||||
|
||||
/** Construct the 07 consumer connector **/
|
||||
Method consumerConnectorCreationMethod07 = kafkaStaticConsumer07.getMethod("createJavaConsumerConnector", KafkaMigrationTool.consumerConfig07);
|
||||
final Object consumerConnector07 = consumerConnectorCreationMethod07.invoke(null, consumerConfig07);
|
||||
Method consumerConnectorCreateMessageStreamsMethod07 = KafkaMigrationTool.consumerConnector07.getMethod(
|
||||
"createMessageStreamsByFilter",
|
||||
topicFilter07, int.class);
|
||||
final Method consumerConnectorShutdownMethod07 = KafkaMigrationTool.consumerConnector07.getMethod("shutdown");
|
||||
Constructor whiteListConstructor07 = whiteList07.getConstructor(String.class);
|
||||
Constructor blackListConstructor07 = blackList07.getConstructor(String.class);
|
||||
Object filterSpec = null;
|
||||
if (options.has(whitelistOpt))
|
||||
filterSpec = whiteListConstructor07.newInstance(options.valueOf(whitelistOpt));
|
||||
else
|
||||
filterSpec = blackListConstructor07.newInstance(options.valueOf(blacklistOpt));
|
||||
|
||||
Object retKafkaStreams = consumerConnectorCreateMessageStreamsMethod07.invoke(consumerConnector07, filterSpec, numConsumers);
|
||||
|
||||
Properties kafkaProducerProperties08 = new Properties();
|
||||
kafkaProducerProperties08.load(new FileInputStream(producerConfigFile08));
|
||||
kafkaProducerProperties08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
|
||||
// create a producer channel instead
|
||||
int queueSize = options.valueOf(queueSizeOpt);
|
||||
ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);
|
||||
int threadId = 0;
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
consumerConnectorShutdownMethod07.invoke(consumerConnector07);
|
||||
} catch (Exception e) {
|
||||
log.error("Error while shutting down Kafka consumer", e);
|
||||
}
|
||||
for (MigrationThread migrationThread : migrationThreads) {
|
||||
migrationThread.shutdown();
|
||||
}
|
||||
for (ProducerThread producerThread : producerThreads) {
|
||||
producerThread.shutdown();
|
||||
}
|
||||
for (ProducerThread producerThread : producerThreads) {
|
||||
producerThread.awaitShutdown();
|
||||
}
|
||||
log.info("Kafka migration tool shutdown successfully");
|
||||
}
|
||||
});
|
||||
|
||||
// start consumer threads
|
||||
for (Object stream : (List) retKafkaStreams) {
|
||||
MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
|
||||
threadId++;
|
||||
thread.start();
|
||||
migrationThreads.add(thread);
|
||||
}
|
||||
|
||||
String clientId = kafkaProducerProperties08.getProperty("client.id");
|
||||
// start producer threads
|
||||
for (int i = 0; i < numProducers; i++) {
|
||||
kafkaProducerProperties08.put("client.id", clientId + "-" + i);
|
||||
ProducerConfig producerConfig08 = new ProducerConfig(kafkaProducerProperties08);
|
||||
Producer producer = new Producer(producerConfig08);
|
||||
ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
|
||||
producerThread.start();
|
||||
producerThreads.add(producerThread);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
|
||||
log.error("Kafka migration tool failed: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
|
||||
int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
|
||||
int blackListCount = options.has(blacklistOpt) ? 1 : 0;
|
||||
if(whiteListCount + blackListCount != 1) {
|
||||
System.err.println("Exactly one of whitelist or blacklist is required.");
|
||||
System.exit(1);
|
||||
private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
|
||||
for (OptionSpec arg : required) {
|
||||
if (!options.has(arg)) {
|
||||
System.err.println("Missing required argument \"" + arg + "\"");
|
||||
parser.printHelpOn(System.err);
|
||||
System.exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
|
||||
String zkClientJarFile = options.valueOf(zkClient01JarOpt);
|
||||
String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
|
||||
int numConsumers = options.valueOf(numStreamsOpt);
|
||||
String producerConfigFile_08 = options.valueOf(producerConfigOpt);
|
||||
int numProducers = options.valueOf(numProducersOpt);
|
||||
final List<MigrationThread> migrationThreads = new ArrayList<MigrationThread>(numConsumers);
|
||||
final List<ProducerThread> producerThreads = new ArrayList<ProducerThread>(numProducers);
|
||||
static class ProducerDataChannel<T> {
|
||||
private final int producerQueueSize;
|
||||
private final BlockingQueue<T> producerRequestQueue;
|
||||
|
||||
try {
|
||||
File kafkaJar_07 = new File(kafkaJarFile_07);
|
||||
File zkClientJar = new File(zkClientJarFile);
|
||||
ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[] {
|
||||
kafkaJar_07.toURI().toURL(),
|
||||
zkClientJar.toURI().toURL()
|
||||
});
|
||||
public ProducerDataChannel(int queueSize) {
|
||||
producerQueueSize = queueSize;
|
||||
producerRequestQueue = new ArrayBlockingQueue<T>(producerQueueSize);
|
||||
}
|
||||
|
||||
/** Construct the 07 consumer config **/
|
||||
ConsumerConfig_07 = c1.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
|
||||
KafkaStaticConsumer_07 = c1.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
|
||||
ConsumerConnector_07 = c1.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
|
||||
KafkaStream_07 = c1.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
|
||||
TopicFilter_07 = c1.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
|
||||
WhiteList_07 = c1.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
|
||||
BlackList_07 = c1.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
|
||||
KafkaMessageClass_07 = c1.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
|
||||
KafkaConsumerIteratorClass_07 = c1.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
|
||||
KafkaMessageAndMetatDataClass_07 = c1.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);
|
||||
public void sendRequest(T data) throws InterruptedException {
|
||||
producerRequestQueue.put(data);
|
||||
}
|
||||
|
||||
Constructor ConsumerConfigConstructor_07 = ConsumerConfig_07.getConstructor(Properties.class);
|
||||
Properties kafkaConsumerProperties_07 = new Properties();
|
||||
kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
|
||||
/** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
|
||||
if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
|
||||
log.warn("Shallow iterator should not be used in the migration tool");
|
||||
kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
|
||||
}
|
||||
Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);
|
||||
public T receiveRequest() throws InterruptedException {
|
||||
return producerRequestQueue.take();
|
||||
}
|
||||
}
|
||||
|
||||
/** Construct the 07 consumer connector **/
|
||||
Method ConsumerConnectorCreationMethod_07 = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07);
|
||||
final Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
|
||||
Method ConsumerConnectorCreateMessageStreamsMethod_07 = ConsumerConnector_07.getMethod(
|
||||
"createMessageStreamsByFilter",
|
||||
TopicFilter_07, int.class);
|
||||
final Method ConsumerConnectorShutdownMethod_07 = ConsumerConnector_07.getMethod("shutdown");
|
||||
Constructor WhiteListConstructor_07 = WhiteList_07.getConstructor(String.class);
|
||||
Constructor BlackListConstructor_07 = BlackList_07.getConstructor(String.class);
|
||||
Object filterSpec = null;
|
||||
if(options.has(whitelistOpt))
|
||||
filterSpec = WhiteListConstructor_07.newInstance(options.valueOf(whitelistOpt));
|
||||
else
|
||||
filterSpec = BlackListConstructor_07.newInstance(options.valueOf(blacklistOpt));
|
||||
private static class MigrationThread extends Thread {
|
||||
private final Object stream;
|
||||
private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
|
||||
private final int threadId;
|
||||
private final String threadName;
|
||||
private final org.apache.log4j.Logger logger;
|
||||
private CountDownLatch shutdownComplete = new CountDownLatch(1);
|
||||
private final AtomicBoolean isRunning = new AtomicBoolean(true);
|
||||
|
||||
Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numConsumers);
|
||||
MigrationThread(Object stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel, int threadId) {
|
||||
this.stream = stream;
|
||||
this.producerDataChannel = producerDataChannel;
|
||||
this.threadId = threadId;
|
||||
threadName = "MigrationThread-" + threadId;
|
||||
logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
|
||||
this.setName(threadName);
|
||||
}
|
||||
|
||||
Properties kafkaProducerProperties_08 = new Properties();
|
||||
kafkaProducerProperties_08.load(new FileInputStream(producerConfigFile_08));
|
||||
kafkaProducerProperties_08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
|
||||
// create a producer channel instead
|
||||
int queueSize = options.valueOf(queueSizeOpt);
|
||||
ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);
|
||||
int threadId = 0;
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07);
|
||||
} catch(Exception e) {
|
||||
log.error("Error while shutting down Kafka consumer", e);
|
||||
}
|
||||
for(MigrationThread migrationThread : migrationThreads) {
|
||||
migrationThread.shutdown();
|
||||
}
|
||||
for(ProducerThread producerThread : producerThreads) {
|
||||
producerThread.shutdown();
|
||||
}
|
||||
for(ProducerThread producerThread : producerThreads) {
|
||||
producerThread.awaitShutdown();
|
||||
}
|
||||
log.info("Kafka migration tool shutdown successfully");
|
||||
try {
|
||||
Method messageGetPayloadMethod07 = kafkaMessageClass07.getMethod("payload");
|
||||
Method kafkaGetMessageMethod07 = kafkaMessageAndMetaDataClass07.getMethod("message");
|
||||
Method kafkaGetTopicMethod07 = kafkaMessageAndMetaDataClass07.getMethod("topic");
|
||||
Method consumerIteratorMethod = kafkaStream07.getMethod("iterator");
|
||||
Method kafkaStreamHasNextMethod07 = kafkaConsumerIteratorClass07.getMethod("hasNext");
|
||||
Method kafkaStreamNextMethod07 = kafkaConsumerIteratorClass07.getMethod("next");
|
||||
Object iterator = consumerIteratorMethod.invoke(stream);
|
||||
|
||||
while (((Boolean) kafkaStreamHasNextMethod07.invoke(iterator)).booleanValue()) {
|
||||
Object messageAndMetaData07 = kafkaStreamNextMethod07.invoke(iterator);
|
||||
Object message07 = kafkaGetMessageMethod07.invoke(messageAndMetaData07);
|
||||
Object topic = kafkaGetTopicMethod07.invoke(messageAndMetaData07);
|
||||
Object payload07 = messageGetPayloadMethod07.invoke(message07);
|
||||
int size = ((ByteBuffer) payload07).remaining();
|
||||
byte[] bytes = new byte[size];
|
||||
((ByteBuffer) payload07).get(bytes);
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic " + topic);
|
||||
KeyedMessage<byte[], byte[]> producerData = new KeyedMessage((String) topic, null, bytes);
|
||||
producerDataChannel.sendRequest(producerData);
|
||||
}
|
||||
logger.info("Migration thread " + threadName + " finished running");
|
||||
} catch (InvocationTargetException t) {
|
||||
logger.fatal("Migration thread failure due to root cause ", t.getCause());
|
||||
} catch (Throwable t) {
|
||||
logger.fatal("Migration thread failure due to ", t);
|
||||
} finally {
|
||||
shutdownComplete.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// start consumer threads
|
||||
for(Object stream: (List)retKafkaStreams) {
|
||||
MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
|
||||
threadId ++;
|
||||
thread.start();
|
||||
migrationThreads.add(thread);
|
||||
}
|
||||
|
||||
String clientId = kafkaProducerProperties_08.getProperty("client.id");
|
||||
// start producer threads
|
||||
for (int i = 0; i < numProducers; i++) {
|
||||
kafkaProducerProperties_08.put("client.id", clientId + "-" + i);
|
||||
ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
|
||||
Producer producer = new Producer(producerConfig_08);
|
||||
ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
|
||||
producerThread.start();
|
||||
producerThreads.add(producerThread);
|
||||
}
|
||||
}
|
||||
catch (Throwable e){
|
||||
System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
|
||||
log.error("Kafka migration tool failed: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
|
||||
for(OptionSpec arg : required) {
|
||||
if(!options.has(arg)) {
|
||||
System.err.println("Missing required argument \"" + arg + "\"");
|
||||
parser.printHelpOn(System.err);
|
||||
System.exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class ProducerDataChannel<T> {
|
||||
private final int producerQueueSize;
|
||||
private final BlockingQueue<T> producerRequestQueue;
|
||||
|
||||
public ProducerDataChannel(int queueSize) {
|
||||
producerQueueSize = queueSize;
|
||||
producerRequestQueue = new ArrayBlockingQueue<T>(producerQueueSize);
|
||||
}
|
||||
|
||||
public void sendRequest(T data) throws InterruptedException {
|
||||
producerRequestQueue.put(data);
|
||||
}
|
||||
|
||||
public T receiveRequest() throws InterruptedException {
|
||||
return producerRequestQueue.take();
|
||||
}
|
||||
}
|
||||
|
||||
private static class MigrationThread extends Thread {
|
||||
private final Object stream;
|
||||
private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
|
||||
private final int threadId;
|
||||
private final String threadName;
|
||||
private final org.apache.log4j.Logger logger;
|
||||
private CountDownLatch shutdownComplete = new CountDownLatch(1);
|
||||
private final AtomicBoolean isRunning = new AtomicBoolean(true);
|
||||
|
||||
MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, int _threadId) {
|
||||
stream = _stream;
|
||||
producerDataChannel = _producerDataChannel;
|
||||
threadId = _threadId;
|
||||
threadName = "MigrationThread-" + threadId;
|
||||
logger = org.apache.log4j.Logger.getLogger(MigrationThread.class.getName());
|
||||
this.setName(threadName);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
Method MessageGetPayloadMethod_07 = KafkaMessageClass_07.getMethod("payload");
|
||||
Method KafkaGetMessageMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("message");
|
||||
Method KafkaGetTopicMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("topic");
|
||||
Method ConsumerIteratorMethod = KafkaStream_07.getMethod("iterator");
|
||||
Method KafkaStreamHasNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("hasNext");
|
||||
Method KafkaStreamNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("next");
|
||||
Object iterator = ConsumerIteratorMethod.invoke(stream);
|
||||
|
||||
while (((Boolean) KafkaStreamHasNextMethod_07.invoke(iterator)).booleanValue()) {
|
||||
Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator);
|
||||
Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07);
|
||||
Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07);
|
||||
Object payload_07 = MessageGetPayloadMethod_07.invoke(message_07);
|
||||
int size = ((ByteBuffer)payload_07).remaining();
|
||||
byte[] bytes = new byte[size];
|
||||
((ByteBuffer)payload_07).get(bytes);
|
||||
if(logger.isDebugEnabled())
|
||||
logger.debug("Migration thread " + threadId + " sending message of size " + bytes.length + " to topic "+ topic);
|
||||
KeyedMessage<byte[], byte[]> producerData = new KeyedMessage((String)topic, null, bytes);
|
||||
producerDataChannel.sendRequest(producerData);
|
||||
public void shutdown() {
|
||||
logger.info("Migration thread " + threadName + " shutting down");
|
||||
isRunning.set(false);
|
||||
interrupt();
|
||||
try {
|
||||
shutdownComplete.await();
|
||||
} catch (InterruptedException ie) {
|
||||
logger.warn("Interrupt during shutdown of MigrationThread", ie);
|
||||
}
|
||||
logger.info("Migration thread " + threadName + " shutdown complete");
|
||||
}
|
||||
logger.info("Migration thread " + threadName + " finished running");
|
||||
} catch (InvocationTargetException t){
|
||||
logger.fatal("Migration thread failure due to root cause ", t.getCause());
|
||||
} catch (Throwable t){
|
||||
logger.fatal("Migration thread failure due to ", t);
|
||||
} finally {
|
||||
shutdownComplete.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
logger.info("Migration thread " + threadName + " shutting down");
|
||||
isRunning.set(false);
|
||||
interrupt();
|
||||
try {
|
||||
shutdownComplete.await();
|
||||
} catch(InterruptedException ie) {
|
||||
logger.warn("Interrupt during shutdown of MigrationThread", ie);
|
||||
}
|
||||
logger.info("Migration thread " + threadName + " shutdown complete");
|
||||
}
|
||||
}
|
||||
static class ProducerThread extends Thread {
|
||||
private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
|
||||
private final Producer<byte[], byte[]> producer;
|
||||
private final int threadId;
|
||||
private String threadName;
|
||||
private org.apache.log4j.Logger logger;
|
||||
private CountDownLatch shutdownComplete = new CountDownLatch(1);
|
||||
private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
|
||||
|
||||
static class ProducerThread extends Thread {
|
||||
private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
|
||||
private final Producer<byte[], byte[]> producer;
|
||||
private final int threadId;
|
||||
private String threadName;
|
||||
private org.apache.log4j.Logger logger;
|
||||
private CountDownLatch shutdownComplete = new CountDownLatch(1);
|
||||
private KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage("shutdown", null, null);
|
||||
|
||||
public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel,
|
||||
Producer<byte[], byte[]> _producer,
|
||||
int _threadId) {
|
||||
producerDataChannel = _producerDataChannel;
|
||||
producer = _producer;
|
||||
threadId = _threadId;
|
||||
threadName = "ProducerThread-" + threadId;
|
||||
logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName());
|
||||
this.setName(threadName);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try{
|
||||
while(true) {
|
||||
KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
|
||||
if(!data.equals(shutdownMessage)) {
|
||||
producer.send(data);
|
||||
if(logger.isDebugEnabled()) logger.debug(String.format("Sending message %s", new String(data.message())));
|
||||
}
|
||||
else
|
||||
break;
|
||||
public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel,
|
||||
Producer<byte[], byte[]> producer,
|
||||
int threadId) {
|
||||
this.producerDataChannel = producerDataChannel;
|
||||
this.producer = producer;
|
||||
this.threadId = threadId;
|
||||
threadName = "ProducerThread-" + threadId;
|
||||
logger = org.apache.log4j.Logger.getLogger(ProducerThread.class.getName());
|
||||
this.setName(threadName);
|
||||
}
|
||||
logger.info("Producer thread " + threadName + " finished running");
|
||||
} catch (Throwable t){
|
||||
logger.fatal("Producer thread failure due to ", t);
|
||||
} finally {
|
||||
shutdownComplete.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
try {
|
||||
logger.info("Producer thread " + threadName + " shutting down");
|
||||
producerDataChannel.sendRequest(shutdownMessage);
|
||||
} catch(InterruptedException ie) {
|
||||
logger.warn("Interrupt during shutdown of ProducerThread", ie);
|
||||
}
|
||||
}
|
||||
public void run() {
|
||||
try {
|
||||
while (true) {
|
||||
KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
|
||||
if (!data.equals(shutdownMessage)) {
|
||||
producer.send(data);
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug(String.format("Sending message %s", new String(data.message())));
|
||||
} else
|
||||
break;
|
||||
}
|
||||
logger.info("Producer thread " + threadName + " finished running");
|
||||
} catch (Throwable t) {
|
||||
logger.fatal("Producer thread failure due to ", t);
|
||||
} finally {
|
||||
shutdownComplete.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public void awaitShutdown() {
|
||||
try {
|
||||
shutdownComplete.await();
|
||||
producer.close();
|
||||
logger.info("Producer thread " + threadName + " shutdown complete");
|
||||
} catch(InterruptedException ie) {
|
||||
logger.warn("Interrupt during shutdown of ProducerThread", ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
public void shutdown() {
|
||||
try {
|
||||
logger.info("Producer thread " + threadName + " shutting down");
|
||||
producerDataChannel.sendRequest(shutdownMessage);
|
||||
} catch (InterruptedException ie) {
|
||||
logger.warn("Interrupt during shutdown of ProducerThread", ie);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A parent-last class loader that will try the child class loader first and then the parent.
|
||||
* This takes a fair bit of doing because java really prefers parent-first.
|
||||
*/
|
||||
private static class ParentLastURLClassLoader extends ClassLoader {
|
||||
private ChildURLClassLoader childClassLoader;
|
||||
|
||||
/**
|
||||
* This class allows me to call findClass on a class loader
|
||||
*/
|
||||
private static class FindClassClassLoader extends ClassLoader {
|
||||
public FindClassClassLoader(ClassLoader parent) {
|
||||
super(parent);
|
||||
}
|
||||
@Override
|
||||
public Class<?> findClass(String name) throws ClassNotFoundException {
|
||||
return super.findClass(name);
|
||||
}
|
||||
public void awaitShutdown() {
|
||||
try {
|
||||
shutdownComplete.await();
|
||||
producer.close();
|
||||
logger.info("Producer thread " + threadName + " shutdown complete");
|
||||
} catch (InterruptedException ie) {
|
||||
logger.warn("Interrupt during shutdown of ProducerThread", ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class delegates (child then parent) for the findClass method for a URLClassLoader.
|
||||
* We need this because findClass is protected in URLClassLoader
|
||||
* A parent-last class loader that will try the child class loader first and then the parent.
|
||||
* This takes a fair bit of doing because java really prefers parent-first.
|
||||
*/
|
||||
private static class ChildURLClassLoader extends URLClassLoader {
|
||||
private FindClassClassLoader realParent;
|
||||
public ChildURLClassLoader( URL[] urls, FindClassClassLoader realParent) {
|
||||
super(urls, null);
|
||||
this.realParent = realParent;
|
||||
}
|
||||
private static class ParentLastURLClassLoader extends ClassLoader {
|
||||
private ChildURLClassLoader childClassLoader;
|
||||
|
||||
@Override
|
||||
public Class<?> findClass(String name) throws ClassNotFoundException {
|
||||
try{
|
||||
// first try to use the URLClassLoader findClass
|
||||
return super.findClass(name);
|
||||
/**
|
||||
* This class allows me to call findClass on a class loader
|
||||
*/
|
||||
private static class FindClassClassLoader extends ClassLoader {
|
||||
public FindClassClassLoader(ClassLoader parent) {
|
||||
super(parent);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<?> findClass(String name) throws ClassNotFoundException {
|
||||
return super.findClass(name);
|
||||
}
|
||||
}
|
||||
catch( ClassNotFoundException e ) {
|
||||
// if that fails, we ask our real parent class loader to load the class (we give up)
|
||||
return realParent.loadClass(name);
|
||||
|
||||
/**
|
||||
* This class delegates (child then parent) for the findClass method for a URLClassLoader.
|
||||
* We need this because findClass is protected in URLClassLoader
|
||||
*/
|
||||
private static class ChildURLClassLoader extends URLClassLoader {
|
||||
private FindClassClassLoader realParent;
|
||||
|
||||
public ChildURLClassLoader(URL[] urls, FindClassClassLoader realParent) {
|
||||
super(urls, null);
|
||||
this.realParent = realParent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<?> findClass(String name) throws ClassNotFoundException {
|
||||
try {
|
||||
// first try to use the URLClassLoader findClass
|
||||
return super.findClass(name);
|
||||
} catch (ClassNotFoundException e) {
|
||||
// if that fails, we ask our real parent class loader to load the class (we give up)
|
||||
return realParent.loadClass(name);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public ParentLastURLClassLoader(URL[] urls) {
|
||||
super(Thread.currentThread().getContextClassLoader());
|
||||
childClassLoader = new ChildURLClassLoader(urls, new FindClassClassLoader(this.getParent()));
|
||||
}
|
||||
public ParentLastURLClassLoader(URL[] urls) {
|
||||
super(Thread.currentThread().getContextClassLoader());
|
||||
childClassLoader = new ChildURLClassLoader(urls, new FindClassClassLoader(this.getParent()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
|
||||
try {
|
||||
// first we try to find a class inside the child class loader
|
||||
return childClassLoader.findClass(name);
|
||||
}
|
||||
catch( ClassNotFoundException e ) {
|
||||
// didn't find it, try the parent
|
||||
return super.loadClass(name, resolve);
|
||||
}
|
||||
@Override
|
||||
protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
|
||||
try {
|
||||
// first we try to find a class inside the child class loader
|
||||
return childClassLoader.findClass(name);
|
||||
} catch (ClassNotFoundException e) {
|
||||
// didn't find it, try the parent
|
||||
return super.loadClass(name, resolve);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
|
@ -16,52 +16,50 @@
|
|||
*/
|
||||
package kafka.examples;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
|
||||
import kafka.utils.ShutdownableThread;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
|
||||
public class Consumer extends ShutdownableThread
|
||||
{
|
||||
private final KafkaConsumer<Integer, String> consumer;
|
||||
private final String topic;
|
||||
import java.util.Collections;
|
||||
import java.util.Properties;
|
||||
|
||||
public Consumer(String topic)
|
||||
{
|
||||
super("KafkaConsumerExample", false);
|
||||
Properties props = new Properties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
|
||||
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
|
||||
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
public class Consumer extends ShutdownableThread {
|
||||
private final KafkaConsumer<Integer, String> consumer;
|
||||
private final String topic;
|
||||
|
||||
consumer = new KafkaConsumer<>(props);
|
||||
this.topic = topic;
|
||||
}
|
||||
public Consumer(String topic) {
|
||||
super("KafkaConsumerExample", false);
|
||||
Properties props = new Properties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
|
||||
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
|
||||
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
|
||||
@Override
|
||||
public void doWork() {
|
||||
consumer.subscribe(Collections.singletonList(this.topic));
|
||||
ConsumerRecords<Integer, String> records = consumer.poll(1000);
|
||||
for (ConsumerRecord<Integer, String> record : records) {
|
||||
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
|
||||
consumer = new KafkaConsumer<>(props);
|
||||
this.topic = topic;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public void doWork() {
|
||||
consumer.subscribe(Collections.singletonList(this.topic));
|
||||
ConsumerRecords<Integer, String> records = consumer.poll(1000);
|
||||
for (ConsumerRecord<Integer, String> record : records) {
|
||||
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInterruptible() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public String name() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInterruptible() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
|
@ -16,16 +16,14 @@
|
|||
*/
|
||||
package kafka.examples;
|
||||
|
||||
public class KafkaConsumerProducerDemo implements KafkaProperties
|
||||
{
|
||||
public static void main(String[] args)
|
||||
{
|
||||
final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;
|
||||
Producer producerThread = new Producer(KafkaProperties.topic, isAsync);
|
||||
producerThread.start();
|
||||
public class KafkaConsumerProducerDemo implements KafkaProperties {
|
||||
public static void main(String[] args) {
|
||||
final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;
|
||||
Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync);
|
||||
producerThread.start();
|
||||
|
||||
Consumer consumerThread = new Consumer(KafkaProperties.topic);
|
||||
consumerThread.start();
|
||||
|
||||
}
|
||||
Consumer consumerThread = new Consumer(KafkaProperties.TOPIC);
|
||||
consumerThread.start();
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,9 +5,9 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -16,17 +16,16 @@
|
|||
*/
|
||||
package kafka.examples;
|
||||
|
||||
public interface KafkaProperties
|
||||
{
|
||||
final static String zkConnect = "127.0.0.1:2181";
|
||||
final static String groupId = "group1";
|
||||
final static String topic = "topic1";
|
||||
final static String kafkaServerURL = "localhost";
|
||||
final static int kafkaServerPort = 9092;
|
||||
final static int kafkaProducerBufferSize = 64*1024;
|
||||
final static int connectionTimeOut = 100000;
|
||||
final static int reconnectInterval = 10000;
|
||||
final static String topic2 = "topic2";
|
||||
final static String topic3 = "topic3";
|
||||
final static String clientId = "SimpleConsumerDemoClient";
|
||||
public interface KafkaProperties {
|
||||
String ZK_CONNECT = "127.0.0.1:2181";
|
||||
String GROUP_ID = "group1";
|
||||
String TOPIC = "topic1";
|
||||
String KAFKA_SERVER_URL = "localhost";
|
||||
int KAFKA_SERVER_PORT = 9092;
|
||||
int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
|
||||
int CONNECTION_TIMEOUT = 100000;
|
||||
int RECONNECT_INTERVAL = 10000;
|
||||
String TOPIC2 = "topic2";
|
||||
String TOPIC3 = "topic3";
|
||||
String CLIENT_ID = "SimpleConsumerDemoClient";
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
|
@ -16,90 +16,86 @@
|
|||
*/
|
||||
package kafka.examples;
|
||||
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import org.apache.kafka.clients.producer.Callback;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
|
||||
public class Producer extends Thread
|
||||
{
|
||||
private final KafkaProducer<Integer, String> producer;
|
||||
private final String topic;
|
||||
private final Boolean isAsync;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
public Producer(String topic, Boolean isAsync)
|
||||
{
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", "localhost:9092");
|
||||
props.put("client.id", "DemoProducer");
|
||||
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
|
||||
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
producer = new KafkaProducer<Integer, String>(props);
|
||||
this.topic = topic;
|
||||
this.isAsync = isAsync;
|
||||
}
|
||||
public class Producer extends Thread {
|
||||
private final KafkaProducer<Integer, String> producer;
|
||||
private final String topic;
|
||||
private final Boolean isAsync;
|
||||
|
||||
public void run() {
|
||||
int messageNo = 1;
|
||||
while(true)
|
||||
{
|
||||
String messageStr = "Message_" + messageNo;
|
||||
long startTime = System.currentTimeMillis();
|
||||
if (isAsync) { // Send asynchronously
|
||||
producer.send(new ProducerRecord<Integer, String>(topic,
|
||||
messageNo,
|
||||
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
|
||||
} else { // Send synchronously
|
||||
try {
|
||||
producer.send(new ProducerRecord<Integer, String>(topic,
|
||||
messageNo,
|
||||
messageStr)).get();
|
||||
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
++messageNo;
|
||||
public Producer(String topic, Boolean isAsync) {
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", "localhost:9092");
|
||||
props.put("client.id", "DemoProducer");
|
||||
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
|
||||
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
|
||||
producer = new KafkaProducer<Integer, String>(props);
|
||||
this.topic = topic;
|
||||
this.isAsync = isAsync;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
int messageNo = 1;
|
||||
while (true) {
|
||||
String messageStr = "Message_" + messageNo;
|
||||
long startTime = System.currentTimeMillis();
|
||||
if (isAsync) { // Send asynchronously
|
||||
producer.send(new ProducerRecord<Integer, String>(topic,
|
||||
messageNo,
|
||||
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
|
||||
} else { // Send synchronously
|
||||
try {
|
||||
producer.send(new ProducerRecord<Integer, String>(topic,
|
||||
messageNo,
|
||||
messageStr)).get();
|
||||
System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
++messageNo;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class DemoCallBack implements Callback {
|
||||
|
||||
private long startTime;
|
||||
private int key;
|
||||
private String message;
|
||||
private long startTime;
|
||||
private int key;
|
||||
private String message;
|
||||
|
||||
public DemoCallBack(long startTime, int key, String message) {
|
||||
this.startTime = startTime;
|
||||
this.key = key;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
/**
|
||||
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
|
||||
* be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
|
||||
* non-null.
|
||||
*
|
||||
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
|
||||
* occurred.
|
||||
* @param exception The exception thrown during processing of this record. Null if no error occurred.
|
||||
*/
|
||||
public void onCompletion(RecordMetadata metadata, Exception exception) {
|
||||
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||
if (metadata != null) {
|
||||
System.out.println(
|
||||
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
|
||||
"), " +
|
||||
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
|
||||
} else {
|
||||
exception.printStackTrace();
|
||||
public DemoCallBack(long startTime, int key, String message) {
|
||||
this.startTime = startTime;
|
||||
this.key = key;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
/**
|
||||
* A callback method the user can implement to provide asynchronous handling of request completion. This method will
|
||||
* be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
|
||||
* non-null.
|
||||
*
|
||||
* @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
|
||||
* occurred.
|
||||
* @param exception The exception thrown during processing of this record. Null if no error occurred.
|
||||
*/
|
||||
public void onCompletion(RecordMetadata metadata, Exception exception) {
|
||||
long elapsedTime = System.currentTimeMillis() - startTime;
|
||||
if (metadata != null) {
|
||||
System.out.println(
|
||||
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
|
||||
"), " +
|
||||
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
|
||||
} else {
|
||||
exception.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
|
@ -19,74 +19,74 @@ package kafka.examples;
|
|||
import kafka.api.FetchRequest;
|
||||
import kafka.api.FetchRequestBuilder;
|
||||
import kafka.javaapi.FetchResponse;
|
||||
import kafka.javaapi.consumer.SimpleConsumer;
|
||||
import kafka.javaapi.message.ByteBufferMessageSet;
|
||||
import kafka.message.MessageAndOffset;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import kafka.javaapi.consumer.SimpleConsumer;
|
||||
import kafka.javaapi.message.ByteBufferMessageSet;
|
||||
import kafka.message.MessageAndOffset;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class SimpleConsumerDemo {
|
||||
|
||||
private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
|
||||
for(MessageAndOffset messageAndOffset: messageSet) {
|
||||
ByteBuffer payload = messageAndOffset.message().payload();
|
||||
byte[] bytes = new byte[payload.limit()];
|
||||
payload.get(bytes);
|
||||
System.out.println(new String(bytes, "UTF-8"));
|
||||
}
|
||||
}
|
||||
|
||||
private static void generateData() {
|
||||
Producer producer2 = new Producer(KafkaProperties.topic2, false);
|
||||
producer2.start();
|
||||
Producer producer3 = new Producer(KafkaProperties.topic3, false);
|
||||
producer3.start();
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
|
||||
for (MessageAndOffset messageAndOffset : messageSet) {
|
||||
ByteBuffer payload = messageAndOffset.message().payload();
|
||||
byte[] bytes = new byte[payload.limit()];
|
||||
payload.get(bytes);
|
||||
System.out.println(new String(bytes, "UTF-8"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
generateData();
|
||||
|
||||
SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
|
||||
KafkaProperties.kafkaServerPort,
|
||||
KafkaProperties.connectionTimeOut,
|
||||
KafkaProperties.kafkaProducerBufferSize,
|
||||
KafkaProperties.clientId);
|
||||
|
||||
System.out.println("Testing single fetch");
|
||||
FetchRequest req = new FetchRequestBuilder()
|
||||
.clientId(KafkaProperties.clientId)
|
||||
.addFetch(KafkaProperties.topic2, 0, 0L, 100)
|
||||
private static void generateData() {
|
||||
Producer producer2 = new Producer(KafkaProperties.TOPIC2, false);
|
||||
producer2.start();
|
||||
Producer producer3 = new Producer(KafkaProperties.TOPIC3, false);
|
||||
producer3.start();
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
generateData();
|
||||
|
||||
SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.KAFKA_SERVER_URL,
|
||||
KafkaProperties.KAFKA_SERVER_PORT,
|
||||
KafkaProperties.CONNECTION_TIMEOUT,
|
||||
KafkaProperties.KAFKA_PRODUCER_BUFFER_SIZE,
|
||||
KafkaProperties.CLIENT_ID);
|
||||
|
||||
System.out.println("Testing single fetch");
|
||||
FetchRequest req = new FetchRequestBuilder()
|
||||
.clientId(KafkaProperties.CLIENT_ID)
|
||||
.addFetch(KafkaProperties.TOPIC2, 0, 0L, 100)
|
||||
.build();
|
||||
FetchResponse fetchResponse = simpleConsumer.fetch(req);
|
||||
printMessages(fetchResponse.messageSet(KafkaProperties.topic2, 0));
|
||||
FetchResponse fetchResponse = simpleConsumer.fetch(req);
|
||||
printMessages(fetchResponse.messageSet(KafkaProperties.TOPIC2, 0));
|
||||
|
||||
System.out.println("Testing single multi-fetch");
|
||||
Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();
|
||||
topicMap.put(KafkaProperties.topic2, Collections.singletonList(0));
|
||||
topicMap.put(KafkaProperties.topic3, Collections.singletonList(0));
|
||||
req = new FetchRequestBuilder()
|
||||
.clientId(KafkaProperties.clientId)
|
||||
.addFetch(KafkaProperties.topic2, 0, 0L, 100)
|
||||
.addFetch(KafkaProperties.topic3, 0, 0L, 100)
|
||||
System.out.println("Testing single multi-fetch");
|
||||
Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();
|
||||
topicMap.put(KafkaProperties.TOPIC2, Collections.singletonList(0));
|
||||
topicMap.put(KafkaProperties.TOPIC3, Collections.singletonList(0));
|
||||
req = new FetchRequestBuilder()
|
||||
.clientId(KafkaProperties.CLIENT_ID)
|
||||
.addFetch(KafkaProperties.TOPIC2, 0, 0L, 100)
|
||||
.addFetch(KafkaProperties.TOPIC3, 0, 0L, 100)
|
||||
.build();
|
||||
fetchResponse = simpleConsumer.fetch(req);
|
||||
int fetchReq = 0;
|
||||
for ( Map.Entry<String, List<Integer>> entry : topicMap.entrySet() ) {
|
||||
String topic = entry.getKey();
|
||||
for ( Integer offset : entry.getValue()) {
|
||||
System.out.println("Response from fetch request no: " + ++fetchReq);
|
||||
printMessages(fetchResponse.messageSet(topic, offset));
|
||||
}
|
||||
fetchResponse = simpleConsumer.fetch(req);
|
||||
int fetchReq = 0;
|
||||
for (Map.Entry<String, List<Integer>> entry : topicMap.entrySet()) {
|
||||
String topic = entry.getKey();
|
||||
for (Integer offset : entry.getValue()) {
|
||||
System.out.println("Response from fetch request no: " + ++fetchReq);
|
||||
printMessages(fetchResponse.messageSet(topic, offset));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue