Updated comments to reflect that throttler is not message-specific

This commit is contained in:
Geoff Anderson 2015-06-03 18:22:34 -07:00
parent 6842ed1ffa
commit d586fb0eb6
1 changed files with 18 additions and 6 deletions

View File

@ -19,7 +19,7 @@ package org.apache.kafka.clients.tools;
/** /**
* This class helps producers throttle their maximum message throughput. * This class helps producers throttle throughput.
* *
* The resulting average throughput will be approximately * The resulting average throughput will be approximately
* min(targetThroughput, maximumPossibleThroughput) * min(targetThroughput, maximumPossibleThroughput)
@ -31,7 +31,9 @@ package org.apache.kafka.clients.tools;
* throttler.throttle(); * throttler.throttle();
* } * }
* } * }
* </pre> * </pre>
*
* Note that this can be used to throttle message throughput or data throughput.
*/ */
public class ThroughputThrottler { public class ThroughputThrottler {
@ -44,25 +46,35 @@ public class ThroughputThrottler {
long targetThroughput = -1; long targetThroughput = -1;
long startMs; long startMs;
/**
* @param targetThroughput Can be messages/sec or bytes/sec
* @param startMs When the very first message is sent
*/
public ThroughputThrottler(long targetThroughput, long startMs) { public ThroughputThrottler(long targetThroughput, long startMs) {
this.startMs = startMs; this.startMs = startMs;
this.targetThroughput = targetThroughput; this.targetThroughput = targetThroughput;
this.sleepTimeNs = NS_PER_SEC / targetThroughput; this.sleepTimeNs = NS_PER_SEC / targetThroughput;
} }
public boolean shouldThrottle(long messageNum, long sendStartMs) { /**
* @param amountSoFar bytes produced so far if you want to throttle data throughput, or
* messages produced so far if you want to throttle message throughput.
* @param sendStartMs timestamp of the most recently sent message
* @return
*/
public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
if (this.targetThroughput <= 0) { if (this.targetThroughput <= 0) {
// No throttling in this case // No throttling in this case
return false; return false;
} }
float elapsedMs = (sendStartMs - startMs) / 1000.f; float elapsedMs = (sendStartMs - startMs) / 1000.f;
return elapsedMs > 0 && (messageNum / elapsedMs) > this.targetThroughput; return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
} }
public void throttle() { public void throttle() {
// throttle message throughput by sleeping, on average, // throttle throughput by sleeping, on average,
// (1 / this.throughput) seconds between each sent message // (1 / this.throughput) seconds between "things sent"
sleepDeficitNs += sleepTimeNs; sleepDeficitNs += sleepTimeNs;
// If enough sleep deficit has accumulated, sleep a little // If enough sleep deficit has accumulated, sleep a little