KAFKA-15355: Message schema changes (#14290)

Reviewers: Christo Lolov <lolovc@amazon.com>, Colin P. McCabe <cmccabe@apache.org>, Proven Provenzano <pprovenzano@confluent.io>, Ron Dagostino <rdagostino@confluent.io>
This commit is contained in:
Igor Soarez 2023-11-02 13:46:05 +00:00 committed by GitHub
parent 6e44a3f12d
commit 0390d5b1a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 971 additions and 68 deletions

View File

@ -112,7 +112,9 @@
</module>
<!-- code quality -->
<module name="MethodLength"/>
<module name="MethodLength">
<property name="max" value="170" />
</module>
<module name="ParameterNumber">
<!-- default is 8 -->
<property name="max" value="13"/>

View File

@ -61,6 +61,9 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
</subpackage>
<subpackage name="errors">
<allow pkg="org.apache.kafka.common.metadata" />
</subpackage>
</subpackage>
<subpackage name="controller">

View File

@ -17,10 +17,12 @@
package org.apache.kafka.common;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
@ -169,4 +171,32 @@ public class Uuid implements Comparable<Uuid> {
return 0;
}
}
/**
* Convert a list of Uuid to an array of Uuid.
*
* @param list The input list
* @return The output array
*/
/**
 * Convert a list of Uuid to an array of Uuid.
 *
 * @param list The input list; may be null
 * @return The output array, or null if the input list was null
 */
public static Uuid[] toArray(List<Uuid> list) {
    if (list == null) return null;
    // List.toArray allocates a right-sized array and copies the elements,
    // replacing the hand-rolled index loop.
    return list.toArray(new Uuid[0]);
}
/**
* Convert an array of Uuids to a list of Uuid.
*
* @param array The input array
* @return The output list
*/
/**
 * Convert an array of Uuids to a list of Uuid.
 *
 * @param array The input array; may be null
 * @return The output list (mutable), or null if the input array was null
 */
public static List<Uuid> toList(Uuid[] array) {
    if (array == null) return null;
    // Copy into a fresh ArrayList so callers receive a mutable, independent list,
    // matching the original addAll-based implementation.
    return new ArrayList<>(Arrays.asList(array));
}
}

View File

@ -23,19 +23,19 @@ public abstract class InvalidMetadataException extends RetriableException {
private static final long serialVersionUID = 1L;
public InvalidMetadataException() {
protected InvalidMetadataException() {
super();
}
public InvalidMetadataException(String message) {
protected InvalidMetadataException(String message) {
super(message);
}
public InvalidMetadataException(String message, Throwable cause) {
protected InvalidMetadataException(String message, Throwable cause) {
super(message, cause);
}
public InvalidMetadataException(Throwable cause) {
protected InvalidMetadataException(Throwable cause) {
super(cause);
}

View File

@ -115,7 +115,8 @@ public enum ApiKeys {
CONSUMER_GROUP_DESCRIBE(ApiMessageType.CONSUMER_GROUP_DESCRIBE),
CONTROLLER_REGISTRATION(ApiMessageType.CONTROLLER_REGISTRATION),
GET_TELEMETRY_SUBSCRIPTIONS(ApiMessageType.GET_TELEMETRY_SUBSCRIPTIONS),
PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY);
PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY),
ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);

View File

@ -320,6 +320,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return GetTelemetrySubscriptionsRequest.parse(buffer, apiVersion);
case PUSH_TELEMETRY:
return PushTelemetryRequest.parse(buffer, apiVersion);
case ASSIGN_REPLICAS_TO_DIRS:
return AssignReplicasToDirsRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

View File

@ -257,6 +257,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return GetTelemetrySubscriptionsResponse.parse(responseBuffer, version);
case PUSH_TELEMETRY:
return PushTelemetryResponse.parse(responseBuffer, version);
case ASSIGN_REPLICAS_TO_DIRS:
return AssignReplicasToDirsResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
/**
 * The AssignReplicasToDirs request, sent by a broker to the controller to report the
 * assignment of partition replicas to log directories (grouped directory → topic → partition).
 */
public class AssignReplicasToDirsRequest extends AbstractRequest {

    /** Builder for {@link AssignReplicasToDirsRequest}. */
    public static class Builder extends AbstractRequest.Builder<AssignReplicasToDirsRequest> {

        private final AssignReplicasToDirsRequestData data;

        public Builder(AssignReplicasToDirsRequestData data) {
            super(ApiKeys.ASSIGN_REPLICAS_TO_DIRS);
            this.data = data;
        }

        @Override
        public AssignReplicasToDirsRequest build(short version) {
            return new AssignReplicasToDirsRequest(data, version);
        }

        @Override
        public String toString() {
            return data.toString();
        }
    }

    private final AssignReplicasToDirsRequestData data;

    public AssignReplicasToDirsRequest(AssignReplicasToDirsRequestData data, short version) {
        super(ApiKeys.ASSIGN_REPLICAS_TO_DIRS, version);
        this.data = data;
    }

    /**
     * Builds an error response for this request, carrying the throttle time and a
     * top-level error code derived from the given throwable.
     */
    @Override
    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
        AssignReplicasToDirsResponseData responseData = new AssignReplicasToDirsResponseData()
            .setThrottleTimeMs(throttleTimeMs)
            .setErrorCode(Errors.forException(e).code());
        return new AssignReplicasToDirsResponse(responseData);
    }

    @Override
    public AssignReplicasToDirsRequestData data() {
        return data;
    }

    /** Parses a request of the given version from the buffer. */
    public static AssignReplicasToDirsRequest parse(ByteBuffer buffer, short version) {
        ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
        AssignReplicasToDirsRequestData parsed = new AssignReplicasToDirsRequestData(accessor, version);
        return new AssignReplicasToDirsRequest(parsed, version);
    }
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
/**
 * The AssignReplicasToDirs response, carrying a top-level error code plus per-partition
 * error codes grouped by directory and topic.
 */
public class AssignReplicasToDirsResponse extends AbstractResponse {

    private final AssignReplicasToDirsResponseData data;

    public AssignReplicasToDirsResponse(AssignReplicasToDirsResponseData data) {
        super(ApiKeys.ASSIGN_REPLICAS_TO_DIRS);
        this.data = data;
    }

    @Override
    public AssignReplicasToDirsResponseData data() {
        return data;
    }

    /**
     * Counts only the top-level error code; per-partition error codes are not tallied here.
     */
    @Override
    public Map<Errors, Integer> errorCounts() {
        Errors topLevelError = Errors.forCode(data.errorCode());
        return Collections.singletonMap(topLevelError, 1);
    }

    @Override
    public int throttleTimeMs() {
        return data.throttleTimeMs();
    }

    @Override
    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
        data.setThrottleTimeMs(throttleTimeMs);
    }

    /** Parses a response of the given version from the buffer. */
    public static AssignReplicasToDirsResponse parse(ByteBuffer buffer, short version) {
        AssignReplicasToDirsResponseData parsed =
            new AssignReplicasToDirsResponseData(new ByteBufferAccessor(buffer), version);
        return new AssignReplicasToDirsResponse(parsed);
    }
}

View File

@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 73,
"type": "request",
"listeners": ["controller"],
"name": "AssignReplicasToDirsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The ID of the assigned topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" }
]}
]}
]}
]
}

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 73,
"type": "response",
"name": "AssignReplicasToDirsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" },
{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [
{ "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The ID of the assigned topic" },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The partition level error code" }
]}
]}
]}
]
}

View File

@ -18,7 +18,7 @@
"type": "request",
"listeners": ["controller"],
"name": "BrokerHeartbeatRequest",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@ -30,6 +30,8 @@
{ "name": "WantFence", "type": "bool", "versions": "0+",
"about": "True if the broker wants to be fenced, false otherwise." },
{ "name": "WantShutDown", "type": "bool", "versions": "0+",
"about": "True if the broker wants to be shut down, false otherwise." }
"about": "True if the broker wants to be shut down, false otherwise." },
{ "name": "OfflineLogDirs", "type": "[]uuid", "versions": "1+", "taggedVersions": "1+", "tag": "0",
"about": "Log directories that failed and went offline." }
]
}

