KAFKA-4591; Create Topic Policy (KIP-108)

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Gwen Shapira <cshapi@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #2361 from ijuma/kafka-4591-create-topic-policy
This commit is contained in:
Ismael Juma 2017-01-13 19:45:49 -08:00 committed by Jason Gustafson
parent bc61d9bee6
commit da57bc27e7
23 changed files with 635 additions and 230 deletions

View File

@ -125,6 +125,12 @@
</subpackage>
</subpackage>
<subpackage name="server">
<allow pkg="org.slf4j" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
<subpackage name="tools">
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.clients.producer" />

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
public class PolicyViolationException extends ApiException {
public PolicyViolationException(String message) {
super(message);
}
public PolicyViolationException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -52,6 +52,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
@ -164,7 +165,8 @@ public enum Errors {
new InvalidRequestException("This most likely occurs because of a request being malformed by the client library or" +
" the message was sent to an incompatible broker. See the broker logs for more details.")),
UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
new UnsupportedForMessageFormatException("The message format version on the broker does not support the request."));
new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the system policy."));
private static final Logger log = LoggerFactory.getLogger(Errors.class);

View File

@ -913,6 +913,10 @@ public class Protocol {
public static final Schema TOPIC_ERROR_CODE = new Schema(new Field("topic", STRING), new Field("error_code", INT16));
// Improves on TOPIC_ERROR_CODE by adding an error_message to complement the error_code
public static final Schema TOPIC_ERROR = new Schema(new Field("topic", STRING), new Field("error_code", INT16),
new Field("error_message", NULLABLE_STRING));
/* CreateTopic api */
public static final Schema SINGLE_CREATE_TOPIC_REQUEST_V0 = new Schema(
new Field("topic",
@ -940,12 +944,30 @@ public class Protocol {
"The time in ms to wait for a topic to be completely created on the controller node. Values <= 0 will trigger topic creation and return immediately"));
public static final Schema CREATE_TOPICS_RESPONSE_V0 = new Schema(
new Field("topic_error_codes",
new Field("topic_errors",
new ArrayOf(TOPIC_ERROR_CODE),
"An array of per topic error codes."));
public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0};
public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0};
public static final Schema SINGLE_CREATE_TOPIC_REQUEST_V1 = SINGLE_CREATE_TOPIC_REQUEST_V0;
public static final Schema CREATE_TOPICS_REQUEST_V1 = new Schema(
new Field("create_topic_requests",
new ArrayOf(SINGLE_CREATE_TOPIC_REQUEST_V1),
"An array of single topic creation requests. Can not have multiple entries for the same topic."),
new Field("timeout",
INT32,
"The time in ms to wait for a topic to be completely created on the controller node. Values <= 0 will trigger topic creation and return immediately"),
new Field("validate_only",
BOOLEAN,
"If this is true, the request will be validated, but the topic won't be created."));
public static final Schema CREATE_TOPICS_RESPONSE_V1 = new Schema(
new Field("topic_errors",
new ArrayOf(TOPIC_ERROR),
"An array of per topic errors."));
public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1};
public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1};
/* DeleteTopic api */
public static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema(

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.ObsoleteBrokerException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
@ -35,6 +36,7 @@ public class CreateTopicsRequest extends AbstractRequest {
private static final String REQUESTS_KEY_NAME = "create_topic_requests";
private static final String TIMEOUT_KEY_NAME = "timeout";
private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";
private static final String TOPIC_KEY_NAME = "topic";
private static final String NUM_PARTITIONS_KEY_NAME = "num_partitions";
private static final String REPLICATION_FACTOR_KEY_NAME = "replication_factor";
@ -96,17 +98,25 @@ public class CreateTopicsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<CreateTopicsRequest> {
private final Map<String, TopicDetails> topics;
private final Integer timeout;
private final int timeout;
private final boolean validateOnly; // introduced in V1
public Builder(Map<String, TopicDetails> topics, Integer timeout) {
public Builder(Map<String, TopicDetails> topics, int timeout) {
this(topics, timeout, false);
}
public Builder(Map<String, TopicDetails> topics, int timeout, boolean validateOnly) {
super(ApiKeys.CREATE_TOPICS);
this.topics = topics;
this.timeout = timeout;
this.validateOnly = validateOnly;
}
@Override
public CreateTopicsRequest build() {
return new CreateTopicsRequest(topics, timeout, version());
if (validateOnly && version() == 0)
throw new ObsoleteBrokerException("validateOnly is not supported in version 0 of CreateTopicsRequest");
return new CreateTopicsRequest(topics, timeout, validateOnly, version());
}
@Override
@ -115,6 +125,7 @@ public class CreateTopicsRequest extends AbstractRequest {
bld.append("(type=CreateTopicsRequest").
append(", topics=").append(Utils.mkString(topics)).
append(", timeout=").append(timeout).
append(", validateOnly=").append(validateOnly).
append(")");
return bld.toString();
}
@ -122,6 +133,7 @@ public class CreateTopicsRequest extends AbstractRequest {
private final Map<String, TopicDetails> topics;
private final Integer timeout;
private final boolean validateOnly; // introduced in V1
// Set to handle special case where 2 requests for the same topic exist on the wire.
// This allows the broker to return an error code for these topics.
@ -130,7 +142,7 @@ public class CreateTopicsRequest extends AbstractRequest {
public static final int NO_NUM_PARTITIONS = -1;
public static final short NO_REPLICATION_FACTOR = -1;
private CreateTopicsRequest(Map<String, TopicDetails> topics, Integer timeout, short version) {
private CreateTopicsRequest(Map<String, TopicDetails> topics, Integer timeout, boolean validateOnly, short version) {
super(new Struct(ProtoUtils.requestSchema(ApiKeys.CREATE_TOPICS.id, version)), version);
List<Struct> createTopicRequestStructs = new ArrayList<>(topics.size());
@ -167,9 +179,12 @@ public class CreateTopicsRequest extends AbstractRequest {
}
struct.set(REQUESTS_KEY_NAME, createTopicRequestStructs.toArray());
struct.set(TIMEOUT_KEY_NAME, timeout);
if (version >= 1)
struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly);
this.topics = topics;
this.timeout = timeout;
this.validateOnly = validateOnly;
this.duplicateTopics = Collections.emptySet();
}
@ -225,20 +240,28 @@ public class CreateTopicsRequest extends AbstractRequest {
this.topics = topics;
this.timeout = struct.getInt(TIMEOUT_KEY_NAME);
if (struct.hasField(VALIDATE_ONLY_KEY_NAME))
this.validateOnly = struct.getBoolean(VALIDATE_ONLY_KEY_NAME);
else
this.validateOnly = false;
this.duplicateTopics = duplicateTopics;
}
@Override
public AbstractResponse getErrorResponse(Throwable e) {
Map<String, Errors> topicErrors = new HashMap<>();
Map<String, CreateTopicsResponse.Error> topicErrors = new HashMap<>();
for (String topic : topics.keySet()) {
topicErrors.put(topic, Errors.forException(e));
Errors error = Errors.forException(e);
// Avoid populating the error message if it's a generic one
String message = error.message().equals(e.getMessage()) ? null : e.getMessage();
topicErrors.put(topic, new CreateTopicsResponse.Error(error, message));
}
short versionId = version();
switch (versionId) {
case 0:
return new CreateTopicsResponse(topicErrors);
case 1:
return new CreateTopicsResponse(topicErrors, versionId);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CREATE_TOPICS.id)));
@ -249,10 +272,14 @@ public class CreateTopicsRequest extends AbstractRequest {
return this.topics;
}
public Integer timeout() {
public int timeout() {
return this.timeout;
}
public boolean validateOnly() {
return validateOnly;
}
public Set<String> duplicateTopics() {
return this.duplicateTopics;
}

View File

@ -32,9 +32,43 @@ import java.util.Map;
public class CreateTopicsResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CREATE_TOPICS.id);
private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
private static final String TOPIC_KEY_NAME = "topic";
private static final String ERROR_CODE_KEY_NAME = "error_code";
private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
public static class Error {
private final Errors error;
private final String message; // introduced in V1
public Error(Errors error, String message) {
this.error = error;
this.message = message;
}
public boolean is(Errors error) {
return this.error == error;
}
public Errors error() {
return error;
}
public String message() {
return message;
}
public String messageWithFallback() {
if (message == null)
return error.message();
return message;
}
@Override
public String toString() {
return "Error(error=" + error + ", message=" + message + ")";
}
}
/**
* Possible error codes:
@ -51,19 +85,22 @@ public class CreateTopicsResponse extends AbstractResponse {
* INVALID_REQUEST(42)
*/
private final Map<String, Errors> errors;
private final Map<String, Error> errors;
public CreateTopicsResponse(Map<String, Errors> errors) {
super(new Struct(CURRENT_SCHEMA));
public CreateTopicsResponse(Map<String, Error> errors, short version) {
super(new Struct(ProtoUtils.responseSchema(ApiKeys.CREATE_TOPICS.id, version)));
List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
topicErrorCodeStruct.set(TOPIC_KEY_NAME, topicError.getKey());
topicErrorCodeStruct.set(ERROR_CODE_KEY_NAME, topicError.getValue().code());
topicErrorCodeStructs.add(topicErrorCodeStruct);
List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
for (Map.Entry<String, Error> topicError : errors.entrySet()) {
Struct topicErrorsStruct = struct.instance(TOPIC_ERRORS_KEY_NAME);
topicErrorsStruct.set(TOPIC_KEY_NAME, topicError.getKey());
Error error = topicError.getValue();
topicErrorsStruct.set(ERROR_CODE_KEY_NAME, error.error.code());
if (version >= 1)
topicErrorsStruct.set(ERROR_MESSAGE_KEY_NAME, error.message());
topicErrorsStructs.add(topicErrorsStruct);
}
struct.set(TOPIC_ERROR_CODES_KEY_NAME, topicErrorCodeStructs.toArray());
struct.set(TOPIC_ERRORS_KEY_NAME, topicErrorsStructs.toArray());
this.errors = errors;
}
@ -71,19 +108,22 @@ public class CreateTopicsResponse extends AbstractResponse {
public CreateTopicsResponse(Struct struct) {
super(struct);
Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME);
Map<String, Errors> errors = new HashMap<>();
for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
Struct topicErrorCodeStruct = (Struct) topicErrorCodeStructObj;
Object[] topicErrorStructs = struct.getArray(TOPIC_ERRORS_KEY_NAME);
Map<String, Error> errors = new HashMap<>();
for (Object topicErrorStructObj : topicErrorStructs) {
Struct topicErrorCodeStruct = (Struct) topicErrorStructObj;
String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME);
short errorCode = topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME);
errors.put(topic, Errors.forCode(errorCode));
String errorMessage = null;
if (topicErrorCodeStruct.hasField(ERROR_MESSAGE_KEY_NAME))
errorMessage = topicErrorCodeStruct.getString(ERROR_MESSAGE_KEY_NAME);
errors.put(topic, new Error(Errors.forCode(errorCode), errorMessage));
}
this.errors = errors;
}
public Map<String, Errors> errors() {
public Map<String, Error> errors() {
return errors;
}

View File

@ -146,6 +146,10 @@ public class JoinGroupResponse extends AbstractResponse {
return members;
}
public static JoinGroupResponse parse(ByteBuffer buffer, int version) {
return new JoinGroupResponse(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, version).read(buffer));
}
public static JoinGroupResponse parse(ByteBuffer buffer) {
return new JoinGroupResponse(CURRENT_SCHEMA.read(buffer));
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.server.policy;
import org.apache.kafka.common.errors.PolicyViolationException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public interface CreateTopicPolicy {
class RequestMetadata {
private final String topic;
private final int numPartitions;
private final short replicationFactor;
private final Map<Integer, List<Integer>> replicasAssignments;
private final Map<String, String> configs;
public RequestMetadata(String topic, int numPartitions, short replicationFactor,
Map<Integer, List<Integer>> replicasAssignments, Map<String, String> configs) {
this.topic = topic;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
this.replicasAssignments = Collections.unmodifiableMap(replicasAssignments);
this.configs = Collections.unmodifiableMap(configs);
}
public String topic() {
return topic;
}
public int numPartitions() {
return numPartitions;
}
public Map<Integer, List<Integer>> replicasAssignments() {
return replicasAssignments;
}
public Map<String, String> configs() {
return configs;
}
@Override
public String toString() {
return "RequestMetadata(topic=" + topic +
", numPartitions=" + numPartitions +
", replicationFactor=" + replicationFactor +
", replicasAssignments=" + replicasAssignments +
", configs=" + configs + ")";
}
}
void validate(RequestMetadata requestMetadata) throws PolicyViolationException;
}

View File

@ -15,6 +15,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ObsoleteBrokerException;
import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.network.ListenerName;
@ -45,11 +46,7 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RequestResponseTest {
private static final Logger log = LoggerFactory.getLogger(RequestResponseTest.class);
@Test
public void testSerialization() throws Exception {
@ -61,15 +58,15 @@ public class RequestResponseTest {
checkSerialization(createControlledShutdownRequest());
checkSerialization(createControlledShutdownResponse(), null);
checkSerialization(createControlledShutdownRequest().getErrorResponse(new UnknownServerException()), null);
checkSerialization(createFetchRequest(3));
checkSerialization(createFetchRequest(3).getErrorResponse(new UnknownServerException()), null);
checkSerialization(createFetchRequest(3), 3);
checkSerialization(createFetchRequest(3).getErrorResponse(new UnknownServerException()), 3);
checkSerialization(createFetchResponse(), null);
checkSerialization(createHeartBeatRequest());
checkSerialization(createHeartBeatRequest().getErrorResponse(new UnknownServerException()), null);
checkSerialization(createHeartBeatResponse(), null);
checkSerialization(createJoinGroupRequest(1));
checkSerialization(createJoinGroupRequest(0).getErrorResponse(new UnknownServerException()), null);
checkSerialization(createJoinGroupRequest(1).getErrorResponse(new UnknownServerException()), null);
checkSerialization(createJoinGroupRequest(1), 1);
checkSerialization(createJoinGroupRequest(0).getErrorResponse(new UnknownServerException()), 0);
checkSerialization(createJoinGroupRequest(1).getErrorResponse(new UnknownServerException()), 1);
checkSerialization(createJoinGroupResponse(), null);
checkSerialization(createLeaveGroupRequest());
checkSerialization(createLeaveGroupRequest().getErrorResponse(new UnknownServerException()), null);
@ -80,16 +77,16 @@ public class RequestResponseTest {
checkSerialization(createDescribeGroupRequest());
checkSerialization(createDescribeGroupRequest().getErrorResponse(new UnknownServerException()), null);
checkSerialization(createDescribeGroupResponse(), null);
checkSerialization(createListOffsetRequest(1));
checkSerialization(createListOffsetRequest(1).getErrorResponse(new UnknownServerException()), null);
checkSerialization(createListOffsetResponse(1), null);
checkSerialization(MetadataRequest.allTopics((short) 2));
checkSerialization(createMetadataRequest(1, Arrays.asList("topic1")));
checkSerialization(createListOffsetRequest(1), 1);
checkSerialization(createListOffsetRequest(1).getErrorResponse(new UnknownServerException()), 1);
checkSerialization(createListOffsetResponse(1), 1);
checkSerialization(MetadataRequest.allTopics((short) 2), 2);
checkSerialization(createMetadataRequest(1, Arrays.asList("topic1")), 1);
checkSerialization(createMetadataRequest(1, Arrays.asList("topic1")).getErrorResponse(new UnknownServerException()), 1);
checkSerialization(createMetadataResponse(2), null);
checkSerialization(createMetadataResponse(2), 2);
checkSerialization(createMetadataRequest(2, Arrays.asList("topic1")).getErrorResponse(new UnknownServerException()), 2);
checkSerialization(createOffsetCommitRequest(2));
checkSerialization(createOffsetCommitRequest(2).getErrorResponse(new UnknownServerException()), null);
checkSerialization(createOffsetCommitRequest(2), 2);
checkSerialization(createOffsetCommitRequest(2).getErrorResponse(new UnknownServerException()), 2);
checkSerialization(createOffsetCommitResponse(), null);
checkSerialization(OffsetFetchRequest.forAllPartitions("group1"));
checkSerialization(OffsetFetchRequest.forAllPartitions("group1").getErrorResponse(new NotCoordinatorForGroupException()), 2);
@ -112,9 +109,12 @@ public class RequestResponseTest {
checkSerialization(createApiVersionRequest());
checkSerialization(createApiVersionRequest().getErrorResponse(new UnknownServerException()), null);
checkSerialization(createApiVersionResponse(), null);
checkSerialization(createCreateTopicRequest());
checkSerialization(createCreateTopicRequest().getErrorResponse(new UnknownServerException()), null);
checkSerialization(createCreateTopicResponse(), null);
checkSerialization(createCreateTopicRequest(0), 0);
checkSerialization(createCreateTopicRequest(0).getErrorResponse(new UnknownServerException()), 0);
checkSerialization(createCreateTopicResponse(0), 0);
checkSerialization(createCreateTopicRequest(1), 1);
checkSerialization(createCreateTopicRequest(1).getErrorResponse(new UnknownServerException()), 1);
checkSerialization(createCreateTopicResponse(1), 1);
checkSerialization(createDeleteTopicsRequest());
checkSerialization(createDeleteTopicsRequest().getErrorResponse(new UnknownServerException()), null);
checkSerialization(createDeleteTopicsResponse(), null);
@ -132,12 +132,12 @@ public class RequestResponseTest {
checkSerialization(createUpdateMetadataRequest(1, null), 1);
checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1);
checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(new UnknownServerException()), 1);
checkSerialization(createUpdateMetadataRequest(2, "rack1"));
checkSerialization(createUpdateMetadataRequest(2, null));
checkSerialization(createUpdateMetadataRequest(2, "rack1").getErrorResponse(new UnknownServerException()), null);
checkSerialization(createUpdateMetadataRequest(2, "rack1"), 2);
checkSerialization(createUpdateMetadataRequest(2, null), 2);
checkSerialization(createUpdateMetadataRequest(2, "rack1").getErrorResponse(new UnknownServerException()), 2);
checkSerialization(createUpdateMetadataRequest(3, "rack1"));
checkSerialization(createUpdateMetadataRequest(3, null));
checkSerialization(createUpdateMetadataRequest(3, "rack1").getErrorResponse(new UnknownServerException()), null);
checkSerialization(createUpdateMetadataRequest(3, "rack1").getErrorResponse(new UnknownServerException()), 3);
checkSerialization(createUpdateMetadataResponse(), null);
checkSerialization(createListOffsetRequest(0), 0);
checkSerialization(createListOffsetRequest(0).getErrorResponse(new UnknownServerException()), 0);
@ -168,7 +168,8 @@ public class RequestResponseTest {
Method deserializer = req.getClass().getDeclaredMethod("parse", ByteBuffer.class, Integer.TYPE);
deserialized = (AbstractRequestResponse) deserializer.invoke(null, buffer, version);
}
assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + "(version " + version + ") should be the same.", req, deserialized);
assertEquals("The original and deserialized of " + req.getClass().getSimpleName() +
"(version " + version + ") should be the same.", req, deserialized);
assertEquals("The original and deserialized of " + req.getClass().getSimpleName() + " should have the same hashcode.",
req.hashCode(), deserialized.hashCode());
}
@ -261,6 +262,11 @@ public class RequestResponseTest {
assertEquals("", deserialized.clientId()); // null is defaulted to ""
}
@Test(expected = ObsoleteBrokerException.class)
public void testCreateTopicRequestV0FailsIfValidateOnly() {
createCreateTopicRequest(0, true);
}
private RequestHeader createRequestHeader() {
return new RequestHeader((short) 10, (short) 1, "", 10);
}
@ -550,7 +556,11 @@ public class RequestResponseTest {
return new ApiVersionsResponse(Errors.NONE.code(), apiVersions);
}
private CreateTopicsRequest createCreateTopicRequest() {
private CreateTopicsRequest createCreateTopicRequest(int version) {
return createCreateTopicRequest(version, version >= 1);
}
private CreateTopicsRequest createCreateTopicRequest(int version, boolean validateOnly) {
CreateTopicsRequest.TopicDetails request1 = new CreateTopicsRequest.TopicDetails(3, (short) 5);
Map<Integer, List<Integer>> replicaAssignments = new HashMap<>();
@ -565,14 +575,14 @@ public class RequestResponseTest {
Map<String, CreateTopicsRequest.TopicDetails> request = new HashMap<>();
request.put("my_t1", request1);
request.put("my_t2", request2);
return new CreateTopicsRequest.Builder(request, 0).build();
return new CreateTopicsRequest.Builder(request, 0, validateOnly).setVersion((short) version).build();
}
private CreateTopicsResponse createCreateTopicResponse() {
Map<String, Errors> errors = new HashMap<>();
errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION);
errors.put("t2", Errors.LEADER_NOT_AVAILABLE);
return new CreateTopicsResponse(errors);
private CreateTopicsResponse createCreateTopicResponse(int version) {
Map<String, CreateTopicsResponse.Error> errors = new HashMap<>();
errors.put("t1", new CreateTopicsResponse.Error(Errors.INVALID_TOPIC_EXCEPTION, null));
errors.put("t2", new CreateTopicsResponse.Error(Errors.LEADER_NOT_AVAILABLE, "Leader with id 5 is not available."));
return new CreateTopicsResponse(errors, (short) version);
}
private DeleteTopicsRequest createDeleteTopicsRequest() {

View File

@ -421,7 +421,8 @@ object AdminUtils extends Logging with AdminUtilities {
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties = new Properties,
update: Boolean = false) {
update: Boolean = false,
validateOnly: Boolean = false) {
// validate arguments
Topic.validate(topic)
@ -450,13 +451,17 @@ object AdminUtils extends Logging with AdminUtilities {
// Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
if (!update) {
// write out the config if there is any, this isn't transactional with the partition assignments
LogConfig.validate(config)
writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
if (!validateOnly) {
// write out the config if there is any, this isn't transactional with the partition assignments
writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
}
}
// create the partition assignment
writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
if (!validateOnly) {
// create the partition assignment
writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}
}
private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {

View File

@ -27,6 +27,9 @@ import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest._
import org.apache.kafka.common.requests.CreateTopicsResponse
import org.apache.kafka.server.policy.CreateTopicPolicy
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import scala.collection._
import scala.collection.JavaConverters._
@ -37,7 +40,10 @@ class AdminManager(val config: KafkaConfig,
val zkUtils: ZkUtils) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
private val createTopicPolicy =
Option(config.getConfiguredInstance(KafkaConfig.CreateTopicsPolicyClassNameProp, classOf[CreateTopicPolicy]))
def hasDelayedTopicOperations = topicPurgatory.delayed() != 0
@ -55,8 +61,9 @@ class AdminManager(val config: KafkaConfig,
* The callback function will be triggered either when timeout, error or the topics are created.
*/
def createTopics(timeout: Int,
validateOnly: Boolean,
createInfo: Map[String, TopicDetails],
responseCallback: Map[String, Errors] => Unit) {
responseCallback: Map[String, CreateTopicsResponse.Error] => Unit) {
// 1. map over topics creating assignment and calling zookeeper
val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
@ -73,32 +80,38 @@ class AdminManager(val config: KafkaConfig,
&& !arguments.replicasAssignments.isEmpty)
throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
"Both cannot be used at the same time.")
else if (!arguments.replicasAssignments.isEmpty) {
// Note: we don't check that replicaAssignment doesn't contain unknown brokers - unlike in add-partitions case,
// this follows the existing logic in TopicCommand
arguments.replicasAssignments.asScala.map { case (partitionId, replicas) =>
(partitionId.intValue, replicas.asScala.map(_.intValue))
else {
createTopicPolicy.foreach(_.validate(new RequestMetadata(topic, arguments.numPartitions,
arguments.replicationFactor, arguments.replicasAssignments, arguments.configs)))
if (!arguments.replicasAssignments.isEmpty) {
// Note: we don't check that replicaAssignment doesn't contain unknown brokers - unlike in add-partitions case,
// this follows the existing logic in TopicCommand
arguments.replicasAssignments.asScala.map { case (partitionId, replicas) =>
(partitionId.intValue, replicas.asScala.map(_.intValue))
}
} else {
AdminUtils.assignReplicasToBrokers(brokers, arguments.numPartitions, arguments.replicationFactor)
}
} else {
AdminUtils.assignReplicasToBrokers(brokers, arguments.numPartitions, arguments.replicationFactor)
}
}
trace(s"Assignments for topic $topic are $assignments ")
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
CreateTopicMetadata(topic, assignments, Errors.NONE)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs,
update = false, validateOnly = validateOnly)
CreateTopicMetadata(topic, assignments, new CreateTopicsResponse.Error(Errors.NONE, null))
} catch {
case e: Throwable =>
warn(s"Error processing create topic request for topic $topic with arguments $arguments", e)
CreateTopicMetadata(topic, Map(), Errors.forException(e))
info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
CreateTopicMetadata(topic, Map(), new CreateTopicsResponse.Error(Errors.forException(e), e.getMessage))
}
}
// 2. if timeout <= 0 or no topics can proceed return immediately
if (timeout <= 0 || !metadata.exists(_.error == Errors.NONE)) {
// 2. if timeout <= 0, validateOnly or no topics can proceed return immediately
if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) {
val results = metadata.map { createTopicMetadata =>
// ignore topics that already have errors
if (createTopicMetadata.error == Errors.NONE) {
(createTopicMetadata.topic, Errors.REQUEST_TIMED_OUT)
if (createTopicMetadata.error.is(Errors.NONE) && !validateOnly) {
(createTopicMetadata.topic, new CreateTopicsResponse.Error(Errors.REQUEST_TIMED_OUT, null))
} else {
(createTopicMetadata.topic, createTopicMetadata.error)
}

View File

@ -19,6 +19,7 @@ package kafka.server
import kafka.api.LeaderAndIsr
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsResponse
import scala.collection._
@ -28,7 +29,7 @@ import scala.collection._
* TODO: local state doesn't count, need to know state of all relevant brokers
*
*/
case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: Errors)
case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: CreateTopicsResponse.Error)
/**
* A delayed create topics operation that can be created by the admin manager and watched
@ -37,7 +38,7 @@ case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[I
class DelayedCreateTopics(delayMs: Long,
createMetadata: Seq[CreateTopicMetadata],
adminManager: AdminManager,
responseCallback: Map[String, Errors] => Unit)
responseCallback: Map[String, CreateTopicsResponse.Error] => Unit)
extends DelayedOperation(delayMs) {
/**
@ -47,7 +48,7 @@ class DelayedCreateTopics(delayMs: Long,
override def tryComplete() : Boolean = {
trace(s"Trying to complete operation for $createMetadata")
val leaderlessPartitionCount = createMetadata.filter(_.error == Errors.NONE)
val leaderlessPartitionCount = createMetadata.filter(_.error.is(Errors.NONE))
.foldLeft(0) { case (topicCounter, metadata) =>
topicCounter + missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet)
}
@ -68,8 +69,8 @@ class DelayedCreateTopics(delayMs: Long,
trace(s"Completing operation for $createMetadata")
val results = createMetadata.map { metadata =>
// ignore topics that already have errors
if (metadata.error == Errors.NONE && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0)
(metadata.topic, Errors.REQUEST_TIMED_OUT)
if (metadata.error.is(Errors.NONE) && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0)
(metadata.topic, new CreateTopicsResponse.Error(Errors.REQUEST_TIMED_OUT, null))
else
(metadata.topic, metadata.error)
}.toMap

View File

@ -1181,20 +1181,20 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleCreateTopicsRequest(request: RequestChannel.Request) {
val createTopicsRequest = request.body.asInstanceOf[CreateTopicsRequest]
def sendResponseCallback(results: Map[String, Errors]): Unit = {
val responseBody = new CreateTopicsResponse(results.asJava)
def sendResponseCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
val responseBody = new CreateTopicsResponse(results.asJava, request.header.apiVersion)
trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
}
if (!controller.isActive) {
val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
(topic, Errors.NOT_CONTROLLER)
(topic, new CreateTopicsResponse.Error(Errors.NOT_CONTROLLER, null))
}
sendResponseCallback(results)
} else if (!authorize(request.session, Create, Resource.ClusterResource)) {
val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
(topic, Errors.CLUSTER_AUTHORIZATION_FAILED)
(topic, new CreateTopicsResponse.Error(Errors.CLUSTER_AUTHORIZATION_FAILED, null))
}
sendResponseCallback(results)
} else {
@ -1203,15 +1203,25 @@ class KafkaApis(val requestChannel: RequestChannel,
}
// Special handling to add duplicate topics to the response
def sendResponseWithDuplicatesCallback(results: Map[String, Errors]): Unit = {
if (duplicateTopics.nonEmpty)
warn(s"Create topics request from client ${request.header.clientId} contains multiple entries for the following topics: ${duplicateTopics.keySet.mkString(",")}")
val completeResults = results ++ duplicateTopics.keySet.map((_, Errors.INVALID_REQUEST)).toMap
def sendResponseWithDuplicatesCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = {
val duplicatedTopicsResults =
if (duplicateTopics.nonEmpty) {
val errorMessage = s"Create topics request from client `${request.header.clientId}` contains multiple entries " +
s"for the following topics: ${duplicateTopics.keySet.mkString(",")}"
// We can send the error message in the response for version 1, so we don't have to log it any more
if (request.header.apiVersion == 0)
warn(errorMessage)
duplicateTopics.keySet.map((_, new CreateTopicsResponse.Error(Errors.INVALID_REQUEST, errorMessage))).toMap
} else Map.empty
val completeResults = results ++ duplicatedTopicsResults
sendResponseCallback(completeResults)
}
adminManager.createTopics(
createTopicsRequest.timeout.toInt,
createTopicsRequest.timeout,
createTopicsRequest.validateOnly,
validTopics,
sendResponseWithDuplicatesCallback
)

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.metrics.{MetricsReporter, Sensor}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.server.policy.CreateTopicPolicy
import scala.collection.{Map, immutable}
import scala.collection.JavaConverters._
@ -280,6 +281,7 @@ object KafkaConfig {
val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
val MinInSyncReplicasProp = "min.insync.replicas"
val CreateTopicsPolicyClassNameProp = "create.topics.policy.class.name"
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
val DefaultReplicationFactorProp = "default.replication.factor"
@ -488,6 +490,9 @@ object KafkaConfig {
"create a topic with a replication factor of 3, set min.insync.replicas to 2, and " +
"produce with acks of \"all\". This will ensure that the producer raises an exception " +
"if a majority of replicas do not receive a write."
val CreateTopicsPolicyClassNameDoc = "The create topics policy class that should be used for validation. The class should " +
"implement the <code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface."
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels"
val ControllerMessageQueueSizeDoc = "The buffer size for controller-to-broker-channels"
@ -688,6 +693,7 @@ object KafkaConfig {
.define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc)
.define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
.define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
.define(CreateTopicsPolicyClassNameProp, CLASS, null, LOW, CreateTopicsPolicyClassNameDoc)
/** ********* Replication configuration ***********/
.define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)

View File

@ -171,7 +171,7 @@ object CoreUtils extends Logging {
/**
* Create an instance of the class with the given class name
*/
def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
val klass = Class.forName(className, true, Utils.getContextOrKafkaClassLoader()).asInstanceOf[Class[T]]
val constructor = klass.getConstructor(args.map(_.getClass): _*)
constructor.newInstance(args: _*)

View File

@ -120,7 +120,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode()),
ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.code),
ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.error.code),
ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2.code)
)

View File

@ -21,7 +21,6 @@ import java.security.Permission
import java.util
import kafka.server.KafkaConfig
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.types.Password
import org.junit.{After, Before, Test}
import org.junit.Assert._

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import kafka.network.SocketServer
import kafka.utils.TestUtils
import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
import org.apache.kafka.common.requests.{CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse}
import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
import scala.collection.JavaConverters._
class AbstractCreateTopicsRequestTest extends BaseRequestTest {
override def propertyOverrides(properties: Properties): Unit =
properties.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
protected def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = {
val response = sendCreateTopicRequest(request)
val error = response.errors.values.asScala.find(!_.is(Errors.NONE))
assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
request.topics.asScala.foreach { case (topic, details) =>
def verifyMetadata(socketServer: SocketServer) = {
val metadata = sendMetadataRequest(
new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
val metadataForTopic = metadata.filter(_.topic == topic).head
val partitions = if (!details.replicasAssignments.isEmpty)
details.replicasAssignments.size
else
details.numPartitions
val replication = if (!details.replicasAssignments.isEmpty)
details.replicasAssignments.asScala.head._2.size
else
details.replicationFactor
if (request.validateOnly) {
assertNotNull(s"Topic $topic should be created", metadataForTopic)
assertFalse(s"Error ${metadataForTopic.error} for topic $topic", metadataForTopic.error == Errors.NONE)
assertTrue("The topic should have no partitions", metadataForTopic.partitionMetadata.isEmpty)
}
else {
assertNotNull("The topic should be created", metadataForTopic)
assertEquals(Errors.NONE, metadataForTopic.error)
assertEquals("The topic should have the correct number of partitions", partitions, metadataForTopic.partitionMetadata.size)
assertEquals("The topic should have the correct replication factor", replication, metadataForTopic.partitionMetadata.asScala.head.replicas.size)
}
}
// Verify controller broker has the correct metadata
verifyMetadata(controllerSocketServer)
if (!request.validateOnly) {
// Wait until metadata is propagated and validate non-controller broker has the correct metadata
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
}
verifyMetadata(notControllerSocketServer)
}
}
protected def error(error: Errors, errorMessage: Option[String] = None): CreateTopicsResponse.Error =
new CreateTopicsResponse.Error(error, errorMessage.orNull)
protected def duplicateFirstTopic(request: CreateTopicsRequest) = {
val struct = request.toStruct
val topics = struct.getArray("create_topic_requests")
val firstTopic = topics(0).asInstanceOf[Struct]
val newTopics = firstTopic :: topics.toList
struct.set("create_topic_requests", newTopics.toArray)
new CreateTopicsRequest(struct, request.version)
}
protected def addPartitionsAndReplicationFactorToFirstTopic(request: CreateTopicsRequest) = {
val struct = request.toStruct
val topics = struct.getArray("create_topic_requests")
val firstTopic = topics(0).asInstanceOf[Struct]
firstTopic.set("num_partitions", 1)
firstTopic.set("replication_factor", 1.toShort)
new CreateTopicsRequest(struct, request.version)
}
protected def validateErrorCreateTopicsRequests(request: CreateTopicsRequest,
expectedResponse: Map[String, CreateTopicsResponse.Error],
checkErrorMessage: Boolean = true): Unit = {
val response = sendCreateTopicRequest(request)
val errors = response.errors.asScala
assertEquals("The response size should match", expectedResponse.size, response.errors.size)
expectedResponse.foreach { case (topic, expectedError) =>
val expected = expectedResponse(topic)
val actual = errors(topic)
assertEquals("The response error should match", expected.error, actual.error)
if (checkErrorMessage) {
assertEquals(expected.message, actual.message)
assertEquals(expected.messageWithFallback, actual.messageWithFallback)
}
// If no error validate topic exists
if (expectedError.is(Errors.NONE) && !request.validateOnly) {
validateTopicExists(topic)
}
}
}
protected def validateTopicExists(topic: String): Unit = {
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
val metadata = sendMetadataRequest(
new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error() == Errors.NONE))
}
protected def replicaAssignmentToJava(assignments: Map[Int, List[Int]]) = {
assignments.map { case (k, v) => (k:Integer, v.map { i => i:Integer }.asJava) }.asJava
}
protected def sendCreateTopicRequest(request: CreateTopicsRequest, socketServer: SocketServer = controllerSocketServer): CreateTopicsResponse = {
val response = send(request, ApiKeys.CREATE_TOPICS, socketServer)
CreateTopicsResponse.parse(response, request.version)
}
protected def sendMetadataRequest(request: MetadataRequest, destination: SocketServer = anySocketServer): MetadataResponse = {
val version = ProtoUtils.latestVersion(ApiKeys.METADATA.id)
val response = send(request, ApiKeys.METADATA, destination = destination)
MetadataResponse.parse(response, version)
}
}

View File

@ -26,9 +26,8 @@ import kafka.integration.KafkaServerTestHarness
import kafka.network.SocketServer
import kafka.utils._
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, SecurityProtocol}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, ResponseHeader}
import org.junit.Before
abstract class BaseRequestTest extends KafkaServerTestHarness {
private var correlationId = 0
@ -122,7 +121,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
def send(request: AbstractRequest, apiKey: ApiKeys, socket: Socket): ByteBuffer = {
correlationId += 1
val serializedBytes = {
val header = new RequestHeader(apiKey.id, request.version, "", correlationId)
val header = new RequestHeader(apiKey.id, request.version, "client-id", correlationId)
val byteBuffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf)
header.writeTo(byteBuffer)
request.writeTo(byteBuffer)

View File

@ -17,18 +17,15 @@
package kafka.server
import kafka.network.SocketServer
import kafka.utils._
import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
import org.apache.kafka.common.requests.{CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
class CreateTopicsRequestTest extends BaseRequestTest {
class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
@Test
def testValidCreateTopicsRequests() {
@ -51,42 +48,11 @@ class CreateTopicsRequestTest extends BaseRequestTest {
"topic7" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort),
"topic8" -> new CreateTopicsRequest.TopicDetails(assignments8)).asJava, timeout).build()
)
}
private def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = {
val response = sendCreateTopicRequest(request)
val error = response.errors.values.asScala.find(_ != Errors.NONE)
assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
request.topics.asScala.foreach { case (topic, details) =>
def verifyMetadata(socketServer: SocketServer) = {
val metadata = sendMetadataRequest(
new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
val metadataForTopic = metadata.filter(p => p.topic.equals(topic)).head
val partitions = if (!details.replicasAssignments.isEmpty)
details.replicasAssignments.size
else
details.numPartitions
val replication = if (!details.replicasAssignments.isEmpty)
details.replicasAssignments.asScala.head._2.size
else
details.replicationFactor
assertNotNull("The topic should be created", metadataForTopic)
assertEquals("The topic should have the correct number of partitions", partitions, metadataForTopic.partitionMetadata.size)
assertEquals("The topic should have the correct replication factor", replication, metadataForTopic.partitionMetadata.asScala.head.replicas.size)
}
// Verify controller broker has the correct metadata
verifyMetadata(controllerSocketServer)
// Wait until metadata is propagated and validate non-controller broker has the correct metadata
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
verifyMetadata(notControllerSocketServer)
}
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(
"topic9" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
"topic10" -> new CreateTopicsRequest.TopicDetails(5, 2.toShort),
"topic11" -> new CreateTopicsRequest.TopicDetails(assignments8)).asJava, timeout, true).build()
)
}
@Test
@ -97,17 +63,17 @@ class CreateTopicsRequestTest extends BaseRequestTest {
// Basic
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map(existingTopic -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
Map(existingTopic -> Errors.TOPIC_ALREADY_EXISTS))
Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("""Topic "existing-topic" already exists."""))))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-partitions" -> new CreateTopicsRequest.TopicDetails(-1, 1.toShort)).asJava, timeout).build(),
Map("error-partitions" -> Errors.INVALID_PARTITIONS))
Map("error-partitions" -> error(Errors.INVALID_PARTITIONS)), checkErrorMessage = false)
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-replication" -> new CreateTopicsRequest.TopicDetails(1, (numBrokers + 1).toShort)).asJava, timeout).build(),
Map("error-replication" -> Errors.INVALID_REPLICATION_FACTOR))
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR)), checkErrorMessage = false)
val invalidConfig = Map("not.a.property" -> "error").asJava
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-config" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, invalidConfig)).asJava, timeout).build(),
Map("error-config" -> Errors.INVALID_CONFIG))
Map("error-config" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
val invalidAssignments = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(0)))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments)).asJava, timeout).build(),
Map("error-assignment" -> Errors.INVALID_REPLICA_ASSIGNMENT))
Map("error-assignment" -> error(Errors.INVALID_REPLICA_ASSIGNMENT)), checkErrorMessage = false)
// Partial
validateErrorCreateTopicsRequests(
@ -118,24 +84,24 @@ class CreateTopicsRequestTest extends BaseRequestTest {
"partial-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments),
"partial-none" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, timeout).build(),
Map(
existingTopic -> Errors.TOPIC_ALREADY_EXISTS,
"partial-partitions" -> Errors.INVALID_PARTITIONS,
"partial-replication" -> Errors.INVALID_REPLICATION_FACTOR,
"partial-assignment" -> Errors.INVALID_REPLICA_ASSIGNMENT,
"partial-none" -> Errors.NONE
)
existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS),
"partial-partitions" -> error(Errors.INVALID_PARTITIONS),
"partial-replication" -> error(Errors.INVALID_REPLICATION_FACTOR),
"partial-assignment" -> error(Errors.INVALID_REPLICA_ASSIGNMENT),
"partial-none" -> error(Errors.NONE)
), checkErrorMessage = false
)
validateTopicExists("partial-none")
// Timeout
// We don't expect a request to ever complete within 1ms. A timeout of 1 ms allows us to test the purgatory timeout logic.
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, 1).build(),
Map("error-timeout" -> Errors.REQUEST_TIMED_OUT))
Map("error-timeout" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout-zero" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, 0).build(),
Map("error-timeout-zero" -> Errors.REQUEST_TIMED_OUT))
Map("error-timeout-zero" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
// Negative timeouts are treated the same as 0
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-timeout-negative" -> new CreateTopicsRequest.TopicDetails(10, 3.toShort)).asJava, -1).build(),
Map("error-timeout-negative" -> Errors.REQUEST_TIMED_OUT))
Map("error-timeout-negative" -> error(Errors.REQUEST_TIMED_OUT)), checkErrorMessage = false)
// The topics should still get created eventually
TestUtils.waitUntilMetadataIsPropagated(servers, "error-timeout", 0)
TestUtils.waitUntilMetadataIsPropagated(servers, "error-timeout-zero", 0)
@ -152,56 +118,43 @@ class CreateTopicsRequestTest extends BaseRequestTest {
new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
val duplicateRequest = duplicateFirstTopic(singleRequest)
assertFalse("Request doesn't have duplicate topics", duplicateRequest.duplicateTopics().isEmpty)
validateErrorCreateTopicsRequests(duplicateRequest, Map("duplicate-topic" -> Errors.INVALID_REQUEST))
validateErrorCreateTopicsRequests(duplicateRequest, Map("duplicate-topic" -> error(Errors.INVALID_REQUEST,
Some("""Create topics request from client `client-id` contains multiple entries for the following topics: duplicate-topic"""))))
// Duplicate Partial with validateOnly
val doubleRequestValidateOnly = new CreateTopicsRequest.Builder(Map(
"duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
"other-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000, true).build()
val duplicateDoubleRequestValidateOnly = duplicateFirstTopic(doubleRequestValidateOnly)
assertFalse("Request doesn't have duplicate topics", duplicateDoubleRequestValidateOnly.duplicateTopics.isEmpty)
validateErrorCreateTopicsRequests(duplicateDoubleRequestValidateOnly, Map(
"duplicate-topic" -> error(Errors.INVALID_REQUEST),
"other-topic" -> error(Errors.NONE)), checkErrorMessage = false)
// Duplicate Partial
val doubleRequest = new CreateTopicsRequest.Builder(Map(
"duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort),
"other-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
val duplicateDoubleRequest = duplicateFirstTopic(doubleRequest)
assertFalse("Request doesn't have duplicate topics", duplicateDoubleRequest.duplicateTopics().isEmpty)
assertFalse("Request doesn't have duplicate topics", duplicateDoubleRequest.duplicateTopics.isEmpty)
validateErrorCreateTopicsRequests(duplicateDoubleRequest, Map(
"duplicate-topic" -> Errors.INVALID_REQUEST,
"other-topic" -> Errors.NONE))
"duplicate-topic" -> error(Errors.INVALID_REQUEST),
"other-topic" -> error(Errors.NONE)), checkErrorMessage = false)
// Partitions/ReplicationFactor and ReplicaAssignment
val assignments = replicaAssignmentToJava(Map(0 -> List(0)))
val assignmentRequest = new CreateTopicsRequest.Builder(Map("bad-args-topic" ->
new CreateTopicsRequest.TopicDetails(assignments)).asJava, 1000).build()
val badArgumentsRequest = addPartitionsAndReplicationFactorToFirstTopic(assignmentRequest)
validateErrorCreateTopicsRequests(badArgumentsRequest, Map("bad-args-topic" -> Errors.INVALID_REQUEST))
}
validateErrorCreateTopicsRequests(badArgumentsRequest, Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)),
checkErrorMessage = false)
private def duplicateFirstTopic(request: CreateTopicsRequest) = {
val struct = request.toStruct
val topics = struct.getArray("create_topic_requests")
val firstTopic = topics(0).asInstanceOf[Struct]
val newTopics = firstTopic :: topics.toList
struct.set("create_topic_requests", newTopics.toArray)
new CreateTopicsRequest(struct, request.version)
}
private def addPartitionsAndReplicationFactorToFirstTopic(request: CreateTopicsRequest) = {
val struct = request.toStruct
val topics = struct.getArray("create_topic_requests")
val firstTopic = topics(0).asInstanceOf[Struct]
firstTopic.set("num_partitions", 1)
firstTopic.set("replication_factor", 1.toShort)
new CreateTopicsRequest(struct, request.version)
}
private def validateErrorCreateTopicsRequests(request: CreateTopicsRequest, expectedResponse: Map[String, Errors]): Unit = {
val response = sendCreateTopicRequest(request)
val errors = response.errors.asScala
assertEquals("The response size should match", expectedResponse.size, response.errors.size)
expectedResponse.foreach { case (topic, expectedError) =>
assertEquals("The response error should match", expectedResponse(topic), errors(topic))
// If no error validate topic exists
if (expectedError == Errors.NONE) {
validateTopicExists(topic)
}
}
// Partitions/ReplicationFactor and ReplicaAssignment with validateOnly
val assignmentRequestValidateOnly = new CreateTopicsRequest.Builder(Map("bad-args-topic" ->
new CreateTopicsRequest.TopicDetails(assignments)).asJava, 1000, true).build()
val badArgumentsRequestValidateOnly = addPartitionsAndReplicationFactorToFirstTopic(assignmentRequestValidateOnly)
validateErrorCreateTopicsRequests(badArgumentsRequestValidateOnly, Map("bad-args-topic" -> error(Errors.INVALID_REQUEST)),
checkErrorMessage = false)
}
@Test
@ -209,29 +162,8 @@ class CreateTopicsRequestTest extends BaseRequestTest {
val request = new CreateTopicsRequest.Builder(Map("topic1" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build()
val response = sendCreateTopicRequest(request, notControllerSocketServer)
val error = response.errors.asScala.head._2
assertEquals("Expected controller error when routed incorrectly", Errors.NOT_CONTROLLER, error)
val error = response.errors.asScala.head._2.error
assertEquals("Expected controller error when routed incorrectly", Errors.NOT_CONTROLLER, error)
}
private def validateTopicExists(topic: String): Unit = {
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
val metadata = sendMetadataRequest(
new MetadataRequest.Builder(List(topic).asJava).build()).topicMetadata.asScala
assertTrue("The topic should be created", metadata.exists(p => p.topic.equals(topic) && p.error() == Errors.NONE))
}
private def replicaAssignmentToJava(assignments: Map[Int, List[Int]]) = {
assignments.map { case (k, v) => (k:Integer, v.map { i => i:Integer }.asJava) }.asJava
}
private def sendCreateTopicRequest(request: CreateTopicsRequest, socketServer: SocketServer = controllerSocketServer): CreateTopicsResponse = {
val response = send(request, ApiKeys.CREATE_TOPICS, socketServer)
CreateTopicsResponse.parse(response, request.version)
}
private def sendMetadataRequest(request: MetadataRequest, destination: SocketServer = anySocketServer): MetadataResponse = {
val version = ProtoUtils.latestVersion(ApiKeys.METADATA.id)
val response = send(request, ApiKeys.METADATA, destination = destination)
MetadataResponse.parse(response, version)
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.PolicyViolationException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.CreateTopicsRequest
import org.apache.kafka.server.policy.CreateTopicPolicy
import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
import org.junit.Test
import scala.collection.JavaConverters._
class CreateTopicsRequestWithPolicyTest extends AbstractCreateTopicsRequestTest {
import CreateTopicsRequestWithPolicyTest._
override def propertyOverrides(properties: Properties): Unit = {
super.propertyOverrides(properties)
properties.put(KafkaConfig.CreateTopicsPolicyClassNameProp, classOf[Policy].getName)
}
@Test
def testValidCreateTopicsRequests() {
val timeout = 10000
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("topic1" -> new CreateTopicsRequest.TopicDetails(5, 1.toShort)).asJava, timeout).build())
validateValidCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("topic2" -> new CreateTopicsRequest.TopicDetails(5, 3.toShort)).asJava, timeout, true).build())
}
@Test
def testErrorCreateTopicsRequests() {
val timeout = 10000
val existingTopic = "existing-topic"
TestUtils.createTopic(zkUtils, existingTopic, 1, 1, servers)
// Policy violations
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("topic3" -> new CreateTopicsRequest.TopicDetails(4, 1.toShort)).asJava, timeout).build(),
Map("topic3" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4"))))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("topic4" -> new CreateTopicsRequest.TopicDetails(4, 1.toShort)).asJava, timeout, true).build(),
Map("topic4" -> error(Errors.POLICY_VIOLATION, Some("Topics should have at least 5 partitions, received 4"))))
// Check that basic errors still work
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map(existingTopic -> new CreateTopicsRequest.TopicDetails(5, 1.toShort)).asJava, timeout).build(),
Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS, Some("""Topic "existing-topic" already exists."""))))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
Map("error-replication" -> new CreateTopicsRequest.TopicDetails(10, (numBrokers + 1).toShort)).asJava, timeout, true).build(),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR, Some("replication factor: 4 larger than available brokers: 3"))))
}
}
object CreateTopicsRequestWithPolicyTest {
class Policy extends CreateTopicPolicy {
def validate(requestMetadata: RequestMetadata): Unit =
if (requestMetadata.numPartitions < 5)
throw new PolicyViolationException(s"Topics should have at least 5 partitions, received ${requestMetadata.numPartitions}")
}
}

