KAFKA-7922: Return authorized operations in Metadata request response (KIP-430 Part-2)

-  Use automatic RPC generation in Metadata Request/Response classes
-  https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #6352 from omkreddy/KIP-430-METADATA
This commit is contained in:
Manikumar Reddy 2019-03-10 17:30:16 +05:30 committed by Manikumar Reddy
parent 65aea1f362
commit a42f16f980
35 changed files with 563 additions and 530 deletions

View File

@ -38,7 +38,7 @@ public class ConsumerGroupDescription {
private final String partitionAssignor;
private final ConsumerGroupState state;
private final Node coordinator;
private Set<AclOperation> authorizedOperations;
private final Set<AclOperation> authorizedOperations;
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,

View File

@ -27,6 +27,8 @@ import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Evolving
public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptions> {
private boolean includeAuthorizedOperations;
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* AdminClient should be used.
@ -38,4 +40,12 @@ public class DescribeClusterOptions extends AbstractOptions<DescribeClusterOptio
return this;
}
public DescribeClusterOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
this.includeAuthorizedOperations = includeAuthorizedOperations;
return this;
}
public boolean includeAuthorizedOperations() {
return includeAuthorizedOperations;
}
}

View File

@ -19,9 +19,11 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
import java.util.Set;
/**
* The result of the {@link KafkaAdminClient#describeCluster()} call.
@ -33,13 +35,16 @@ public class DescribeClusterResult {
private final KafkaFuture<Collection<Node>> nodes;
private final KafkaFuture<Node> controller;
private final KafkaFuture<String> clusterId;
private final KafkaFuture<Set<AclOperation>> authorizedOperations;
DescribeClusterResult(KafkaFuture<Collection<Node>> nodes,
KafkaFuture<Node> controller,
KafkaFuture<String> clusterId) {
KafkaFuture<String> clusterId,
KafkaFuture<Set<AclOperation>> authorizedOperations) {
this.nodes = nodes;
this.controller = controller;
this.clusterId = clusterId;
this.authorizedOperations = authorizedOperations;
}
/**
@ -64,4 +69,11 @@ public class DescribeClusterResult {
public KafkaFuture<String> clusterId() {
return clusterId;
}
/**
* Returns a future which yields authorized operations.
*/
public KafkaFuture<Set<AclOperation>> authorizedOperations() {
return authorizedOperations;
}
}

View File

@ -29,6 +29,8 @@ import java.util.Collection;
@InterfaceStability.Evolving
public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions> {
private boolean includeAuthorizedOperations;
/**
* Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
* AdminClient should be used.
@ -40,4 +42,13 @@ public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions
return this;
}
public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
this.includeAuthorizedOperations = includeAuthorizedOperations;
return this;
}
public boolean includeAuthorizedOperations() {
return includeAuthorizedOperations;
}
}

View File

@ -66,6 +66,7 @@ import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicRe
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@ -157,6 +158,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic;
import static org.apache.kafka.common.utils.Utils.closeQuietly;
/**
@ -1222,7 +1224,9 @@ public class KafkaAdminClient extends AdminClient {
// Since this only requests node information, it's safe to pass true
// for allowAutoTopicCreation (and it simplifies communication with
// older brokers)
return new MetadataRequest.Builder(Collections.emptyList(), true);
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(Collections.emptyList())
.setAllowAutoTopicCreation(true));
}
@Override
@ -1462,7 +1466,10 @@ public class KafkaAdminClient extends AdminClient {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
if (supportsDisablingTopicCreation)
return new MetadataRequest.Builder(topicNamesList, false);
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(convertToMetadataRequestTopic(topicNamesList))
.setAllowAutoTopicCreation(false)
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
else
return MetadataRequest.Builder.allTopics();
}
@ -1495,7 +1502,8 @@ public class KafkaAdminClient extends AdminClient {
partitions.add(topicPartitionInfo);
}
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions);
TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions,
validAclOperations(response.data().topics().find(topicName).topicAuthorizedOperations()));
future.complete(topicDescription);
}
}
@ -1531,6 +1539,8 @@ public class KafkaAdminClient extends AdminClient {
final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>();
final KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
final KafkaFutureImpl<String> clusterIdFuture = new KafkaFutureImpl<>();
final KafkaFutureImpl<Set<AclOperation>> authorizedOperationsFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@ -1539,7 +1549,10 @@ public class KafkaAdminClient extends AdminClient {
AbstractRequest.Builder createRequest(int timeoutMs) {
// Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
// simplifies communication with older brokers)
return new MetadataRequest.Builder(Collections.emptyList(), true);
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(Collections.emptyList())
.setAllowAutoTopicCreation(true)
.setIncludeClusterAuthorizedOperations(options.includeAuthorizedOperations()));
}
@Override
@ -1548,6 +1561,8 @@ public class KafkaAdminClient extends AdminClient {
describeClusterFuture.complete(response.brokers());
controllerFuture.complete(controller(response));
clusterIdFuture.complete(response.clusterId());
authorizedOperationsFuture.complete(
validAclOperations(response.data().clusterAuthorizedOperations()));
}
private Node controller(MetadataResponse response) {
@ -1561,10 +1576,12 @@ public class KafkaAdminClient extends AdminClient {
describeClusterFuture.completeExceptionally(throwable);
controllerFuture.completeExceptionally(throwable);
clusterIdFuture.completeExceptionally(throwable);
authorizedOperationsFuture.completeExceptionally(throwable);
}
}, now);
return new DescribeClusterResult(describeClusterFuture, controllerFuture, clusterIdFuture);
return new DescribeClusterResult(describeClusterFuture, controllerFuture, clusterIdFuture,
authorizedOperationsFuture);
}
@Override
@ -2179,7 +2196,9 @@ public class KafkaAdminClient extends AdminClient {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(new ArrayList<>(topics), false);
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(convertToMetadataRequestTopic(topics))
.setAllowAutoTopicCreation(false));
}
@Override
@ -2583,7 +2602,9 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(Collections.emptyList(), true);
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(Collections.emptyList())
.setAllowAutoTopicCreation(true));
}
@Override

View File

@ -18,9 +18,12 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* A detailed description of a single topic in the cluster.
@ -29,25 +32,22 @@ public class TopicDescription {
private final String name;
private final boolean internal;
private final List<TopicPartitionInfo> partitions;
private Set<AclOperation> authorizedOperations;
@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopicDescription that = (TopicDescription) o;
if (internal != that.internal) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
return partitions != null ? partitions.equals(that.partitions) : that.partitions == null;
final TopicDescription that = (TopicDescription) o;
return internal == that.internal &&
Objects.equals(name, that.name) &&
Objects.equals(partitions, that.partitions) &&
Objects.equals(authorizedOperations, that.authorizedOperations);
}
@Override
public int hashCode() {
int result = name != null ? name.hashCode() : 0;
result = 31 * result + (internal ? 1 : 0);
result = 31 * result + (partitions != null ? partitions.hashCode() : 0);
return result;
return Objects.hash(name, internal, partitions, authorizedOperations);
}
/**
@ -57,11 +57,14 @@ public class TopicDescription {
* @param internal Whether the topic is internal to Kafka
* @param partitions A list of partitions where the index represents the partition id and the element contains
* leadership and replica information for that partition.
* @param authorizedOperations authorized operations for this topic
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions) {
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations) {
this.name = name;
this.internal = internal;
this.partitions = partitions;
this.authorizedOperations = authorizedOperations;
}
/**
@ -87,9 +90,16 @@ public class TopicDescription {
return partitions;
}
/**
* authorized operations for this topic
*/
public Set<AclOperation> authorizedOperations() {
return authorizedOperations;
}
@Override
public String toString() {
return "(name=" + name + ", internal=" + internal + ", partitions=" +
Utils.join(partitions, ",") + ")";
Utils.join(partitions, ",") + ", authorizedOperations=" + authorizedOperations + ")";
}
}

View File

