mirror of https://github.com/apache/kafka.git
MINOR: remove unused previousPartition from RoundRobinPartitioner.java (#18331)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
875da35ec3
commit
d95cb7d65e
|
@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer;
|
||||||
|
|
||||||
import org.apache.kafka.common.Cluster;
|
import org.apache.kafka.common.Cluster;
|
||||||
import org.apache.kafka.common.PartitionInfo;
|
import org.apache.kafka.common.PartitionInfo;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -37,7 +36,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
*/
|
*/
|
||||||
public class RoundRobinPartitioner implements Partitioner {
|
public class RoundRobinPartitioner implements Partitioner {
|
||||||
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
|
||||||
private final ThreadLocal<TopicPartition> previousPartition = new ThreadLocal<>();
|
|
||||||
|
|
||||||
public void configure(Map<String, ?> configs) {}
|
public void configure(Map<String, ?> configs) {}
|
||||||
|
|
||||||
|
@ -53,14 +51,6 @@ public class RoundRobinPartitioner implements Partitioner {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
|
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
|
||||||
TopicPartition prevPartition = previousPartition.get();
|
|
||||||
if (prevPartition != null) {
|
|
||||||
previousPartition.remove();
|
|
||||||
if (topic.equals(prevPartition.topic())) {
|
|
||||||
return prevPartition.partition();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int nextValue = nextValue(topic);
|
int nextValue = nextValue(topic);
|
||||||
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
|
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
|
||||||
if (!availablePartitions.isEmpty()) {
|
if (!availablePartitions.isEmpty()) {
|
||||||
|
|
Loading…
Reference in New Issue