View File

@ -547,6 +547,7 @@ class KafkaConfigTest {
case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.AuthorizerClassNameProp => //ignore string
case KafkaConfig.CreateTopicsPolicyClassNameProp => //ignore string
case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.HostNameProp => // ignore string

View File

@ -126,20 +126,21 @@ public class StreamsKafkaClient {
topicRequestDetails.put(internalTopicConfig.name(), topicDetails);
}
final CreateTopicsRequest.Builder createTopicsRequest =
new CreateTopicsRequest.Builder(topicRequestDetails, streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG));
new CreateTopicsRequest.Builder(topicRequestDetails,
streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG));
final ClientResponse clientResponse = sendRequest(createTopicsRequest);
if (!(clientResponse.responseBody() instanceof CreateTopicsResponse)) {
throw new StreamsException("Inconsistent response type for internal topic creation request. Expected CreateTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
}
final CreateTopicsResponse createTopicsResponse = (CreateTopicsResponse) clientResponse.responseBody();
for (InternalTopicConfig internalTopicConfig:topicsMap.keySet()) {
short errorCode = createTopicsResponse.errors().get(internalTopicConfig.name()).code();
if (errorCode > 0) {
if (errorCode == Errors.TOPIC_ALREADY_EXISTS.code()) {
for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) {
CreateTopicsResponse.Error error = createTopicsResponse.errors().get(internalTopicConfig.name());
if (!error.is(Errors.NONE)) {
if (error.is(Errors.TOPIC_ALREADY_EXISTS)) {
continue;
} else {
throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + ". " + createTopicsResponse.errors().get(internalTopicConfig.name()).name());
throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.messageWithFallback());
}
}
}
@ -173,9 +174,9 @@ public class StreamsKafkaClient {
throw new StreamsException("Inconsistent response type for internal topic deletion request. Expected DeleteTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
}
final DeleteTopicsResponse deleteTopicsResponse = (DeleteTopicsResponse) clientResponse.responseBody();
for (String topicName: deleteTopicsResponse.errors().keySet()) {
if (deleteTopicsResponse.errors().get(topicName).code() > 0) {
throw new StreamsException("Could not delete topic: " + topicName);
for (Map.Entry<String, Errors> entry : deleteTopicsResponse.errors().entrySet()) {
if (entry.getValue() != Errors.NONE) {
throw new StreamsException("Could not delete topic: " + entry.getKey() + " due to " + entry.getValue().message());
}
}
@ -224,10 +225,13 @@ public class StreamsKafkaClient {
if (responseList.size() > 1) {
throw new StreamsException("Sent one request but received multiple or no responses.");
}
if (responseList.get(0).requestHeader().correlationId() == clientRequest.correlationId()) {
return responseList.get(0);
ClientResponse response = responseList.get(0);
if (response.requestHeader().correlationId() == clientRequest.correlationId()) {
return response;
} else {
throw new StreamsException("Inconsistent response received.");
throw new StreamsException("Inconsistent response received from broker " + brokerId +
", expected correlation id " + clientRequest.correlationId() + ", but received " +
response.requestHeader().correlationId());
}
}
}