KAFKA-2691: Improve handling of authorization failure during metadata refresh

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Jun Rao

Closes #394 from hachikuji/KAFKA-2691
This commit is contained in:
Jason Gustafson 2015-11-04 11:02:30 -08:00 committed by Gwen Shapira
parent c30ee50d82
commit c39e79bb5a
37 changed files with 721 additions and 289 deletions

View File

@ -238,14 +238,18 @@ public final class Metadata {
}
private Cluster getClusterForCurrentTopics(Cluster cluster) {
Set<String> unauthorizedTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
if (cluster != null) {
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
unauthorizedTopics.retainAll(this.topics);
for (String topic : this.topics) {
partitionInfos.addAll(cluster.partitionsForTopic(topic));
}
nodes = cluster.nodes();
}
return new Cluster(nodes, partitionInfos);
return new Cluster(nodes, partitionInfos, unauthorizedTopics);
}
}

View File

@ -544,7 +544,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
metricGrpPrefix,
metricsTags,
this.time,
requestTimeoutMs,
retryBackoffMs,
new ConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
@ -777,10 +776,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws NoOffsetForPartitionException if there is no stored offset for a subscribed partition and no automatic
* offset reset policy has been configured.
* @throws org.apache.kafka.common.errors.OffsetOutOfRangeException if there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*
* @throws org.apache.kafka.common.errors.AuthorizationException if caller does not have Read permission on topic.
* the defaultResetPolicy is NONE
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if caller does Read access to any of the subscribed
* topics or to the configured groupId
*/
@Override
public ConsumerRecords<K, V> poll(long timeout) {
@ -883,7 +883,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* encountered (in which case it is thrown to the caller).
*
* @param offsets A map of offsets by partition with associated metadata
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
@ -1008,7 +1011,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The offset
* @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
* available.
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
*
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
*/
public long position(TopicPartition partition) {
acquire();
@ -1035,7 +1042,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param partition The partition to check
* @return The last committed offset and metadata or null if there was no prior commit
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
@ -1160,7 +1170,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
* The thread which is blocking in an operation will throw {@link WakeupException}.
* The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}.
*/
@Override
public void wakeup() {

View File

@ -17,6 +17,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@ -88,7 +89,6 @@ public abstract class AbstractCoordinator {
protected final ConsumerNetworkClient client;
protected final Time time;
protected final long retryBackoffMs;
protected final long requestTimeoutMs;
private boolean needsJoinPrepare = true;
private boolean rejoinNeeded = true;
@ -108,7 +108,6 @@ public abstract class AbstractCoordinator {
String metricGrpPrefix,
Map<String, String> metricTags,
Time time,
long requestTimeoutMs,
long retryBackoffMs) {
this.client = client;
this.time = time;
@ -120,7 +119,6 @@ public abstract class AbstractCoordinator {
this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
this.heartbeatTask = new HeartbeatTask();
this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
this.requestTimeoutMs = requestTimeoutMs;
this.retryBackoffMs = retryBackoffMs;
}
@ -178,7 +176,7 @@ public abstract class AbstractCoordinator {
public void ensureCoordinatorKnown() {
while (coordinatorUnknown()) {
RequestFuture<Void> future = sendGroupMetadataRequest();
client.poll(future, requestTimeoutMs);
client.poll(future);
if (future.failed()) {
if (future.isRetriable())
@ -376,6 +374,8 @@ public abstract class AbstractCoordinator {
log.error("Attempt to join group {} failed due to: {}",
groupId, error.exception().getMessage());
future.raise(error);
} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
future.raise(new GroupAuthorizationException(groupId));
} else {
// unexpected error, throw the exception
future.raise(new KafkaException("Unexpected error in join group response: "
@ -427,6 +427,8 @@ public abstract class AbstractCoordinator {
if (errorCode == Errors.NONE.code()) {
future.complete(syncResponse.memberAssignment());
sensors.syncLatency.record(response.requestLatencyMs());
} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
future.raise(new GroupAuthorizationException(groupId));
} else {
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.forCode(errorCode));
@ -476,7 +478,8 @@ public abstract class AbstractCoordinator {
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
if (groupCoordinatorResponse.errorCode() == Errors.NONE.code()) {
short errorCode = groupCoordinatorResponse.errorCode();
if (errorCode == Errors.NONE.code()) {
this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());
@ -487,8 +490,10 @@ public abstract class AbstractCoordinator {
if (generation > 0)
heartbeatTask.reset();
future.complete(null);
} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
future.raise(new GroupAuthorizationException(groupId));
} else {
future.raise(Errors.forCode(groupCoordinatorResponse.errorCode()));
future.raise(Errors.forCode(errorCode));
}
}
}
@ -538,31 +543,33 @@ public abstract class AbstractCoordinator {
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
sensors.heartbeatLatency.record(response.requestLatencyMs());
short error = heartbeatResponse.errorCode();
if (error == Errors.NONE.code()) {
short errorCode = heartbeatResponse.errorCode();
if (errorCode == Errors.NONE.code()) {
log.debug("Received successful heartbeat response.");
future.complete(null);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
|| error == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
} else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
|| errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
coordinatorDead();
future.raise(Errors.forCode(error));
} else if (error == Errors.REBALANCE_IN_PROGRESS.code()) {
future.raise(Errors.forCode(errorCode));
} else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.REBALANCE_IN_PROGRESS);
} else if (error == Errors.ILLEGAL_GENERATION.code()) {
} else if (errorCode == Errors.ILLEGAL_GENERATION.code()) {
log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.ILLEGAL_GENERATION);
} else if (error == Errors.UNKNOWN_MEMBER_ID.code()) {
} else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group.");
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
future.raise(new GroupAuthorizationException(groupId));
} else {
future.raise(new KafkaException("Unexpected error in heartbeat response: "
+ Errors.forCode(error).exception().getMessage()));
future.raise(new KafkaException("Unexpected errorCode in heartbeat response: "
+ Errors.forCode(errorCode).exception().getMessage()));
}
}
}

View File

@ -15,6 +15,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@ -82,7 +84,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
String metricGrpPrefix,
Map<String, String> metricTags,
Time time,
long requestTimeoutMs,
long retryBackoffMs,
OffsetCommitCallback defaultOffsetCommitCallback,
boolean autoCommitEnabled,
@ -95,7 +96,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
metricGrpPrefix,
metricTags,
time,
requestTimeoutMs,
retryBackoffMs);
this.metadata = metadata;
@ -136,6 +136,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
this.metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster) {
// if we encounter any unauthorized topics, raise an exception to the user
if (!cluster.unauthorizedTopics().isEmpty())
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
if (subscriptions.hasPatternSubscription()) {
final List<String> topicsToSubscribe = new ArrayList<>();
@ -340,13 +344,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
client.poll(future);
if (future.succeeded()) {
if (future.succeeded())
return;
}
if (!future.isRetriable()) {
if (!future.isRetriable())
throw future.exception();
}
Utils.sleep(retryBackoffMs);
}
@ -439,6 +441,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
@Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
sensors.commitLatency.record(response.requestLatencyMs());
Set<String> unauthorizedTopics = new HashSet<>();
for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
@ -450,6 +454,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
if (subscriptions.isAssigned(tp))
// update the local cache only if the partition is still assigned
subscriptions.committed(tp, offsetAndMetadata);
} else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
future.raise(new GroupAuthorizationException(groupId));
return;
} else if (errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
unauthorizedTopics.add(tp.topic());
} else {
if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
// just retry
@ -458,7 +467,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
|| errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
coordinatorDead();
} else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()
|| errorCode == Errors.ILLEGAL_GENERATION.code()) {
|| errorCode == Errors.ILLEGAL_GENERATION.code()
|| errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
// need to re-join group
subscriptions.needReassignment();
}
@ -473,7 +483,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
}
}
future.complete(null);
if (!unauthorizedTopics.isEmpty())
future.raise(new TopicAuthorizationException(unauthorizedTopics));
else
future.complete(null);
}
}