View File

@ -17,7 +17,7 @@
"apiKey": 63,
"type": "response",
"name": "BrokerHeartbeatResponse",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

View File

@ -14,14 +14,16 @@
// limitations under the License.
// Version 1 adds Zk broker epoch to the request if the broker is migrating from Zk mode to KRaft mode.
//
// Version 2 adds the PreviousBrokerEpoch for the KIP-966
// Version 3 adds LogDirs for KIP-858
{
"apiKey":62,
"type": "request",
"listeners": ["controller"],
"name": "BrokerRegistrationRequest",
"validVersions": "0-2",
"validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@ -57,6 +59,8 @@
{ "name": "IsMigratingZkBroker", "type": "bool", "versions": "1+", "default": "false",
"about": "If the required configurations for ZK migration are present, this value is set to true" },
{ "name": "PreviousBrokerEpoch", "type": "int64", "versions": "2+", "default": "-1",
"about": "The epoch before a clean shutdown." }
"about": "The epoch before a clean shutdown." },
{ "name": "LogDirs", "type": "[]uuid", "versions": "3+",
"about": "Log directories configured in this broker which are available." }
]
}

View File

@ -20,7 +20,7 @@
"apiKey": 62,
"type": "response",
"name": "BrokerRegistrationResponse",
"validVersions": "0-2",
"validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

View File

@ -19,11 +19,14 @@ package org.apache.kafka.common;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Base64;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class UuidTest {
@ -108,4 +111,29 @@ public class UuidTest {
assertThrows(IllegalArgumentException.class, () -> Uuid.fromString(undersizeString));
}
@Test
void testToArray() {
    // A null list must map to a null array.
    assertNull(Uuid.toArray(null));
    // A non-null list is converted element-for-element, in order.
    Uuid other = Uuid.fromString("UXyU9i5ARn6W00ON2taeWA");
    Uuid[] expected = new Uuid[]{Uuid.ZERO_UUID, other};
    assertArrayEquals(expected, Uuid.toArray(Arrays.asList(Uuid.ZERO_UUID, other)));
}
@Test
void testToList() {
    // A null array must map to a null list.
    assertNull(Uuid.toList(null));
    // A non-null array is converted element-for-element, in order.
    Uuid other = Uuid.fromString("UXyU9i5ARn6W00ON2taeWA");
    Uuid[] input = new Uuid[]{Uuid.ZERO_UUID, other};
    assertEquals(Arrays.asList(Uuid.ZERO_UUID, other), Uuid.toList(input));
}
}

View File

@ -105,7 +105,8 @@ public class ApiMessageTypeTest {
for (ApiMessageType type : ApiMessageType.values()) {
assertEquals(0, type.lowestSupportedVersion());
assertEquals(type.requestSchemas().length, type.responseSchemas().length);
assertEquals(type.requestSchemas().length, type.responseSchemas().length,
"request and response schemas must be the same length for " + type.name());
for (Schema schema : type.requestSchemas())
assertNotNull(schema);
for (Schema schema : type.responseSchemas())

View File

@ -61,6 +61,8 @@ import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
@ -1071,6 +1073,7 @@ public class RequestResponseTest {
case CONTROLLER_REGISTRATION: return createControllerRegistrationRequest(version);
case GET_TELEMETRY_SUBSCRIPTIONS: return createGetTelemetrySubscriptionsRequest(version);
case PUSH_TELEMETRY: return createPushTelemetryRequest(version);
case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsRequest(version);
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -1150,6 +1153,7 @@ public class RequestResponseTest {
case CONTROLLER_REGISTRATION: return createControllerRegistrationResponse();
case GET_TELEMETRY_SUBSCRIPTIONS: return createGetTelemetrySubscriptionsResponse();
case PUSH_TELEMETRY: return createPushTelemetryResponse();
case ASSIGN_REPLICAS_TO_DIRS: return createAssignReplicasToDirsResponse();
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -1178,6 +1182,71 @@ public class RequestResponseTest {
return new ConsumerGroupDescribeResponse(data);
}
// Builds a representative AssignReplicasToDirsRequest for serialization round-trip
// testing: two directories, each holding one topic, with one and two partitions
// respectively. Directory ids are random; topic ids are fixed literals.
private AssignReplicasToDirsRequest createAssignReplicasToDirsRequest(short version) {
AssignReplicasToDirsRequestData data = new AssignReplicasToDirsRequestData()
.setBrokerId(1)
.setBrokerEpoch(123L)
.setDirectories(Arrays.asList(
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(Uuid.randomUuid())
.setTopics(Arrays.asList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(Uuid.fromString("qo0Pcp70TdGnAa7YKMKCqw"))
.setPartitions(Arrays.asList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(8)
))
)),
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(Uuid.randomUuid())
.setTopics(Arrays.asList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(Uuid.fromString("yEu11V7HTRGIwm6FDWFhzg"))
.setPartitions(Arrays.asList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(2),
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(80)
))
))
));
return new AssignReplicasToDirsRequest.Builder(data).build(version);
}
// Builds a representative AssignReplicasToDirsResponse for serialization round-trip
// testing: two directories, each with one topic, exercising both NONE and
// UNKNOWN_TOPIC_OR_PARTITION partition-level error codes alongside a NONE top-level code.
private AssignReplicasToDirsResponse createAssignReplicasToDirsResponse() {
AssignReplicasToDirsResponseData data = new AssignReplicasToDirsResponseData()
.setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(123)
.setDirectories(Arrays.asList(
new AssignReplicasToDirsResponseData.DirectoryData()
.setId(Uuid.randomUuid())
.setTopics(Arrays.asList(
new AssignReplicasToDirsResponseData.TopicData()
.setTopicId(Uuid.fromString("sKhZV8LnTA275KvByB9bVg"))
.setPartitions(Arrays.asList(
new AssignReplicasToDirsResponseData.PartitionData()
.setPartitionIndex(8)
.setErrorCode(Errors.NONE.code())
))
)),
new AssignReplicasToDirsResponseData.DirectoryData()
.setId(Uuid.randomUuid())
.setTopics(Arrays.asList(
new AssignReplicasToDirsResponseData.TopicData()
.setTopicId(Uuid.fromString("ORLP5NEzRo64SvKq1hIVQg"))
.setPartitions(Arrays.asList(
new AssignReplicasToDirsResponseData.PartitionData()
.setPartitionIndex(2)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()),
new AssignReplicasToDirsResponseData.PartitionData()
.setPartitionIndex(8)
.setErrorCode(Errors.NONE.code())
))
))
));
return new AssignReplicasToDirsResponse(data);
}
private ConsumerGroupHeartbeatRequest createConsumerGroupHeartbeatRequest(short version) {
ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData()
.setGroupId("group")

View File

@ -100,6 +100,7 @@ object RequestConvertToJson {
case req: ConsumerGroupHeartbeatRequest => ConsumerGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
case req: ConsumerGroupDescribeRequest => ConsumerGroupDescribeRequestDataJsonConverter.write(req.data, request.version)
case req: ControllerRegistrationRequest => ControllerRegistrationRequestDataJsonConverter.write(req.data, request.version)
case req: AssignReplicasToDirsRequest => AssignReplicasToDirsRequestDataJsonConverter.write(req.data, request.version)
case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " +
"code should be updated to do so.");
}
@ -180,6 +181,7 @@ object RequestConvertToJson {
case res: ConsumerGroupHeartbeatResponse => ConsumerGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
case res: ConsumerGroupDescribeResponse => ConsumerGroupDescribeResponseDataJsonConverter.write(res.data, version)
case req: ControllerRegistrationResponse => ControllerRegistrationResponseDataJsonConverter.write(req.data, version)
case res: AssignReplicasToDirsResponse => AssignReplicasToDirsResponseDataJsonConverter.write(res.data, version)
case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " +
"code should be updated to do so.");
}

View File

