KAFKA-15426: Process and persist directory assignments

Handle AssignReplicasToDirs requests, persist metadata changes with the new
directory assignments, and elect new leaders where replicas are assigned to
offline directories. (A sketch of the request shape follows the change summary below.)

Reviewers: Proven Provenzano <pprovenzano@confluent.io>, Ron Dagostino <rndgstn@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
Igor Soarez 2023-11-28 17:42:21 +00:00 committed by Colin P. McCabe
parent 498f3f010f
commit c515bf51f8
14 changed files with 472 additions and 133 deletions
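
For orientation, here is a minimal sketch of how a broker-side caller could construct the request body that this commit teaches the controller to process. It uses AssignmentsHelper.buildRequestData, introduced in this commit; the broker ID, epoch, and random UUIDs are illustrative placeholders, and the wrapper class itself is hypothetical rather than part of the change.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.metadata.AssignmentsHelper;
import org.apache.kafka.server.common.TopicIdPartition;

public class AssignReplicasToDirsExample {
    public static void main(String[] args) {
        // Map each replica (topic ID plus partition index) to the UUID of the
        // log directory on this broker that currently holds it.
        Map<TopicIdPartition, Uuid> assignment = new HashMap<>();
        assignment.put(new TopicIdPartition(Uuid.randomUuid(), 0), Uuid.randomUuid());

        // buildRequestData groups the flat map into the directory -> topic -> partition
        // hierarchy used by the AssignReplicasToDirs RPC schema.
        AssignReplicasToDirsRequestData data =
            AssignmentsHelper.buildRequestData(1 /* brokerId */, 100L /* brokerEpoch */, assignment);
        System.out.println(data);
    }
}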

View File

@@ -52,6 +52,12 @@
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="metadata">
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.common.protocol" />
</subpackage>
<subpackage name="server">
<allow pkg="org.apache.kafka.common" />
<allow pkg="joptsimple" />

View File

@@ -53,6 +53,7 @@
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.metadata" />
<!-- utilities and reusable classes from server-common -->
<allow pkg="org.apache.kafka.queue" />

View File

@@ -1050,12 +1050,14 @@ class ControllerApis(
}
def handleDescribeCluster(request: RequestChannel.Request): CompletableFuture[Unit] = {
// Unlike on the broker, DESCRIBE_CLUSTER on the controller requires a high level of
// permissions (ALTER on CLUSTER).
// Nearly all RPCs should check MetadataVersion inside the QuorumController. However, this
// RPC is consulting a cache which lives outside the QC. So we check MetadataVersion here.
if (!apiVersionManager.features.metadataVersion().isControllerRegistrationSupported()) {
throw new UnsupportedVersionException("Direct-to-controller communication is not " +
"supported with the current MetadataVersion.")
}
// Unlike on the broker, DESCRIBE_CLUSTER on the controller requires a high level of
// permissions (ALTER on CLUSTER).
authHelper.authorizeClusterOperation(request, ALTER)
val response = authHelper.computeDescribeClusterResponse(
request,
@@ -1070,11 +1072,13 @@
}
def handleAssignReplicasToDirs(request: RequestChannel.Request): CompletableFuture[Unit] = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
val assignReplicasToDirsRequest = request.body[AssignReplicasToDirsRequest]
// TODO KAFKA-15426
requestHelper.sendMaybeThrottle(request,
assignReplicasToDirsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
OptionalLong.empty())
controller.assignReplicasToDirs(context, assignReplicasToDirsRequest.data).thenApply { reply =>
requestHelper.sendResponseMaybeThrottle(request,
requestThrottleMs => new AssignReplicasToDirsResponse(reply.setThrottleTimeMs(requestThrottleMs)))
}
}
}

View File

@@ -32,6 +32,8 @@ import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
@@ -537,4 +539,9 @@ public class MockController implements Controller {
public void close() {
beginShutdown();
}
@Override
public CompletableFuture<AssignReplicasToDirsResponseData> assignReplicasToDirs(ControllerRequestContext context, AssignReplicasToDirsRequestData request) {
throw new java.lang.UnsupportedOperationException("not implemented");
}
}

View File

@@ -54,6 +54,7 @@ import org.apache.kafka.controller.{Controller, ControllerRequestContext, Result
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.util.FutureUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -1139,31 +1140,24 @@ class ControllerApisTest {
}
@Test
def testAssignReplicasToDirsReturnsUnsupportedVersion(): Unit = {
def testAssignReplicasToDirs(): Unit = {
val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(None, controller)
val authorizer = mock(classOf[Authorizer])
val controllerApis = createControllerApis(Some(authorizer), controller)
val request =
new AssignReplicasToDirsRequest.Builder(
new AssignReplicasToDirsRequestData()
.setBrokerId(1)
.setBrokerEpoch(123L)
.setDirectories(util.Arrays.asList(
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(Uuid.randomUuid())
.setTopics(util.Arrays.asList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(Uuid.fromString("pcPTaiQfRXyZG88kO9k2aA"))
.setPartitions(util.Arrays.asList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(8)
))
))
))).build()
val request = new AssignReplicasToDirsRequest.Builder(new AssignReplicasToDirsRequestData()).build()
when(authorizer.authorize(any[RequestContext], ArgumentMatchers.eq(Collections.singletonList(new Action(
AclOperation.CLUSTER_ACTION,
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
1, true, true
)))))
.thenReturn(Collections.singletonList(AuthorizationResult.ALLOWED))
when(controller.assignReplicasToDirs(any[ControllerRequestContext], ArgumentMatchers.eq(request.data)))
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()))
val expectedResponse = new AssignReplicasToDirsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
val response = handleRequest[AssignReplicasToDirsResponse](request, controllerApis)
assertEquals(expectedResponse, response.data)
assertEquals(new AssignReplicasToDirsResponseData().setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()), response.data)
}
private def handleRequest[T <: AbstractResponse](

View File

@@ -28,6 +28,8 @@ import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
@@ -404,6 +406,19 @@ public interface Controller extends AclMutator, AutoCloseable {
ControllerRegistrationRequestData request
);
/**
* Assign replicas to directories.
*
* @param context The controller request context.
* @param request The assign replicas to dirs request.
*
* @return A future yielding the results.
*/
CompletableFuture<AssignReplicasToDirsResponseData> assignReplicasToDirs(
ControllerRequestContext context,
AssignReplicasToDirsRequestData request
);
/**
* Begin shutting down, but don't block. You must still call close to clean up all
* resources.

View File

@@ -37,6 +37,8 @@ import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.ControllerRegistrationRequestData;
@@ -2295,6 +2297,15 @@ public final class QuorumController implements Controller {
() -> aclControlManager.deleteAcls(filters));
}
@Override
public CompletableFuture<AssignReplicasToDirsResponseData> assignReplicasToDirs(
ControllerRequestContext context,
AssignReplicasToDirsRequestData request
) {
return appendWriteEvent("assignReplicasToDirs", context.deadlineNs(),
() -> replicationControl.handleAssignReplicasToDirs(request));
}
@Override
public CompletableFuture<Void> waitForReadyBrokers(int minBrokers) {
final CompletableFuture<Void> future = new CompletableFuture<>();

View File

@@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
@@ -46,6 +47,8 @@ import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.Re
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
@@ -2019,6 +2022,76 @@ public class ReplicationControlManager {
return response;
}
ControllerResult<AssignReplicasToDirsResponseData> handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
if (!featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
throw new UnsupportedVersionException("Directory assignment is not supported yet.");
}
int brokerId = request.brokerId();
clusterControl.checkBrokerEpoch(brokerId, request.brokerEpoch());
BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
if (brokerRegistration == null) {
throw new BrokerIdNotRegisteredException("Broker ID " + brokerId + " is not currently registered");
}
List<ApiMessageAndVersion> records = new ArrayList<>();
AssignReplicasToDirsResponseData response = new AssignReplicasToDirsResponseData();
Set<TopicIdPartition> leaderAndIsrUpdates = new HashSet<>();
for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) {
Uuid dirId = reqDir.id();
boolean directoryIsOffline = !brokerRegistration.hasOnlineDir(dirId);
AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId);
for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) {
Uuid topicId = reqTopic.topicId();
Errors topicError = Errors.NONE;
TopicControlInfo topicInfo = this.topics.get(topicId);
if (topicInfo == null) {
log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId);
topicError = Errors.UNKNOWN_TOPIC_ID;
}
AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) {
int partitionIndex = reqPartition.partitionIndex();
Errors partitionError = topicError;
if (topicError == Errors.NONE) {
String topicName = topicInfo.name;
PartitionRegistration partitionRegistration = topicInfo.parts.get(partitionIndex);
if (partitionRegistration == null) {
log.warn("AssignReplicasToDirsRequest from broker {} references unknown partition {}-{}", brokerId, topicName, partitionIndex);
partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION;
} else if (!Replicas.contains(partitionRegistration.replicas, brokerId)) {
log.warn("AssignReplicasToDirsRequest from broker {} references non assigned partition {}-{}", brokerId, topicName, partitionIndex);
partitionError = Errors.NOT_LEADER_OR_FOLLOWER;
} else {
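// Valid assignment: stage a PartitionChangeRecord moving this replica to dirId.
// Directories for the other replicas are defaulted via the cluster describer,
// and no record is produced when nothing actually changes.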
Optional<ApiMessageAndVersion> partitionChangeRecord = new PartitionChangeBuilder(
partitionRegistration,
topicId,
partitionIndex,
new LeaderAcceptor(clusterControl, partitionRegistration),
featureControl.metadataVersion(),
getTopicEffectiveMinIsr(topicName)
)
.setDirectory(brokerId, dirId)
.setDefaultDirProvider(clusterDescriber)
.build();
partitionChangeRecord.ifPresent(records::add);
if (directoryIsOffline) {
leaderAndIsrUpdates.add(new TopicIdPartition(topicId, partitionIndex));
}
}
}
resTopic.partitions().add(new AssignReplicasToDirsResponseData.PartitionData().
setPartitionIndex(partitionIndex).
setErrorCode(partitionError.code()));
}
resDir.topics().add(resTopic);
}
response.directories().add(resDir);
}
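// Replicas assigned to an offline directory cannot remain (or become) leaders
// on this broker, so elect new leaders for the affected partitions.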
if (!leaderAndIsrUpdates.isEmpty()) {
generateLeaderAndIsrUpdates("offline-dir-assignment", brokerId, NO_LEADER, records, leaderAndIsrUpdates.iterator());
}
return ControllerResult.of(records, response);
}
private void listReassigningTopic(ListPartitionReassignmentsResponseData response,
Uuid topicId,
List<Integer> partitionIds) {

View File

@@ -206,11 +206,12 @@ public class PartitionRegistration {
private PartitionRegistration(int[] replicas, Uuid[] directories, int[] isr, int[] removingReplicas,
int[] addingReplicas, int leader, LeaderRecoveryState leaderRecoveryState,
int leaderEpoch, int partitionEpoch, int[] elr, int[] lastKnownElr) {
if (directories != null && directories.length > 0 && directories.length != replicas.length) {
Objects.requireNonNull(directories);
if (directories.length > 0 && directories.length != replicas.length) {
throw new IllegalArgumentException("The lengths for replicas and directories do not match.");
}
this.replicas = replicas;
this.directories = Objects.requireNonNull(directories);
this.directories = directories;
this.isr = isr;
this.removingReplicas = removingReplicas;
this.addingReplicas = addingReplicas;

View File

@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
import org.apache.kafka.common.message.AlterPartitionRequestData.PartitionData;
@@ -37,6 +38,8 @@ import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.Re
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
@@ -75,6 +78,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.controller.BrokerHeartbeatManager.BrokerHeartbeatState;
import org.apache.kafka.controller.ReplicationControlManager.KRaftClusterDescriber;
import org.apache.kafka.metadata.AssignmentsHelper;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
@@ -100,6 +104,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Comparator;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.ArrayList;
@@ -130,6 +135,7 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED;
import static org.apache.kafka.common.protocol.Errors.NONE;
import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER;
import static org.apache.kafka.common.protocol.Errors.NOT_LEADER_OR_FOLLOWER;
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED;
import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION;
@@ -489,6 +495,15 @@ public class ReplicationControlManagerTest {
getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
return (partition.leader < 0) ? OptionalInt.empty() : OptionalInt.of(partition.leader);
}
ControllerResult<AssignReplicasToDirsResponseData> assignReplicasToDirs(int brokerId, Map<TopicIdPartition, Uuid> assignment) throws Exception {
ControllerResult<AssignReplicasToDirsResponseData> result = replicationControl.handleAssignReplicasToDirs(
AssignmentsHelper.buildRequestData(brokerId, defaultBrokerEpoch(brokerId), assignment));
assertNotNull(result.response());
assertEquals(NONE.code(), result.response().errorCode());
replay(result.records());
return result;
}
}
private static class MockCreateTopicPolicy implements CreateTopicPolicy {
@@ -2835,4 +2850,98 @@
setTopicId(Uuid.fromString("8auUWq8zQqe_99H_m2LAmw")))).
getMessage());
}
@Test
void testHandleAssignReplicasToDirsFailsOnOlderMv() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().
setMetadataVersion(MetadataVersion.IBP_3_7_IV1).
build();
assertThrows(UnsupportedVersionException.class,
() -> ctx.replicationControl.handleAssignReplicasToDirs(new AssignReplicasToDirsRequestData()));
}
@Test
void testHandleAssignReplicasToDirs() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
Uuid dir1b1 = Uuid.fromString("hO2YI5bgRUmByNPHiHxjNQ");
Uuid dir2b1 = Uuid.fromString("R3Gb1HLoTzuKMgAkH5Vtpw");
Uuid dir1b2 = Uuid.fromString("TBGa8UayQi6KguqF5nC0sw");
ctx.registerBrokersWithDirs(1, asList(dir1b1, dir2b1), 2, singletonList(dir1b2));
ctx.unfenceBrokers(1, 2);
Uuid topicA = ctx.createTestTopic("a", new int[][]{new int[]{1, 2}, new int[]{1, 2}}).topicId();
Uuid topicB = ctx.createTestTopic("b", new int[][]{new int[]{1, 2}, new int[]{1, 2}}).topicId();
Uuid topicC = ctx.createTestTopic("c", new int[][]{new int[]{2}}).topicId();
ControllerResult<AssignReplicasToDirsResponseData> controllerResult = ctx.assignReplicasToDirs(1, new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(topicA, 0), dir1b1);
put(new TopicIdPartition(topicA, 1), dir2b1);
put(new TopicIdPartition(topicB, 0), dir1b1);
put(new TopicIdPartition(topicB, 1), DirectoryId.LOST);
put(new TopicIdPartition(Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ"), 1), dir2b1); // expect UNKNOWN_TOPIC_ID
put(new TopicIdPartition(topicA, 137), dir1b1); // expect UNKNOWN_TOPIC_OR_PARTITION
put(new TopicIdPartition(topicC, 0), dir1b1); // expect NOT_LEADER_OR_FOLLOWER
}});
assertEquals(AssignmentsHelper.normalize(AssignmentsHelper.buildResponseData((short) 0, 0, new HashMap<Uuid, Map<TopicIdPartition, Errors>>() {{
put(dir1b1, new HashMap<TopicIdPartition, Errors>() {{
put(new TopicIdPartition(topicA, 0), NONE);
put(new TopicIdPartition(topicA, 137), UNKNOWN_TOPIC_OR_PARTITION);
put(new TopicIdPartition(topicB, 0), NONE);
put(new TopicIdPartition(topicC, 0), NOT_LEADER_OR_FOLLOWER);
}});
put(dir2b1, new HashMap<TopicIdPartition, Errors>() {{
put(new TopicIdPartition(topicA, 1), NONE);
put(new TopicIdPartition(Uuid.fromString("nLU9hKNXSZuMe5PO2A4dVQ"), 1), UNKNOWN_TOPIC_ID);
}});
put(DirectoryId.LOST, new HashMap<TopicIdPartition, Errors>() {{
put(new TopicIdPartition(topicB, 1), NONE);
}});
}})), AssignmentsHelper.normalize(controllerResult.response()));
short recordVersion = ctx.featureControl.metadataVersion().partitionChangeRecordVersion();
assertEquals(sortPartitionChangeRecords(asList(
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(topicA).setPartitionId(0)
.setDirectories(asList(dir1b1, dir1b2)), recordVersion),
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(topicA).setPartitionId(1).
setDirectories(asList(dir2b1, dir1b2)), recordVersion),
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(topicB).setPartitionId(0).
setDirectories(asList(dir1b1, dir1b2)), recordVersion),
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).
setDirectories(asList(DirectoryId.LOST, dir1b2)), recordVersion),
// In addition to the directory assignment changes we expect one additional record,
// which elects a new leader for b-1, whose replica on broker 1 was assigned to an offline directory.
new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(topicB).setPartitionId(1).
setIsr(singletonList(2)).setLeader(2), recordVersion)
)), sortPartitionChangeRecords(controllerResult.records()));
ctx.replay(controllerResult.records());
assertEquals(new HashSet<TopicIdPartition>() {{
add(new TopicIdPartition(topicA, 0));
add(new TopicIdPartition(topicA, 1));
add(new TopicIdPartition(topicB, 0));
}}, RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(1, true)));
assertEquals(new HashSet<TopicIdPartition>() {{
add(new TopicIdPartition(topicB, 1));
add(new TopicIdPartition(topicC, 0));
}},
RecordTestUtils.iteratorToSet(ctx.replicationControl.brokersToIsrs().iterator(2, true)));
}
/**
* Sorts {@link PartitionChangeRecord} by topic ID and partition ID,
* so that the order of the records is deterministic and they can be compared.
*/
private static List<ApiMessageAndVersion> sortPartitionChangeRecords(List<ApiMessageAndVersion> records) {
records = new ArrayList<>(records);
records.sort(Comparator.comparing((ApiMessageAndVersion record) -> {
PartitionChangeRecord partitionChangeRecord = (PartitionChangeRecord) record.message();
return partitionChangeRecord.topicId() + "-" + partitionChangeRecord.partitionId();
}));
return records;
}
}

View File

@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.common.TopicIdPartition;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
public class AssignmentsHelper {
/**
* Build an AssignReplicasToDirsRequestData from a map of TopicIdPartition to directory Uuid.
*/
public static AssignReplicasToDirsRequestData buildRequestData(int brokerId, long brokerEpoch, Map<TopicIdPartition, Uuid> assignment) {
Map<Uuid, AssignReplicasToDirsRequestData.DirectoryData> directoryMap = new HashMap<>();
Map<Uuid, Map<Uuid, AssignReplicasToDirsRequestData.TopicData>> topicMap = new HashMap<>();
for (Map.Entry<TopicIdPartition, Uuid> entry : assignment.entrySet()) {
TopicIdPartition topicPartition = entry.getKey();
Uuid directoryId = entry.getValue();
AssignReplicasToDirsRequestData.DirectoryData directory = directoryMap.computeIfAbsent(directoryId, d -> new AssignReplicasToDirsRequestData.DirectoryData().setId(directoryId));
AssignReplicasToDirsRequestData.TopicData topic = topicMap.computeIfAbsent(directoryId, d -> new HashMap<>())
.computeIfAbsent(topicPartition.topicId(), topicId -> {
AssignReplicasToDirsRequestData.TopicData data = new AssignReplicasToDirsRequestData.TopicData().setTopicId(topicId);
directory.topics().add(data);
return data;
});
AssignReplicasToDirsRequestData.PartitionData partition = new AssignReplicasToDirsRequestData.PartitionData().setPartitionIndex(topicPartition.partitionId());
topic.partitions().add(partition);
}
return new AssignReplicasToDirsRequestData()
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpoch)
.setDirectories(new ArrayList<>(directoryMap.values()));
}
/**
* Build an AssignReplicasToDirsResponseData from a map of directory Uuid to a map of TopicIdPartition to Errors.
*/
public static AssignReplicasToDirsResponseData buildResponseData(short errorCode, int throttleTimeMs, Map<Uuid, Map<TopicIdPartition, Errors>> errors) {
Map<Uuid, AssignReplicasToDirsResponseData.DirectoryData> directoryMap = new HashMap<>();
Map<Uuid, Map<Uuid, AssignReplicasToDirsResponseData.TopicData>> topicMap = new HashMap<>();
for (Map.Entry<Uuid, Map<TopicIdPartition, Errors>> dirEntry : errors.entrySet()) {
Uuid directoryId = dirEntry.getKey();
AssignReplicasToDirsResponseData.DirectoryData directory = directoryMap.computeIfAbsent(directoryId, d -> new AssignReplicasToDirsResponseData.DirectoryData().setId(directoryId));
for (Map.Entry<TopicIdPartition, Errors> partitionEntry : dirEntry.getValue().entrySet()) {
TopicIdPartition topicPartition = partitionEntry.getKey();
Errors error = partitionEntry.getValue();
AssignReplicasToDirsResponseData.TopicData topic = topicMap.computeIfAbsent(directoryId, d -> new HashMap<>())
.computeIfAbsent(topicPartition.topicId(), topicId -> {
AssignReplicasToDirsResponseData.TopicData data = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId);
directory.topics().add(data);
return data;
});
AssignReplicasToDirsResponseData.PartitionData partition = new AssignReplicasToDirsResponseData.PartitionData()
.setPartitionIndex(topicPartition.partitionId()).setErrorCode(error.code());
topic.partitions().add(partition);
}
}
return new AssignReplicasToDirsResponseData()
.setErrorCode(errorCode)
.setThrottleTimeMs(throttleTimeMs)
.setDirectories(new ArrayList<>(directoryMap.values()));
}
/**
* Normalize the request data by sorting the directories, topics and partitions.
* This is useful for comparing two semantically equivalent requests.
*/
public static AssignReplicasToDirsRequestData normalize(AssignReplicasToDirsRequestData request) {
request = request.duplicate();
request.directories().sort(Comparator.comparing(AssignReplicasToDirsRequestData.DirectoryData::id));
for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) {
directory.topics().sort(Comparator.comparing(AssignReplicasToDirsRequestData.TopicData::topicId));
for (AssignReplicasToDirsRequestData.TopicData topic : directory.topics()) {
topic.partitions().sort(Comparator.comparing(AssignReplicasToDirsRequestData.PartitionData::partitionIndex));
}
}
return request;
}
/**
* Normalize the response data by sorting the directories, topics and partitions.
* This is useful for comparing two semantically equivalent responses.
*/
public static AssignReplicasToDirsResponseData normalize(AssignReplicasToDirsResponseData response) {
response = response.duplicate();
response.directories().sort(Comparator.comparing(AssignReplicasToDirsResponseData.DirectoryData::id));
for (AssignReplicasToDirsResponseData.DirectoryData directory : response.directories()) {
directory.topics().sort(Comparator.comparing(AssignReplicasToDirsResponseData.TopicData::topicId));
for (AssignReplicasToDirsResponseData.TopicData topic : directory.topics()) {
topic.partitions().sort(Comparator.comparing(AssignReplicasToDirsResponseData.PartitionData::partitionIndex));
}
}
return response;
}
}

View File

@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.server.common.TopicIdPartition;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class AssignmentsHelperTest {
private static final Uuid TOPIC_1 = Uuid.fromString("tJU174w8QiSXLimIdtUKKg");
private static final Uuid TOPIC_2 = Uuid.fromString("3vHa6oVIRKOmm0VYhGzTJQ");
private static final Uuid DIR_1 = Uuid.fromString("Nm6KAvyxQNS63HyB4yRsTQ");
private static final Uuid DIR_2 = Uuid.fromString("l3Rv0JxcRLCQ6rLoGbYUgQ");
private static final Uuid DIR_3 = Uuid.fromString("ILABYpv3SKOBjqws4SR8Ww");
@Test
public void testBuildRequestData() {
Map<TopicIdPartition, Uuid> assignment = new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
}};
AssignReplicasToDirsRequestData built = AssignmentsHelper.buildRequestData(8, 100L, assignment);
AssignReplicasToDirsRequestData expected = new AssignReplicasToDirsRequestData()
.setBrokerId(8)
.setBrokerEpoch(100L)
.setDirectories(Arrays.asList(
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_2)
.setTopics(Arrays.asList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(2)
)),
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_2)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(5)
))
)),
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_3)
.setTopics(Collections.singletonList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(3)
))
)),
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_1)
.setTopics(Collections.singletonList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Arrays.asList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(4),
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(1)
))
))
));
assertEquals(AssignmentsHelper.normalize(expected), AssignmentsHelper.normalize(built));
}
}

View File

@@ -19,10 +19,6 @@ package org.apache.kafka.server;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
@@ -36,7 +32,6 @@ import org.apache.kafka.server.common.TopicIdPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -47,6 +42,8 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.kafka.metadata.AssignmentsHelper.buildRequestData;
public class AssignmentsManager {
private static final Logger log = LoggerFactory.getLogger(AssignmentsManager.class);
@@ -368,27 +365,4 @@
}
return failures;
}
// visible for testing
static AssignReplicasToDirsRequestData buildRequestData(int brokerId, long brokerEpoch, Map<TopicIdPartition, Uuid> assignment) {
Map<Uuid, DirectoryData> directoryMap = new HashMap<>();
Map<Uuid, Map<Uuid, TopicData>> topicMap = new HashMap<>();
for (Map.Entry<TopicIdPartition, Uuid> entry : assignment.entrySet()) {
TopicIdPartition topicPartition = entry.getKey();
Uuid directoryId = entry.getValue();
DirectoryData directory = directoryMap.computeIfAbsent(directoryId, d -> new DirectoryData().setId(directoryId));
TopicData topic = topicMap.computeIfAbsent(directoryId, d -> new HashMap<>())
.computeIfAbsent(topicPartition.topicId(), topicId -> {
TopicData data = new TopicData().setTopicId(topicId);
directory.topics().add(data);
return data;
});
PartitionData partition = new PartitionData().setPartitionIndex(topicPartition.partitionId());
topic.partitions().add(partition);
}
return new AssignReplicasToDirsRequestData()
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpoch)
.setDirectories(new ArrayList<>(directoryMap.values()));
}
}

View File

@@ -33,14 +33,12 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.metadata.AssignmentsHelper.buildRequestData;
import static org.apache.kafka.metadata.AssignmentsHelper.normalize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -73,78 +71,11 @@ public class AssignmentsManagerTest {
manager.close();
}
AssignReplicasToDirsRequestData normalize(AssignReplicasToDirsRequestData request) {
request = request.duplicate();
request.directories().sort(Comparator.comparing(AssignReplicasToDirsRequestData.DirectoryData::id));
for (AssignReplicasToDirsRequestData.DirectoryData directory : request.directories()) {
directory.topics().sort(Comparator.comparing(AssignReplicasToDirsRequestData.TopicData::topicId));
for (AssignReplicasToDirsRequestData.TopicData topic : directory.topics()) {
topic.partitions().sort(Comparator.comparing(AssignReplicasToDirsRequestData.PartitionData::partitionIndex));
}
}
return request;
}
void assertRequestEquals(AssignReplicasToDirsRequestData expected, AssignReplicasToDirsRequestData actual) {
assertEquals(normalize(expected), normalize(actual));
}
@Test
void testBuildRequestData() {
Map<TopicIdPartition, Uuid> assignment = new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
put(new TopicIdPartition(TOPIC_1, 3), DIR_3);
put(new TopicIdPartition(TOPIC_1, 4), DIR_1);
put(new TopicIdPartition(TOPIC_2, 5), DIR_2);
}};
AssignReplicasToDirsRequestData built = AssignmentsManager.buildRequestData(8, 100L, assignment);
AssignReplicasToDirsRequestData expected = new AssignReplicasToDirsRequestData()
.setBrokerId(8)
.setBrokerEpoch(100L)
.setDirectories(Arrays.asList(
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_2)
.setTopics(Arrays.asList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(2)
)),
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_2)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(5)
))
)),
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_3)
.setTopics(Collections.singletonList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Collections.singletonList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(3)
))
)),
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(DIR_1)
.setTopics(Collections.singletonList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(TOPIC_1)
.setPartitions(Arrays.asList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(4),
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(1)
))
))
));
assertRequestEquals(expected, built);
}
@Test
public void testAssignmentAggregation() throws InterruptedException {
CountDownLatch readyToAssert = new CountDownLatch(1);
@@ -170,7 +101,7 @@
verifyNoMoreInteractions(channelManager);
assertEquals(1, captor.getAllValues().size());
AssignReplicasToDirsRequestData actual = captor.getValue().build().data();
AssignReplicasToDirsRequestData expected = AssignmentsManager.buildRequestData(
AssignReplicasToDirsRequestData expected = buildRequestData(
8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
put(new TopicIdPartition(TOPIC_1, 2), DIR_2);
@@ -228,18 +159,18 @@
verify(channelManager, times(5)).sendRequest(captor.capture(), any(ControllerRequestCompletionHandler.class));
verifyNoMoreInteractions(channelManager);
assertEquals(5, captor.getAllValues().size());
assertRequestEquals(AssignmentsManager.buildRequestData(
assertRequestEquals(buildRequestData(
8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
}}
), captor.getAllValues().get(0).build().data());
assertRequestEquals(AssignmentsManager.buildRequestData(
assertRequestEquals(buildRequestData(
8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
put(new TopicIdPartition(TOPIC_1, 2), DIR_3);
}}
), captor.getAllValues().get(1).build().data());
assertRequestEquals(AssignmentsManager.buildRequestData(
assertRequestEquals(buildRequestData(
8, 100L, new HashMap<TopicIdPartition, Uuid>() {{
put(new TopicIdPartition(TOPIC_1, 1), DIR_1);
put(new TopicIdPartition(TOPIC_1, 2), DIR_3);