@ -286,7 +286,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
*/
public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest.Builder request, Timer timer) {
// Save the round trip if no topics are requested.
if (!request.isAllTopics() && request.topics().isEmpty())
if (!request.isAllTopics() && request.emptyTopicList())
return Collections.emptyMap();
do {

View File

@ -24,6 +24,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
@ -87,8 +89,6 @@ import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
@ -124,7 +124,7 @@ public enum ApiKeys {
PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions()),
FETCH(1, "Fetch", FetchRequest.schemaVersions(), FetchResponse.schemaVersions()),
LIST_OFFSETS(2, "ListOffsets", ListOffsetRequest.schemaVersions(), ListOffsetResponse.schemaVersions()),
METADATA(3, "Metadata", MetadataRequest.schemaVersions(), MetadataResponse.schemaVersions()),
METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS),
LEADER_AND_ISR(4, "LeaderAndIsr", true, LeaderAndIsrRequest.schemaVersions(), LeaderAndIsrResponse.schemaVersions()),
STOP_REPLICA(5, "StopReplica", true, StopReplicaRequest.schemaVersions(), StopReplicaResponse.schemaVersions()),
UPDATE_METADATA(6, "UpdateMetadata", true, UpdateMetadataRequest.schemaVersions(),

View File

@ -77,7 +77,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
case LIST_OFFSETS:
return new ListOffsetResponse(struct);
case METADATA:
return new MetadataResponse(struct);
return new MetadataResponse(struct, version);
case OFFSET_COMMIT:
return new OffsetCommitResponse(struct);
case OFFSET_FETCH:

View File

@ -17,159 +17,116 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.apache.kafka.common.protocol.types.Type.STRING;
import java.util.stream.Collectors;
public class MetadataRequest extends AbstractRequest {
private static final String TOPICS_KEY_NAME = "topics";
private static final Schema METADATA_REQUEST_V0 = new Schema(
new Field(TOPICS_KEY_NAME, new ArrayOf(STRING), "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics."));
private static final Schema METADATA_REQUEST_V1 = new Schema(
new Field(TOPICS_KEY_NAME, ArrayOf.nullable(STRING), "An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."));
/* The v2 metadata request is the same as v1. An additional field for cluster id has been added to the v2 metadata response */
private static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1;
/* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */
private static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2;
/* The v4 metadata request has an additional field for allowing auto topic creation. The response is the same as v3. */
private static final Field.Bool ALLOW_AUTO_TOPIC_CREATION = new Field.Bool("allow_auto_topic_creation",
"If this and the broker config <code>auto.create.topics.enable</code> are true, topics that " +
"don't exist will be created by the broker. Otherwise, no topics will be created by the broker.");
private static final Schema METADATA_REQUEST_V4 = new Schema(
new Field(TOPICS_KEY_NAME, ArrayOf.nullable(STRING), "An array of topics to fetch metadata for. " +
"If the topics array is null fetch metadata for all topics."),
ALLOW_AUTO_TOPIC_CREATION);
/* The v5 metadata request is the same as v4. An additional field for offline_replicas has been added to the v5 metadata response */
private static final Schema METADATA_REQUEST_V5 = METADATA_REQUEST_V4;
/**
* The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
*/
private static final Schema METADATA_REQUEST_V6 = METADATA_REQUEST_V5;
/**
* Bumped for the addition of the current leader epoch in the metadata response.
*/
private static final Schema METADATA_REQUEST_V7 = METADATA_REQUEST_V6;
public static Schema[] schemaVersions() {
return new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3,
METADATA_REQUEST_V4, METADATA_REQUEST_V5, METADATA_REQUEST_V6, METADATA_REQUEST_V7};
}
public static class Builder extends AbstractRequest.Builder<MetadataRequest> {
private static final List<String> ALL_TOPICS = null;
private static final MetadataRequestData ALL_TOPICS_REQUEST_DATA = new MetadataRequestData().
setTopics(null).setAllowAutoTopicCreation(true);
// The list of topics, or null if we want to request metadata about all topics.
private final List<String> topics;
private final boolean allowAutoTopicCreation;
private final MetadataRequestData data;
public Builder(MetadataRequestData data) {
super(ApiKeys.METADATA);
this.data = data;
}
public Builder(List<String> topics, boolean allowAutoTopicCreation, short version) {
super(ApiKeys.METADATA, version);
MetadataRequestData data = new MetadataRequestData();
if (topics == null)
data.setTopics(null);
else {
topics.forEach(topic -> data.topics().add(new MetadataRequestTopic().setName(topic)));
}
data.setAllowAutoTopicCreation(allowAutoTopicCreation);
this.data = data;
}
public Builder(List<String> topics, boolean allowAutoTopicCreation) {
this(topics, allowAutoTopicCreation, ApiKeys.METADATA.latestVersion());
}
public static Builder allTopics() {
// This never causes auto-creation, but we set the boolean to true because that is the default value when
// deserializing V2 and older. This way, the value is consistent after serialization and deserialization.
return new Builder(ALL_TOPICS, true);
return new Builder(ALL_TOPICS_REQUEST_DATA);
}
public Builder(List<String> topics, boolean allowAutoTopicCreation) {
super(ApiKeys.METADATA);
this.topics = topics;
this.allowAutoTopicCreation = allowAutoTopicCreation;
}
public List<String> topics() {
return this.topics;
public boolean emptyTopicList() {
return data.topics().isEmpty();
}
public boolean isAllTopics() {
return this.topics == ALL_TOPICS;
return data.topics() == null;
}
public List<String> topics() {
return data.topics()
.stream()
.map(MetadataRequestTopic::name)
.collect(Collectors.toList());
}
@Override
public MetadataRequest build(short version) {
if (version < 1)
throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported.");
if (!allowAutoTopicCreation && version < 4)
if (!data.allowAutoTopicCreation() && version < 4)
throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " +
"allowAutoTopicCreation field");
return new MetadataRequest(this.topics, allowAutoTopicCreation, version);
return new MetadataRequest(data, version);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=MetadataRequest").
append(", topics=");
if (topics == null) {
bld.append("<ALL>");
} else {
bld.append(Utils.join(topics, ","));
}
bld.append(")");
return bld.toString();
return data.toString();
}
}
private final List<String> topics;
private final boolean allowAutoTopicCreation;
private final MetadataRequestData data;
private final short version;
/**
* In v0 null is not allowed and an empty list indicates requesting all topics.
* Note: modern clients do not support sending v0 requests.
* In v1 null indicates requesting all topics, and an empty list indicates requesting no topics.
*/
public MetadataRequest(List<String> topics, boolean allowAutoTopicCreation, short version) {
public MetadataRequest(MetadataRequestData data, short version) {
super(ApiKeys.METADATA, version);
this.topics = topics;
this.allowAutoTopicCreation = allowAutoTopicCreation;
this.data = data;
this.version = version;
}
public MetadataRequest(Struct struct, short version) {
super(ApiKeys.METADATA, version);
Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
if (topicArray != null) {
if (topicArray.length == 0 && version == 0) {
topics = null;
} else {
topics = new ArrayList<>();
for (Object topicObj: topicArray) {
topics.add((String) topicObj);
}
}
} else {
topics = null;
}
this.data = new MetadataRequestData(struct, version);
this.version = version;
}
allowAutoTopicCreation = struct.getOrElse(ALLOW_AUTO_TOPIC_CREATION, true);
public MetadataRequestData data() {
return data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<MetadataResponse.TopicMetadata> topicMetadatas = new ArrayList<>();
Errors error = Errors.forException(e);
List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();
if (topics != null) {
for (String topic : topics)
topicMetadatas.add(new MetadataResponse.TopicMetadata(error, topic, false, partitions));
MetadataResponseData responseData = new MetadataResponseData();
if (topics() != null) {
for (String topic :topics())
responseData.topics().add(new MetadataResponseData.MetadataResponseTopic()
.setName(topic)
.setErrorCode(error.code())
.setIsInternal(false)
.setPartitions(Collections.emptyList()));
}
short versionId = version();
@ -177,13 +134,15 @@ public class MetadataRequest extends AbstractRequest {
case 0:
case 1:
case 2:
return new MetadataResponse(Collections.emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
return new MetadataResponse(responseData);
case 3:
case 4:
case 5:
case 6:
case 7:
return new MetadataResponse(throttleTimeMs, Collections.emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
case 8:
responseData.setThrottleTimeMs(throttleTimeMs);
return new MetadataResponse(responseData);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ApiKeys.METADATA.latestVersion()));
@ -191,29 +150,37 @@ public class MetadataRequest extends AbstractRequest {
}
public boolean isAllTopics() {
return topics == null;
return (data.topics() == null) ||
(data.topics().isEmpty() && version == 0); //In version 0, an empty topic list indicates
// "request metadata for all topics."
}
public List<String> topics() {
return topics;
if (isAllTopics()) //In version 0, we return null for empty topic list
return null;
else
return data.topics()
.stream()
.map(MetadataRequestTopic::name)
.collect(Collectors.toList());
}
public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
return data.allowAutoTopicCreation();
}
public static MetadataRequest parse(ByteBuffer buffer, short version) {
return new MetadataRequest(ApiKeys.METADATA.parseRequest(version, buffer), version);
}
public static List<MetadataRequestTopic> convertToMetadataRequestTopic(final Collection<String> topics) {
return topics.stream().map(topic -> new MetadataRequestTopic()
.setName(topic))
.collect(Collectors.toList());
}
@Override
protected Struct toStruct() {
Struct struct = new Struct(ApiKeys.METADATA.requestSchema(version()));
if (topics == null)
struct.set(TOPICS_KEY_NAME, null);
else
struct.set(TOPICS_KEY_NAME, topics.toArray());
struct.setIfExists(ALLOW_AUTO_TOPIC_CREATION, allowAutoTopicCreation);
return struct;
return data.toStruct(version);
}
}

View File

@ -19,11 +19,14 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
@ -33,15 +36,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.LEADER_EPOCH;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import java.util.stream.Collectors;
/**
* Possible topic-level error codes:
@ -57,239 +55,37 @@ import static org.apache.kafka.common.protocol.types.Type.INT32;
public class MetadataResponse extends AbstractResponse {
public static final int NO_CONTROLLER_ID = -1;
private static final Field.ComplexArray BROKERS = new Field.ComplexArray("brokers",
"Host and port information for all brokers.");
private static final Field.ComplexArray TOPIC_METADATA = new Field.ComplexArray("topic_metadata",
"Metadata for requested topics");
private MetadataResponseData data;
// cluster level fields
private static final Field.NullableStr CLUSTER_ID = new Field.NullableStr("cluster_id",
"The cluster id that this broker belongs to.");
private static final Field.Int32 CONTROLLER_ID = new Field.Int32("controller_id",
"The broker id of the controller broker.");
// broker level fields
private static final Field.Int32 NODE_ID = new Field.Int32("node_id", "The broker id.");
private static final Field.Str HOST = new Field.Str("host", "The hostname of the broker.");
private static final Field.Int32 PORT = new Field.Int32("port", "The port on which the broker accepts requests.");
private static final Field.NullableStr RACK = new Field.NullableStr("rack", "The rack of the broker.");
// topic level fields
private static final Field.ComplexArray PARTITION_METADATA = new Field.ComplexArray("partition_metadata",
"Metadata for each partition of the topic.");
private static final Field.Bool IS_INTERNAL = new Field.Bool("is_internal",
"Indicates if the topic is considered a Kafka internal topic");
// partition level fields
private static final Field.Int32 LEADER = new Field.Int32("leader",
"The id of the broker acting as leader for this partition.");
private static final Field.Array REPLICAS = new Field.Array("replicas", INT32,
"The set of all nodes that host this partition.");
private static final Field.Array ISR = new Field.Array("isr", INT32,
"The set of nodes that are in sync with the leader for this partition.");
private static final Field.Array OFFLINE_REPLICAS = new Field.Array("offline_replicas", INT32,
"The set of offline replicas of this partition.");
private static final Field METADATA_BROKER_V0 = BROKERS.withFields(
NODE_ID,
HOST,
PORT);
private static final Field PARTITION_METADATA_V0 = PARTITION_METADATA.withFields(
ERROR_CODE,
PARTITION_ID,
LEADER,
REPLICAS,
ISR);
private static final Field TOPIC_METADATA_V0 = TOPIC_METADATA.withFields(
ERROR_CODE,
TOPIC_NAME,
PARTITION_METADATA_V0);
private static final Schema METADATA_RESPONSE_V0 = new Schema(
METADATA_BROKER_V0,
TOPIC_METADATA_V0);
// V1 adds fields for the rack of each broker, the controller id, and whether or not the topic is internal
private static final Field METADATA_BROKER_V1 = BROKERS.withFields(
NODE_ID,
HOST,
PORT,
RACK);
private static final Field TOPIC_METADATA_V1 = TOPIC_METADATA.withFields(
ERROR_CODE,
TOPIC_NAME,
IS_INTERNAL,
PARTITION_METADATA_V0);
private static final Schema METADATA_RESPONSE_V1 = new Schema(
METADATA_BROKER_V1,
CONTROLLER_ID,
TOPIC_METADATA_V1);
// V2 added a field for the cluster id
private static final Schema METADATA_RESPONSE_V2 = new Schema(
METADATA_BROKER_V1,
CLUSTER_ID,
CONTROLLER_ID,
TOPIC_METADATA_V1);
// V3 adds the throttle time to the response
private static final Schema METADATA_RESPONSE_V3 = new Schema(
THROTTLE_TIME_MS,
METADATA_BROKER_V1,
CLUSTER_ID,
CONTROLLER_ID,
TOPIC_METADATA_V1);
private static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3;
// V5 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline.
private static final Field PARTITION_METADATA_V5 = PARTITION_METADATA.withFields(
ERROR_CODE,
PARTITION_ID,
LEADER,
REPLICAS,
ISR,
OFFLINE_REPLICAS);
private static final Field TOPIC_METADATA_V5 = TOPIC_METADATA.withFields(
ERROR_CODE,
TOPIC_NAME,
IS_INTERNAL,
PARTITION_METADATA_V5);
private static final Schema METADATA_RESPONSE_V5 = new Schema(
THROTTLE_TIME_MS,
METADATA_BROKER_V1,
CLUSTER_ID,
CONTROLLER_ID,
TOPIC_METADATA_V5);
// V6 bump used to indicate that on quota violation brokers send out responses before throttling.
private static final Schema METADATA_RESPONSE_V6 = METADATA_RESPONSE_V5;
// V7 adds the leader epoch to the partition metadata
private static final Field PARTITION_METADATA_V7 = PARTITION_METADATA.withFields(
ERROR_CODE,
PARTITION_ID,
LEADER,
LEADER_EPOCH,
REPLICAS,
ISR,
OFFLINE_REPLICAS);
private static final Field TOPIC_METADATA_V7 = TOPIC_METADATA.withFields(
ERROR_CODE,
TOPIC_NAME,
IS_INTERNAL,
PARTITION_METADATA_V7);
private static final Schema METADATA_RESPONSE_V7 = new Schema(
THROTTLE_TIME_MS,
METADATA_BROKER_V1,
CLUSTER_ID,
CONTROLLER_ID,
TOPIC_METADATA_V7);
public static Schema[] schemaVersions() {
return new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3,
METADATA_RESPONSE_V4, METADATA_RESPONSE_V5, METADATA_RESPONSE_V6, METADATA_RESPONSE_V7};
public MetadataResponse(MetadataResponseData data) {
this.data = data;
}
private final int throttleTimeMs;
private final Collection<Node> brokers;
private final Node controller;
private final List<TopicMetadata> topicMetadata;
private final String clusterId;
/**
* Constructor for all versions.
*/
public MetadataResponse(List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata) {
this(DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
private Map<Integer, Node> brokersMap() {
return data.brokers().stream().collect(
Collectors.toMap(MetadataResponseBroker::nodeId, b -> new Node(b.nodeId(), b.host(), b.port(), b.rack())));
}
public MetadataResponse(int throttleTimeMs, List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata) {
this.throttleTimeMs = throttleTimeMs;
this.brokers = brokers;
this.controller = getControllerNode(controllerId, brokers);
this.topicMetadata = topicMetadata;
this.clusterId = clusterId;
public MetadataResponse(Struct struct, short version) {
this(new MetadataResponseData(struct, version));
}
public MetadataResponse(Struct struct) {
this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
Map<Integer, Node> brokers = new HashMap<>();
Object[] brokerStructs = struct.get(BROKERS);
for (Object brokerStruct : brokerStructs) {
Struct broker = (Struct) brokerStruct;
int nodeId = broker.get(NODE_ID);
String host = broker.get(HOST);
int port = broker.get(PORT);
// This field only exists in v1+
// When we can't know if a rack exists in a v0 response we default to null
String rack = broker.getOrElse(RACK, null);
brokers.put(nodeId, new Node(nodeId, host, port, rack));
}
// This field only exists in v1+
// When we can't know the controller id in a v0 response we default to NO_CONTROLLER_ID
int controllerId = struct.getOrElse(CONTROLLER_ID, NO_CONTROLLER_ID);
// This field only exists in v2+
this.clusterId = struct.getOrElse(CLUSTER_ID, null);
List<TopicMetadata> topicMetadata = new ArrayList<>();
Object[] topicInfos = struct.get(TOPIC_METADATA);
for (Object topicInfoObj : topicInfos) {
Struct topicInfo = (Struct) topicInfoObj;
Errors topicError = Errors.forCode(topicInfo.get(ERROR_CODE));
String topic = topicInfo.get(TOPIC_NAME);
// This field only exists in v1+
// When we can't know if a topic is internal or not in a v0 response we default to false
boolean isInternal = topicInfo.getOrElse(IS_INTERNAL, false);
List<PartitionMetadata> partitionMetadata = new ArrayList<>();
Object[] partitionInfos = topicInfo.get(PARTITION_METADATA);
for (Object partitionInfoObj : partitionInfos) {
Struct partitionInfo = (Struct) partitionInfoObj;
Errors partitionError = Errors.forCode(partitionInfo.get(ERROR_CODE));
int partition = partitionInfo.get(PARTITION_ID);
int leader = partitionInfo.get(LEADER);
Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionInfo, LEADER_EPOCH);
Node leaderNode = leader == -1 ? null : brokers.get(leader);
Object[] replicas = partitionInfo.get(REPLICAS);
List<Node> replicaNodes = convertToNodes(brokers, replicas);
Object[] isr = partitionInfo.get(ISR);
List<Node> isrNodes = convertToNodes(brokers, isr);
Object[] offlineReplicas = partitionInfo.getOrEmpty(OFFLINE_REPLICAS);
List<Node> offlineNodes = convertToNodes(brokers, offlineReplicas);
partitionMetadata.add(new PartitionMetadata(partitionError, partition, leaderNode, leaderEpoch,
replicaNodes, isrNodes, offlineNodes));
}
topicMetadata.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadata));
}
this.brokers = brokers.values();
this.controller = getControllerNode(controllerId, brokers.values());
this.topicMetadata = topicMetadata;
@Override
protected Struct toStruct(short version) {
return data.toStruct(version);
}
private List<Node> convertToNodes(Map<Integer, Node> brokers, Object[] brokerIds) {
List<Node> nodes = new ArrayList<>(brokerIds.length);
for (Object brokerId : brokerIds)
public MetadataResponseData data() {
return data;
}
private List<Node> convertToNodes(Map<Integer, Node> brokers, List<Integer> brokerIds) {
List<Node> nodes = new ArrayList<>(brokerIds.size());
for (Integer brokerId : brokerIds)
if (brokers.containsKey(brokerId))
nodes.add(brokers.get(brokerId));
else
nodes.add(new Node((int) brokerId, "", -1));
nodes.add(new Node(brokerId, "", -1));
return nodes;
}
@ -303,7 +99,7 @@ public class MetadataResponse extends AbstractResponse {
@Override
public int throttleTimeMs() {
return throttleTimeMs;
return data.throttleTimeMs();
}
/**
@ -312,9 +108,9 @@ public class MetadataResponse extends AbstractResponse {
*/
public Map<String, Errors> errors() {
Map<String, Errors> errors = new HashMap<>();
for (TopicMetadata metadata : topicMetadata) {
if (metadata.error != Errors.NONE)
errors.put(metadata.topic(), metadata.error);
for (MetadataResponseTopic metadata : data.topics()) {
if (metadata.errorCode() != Errors.NONE.code())
errors.put(metadata.name(), Errors.forCode(metadata.errorCode()));
}
return errors;
}
@ -322,8 +118,8 @@ public class MetadataResponse extends AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
for (TopicMetadata metadata : topicMetadata)
updateErrorCounts(errorCounts, metadata.error);
for (MetadataResponseTopic metadata : data.topics())
updateErrorCounts(errorCounts, Errors.forCode(metadata.errorCode()));
return errorCounts;
}
@ -332,9 +128,9 @@ public class MetadataResponse extends AbstractResponse {
*/
public Set<String> topicsByError(Errors error) {
Set<String> errorTopics = new HashSet<>();
for (TopicMetadata metadata : topicMetadata) {
if (metadata.error == error)
errorTopics.add(metadata.topic());
for (MetadataResponseTopic metadata : data.topics()) {
if (metadata.errorCode() == error.code())
errorTopics.add(metadata.name());
}
return errorTopics;
}
@ -346,7 +142,7 @@ public class MetadataResponse extends AbstractResponse {
public Cluster cluster() {
Set<String> internalTopics = new HashSet<>();
List<PartitionInfo> partitions = new ArrayList<>();
for (TopicMetadata metadata : topicMetadata) {
for (TopicMetadata metadata : topicMetadata()) {
if (metadata.error == Errors.NONE) {
if (metadata.isInternal)
@ -356,8 +152,8 @@ public class MetadataResponse extends AbstractResponse {
}
}
}
return new Cluster(this.clusterId, this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, this.controller);
return new Cluster(data.clusterId(), brokersMap().values(), partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED),
topicsByError(Errors.INVALID_TOPIC_EXCEPTION), internalTopics, controller());
}
/**
@ -379,7 +175,7 @@ public class MetadataResponse extends AbstractResponse {
* @return the brokers
*/
public Collection<Node> brokers() {
return brokers;
return new ArrayList<>(brokersMap().values());
}
/**
@ -387,7 +183,30 @@ public class MetadataResponse extends AbstractResponse {
* @return the topicMetadata
*/
public Collection<TopicMetadata> topicMetadata() {
return topicMetadata;
List<TopicMetadata> topicMetadataList = new ArrayList<>();
for (MetadataResponseTopic topicMetadata : data.topics()) {
Errors topicError = Errors.forCode(topicMetadata.errorCode());
String topic = topicMetadata.name();
boolean isInternal = topicMetadata.isInternal();
List<PartitionMetadata> partitionMetadataList = new ArrayList<>();
for (MetadataResponsePartition partitionMetadata : topicMetadata.partitions()) {
Errors partitionError = Errors.forCode(partitionMetadata.errorCode());
int partitionIndex = partitionMetadata.partitionIndex();
int leader = partitionMetadata.leaderId();
Optional<Integer> leaderEpoch = RequestUtils.getLeaderEpoch(partitionMetadata.leaderEpoch());
Node leaderNode = leader == -1 ? null : brokersMap().get(leader);
List<Node> replicaNodes = convertToNodes(brokersMap(), partitionMetadata.replicaNodes());
List<Node> isrNodes = convertToNodes(brokersMap(), partitionMetadata.isrNodes());
List<Node> offlineNodes = convertToNodes(brokersMap(), partitionMetadata.offlineReplicas());
partitionMetadataList.add(new PartitionMetadata(partitionError, partitionIndex, leaderNode, leaderEpoch,
replicaNodes, isrNodes, offlineNodes));
}
topicMetadataList.add(new TopicMetadata(topicError, topic, isInternal, partitionMetadataList,
topicMetadata.topicAuthorizedOperations()));
}
return topicMetadataList;
}
/**
@ -395,7 +214,7 @@ public class MetadataResponse extends AbstractResponse {
* @return the controller node or null if it doesn't exist
*/
public Node controller() {
return controller;
return getControllerNode(data.controllerId(), brokers());
}
/**
@ -403,11 +222,11 @@ public class MetadataResponse extends AbstractResponse {
* @return cluster identifier if it is present in the response, null otherwise.
*/
public String clusterId() {
return this.clusterId;
return this.data.clusterId();
}
public static MetadataResponse parse(ByteBuffer buffer, short version) {
return new MetadataResponse(ApiKeys.METADATA.parseResponse(version, buffer));
return new MetadataResponse(ApiKeys.METADATA.responseSchema(version).read(buffer), version);
}
public static class TopicMetadata {
@ -415,15 +234,25 @@ public class MetadataResponse extends AbstractResponse {
private final String topic;
private final boolean isInternal;
private final List<PartitionMetadata> partitionMetadata;
private int authorizedOperations;
public TopicMetadata(Errors error,
String topic,
boolean isInternal,
List<PartitionMetadata> partitionMetadata,
int authorizedOperations) {
this.error = error;
this.topic = topic;
this.isInternal = isInternal;
this.partitionMetadata = partitionMetadata;
this.authorizedOperations = authorizedOperations;
}
public TopicMetadata(Errors error,
String topic,
boolean isInternal,
List<PartitionMetadata> partitionMetadata) {
this.error = error;
this.topic = topic;
this.isInternal = isInternal;
this.partitionMetadata = partitionMetadata;
this(error, topic, isInternal, partitionMetadata, 0);
}
public Errors error() {
@ -442,13 +271,40 @@ public class MetadataResponse extends AbstractResponse {
return partitionMetadata;
}
public void authorizedOperations(int authorizedOperations) {
this.authorizedOperations = authorizedOperations;
}
public int authorizedOperations() {
return authorizedOperations;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final TopicMetadata that = (TopicMetadata) o;
return isInternal == that.isInternal &&
error == that.error &&
Objects.equals(topic, that.topic) &&
Objects.equals(partitionMetadata, that.partitionMetadata) &&
Objects.equals(authorizedOperations, that.authorizedOperations);
}
@Override
public int hashCode() {
return Objects.hash(error, topic, isInternal, partitionMetadata, authorizedOperations);
}
@Override
public String toString() {
return "(type=TopicMetadata" +
", error=" + error +
", topic=" + topic +
", isInternal=" + isInternal +
", partitionMetadata=" + partitionMetadata + ')';
return "TopicMetadata{" +
"error=" + error +
", topic='" + topic + '\'' +
", isInternal=" + isInternal +
", partitionMetadata=" + partitionMetadata +
", authorizedOperations=" + authorizedOperations +
'}';
}
}
@ -523,68 +379,54 @@ public class MetadataResponse extends AbstractResponse {
}
}
@Override
protected Struct toStruct(short version) {
Struct struct = new Struct(ApiKeys.METADATA.responseSchema(version));
struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs);
List<Struct> brokerArray = new ArrayList<>();
for (Node node : brokers) {
Struct broker = struct.instance(BROKERS);
broker.set(NODE_ID, node.id());
broker.set(HOST, node.host());
broker.set(PORT, node.port());
// This field only exists in v1+
broker.setIfExists(RACK, node.rack());
brokerArray.add(broker);
}
struct.set(BROKERS, brokerArray.toArray());
public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> brokers, String clusterId,
int controllerId, List<TopicMetadata> topicMetadataList,
int clusterAuthorizedOperations) {
MetadataResponseData responseData = new MetadataResponseData();
responseData.setThrottleTimeMs(throttleTimeMs);
brokers.forEach(broker -> {
responseData.brokers().add(new MetadataResponseBroker()
.setNodeId(broker.id())
.setHost(broker.host())
.setPort(broker.port())
.setRack(broker.rack()));
});
// This field only exists in v1+
struct.setIfExists(CONTROLLER_ID, controller == null ? NO_CONTROLLER_ID : controller.id());
responseData.setClusterId(clusterId);
responseData.setControllerId(controllerId);
responseData.setClusterAuthorizedOperations(clusterAuthorizedOperations);
// This field only exists in v2+
struct.setIfExists(CLUSTER_ID, clusterId);
List<Struct> topicMetadataArray = new ArrayList<>(topicMetadata.size());
for (TopicMetadata metadata : topicMetadata) {
Struct topicData = struct.instance(TOPIC_METADATA);
topicData.set(TOPIC_NAME, metadata.topic);
topicData.set(ERROR_CODE, metadata.error.code());
// This field only exists in v1+
topicData.setIfExists(IS_INTERNAL, metadata.isInternal());
List<Struct> partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size());
for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
Struct partitionData = topicData.instance(PARTITION_METADATA);
partitionData.set(ERROR_CODE, partitionMetadata.error.code());
partitionData.set(PARTITION_ID, partitionMetadata.partition);
partitionData.set(LEADER, partitionMetadata.leaderId());
// Leader epoch exists in v7 forward
RequestUtils.setLeaderEpochIfExists(partitionData, LEADER_EPOCH, partitionMetadata.leaderEpoch);
ArrayList<Integer> replicas = new ArrayList<>(partitionMetadata.replicas.size());
for (Node node : partitionMetadata.replicas)
replicas.add(node.id());
partitionData.set(REPLICAS, replicas.toArray());
ArrayList<Integer> isr = new ArrayList<>(partitionMetadata.isr.size());
for (Node node : partitionMetadata.isr)
isr.add(node.id());
partitionData.set(ISR, isr.toArray());
if (partitionData.hasField(OFFLINE_REPLICAS)) {
ArrayList<Integer> offlineReplicas = new ArrayList<>(partitionMetadata.offlineReplicas.size());
for (Node node : partitionMetadata.offlineReplicas)
offlineReplicas.add(node.id());
partitionData.set(OFFLINE_REPLICAS, offlineReplicas.toArray());
}
partitionMetadataArray.add(partitionData);
topicMetadataList.forEach(topicMetadata -> {
MetadataResponseTopic metadataResponseTopic = new MetadataResponseTopic();
metadataResponseTopic
.setErrorCode(topicMetadata.error.code())
.setName(topicMetadata.topic)
.setIsInternal(topicMetadata.isInternal)
.setTopicAuthorizedOperations(topicMetadata.authorizedOperations);
for (PartitionMetadata partitionMetadata : topicMetadata.partitionMetadata) {
metadataResponseTopic.partitions().add(new MetadataResponsePartition()
.setErrorCode(partitionMetadata.error.code())
.setPartitionIndex(partitionMetadata.partition)
.setLeaderId(partitionMetadata.leader == null ? -1 : partitionMetadata.leader.id())
.setLeaderEpoch(partitionMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
.setReplicaNodes(partitionMetadata.replicas.stream().map(Node::id).collect(Collectors.toList()))
.setIsrNodes(partitionMetadata.isr.stream().map(Node::id).collect(Collectors.toList()))
.setOfflineReplicas(partitionMetadata.offlineReplicas.stream().map(Node::id).collect(Collectors.toList())));
}
topicData.set(PARTITION_METADATA, partitionMetadataArray.toArray());
topicMetadataArray.add(topicData);
}
struct.set(TOPIC_METADATA, topicMetadataArray.toArray());
return struct;
responseData.topics().add(metadataResponseTopic);
});
return new MetadataResponse(responseData);
}
public static MetadataResponse prepareResponse(int throttleTimeMs, List<Node> brokers, String clusterId,
int controllerId, List<TopicMetadata> topicMetadataList) {
return prepareResponse(throttleTimeMs, brokers, clusterId, controllerId, topicMetadataList, 0);
}
public static MetadataResponse prepareResponse(List<Node> brokers, String clusterId, int controllerId,
List<TopicMetadata> topicMetadata) {
return prepareResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, brokers, clusterId, controllerId, topicMetadata);
}
@Override

View File

@ -117,4 +117,9 @@ final class RequestUtils {
return leaderEpochOpt;
}
static Optional<Integer> getLeaderEpoch(int leaderEpoch) {
Optional<Integer> leaderEpochOpt = leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
Optional.empty() : Optional.of(leaderEpoch);
return leaderEpochOpt;
}
}

View File

@ -17,7 +17,7 @@
"apiKey": 3,
"type": "request",
"name": "MetadataRequest",
"validVersions": "0-7",
"validVersions": "0-8",
"fields": [
// In version 0, an empty array indicates "request metadata for all topics." In version 1 and
// higher, an empty array indicates "request metadata for no topics," and a null array is used to
@ -26,12 +26,17 @@
// Version 2 and 3 are the same as version 1.
//
// Version 4 adds AllowAutoTopicCreation.
// Starting in version 8, authorized operations can be requested for cluster and topic resource.
{ "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
"about": "The topics to fetch metadata for.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The topic name." }
]},
{ "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", "default": "true", "ignorable": false,
"about": "If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so." }
"about": "If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so." },
{ "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8+",
"about": "Whether to include cluster authorized operations." },
{ "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": "8+",
"about": "Whether to include topic authorized operations." }
]
}

View File

@ -32,7 +32,8 @@
// Starting in version 6, on quota violation, brokers send out responses before throttling.
//
// Version 7 adds the leader epoch to the partition metadata.
"validVersions": "0-7",
// Starting in version 8, brokers can send authorized operations for topic and cluster.
"validVersions": "0-8",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@ -47,7 +48,7 @@
{ "name": "Rack", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true, "default": "null",
"about": "The rack of the broker, or null if it has not been assigned to a rack." }
]},
{ "name": "ClusterId", "type": "string", "nullableVersions": "2+", "versions": "2+", "ignorable": true,
{ "name": "ClusterId", "type": "string", "nullableVersions": "2+", "versions": "2+", "ignorable": true, "default": "null",
"about": "The cluster ID that responding broker belongs to." },
{ "name": "ControllerId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
"about": "The ID of the controller broker." },
@ -75,7 +76,11 @@
"about": "The set of nodes that are in sync with the leader for this partition." },
{ "name": "OfflineReplicas", "type": "[]int32", "versions": "5+", "ignorable": true,
"about": "The set of offline replicas of this partition." }
]}
]}
]},
{ "name": "TopicAuthorizedOperations", "type": "int32", "versions": "8+",
"about": "32-bit bitfield to represent authorized operations for this topic." }
]},
{ "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8+",
"about": "32-bit bitfield to represent authorized operations for this cluster." }
]
}

