mirror of https://github.com/apache/kafka.git
KAFKA-2516: Rename o.a.k.client.tools to o.a.k.tools
Author: Grant Henke <granthenke@gmail.com> Reviewers: Gwen Shapira, Ewen Cheslack-Postava Closes #310 from granthenke/tools-packaging
This commit is contained in:
parent
fc4ef47910
commit
2e4aed7070
|
@ -17,4 +17,4 @@
|
|||
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
|
||||
export KAFKA_HEAP_OPTS="-Xmx512M"
|
||||
fi
|
||||
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableProducer $@
|
||||
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer $@
|
||||
|
|
|
@ -105,14 +105,14 @@
|
|||
<subpackage name="producer">
|
||||
<allow pkg="org.apache.kafka.clients.producer" />
|
||||
</subpackage>
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="tools">
|
||||
<allow pkg="org.apache.kafka.clients.producer" />
|
||||
<allow pkg="org.apache.kafka.clients.consumer" />
|
||||
<allow pkg="com.fasterxml.jackson" />
|
||||
<allow pkg="net.sourceforge.argparse4j" />
|
||||
<allow pkg="org.apache.log4j" />
|
||||
</subpackage>
|
||||
<subpackage name="tools">
|
||||
<allow pkg="org.apache.kafka.clients.producer" />
|
||||
<allow pkg="org.apache.kafka.clients.consumer" />
|
||||
<allow pkg="com.fasterxml.jackson" />
|
||||
<allow pkg="net.sourceforge.argparse4j" />
|
||||
<allow pkg="org.apache.log4j" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="streams">
|
||||
|
|
|
@ -38,7 +38,7 @@ class KafkaLog4jAppender(BackgroundThreadService):
|
|||
|
||||
@property
|
||||
def start_cmd(self):
|
||||
cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableLog4jAppender" \
|
||||
cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender" \
|
||||
" --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
|
||||
if self.max_messages > 0:
|
||||
cmd += " --max-messages %s" % str(self.max_messages)
|
||||
|
|
|
@ -46,7 +46,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
|
|||
def _worker(self, idx, node):
|
||||
args = self.args.copy()
|
||||
args.update({'bootstrap_servers': self.kafka.bootstrap_servers(), 'jmx_port': self.jmx_port, 'client_id': self.client_id})
|
||||
cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance " \
|
||||
cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
|
||||
"%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
|
||||
|
||||
self.security_config.setup_node(node)
|
||||
|
|
|
@ -3,14 +3,14 @@
|
|||
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
|
||||
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
|
||||
* License. You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
|
||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.tools;
|
||||
package org.apache.kafka.tools;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Properties;
|
|
@ -15,29 +15,29 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.tools;
|
||||
package org.apache.kafka.tools;
|
||||
|
||||
|
||||
/**
|
||||
* This class helps producers throttle throughput.
|
||||
*
|
||||
*
|
||||
* If targetThroughput >= 0, the resulting average throughput will be approximately
|
||||
* min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
|
||||
* no throttling will occur.
|
||||
*
|
||||
* no throttling will occur.
|
||||
*
|
||||
* To use, do this between successive send attempts:
|
||||
* <pre>
|
||||
* {@code
|
||||
* {@code
|
||||
* if (throttler.shouldThrottle(...)) {
|
||||
* throttler.throttle();
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
*
|
||||
* Note that this can be used to throttle message throughput or data throughput.
|
||||
*/
|
||||
public class ThroughputThrottler {
|
||||
|
||||
|
||||
private static final long NS_PER_MS = 1000000L;
|
||||
private static final long NS_PER_SEC = 1000 * NS_PER_MS;
|
||||
private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
|
||||
|
@ -55,7 +55,7 @@ public class ThroughputThrottler {
|
|||
this.startMs = startMs;
|
||||
this.targetThroughput = targetThroughput;
|
||||
this.sleepTimeNs = targetThroughput > 0 ?
|
||||
NS_PER_SEC / targetThroughput :
|
||||
NS_PER_SEC / targetThroughput :
|
||||
Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
|
@ -77,7 +77,7 @@ public class ThroughputThrottler {
|
|||
|
||||
/**
|
||||
* Occasionally blocks for small amounts of time to achieve targetThroughput.
|
||||
*
|
||||
*
|
||||
* Note that if targetThroughput is 0, this will block extremely aggressively.
|
||||
*/
|
||||
public void throttle() {
|
||||
|
@ -89,7 +89,7 @@ public class ThroughputThrottler {
|
|||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// throttle throughput by sleeping, on average,
|
||||
// (1 / this.throughput) seconds between "things sent"
|
||||
sleepDeficitNs += sleepTimeNs;
|
||||
|
@ -114,5 +114,4 @@ public class ThroughputThrottler {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -14,7 +14,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.tools;
|
||||
package org.apache.kafka.tools;
|
||||
|
||||
|
||||
import net.sourceforge.argparse4j.ArgumentParsers;
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.tools;
|
||||
package org.apache.kafka.tools;
|
||||
|
||||
import org.apache.kafka.clients.producer.Callback;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
|
@ -46,29 +46,29 @@ import net.sourceforge.argparse4j.inf.Namespace;
|
|||
* with end-to-end correctness tests by making externally visible which messages have been
|
||||
* acked and which have not.
|
||||
*
|
||||
* When used as a command-line tool, it produces increasing integers. It will produce a
|
||||
* When used as a command-line tool, it produces increasing integers. It will produce a
|
||||
* fixed number of messages unless the default max-messages -1 is used, in which case
|
||||
* it produces indefinitely.
|
||||
*
|
||||
*
|
||||
* If logging is left enabled, log output on stdout can be easily ignored by checking
|
||||
* whether a given line is valid JSON.
|
||||
*/
|
||||
public class VerifiableProducer {
|
||||
|
||||
|
||||
String topic;
|
||||
private Producer<String, String> producer;
|
||||
// If maxMessages < 0, produce until the process is killed externally
|
||||
private long maxMessages = -1;
|
||||
|
||||
|
||||
// Number of messages for which acks were received
|
||||
private long numAcked = 0;
|
||||
|
||||
|
||||
// Number of send attempts
|
||||
private long numSent = 0;
|
||||
|
||||
|
||||
// Throttle message throughput if this is set >= 0
|
||||
private long throughput;
|
||||
|
||||
|
||||
// Hook to trigger producing thread to stop sending messages
|
||||
private boolean stopProducing = false;
|
||||
|
||||
|
@ -102,7 +102,7 @@ public class VerifiableProducer {
|
|||
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
|
||||
.dest("brokerList")
|
||||
.help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
|
||||
|
||||
|
||||
parser.addArgument("--max-messages")
|
||||
.action(store())
|
||||
.required(false)
|
||||
|
@ -138,12 +138,12 @@ public class VerifiableProducer {
|
|||
|
||||
return parser;
|
||||
}
|
||||
|
||||
|
||||
/** Construct a VerifiableProducer object from command-line arguments. */
|
||||
public static VerifiableProducer createFromArgs(String[] args) {
|
||||
ArgumentParser parser = argParser();
|
||||
VerifiableProducer producer = null;
|
||||
|
||||
|
||||
try {
|
||||
Namespace res;
|
||||
res = parser.parseArgs(args);
|
||||
|
@ -180,10 +180,10 @@ public class VerifiableProducer {
|
|||
System.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return producer;
|
||||
}
|
||||
|
||||
|
||||
/** Produce a message with given key and value. */
|
||||
public void send(String key, String value) {
|
||||
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
|
||||
|
@ -197,12 +197,12 @@ public class VerifiableProducer {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/** Close the producer to flush any remaining messages. */
|
||||
public void close() {
|
||||
producer.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return JSON string encapsulating basic information about the exception, as well
|
||||
* as the key and value which triggered the exception.
|
||||
|
@ -220,10 +220,10 @@ public class VerifiableProducer {
|
|||
errorData.put("topic", this.topic);
|
||||
errorData.put("key", key);
|
||||
errorData.put("value", value);
|
||||
|
||||
|
||||
return toJsonString(errorData);
|
||||
}
|
||||
|
||||
|
||||
String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
|
||||
assert recordMetadata != null : "Expected non-null recordMetadata object.";
|
||||
|
||||
|
@ -237,10 +237,10 @@ public class VerifiableProducer {
|
|||
successData.put("offset", recordMetadata.offset());
|
||||
successData.put("key", key);
|
||||
successData.put("value", value);
|
||||
|
||||
|
||||
return toJsonString(successData);
|
||||
}
|
||||
|
||||
|
||||
private String toJsonString(Map<String, Object> data) {
|
||||
String json;
|
||||
try {
|
||||
|
@ -251,18 +251,18 @@ public class VerifiableProducer {
|
|||
}
|
||||
return json;
|
||||
}
|
||||
|
||||
|
||||
/** Callback which prints errors to stdout when the producer fails to send. */
|
||||
private class PrintInfoCallback implements Callback {
|
||||
|
||||
|
||||
private String key;
|
||||
private String value;
|
||||
|
||||
|
||||
PrintInfoCallback(String key, String value) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
|
||||
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
|
||||
synchronized (System.out) {
|
||||
if (e == null) {
|
||||
|
@ -274,9 +274,9 @@ public class VerifiableProducer {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
|
||||
|
||||
final VerifiableProducer producer = createFromArgs(args);
|
||||
final long startMs = System.currentTimeMillis();
|
||||
boolean infinite = producer.maxMessages < 0;
|
||||
|
@ -286,14 +286,14 @@ public class VerifiableProducer {
|
|||
public void run() {
|
||||
// Trigger main thread to stop producing messages
|
||||
producer.stopProducing = true;
|
||||
|
||||
|
||||
// Flush any remaining messages
|
||||
producer.close();
|
||||
|
||||
// Print a summary
|
||||
long stopMs = System.currentTimeMillis();
|
||||
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
|
||||
|
||||
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("class", producer.getClass().toString());
|
||||
data.put("name", "tool_data");
|
||||
|
@ -301,7 +301,7 @@ public class VerifiableProducer {
|
|||
data.put("acked", producer.numAcked);
|
||||
data.put("target_throughput", producer.throughput);
|
||||
data.put("avg_throughput", avgThroughput);
|
||||
|
||||
|
||||
System.out.println(producer.toJsonString(data));
|
||||
}
|
||||
});
|
||||
|
@ -314,11 +314,11 @@ public class VerifiableProducer {
|
|||
}
|
||||
long sendStartMs = System.currentTimeMillis();
|
||||
producer.send(null, String.format("%d", i));
|
||||
|
||||
|
||||
if (throttler.shouldThrottle(i, sendStartMs)) {
|
||||
throttler.throttle();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in New Issue