@ -128,6 +128,7 @@ class ControllerApis(
case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
case ApiKeys.CONTROLLER_REGISTRATION => handleControllerRegistration(request)
case ApiKeys.ASSIGN_REPLICAS_TO_DIRS => handleAssignReplicasToDirs(request)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
}
@ -1074,4 +1075,13 @@ class ControllerApis(
new DescribeClusterResponse(response.setThrottleTimeMs(requestThrottleMs)))
CompletableFuture.completedFuture[Unit](())
}
// Handles AssignReplicasToDirs on the controller listener.
// TODO KAFKA-15426: real handling is not implemented yet, so every request is
// rejected with an UNSUPPORTED_VERSION error response.
def handleAssignReplicasToDirs(request: RequestChannel.Request): CompletableFuture[Unit] = {
  val body = request.body[AssignReplicasToDirsRequest]
  val errorResponse = body.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)
  requestHelper.sendMaybeThrottle(request, errorResponse)
  CompletableFuture.completedFuture[Unit](())
}
}

View File

@ -404,7 +404,7 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
val results = response.data.results
assertEquals(1, results.size)
checkAllErrorsAlteringCredentials(results, Errors.UNSUPPORTED_VERSION,
"when altering the credentials on unsupported IPB version")
"when altering the credentials on unsupported IBP version")
assertEquals("The current metadata version does not support SCRAM", results.get(0).errorMessage)
}

View File

@ -1095,6 +1095,34 @@ class ControllerApisTest {
assertEquals(Errors.NOT_CONTROLLER, response.error)
}
@Test
def testAssignReplicasToDirsReturnsUnsupportedVersion(): Unit = {
// The controller does not implement AssignReplicasToDirs yet (see KAFKA-15426),
// so any well-formed request must come back with a top-level UNSUPPORTED_VERSION error.
val controller = mock(classOf[Controller])
val controllerApis = createControllerApis(None, controller)
// Minimal valid payload: one directory, one topic, one partition.
val request =
new AssignReplicasToDirsRequest.Builder(
new AssignReplicasToDirsRequestData()
.setBrokerId(1)
.setBrokerEpoch(123L)
.setDirectories(util.Arrays.asList(
new AssignReplicasToDirsRequestData.DirectoryData()
.setId(Uuid.randomUuid())
.setTopics(util.Arrays.asList(
new AssignReplicasToDirsRequestData.TopicData()
.setTopicId(Uuid.fromString("pcPTaiQfRXyZG88kO9k2aA"))
.setPartitions(util.Arrays.asList(
new AssignReplicasToDirsRequestData.PartitionData()
.setPartitionIndex(8)
))
))
))).build()
val expectedResponse = new AssignReplicasToDirsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
val response = handleRequest[AssignReplicasToDirsResponse](request, controllerApis)
assertEquals(expectedResponse, response.data)
}
private def handleRequest[T <: AbstractResponse](
request: AbstractRequest,
controllerApis: ControllerApis

View File

@ -719,6 +719,9 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.PUSH_TELEMETRY =>
new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true)
case ApiKeys.ASSIGN_REPLICAS_TO_DIRS =>
new AssignReplicasToDirsRequest.Builder(new AssignReplicasToDirsRequestData())
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
/**
* A record was encountered where the number of directories does not match the number of replicas.
*/
/**
 * A record was encountered where the number of directories does not match the number of replicas.
 */
public class InvalidReplicaDirectoriesException extends InvalidMetadataException {

    private static final long serialVersionUID = 1L;

    // Shared message prefix so both record variants produce identical wording.
    private static final String MISMATCH_MESSAGE = "The lengths for replicas and directories do not match: ";

    public InvalidReplicaDirectoriesException(PartitionRecord record) {
        super(MISMATCH_MESSAGE + record);
    }

    public InvalidReplicaDirectoriesException(PartitionChangeRecord record) {
        super(MISMATCH_MESSAGE + record);
    }
}

View File

@ -26,6 +26,7 @@ import java.util.Set;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
@ -54,6 +55,7 @@ public class PartitionChangeBuilder {
if (record.removingReplicas() != null) return false;
if (record.addingReplicas() != null) return false;
if (record.leaderRecoveryState() != LeaderRecoveryState.NO_CHANGE) return false;
if (record.directories() != null) return false;
return true;
}
@ -382,6 +384,9 @@ public class PartitionChangeBuilder {
private void setAssignmentChanges(PartitionChangeRecord record) {
if (!targetReplicas.isEmpty() && !targetReplicas.equals(Replicas.toList(partition.replicas))) {
if (metadataVersion.isDirectoryAssignmentSupported()) {
record.setDirectories(DirectoryId.createDirectoriesFrom(partition.replicas, partition.directories, targetReplicas));
}
record.setReplicas(targetReplicas);
}
if (!targetRemoving.equals(Replicas.toList(partition.removingReplicas))) {

View File

@ -77,6 +77,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterPartitionRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
@ -791,7 +792,9 @@ public class ReplicationControlManager {
for (Entry<Integer, PartitionRegistration> partEntry : newParts.entrySet()) {
int partitionIndex = partEntry.getKey();
PartitionRegistration info = partEntry.getValue();
records.add(info.toRecord(topicId, partitionIndex, featureControl.metadataVersion().partitionRecordVersion()));
records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()).
build()));
}
return ApiError.NONE;
}
@ -1697,7 +1700,9 @@ public class ReplicationControlManager {
" time(s): All brokers are currently fenced or in controlled shutdown.");
}
records.add(buildPartitionRegistration(replicas, isr)
.toRecord(topicId, partitionId, featureControl.metadataVersion().partitionRecordVersion()));
.toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
setMetadataVersion(featureControl.metadataVersion()).
build()));
partitionId++;
}
}

View File

@ -68,7 +68,7 @@ public final class TopicImage {
for (Entry<Integer, PartitionRegistration> entry : partitions.entrySet()) {
int partitionId = entry.getKey();
PartitionRegistration partition = entry.getValue();
writer.write(partition.toRecord(id, partitionId, options.metadataVersion().partitionRecordVersion()));
writer.write(partition.toRecord(id, partitionId, options));
}
}

View File