View File

@ -78,7 +78,7 @@ public class Fetcher<K, V> {
private final Deserializer<V> valueDeserializer;
private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
private final Set<TopicPartition> unauthorizedTopicPartitions;
private final Set<String> unauthorizedTopics;
private final Map<TopicPartition, Long> recordTooLargePartitions;
public Fetcher(ConsumerNetworkClient client,
@ -110,7 +110,7 @@ public class Fetcher<K, V> {
this.records = new LinkedList<PartitionRecords<K, V>>();
this.offsetOutOfRangePartitions = new HashMap<>();
this.unauthorizedTopicPartitions = new HashSet<>();
this.unauthorizedTopics = new HashSet<>();
this.recordTooLargePartitions = new HashMap<>();
this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
@ -302,19 +302,18 @@ public class Fetcher<K, V> {
}
/**
* If any topic from previous fetchResponse contatains Authorization error, throw ApiException.
* @throws ApiException
* If any topic from previous fetchResponse contains an Authorization error, raise an exception
* @throws TopicAuthorizationException
*/
private void throwIfUnauthorized() throws ApiException {
if (!unauthorizedTopicPartitions.isEmpty()) {
StringBuilder sb = new StringBuilder();
for (TopicPartition topicPartition : unauthorizedTopicPartitions)
sb.append(topicPartition + ",");
unauthorizedTopicPartitions.clear();
throw new AuthorizationException(String.format("Not authorized to read from %s", sb.substring(0, sb.length() - 1).toString()));
private void throwIfUnauthorizedTopics() throws TopicAuthorizationException {
if (!unauthorizedTopics.isEmpty()) {
Set<String> topics = new HashSet<>(unauthorizedTopics);
unauthorizedTopics.clear();
throw new TopicAuthorizationException(topics);
}
}
/**
/**
* If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException
*
* @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned
@ -346,7 +345,7 @@ public class Fetcher<K, V> {
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
throwIfOffsetOutOfRange();
throwIfUnauthorized();
throwIfUnauthorizedTopics();
throwIfRecordTooLarge();
for (PartitionRecords<K, V> part : this.records) {
@ -557,9 +556,9 @@ public class Fetcher<K, V> {
else
this.offsetOutOfRangePartitions.put(tp, fetchOffset);
log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp));
} else if (partition.errorCode == Errors.AUTHORIZATION_FAILED.code()) {
} else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
log.warn("Not authorized to read from topic {}.", tp.topic());
unauthorizedTopicPartitions.add(tp);
unauthorizedTopics.add(tp.topic());
} else if (partition.errorCode == Errors.UNKNOWN.code()) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {

View File

@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
@ -473,21 +474,22 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (!this.metadata.containsTopic(topic))
this.metadata.add(topic);
if (metadata.fetch().partitionsForTopic(topic) != null) {
if (metadata.fetch().partitionsForTopic(topic) != null)
return;
} else {
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
while (metadata.fetch().partitionsForTopic(topic) == null) {
log.trace("Requesting metadata update for topic {}.", topic);
int version = metadata.requestUpdate();
sender.wakeup();
metadata.awaitUpdate(version, remainingWaitMs);
long elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
while (metadata.fetch().partitionsForTopic(topic) == null) {
log.trace("Requesting metadata update for topic {}.", topic);
int version = metadata.requestUpdate();
sender.wakeup();
metadata.awaitUpdate(version, remainingWaitMs);
long elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (metadata.fetch().unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
@ -288,6 +289,8 @@ public class Sender implements Runnable {
error);
this.accumulator.reenqueue(batch, now);
this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
batch.done(baseOffset, new TopicAuthorizationException(batch.topicPartition.topic()));
} else {
// tell the user the result of their request
batch.done(baseOffset, error.exception());

View File

@ -23,6 +23,7 @@ import java.util.*;
public final class Cluster {
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
@ -34,26 +35,28 @@ public final class Cluster {
* @param nodes The nodes in the cluster
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/
public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
public Cluster(Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics) {
// make a randomized, unmodifiable copy of the nodes
List<Node> copy = new ArrayList<Node>(nodes);
List<Node> copy = new ArrayList<>(nodes);
Collections.shuffle(copy);
this.nodes = Collections.unmodifiableList(copy);
this.nodesById = new HashMap<Integer, Node>();
this.nodesById = new HashMap<>();
for (Node node: nodes)
this.nodesById.put(node.id(), node);
// index the partitions by topic/partition for quick lookup
this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
this.partitionsByTopicPartition = new HashMap<>(partitions.size());
for (PartitionInfo p : partitions)
this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
// index the partitions by topic and node respectively, and make the lists
// unmodifiable so we can hand them out in user-facing apis without risk
// of the client modifying the contents
HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<String, List<PartitionInfo>>();
HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<Integer, List<PartitionInfo>>();
HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<>();
HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<>();
for (Node n : this.nodes) {
partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
}
@ -68,30 +71,31 @@ public final class Cluster {
psNode.add(p);
}
}
this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
this.availablePartitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
this.partitionsByTopic = new HashMap<>(partsForTopic.size());
this.availablePartitionsByTopic = new HashMap<>(partsForTopic.size());
for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet()) {
String topic = entry.getKey();
List<PartitionInfo> partitionList = entry.getValue();
this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
List<PartitionInfo> availablePartitions = new ArrayList<PartitionInfo>();
List<PartitionInfo> availablePartitions = new ArrayList<>();
for (PartitionInfo part : partitionList) {
if (part.leader() != null)
availablePartitions.add(part);
}
this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
}
this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
this.partitionsByNode = new HashMap<>(partsForNode.size());
for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
}
/**
* Create an empty cluster instance with no nodes and no topic-partitions.
*/
public static Cluster empty() {
return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0));
return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
}
/**
@ -104,7 +108,7 @@ public final class Cluster {
int nodeId = -1;
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostName(), address.getPort()));
return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
return new Cluster(nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
}
/**
@ -190,6 +194,10 @@ public final class Cluster {
return this.partitionsByTopic.keySet();
}
public Set<String> unauthorizedTopics() {
return unauthorizedTopics;
}
@Override
public String toString() {
return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";

View File

@ -13,7 +13,9 @@
package org.apache.kafka.common.errors;
public class AuthorizationException extends ApiException {
public AuthorizationException(String message) {
super(message);
}
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
public class GroupAuthorizationException extends AuthorizationException {
private final String groupId;
public GroupAuthorizationException(String groupId) {
super("Not authorized to access group: " + groupId);
this.groupId = groupId;
}
public String groupId() {
return groupId;
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
import java.util.Collections;
import java.util.Set;
public class TopicAuthorizationException extends AuthorizationException {
private final Set<String> unauthorizedTopics;
public TopicAuthorizationException(Set<String> unauthorizedTopics) {
super("Not authorized to access topics: " + unauthorizedTopics);
this.unauthorizedTopics = unauthorizedTopics;
}
public TopicAuthorizationException(String unauthorizedTopic) {
this(Collections.singleton(unauthorizedTopic));
}
public Set<String> unauthorizedTopics() {
return unauthorizedTopics;
}
}

View File

@ -84,12 +84,16 @@ public enum Errors {
new UnknownMemberIdException("The coordinator is not aware of this member.")),
INVALID_SESSION_TIMEOUT(26,
new ApiException("The session timeout is not within an acceptable range.")),
REBALANCE_IN_PROGRESS(27,
new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")),
INVALID_COMMIT_OFFSET_SIZE(28,
new ApiException("The committing offset data size is not valid")),
AUTHORIZATION_FAILED(29,
new ApiException("Request is not authorized.")),
REBALANCE_IN_PROGRESS(30,
new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed."));
TOPIC_AUTHORIZATION_FAILED(29,
new AuthorizationException("Topic authorization failed.")),
GROUP_AUTHORIZATION_FAILED(30,
new AuthorizationException("Group authorization failed.")),
CLUSTER_AUTHORIZATION_FAILED(31,
new AuthorizationException("Cluster authorization failed."));
private static final Logger log = LoggerFactory.getLogger(Errors.class);

View File

@ -26,6 +26,15 @@ public class GroupCoordinatorResponse extends AbstractRequestResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
private static final String COORDINATOR_KEY_NAME = "coordinator";
/**
* Possible error codes:
*
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_GROUP (16)
* GROUP_AUTHORIZATION_FAILED (30)
*/
// coordinator level field names
private static final String NODE_ID_KEY_NAME = "node_id";
private static final String HOST_KEY_NAME = "host";

View File

@ -25,12 +25,14 @@ public class HeartbeatResponse extends AbstractRequestResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
* Possible error code:
* Possible error codes:
*
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
* REBALANCE_IN_PROGRESS (27)
* GROUP_AUTHORIZATION_FAILED (30)
*/
private final short errorCode;

View File

@ -29,7 +29,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
* Possible error code:
* Possible error codes:
*
* GROUP_LOAD_IN_PROGRESS (14)
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
@ -37,6 +37,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {
* INCONSISTENT_GROUP_PROTOCOL (23)
* UNKNOWN_MEMBER_ID (25)
* INVALID_SESSION_TIMEOUT (26)
* GROUP_AUTHORIZATION_FAILED (30)
*/
private static final String GENERATION_ID_KEY_NAME = "generation_id";

View File

@ -30,6 +30,7 @@ public class LeaveGroupResponse extends AbstractRequestResponse {
* CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_CONSUMER (16)
* UNKNOWN_CONSUMER_ID (25)
* GROUP_AUTHORIZATION_FAILED (30)
*/
private final short errorCode;

View File

@ -12,12 +12,6 @@
*/
package org.apache.kafka.common.requests;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@ -27,6 +21,13 @@ import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MetadataRequest extends AbstractRequest {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
@ -56,7 +57,8 @@ public class MetadataRequest extends AbstractRequest {
topicErrors.put(topic, Errors.forException(e));
}
Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
Cluster cluster = new Cluster(Collections.<Node>emptyList(), Collections.<PartitionInfo>emptyList(),
Collections.<String>emptySet());
switch (versionId) {
case 0:
return new MetadataResponse(cluster, topicErrors);

View File

@ -14,9 +14,12 @@ package org.apache.kafka.common.requests;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@ -159,8 +162,21 @@ public class MetadataResponse extends AbstractRequestResponse {
errors.put(topic, Errors.forCode(topicError));
}
}
this.errors = errors;
this.cluster = new Cluster(brokers.values(), partitions);
this.cluster = new Cluster(brokers.values(), partitions, unauthorizedTopics(errors));
}
private Set<String> unauthorizedTopics(Map<String, Errors> topicErrors) {
if (topicErrors.isEmpty())
return Collections.emptySet();
Set<String> unauthorizedTopics = new HashSet<>();
for (Map.Entry<String, Errors> topicErrorEntry : topicErrors.entrySet()) {
if (topicErrorEntry.getValue() == Errors.TOPIC_AUTHORIZATION_FAILED)
unauthorizedTopics.add(topicErrorEntry.getKey());
}
return unauthorizedTopics;
}
public Map<String, Errors> errors() {

View File

@ -39,7 +39,7 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
private static final String ERROR_CODE_KEY_NAME = "error_code";
/**
* Possible error code:
* Possible error codes:
*
* OFFSET_METADATA_TOO_LARGE (12)
* GROUP_LOAD_IN_PROGRESS (14)
@ -47,8 +47,10 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
* COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
* REBALANCE_IN_PROGRESS (27)
* INVALID_COMMIT_OFFSET_SIZE (28)
* TOPIC_AUTHORIZATION_FAILED (29)
* GROUP_AUTHORIZATION_FAILED (30)
*/
private final Map<TopicPartition, Short> responseData;

View File

@ -45,13 +45,15 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
public static final String NO_METADATA = "";
/**
* Possible error code:
* Possible error codeS:
*
* UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0
* GROUP_LOAD_IN_PROGRESS (14)
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
* TOPIC_AUTHORIZATION_FAILED (29)
* GROUP_AUTHORIZATION_FAILED (30)
*/
private final Map<TopicPartition, PartitionData> responseData;

View File

@ -36,8 +36,8 @@ public class SyncGroupResponse extends AbstractRequestResponse {
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
* REBALANCE_IN_PROGRESS (30)
*
* REBALANCE_IN_PROGRESS (27)
* GROUP_AUTHORIZATION_FAILED (30)
*/
private final short errorCode;

View File

@ -128,7 +128,8 @@ public class MetadataTest {
Collections.singletonList(new Node(0, "host1", 1000)),
Arrays.asList(
new PartitionInfo("topic", 0, null, null, null),
new PartitionInfo("topic1", 0, null, null, null))),
new PartitionInfo("topic1", 0, null, null, null)),
Collections.<String>emptySet()),
100);
assertArrayEquals("Metadata got updated with wrong set of topics.",
@ -154,7 +155,8 @@ public class MetadataTest {
Arrays.asList(new Node(0, "host1", 1000)),
Arrays.asList(
new PartitionInfo("topic", 0, null, null, null),
new PartitionInfo("topic1", 0, null, null, null))),
new PartitionInfo("topic1", 0, null, null, null)),
Collections.<String>emptySet()),
100);
assertEquals("Listener did not update topics list correctly",
@ -179,7 +181,8 @@ public class MetadataTest {
Collections.singletonList(new Node(0, "host1", 1000)),
Arrays.asList(
new PartitionInfo("topic", 0, null, null, null),
new PartitionInfo("topic1", 0, null, null, null))),
new PartitionInfo("topic1", 0, null, null, null)),
Collections.<String>emptySet()),
100);
metadata.removeListener(listener);
@ -188,7 +191,8 @@ public class MetadataTest {
Arrays.asList(new Node(0, "host1", 1000)),
Arrays.asList(
new PartitionInfo("topic2", 0, null, null, null),
new PartitionInfo("topic3", 0, null, null, null))),
new PartitionInfo("topic3", 0, null, null, null)),
Collections.<String>emptySet()),
100);
assertEquals("Listener did not update topics list correctly",

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@ -114,7 +115,6 @@ public class ConsumerCoordinatorTest {
"consumer" + groupId,
metricTags,
time,
requestTimeoutMs,
retryBackoffMs,
defaultOffsetCommitCallback,
autoCommitEnabled,
@ -144,6 +144,24 @@ public class ConsumerCoordinatorTest {
assertTrue(future.succeeded());
}
@Test(expected = GroupAuthorizationException.class)
public void testGroupDescribeUnauthorized() {
client.prepareResponse(consumerMetadataResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code()));
coordinator.ensureCoordinatorKnown();
}
@Test(expected = GroupAuthorizationException.class)
public void testGroupReadUnauthorized() {
subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorKnown();
client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
Errors.GROUP_AUTHORIZATION_FAILED.code()));
coordinator.ensurePartitionAssignment();
}
@Test
public void testCoordinatorNotAvailable() {
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@ -146,6 +147,23 @@ public class FetcherTest {
fetcher.fetchedRecords();
}
@Test
public void testUnauthorizedTopic() {
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
// resize the limit of the buffer to pretend it is only fetch-size large
fetcher.initFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
consumerClient.poll(0);
try {
fetcher.fetchedRecords();
fail("fetchedRecords should have thrown");
} catch (TopicAuthorizationException e) {
assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
}
}
@Test
public void testFetchDuringRebalance() {
subscriptions.subscribe(Arrays.asList(topicName), listener);

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@ -58,7 +59,7 @@ public class MockProducerTest {
public void testPartitioner() throws Exception {
PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1));
Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1), Collections.<String>emptySet());
MockProducer<String, String> producer = new MockProducer<String, String>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "key", "value");
Future<RecordMetadata> metadata = producer.send(record);

