KAFKA-12471: Implement createPartitions in KIP-500 mode (#10343)

Implement the createPartitions RPC which adds more partitions to a topic
in the KIP-500 controller.  Factor out some of the logic for validating
manual partition assignments, so that it can be shared between
createTopics and createPartitions.  Add a startPartition argument to the
replica placer.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Colin Patrick McCabe 2021-04-13 11:00:22 -07:00 committed by GitHub
parent 327809024f
commit a8a6952e4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 438 additions and 39 deletions

View File

@ -273,7 +273,7 @@
<suppress checks="CyclomaticComplexity"
files="(ReplicationControlManager).java"/>
<suppress checks="NPathComplexity"
files="KafkaEventQueue.java"/>
files="(KafkaEventQueue|ReplicationControlManager).java"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
</suppressions>

View File

@ -19,7 +19,7 @@ package kafka.server
import java.util
import java.util.Collections
import java.util.concurrent.ExecutionException
import java.util.concurrent.{CompletableFuture, ExecutionException}
import kafka.network.RequestChannel
import kafka.raft.RaftManager
@ -32,11 +32,14 @@ import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_A
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException}
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsRequestData, CreateTopicsResponseData, DeleteTopicsRequestData, DeleteTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, UnregisterBrokerResponseData, VoteResponseData}
import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, TOPIC_AUTHORIZATION_FAILED}
import org.apache.kafka.common.message._
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource
@ -89,6 +92,7 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.ENVELOPE => handleEnvelopeRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
}
} catch {
@ -538,4 +542,65 @@ class ControllerApis(val requestChannel: RequestChannel,
}
})
}
def handleCreatePartitions(request: RequestChannel.Request): Unit = {
val future = createPartitions(request.body[CreatePartitionsRequest].data,
authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n))
future.whenComplete((responses, exception) => {
if (exception != null) {
requestHelper.handleError(request, exception)
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
val responseData = new CreatePartitionsResponseData().
setResults(responses).
setThrottleTimeMs(requestThrottleMs)
new CreatePartitionsResponse(responseData)
})
}
})
}
def createPartitions(request: CreatePartitionsRequestData,
hasClusterAuth: Boolean,
getCreatableTopics: Iterable[String] => Set[String])
: CompletableFuture[util.List[CreatePartitionsTopicResult]] = {
val responses = new util.ArrayList[CreatePartitionsTopicResult]()
val duplicateTopicNames = new util.HashSet[String]()
val topicNames = new util.HashSet[String]()
request.topics().forEach {
topic =>
if (!topicNames.add(topic.name())) {
duplicateTopicNames.add(topic.name())
}
}
duplicateTopicNames.forEach { topicName =>
responses.add(new CreatePartitionsTopicResult().
setName(topicName).
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate topic name."))
topicNames.remove(topicName)
}
val authorizedTopicNames = {
if (hasClusterAuth) {
topicNames.asScala
} else {
getCreatableTopics(topicNames.asScala)
}
}
val topics = new util.ArrayList[CreatePartitionsTopic]
topicNames.forEach { topicName =>
if (authorizedTopicNames.contains(topicName)) {
topics.add(request.topics().find(topicName))
} else {
responses.add(new CreatePartitionsTopicResult().
setName(topicName).
setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
}
}
controller.createPartitions(topics).thenApply { results =>
results.forEach(response => responses.add(response))
responses
}
}
}

View File

@ -25,6 +25,8 @@ import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsResponseData;
@ -41,8 +43,10 @@ import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@ -227,6 +231,29 @@ public class MockController implements Controller {
throw new UnsupportedOperationException();
}
@Override
synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(List<CreatePartitionsTopic> topicList) {
if (!active) {
CompletableFuture<List<CreatePartitionsTopicResult>> future = new CompletableFuture<>();
future.completeExceptionally(NOT_CONTROLLER_EXCEPTION);
return future;
}
List<CreatePartitionsTopicResult> results = new ArrayList<>();
for (CreatePartitionsTopic topic : topicList) {
if (topicNameToId.containsKey(topic.name())) {
results.add(new CreatePartitionsTopicResult().setName(topic.name()).
setErrorCode(Errors.NONE.code()).
setErrorMessage(null));
} else {
results.add(new CreatePartitionsTopicResult().setName(topic.name()).
setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).
setErrorMessage("No such topic as " + topic.name()));
}
}
return CompletableFuture.completedFuture(results);
}
@Override
public CompletableFuture<Long> beginWritingSnapshot() {
throw new UnsupportedOperationException();

View File

@ -32,15 +32,19 @@ import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.errors.{InvalidRequestException, NotControllerException, TopicDeletionDisabledException}
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
import org.apache.kafka.common.message.{BrokerRegistrationRequestData, CreateTopicsRequestData, DeleteTopicsRequestData}
import org.apache.kafka.common.message.{BrokerRegistrationRequestData, CreatePartitionsRequestData, DeleteTopicsRequestData}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.Errors.{CLUSTER_AUTHORIZATION_FAILED, INVALID_REQUEST, NONE, TOPIC_AUTHORIZATION_FAILED, UNKNOWN_TOPIC_ID, UNKNOWN_TOPIC_OR_PARTITION}
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BrokerRegistrationRequest, BrokerRegistrationResponse, RequestContext, RequestHeader, RequestTestUtils}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.controller.Controller
import org.apache.kafka.metadata.ApiMessageAndVersion
@ -371,6 +375,30 @@ class ControllerApisTest {
_ => Set("foo", "bar")))
}
@Test
def testCreatePartitionsRequest(): Unit = {
val controller = new MockController.Builder().
newInitialTopic("foo", Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")).
newInitialTopic("bar", Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")).build()
val controllerApis = createControllerApis(None, controller)
val request = new CreatePartitionsRequestData()
request.topics().add(new CreatePartitionsTopic().setName("foo").setAssignments(null).setCount(5))
request.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
request.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
request.topics().add(new CreatePartitionsTopic().setName("bar").setAssignments(null).setCount(5))
request.topics().add(new CreatePartitionsTopic().setName("baz").setAssignments(null).setCount(5))
assertEquals(Set(new CreatePartitionsTopicResult().setName("foo").
setErrorCode(NONE.code()).
setErrorMessage(null),
new CreatePartitionsTopicResult().setName("bar").
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate topic name."),
new CreatePartitionsTopicResult().setName("baz").
setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()).
setErrorMessage(null)),
controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet)
}
@AfterEach
def tearDown(): Unit = {
quotas.shutdown()

View File

@ -435,6 +435,7 @@ public class BrokerHeartbeatManager {
/**
* Place replicas on unfenced brokers.
*
* @param startPartition The partition ID to start with.
* @param numPartitions The number of partitions to place.
* @param numReplicas The number of replicas for each partition.
* @param idToRack A function mapping broker id to broker rack.
@ -444,14 +445,16 @@ public class BrokerHeartbeatManager {
*
* @throws InvalidReplicationFactorException If too many replicas were requested.
*/
List<List<Integer>> placeReplicas(int numPartitions, short numReplicas,
List<List<Integer>> placeReplicas(int startPartition,
int numPartitions,
short numReplicas,
Function<Integer, Optional<String>> idToRack,
ReplicaPlacementPolicy policy) {
// TODO: support using fenced brokers here if necessary to get to the desired
// number of replicas. We probably need to add a fenced boolean in UsableBroker.
Iterator<UsableBroker> iterator = new UsableBrokerIterator(
unfenced.iterator(), idToRack);
return policy.createPlacement(numPartitions, numReplicas, iterator);
return policy.createPlacement(startPartition, numPartitions, numReplicas, iterator);
}
static class UsableBrokerIterator implements Iterator<UsableBroker> {

View File

@ -310,11 +310,13 @@ public class ClusterControlManager {
}
}
public List<List<Integer>> placeReplicas(int numPartitions, short numReplicas) {
public List<List<Integer>> placeReplicas(int startPartition,
int numPartitions,
short numReplicas) {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
}
return heartbeatManager.placeReplicas(numPartitions, numReplicas,
return heartbeatManager.placeReplicas(startPartition, numPartitions, numReplicas,
id -> brokerRegistrations.get(id).rack(), placementPolicy);
}

View File

@ -24,6 +24,8 @@ import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
@ -36,6 +38,7 @@ import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -195,6 +198,15 @@ public interface Controller extends AutoCloseable {
*/
CompletableFuture<Long> beginWritingSnapshot();
/**
* Create partitions on certain topics.
*
* @param topics The list of topics to create partitions for.
* @return A future yielding per-topic results.
*/
CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(List<CreatePartitionsTopic> topics);
/**
* Begin shutting down, but don't block. You must still call close to clean up all
* resources.

View File

@ -43,6 +43,8 @@ import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
@ -1094,6 +1096,16 @@ public final class QuorumController implements Controller {
});
}
@Override
public CompletableFuture<List<CreatePartitionsTopicResult>>
createPartitions(List<CreatePartitionsTopic> topics) {
if (topics.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
return appendWriteEvent("createPartitions", () ->
replicationControl.createPartitions(topics));
}
@Override
public CompletableFuture<Long> beginWritingSnapshot() {
CompletableFuture<Long> future = new CompletableFuture<>();

View File

@ -32,6 +32,7 @@ interface ReplicaPlacementPolicy {
/**
* Create a new replica placement.
*
* @param startPartition The partition ID to start with.
* @param numPartitions The number of partitions to create placements for.
* @param numReplicas The number of replicas to create for each partitions.
* Must be positive.
@ -41,7 +42,9 @@ interface ReplicaPlacementPolicy {
*
* @throws InvalidReplicationFactorException If too many replicas were requested.
*/
List<List<Integer>> createPlacement(int numPartitions, short numReplicas,
List<List<Integer>> createPlacement(int startPartition,
int numPartitions,
short numReplicas,
Iterator<UsableBroker> iterator)
throws InvalidReplicationFactorException;
}

View File

@ -23,14 +23,21 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
@ -66,13 +73,13 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.OptionalInt;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
@ -451,34 +458,18 @@ public class ReplicationControlManager {
"A manual partition assignment was specified, but numPartitions " +
"was not set to -1.");
}
OptionalInt replicationFactor = OptionalInt.empty();
for (CreatableReplicaAssignment assignment : topic.assignments()) {
if (newParts.containsKey(assignment.partitionIndex())) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"Found multiple manual partition assignments for partition " +
assignment.partitionIndex());
}
HashSet<Integer> brokerIds = new HashSet<>();
for (int brokerId : assignment.brokerIds()) {
if (!brokerIds.add(brokerId)) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"The manual partition assignment specifies the same node " +
"id more than once.");
} else if (!clusterControl.unfenced(brokerId)) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"The manual partition assignment contains node " + brokerId +
", but that node is not usable.");
}
}
int[] replicas = new int[assignment.brokerIds().size()];
for (int i = 0; i < replicas.length; i++) {
replicas[i] = assignment.brokerIds().get(i);
}
int[] isr = new int[assignment.brokerIds().size()];
for (int i = 0; i < replicas.length; i++) {
isr[i] = assignment.brokerIds().get(i);
}
newParts.put(assignment.partitionIndex(),
new PartitionControlInfo(replicas, isr, null, null, isr[0], 0, 0));
validateManualPartitionAssignment(assignment.brokerIds(), replicationFactor);
replicationFactor = OptionalInt.of(assignment.brokerIds().size());
int[] replicas = Replicas.toArray(assignment.brokerIds());
newParts.put(assignment.partitionIndex(), new PartitionControlInfo(
replicas, replicas, null, null, replicas[0], 0, 0));
}
} else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
@ -497,7 +488,7 @@ public class ReplicationControlManager {
defaultReplicationFactor : topic.replicationFactor();
try {
List<List<Integer>> replicas = clusterControl.
placeReplicas(numPartitions, replicationFactor);
placeReplicas(0, numPartitions, replicationFactor);
for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
int[] r = Replicas.toArray(replicas.get(partitionId));
newParts.put(partitionId,
@ -1007,6 +998,127 @@ public class ReplicationControlManager {
return ControllerResult.of(records, null);
}
ControllerResult<List<CreatePartitionsTopicResult>>
createPartitions(List<CreatePartitionsTopic> topics) {
List<ApiMessageAndVersion> records = new ArrayList<>();
List<CreatePartitionsTopicResult> results = new ArrayList<>();
for (CreatePartitionsTopic topic : topics) {
ApiError apiError = ApiError.NONE;
try {
createPartitions(topic, records);
} catch (ApiException e) {
apiError = ApiError.fromThrowable(e);
} catch (Exception e) {
log.error("Unexpected createPartitions error for {}", topic, e);
apiError = ApiError.fromThrowable(e);
}
results.add(new CreatePartitionsTopicResult().
setName(topic.name()).
setErrorCode(apiError.error().code()).
setErrorMessage(apiError.message()));
}
return new ControllerResult<>(records, results, true);
}
void createPartitions(CreatePartitionsTopic topic,
List<ApiMessageAndVersion> records) {
Uuid topicId = topicsByName.get(topic.name());
if (topicId == null) {
throw new UnknownTopicOrPartitionException();
}
TopicControlInfo topicInfo = topics.get(topicId);
if (topicInfo == null) {
throw new UnknownTopicOrPartitionException();
}
if (topic.count() == topicInfo.parts.size()) {
throw new InvalidPartitionsException("Topic already has " +
topicInfo.parts.size() + " partition(s).");
} else if (topic.count() < topicInfo.parts.size()) {
throw new InvalidPartitionsException("The topic " + topic.name() + " currently " +
"has " + topicInfo.parts.size() + " partition(s); " + topic.count() +
" would not be an increase.");
}
int additional = topic.count() - topicInfo.parts.size();
if (topic.assignments() != null) {
if (topic.assignments().size() != additional) {
throw new InvalidReplicaAssignmentException("Attempted to add " + additional +
" additional partition(s), but only " + topic.assignments().size() +
" assignment(s) were specified.");
}
}
Iterator<PartitionControlInfo> iterator = topicInfo.parts.values().iterator();
if (!iterator.hasNext()) {
throw new UnknownServerException("Invalid state: topic " + topic.name() +
" appears to have no partitions.");
}
PartitionControlInfo partitionInfo = iterator.next();
if (partitionInfo.replicas.length > Short.MAX_VALUE) {
throw new UnknownServerException("Invalid replication factor " +
partitionInfo.replicas.length + ": expected a number equal to less than " +
Short.MAX_VALUE);
}
short replicationFactor = (short) partitionInfo.replicas.length;
int startPartitionId = topicInfo.parts.size();
List<List<Integer>> placements;
if (topic.assignments() != null) {
placements = new ArrayList<>();
for (CreatePartitionsAssignment assignment : topic.assignments()) {
validateManualPartitionAssignment(assignment.brokerIds(),
OptionalInt.of(replicationFactor));
placements.add(assignment.brokerIds());
}
} else {
placements = clusterControl.placeReplicas(startPartitionId, additional,
replicationFactor);
}
int partitionId = startPartitionId;
for (List<Integer> placement : placements) {
records.add(new ApiMessageAndVersion(new PartitionRecord().
setPartitionId(partitionId).
setTopicId(topicId).
setReplicas(placement).
setIsr(placement).
setRemovingReplicas(null).
setAddingReplicas(null).
setLeader(placement.get(0)).
setLeaderEpoch(0).
setPartitionEpoch(0), (short) 0));
partitionId++;
}
}
void validateManualPartitionAssignment(List<Integer> assignment,
OptionalInt replicationFactor) {
if (assignment.isEmpty()) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes an empty replica list.");
}
List<Integer> sortedBrokerIds = new ArrayList<>(assignment);
sortedBrokerIds.sort(Integer::compare);
Integer prevBrokerId = null;
for (Integer brokerId : sortedBrokerIds) {
if (!clusterControl.brokerRegistrations().containsKey(brokerId)) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes broker " + brokerId + ", but no such broker is " +
"registered.");
}
if (brokerId.equals(prevBrokerId)) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes the broker " + prevBrokerId + " more than " +
"once.");
}
prevBrokerId = brokerId;
}
if (replicationFactor.isPresent() &&
sortedBrokerIds.size() != replicationFactor.getAsInt()) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes a partition with " + sortedBrokerIds.size() +
" replica(s), but this is not consistent with previous " +
"partitions, which have " + replicationFactor.getAsInt() + " replica(s).");
}
}
class ReplicationControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final long epoch;
private final Iterator<TopicControlInfo> iterator;

