MINOR: Clean up the sticky partitioner code a bit (#7151)

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Lucas Bradstreet <lucasbradstreet@gmail.com>
This commit is contained in:
Justine Olshan 2019-08-12 14:17:28 -07:00 committed by Colin Patrick McCabe
parent 0c2d1c390d
commit 88087e91dd
4 changed files with 43 additions and 39 deletions

View File

@ -925,7 +925,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
partition = partition(record, serializedKey, serializedValue, cluster); partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition); tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("Retrying because of a new batch, sending the record to topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition); log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
} }
// producer callback will make sure to call both 'callback' and interceptor callback // producer callback will make sure to call both 'callback' and interceptor callback
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

View File

@ -51,14 +51,14 @@ public class StickyPartitionCache {
// triggered the new batch matches the sticky partition that needs to be changed. // triggered the new batch matches the sticky partition that needs to be changed.
if (oldPart == null || oldPart == prevPartition) { if (oldPart == null || oldPart == prevPartition) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
if (availablePartitions.size() < 1) { if (availablePartitions.size() < 1) {
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size(); newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) { } else if (availablePartitions.size() == 1) {
newPart = availablePartitions.get(0).partition(); newPart = availablePartitions.get(0).partition();
} else { } else {
while (newPart == null || newPart.equals(oldPart)) { while (newPart == null || newPart.equals(oldPart)) {
random = Utils.toPositive(ThreadLocalRandom.current().nextInt()); Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition(); newPart = availablePartitions.get(random % availablePartitions.size()).partition();
} }
} }

View File

@ -32,6 +32,10 @@
</li> </li>
<li>The internal <code>PartitionAssignor</code> interface has been deprecated and replaced with a new <code>ConsumerPartitionAssignor</code> in the public API. Users <li>The internal <code>PartitionAssignor</code> interface has been deprecated and replaced with a new <code>ConsumerPartitionAssignor</code> in the public API. Users
implementing a custom PartitionAssignor should migrate to the new interface as soon as possible.</li> implementing a custom PartitionAssignor should migrate to the new interface as soon as possible.</li>
<li>The <code>DefaultPartitioner</code> now uses a sticky partitioning strategy. This means that records for specific topic with null keys and no assigned partition
will be sent to the same partition until the batch is ready to be sent. When a new batch is created, a new partition is chosen. This decreases latency to produce, but
it may result in uneven distribution of records across partitions in edge cases. Generally users will not be impacted, but this difference may be noticeable in tests and
other situations producing records for a very short amount of time.</li>
</ul> </ul>
<h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0</a></h4> <h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0</a></h4>