View File

@ -16,6 +16,7 @@ import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.producer.Partitioner;
@ -36,7 +37,7 @@ public class DefaultPartitionerTest {
private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
new PartitionInfo(topic, 2, node1, nodes, nodes),
new PartitionInfo(topic, 0, node0, nodes, nodes));
private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.<String>emptySet());
@Test
public void testKeyPartitionIsStable() {

View File

@ -63,7 +63,7 @@ public class RecordAccumulatorTest {
private byte[] key = "key".getBytes();
private byte[] value = "value".getBytes();
private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3));
private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet());
private Metrics metrics = new Metrics(time);
Map<String, String> metricTags = new LinkedHashMap<String, String>();
private final long maxBlockTimeMs = 1000;
@ -314,7 +314,7 @@ public class RecordAccumulatorTest {
accum.append(tp1, key, value, null, 0);
Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>(), Collections.<String>emptySet());
now = time.milliseconds();
List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, cluster, now);
assertEquals(1, expiredBatches.size());

View File

@ -254,7 +254,8 @@ public class RequestResponseTest {
replicas[0] = node;
Node[] isr = new Node[1];
isr[0] = node;
Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)));
Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)),
Collections.<String>emptySet());
Map<String, Errors> errors = new HashMap<String, Errors>();
errors.put("topic2", Errors.LEADER_NOT_AVAILABLE);