View File

@ -41,7 +41,8 @@ public class SimpleReplicaPlacementPolicy implements ReplicaPlacementPolicy {
}
@Override
public List<List<Integer>> createPlacement(int numPartitions,
public List<List<Integer>> createPlacement(int startPartition,
int numPartitions,
short numReplicas,
Iterator<UsableBroker> iterator) {
List<UsableBroker> usable = new ArrayList<>();

View File

@ -142,7 +142,7 @@ public class ClusterControlManagerTest {
String.format("broker %d was not unfenced.", i));
}
for (int i = 0; i < 100; i++) {
List<List<Integer>> results = clusterControl.placeReplicas(1, (short) 3);
List<List<Integer>> results = clusterControl.placeReplicas(0, 1, (short) 3);
HashSet<Integer> seen = new HashSet<>();
for (Integer result : results.get(0)) {
assertTrue(result >= 0);

View File

@ -20,10 +20,14 @@ package org.apache.kafka.controller;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
@ -57,6 +61,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS;
import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT;
import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
import static org.apache.kafka.common.protocol.Errors.NONE;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
@ -518,4 +524,132 @@ public class ReplicationControlManagerTest {
Long.MAX_VALUE, Collections.singleton("foo")));
assertEmptyTopicConfigs(ctx, "foo");
}
@Test
public void testCreatePartitions() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(3).setReplicationFactor((short) 2));
request.topics().add(new CreatableTopic().setName("bar").
setNumPartitions(4).setReplicationFactor((short) 2));
request.topics().add(new CreatableTopic().setName("quux").
setNumPartitions(2).setReplicationFactor((short) 2));
request.topics().add(new CreatableTopic().setName("foo2").
setNumPartitions(2).setReplicationFactor((short) 2));
registerBroker(0, ctx);
unfenceBroker(0, ctx);
registerBroker(1, ctx);
unfenceBroker(1, ctx);
ControllerResult<CreateTopicsResponseData> createTopicResult =
replicationControl.createTopics(request);
ctx.replay(createTopicResult.records());
List<CreatePartitionsTopic> topics = new ArrayList<>();
topics.add(new CreatePartitionsTopic().
setName("foo").setCount(5).setAssignments(null));
topics.add(new CreatePartitionsTopic().
setName("bar").setCount(3).setAssignments(null));
topics.add(new CreatePartitionsTopic().
setName("baz").setCount(3).setAssignments(null));
topics.add(new CreatePartitionsTopic().
setName("quux").setCount(2).setAssignments(null));
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
replicationControl.createPartitions(topics);
assertEquals(Arrays.asList(new CreatePartitionsTopicResult().
setName("foo").
setErrorCode(NONE.code()).
setErrorMessage(null),
new CreatePartitionsTopicResult().
setName("bar").
setErrorCode(INVALID_PARTITIONS.code()).
setErrorMessage("The topic bar currently has 4 partition(s); 3 would not be an increase."),
new CreatePartitionsTopicResult().
setName("baz").
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
setErrorMessage(null),
new CreatePartitionsTopicResult().
setName("quux").
setErrorCode(INVALID_PARTITIONS.code()).
setErrorMessage("Topic already has 2 partition(s).")),
createPartitionsResult.response());
ctx.replay(createPartitionsResult.records());
List<CreatePartitionsTopic> topics2 = new ArrayList<>();
topics2.add(new CreatePartitionsTopic().
setName("foo").setCount(6).setAssignments(Arrays.asList(
new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
topics2.add(new CreatePartitionsTopic().
setName("bar").setCount(5).setAssignments(Arrays.asList(
new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1)))));
topics2.add(new CreatePartitionsTopic().
setName("quux").setCount(4).setAssignments(Arrays.asList(
new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
topics2.add(new CreatePartitionsTopic().
setName("foo2").setCount(3).setAssignments(Arrays.asList(
new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(2, 0)))));
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult2 =
replicationControl.createPartitions(topics2);
assertEquals(Arrays.asList(new CreatePartitionsTopicResult().
setName("foo").
setErrorCode(NONE.code()).
setErrorMessage(null),
new CreatePartitionsTopicResult().
setName("bar").
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
setErrorMessage("The manual partition assignment includes a partition " +
"with 1 replica(s), but this is not consistent with previous " +
"partitions, which have 2 replica(s)."),
new CreatePartitionsTopicResult().
setName("quux").
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
setErrorMessage("Attempted to add 2 additional partition(s), but only 1 assignment(s) were specified."),
new CreatePartitionsTopicResult().
setName("foo2").
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
setErrorMessage("The manual partition assignment includes broker 2, but " +
"no such broker is registered.")),
createPartitionsResult2.response());
ctx.replay(createPartitionsResult2.records());
}
@Test
public void testValidateGoodManualPartitionAssignments() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
registerBroker(1, ctx);
registerBroker(2, ctx);
registerBroker(3, ctx);
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
OptionalInt.of(1));
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
OptionalInt.empty());
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
OptionalInt.of(3));
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
OptionalInt.empty());
}
@Test
public void testValidateBadManualPartitionAssignments() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
registerBroker(1, ctx);
registerBroker(2, ctx);
assertEquals("The manual partition assignment includes an empty replica list.",
assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes broker 3, but no such " +
"broker is registered.", assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes the broker 2 more than " +
"once.", assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 2),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes a partition with 2 " +
"replica(s), but this is not consistent with previous partitions, which have " +
"3 replica(s).", assertThrows(InvalidReplicaAssignmentException.class, () ->
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2),
OptionalInt.of(3))).getMessage());
}
}