View File

@ -52,7 +52,7 @@ public class MetadataTest {
new ClusterResourceListeners());
private static MetadataResponse emptyMetadataResponse() {
return new MetadataResponse(
return MetadataResponse.prepareResponse(
Collections.emptyList(),
null,
-1,

View File

@ -664,7 +664,7 @@ public class MockClient implements KafkaClient {
private void maybeCheckExpectedTopics(MetadataUpdate update, MetadataRequest.Builder builder) {
if (update.expectMatchRefreshTopics) {
if (builder.topics() == null)
if (builder.isAllTopics())
throw new IllegalStateException("The metadata topics does not match expectation. "
+ "Expected topics: " + update.topics()
+ ", asked topics: ALL");

View File

@ -250,7 +250,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest, null, true);
env.kafkaClient().prepareResponse(request -> request instanceof MetadataRequest,
new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
MetadataResponse.prepareResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
1, Collections.emptyList()));
env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
prepareCreateTopicsResponse("myTopic", Errors.NONE));
@ -274,7 +274,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().setUnreachable(cluster.nodes().get(0), 200);
env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest,
new MetadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
MetadataResponse.prepareResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(),
1, Collections.emptyList()));
env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
prepareCreateTopicsResponse("myTopic", Errors.NONE));
@ -369,7 +369,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponseFrom(
prepareCreateTopicsResponse("myTopic", Errors.NOT_CONTROLLER),
env.cluster().nodeById(0));
env.kafkaClient().prepareResponse(new MetadataResponse(env.cluster().nodes(),
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
1,
Collections.<MetadataResponse.TopicMetadata>emptyList()));
@ -457,7 +457,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(null, true);
// The next one succeeds and gives us the controller id
env.kafkaClient().prepareResponse(new MetadataResponse(initializedCluster.nodes(),
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(),
initializedCluster.clusterResource().clusterId(),
initializedCluster.controller().id(),
Collections.emptyList()));
@ -467,7 +467,7 @@ public class KafkaAdminClientTest {
MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
Errors.NONE, 0, leader, Optional.of(10), singletonList(leader),
singletonList(leader), singletonList(leader));
env.kafkaClient().prepareResponse(new MetadataResponse(initializedCluster.nodes(),
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(initializedCluster.nodes(),
initializedCluster.clusterResource().clusterId(), 1,
singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, false,
singletonList(partitionMetadata)))));
@ -845,7 +845,7 @@ public class KafkaAdminClientTest {
t.add(new MetadataResponse.TopicMetadata(Errors.NONE, "my_topic", false, p));
env.kafkaClient().prepareResponse(new MetadataResponse(cluster.nodes(), cluster.clusterResource().clusterId(), cluster.controller().id(), t));
env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(cluster.nodes(), cluster.clusterResource().clusterId(), cluster.controller().id(), t));
env.kafkaClient().prepareResponse(new DeleteRecordsResponse(0, m));
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
@ -925,14 +925,14 @@ public class KafkaAdminClientTest {
// Empty metadata response should be retried
env.kafkaClient().prepareResponse(
new MetadataResponse(
MetadataResponse.prepareResponse(
Collections.emptyList(),
env.cluster().clusterResource().clusterId(),
-1,
Collections.emptyList()));
env.kafkaClient().prepareResponse(
new MetadataResponse(
MetadataResponse.prepareResponse(
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
env.cluster().controller().id(),
@ -1027,7 +1027,7 @@ public class KafkaAdminClientTest {
// Empty metadata causes the request to fail since we have no list of brokers
// to send the ListGroups requests to
env.kafkaClient().prepareResponse(
new MetadataResponse(
MetadataResponse.prepareResponse(
Collections.emptyList(),
env.cluster().clusterResource().clusterId(),
-1,

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
@ -38,6 +39,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MockAdminClient extends AdminClient {
public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
@ -125,19 +127,22 @@ public class MockAdminClient extends AdminClient {
KafkaFutureImpl<Collection<Node>> nodesFuture = new KafkaFutureImpl<>();
KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>();
KafkaFutureImpl<String> brokerIdFuture = new KafkaFutureImpl<>();
KafkaFutureImpl<Set<AclOperation>> authorizedOperationsFuture = new KafkaFutureImpl<>();
if (timeoutNextRequests > 0) {
nodesFuture.completeExceptionally(new TimeoutException());
controllerFuture.completeExceptionally(new TimeoutException());
brokerIdFuture.completeExceptionally(new TimeoutException());
authorizedOperationsFuture.completeExceptionally(new TimeoutException());
--timeoutNextRequests;
} else {
nodesFuture.complete(brokers);
controllerFuture.complete(controller);
brokerIdFuture.complete(clusterId);
authorizedOperationsFuture.complete(Collections.emptySet());
}
return new DescribeClusterResult(nodesFuture, controllerFuture, brokerIdFuture);
return new DescribeClusterResult(nodesFuture, controllerFuture, brokerIdFuture, authorizedOperationsFuture);
}
@Override
@ -228,7 +233,8 @@ public class MockAdminClient extends AdminClient {
if (topicName.equals(requestedTopic) && !topicDescription.getValue().markedForDeletion) {
TopicMetadata topicMetadata = topicDescription.getValue();
KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions));
future.complete(new TopicDescription(topicName, topicMetadata.isInternalTopic, topicMetadata.partitions,
Collections.emptySet()));
topicDescriptions.put(topicName, future);
break;
}

View File

@ -1921,7 +1921,7 @@ public class KafkaConsumerTest {
List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
invalidTopicName, false, Collections.emptyList()));
MetadataResponse updateResponse = new MetadataResponse(cluster.nodes(),
MetadataResponse updateResponse = MetadataResponse.prepareResponse(cluster.nodes(),
cluster.clusterResource().clusterId(),
cluster.controller().id(),
topicMetadata);

View File

@ -1171,7 +1171,7 @@ public class ConsumerCoordinatorTest {
MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE,
Topic.GROUP_METADATA_TOPIC_NAME, true, singletonList(partitionMetadata));
client.updateMetadata(new MetadataResponse(singletonList(node), "clusterId", node.id(),
client.updateMetadata(MetadataResponse.prepareResponse(singletonList(node), "clusterId", node.id(),
singletonList(topicMetadata)));
coordinator.maybeUpdateSubscriptionMetadata();

View File

@ -75,7 +75,8 @@ public class ConsumerMetadataTest {
topics.add(topicMetadata("__matching_topic", false));
topics.add(topicMetadata("non_matching_topic", false));
MetadataResponse response = new MetadataResponse(singletonList(node), "clusterId", node.id(), topics);
MetadataResponse response = MetadataResponse.prepareResponse(singletonList(node),
"clusterId", node.id(), topics);
metadata.update(response, time.milliseconds());
if (includeInternalTopics)
@ -142,7 +143,8 @@ public class ConsumerMetadataTest {
for (String expectedInternalTopic : expectedInternalTopics)
topics.add(topicMetadata(expectedInternalTopic, true));
MetadataResponse response = new MetadataResponse(singletonList(node), "clusterId", node.id(), topics);
MetadataResponse response = MetadataResponse.prepareResponse(singletonList(node),
"clusterId", node.id(), topics);
metadata.update(response, time.milliseconds());
assertEquals(allTopics, metadata.fetch().topics());

View File

@ -1720,7 +1720,7 @@ public class FetcherTest {
altTopics.add(alteredTopic);
}
Node controller = originalResponse.controller();
MetadataResponse altered = new MetadataResponse(
MetadataResponse altered = MetadataResponse.prepareResponse(
(List<Node>) originalResponse.brokers(),
originalResponse.clusterId(),
controller != null ? controller.id() : MetadataResponse.NO_CONTROLLER_ID,
@ -3162,7 +3162,7 @@ public class FetcherTest {
MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false,
partitionsMetadata);
List<Node> brokers = new ArrayList<>(initialUpdateResponse.brokers());
return new MetadataResponse(brokers, initialUpdateResponse.clusterId(),
return MetadataResponse.prepareResponse(brokers, initialUpdateResponse.clusterId(),
initialUpdateResponse.controller().id(), Collections.singletonList(topicMetadata));
}

View File

@ -704,7 +704,7 @@ public class KafkaProducerTest {
List<MetadataResponse.TopicMetadata> topicMetadata = new ArrayList<>();
topicMetadata.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION,
invalidTopicName, false, Collections.emptyList()));
MetadataResponse updateResponse = new MetadataResponse(
MetadataResponse updateResponse = MetadataResponse.prepareResponse(
new ArrayList<>(initialUpdateResponse.brokers()),
initialUpdateResponse.clusterId(),
initialUpdateResponse.controller().id(),

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicSet;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@ -46,6 +47,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@Ignore
public final class MessageTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);

View File

@ -16,8 +16,7 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.message.MetadataRequestData;
import org.junit.Test;
import java.util.Collections;
@ -31,22 +30,18 @@ public class MetadataRequestTest {
@Test
public void testEmptyMeansAllTopicsV0() {
Struct rawRequest = new Struct(MetadataRequest.schemaVersions()[0]);
rawRequest.set("topics", new Object[0]);
MetadataRequest parsedRequest = new MetadataRequest(rawRequest, (short) 0);
MetadataRequestData data = new MetadataRequestData();
MetadataRequest parsedRequest = new MetadataRequest(data, (short) 0);
assertTrue(parsedRequest.isAllTopics());
assertNull(parsedRequest.topics());
}
@Test
public void testEmptyMeansEmptyForVersionsAboveV0() {
for (int i = 1; i < MetadataRequest.schemaVersions().length; i++) {
Schema schema = MetadataRequest.schemaVersions()[i];
Struct rawRequest = new Struct(schema);
rawRequest.set("topics", new Object[0]);
if (rawRequest.hasField("allow_auto_topic_creation"))
rawRequest.set("allow_auto_topic_creation", true);
MetadataRequest parsedRequest = new MetadataRequest(rawRequest, (short) i);
for (int i = 1; i < MetadataRequestData.SCHEMAS.length; i++) {
MetadataRequestData data = new MetadataRequestData();
data.setAllowAutoTopicCreation(true);
MetadataRequest parsedRequest = new MetadataRequest(data, (short) i);
assertFalse(parsedRequest.isAllTopics());
assertEquals(Collections.emptyList(), parsedRequest.topics());
}

View File

@ -874,7 +874,7 @@ public class RequestResponseTest {
asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null,
Optional.empty(), replicas, isr, offlineReplicas))));
return new MetadataResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
return MetadataResponse.prepareResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
}
@SuppressWarnings("deprecation")

View File

@ -155,7 +155,7 @@ public class TestUtils {
Topic.isInternal(topic), Collections.emptyList()));
}
return new MetadataResponse(nodes, clusterId, 0, topicMetadata);
return MetadataResponse.prepareResponse(nodes, clusterId, 0, topicMetadata);
}
@FunctionalInterface

View File

@ -1043,6 +1043,20 @@ class KafkaApis(val requestChannel: RequestChannel,
getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,
errorUnavailableEndpoints, errorUnavailableListeners)
var clusterAuthorizedOperations = 0
if (request.header.apiVersion >= 8) {
// get cluster authorized operations
if (metadataRequest.data().includeClusterAuthorizedOperations() &&
authorize(request.session, Describe, Resource.ClusterResource))
clusterAuthorizedOperations = authorizedOperations(request.session, Resource.ClusterResource)
// get topic authorized operations
if (metadataRequest.data().includeTopicAuthorizedOperations())
topicMetadata.foreach(topicData => {
topicData.authorizedOperations(authorizedOperations(request.session, Resource(Topic, topicData.topic(), LITERAL)))
})
}
val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
val brokers = metadataCache.getAliveBrokers
@ -1051,12 +1065,13 @@ class KafkaApis(val requestChannel: RequestChannel,
brokers.mkString(","), request.header.correlationId, request.header.clientId))
sendResponseMaybeThrottle(request, requestThrottleMs =>
new MetadataResponse(
requestThrottleMs,
brokers.flatMap(_.getNode(request.context.listenerName)).asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava
MetadataResponse.prepareResponse(
requestThrottleMs,
brokers.flatMap(_.getNode(request.context.listenerName)).asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava,
clusterAuthorizedOperations
))
}

View File

@ -53,7 +53,7 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import java.lang.{Long => JLong}
import kafka.security.auth.Group
import kafka.security.auth.{Cluster, Group, Topic}
/**
* An integration test of the KafkaAdminClient.
@ -224,6 +224,40 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(topics.toSet, topicDesc.keySet.asScala)
}
@Test
def testAuthorizedOperations(): Unit = {
client = AdminClient.create(createConfig())
// without includeAuthorizedOperations flag
var result = client.describeCluster
assertEquals(Set().asJava, result.authorizedOperations().get())
//with includeAuthorizedOperations flag
result = client.describeCluster(new DescribeClusterOptions().includeAuthorizedOperations(true))
var expectedOperations = configuredClusterPermissions.asJava
assertEquals(expectedOperations, result.authorizedOperations().get())
val topic = "mytopic"
val newTopics = Seq(new NewTopic(topic, 3, 3))
client.createTopics(newTopics.asJava).all.get()
waitForTopics(client, expectedPresent = Seq(topic), expectedMissing = List())
// without includeAuthorizedOperations flag
var topicResult = client.describeTopics(Seq(topic).asJava).values
assertEquals(Set().asJava, topicResult.get(topic).get().authorizedOperations())
//with includeAuthorizedOperations flag
topicResult = client.describeTopics(Seq(topic).asJava,
new DescribeTopicsOptions().includeAuthorizedOperations(true)).values
expectedOperations = Topic.supportedOperations
.map(operation => operation.toJava).asJava
assertEquals(expectedOperations, topicResult.get(topic).get().authorizedOperations())
}
def configuredClusterPermissions() : Set[AclOperation] = {
Cluster.supportedOperations.map(operation => operation.toJava)
}
/**
* describe should not auto create topics
*/
@ -245,10 +279,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
@Test
def testDescribeCluster(): Unit = {
client = AdminClient.create(createConfig())
val nodes = client.describeCluster.nodes.get()
val clusterId = client.describeCluster().clusterId().get()
val result = client.describeCluster
val nodes = result.nodes.get()
val clusterId = result.clusterId().get()
assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId)
val controller = client.describeCluster().controller().get()
val controller = result.controller().get()
assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
val brokers = brokerList.split(",")

View File

@ -16,10 +16,10 @@ import java.io.File
import java.util
import java.util.Properties
import kafka.security.auth.{Allow, Alter, Authorizer, ClusterAction, Group, Operation, PermissionType, SimpleAclAuthorizer, Acl => AuthAcl, Resource => AuthResource}
import kafka.security.auth.{Allow, Alter, Authorizer, Cluster, ClusterAction, Describe, Group, Operation, PermissionType, Resource, SimpleAclAuthorizer, Topic, Acl => AuthAcl}
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, DescribeConsumerGroupsOptions}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl._
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
@ -38,6 +38,8 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
val group1 = "group1"
val group2 = "group2"
val group3 = "group3"
val topic1 = "topic1"
val topic2 = "topic2"
override protected def securityProtocol = SecurityProtocol.SASL_SSL
@ -45,11 +47,13 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
override def configureSecurityBeforeServersStart() {
val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName)
val topicResource = Resource(Topic, Resource.WildCardResource, PatternType.LITERAL)
try {
authorizer.configure(this.configs.head.originals())
authorizer.addAcls(Set(clusterAcl(JaasTestUtils.KafkaServerPrincipalUnqualifiedName, Allow, ClusterAction),
clusterAcl(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, Allow, Alter)),
AuthResource.ClusterResource)
clusterAcl(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, Allow, Alter)), Resource.ClusterResource)
authorizer.addAcls(Set(clusterAcl(JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, Allow, Describe)), topicResource)
} finally {
authorizer.close()
}
@ -84,6 +88,15 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
val group3Acl = new AclBinding(new ResourcePattern(ResourceType.GROUP, group3, PatternType.LITERAL),
new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE, AclPermissionType.ALLOW))
val clusteAllAcl = new AclBinding(Resource.ClusterResource.toPattern,
new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW))
val topic1Acl = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic1, PatternType.LITERAL),
new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.ALL, AclPermissionType.ALLOW))
val topic2All = new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic2, PatternType.LITERAL),
new AccessControlEntry("User:" + JaasTestUtils.KafkaClientPrincipalUnqualifiedName2, "*", AclOperation.DELETE, AclPermissionType.ALLOW))
def createConfig(): Properties = {
val adminClientConfig = new Properties()
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
@ -118,4 +131,63 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
assertEquals(Set(AclOperation.DESCRIBE, AclOperation.DELETE), group3Description.authorizedOperations().asScala.toSet)
}
@Test
def testClusterAuthorizedOperations(): Unit = {
client = AdminClient.create(createConfig())
// test without includeAuthorizedOperations flag
var clusterDescribeResult = client.describeCluster()
assertEquals(Set(), clusterDescribeResult.authorizedOperations().get().asScala.toSet)
//test with includeAuthorizedOperations flag, we have give Alter permission
// in configureSecurityBeforeServersStart()
clusterDescribeResult = client.describeCluster(new DescribeClusterOptions().
includeAuthorizedOperations(true))
assertEquals(Set(AclOperation.DESCRIBE, AclOperation.ALTER),
clusterDescribeResult.authorizedOperations().get().asScala.toSet)
// enable all operations for cluster resource
val results = client.createAcls(List(clusteAllAcl).asJava)
assertEquals(Set(clusteAllAcl), results.values.keySet.asScala)
results.all.get
val expectedOperations = Cluster.supportedOperations
.map(operation => operation.toJava).asJava
clusterDescribeResult = client.describeCluster(new DescribeClusterOptions().
includeAuthorizedOperations(true))
assertEquals(expectedOperations, clusterDescribeResult.authorizedOperations().get())
}
@Test
def testTopicAuthorizedOperations(): Unit = {
client = AdminClient.create(createConfig())
createTopic(topic1)
createTopic(topic2)
// test without includeAuthorizedOperations flag
var describeTopicsResult = client.describeTopics(Set(topic1, topic2).asJava).all.get()
assertEquals(Set(), describeTopicsResult.get(topic1).authorizedOperations().asScala.toSet)
assertEquals(Set(), describeTopicsResult.get(topic2).authorizedOperations().asScala.toSet)
//test with includeAuthorizedOperations flag
describeTopicsResult = client.describeTopics(Set(topic1, topic2).asJava,
new DescribeTopicsOptions().includeAuthorizedOperations(true)).all.get()
assertEquals(Set(AclOperation.DESCRIBE), describeTopicsResult.get(topic1).authorizedOperations().asScala.toSet)
assertEquals(Set(AclOperation.DESCRIBE), describeTopicsResult.get(topic2).authorizedOperations().asScala.toSet)
//add few permissions
val results = client.createAcls(List(topic1Acl, topic2All).asJava)
assertEquals(Set(topic1Acl, topic2All), results.values.keySet.asScala)
results.all.get
val expectedOperations = Topic.supportedOperations
.map(operation => operation.toJava).asJava
describeTopicsResult = client.describeTopics(Set(topic1, topic2).asJava,
new DescribeTopicsOptions().includeAuthorizedOperations(true)).all.get()
assertEquals(expectedOperations, describeTopicsResult.get(topic1).authorizedOperations())
assertEquals(Set(AclOperation.DESCRIBE, AclOperation.DELETE),
describeTopicsResult.get(topic2).authorizedOperations().asScala.toSet)
}
}

