MINOR: Rework NewPartitionReassignment public API (#7638)

This patch removes the NewPartitionReassignment#of() method in favor of a simple constructor. Said method was confusing due to breaking two conventions - always returning a non-empty Optional and thus not being used as a static factory method.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Stanislav Kozlovski 2019-11-05 18:34:11 +00:00 committed by Colin Patrick McCabe
parent c552c06aed
commit be58580e14
5 changed files with 16 additions and 21 deletions

View File

@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}.
@ -32,13 +31,9 @@ public class NewPartitionReassignment {
/**
* @throws IllegalArgumentException if no replicas are supplied
*/
public static Optional<NewPartitionReassignment> of(List<Integer> replicas) {
if (replicas == null || replicas.size() == 0)
public NewPartitionReassignment(List<Integer> targetReplicas) {
if (targetReplicas == null || targetReplicas.size() == 0)
throw new IllegalArgumentException("Cannot create a new partition reassignment without any replicas");
return Optional.of(new NewPartitionReassignment(replicas));
}
private NewPartitionReassignment(List<Integer> targetReplicas) {
this.targetReplicas = Collections.unmodifiableList(new ArrayList<>(targetReplicas));
}

View File

@ -2040,7 +2040,7 @@ public class KafkaAdminClientTest {
TopicPartition tp2 = new TopicPartition("B", 0);
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
reassignments.put(tp1, Optional.empty());
reassignments.put(tp2, NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
reassignments.put(tp2, Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3))));
// 1. server returns less responses than number of partitions we sent
AlterPartitionReassignmentsResponseData responseData1 = new AlterPartitionReassignmentsResponseData();
@ -2131,9 +2131,9 @@ public class KafkaAdminClientTest {
TopicPartition invalidTopicTP = new TopicPartition("", 0);
TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1);
Map<TopicPartition, Optional<NewPartitionReassignment>> invalidTopicReassignments = new HashMap<>();
invalidTopicReassignments.put(invalidPartitionTP, NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
invalidTopicReassignments.put(invalidTopicTP, NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
invalidTopicReassignments.put(tp1, NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
invalidTopicReassignments.put(invalidPartitionTP, Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3))));
invalidTopicReassignments.put(invalidTopicTP, Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3))));
invalidTopicReassignments.put(tp1, Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3))));
AlterPartitionReassignmentsResponseData singlePartResponseData =
new AlterPartitionReassignmentsResponseData()

View File

@ -22,7 +22,7 @@ import java.time.{Duration => JDuration}
import java.util.Arrays.asList
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.{Collections, Properties}
import java.util.{Collections, Optional, Properties}
import java.{time, util}
import kafka.log.LogConfig
@ -1996,9 +1996,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
createTopic(topic, numPartitions = 4)
val validAssignment = NewPartitionReassignment.of(
val validAssignment = Optional.of(new NewPartitionReassignment(
(0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
)
))
val nonExistentTp1 = new TopicPartition("topicA", 0)
val nonExistentTp2 = new TopicPartition(topic, 4)
@ -2012,9 +2012,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1), classOf[UnknownTopicOrPartitionException])
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2), classOf[UnknownTopicOrPartitionException])
val extraNonExistentReplica = NewPartitionReassignment.of((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
val negativeIdReplica = NewPartitionReassignment.of(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava)
val duplicateReplica = NewPartitionReassignment.of(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava)
val extraNonExistentReplica = Optional.of(new NewPartitionReassignment((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava))
val negativeIdReplica = Optional.of(new NewPartitionReassignment(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava))
val duplicateReplica = Optional.of(new NewPartitionReassignment(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava))
val invalidReplicaResult = client.alterPartitionReassignments(Map(
tp1 -> extraNonExistentReplica,
tp2 -> negativeIdReplica,

View File

@ -29,7 +29,7 @@ import scala.collection.JavaConverters._
import scala.collection.{Map, Seq}
import scala.util.Random
import java.io.File
import java.util.{Collections, Properties}
import java.util.{Collections, Optional, Properties}
import java.util.concurrent.ExecutionException
import org.apache.kafka.clients.producer.ProducerRecord
@ -1218,7 +1218,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
}
def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]): (TopicPartition, java.util.Optional[NewPartitionReassignment]) =
tp -> NewPartitionReassignment.of(replicas.map(_.asInstanceOf[Integer]).asJava)
tp -> Optional.of(new NewPartitionReassignment((replicas.map(_.asInstanceOf[Integer]).asJava)))
def cancelReassignmentEntry(tp: TopicPartition): (TopicPartition, java.util.Optional[NewPartitionReassignment]) =
tp -> java.util.Optional.empty()

View File

@ -16,7 +16,7 @@
*/
package kafka.admin
import java.util.{Collections, Properties}
import java.util.{Collections, Optional, Properties}
import kafka.admin.TopicCommand.{AdminClientTopicService, TopicCommandOptions}
import kafka.common.AdminCommandFailedException
@ -674,7 +674,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
val targetReplica = brokerIds.diff(replicasOfFirstPartition).head
adminClient.alterPartitionReassignments(Collections.singletonMap(firstTopicPartition,
NewPartitionReassignment.of(Collections.singletonList(targetReplica))))
Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica)))))
// let's wait until the LAIR is propagated
TestUtils.waitUntilTrue(() => {