mirror of https://github.com/apache/kafka.git
Updated comments to reflect that throttler is not message-specific
This commit is contained in:
parent
6842ed1ffa
commit
d586fb0eb6
|
@ -19,7 +19,7 @@ package org.apache.kafka.clients.tools;
|
|||
|
||||
|
||||
/**
|
||||
* This class helps producers throttle their maximum message throughput.
|
||||
* This class helps producers throttle throughput.
|
||||
*
|
||||
* The resulting average throughput will be approximately
|
||||
* min(targetThroughput, maximumPossibleThroughput)
|
||||
|
@ -31,7 +31,9 @@ package org.apache.kafka.clients.tools;
|
|||
* throttler.throttle();
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
* </pre>
|
||||
*
|
||||
* Note that this can be used to throttle message throughput or data throughput.
|
||||
*/
|
||||
public class ThroughputThrottler {
|
||||
|
||||
|
@ -44,25 +46,35 @@ public class ThroughputThrottler {
|
|||
long targetThroughput = -1;
|
||||
long startMs;
|
||||
|
||||
/**
|
||||
* @param targetThroughput Can be messages/sec or bytes/sec
|
||||
* @param startMs When the very first message is sent
|
||||
*/
|
||||
public ThroughputThrottler(long targetThroughput, long startMs) {
|
||||
this.startMs = startMs;
|
||||
this.targetThroughput = targetThroughput;
|
||||
this.sleepTimeNs = NS_PER_SEC / targetThroughput;
|
||||
}
|
||||
|
||||
public boolean shouldThrottle(long messageNum, long sendStartMs) {
|
||||
/**
|
||||
* @param amountSoFar bytes produced so far if you want to throttle data throughput, or
|
||||
* messages produced so far if you want to throttle message throughput.
|
||||
* @param sendStartMs timestamp of the most recently sent message
|
||||
* @return
|
||||
*/
|
||||
public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
|
||||
if (this.targetThroughput <= 0) {
|
||||
// No throttling in this case
|
||||
return false;
|
||||
}
|
||||
|
||||
float elapsedMs = (sendStartMs - startMs) / 1000.f;
|
||||
return elapsedMs > 0 && (messageNum / elapsedMs) > this.targetThroughput;
|
||||
return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
|
||||
}
|
||||
|
||||
public void throttle() {
|
||||
// throttle message throughput by sleeping, on average,
|
||||
// (1 / this.throughput) seconds between each sent message
|
||||
// throttle throughput by sleeping, on average,
|
||||
// (1 / this.throughput) seconds between "things sent"
|
||||
sleepDeficitNs += sleepTimeNs;
|
||||
|
||||
// If enough sleep deficit has accumulated, sleep a little
|
||||
|
|
Loading…
Reference in New Issue