View File

@ -19,8 +19,7 @@ import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, Cluster
import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
import kafka.utils.TestUtils._
import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl._
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
@ -278,6 +277,11 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException])
}
override def configuredClusterPermissions(): Set[AclOperation] = {
Set(AclOperation.ALTER, AclOperation.CREATE, AclOperation.CLUSTER_ACTION, AclOperation.ALTER_CONFIGS,
AclOperation.DESCRIBE, AclOperation.DESCRIBE_CONFIGS)
}
private def verifyCauseIsClusterAuth(e: Throwable): Unit = {
if (!e.getCause.isInstanceOf[ClusterAuthorizationException]) {
throw e.getCause

View File

@ -23,6 +23,7 @@ import kafka.network.SocketServer
import kafka.utils.TestUtils
import org.apache.kafka.common.Node
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.MetadataRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.junit.Assert._
@ -116,7 +117,7 @@ class MetadataRequestTest extends BaseRequestTest {
// v0, Doesn't support a "no topics" request
// v1, Empty list represents "no topics"
val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 1.toShort))
val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List[String]().asJava, true, 1.toShort).build)
assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
assertTrue("Response should have no topics", metadataResponse.topicMetadata.isEmpty)
}
@ -137,15 +138,15 @@ class MetadataRequestTest extends BaseRequestTest {
val topic4 = "t4"
createTopic(topic1, 1, 1)
val response1 = sendMetadataRequest(new MetadataRequest(Seq(topic1, topic2).asJava, true, ApiKeys.METADATA.latestVersion))
val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, ApiKeys.METADATA.latestVersion).build())
checkAutoCreatedTopic(topic1, topic2, response1)
// V3 doesn't support a configurable allowAutoTopicCreation, so the fact that we set it to `false` has no effect
val response2 = sendMetadataRequest(new MetadataRequest(Seq(topic2, topic3).asJava, false, 3))
val response2 = sendMetadataRequest(new MetadataRequest(requestData(List(topic2, topic3), false), 3.toShort))
checkAutoCreatedTopic(topic2, topic3, response2)
// V4 and higher support a configurable allowAutoTopicCreation
val response3 = sendMetadataRequest(new MetadataRequest(Seq(topic3, topic4).asJava, false, 4))
val response3 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3, topic4).asJava, false, 4.toShort).build)
assertNull(response3.errors.get(topic3))
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response3.errors.get(topic4))
assertEquals(None, zkClient.getTopicPartitionCount(topic4))
@ -201,7 +202,7 @@ class MetadataRequestTest extends BaseRequestTest {
createTopic("t2", 3, 2)
// v0, Empty list represents all topics
val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava, true, 0.toShort))
val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(requestData(List(), true), 0.toShort))
assertTrue("V0 Response should have no errors", metadataResponseV0.errors.isEmpty)
assertEquals("V0 Response should have 2 (all) topics", 2, metadataResponseV0.topicMetadata.size())
@ -238,6 +239,15 @@ class MetadataRequestTest extends BaseRequestTest {
}
}
def requestData(topics: List[String], allowAutoTopicCreation: Boolean): MetadataRequestData = {
val data = new MetadataRequestData
if (topics == null) data.setTopics(null)
else topics.foreach(topic => data.topics.add(new MetadataRequestData.MetadataRequestTopic().setName(topic)))
data.setAllowAutoTopicCreation(allowAutoTopicCreation)
data
}
@Test
def testReplicaDownResponse() {
val replicaDownTopic = "replicaDown"
@ -247,7 +257,7 @@ class MetadataRequestTest extends BaseRequestTest {
createTopic(replicaDownTopic, 1, replicaCount)
// Kill a replica node that is not the leader
val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
val downNode = servers.find { server =>
val serverId = server.dataPlaneRequestProcessor.brokerId
@ -258,14 +268,14 @@ class MetadataRequestTest extends BaseRequestTest {
downNode.shutdown()
TestUtils.waitUntilTrue(() => {
val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
val response = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
val replica = metadata.replicas.asScala.find(_.id == downNode.dataPlaneRequestProcessor.brokerId).get
replica.host == "" & replica.port == -1
}, "Replica was not found down", 5000)
// Validate version 0 still filters unavailable replicas and contains error
val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 0.toShort))
val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(requestData(List(replicaDownTopic), true), 0.toShort))
val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
assertTrue("Response should have no errors", v0MetadataResponse.errors.isEmpty)
assertFalse(s"The downed broker should not be in the brokers list", v0BrokerIds.contains(downNode))
@ -275,7 +285,7 @@ class MetadataRequestTest extends BaseRequestTest {
assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicas.size == replicaCount - 1)
// Validate version 1 returns unavailable replicas with no error
val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
val v1MetadataResponse = sendMetadataRequest(new MetadataRequest.Builder(List(replicaDownTopic).asJava, true, 1.toShort).build())
val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty)
assertFalse(s"The downed broker should not be in the brokers list", v1BrokerIds.contains(downNode))

