Updated comments to reflect that throttler is not message-specific

This commit is contained in:
Geoff Anderson 2015-06-03 18:22:34 -07:00
parent 6842ed1ffa
commit d586fb0eb6
1 changed files with 18 additions and 6 deletions

View File

@ -19,7 +19,7 @@ package org.apache.kafka.clients.tools;
/**
* This class helps producers throttle their maximum message throughput.
* This class helps producers throttle throughput.
*
* The resulting average throughput will be approximately
* min(targetThroughput, maximumPossibleThroughput)
@ -31,7 +31,9 @@ package org.apache.kafka.clients.tools;
* throttler.throttle();
* }
* }
* </pre>
* </pre>
*
* Note that this can be used to throttle message throughput or data throughput.
*/
public class ThroughputThrottler {
@ -44,25 +46,35 @@ public class ThroughputThrottler {
long targetThroughput = -1;
long startMs;
/**
* @param targetThroughput Can be messages/sec or bytes/sec
* @param startMs When the very first message is sent
*/
public ThroughputThrottler(long targetThroughput, long startMs) {
this.startMs = startMs;
this.targetThroughput = targetThroughput;
this.sleepTimeNs = NS_PER_SEC / targetThroughput;
}
public boolean shouldThrottle(long messageNum, long sendStartMs) {
/**
* @param amountSoFar bytes produced so far if you want to throttle data throughput, or
* messages produced so far if you want to throttle message throughput.
* @param sendStartMs timestamp of the most recently sent message
* @return
*/
public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
if (this.targetThroughput <= 0) {
// No throttling in this case
return false;
}
float elapsedMs = (sendStartMs - startMs) / 1000.f;
return elapsedMs > 0 && (messageNum / elapsedMs) > this.targetThroughput;
return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
}
public void throttle() {
// throttle message throughput by sleeping, on average,
// (1 / this.throughput) seconds between each sent message
// throttle throughput by sleeping, on average,
// (1 / this.throughput) seconds between "things sent"
sleepDeficitNs += sleepTimeNs;
// If enough sleep deficit has accumulated, sleep a little