mirror of https://github.com/apache/kafka.git
Updated comments to reflect that throttler is not message-specific
This commit is contained in:
parent
6842ed1ffa
commit
d586fb0eb6
|
@ -19,7 +19,7 @@ package org.apache.kafka.clients.tools;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class helps producers throttle their maximum message throughput.
|
* This class helps producers throttle throughput.
|
||||||
*
|
*
|
||||||
* The resulting average throughput will be approximately
|
* The resulting average throughput will be approximately
|
||||||
* min(targetThroughput, maximumPossibleThroughput)
|
* min(targetThroughput, maximumPossibleThroughput)
|
||||||
|
@ -31,7 +31,9 @@ package org.apache.kafka.clients.tools;
|
||||||
* throttler.throttle();
|
* throttler.throttle();
|
||||||
* }
|
* }
|
||||||
* }
|
* }
|
||||||
* </pre>
|
* </pre>
|
||||||
|
*
|
||||||
|
* Note that this can be used to throttle message throughput or data throughput.
|
||||||
*/
|
*/
|
||||||
public class ThroughputThrottler {
|
public class ThroughputThrottler {
|
||||||
|
|
||||||
|
@ -44,25 +46,35 @@ public class ThroughputThrottler {
|
||||||
long targetThroughput = -1;
|
long targetThroughput = -1;
|
||||||
long startMs;
|
long startMs;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param targetThroughput Can be messages/sec or bytes/sec
|
||||||
|
* @param startMs When the very first message is sent
|
||||||
|
*/
|
||||||
public ThroughputThrottler(long targetThroughput, long startMs) {
|
public ThroughputThrottler(long targetThroughput, long startMs) {
|
||||||
this.startMs = startMs;
|
this.startMs = startMs;
|
||||||
this.targetThroughput = targetThroughput;
|
this.targetThroughput = targetThroughput;
|
||||||
this.sleepTimeNs = NS_PER_SEC / targetThroughput;
|
this.sleepTimeNs = NS_PER_SEC / targetThroughput;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean shouldThrottle(long messageNum, long sendStartMs) {
|
/**
|
||||||
|
* @param amountSoFar bytes produced so far if you want to throttle data throughput, or
|
||||||
|
* messages produced so far if you want to throttle message throughput.
|
||||||
|
* @param sendStartMs timestamp of the most recently sent message
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
|
||||||
if (this.targetThroughput <= 0) {
|
if (this.targetThroughput <= 0) {
|
||||||
// No throttling in this case
|
// No throttling in this case
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
float elapsedMs = (sendStartMs - startMs) / 1000.f;
|
float elapsedMs = (sendStartMs - startMs) / 1000.f;
|
||||||
return elapsedMs > 0 && (messageNum / elapsedMs) > this.targetThroughput;
|
return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void throttle() {
|
public void throttle() {
|
||||||
// throttle message throughput by sleeping, on average,
|
// throttle throughput by sleeping, on average,
|
||||||
// (1 / this.throughput) seconds between each sent message
|
// (1 / this.throughput) seconds between "things sent"
|
||||||
sleepDeficitNs += sleepTimeNs;
|
sleepDeficitNs += sleepTimeNs;
|
||||||
|
|
||||||
// If enough sleep deficit has accumulated, sleep a little
|
// If enough sleep deficit has accumulated, sleep a little
|
||||||
|
|
Loading…
Reference in New Issue