@ -17,15 +17,19 @@
package org.apache.kafka.metadata;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidReplicaDirectoriesException;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.slf4j.Logger;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
@ -39,6 +43,7 @@ public class PartitionRegistration {
*/
static public class Builder {
private int[] replicas;
private Uuid[] directories;
private int[] isr;
private int[] removingReplicas = Replicas.NONE;
private int[] addingReplicas = Replicas.NONE;
@ -54,6 +59,11 @@ public class PartitionRegistration {
return this;
}
public Builder setDirectories(Uuid[] directories) {
this.directories = directories;
return this;
}
public Builder setIsr(int[] isr) {
this.isr = isr;
return this;
@ -102,6 +112,8 @@ public class PartitionRegistration {
public PartitionRegistration build() {
if (replicas == null) {
throw new IllegalStateException("You must set replicas.");
} else if (directories != null && directories.length != replicas.length) {
throw new IllegalStateException("The lengths for replicas and directories do not match.");
} else if (isr == null) {
throw new IllegalStateException("You must set isr.");
} else if (removingReplicas == null) {
@ -124,6 +136,7 @@ public class PartitionRegistration {
return new PartitionRegistration(
replicas,
directories,
isr,
removingReplicas,
addingReplicas,
@ -138,6 +151,7 @@ public class PartitionRegistration {
}
public final int[] replicas;
public final Uuid[] directories;
public final int[] isr;
public final int[] removingReplicas;
public final int[] addingReplicas;
@ -152,8 +166,23 @@ public class PartitionRegistration {
return newLeader == NO_LEADER || Replicas.contains(isr, newLeader);
}
private static List<Uuid> checkDirectories(PartitionRecord record) {
if (record.directories() != null && !record.directories().isEmpty() && record.replicas().size() != record.directories().size()) {
throw new InvalidReplicaDirectoriesException(record);
}
return record.directories();
}
private static List<Uuid> checkDirectories(PartitionChangeRecord record) {
if (record.replicas() != null && record.directories() != null && !record.directories().isEmpty() && record.replicas().size() != record.directories().size()) {
throw new InvalidReplicaDirectoriesException(record);
}
return record.directories();
}
public PartitionRegistration(PartitionRecord record) {
this(Replicas.toArray(record.replicas()),
Uuid.toArray(checkDirectories(record)),
Replicas.toArray(record.isr()),
Replicas.toArray(record.removingReplicas()),
Replicas.toArray(record.addingReplicas()),
@ -165,10 +194,11 @@ public class PartitionRegistration {
Replicas.toArray(record.lastKnownELR()));
}
private PartitionRegistration(int[] replicas, int[] isr, int[] removingReplicas,
private PartitionRegistration(int[] replicas, Uuid[] directories, int[] isr, int[] removingReplicas,
int[] addingReplicas, int leader, LeaderRecoveryState leaderRecoveryState,
int leaderEpoch, int partitionEpoch, int[] elr, int[] lastKnownElr) {
this.replicas = replicas;
this.directories = directories != null && directories.length > 0 ? directories : DirectoryId.unassignedArray(replicas.length);
this.isr = isr;
this.removingReplicas = removingReplicas;
this.addingReplicas = addingReplicas;
@ -185,6 +215,8 @@ public class PartitionRegistration {
public PartitionRegistration merge(PartitionChangeRecord record) {
int[] newReplicas = (record.replicas() == null) ?
replicas : Replicas.toArray(record.replicas());
Uuid[] newDirectories = (record.directories() == null) ?
directories : Uuid.toArray(checkDirectories(record));
int[] newIsr = (record.isr() == null) ? isr : Replicas.toArray(record.isr());
int[] newRemovingReplicas = (record.removingReplicas() == null) ?
removingReplicas : Replicas.toArray(record.removingReplicas());
@ -206,6 +238,7 @@ public class PartitionRegistration {
int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : Replicas.toArray(record.eligibleLeaderReplicas());
int[] newLastKnownElr = (record.lastKnownELR() == null) ? lastKnownElr : Replicas.toArray(record.lastKnownELR());
return new PartitionRegistration(newReplicas,
newDirectories,
newIsr,
newRemovingReplicas,
newAddingReplicas,
@ -226,6 +259,12 @@ public class PartitionRegistration {
append(" -> ").append(Arrays.toString(replicas));
prefix = ", ";
}
if (!Arrays.equals(directories, prev.directories)) {
builder.append(prefix).append("directories: ").
append(Arrays.toString(prev.directories)).
append(" -> ").append(Arrays.toString(directories));
prefix = ", ";
}
if (!Arrays.equals(isr, prev.isr)) {
builder.append(prefix).append("isr: ").
append(Arrays.toString(prev.isr)).
@ -298,7 +337,7 @@ public class PartitionRegistration {
return replicas.length == 0 ? LeaderConstants.NO_LEADER : replicas[0];
}
public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, short version) {
public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, ImageWriterOptions options) {
PartitionRecord record = new PartitionRecord().
setPartitionId(partitionId).
setTopicId(topicId).
@ -310,11 +349,21 @@ public class PartitionRegistration {
setLeaderRecoveryState(leaderRecoveryState.value()).
setLeaderEpoch(leaderEpoch).
setPartitionEpoch(partitionEpoch);
if (version > 0) {
if (options.metadataVersion().isElrSupported()) {
record.setEligibleLeaderReplicas(Replicas.toList(elr)).
setLastKnownELR(Replicas.toList(lastKnownElr));
}
return new ApiMessageAndVersion(record, version);
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
record.setDirectories(Uuid.toList(directories));
} else {
for (int i = 0; i < directories.length; i++) {
if (!DirectoryId.UNASSIGNED.equals(directories[i])) {
options.handleLoss("the directory assignment state of one or more replicas");
break;
}
}
}
return new ApiMessageAndVersion(record, options.metadataVersion().partitionRecordVersion());
}
public LeaderAndIsrPartitionState toLeaderAndIsrPartitionState(TopicPartition tp,

View File

@ -13,11 +13,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Version 1 adds InControlledShutdown
// Version 2 adds LogDirs
{
"apiKey": 17,
"type": "metadata",
"name": "BrokerRegistrationChangeRecord",
"validVersions": "0-1",
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@ -27,6 +29,8 @@
{ "name": "Fenced", "type": "int8", "versions": "0+", "taggedVersions": "0+", "tag": 0,
"about": "-1 if the broker has been unfenced, 0 if no change, 1 if the broker has been fenced." },
{ "name": "InControlledShutdown", "type": "int8", "versions": "1+", "taggedVersions": "1+", "tag": 1,
"about": "0 if no change, 1 if the broker is in controlled shutdown." }
"about": "0 if no change, 1 if the broker is in controlled shutdown." },
{ "name": "LogDirs", "type": "[]uuid", "versions": "2+", "taggedVersions": "2+", "tag": 2,
"about": "Log directories configured in this broker which are available." }
]
}

View File

@ -17,9 +17,9 @@
"apiKey": 5,
"type": "metadata",
"name": "PartitionChangeRecord",
"validVersions": "0-1",
// Version 1 implements Eligiable Leader Replicas and LastKnownELR as described in KIP-966.
//
// Version 1 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
// Version 2 adds Directories for KIP-858
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
@ -48,6 +48,9 @@
"about": "null if the ELR didn't change; the new eligible leader replicas otherwise." },
{ "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 7,
"about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." }
"about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." },
{ "name": "Directories", "type": "[]uuid", "default": "null",
"versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", "tag": 8,
"about": "null if the log dirs didn't change; the new log directory for each replica otherwise."}
]
}

View File

@ -17,9 +17,9 @@
"apiKey": 3,
"type": "metadata",
"name": "PartitionRecord",
"validVersions": "0-1",
// Version 1 implements Eligiable Leader Replicas and LastKnownELR as described in KIP-966.
//
// Version 1 implements Eligible Leader Replicas and LastKnownELR as described in KIP-966.
// Version 2 adds Directories for KIP-858
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "PartitionId", "type": "int32", "versions": "0+", "default": "-1",
@ -47,6 +47,8 @@
"about": "The eligible leader replicas of this partition." },
{ "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
"versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 2,
"about": "The last known eligible leader replicas of this partition." }
"about": "The last known eligible leader replicas of this partition." },
{ "name": "Directories", "type": "[]uuid", "versions": "2+",
"about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."}
]
}

View File

@ -13,11 +13,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Version 1 adds InControlledShutdown
// Version 2 adds IsMigratingZkBroker
// Version 3 adds LogDirs
{
"apiKey": 0,
"type": "metadata",
"name": "RegisterBrokerRecord",
"validVersions": "0-2",
"validVersions": "0-3",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
@ -53,6 +56,8 @@
{ "name": "Fenced", "type": "bool", "versions": "0+", "default": "true",
"about": "True if the broker is fenced." },
{ "name": "InControlledShutdown", "type": "bool", "versions": "1+", "default": "false",
"about": "True if the broker is in controlled shutdown." }
"about": "True if the broker is in controlled shutdown." },
{ "name": "LogDirs", "type": "[]uuid", "versions": "3+", "taggedVersions": "3+", "tag": 0,
"about": "Log directories configured in this broker which are available." }
]
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.controller;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.protocol.types.TaggedFields;
@ -33,9 +34,11 @@ import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
@ -49,6 +52,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.when;
@Timeout(value = 40)
@ -63,7 +67,7 @@ public class PartitionChangeBuilderTest {
* to update changeRecordIsNoOp to take into account the new schema or tagged fields.
*/
// Check that the supported versions haven't changed
assertEquals(1, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION);
assertEquals(2, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION);
assertEquals(0, PartitionChangeRecord.LOWEST_SUPPORTED_VERSION);
// For the latest version check that the number of tagged fields hasn't changed
TaggedFields taggedFields = (TaggedFields) PartitionChangeRecord.SCHEMA_0.get(2).def.type;
@ -87,10 +91,20 @@ public class PartitionChangeBuilderTest {
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
)
);
assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().setDirectories(Arrays.asList(
Uuid.fromString("5JwD0VNXRV2Wr9CCON38Tw"),
Uuid.fromString("zpL1bRzTQXmmgdxlLHOWuw"),
Uuid.fromString("6iGUpAkHQXC6bY0FTcPRDw")
))));
}
private static final PartitionRegistration FOO = new PartitionRegistration.Builder().
setReplicas(new int[] {2, 1, 3}).
setDirectories(new Uuid[]{
Uuid.fromString("dpdvA5AZSWySmnPFTnu5Kw"),
Uuid.fromString("V60B3cglScq3Xk8BX1NxAQ"),
DirectoryId.UNASSIGNED,
}).
setIsr(new int[] {2, 1, 3}).
setLeader(1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@ -100,8 +114,16 @@ public class PartitionChangeBuilderTest {
private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
// TODO remove this after the MetadataVersion bump for KIP-858
/**
 * Returns a MetadataVersion that reports directory assignment (KIP-858)
 * as supported, by spying on the latest version and stubbing the feature
 * flag. Stand-in until a real metadata version enables the feature.
 */
private static MetadataVersion metadataVersionForDirAssignmentInfo() {
    MetadataVersion spiedVersion = Mockito.spy(MetadataVersion.latest());
    when(spiedVersion.isDirectoryAssignmentSupported()).thenReturn(true);
    return spiedVersion;
}
private static MetadataVersion metadataVersionForPartitionChangeRecordVersion(short version) {
return isElrEnabled(version) ? MetadataVersion.IBP_3_7_IV1 : MetadataVersion.IBP_3_7_IV0;
return isDirAssignmentEnabled(version) ? metadataVersionForDirAssignmentInfo() :
isElrEnabled(version) ? MetadataVersion.IBP_3_7_IV1 : MetadataVersion.IBP_3_7_IV0;
}
private static PartitionChangeBuilder createFooBuilder(MetadataVersion metadataVersion) {
@ -114,6 +136,12 @@ public class PartitionChangeBuilderTest {
private static final PartitionRegistration BAR = new PartitionRegistration.Builder().
setReplicas(new int[] {1, 2, 3, 4}).
setDirectories(new Uuid[] {
DirectoryId.UNASSIGNED,
Uuid.fromString("X5FnAcIgTheWgTMzeO5WHw"),
Uuid.fromString("GtrcdoSOTm2vFMGFeZq0eg"),
Uuid.fromString("YcOqPw5ARmeKr1y9W3AkFw"),
}).
setIsr(new int[] {1, 2, 3}).
setRemovingReplicas(new int[] {1}).
setAddingReplicas(new int[] {4}).
@ -129,12 +157,21 @@ public class PartitionChangeBuilderTest {
return partitionChangeRecordVersion > 0;
}
/**
 * Whether the given PartitionChangeRecord version carries directory
 * assignments; the Directories field was added in version 2 (KIP-858).
 */
private static boolean isDirAssignmentEnabled(short partitionChangeRecordVersion) {
    return partitionChangeRecordVersion >= 2;
}
/**
 * Builds a PartitionChangeBuilder for the BAR partition at the given record
 * version, with a predicate that rejects replica 3 and ELR enabled according
 * to the version.
 */
private static PartitionChangeBuilder createBarBuilder(short version) {
    MetadataVersion metadataVersion = metadataVersionForPartitionChangeRecordVersion(version);
    PartitionChangeBuilder builder =
        new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, metadataVersion, 2);
    return builder.setEligibleLeaderReplicasEnabled(isElrEnabled(version));
}
private static final PartitionRegistration BAZ = new PartitionRegistration.Builder().
setReplicas(new int[] {2, 1, 3}).
setDirectories(new Uuid[] {
Uuid.fromString("ywnfFpTBTbOsFdZ6uAdOmw"),
Uuid.fromString("Th0x70ecRbWvZNNV33jyRA"),
Uuid.fromString("j216tuSoQsC9JFd1Z5ZP6w"),
}).
setIsr(new int[] {1, 3}).
setLeader(3).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@ -150,6 +187,11 @@ public class PartitionChangeBuilderTest {
private static final PartitionRegistration OFFLINE = new PartitionRegistration.Builder().
setReplicas(new int[] {2, 1, 3}).
setDirectories(new Uuid[]{
Uuid.fromString("iYGgiDV5Sb2EtH6hbgYnCA"),
Uuid.fromString("XI2t4qAUSkGlLZSKeEVf8g"),
Uuid.fromString("eqRW24kIRlitzQFzmovE0Q")
}).
setIsr(new int[] {3}).
setLeader(-1).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@ -343,26 +385,34 @@ public class PartitionChangeBuilderTest {
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testReassignmentRearrangesReplicas(short version) {
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
PartitionChangeRecord expectedRecord = new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setReplicas(Arrays.asList(3, 2, 1)),
version)),
setReplicas(Arrays.asList(3, 2, 1));
if (version > 1) {
Map<Integer, Uuid> dirs = DirectoryId.createAssignmentMap(FOO.replicas, FOO.directories);
expectedRecord.setDirectories(Arrays.asList(dirs.get(3), dirs.get(2), dirs.get(1)));
}
assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)),
createFooBuilder(version).setTargetReplicas(Arrays.asList(3, 2, 1)).build());
}
@ParameterizedTest
@MethodSource("partitionChangeRecordVersions")
public void testIsrEnlargementCompletesReassignment(short version) {
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
PartitionChangeRecord expectedRecord = new PartitionChangeRecord().
setTopicId(BAR_ID).
setPartitionId(0).
setReplicas(Arrays.asList(2, 3, 4)).
setIsr(Arrays.asList(2, 3, 4)).
setLeader(2).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()),
version)),
setAddingReplicas(Collections.emptyList());
if (version > 1) {
Map<Integer, Uuid> dirs = DirectoryId.createAssignmentMap(BAR.replicas, BAR.directories);
expectedRecord.setDirectories(Arrays.asList(dirs.get(2), dirs.get(3), dirs.get(4)));
}
assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)),
createBarBuilder(version).setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(Arrays.asList(1, 2, 3, 4))).build());
}
@ -372,14 +422,18 @@ public class PartitionChangeBuilderTest {
PartitionReassignmentRevert revert = new PartitionReassignmentRevert(BAR);
assertEquals(Arrays.asList(1, 2, 3), revert.replicas());
assertEquals(Arrays.asList(1, 2, 3), revert.isr());
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
PartitionChangeRecord expectedRecord = new PartitionChangeRecord().
setTopicId(BAR_ID).
setPartitionId(0).
setReplicas(Arrays.asList(1, 2, 3)).
setLeader(1).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()),
version)),
setAddingReplicas(Collections.emptyList());
if (version > 1) {
Map<Integer, Uuid> dirs = DirectoryId.createAssignmentMap(BAR.replicas, BAR.directories);
expectedRecord.setDirectories(Arrays.asList(dirs.get(1), dirs.get(2), dirs.get(3)));
}
assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)),
createBarBuilder(version).
setTargetReplicas(revert.replicas()).
setTargetIsrWithBrokerStates(AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(revert.isr())).
@ -396,13 +450,17 @@ public class PartitionChangeBuilderTest {
assertEquals(Collections.singletonList(3), replicas.removing());
assertEquals(Collections.emptyList(), replicas.adding());
assertEquals(Arrays.asList(1, 2, 3), replicas.replicas());
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
PartitionChangeRecord expectedRecord = new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setReplicas(Arrays.asList(1, 2)).
setIsr(Arrays.asList(2, 1)).
setLeader(1),
version)),
setLeader(1);
if (version > 1) {
Map<Integer, Uuid> dirs = DirectoryId.createAssignmentMap(FOO.replicas, FOO.directories);
expectedRecord.setDirectories(Arrays.asList(dirs.get(1), dirs.get(2)));
}
assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)),
createFooBuilder(version).
setTargetReplicas(replicas.replicas()).
setTargetRemoving(replicas.removing()).
@ -417,12 +475,16 @@ public class PartitionChangeBuilderTest {
assertEquals(Collections.emptyList(), replicas.removing());
assertEquals(Collections.singletonList(4), replicas.adding());
assertEquals(Arrays.asList(1, 2, 3, 4), replicas.replicas());
assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord().
PartitionChangeRecord expectedRecord = new PartitionChangeRecord().
setTopicId(FOO_ID).
setPartitionId(0).
setReplicas(Arrays.asList(1, 2, 3, 4)).
setAddingReplicas(Collections.singletonList(4)),
version)),
setAddingReplicas(Collections.singletonList(4));
if (version > 1) {
Map<Integer, Uuid> dirs = DirectoryId.createAssignmentMap(FOO.replicas, FOO.directories);
expectedRecord.setDirectories(Arrays.asList(dirs.get(1), dirs.get(2), dirs.get(3), DirectoryId.UNASSIGNED));
}
assertEquals(Optional.of(new ApiMessageAndVersion(expectedRecord, version)),
createFooBuilder(version).
setTargetReplicas(replicas.replicas()).
setTargetAdding(replicas.adding()).
@ -485,6 +547,11 @@ public class PartitionChangeBuilderTest {
LeaderRecoveryState recoveryState = LeaderRecoveryState.RECOVERING;
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[] {leaderId, leaderId + 1, leaderId + 2}).
setDirectories(new Uuid[] {
Uuid.fromString("1sF6XXLkSN2LtDums7CJ8Q"),
Uuid.fromString("iaBBVsoHQR6NDKXwliKMqw"),
Uuid.fromString("sHaBwjdrR2S3bL4E1RKC8Q")
}).
setIsr(new int[] {leaderId}).
setLeader(leaderId).
setLeaderRecoveryState(recoveryState).
@ -550,6 +617,11 @@ public class PartitionChangeBuilderTest {
int leaderId = 1;
PartitionRegistration registration = new PartitionRegistration.Builder().
setReplicas(new int[] {leaderId, leaderId + 1, leaderId + 2}).
setDirectories(new Uuid[] {
Uuid.fromString("uYpxts0pS4K4bk5XOoXB4g"),
Uuid.fromString("kS6fHEqwRYucduWkmvsevw"),
Uuid.fromString("De9RqRThQRGjKg3i3yzUxA")
}).
setIsr(new int[] {leaderId + 1, leaderId + 2}).
setLeader(NO_LEADER).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@ -601,6 +673,12 @@ public class PartitionChangeBuilderTest {
public void testStoppedLeaderIsDemotedAfterReassignmentCompletesEvenIfNoNewEligibleLeaders() {
// Set up PartitionRegistration as if there's an ongoing reassignment from [0, 1] to [2, 3]
int[] replicas = new int[] {2, 3, 0, 1};
Uuid[] directories = {
Uuid.fromString("XCBQClkBSZyphD87QUXzDA"),
Uuid.fromString("Or2Rp9tTQOSVuy12hsfmTA"),
Uuid.fromString("pThsodMNSwGvljTfc1RNVQ"),
Uuid.fromString("d8CGoNJmS5mJdF20tc8P7g")
};
// The ISR starts off with the old replicas
int[] isr = new int[] {0, 1};
// We're removing [0, 1]
@ -614,6 +692,7 @@ public class PartitionChangeBuilderTest {
int partitionEpoch = 0;
PartitionRegistration part = new PartitionRegistration.Builder().
setReplicas(replicas).
setDirectories(directories).
setIsr(isr).
setRemovingReplicas(removingReplicas).
setAddingReplicas(addingReplicas).
@ -668,6 +747,12 @@ public class PartitionChangeBuilderTest {
public void testEligibleLeaderReplicas_IsrShrinkBelowMinISR(short version) {
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(new Uuid[] {
Uuid.fromString("NeQeLdHhSXi4tQGaFcszKA"),
Uuid.fromString("LsVrQZ73RSSuEWA8hhqQhg"),
Uuid.fromString("0IaY4zXKRR6sROgE8yHfnw"),
Uuid.fromString("1WxphfLCSZqMHKK4JMppuw")
})
.setIsr(new int[] {1, 2, 3, 4})
.setLeader(1)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
@ -711,6 +796,12 @@ public class PartitionChangeBuilderTest {
public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) {
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(new Uuid[]{
Uuid.fromString("CWgRKBKkToGn1HKzNb2qqQ"),
Uuid.fromString("SCnk7zfSQSmlKqvV702d3A"),
Uuid.fromString("9tO0QHlJRhimjKfH8m9d8A"),
Uuid.fromString("JaaqVOxNT2OGVNCCIFA2JQ")
})
.setIsr(new int[] {1, 2})
.setElr(new int[] {3})
.setLastKnownElr(new int[] {4})
@ -751,6 +842,12 @@ public class PartitionChangeBuilderTest {
public void testEligibleLeaderReplicas_IsrAddNewMemberNotInELR(short version) {
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(new Uuid[]{
Uuid.fromString("gPcIwlldQXikdUB3F4GB6w"),
Uuid.fromString("gFs7V8mKR66z8e5qwtjIMA"),
Uuid.fromString("zKHU2fwrRkuypqTgITl46g"),
Uuid.fromString("zEgmBBh8QJGqbBIvzvH7JA")
})
.setIsr(new int[] {1})
.setElr(new int[] {3})
.setLastKnownElr(new int[] {2})
@ -797,6 +894,12 @@ public class PartitionChangeBuilderTest {
public void testEligibleLeaderReplicas_RemoveUncleanShutdownReplicasFromElr(short version) {
PartitionRegistration partition = new PartitionRegistration.Builder()
.setReplicas(new int[] {1, 2, 3, 4})
.setDirectories(new Uuid[] {
Uuid.fromString("keB9ssIPRlibyVJT5FcBVA"),
Uuid.fromString("FhezfoReTSmHoKxi8wOIOg"),
Uuid.fromString("QHtFxu8LShm6RiyAP6PxYg"),
Uuid.fromString("tUJOMtvMQkGga30ydluvbQ")
})
.setIsr(new int[] {1})
.setElr(new int[] {2, 3})
.setLastKnownElr(new int[] {})
@ -838,4 +941,38 @@ public class PartitionChangeBuilderTest {
assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
}
}
@Test
void testKeepsDirectoriesAfterReassignment() {
    // Original assignment: replica 2 -> v1PV..., replica 1 -> iU2z...,
    // replica 3 -> fM5N... (directories are ordered like the replicas array).
    PartitionRegistration registration = new PartitionRegistration.Builder().
        setReplicas(new int[] {2, 1, 3}).
        setDirectories(new Uuid[] {
            Uuid.fromString("v1PVrX6uS5m8CByXlLfmWg"),
            Uuid.fromString("iU2znv45Q9yQkOpkTSy3jA"),
            Uuid.fromString("fM5NKyWTQHqEihjIkUl99Q")
        }).
        setIsr(new int[] {2, 1, 3}).
        setLeader(1).
        setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
        setLeaderEpoch(100).
        setPartitionEpoch(200).
        build();
    // Reassign [2, 1, 3] -> [3, 1, 4] with directory assignment enabled.
    Optional<ApiMessageAndVersion> built = new PartitionChangeBuilder(registration, FOO_ID,
        0, r -> true, metadataVersionForDirAssignmentInfo(), 2).
        setTargetReplicas(Arrays.asList(3, 1, 4)).build();
    // Surviving replicas 3 and 1 keep their previously assigned directories;
    // the newly added replica 4 starts out UNASSIGNED.
    Optional<ApiMessageAndVersion> expected = Optional.of(new ApiMessageAndVersion(
        new PartitionChangeRecord().
            setTopicId(FOO_ID).
            setPartitionId(0).
            setLeader(1).
            setReplicas(Arrays.asList(3, 1, 4)).
            setDirectories(Arrays.asList(
                Uuid.fromString("fM5NKyWTQHqEihjIkUl99Q"),
                Uuid.fromString("iU2znv45Q9yQkOpkTSy3jA"),
                DirectoryId.UNASSIGNED
            )),
        (short) 2
    ));
    assertEquals(expected, built);
}
}

View File

@ -27,8 +27,10 @@ import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.image.TopicDelta;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import static org.apache.kafka.controller.metrics.ControllerMetricsTestUtils.FakePartitionRegistrationType.NORMAL;
@ -158,17 +160,19 @@ public class ControllerMetricsChangesTest {
static final TopicDelta TOPIC_DELTA2;
static {
ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_7_IV0).build(); // highest MV for PartitionRecord v0
TOPIC_DELTA1 = new TopicDelta(new TopicImage("foo", FOO_ID, Collections.emptyMap()));
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 0, (short) 0).message());
toRecord(FOO_ID, 0, options).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 1, (short) 0).message());
toRecord(FOO_ID, 1, options).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 2, (short) 0).message());
toRecord(FOO_ID, 2, options).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NON_PREFERRED_LEADER).
toRecord(FOO_ID, 3, (short) 0).message());
toRecord(FOO_ID, 3, options).message());
TOPIC_DELTA1.replay((PartitionRecord) fakePartitionRegistration(NON_PREFERRED_LEADER).
toRecord(FOO_ID, 4, (short) 0).message());
toRecord(FOO_ID, 4, options).message());
TOPIC_DELTA2 = new TopicDelta(TOPIC_DELTA1.apply());
TOPIC_DELTA2.replay(new PartitionChangeRecord().
@ -176,7 +180,7 @@ public class ControllerMetricsChangesTest {
setTopicId(FOO_ID).
setLeader(1));
TOPIC_DELTA2.replay((PartitionRecord) fakePartitionRegistration(NORMAL).
toRecord(FOO_ID, 5, (short) 0).message());
toRecord(FOO_ID, 5, options).message());
}
@Test

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.image.writer.UnwritableMetadataException;
@ -33,10 +34,13 @@ import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@Timeout(value = 40)
@ -126,6 +130,31 @@ public class ImageDowngradeTest {
TEST_RECORDS.get(1)));
}
@Test
void testDirectoryAssignmentState() {
    // Downgrade from an (emulated) directory-assignment-capable metadata
    // version to IBP_3_7_IV0, which only supports PartitionRecord v0.
    MetadataVersion outputMetadataVersion = MetadataVersion.IBP_3_7_IV0;
    MetadataVersion inputMetadataVersion = spy(outputMetadataVersion); // TODO replace with actual MV after bump for KIP-858
    when(inputMetadataVersion.isDirectoryAssignmentSupported()).thenReturn(true);
    PartitionRecord testPartitionRecord = (PartitionRecord) TEST_RECORDS.get(1).message();
    // The v2 record carries directory assignments; downgrading to v0 must
    // drop them and report the loss with the expected message.
    writeWithExpectedLosses(outputMetadataVersion,
        Collections.singletonList("the directory assignment state of one or more replicas"),
        Arrays.asList(
            metadataVersionRecord(inputMetadataVersion),
            TEST_RECORDS.get(0),
            new ApiMessageAndVersion(
                testPartitionRecord.duplicate().setDirectories(Arrays.asList(
                    Uuid.fromString("c7QfSi6xSIGQVh3Qd5RJxA"),
                    Uuid.fromString("rWaCHejCRRiptDMvW5Xw0g"))),
                (short) 2)),
        Arrays.asList(
            metadataVersionRecord(outputMetadataVersion),
            new ApiMessageAndVersion(new ZkMigrationStateRecord(), (short) 0),
            TEST_RECORDS.get(0),
            new ApiMessageAndVersion(
                // Directories are gone in the downgraded v0 record.
                testPartitionRecord.duplicate().setDirectories(Collections.emptyList()),
                (short) 0)));
}
private static void writeWithExpectedLosses(
MetadataVersion metadataVersion,
List<String> expectedLosses,

View File

@ -22,20 +22,26 @@ import net.jqwik.api.Arbitrary;
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import net.jqwik.api.Provide;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.UnwritableMetadataException;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@ -44,6 +50,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@Timeout(40)
@ -81,7 +89,8 @@ public class PartitionRegistrationTest {
setReplicas(new int[]{1, 2, 3}).setIsr(new int[]{1, 2}).setRemovingReplicas(new int[]{1}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build();
Uuid topicId = Uuid.fromString("OGdAI5nxT_m-ds3rJMqPLA");
int partitionId = 4;
ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId, (short) 0);
ApiMessageAndVersion record = registrationA.toRecord(topicId, partitionId, new ImageWriterOptions.Builder().
setMetadataVersion(MetadataVersion.IBP_3_7_IV0).build()); // highest MV for PartitionRecord v0
PartitionRegistration registrationB =
new PartitionRegistration((PartitionRecord) record.message());
assertEquals(registrationA, registrationB);
@ -124,6 +133,7 @@ public class PartitionRegistrationTest {
@Test
public void testMergePartitionChangeRecordWithReassignmentData() {
PartitionRegistration partition0 = new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).
setDirectories(new Uuid[]{Uuid.fromString("FbRuu7CeQtq5YFreEzg16g"), Uuid.fromString("4rtHTelWSSStAFMODOg3cQ"), Uuid.fromString("Id1WXzHURROilVxZWJNZlw")}).
setIsr(new int[] {1, 2, 3}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).build();
PartitionRegistration partition1 = partition0.merge(new PartitionChangeRecord().
setRemovingReplicas(Collections.singletonList(3)).
@ -162,6 +172,7 @@ public class PartitionRegistrationTest {
setIsr(new int[]{0}).
setRemovingReplicas(new int[]{0}).
setAddingReplicas(new int[]{0});
IllegalStateException exception = assertThrows(IllegalStateException.class, () -> builder.build());
assertEquals("You must set leader.", exception.getMessage());
}
@ -250,6 +261,13 @@ public class PartitionRegistrationTest {
public void testPartitionRegistrationToRecord(short version) {
PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
setReplicas(new int[]{0, 1, 2, 3, 4}).
setDirectories(new Uuid[]{
DirectoryId.UNASSIGNED,
Uuid.fromString("KBJBm9GVRAG9Ffe25odmmg"),
DirectoryId.LOST,
Uuid.fromString("7DZNT5qBS7yFF7VMMHS7kw"),
Uuid.fromString("cJGPUZsMSEqbidOLYLOIXg")
}).
setIsr(new int[]{0, 1}).
setLeader(0).
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).
@ -268,12 +286,35 @@ public class PartitionRegistrationTest {
setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
setLeaderEpoch(0).
setPartitionEpoch(0);
MetadataVersion metadataVersion = spy(MetadataVersion.latest());
when(metadataVersion.partitionRecordVersion()).thenReturn(version);
if (version > 0) {
expectRecord.
setEligibleLeaderReplicas(Arrays.asList(new Integer[]{2, 3})).
setLastKnownELR(Arrays.asList(new Integer[]{4}));
} else {
when(metadataVersion.isElrSupported()).thenReturn(false);
}
if (version > 1) {
expectRecord.setDirectories(Arrays.asList(
DirectoryId.UNASSIGNED,
Uuid.fromString("KBJBm9GVRAG9Ffe25odmmg"),
DirectoryId.LOST,
Uuid.fromString("7DZNT5qBS7yFF7VMMHS7kw"),
Uuid.fromString("cJGPUZsMSEqbidOLYLOIXg")
));
when(metadataVersion.isDirectoryAssignmentSupported()).thenReturn(true);
}
List<UnwritableMetadataException> exceptions = new ArrayList<>();
ImageWriterOptions options = new ImageWriterOptions.Builder().
setMetadataVersion(metadataVersion).
setLossHandler(exceptions::add).
build();
assertEquals(new ApiMessageAndVersion(expectRecord, version), partitionRegistration.toRecord(topicID, 0, options));
if (version < 2) {
assertTrue(exceptions.stream().
anyMatch(e -> e.getMessage().contains("the directory assignment state of one or more replicas")));
}
assertEquals(new ApiMessageAndVersion(expectRecord, version), partitionRegistration.toRecord(topicID, 0, version));
assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
}
@ -313,5 +354,7 @@ public class PartitionRegistrationTest {
new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3, 4, 5, 6}).setIsr(new int[] {1, 2, 3}).setAddingReplicas(new int[] {4, 5, 6}).
setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {2, 3}).setLastKnownElr(new int[] {1, 2}).build()
);
}
}