View File

@ -435,7 +435,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.PRODUCE => new ProduceResponse(response).throttleTimeMs
case ApiKeys.FETCH => FetchResponse.parse(response).throttleTimeMs
case ApiKeys.LIST_OFFSETS => new ListOffsetResponse(response).throttleTimeMs
case ApiKeys.METADATA => new MetadataResponse(response).throttleTimeMs
case ApiKeys.METADATA =>
new MetadataResponse(response, ApiKeys.DESCRIBE_GROUPS.latestVersion()).throttleTimeMs
case ApiKeys.OFFSET_COMMIT => new OffsetCommitResponse(response).throttleTimeMs
case ApiKeys.OFFSET_FETCH => new OffsetFetchResponse(response).throttleTimeMs
case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response).throttleTimeMs

View File

@ -113,17 +113,17 @@ public class InternalTopicManagerTest {
{
add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
}
}), mockAdminClient.describeTopics(Collections.singleton(topic)).values().get(topic).get());
}, Collections.emptySet()), mockAdminClient.describeTopics(Collections.singleton(topic)).values().get(topic).get());
assertEquals(new TopicDescription(topic2, false, new ArrayList<TopicPartitionInfo>() {
{
add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
}
}), mockAdminClient.describeTopics(Collections.singleton(topic2)).values().get(topic2).get());
}, Collections.emptySet()), mockAdminClient.describeTopics(Collections.singleton(topic2)).values().get(topic2).get());
assertEquals(new TopicDescription(topic3, false, new ArrayList<TopicPartitionInfo>() {
{
add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()));
}
}), mockAdminClient.describeTopics(Collections.singleton(topic3)).values().get(topic3).get());
}, Collections.emptySet()), mockAdminClient.describeTopics(Collections.singleton(topic3)).values().get(topic3).get());
final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
final ConfigResource resource2 = new ConfigResource(ConfigResource.Type.TOPIC, topic2);

View File

@ -81,7 +81,8 @@ public class WorkerUtilsTest {
new TopicDescription(
TEST_TOPIC, false,
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))),
new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList())),
Collections.emptySet()),
adminClient.describeTopics(
Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
);
@ -98,7 +99,8 @@ public class WorkerUtilsTest {
new TopicDescription(
TEST_TOPIC, false,
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))),
new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList())),
Collections.emptySet()),
adminClient.describeTopics(
Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
);
@ -178,7 +180,8 @@ public class WorkerUtilsTest {
new TopicDescription(
TEST_TOPIC, false,
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))),
new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList())),
Collections.emptySet()),
adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).values().get(TEST_TOPIC).get()
);
}