View File

@ -20,6 +20,7 @@ import static java.util.Arrays.asList;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@ -54,7 +55,7 @@ public class TestUtils {
List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
for (int i = 0; i < partitions; i++)
parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
return new Cluster(asList(ns), parts);
return new Cluster(asList(ns), parts, Collections.<String>emptySet());
}
/**

View File

@ -81,7 +81,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
metricGrpPrefix,
metricTags,
time,
requestTimeoutMs,
retryBackoffMs);
this.restUrl = restUrl;
this.configStorage = configStorage;

View File

@ -20,6 +20,17 @@ package kafka.common
* Exception thrown when a principal is not authorized to perform an operation.
* @param message
*/
class AuthorizationException(message: String) extends RuntimeException(message) {
abstract class AuthorizationException(message: String) extends RuntimeException(message) {
}
class TopicAuthorizationException(message: String) extends AuthorizationException(message) {
def this() = this(null)
}
class GroupAuthorizationException(message: String) extends AuthorizationException(message) {
def this() = this(null)
}
class ClusterAuthorizationException(message: String) extends AuthorizationException(message) {
def this() = this(null)
}

View File

@ -57,10 +57,11 @@ object ErrorMapping {
// 24: UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY
// 25: UNKNOWN_CONSUMER_ID
// 26: INVALID_SESSION_TIMEOUT
// 27: COMMITTING_PARTITIONS_NOT_ASSIGNED
// 27: REBALANCE_IN_PROGRESS
// 28: INVALID_COMMIT_OFFSET_SIZE
val AuthorizationCode: Short = 29
// 30: REBALANCE_IN_PROGRESS
val TopicAuthorizationCode: Short = 29
val GroupAuthorizationCode: Short = 30
val ClusterAuthorizationCode: Short = 31
private val exceptionToCode =
Map[Class[Throwable], Short](
@ -83,7 +84,9 @@ object ErrorMapping {
classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode,
classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode,
classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode,
classOf[AuthorizationException].asInstanceOf[Class[Throwable]] -> AuthorizationCode
classOf[TopicAuthorizationException].asInstanceOf[Class[Throwable]] -> TopicAuthorizationCode,
classOf[GroupAuthorizationException].asInstanceOf[Class[Throwable]] -> GroupAuthorizationCode,
classOf[ClusterAuthorizationException].asInstanceOf[Class[Throwable]] -> ClusterAuthorizationCode
).withDefaultValue(UnknownCode)
/* invert the mapping */

View File

@ -16,25 +16,28 @@
*/
package kafka.security.auth
import kafka.common.{BaseEnum, KafkaException}
import kafka.common.{ErrorMapping, BaseEnum, KafkaException}
/**
* ResourceTypes.
*/
sealed trait ResourceType extends BaseEnum
sealed trait ResourceType extends BaseEnum { def errorCode: Short }
case object Cluster extends ResourceType {
val name = "Cluster"
val errorCode = ErrorMapping.ClusterAuthorizationCode
}
case object Topic extends ResourceType {
val name = "Topic"
val errorCode = ErrorMapping.TopicAuthorizationCode
}
case object Group extends ResourceType {
val name = "Group"
val errorCode = ErrorMapping.GroupAuthorizationCode
}

View File

@ -183,6 +183,14 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetCommitRequest(request: RequestChannel.Request) {
val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
// reject the request immediately if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
val errors = offsetCommitRequest.requestInfo.mapValues(_ => ErrorMapping.GroupAuthorizationCode)
val response = OffsetCommitResponse(errors, offsetCommitRequest.correlationId)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
return
}
// filter non-exist topics
val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) =>
!metadataCache.contains(topicAndPartition.topic)
@ -191,13 +199,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition {
case (topicAndPartition, offsetMetadata) =>
authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) &&
authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))
authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
}
// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.AuthorizationCode)
val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.TopicAuthorizationCode)
mergedCommitStatus.foreach { case (topicAndPartition, errorCode) =>
// we only print warnings for known errors here; only replica manager could see an unknown
@ -298,7 +305,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
var errorInResponse = false
val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.AuthorizationCode, -1))
val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))
mergedResponseStatus.foreach { case (topicAndPartition, status) =>
// we only print warnings for known errors here; if it is unknown, it will cause
@ -379,7 +386,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.AuthorizationCode, -1, MessageSet.Empty))
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.TopicAuthorizationCode, -1, MessageSet.Empty))
// the callback for sending a fetch response
def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
@ -442,7 +449,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.AuthorizationCode, Nil))
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.TopicAuthorizationCode, Nil))
val responseMap = authorizedRequestInfo.map(elem => {
val (topicAndPartition, partitionOffsetRequestInfo) = elem
@ -614,7 +621,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.AuthorizationCode))
val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.TopicAuthorizationCode))
val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
val brokers = metadataCache.getAliveBrokers
@ -630,12 +637,19 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetFetchRequest(request: RequestChannel.Request) {
val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) &&
authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))
// reject the request immediately if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.GroupAuthorizationCode)
val response = OffsetFetchResponse(offsetFetchRequest.requestInfo.map{ _ -> authorizationError}.toMap)
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
return
}
val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.AuthorizationCode)
val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
}
val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.TopicAuthorizationCode)
val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap
val response = if (offsetFetchRequest.versionId == 0) {
@ -659,9 +673,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
})
val unauthorizedTopics = unauthorizedTopicPartitions.map( topicAndPartition =>
(topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,ErrorMapping.AuthorizationCode)))
OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedTopics, offsetFetchRequest.correlationId)
OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedStatus, offsetFetchRequest.correlationId)
} else {
// version 1 reads offsets from Kafka;
val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
@ -683,7 +695,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseHeader = new ResponseHeader(request.header.correlationId)
if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
val responseBody = new GroupCoordinatorResponse(Errors.AUTHORIZATION_FAILED.code, Node.noNode)
val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
} else {
val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
@ -716,7 +728,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val groups = describeRequest.groupIds().asScala.map {
case groupId =>
if (!authorize(request.session, Describe, new Resource(Group, groupId))) {
groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.AUTHORIZATION_FAILED)
groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED)
} else {
val (error, summary) = coordinator.handleDescribeGroup(groupId)
val members = summary.members.map { member =>
@ -738,7 +750,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseHeader = new ResponseHeader(request.header.correlationId)
val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) {
ListGroupsResponse.fromError(Errors.AUTHORIZATION_FAILED)
ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED)
} else {
val (error, groups) = coordinator.handleListGroups()
val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
@ -766,7 +778,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
val responseBody = new JoinGroupResponse(
ErrorMapping.AuthorizationCode,
ErrorMapping.GroupAuthorizationCode,
JoinGroupResponse.UNKNOWN_GENERATION_ID,
JoinGroupResponse.UNKNOWN_PROTOCOL,
JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
@ -801,7 +813,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
sendResponseCallback(Array[Byte](), ErrorMapping.AuthorizationCode)
sendResponseCallback(Array[Byte](), ErrorMapping.GroupAuthorizationCode)
} else {
coordinator.handleSyncGroup(
syncGroupRequest.groupId(),
@ -826,7 +838,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
val heartbeatResponse = new HeartbeatResponse(ErrorMapping.AuthorizationCode)
val heartbeatResponse = new HeartbeatResponse(ErrorMapping.GroupAuthorizationCode)
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse)))
}
else {
@ -877,7 +889,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.AuthorizationCode)
val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.GroupAuthorizationCode)
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse)))
} else {
// let the coordinator to handle leave-group
@ -897,7 +909,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new AuthorizationException(s"Request $request is not authorized.")
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
}