View File

@ -16,24 +16,29 @@
*/
package org.apache.kafka.common;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class DirectoryId {
/**
* A UUID that is used to identify new or unknown dir assignments.
* A Uuid that is used to identify new or unknown dir assignments.
*/
public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
/**
* A UUID that is used to represent unspecified offline dirs.
* A Uuid that is used to represent unspecified offline dirs.
*/
public static final Uuid LOST = new Uuid(0L, 1L);
/**
* A UUID that is used to represent and unspecified log directory,
* A Uuid that is used to represent and unspecified log directory,
* that is expected to have been previously selected to host an
* associated replica. This contrasts with {@code UNASSIGNED_DIR},
* which is associated with (typically new) replicas that may not
@ -47,8 +52,8 @@ public class DirectoryId {
public static final Set<Uuid> RESERVED;
static {
HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
// The first 100 UUIDs are reserved for future use.
HashSet<Uuid> reserved = new HashSet<>();
// The first 100 Uuids are reserved for future use.
for (long i = 0L; i < 100L; i++) {
reserved.add(new Uuid(0L, i));
}
@ -67,4 +72,59 @@ public class DirectoryId {
}
return uuid;
}
/**
* Calculate the new directory information based on an existing replica assignment.
* Replicas for which there already is a directory ID keep the same directory.
* All other replicas get {@link #UNASSIGNED}.
* @param currentReplicas The current replicas, represented by the broker IDs
* @param currentDirectories The current directory information
* @param newReplicas The new replica list
* @return The new directory list
* @throws IllegalArgumentException If currentReplicas and currentDirectories have different lengths,
* or if there are duplicate broker IDs in the replica lists
*/
public static List<Uuid> createDirectoriesFrom(int[] currentReplicas, Uuid[] currentDirectories, List<Integer> newReplicas) {
if (currentReplicas == null) currentReplicas = new int[0];
if (currentDirectories == null) currentDirectories = new Uuid[0];
Map<Integer, Uuid> assignments = createAssignmentMap(currentReplicas, currentDirectories);
List<Uuid> consolidated = new ArrayList<>(newReplicas.size());
for (int newReplica : newReplicas) {
Uuid newDirectory = assignments.getOrDefault(newReplica, UNASSIGNED);
consolidated.add(newDirectory);
}
return consolidated;
}
/**
* Build a mapping from replica to directory based on two lists of the same size and order.
* @param replicas The replicas, represented by the broker IDs
* @param directories The directory information
* @return A map, linking each replica to its assigned directory
* @throws IllegalArgumentException If replicas and directories have different lengths,
* or if there are duplicate broker IDs in the replica list
*/
public static Map<Integer, Uuid> createAssignmentMap(int[] replicas, Uuid[] directories) {
if (replicas.length != directories.length) {
throw new IllegalArgumentException("The lengths for replicas and directories do not match.");
}
Map<Integer, Uuid> assignments = new HashMap<>();
for (int i = 0; i < replicas.length; i++) {
int brokerId = replicas[i];
Uuid directory = directories[i];
if (assignments.put(brokerId, directory) != null) {
throw new IllegalArgumentException("Duplicate broker ID in assignment");
}
}
return assignments;
}
/**
* Create an array with the specified number of entries set to {@link #UNASSIGNED}.
*/
public static Uuid[] unassignedArray(int length) {
Uuid[] array = new Uuid[length];
Arrays.fill(array, UNASSIGNED);
return array;
}
}

