Created separate tools jar so that the clients package does not pull in dependencies on the Jackson JSON tools or argparse4j.

This commit is contained in:
Geoff Anderson 2015-07-20 16:33:14 -07:00
parent a9e6a14a35
commit 47b7b64e0f
7 changed files with 117 additions and 40 deletions

View File

@ -65,6 +65,16 @@ do
CLASSPATH=$CLASSPATH:$file
done
for file in $base_dir/tools/build/libs/kafka-tools*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
for file in $base_dir/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
# classpath addition for release
for file in $base_dir/libs/*.jar;
do

View File

@ -203,20 +203,20 @@ for ( sv in ['2_9_1', '2_9_2', '2_10_5', '2_11_6'] ) {
}
}
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar']) {
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_9_1', 'jar_core_2_9_2', 'jar_core_2_10_5', 'jar_core_2_11_6', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
}
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar']) { }
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_9_1', 'srcJar_2_9_2', 'srcJar_2_10_5', 'srcJar_2_11_6', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar']) { }
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_9_1', 'docsJar_2_9_2', 'docsJar_2_10_5', 'docsJar_2_11_6', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }
tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test', 'log4j-appender:test']) {
tasks.create(name: "testAll", dependsOn: ['test_core_2_9_1', 'test_core_2_9_2', 'test_core_2_10_5', 'test_core_2_11_6', 'clients:test', 'log4j-appender:test', 'tools:test']) {
}
tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_9_1', 'releaseTarGz_2_9_2', 'releaseTarGz_2_10_5', 'releaseTarGz_2_11_6']) {
}
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives']) {
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_9_1', 'uploadCoreArchives_2_9_2', 'uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_6', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
}
project(':core') {
@ -378,8 +378,6 @@ project(':clients') {
compile "org.slf4j:slf4j-api:1.7.6"
compile 'org.xerial.snappy:snappy-java:1.1.1.7'
compile 'net.jpountz.lz4:lz4:1.2.0'
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
compile 'com.googlecode.json-simple:json-simple:1.1.1'
testCompile 'junit:junit:4.6'
testRuntime "$slf4jlog4j"
@ -419,6 +417,56 @@ project(':clients') {
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
project(':tools') {
apply plugin: 'checkstyle'
archivesBaseName = "kafka-tools"
dependencies {
compile project(':clients')
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
compile "$slf4jlog4j"
testCompile 'junit:junit:4.6'
testCompile project(path: ':clients', configuration: 'archives')
}
task testJar(type: Jar) {
classifier = 'test'
from sourceSets.test.output
}
test {
testLogging {
events "passed", "skipped", "failed"
exceptionFormat = 'full'
}
}
javadoc {
include "**/org/apache/kafka/tools/*"
}
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
}
from (configurations.runtime) {
exclude('kafka-clients*')
}
into "$buildDir/dependant-libs-${scalaVersion}"
}
jar {
dependsOn 'copyDependantLibs'
}
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
}
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
project(':log4j-appender') {
apply plugin: 'checkstyle'
archivesBaseName = "kafka-log4j-appender"

View File

@ -92,7 +92,7 @@
<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.json.simple" />
<allow pkg="com.fasterxml.jackson.core" />
<allow pkg="net.sourceforge.argparse4j" />
</subpackage>
</subpackage>

View File

@ -14,4 +14,5 @@
// limitations under the License.
apply from: file('scala.gradle')
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j-appender'
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender'

View File

@ -24,9 +24,12 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.json.simple.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static net.sourceforge.argparse4j.impl.Arguments.store;
@ -191,33 +194,46 @@ public class VerifiableProducer {
String errorString(Exception e, String key, String value, Long nowMs) {
assert e != null : "Expected non-null exception.";
JSONObject obj = new JSONObject();
obj.put("class", this.getClass().toString());
obj.put("name", "producer_send_error");
Map<String, Object> errorData = new HashMap<>();
errorData.put("class", this.getClass().toString());
errorData.put("name", "producer_send_error");
obj.put("time_ms", nowMs);
obj.put("exception", e.getClass().toString());
obj.put("message", e.getMessage());
obj.put("topic", this.topic);
obj.put("key", key);
obj.put("value", value);
return obj.toJSONString();
errorData.put("time_ms", nowMs);
errorData.put("exception", e.getClass().toString());
errorData.put("message", e.getMessage());
errorData.put("topic", this.topic);
errorData.put("key", key);
errorData.put("value", value);
return toJsonString(errorData);
}
String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
assert recordMetadata != null : "Expected non-null recordMetadata object.";
JSONObject obj = new JSONObject();
obj.put("class", this.getClass().toString());
obj.put("name", "producer_send_success");
Map<String, Object> successData = new HashMap<>();
successData.put("class", this.getClass().toString());
successData.put("name", "producer_send_success");
obj.put("time_ms", nowMs);
obj.put("topic", this.topic);
obj.put("partition", recordMetadata.partition());
obj.put("offset", recordMetadata.offset());
obj.put("key", key);
obj.put("value", value);
return obj.toJSONString();
successData.put("time_ms", nowMs);
successData.put("topic", this.topic);
successData.put("partition", recordMetadata.partition());
successData.put("offset", recordMetadata.offset());
successData.put("key", key);
successData.put("value", value);
return toJsonString(successData);
}
private String toJsonString(Map<String, Object> data) {
String json;
try {
ObjectMapper mapper = new ObjectMapper();
json = mapper.writeValueAsString(data);
} catch(JsonProcessingException e) {
json = "Bad data can't be written as json: " + e.getMessage();
}
return json;
}
/** Callback which prints errors to stdout when the producer fails to send. */
@ -261,14 +277,16 @@ public class VerifiableProducer {
// Print a summary
long stopMs = System.currentTimeMillis();
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
JSONObject obj = new JSONObject();
obj.put("class", producer.getClass().toString());
obj.put("name", "tool_data");
obj.put("sent", producer.numSent);
obj.put("acked", producer.numAcked);
obj.put("target_throughput", producer.throughput);
obj.put("avg_throughput", avgThroughput);
System.out.println(obj.toJSONString());
Map<String, Object> data = new HashMap<>();
data.put("class", producer.getClass().toString());
data.put("name", "tool_data");
data.put("sent", producer.numSent);
data.put("acked", producer.numAcked);
data.put("target_throughput", producer.throughput);
data.put("avg_throughput", avgThroughput);
System.out.println(producer.toJsonString(data));
}
});