KAFKA-14996: Handle overly large user operations on the kcontroller (#13742)

Previously, if a user tried to perform an overly large batch operation on the KRaft controller
(such as creating a million topics), we would create a very large number of records in memory. Our
attempt to write these records to the Raft layer would fail, because there were too many to fit in
an atomic batch. This failure, in turn, would trigger a controller failover.

(Note: I am assuming here that no topic creation policy was in place that would prevent the
creation of a million topics. I am also assuming that the user operation must be done atomically,
which is true for all current user operations, since we have not implemented KIP-868 yet.)

With this PR, we fail immediately when the number of records we have generated exceeds the
threshold that we can apply. This failure does not generate a controller failover. We also now
fail with a PolicyViolationException rather than an UnknownServerException.

In order to implement this in a simple way, this PR adds the BoundedList class, which wraps any
list and adds a maximum length. Attempts to grow the list beyond this length cause an exception to
be thrown.

Reviewers: David Arthur <mumrah@gmail.com>, Ismael Juma <ijuma@apache.org>, Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Colin Patrick McCabe 2023-05-26 13:16:17 -07:00 committed by GitHub
parent 9aac5ff1fe
commit b74204fa0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 511 additions and 16 deletions

View File

@ -94,6 +94,7 @@
<allow pkg="org.apache.kafka.server.config" /> <allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault" /> <allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.metrics" /> <allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.mutable" />
<allow pkg="org.apache.kafka.server.policy"/> <allow pkg="org.apache.kafka.server.policy"/>
<allow pkg="org.apache.kafka.server.util"/> <allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.snapshot" /> <allow pkg="org.apache.kafka.snapshot" />

View File

@ -26,6 +26,7 @@ import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.{ConfigException, ConfigResource} import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.ConfigResource.Type import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.errors.PolicyViolationException
import org.apache.kafka.common.message.DescribeClusterRequestData import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.Errors._
@ -1076,6 +1077,35 @@ class KRaftClusterTest {
cluster.close() cluster.close()
} }
} }
// Regression test for KAFKA-14996: an overly large user operation (here, a CreateTopics
// request generating far more records than fit in one atomic Raft batch) must fail
// cleanly with PolicyViolationException instead of triggering a controller failover.
@Test
def testOverlyLargeCreateTopics(): Unit = {
  // Minimal KRaft cluster: one broker, one controller.
  val cluster = new KafkaClusterTestKit.Builder(
    new TestKitNodes.Builder().
      setNumBrokerNodes(1).
      setNumControllerNodes(1).build()).build()
  try {
    cluster.format()
    cluster.startup()
    val admin = Admin.create(cluster.clientProperties())
    try {
      // 10001 topics x 100000 partitions each — vastly exceeds the controller's
      // per-operation record limit, so record generation must be aborted early.
      val newTopics = new util.ArrayList[NewTopic]()
      for (i <- 0 to 10000) {
        newTopics.add(new NewTopic("foo" + i, 100000, 1.toShort))
      }
      val executionException = assertThrows(classOf[ExecutionException],
        () => admin.createTopics(newTopics).all().get())
      // The client-visible failure is PolicyViolationException with the exact
      // message produced by QuorumController when it translates
      // BoundedListTooLongException.
      assertNotNull(executionException.getCause)
      assertEquals(classOf[PolicyViolationException], executionException.getCause.getClass)
      assertEquals("Unable to perform excessively large batch operation.",
        executionException.getCause.getMessage)
    } finally {
      admin.close()
    }
  } finally {
    cluster.close()
  }
}
} }
class BadAuthorizer() extends Authorizer { class BadAuthorizer() extends Authorizer {

View File

@ -34,6 +34,8 @@ import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult; import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult; import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.mutable.BoundedListTooLongException;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet; import org.apache.kafka.timeline.TimelineHashSet;
@ -48,6 +50,8 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
/** /**
* The AclControlManager manages any ACLs that are stored in the __cluster_metadata topic. * The AclControlManager manages any ACLs that are stored in the __cluster_metadata topic.
@ -77,7 +81,8 @@ public class AclControlManager {
ControllerResult<List<AclCreateResult>> createAcls(List<AclBinding> acls) { ControllerResult<List<AclCreateResult>> createAcls(List<AclBinding> acls) {
List<AclCreateResult> results = new ArrayList<>(acls.size()); List<AclCreateResult> results = new ArrayList<>(acls.size());
List<ApiMessageAndVersion> records = new ArrayList<>(acls.size()); List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
for (AclBinding acl : acls) { for (AclBinding acl : acls) {
try { try {
validateNewAcl(acl); validateNewAcl(acl);
@ -170,6 +175,10 @@ public class AclControlManager {
deleted.add(new AclBindingDeleteResult(binding)); deleted.add(new AclBindingDeleteResult(binding));
records.add(new ApiMessageAndVersion( records.add(new ApiMessageAndVersion(
new RemoveAccessControlEntryRecord().setId(id), (short) 0)); new RemoveAccessControlEntryRecord().setId(id), (short) 0));
if (records.size() > MAX_RECORDS_PER_USER_OP) {
throw new BoundedListTooLongException("Cannot remove more than " +
MAX_RECORDS_PER_USER_OP + " acls in a single delete operation.");
}
} }
} }
return new AclDeleteResult(deleted); return new AclDeleteResult(deleted);

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashMap;
@ -43,6 +44,8 @@ import java.util.Objects;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
public class ClientQuotaControlManager { public class ClientQuotaControlManager {
private final SnapshotRegistry snapshotRegistry; private final SnapshotRegistry snapshotRegistry;
@ -64,7 +67,8 @@ public class ClientQuotaControlManager {
*/ */
ControllerResult<Map<ClientQuotaEntity, ApiError>> alterClientQuotas( ControllerResult<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
Collection<ClientQuotaAlteration> quotaAlterations) { Collection<ClientQuotaAlteration> quotaAlterations) {
List<ApiMessageAndVersion> outputRecords = new ArrayList<>(); List<ApiMessageAndVersion> outputRecords =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
Map<ClientQuotaEntity, ApiError> outputResults = new HashMap<>(); Map<ClientQuotaEntity, ApiError> outputResults = new HashMap<>();
quotaAlterations.forEach(quotaAlteration -> { quotaAlterations.forEach(quotaAlteration -> {

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.KafkaConfigSchema; import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.AlterConfigPolicy; import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata; import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
@ -49,6 +50,7 @@ import java.util.function.Consumer;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND; import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG; import static org.apache.kafka.common.protocol.Errors.INVALID_CONFIG;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
public class ConfigurationControlManager { public class ConfigurationControlManager {
@ -169,7 +171,8 @@ public class ConfigurationControlManager {
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges, Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
boolean newlyCreatedResource boolean newlyCreatedResource
) { ) {
List<ApiMessageAndVersion> outputRecords = new ArrayList<>(); List<ApiMessageAndVersion> outputRecords =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
Map<ConfigResource, ApiError> outputResults = new HashMap<>(); Map<ConfigResource, ApiError> outputResults = new HashMap<>();
for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry : for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry :
configChanges.entrySet()) { configChanges.entrySet()) {
@ -187,7 +190,8 @@ public class ConfigurationControlManager {
Map<String, Entry<OpType, String>> keyToOps, Map<String, Entry<OpType, String>> keyToOps,
boolean newlyCreatedResource boolean newlyCreatedResource
) { ) {
List<ApiMessageAndVersion> outputRecords = new ArrayList<>(); List<ApiMessageAndVersion> outputRecords =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
ApiError apiError = incrementalAlterConfigResource(configResource, ApiError apiError = incrementalAlterConfigResource(configResource,
keyToOps, keyToOps,
newlyCreatedResource, newlyCreatedResource,
@ -316,7 +320,8 @@ public class ConfigurationControlManager {
Map<ConfigResource, Map<String, String>> newConfigs, Map<ConfigResource, Map<String, String>> newConfigs,
boolean newlyCreatedResource boolean newlyCreatedResource
) { ) {
List<ApiMessageAndVersion> outputRecords = new ArrayList<>(); List<ApiMessageAndVersion> outputRecords =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
Map<ConfigResource, ApiError> outputResults = new HashMap<>(); Map<ConfigResource, ApiError> outputResults = new HashMap<>();
for (Entry<ConfigResource, Map<String, String>> resourceEntry : for (Entry<ConfigResource, Map<String, String>> resourceEntry :
newConfigs.entrySet()) { newConfigs.entrySet()) {

View File

@ -17,7 +17,6 @@
package org.apache.kafka.controller; package org.apache.kafka.controller;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -39,12 +38,14 @@ import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.migration.ZkMigrationState; import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineObject; import org.apache.kafka.timeline.TimelineObject;
import org.slf4j.Logger; import org.slf4j.Logger;
import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD; import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
public class FeatureControlManager { public class FeatureControlManager {
@ -147,7 +148,8 @@ public class FeatureControlManager {
boolean validateOnly boolean validateOnly
) { ) {
TreeMap<String, ApiError> results = new TreeMap<>(); TreeMap<String, ApiError> results = new TreeMap<>();
List<ApiMessageAndVersion> records = new ArrayList<>(); List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
for (Entry<String, Short> entry : updates.entrySet()) { for (Entry<String, Short> entry : updates.entrySet()) {
results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(), results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), brokerFeatures, records)); upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), brokerFeatures, records));

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException; import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException; import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@ -104,6 +105,7 @@ import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler; import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.mutable.BoundedListTooLongException;
import org.apache.kafka.server.policy.AlterConfigPolicy; import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy; import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.SnapshotReader;
@ -160,8 +162,18 @@ import static org.apache.kafka.controller.QuorumController.ControllerOperationFl
* the controller can fully initialize. * the controller can fully initialize.
*/ */
public final class QuorumController implements Controller { public final class QuorumController implements Controller {
/**
* The maximum records that the controller will write in a single batch.
*/
private final static int MAX_RECORDS_PER_BATCH = 10000; private final static int MAX_RECORDS_PER_BATCH = 10000;
/**
* The maximum records any user-initiated operation is allowed to generate.
*
* For now, this is set to the maximum records in a single batch.
*/
final static int MAX_RECORDS_PER_USER_OP = MAX_RECORDS_PER_BATCH;
/** /**
* A builder class which creates the QuorumController. * A builder class which creates the QuorumController.
*/ */
@ -457,9 +469,14 @@ public final class QuorumController implements Controller {
long endProcessingTime = time.nanoseconds(); long endProcessingTime = time.nanoseconds();
long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong(); long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS); long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
if (exception instanceof ApiException) { if ((exception instanceof ApiException) ||
(exception instanceof BoundedListTooLongException)) {
log.info("{}: failed with {} in {} us. Reason: {}", name, log.info("{}: failed with {} in {} us. Reason: {}", name,
exception.getClass().getSimpleName(), deltaUs, exception.getMessage()); exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
if (exception instanceof BoundedListTooLongException) {
exception = new PolicyViolationException("Unable to perform excessively large " +
"batch operation.");
}
return exception; return exception;
} }
if (isActiveController()) { if (isActiveController()) {
@ -2205,7 +2222,8 @@ public final class QuorumController implements Controller {
} }
return appendWriteEvent("createPartitions", context.deadlineNs(), () -> { return appendWriteEvent("createPartitions", context.deadlineNs(), () -> {
final ControllerResult<List<CreatePartitionsTopicResult>> result = replicationControl.createPartitions(context, topics); final ControllerResult<List<CreatePartitionsTopicResult>> result =
replicationControl.createPartitions(context, topics);
if (validateOnly) { if (validateOnly) {
log.debug("Validate-only CreatePartitions result(s): {}", result.response()); log.debug("Validate-only CreatePartitions result(s): {}", result.response());
return result.withoutRecords(); return result.withoutRecords();

View File

@ -91,6 +91,7 @@ import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.TopicAssignment; import org.apache.kafka.metadata.placement.TopicAssignment;
import org.apache.kafka.metadata.placement.UsableBroker; import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.CreateTopicPolicy; import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashMap;
@ -130,6 +131,7 @@ import static org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.PartitionReassignmentReplicas.isReassignmentInProgress; import static org.apache.kafka.controller.PartitionReassignmentReplicas.isReassignmentInProgress;
import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
@ -523,7 +525,7 @@ public class ReplicationControlManager {
Set<String> describable Set<String> describable
) { ) {
Map<String, ApiError> topicErrors = new HashMap<>(); Map<String, ApiError> topicErrors = new HashMap<>();
List<ApiMessageAndVersion> records = new ArrayList<>(); List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
// Check the topic names. // Check the topic names.
validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars); validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars);
@ -858,7 +860,8 @@ public class ReplicationControlManager {
ControllerResult<Map<Uuid, ApiError>> deleteTopics(ControllerRequestContext context, Collection<Uuid> ids) { ControllerResult<Map<Uuid, ApiError>> deleteTopics(ControllerRequestContext context, Collection<Uuid> ids) {
Map<Uuid, ApiError> results = new HashMap<>(ids.size()); Map<Uuid, ApiError> results = new HashMap<>(ids.size());
List<ApiMessageAndVersion> records = new ArrayList<>(ids.size()); List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP, ids.size());
for (Uuid id : ids) { for (Uuid id : ids) {
try { try {
deleteTopic(context, id, records); deleteTopic(context, id, records);
@ -1282,7 +1285,7 @@ public class ReplicationControlManager {
ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) { ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
ElectionType electionType = electionType(request.electionType()); ElectionType electionType = electionType(request.electionType());
List<ApiMessageAndVersion> records = new ArrayList<>(); List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
ElectLeadersResponseData response = new ElectLeadersResponseData(); ElectLeadersResponseData response = new ElectLeadersResponseData();
if (request.topicPartitions() == null) { if (request.topicPartitions() == null) {
// If topicPartitions is null, we try to elect a new leader for every partition. There // If topicPartitions is null, we try to elect a new leader for every partition. There
@ -1422,7 +1425,7 @@ public class ReplicationControlManager {
throw new BrokerIdNotRegisteredException("Broker ID " + brokerId + throw new BrokerIdNotRegisteredException("Broker ID " + brokerId +
" is not currently registered"); " is not currently registered");
} }
List<ApiMessageAndVersion> records = new ArrayList<>(); List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
handleBrokerUnregistered(brokerId, registration.epoch(), records); handleBrokerUnregistered(brokerId, registration.epoch(), records);
return ControllerResult.of(records, null); return ControllerResult.of(records, null);
} }
@ -1494,8 +1497,8 @@ public class ReplicationControlManager {
ControllerRequestContext context, ControllerRequestContext context,
List<CreatePartitionsTopic> topics List<CreatePartitionsTopic> topics
) { ) {
List<ApiMessageAndVersion> records = new ArrayList<>(); List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
List<CreatePartitionsTopicResult> results = new ArrayList<>(); List<CreatePartitionsTopicResult> results = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
for (CreatePartitionsTopic topic : topics) { for (CreatePartitionsTopic topic : topics) {
ApiError apiError = ApiError.NONE; ApiError apiError = ApiError.NONE;
try { try {
@ -1734,7 +1737,7 @@ public class ReplicationControlManager {
ControllerResult<AlterPartitionReassignmentsResponseData> ControllerResult<AlterPartitionReassignmentsResponseData>
alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) { alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) {
List<ApiMessageAndVersion> records = new ArrayList<>(); List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
AlterPartitionReassignmentsResponseData result = AlterPartitionReassignmentsResponseData result =
new AlterPartitionReassignmentsResponseData().setErrorMessage(null); new AlterPartitionReassignmentsResponseData().setErrorMessage(null);
int successfulAlterations = 0, totalAlterations = 0; int successfulAlterations = 0, totalAlterations = 0;

View File

@ -248,6 +248,7 @@ public class ClientQuotaControlManagerTest {
new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList( new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
new EntityData().setEntityType("client-id").setEntityName("client-id-2"))). new EntityData().setEntityType("client-id").setEntityName("client-id-2"))).
setKey("request_percentage").setValue(60.60).setRemove(false), (short) 0)); setKey("request_percentage").setValue(60.60).setRemove(false), (short) 0));
records = new ArrayList<>(records);
RecordTestUtils.deepSortRecords(records); RecordTestUtils.deepSortRecords(records);
RecordTestUtils.deepSortRecords(expectedRecords); RecordTestUtils.deepSortRecords(expectedRecords);
assertEquals(expectedRecords, records); assertEquals(expectedRecords, records);

View File

@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.mutable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
/**
 * A list which cannot grow beyond a certain length. If the maximum length would be exceeded by an
 * operation, the operation throws a BoundedListTooLongException exception rather than completing.
 * For simplicity, mutation through iterators or sublists is not allowed.
 *
 * @param <E> the element type
 */
public class BoundedList<E> implements List<E> {
    // Hard cap on the number of elements; always positive.
    private final int maxLength;

    // The wrapped list that actually stores the elements.
    private final List<E> delegate;

    /**
     * Creates a bounded list backed by a new, empty ArrayList.
     *
     * @param maxLength the maximum number of elements the list may hold
     */
    public static <E> BoundedList<E> newArrayBacked(int maxLength) {
        return new BoundedList<>(maxLength, new ArrayList<E>());
    }

    /**
     * Creates a bounded list backed by a new ArrayList pre-sized to the given capacity.
     *
     * @param maxLength       the maximum number of elements the list may hold
     * @param initialCapacity the initial capacity of the backing ArrayList
     */
    public static <E> BoundedList<E> newArrayBacked(int maxLength, int initialCapacity) {
        return new BoundedList<>(maxLength, new ArrayList<E>(initialCapacity));
    }

    /**
     * Wraps an existing list with a length bound.
     *
     * @param maxLength  the maximum length; must be positive
     * @param underlying the list to wrap; must not already be longer than maxLength
     *
     * @throws IllegalArgumentException    if maxLength is zero or negative
     * @throws BoundedListTooLongException if the wrapped list is already too long
     */
    public BoundedList(int maxLength, List<E> underlying) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("Invalid non-positive maxLength of " + maxLength);
        }
        this.maxLength = maxLength;
        if (underlying.size() > maxLength) {
            throw new BoundedListTooLongException("Cannot wrap list, because it is longer than " +
                "the maximum length " + maxLength);
        }
        this.delegate = underlying;
    }

    // Rejects the insertion of one more element once the list is full.
    private void ensureRoomForOneMore() {
        if (delegate.size() >= maxLength) {
            throw new BoundedListTooLongException("Cannot add another element to the list " +
                "because it would exceed the maximum length of " + maxLength);
        }
    }

    // Rejects a bulk insertion that would push the list past the bound.
    // Written as size > maxLength - numToAdd to avoid integer overflow.
    private void ensureRoomFor(int numToAdd) {
        if (delegate.size() > maxLength - numToAdd) {
            throw new BoundedListTooLongException("Cannot add another " + numToAdd +
                " element(s) to the list because it would exceed the maximum length of " +
                maxLength);
        }
    }

    @Override
    public boolean add(E e) {
        ensureRoomForOneMore();
        return delegate.add(e);
    }

    @Override
    public void add(int index, E element) {
        ensureRoomForOneMore();
        delegate.add(index, element);
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        ensureRoomFor(c.size());
        return delegate.addAll(c);
    }

    @Override
    public boolean addAll(int index, Collection<? extends E> c) {
        ensureRoomFor(c.size());
        return delegate.addAll(index, c);
    }

    @Override
    public int size() {
        return delegate.size();
    }

    @Override
    public boolean isEmpty() {
        return delegate.isEmpty();
    }

    @Override
    public boolean contains(Object o) {
        return delegate.contains(o);
    }

    // Iteration is read-only: the iterator comes from an unmodifiable view, so
    // Iterator.remove cannot be used to bypass the length bookkeeping.
    @Override
    public Iterator<E> iterator() {
        return Collections.unmodifiableList(delegate).iterator();
    }

    @Override
    public Object[] toArray() {
        return delegate.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return delegate.toArray(a);
    }

    // Removals can only shrink the list, so they delegate without any check.
    @Override
    public boolean remove(Object o) {
        return delegate.remove(o);
    }

    @Override
    public E remove(int index) {
        return delegate.remove(index);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return delegate.containsAll(c);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return delegate.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return delegate.retainAll(c);
    }

    @Override
    public void clear() {
        delegate.clear();
    }

    @Override
    public E get(int index) {
        return delegate.get(index);
    }

    // set() replaces an element in place and never changes the size, so no bound check.
    @Override
    public E set(int index, E element) {
        return delegate.set(index, element);
    }

    @Override
    public int indexOf(Object o) {
        return delegate.indexOf(o);
    }

    @Override
    public int lastIndexOf(Object o) {
        return delegate.lastIndexOf(o);
    }

    @Override
    public ListIterator<E> listIterator() {
        return Collections.unmodifiableList(delegate).listIterator();
    }

    @Override
    public ListIterator<E> listIterator(int index) {
        return Collections.unmodifiableList(delegate).listIterator(index);
    }

    // Sublists are read-only views for the same reason as iterators.
    @Override
    public List<E> subList(int fromIndex, int toIndex) {
        return Collections.unmodifiableList(delegate).subList(fromIndex, toIndex);
    }

    @Override
    public boolean equals(Object o) {
        return delegate.equals(o);
    }

    @Override
    public int hashCode() {
        return delegate.hashCode();
    }

    @Override
    public String toString() {
        return delegate.toString();
    }
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.mutable;
/**
 * An exception that is thrown when the BoundedList cannot add any more elements.
 */
public class BoundedListTooLongException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    /**
     * @param message a description of how the bound was exceeded
     */
    public BoundedListTooLongException(String message) {
        super(message);
    }

    /**
     * @param message a description of how the bound was exceeded
     * @param t       the underlying cause
     */
    public BoundedListTooLongException(String message, Throwable t) {
        super(message, t);
    }
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.mutable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Unit tests for BoundedList, a List decorator that enforces a maximum
 * number of elements. Covers construction-time validation, bound
 * enforcement on add/addAll (including the exact exception messages),
 * pass-through behavior of read operations, and the immutability of
 * the iterators and sub-lists the wrapper hands out.
 */
@Timeout(120)
public class BoundedListTest {
// A zero maxLength is rejected at construction time.
@Test
public void testMaxLengthMustNotBeZero() {
assertEquals("Invalid non-positive maxLength of 0",
assertThrows(IllegalArgumentException.class,
() -> new BoundedList<>(0, new ArrayList<Integer>())).
getMessage());
}
// A negative maxLength is rejected at construction time.
@Test
public void testMaxLengthMustNotBeNegative() {
assertEquals("Invalid non-positive maxLength of -123",
assertThrows(IllegalArgumentException.class,
() -> new BoundedList<>(-123, new ArrayList<Integer>())).
getMessage());
}
// Wrapping a list that is already longer than the bound fails fast.
@Test
public void testOwnedListMustNotBeTooLong() {
assertEquals("Cannot wrap list, because it is longer than the maximum length 1",
assertThrows(BoundedListTooLongException.class,
() -> new BoundedList<>(1, new ArrayList<>(Arrays.asList(1, 2)))).
getMessage());
}
// Adds succeed up to the bound; both add(E) and add(int, E) throw
// BoundedListTooLongException once the list is full.
@Test
public void testAddingToBoundedList() {
BoundedList<Integer> list = new BoundedList<>(2, new ArrayList<>(3));
assertEquals(0, list.size());
assertTrue(list.isEmpty());
assertTrue(list.add(456));
assertTrue(list.contains(456));
assertEquals(1, list.size());
assertFalse(list.isEmpty());
assertTrue(list.add(789));
assertEquals("Cannot add another element to the list because it would exceed the " +
"maximum length of 2",
assertThrows(BoundedListTooLongException.class,
() -> list.add(912)).
getMessage());
assertEquals("Cannot add another element to the list because it would exceed the " +
"maximum length of 2",
assertThrows(BoundedListTooLongException.class,
() -> list.add(0, 912)).
getMessage());
}
// Helper: a BoundedList must be equal to (and hash like) the plain list
// it wraps, per the List equals/hashCode contract.
private static <E> void testHashCodeAndEquals(List<E> a) {
assertEquals(a, new BoundedList<>(123, a));
assertEquals(a.hashCode(), new BoundedList<>(123, a).hashCode());
}
@Test
public void testHashCodeAndEqualsForEmptyList() {
testHashCodeAndEquals(Collections.emptyList());
}
@Test
public void testHashCodeAndEqualsForNonEmptyList() {
testHashCodeAndEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
}
// set() replaces in place and is always allowed (it does not grow the list).
@Test
public void testSet() {
ArrayList<Integer> underlying = new ArrayList<>(Arrays.asList(1, 2, 3));
BoundedList<Integer> list = new BoundedList<>(3, underlying);
list.set(1, 200);
assertEquals(Arrays.asList(1, 200, 3), list);
}
// remove(Object) drops the first match; remove(int) drops by position.
@Test
public void testRemove() {
ArrayList<String> underlying = new ArrayList<>(Arrays.asList("a", "a", "c"));
BoundedList<String> list = new BoundedList<>(3, underlying);
assertEquals(0, list.indexOf("a"));
assertEquals(1, list.lastIndexOf("a"));
list.remove("a");
assertEquals(Arrays.asList("a", "c"), list);
list.remove(0);
assertEquals(Arrays.asList("c"), list);
}
// clear() empties the list through the wrapper.
@Test
public void testClear() {
ArrayList<String> underlying = new ArrayList<>(Arrays.asList("a", "b", "c"));
BoundedList<String> list = new BoundedList<>(3, underlying);
list.clear();
assertEquals(Arrays.asList(), list);
assertTrue(list.isEmpty());
}
@Test
public void testGet() {
BoundedList<Integer> list = new BoundedList<>(3, Arrays.asList(1, 2, 3));
assertEquals(2, list.get(1));
}
// Both toArray() overloads reflect the wrapped list's contents.
@Test
public void testToArray() {
BoundedList<Integer> list = new BoundedList<>(3, Arrays.asList(1, 2, 3));
assertArrayEquals(new Integer[] {1, 2, 3}, list.toArray());
assertArrayEquals(new Integer[] {1, 2, 3}, list.toArray(new Integer[3]));
}
// addAll refuses a batch that would overshoot the bound (for both
// overloads, with a count-specific message), but accepts one that fits.
@Test
public void testAddAll() {
ArrayList<String> underlying = new ArrayList<>(Arrays.asList("a", "b", "c"));
BoundedList<String> list = new BoundedList<>(5, underlying);
assertEquals("Cannot add another 3 element(s) to the list because it would exceed the " +
"maximum length of 5",
assertThrows(BoundedListTooLongException.class,
() -> list.addAll(Arrays.asList("d", "e", "f"))).
getMessage());
assertEquals("Cannot add another 3 element(s) to the list because it would exceed the " +
"maximum length of 5",
assertThrows(BoundedListTooLongException.class,
() -> list.addAll(0, Arrays.asList("d", "e", "f"))).
getMessage());
list.addAll(Arrays.asList("d", "e"));
assertEquals(Arrays.asList("a", "b", "c", "d", "e"), list);
}
// iterator(), listIterator(), and listIterator(int) traverse correctly,
// including an iterator positioned at the end of the list.
@Test
public void testIterator() {
BoundedList<Integer> list = new BoundedList<>(3, Arrays.asList(1, 2, 3));
assertEquals(1, list.iterator().next());
assertEquals(1, list.listIterator().next());
assertEquals(3, list.listIterator(2).next());
assertFalse(list.listIterator(3).hasNext());
}
// The iterators the wrapper hands out must not allow mutation.
@Test
public void testIteratorIsImmutable() {
BoundedList<Integer> list = new BoundedList<>(3, new ArrayList<>(Arrays.asList(1, 2, 3)));
assertThrows(UnsupportedOperationException.class,
() -> list.iterator().remove());
assertThrows(UnsupportedOperationException.class,
() -> list.listIterator().remove());
}
// subList() views are readable but reject mutation.
@Test
public void testSubList() {
BoundedList<Integer> list = new BoundedList<>(3, new ArrayList<>(Arrays.asList(1, 2, 3)));
assertEquals(Arrays.asList(2), list.subList(1, 2));
assertThrows(UnsupportedOperationException.class,
() -> list.subList(1, 2).remove(2));
}
}