View File

@ -293,6 +293,10 @@ public enum MetadataVersion {
return this.isAtLeast(IBP_3_7_IV1);
}
    /**
     * Whether replica-to-directory assignment is supported by this metadata version.
     * Currently hard-coded to false for every version: no released MetadataVersion
     * enables it yet, so the version-gated record fields that depend on it stay off.
     */
    public boolean isDirectoryAssignmentSupported() {
        return false; // TODO: Bump IBP for JBOD support in KRaft
    }
public boolean isKRaftSupported() {
return this.featureLevel > 0;
}
@ -320,7 +324,10 @@ public enum MetadataVersion {
}
public short registerBrokerRecordVersion() {
if (isMigrationSupported()) {
if (isDirectoryAssignmentSupported()) {
// new logDirs field
return (short) 3;
} else if (isMigrationSupported()) {
// new isMigrationZkBroker field
return (short) 2;
} else if (isInControlledShutdownStateSupported()) {
@ -344,7 +351,9 @@ public enum MetadataVersion {
}
public short partitionChangeRecordVersion() {
if (isElrSupported()) {
if (isDirectoryAssignmentSupported()) {
return (short) 2;
} else if (isElrSupported()) {
return (short) 1;
} else {
return (short) 0;
@ -352,7 +361,9 @@ public enum MetadataVersion {
}
public short partitionRecordVersion() {
if (isElrSupported()) {
if (isDirectoryAssignmentSupported()) {
return (short) 2;
} else if (isElrSupported()) {
return (short) 1;
} else {
return (short) 0;

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class DirectoryIdTest {

    /**
     * The reserved set must contain exactly the 100 Uuids with msb == 0 and
     * lsb in [0, 100), with no duplicates.
     */
    @Test
    void testReserved() {
        Set<Long> seen = new HashSet<>(100);
        for (Uuid reservedId : DirectoryId.RESERVED) {
            assertEquals(0L, reservedId.getMostSignificantBits(), "Unexpected reserved msb value");
            long lsb = reservedId.getLeastSignificantBits();
            assertTrue(lsb >= 0 && lsb < 100L, "Unexpected reserved lsb value");
            assertTrue(seen.add(lsb), "Duplicate reserved value");
        }
        assertEquals(100, DirectoryId.RESERVED.size());
    }

    /**
     * Replicas kept from the old assignment retain their directory; replicas new
     * to the assignment get UNASSIGNED. Mismatched array lengths must throw.
     */
    @Test
    void testCreateDirectoriesFrom() {
        assertThrows(IllegalArgumentException.class, () -> DirectoryId.createDirectoriesFrom(
            new int[] {1},
            new Uuid[] {DirectoryId.UNASSIGNED, DirectoryId.LOST},
            Arrays.asList(2, 3)
        ));
        assertEquals(
            Arrays.asList(
                Uuid.fromString("YXY0bQYEQmmyOQ6ZDfGgSQ"),
                Uuid.fromString("5SZij3DRQgaFbvzR9KooLg"),
                DirectoryId.UNASSIGNED
            ),
            DirectoryId.createDirectoriesFrom(
                new int[] {1, 2, 3},
                new Uuid[] {
                    Uuid.fromString("MgVK5KSwTxe65eYATaoQrg"),
                    Uuid.fromString("YXY0bQYEQmmyOQ6ZDfGgSQ"),
                    Uuid.fromString("5SZij3DRQgaFbvzR9KooLg")
                },
                Arrays.asList(2, 3, 4)
            )
        );
        assertEquals(
            Arrays.asList(
                DirectoryId.UNASSIGNED,
                DirectoryId.UNASSIGNED,
                DirectoryId.UNASSIGNED
            ),
            DirectoryId.createDirectoriesFrom(
                new int[] {1, 2},
                new Uuid[] {
                    DirectoryId.UNASSIGNED,
                    DirectoryId.UNASSIGNED
                },
                Arrays.asList(1, 2, 3)
            )
        );
    }

    @Test
    void testCreateAssignmentMap() {
        assertThrows(IllegalArgumentException.class,
            () -> DirectoryId.createAssignmentMap(new int[]{1, 2}, DirectoryId.unassignedArray(3)));
        // Build the expected map explicitly rather than with double-brace
        // initialization, which creates an anonymous HashMap subclass that
        // captures the enclosing test instance.
        HashMap<Integer, Uuid> expected = new HashMap<>();
        expected.put(1, Uuid.fromString("upjfkCrUR9GNn1i94ip1wg"));
        expected.put(2, Uuid.fromString("bCF3l0RIQjOKhUqgbivHZA"));
        expected.put(3, Uuid.fromString("Fg3mFhcVQlqCWRk4dZazxw"));
        expected.put(4, Uuid.fromString("bv9TEYi4TqOm52hLmrxT5w"));
        assertEquals(
            expected,
            DirectoryId.createAssignmentMap(
                new int[] {1, 2, 3, 4},
                new Uuid[] {
                    Uuid.fromString("upjfkCrUR9GNn1i94ip1wg"),
                    Uuid.fromString("bCF3l0RIQjOKhUqgbivHZA"),
                    Uuid.fromString("Fg3mFhcVQlqCWRk4dZazxw"),
                    Uuid.fromString("bv9TEYi4TqOm52hLmrxT5w")
                })
        );
    }
}