View File

@ -10,15 +10,14 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package integration.kafka.api
package kafka.api
import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.nio.ByteBuffer
import java.util.concurrent.ExecutionException
import java.util.{ArrayList, Properties}
import java.util.{ArrayList, Collections, Properties}
import kafka.api.RequestKeys
import kafka.cluster.EndPoint
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.coordinator.GroupCoordinator
@ -26,15 +25,13 @@ import kafka.integration.KafkaServerTestHarness
import kafka.security.auth._
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{ApiException, AuthorizationException, TimeoutException}
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{TopicPartition, requests}
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
@ -76,42 +73,40 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
val endPoint = new EndPoint("localhost", 0, SecurityProtocol.PLAINTEXT)
var RequestKeyToRequest: mutable.LinkedHashMap[Short, AbstractRequest] = null
val RequestKeyToResponseDeserializer: Map[Short, Class[_ <: Any]] =
Map(RequestKeys.MetadataKey -> classOf[MetadataResponse],
RequestKeys.ProduceKey -> classOf[ProduceResponse],
RequestKeys.FetchKey -> classOf[FetchResponse],
RequestKeys.OffsetsKey -> classOf[ListOffsetResponse],
RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse],
RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse],
RequestKeys.GroupCoordinatorKey -> classOf[GroupCoordinatorResponse],
RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse],
Map(RequestKeys.MetadataKey -> classOf[requests.MetadataResponse],
RequestKeys.ProduceKey -> classOf[requests.ProduceResponse],
RequestKeys.FetchKey -> classOf[requests.FetchResponse],
RequestKeys.OffsetsKey -> classOf[requests.ListOffsetResponse],
RequestKeys.OffsetCommitKey -> classOf[requests.OffsetCommitResponse],
RequestKeys.OffsetFetchKey -> classOf[requests.OffsetFetchResponse],
RequestKeys.GroupCoordinatorKey -> classOf[requests.GroupCoordinatorResponse],
RequestKeys.UpdateMetadataKey -> classOf[requests.UpdateMetadataResponse],
RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse],
RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse],
RequestKeys.HeartbeatKey -> classOf[HeartbeatResponse],
RequestKeys.LeaveGroupKey -> classOf[LeaveGroupResponse],
RequestKeys.LeaderAndIsrKey -> classOf[LeaderAndIsrResponse],
RequestKeys.StopReplicaKey -> classOf[StopReplicaResponse],
RequestKeys.ControlledShutdownKey -> classOf[ControlledShutdownResponse]
RequestKeys.LeaderAndIsrKey -> classOf[requests.LeaderAndIsrResponse],
RequestKeys.StopReplicaKey -> classOf[requests.StopReplicaResponse],
RequestKeys.ControlledShutdownKey -> classOf[requests.ControlledShutdownResponse]
)
val RequestKeyToErrorCode = Map[Short, (Nothing) => Short](
RequestKeys.MetadataKey -> ((resp: MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
RequestKeys.ProduceKey -> ((resp: ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
RequestKeys.FetchKey -> ((resp: FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
RequestKeys.GroupCoordinatorKey -> ((resp: GroupCoordinatorResponse) => resp.errorCode()),
RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()),
RequestKeys.MetadataKey -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
RequestKeys.ProduceKey -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
RequestKeys.FetchKey -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
RequestKeys.OffsetsKey -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
RequestKeys.OffsetCommitKey -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
RequestKeys.OffsetFetchKey -> ((resp: requests.OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
RequestKeys.GroupCoordinatorKey -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()),
RequestKeys.UpdateMetadataKey -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()),
RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()),
RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()),
RequestKeys.HeartbeatKey -> ((resp: HeartbeatResponse) => resp.errorCode()),
RequestKeys.LeaveGroupKey -> ((resp: LeaveGroupResponse) => resp.errorCode()),
RequestKeys.LeaderAndIsrKey -> ((resp: LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
RequestKeys.StopReplicaKey -> ((resp: StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
RequestKeys.ControlledShutdownKey -> ((resp: ControlledShutdownResponse) => resp.errorCode())
RequestKeys.LeaderAndIsrKey -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
RequestKeys.StopReplicaKey -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
RequestKeys.ControlledShutdownKey -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode())
)
val RequestKeysToAcls = Map[Short, Map[Resource, Set[Acl]]](
@ -155,41 +150,6 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
servers.head.consumerCoordinator.offsetsTopicConfigs)
// create the test topic with all the brokers as replicas
TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers)
val joinReq = new JoinGroupRequest(group, 30000, JoinGroupRequest.UNKNOWN_MEMBER_ID, "consumer",
List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
//we have to get a join call so the group is created and we get back a memberId
addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
val socket = new Socket("localhost", servers.head.boundPort())
val joinResponse = sendRequestAndVerifyResponseErrorCode(socket, RequestKeys.JoinGroupKey, joinReq, ErrorMapping.NoError).asInstanceOf[JoinGroupResponse]
val memberId = joinResponse.memberId()
//remove group acls
removeAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
RequestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest](
RequestKeys.MetadataKey -> new MetadataRequest(List(topic).asJava),
RequestKeys.ProduceKey -> new ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava),
RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava),
RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava),
RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava),
RequestKeys.GroupCoordinatorKey -> new GroupCoordinatorRequest(group),
RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue,
Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava),
RequestKeys.JoinGroupKey -> new JoinGroupRequest(group, 30000, memberId, "consumer",
List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava),
RequestKeys.SyncGroupKey -> new SyncGroupRequest(group, 1, memberId, Map(memberId -> ByteBuffer.wrap("test".getBytes())).asJava),
RequestKeys.OffsetCommitKey -> new OffsetCommitRequest(group, 1, memberId, 1000, Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava),
RequestKeys.HeartbeatKey -> new HeartbeatRequest(group, 1, memberId),
RequestKeys.LeaveGroupKey -> new LeaveGroupRequest(group, memberId),
RequestKeys.LeaderAndIsrKey -> new LeaderAndIsrRequest(brokerId, Int.MaxValue,
Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
Set(new LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava),
RequestKeys.StopReplicaKey -> new StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava),
RequestKeys.ControlledShutdownKey -> new ControlledShutdownRequest(brokerId)
)
}
@After
@ -198,108 +158,346 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
super.tearDown()
}
private def createMetadataRequest = {
new requests.MetadataRequest(List(topic).asJava)
}
private def createProduceRequest = {
new requests.ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava)
}
private def createFetchRequest = {
new requests.FetchRequest(5000, 100, Map(tp -> new requests.FetchRequest.PartitionData(0, 100)).asJava)
}
private def createListOffsetsRequest = {
new requests.ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava)
}
private def createOffsetFetchRequest = {
new requests.OffsetFetchRequest(group, List(tp).asJava)
}
private def createGroupCoordinatorRequest = {
new requests.GroupCoordinatorRequest(group)
}
private def createUpdateMetadataRequest = {
val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava
new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers)
}
private def createJoinGroupRequest = {
new JoinGroupRequest(group, 30000, "", "consumer",
List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
}
private def createSyncGroupRequest = {
new SyncGroupRequest(group, 1, "", Map[String, ByteBuffer]().asJava)
}
private def createOffsetCommitRequest = {
new requests.OffsetCommitRequest(group, 1, "", 1000, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava)
}
private def createHeartbeatRequest = {
new HeartbeatRequest(group, 1, "")
}
private def createLeaveGroupRequest = {
new LeaveGroupRequest(group, "")
}
private def createLeaderAndIsrRequest = {
new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
Set(new requests.LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava)
}
private def createStopReplicaRequest = {
new requests.StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava)
}
private def createControlledShutdownRequest = {
new requests.ControlledShutdownRequest(brokerId)
}
@Test
def testAuthorization() {
val requestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest](
RequestKeys.MetadataKey -> createMetadataRequest,
RequestKeys.ProduceKey -> createProduceRequest,
RequestKeys.FetchKey -> createFetchRequest,
RequestKeys.OffsetsKey -> createListOffsetsRequest,
RequestKeys.OffsetFetchKey -> createOffsetFetchRequest,
RequestKeys.GroupCoordinatorKey -> createGroupCoordinatorRequest,
RequestKeys.UpdateMetadataKey -> createUpdateMetadataRequest,
RequestKeys.JoinGroupKey -> createJoinGroupRequest,
RequestKeys.SyncGroupKey -> createSyncGroupRequest,
RequestKeys.OffsetCommitKey -> createOffsetCommitRequest,
RequestKeys.HeartbeatKey -> createHeartbeatRequest,
RequestKeys.LeaveGroupKey -> createLeaveGroupRequest,
RequestKeys.LeaderAndIsrKey -> createLeaderAndIsrRequest,
RequestKeys.StopReplicaKey -> createStopReplicaRequest,
RequestKeys.ControlledShutdownKey -> createControlledShutdownRequest
)
val socket = new Socket("localhost", servers.head.boundPort())
for ((key, request) <- RequestKeyToRequest) {
for ((key, request) <- requestKeyToRequest) {
removeAllAcls
sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.AuthorizationCode)
val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet
sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = false)
for ((resource, acls) <- RequestKeysToAcls(key))
addAndVerifyAcls(acls, resource)
sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.NoError)
sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = true)
}
}
@Test
def testProduceNeedsAuthorization() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
try {
sendRecords(numRecords, tp)
Assert.fail("should have thrown exception")
} catch {
case e: ApiException => Assert.assertEquals(Errors.AUTHORIZATION_FAILED.exception().getMessage, e.getMessage)
}
}
@Test
def testOnlyWritePermissionAllowsWritingToProducer() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
@Test
def testProduceWithNoTopicAccess() {
try {
sendRecords(numRecords, tp)
fail("sendRecords should have thrown")
} catch {
case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
}
}
@Test
def testCreatePermissionNeededForWritingToNonExistentTopic() {
val newTopic = "newTopic"
val topicPartition = new TopicPartition(newTopic, 0)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
try {
sendRecords(numRecords, topicPartition)
Assert.fail("should have thrown exception")
} catch {
case e: TimeoutException =>
//TODO Need to update the producer so it actually throws the server side of exception.
case e: Exception => Assert.fail(s"Only timeout exception should be thrown but $e thrown")
}
@Test
def testProduceWithTopicDescribe() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
try {
sendRecords(numRecords, tp)
fail("sendRecords should have thrown")
} catch {
case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
}
}
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create),
new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource)
@Test
def testProduceWithTopicRead() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
try {
sendRecords(numRecords, tp)
fail("sendRecords should have thrown")
} catch {
case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(topic), e.unauthorizedTopics())
}
}
@Test
def testProduceWithTopicWrite() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(numRecords, tp)
}
@Test
def testCreatePermissionNeededForWritingToNonExistentTopic() {
val newTopic = "newTopic"
val topicPartition = new TopicPartition(newTopic, 0)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic))
try {
sendRecords(numRecords, topicPartition)
Assert.fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics())
}
@Test
def testConsumerNeedsAuthorization() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
//TODO: Ideally we would want to test that when consumerGroup permission is not present we still get an AuthorizationException
//but the consumer fetcher currently waits forever for the consumer metadata to become available.
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
sendRecords(1, tp)
try {
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
} catch {
case e: AuthorizationException => Assert.assertEquals("Not authorized to read from topic-0", e.getMessage)
}
}
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
sendRecords(numRecords, topicPartition)
}
@Test
def testAllowingReadOnTopicAndGroupAllowsReading() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
sendRecords(1, tp)
@Test(expected = classOf[AuthorizationException])
def testConsumeWithNoAccess(): Unit = {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
}
@Test
def testConsumeWithNoGroupAccess(): Unit = {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
try {
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
} catch {
case e: GroupAuthorizationException => assertEquals(group, e.groupId())
}
}
@Test
def testConsumeWithNoTopicAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
try {
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
}
}
@Test
def testConsumeWithTopicDescribe() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
try {
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
}
}
@Test
def testConsumeWithTopicWrite() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
try {
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(topic), e.unauthorizedTopics());
}
}
@Test
def testConsumeWithTopicAndGroupRead() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
}
@Test
def testCreatePermissionNeededToReadFromNonExistentTopic() {
val newTopic = "newTopic"
val topicPartition = new TopicPartition(newTopic, 0)
val newTopicResource = new Resource(Topic, newTopic)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource)
addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource)
try {
this.consumers(0).assign(List(topicPartition).asJava)
consumeRecords(this.consumers(0))
Assert.fail("should have thrown exception")
} catch {
case e: TopicAuthorizationException =>
assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics());
}
// TODO: The following test goes into an infinite loop as consumer waits for consumer metadata to be propogated for ever.
// @Test
// def testCreatePermissionNeededToReadFromNonExistentTopic() {
// val newTopic = "newTopic"
// val topicPartition = new TopicPartition(newTopic, 0)
// val newTopicResource = new Resource(Topic, newTopic)
// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource)
// addAndVerifyAcls(GroupReadAcl(groupResource), groupResource)
// addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource)
// try {
// this.consumers(0).assign(List(topicPartition).asJava)
// consumeRecords(this.consumers(0))
// Assert.fail("should have thrown exception")
// } catch {
// //checking for the message and type to ensure whenever these things are fixed on client side the test starts failing.
// case e: ApiException => Assert.assertEquals(e.getMessage, "Request is not authorized.")
// }
//
// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource)
// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
//
// sendRecords(numRecords, topicPartition)
// consumeRecords(this.consumers(0))
// }
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource)
sendRecords(numRecords, topicPartition)
consumeRecords(this.consumers(0), topic = newTopic, part = 0)
}
@Test(expected = classOf[AuthorizationException])
def testCommitWithNoAccess() {
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@Test(expected = classOf[TopicAuthorizationException])
def testCommitWithNoTopicAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@Test(expected = classOf[TopicAuthorizationException])
def testCommitWithTopicWrite() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@Test(expected = classOf[TopicAuthorizationException])
def testCommitWithTopicDescribe() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@Test(expected = classOf[GroupAuthorizationException])
def testCommitWithNoGroupAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@Test
def testCommitWithTopicAndGroupRead() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)
}
@Test(expected = classOf[AuthorizationException])
def testOffsetFetchWithNoAccess() {
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.position(tp)
}
@Test(expected = classOf[GroupAuthorizationException])
def testOffsetFetchWithNoGroupAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.position(tp)
}
@Test(expected = classOf[TopicAuthorizationException])
def testOffsetFetchWithNoTopicAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.position(tp)
}
@Test
def testOffsetFetchTopicDescribe() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.position(tp)
}
@Test
def testOffsetFetchWithTopicAndGroupRead() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
this.consumers.head.assign(List(tp).asJava)
this.consumers.head.position(tp)
}
def removeAllAcls() = {
servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
@ -308,7 +506,11 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
}
}
def sendRequestAndVerifyResponseErrorCode(socket: Socket, key: Short, request: AbstractRequest, expectedErrorCode: Short): AbstractRequestResponse = {
def sendRequestAndVerifyResponseErrorCode(socket: Socket,
key: Short,
request: AbstractRequest,
resources: Set[ResourceType],
isAuthorized: Boolean): AbstractRequestResponse = {
val header = new RequestHeader(key, "client", 1)
val body = request.toStruct
@ -323,7 +525,14 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
ResponseHeader.parse(resp)
val response = RequestKeyToResponseDeserializer(key).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse]
Assert.assertEquals(s"$key failed", expectedErrorCode, RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response))
val errorCode = RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response)
val possibleErrorCodes = resources.map(_.errorCode)
if (isAuthorized)
assertFalse(s"${ApiKeys.forId(key)} should be allowed", possibleErrorCodes.contains(errorCode))
else
assertTrue(s"${ApiKeys.forId(key)} should be forbidden", possibleErrorCodes.contains(errorCode))
response
}
@ -364,8 +573,11 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
}
private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int = 1, startingOffset: Int =
0) {
private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
numRecords: Int = 1,
startingOffset: Int = 0,
topic: String = topic,
part: Int = part) {
val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
val maxIters = numRecords * 50
var iters = 0

View File

@ -25,6 +25,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -42,7 +43,7 @@ public class DefaultPartitionGrouperTest {
new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0])
);
private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos);
private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
@Test
public void testGrouping() {

View File

@ -77,7 +77,7 @@ public class StreamThreadTest {
new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
);
private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos);
private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"));