diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index f2fca12e09d..7e77c18cfcb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -238,14 +238,18 @@ public final class Metadata { } private Cluster getClusterForCurrentTopics(Cluster cluster) { + Set unauthorizedTopics = new HashSet<>(); Collection partitionInfos = new ArrayList<>(); List nodes = Collections.emptyList(); if (cluster != null) { + unauthorizedTopics.addAll(cluster.unauthorizedTopics()); + unauthorizedTopics.retainAll(this.topics); + for (String topic : this.topics) { partitionInfos.addAll(cluster.partitionsForTopic(topic)); } nodes = cluster.nodes(); } - return new Cluster(nodes, partitionInfos); + return new Cluster(nodes, partitionInfos, unauthorizedTopics); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index cce13dd17d7..a6be519cc01 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -544,7 +544,6 @@ public class KafkaConsumer implements Consumer { metricGrpPrefix, metricsTags, this.time, - requestTimeoutMs, retryBackoffMs, new ConsumerCoordinator.DefaultOffsetCommitCallback(), config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), @@ -777,10 +776,11 @@ public class KafkaConsumer implements Consumer { * @throws NoOffsetForPartitionException if there is no stored offset for a subscribed partition and no automatic * offset reset policy has been configured. * @throws org.apache.kafka.common.errors.OffsetOutOfRangeException if there is OffsetOutOfRange error in fetchResponse and - * the defaultResetPolicy is NONE - * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called - * - * @throws org.apache.kafka.common.errors.AuthorizationException if caller does not have Read permission on topic. + * the defaultResetPolicy is NONE + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.AuthorizationException if caller does Read access to any of the subscribed + * topics or to the configured groupId */ @Override public ConsumerRecords poll(long timeout) { @@ -883,7 +883,10 @@ public class KafkaConsumer implements Consumer { * encountered (in which case it is thrown to the caller). * * @param offsets A map of offsets by partition with associated metadata - * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId */ @Override public void commitSync(final Map offsets) { @@ -1008,7 +1011,11 @@ public class KafkaConsumer implements Consumer { * @return The offset * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is * available. - * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called + * + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId */ public long position(TopicPartition partition) { acquire(); @@ -1035,7 +1042,10 @@ public class KafkaConsumer implements Consumer { * * @param partition The partition to check * @return The last committed offset and metadata or null if there was no prior commit - * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this function is called + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId */ @Override public OffsetAndMetadata committed(TopicPartition partition) { @@ -1160,7 +1170,7 @@ public class KafkaConsumer implements Consumer { /** * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll. - * The thread which is blocking in an operation will throw {@link WakeupException}. + * The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}. */ @Override public void wakeup() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 9cf825c91e8..781ff78a4e3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -17,6 +17,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.UnknownMemberIdException; @@ -88,7 +89,6 @@ public abstract class AbstractCoordinator { protected final ConsumerNetworkClient client; protected final Time time; protected final long retryBackoffMs; - protected final long requestTimeoutMs; private boolean needsJoinPrepare = true; private boolean rejoinNeeded = true; @@ -108,7 +108,6 @@ public abstract class AbstractCoordinator { String metricGrpPrefix, Map metricTags, Time time, - long requestTimeoutMs, long retryBackoffMs) { this.client = client; this.time = time; @@ -120,7 +119,6 @@ public abstract class AbstractCoordinator { this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds()); this.heartbeatTask = new HeartbeatTask(); this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); - this.requestTimeoutMs = requestTimeoutMs; this.retryBackoffMs = retryBackoffMs; } @@ -178,7 +176,7 @@ public abstract class AbstractCoordinator { public void ensureCoordinatorKnown() { while (coordinatorUnknown()) { RequestFuture future = sendGroupMetadataRequest(); - client.poll(future, requestTimeoutMs); + client.poll(future); if (future.failed()) { if (future.isRetriable()) @@ -376,6 +374,8 @@ public abstract class AbstractCoordinator { log.error("Attempt to join group {} failed due to: {}", groupId, error.exception().getMessage()); future.raise(error); + } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + future.raise(new GroupAuthorizationException(groupId)); } else { // unexpected error, throw the exception future.raise(new KafkaException("Unexpected error in join group response: " @@ -427,6 +427,8 @@ public abstract class AbstractCoordinator { if (errorCode == Errors.NONE.code()) { future.complete(syncResponse.memberAssignment()); sensors.syncLatency.record(response.requestLatencyMs()); + } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + future.raise(new GroupAuthorizationException(groupId)); } else { AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.forCode(errorCode)); @@ -476,7 +478,8 @@ public abstract class AbstractCoordinator { // use MAX_VALUE - node.id as the coordinator id to mimic separate connections // for the coordinator in the underlying network client layer // TODO: this needs to be better handled in KAFKA-1935 - if (groupCoordinatorResponse.errorCode() == Errors.NONE.code()) { + short errorCode = groupCoordinatorResponse.errorCode(); + if (errorCode == Errors.NONE.code()) { this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(), groupCoordinatorResponse.node().host(), groupCoordinatorResponse.node().port()); @@ -487,8 +490,10 @@ public abstract class AbstractCoordinator { if (generation > 0) heartbeatTask.reset(); future.complete(null); + } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + future.raise(new GroupAuthorizationException(groupId)); } else { - future.raise(Errors.forCode(groupCoordinatorResponse.errorCode())); + future.raise(Errors.forCode(errorCode)); } } } @@ -538,31 +543,33 @@ public abstract class AbstractCoordinator { @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture future) { sensors.heartbeatLatency.record(response.requestLatencyMs()); - short error = heartbeatResponse.errorCode(); - if (error == Errors.NONE.code()) { + short errorCode = heartbeatResponse.errorCode(); + if (errorCode == Errors.NONE.code()) { log.debug("Received successful heartbeat response."); future.complete(null); - } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() - || error == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { + } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() + || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead."); coordinatorDead(); - future.raise(Errors.forCode(error)); - } else if (error == Errors.REBALANCE_IN_PROGRESS.code()) { + future.raise(Errors.forCode(errorCode)); + } else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) { log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group."); AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.REBALANCE_IN_PROGRESS); - } else if (error == Errors.ILLEGAL_GENERATION.code()) { + } else if (errorCode == Errors.ILLEGAL_GENERATION.code()) { log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group."); AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.ILLEGAL_GENERATION); - } else if (error == Errors.UNKNOWN_MEMBER_ID.code()) { + } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) { log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group."); memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.UNKNOWN_MEMBER_ID); + } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + future.raise(new GroupAuthorizationException(groupId)); } else { - future.raise(new KafkaException("Unexpected error in heartbeat response: " - + Errors.forCode(error).exception().getMessage())); + future.raise(new KafkaException("Unexpected errorCode in heartbeat response: " + + Errors.forCode(errorCode).exception().getMessage())); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 5b5d6ff4809..c7323cb2f8e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -15,6 +15,8 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.errors.GroupAuthorizationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; @@ -82,7 +84,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl String metricGrpPrefix, Map metricTags, Time time, - long requestTimeoutMs, long retryBackoffMs, OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled, @@ -95,7 +96,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl metricGrpPrefix, metricTags, time, - requestTimeoutMs, retryBackoffMs); this.metadata = metadata; @@ -136,6 +136,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl this.metadata.addListener(new Metadata.Listener() { @Override public void onMetadataUpdate(Cluster cluster) { + // if we encounter any unauthorized topics, raise an exception to the user + if (!cluster.unauthorizedTopics().isEmpty()) + throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics())); + if (subscriptions.hasPatternSubscription()) { final List topicsToSubscribe = new ArrayList<>(); @@ -340,13 +344,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl RequestFuture future = sendOffsetCommitRequest(offsets); client.poll(future); - if (future.succeeded()) { + if (future.succeeded()) return; - } - if (!future.isRetriable()) { + if (!future.isRetriable()) throw future.exception(); - } Utils.sleep(retryBackoffMs); } @@ -439,6 +441,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl @Override public void handle(OffsetCommitResponse commitResponse, RequestFuture future) { sensors.commitLatency.record(response.requestLatencyMs()); + Set unauthorizedTopics = new HashSet<>(); + for (Map.Entry entry : commitResponse.responseData().entrySet()) { TopicPartition tp = entry.getKey(); OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp); @@ -450,6 +454,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl if (subscriptions.isAssigned(tp)) // update the local cache only if the partition is still assigned subscriptions.committed(tp, offsetAndMetadata); + } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + future.raise(new GroupAuthorizationException(groupId)); + return; + } else if (errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) { + unauthorizedTopics.add(tp.topic()); } else { if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) { // just retry @@ -458,7 +467,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { coordinatorDead(); } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code() - || errorCode == Errors.ILLEGAL_GENERATION.code()) { + || errorCode == Errors.ILLEGAL_GENERATION.code() + || errorCode == Errors.REBALANCE_IN_PROGRESS.code()) { // need to re-join group subscriptions.needReassignment(); } @@ -473,7 +483,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl } } - future.complete(null); + if (!unauthorizedTopics.isEmpty()) + future.raise(new TopicAuthorizationException(unauthorizedTopics)); + else + future.complete(null); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 4e0d5ec5729..7c5bca6828a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -78,7 +78,7 @@ public class Fetcher { private final Deserializer valueDeserializer; private final Map offsetOutOfRangePartitions; - private final Set unauthorizedTopicPartitions; + private final Set unauthorizedTopics; private final Map recordTooLargePartitions; public Fetcher(ConsumerNetworkClient client, @@ -110,7 +110,7 @@ public class Fetcher { this.records = new LinkedList>(); this.offsetOutOfRangePartitions = new HashMap<>(); - this.unauthorizedTopicPartitions = new HashSet<>(); + this.unauthorizedTopics = new HashSet<>(); this.recordTooLargePartitions = new HashMap<>(); this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags); @@ -302,19 +302,18 @@ public class Fetcher { } /** - * If any topic from previous fetchResponse contatains Authorization error, throw ApiException. - * @throws ApiException + * If any topic from previous fetchResponse contains an Authorization error, raise an exception + * @throws TopicAuthorizationException */ - private void throwIfUnauthorized() throws ApiException { - if (!unauthorizedTopicPartitions.isEmpty()) { - StringBuilder sb = new StringBuilder(); - for (TopicPartition topicPartition : unauthorizedTopicPartitions) - sb.append(topicPartition + ","); - unauthorizedTopicPartitions.clear(); - throw new AuthorizationException(String.format("Not authorized to read from %s", sb.substring(0, sb.length() - 1).toString())); + private void throwIfUnauthorizedTopics() throws TopicAuthorizationException { + if (!unauthorizedTopics.isEmpty()) { + Set topics = new HashSet<>(unauthorizedTopics); + unauthorizedTopics.clear(); + throw new TopicAuthorizationException(topics); } } - /** + + /** * If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException * * @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned @@ -346,7 +345,7 @@ public class Fetcher { } else { Map>> drained = new HashMap<>(); throwIfOffsetOutOfRange(); - throwIfUnauthorized(); + throwIfUnauthorizedTopics(); throwIfRecordTooLarge(); for (PartitionRecords part : this.records) { @@ -557,9 +556,9 @@ public class Fetcher { else this.offsetOutOfRangePartitions.put(tp, fetchOffset); log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp)); - } else if (partition.errorCode == Errors.AUTHORIZATION_FAILED.code()) { + } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) { log.warn("Not authorized to read from topic {}.", tp.topic()); - unauthorizedTopicPartitions.add(tp); + unauthorizedTopics.add(tp.topic()); } else if (partition.errorCode == Errors.UNKNOWN.code()) { log.warn("Unknown error fetching data for topic-partition {}", tp); } else { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index ff3bfe6f45b..21d635b1b32 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.MetricName; @@ -473,21 +474,22 @@ public class KafkaProducer implements Producer { if (!this.metadata.containsTopic(topic)) this.metadata.add(topic); - if (metadata.fetch().partitionsForTopic(topic) != null) { + if (metadata.fetch().partitionsForTopic(topic) != null) return; - } else { - long begin = time.milliseconds(); - long remainingWaitMs = maxWaitMs; - while (metadata.fetch().partitionsForTopic(topic) == null) { - log.trace("Requesting metadata update for topic {}.", topic); - int version = metadata.requestUpdate(); - sender.wakeup(); - metadata.awaitUpdate(version, remainingWaitMs); - long elapsed = time.milliseconds() - begin; - if (elapsed >= maxWaitMs) - throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); - remainingWaitMs = maxWaitMs - elapsed; - } + + long begin = time.milliseconds(); + long remainingWaitMs = maxWaitMs; + while (metadata.fetch().partitionsForTopic(topic) == null) { + log.trace("Requesting metadata update for topic {}.", topic); + int version = metadata.requestUpdate(); + sender.wakeup(); + metadata.awaitUpdate(version, remainingWaitMs); + long elapsed = time.milliseconds() - begin; + if (elapsed >= maxWaitMs) + throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); + if (metadata.fetch().unauthorizedTopics().contains(topic)) + throw new TopicAuthorizationException(topic); + remainingWaitMs = maxWaitMs - elapsed; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 56be021a792..cada6266d57 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.MetricName; @@ -288,6 +289,8 @@ public class Sender implements Runnable { error); this.accumulator.reenqueue(batch, now); this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount); + } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { + batch.done(baseOffset, new TopicAuthorizationException(batch.topicPartition.topic())); } else { // tell the user the result of their request batch.done(baseOffset, error.exception()); diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index e6a2e434935..c7f3e075a53 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -23,6 +23,7 @@ import java.util.*; public final class Cluster { private final List nodes; + private final Set unauthorizedTopics; private final Map partitionsByTopicPartition; private final Map> partitionsByTopic; private final Map> availablePartitionsByTopic; @@ -34,26 +35,28 @@ public final class Cluster { * @param nodes The nodes in the cluster * @param partitions Information about a subset of the topic-partitions this cluster hosts */ - public Cluster(Collection nodes, Collection partitions) { + public Cluster(Collection nodes, + Collection partitions, + Set unauthorizedTopics) { // make a randomized, unmodifiable copy of the nodes - List copy = new ArrayList(nodes); + List copy = new ArrayList<>(nodes); Collections.shuffle(copy); this.nodes = Collections.unmodifiableList(copy); - this.nodesById = new HashMap(); + this.nodesById = new HashMap<>(); for (Node node: nodes) this.nodesById.put(node.id(), node); // index the partitions by topic/partition for quick lookup - this.partitionsByTopicPartition = new HashMap(partitions.size()); + this.partitionsByTopicPartition = new HashMap<>(partitions.size()); for (PartitionInfo p : partitions) this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p); // index the partitions by topic and node respectively, and make the lists // unmodifiable so we can hand them out in user-facing apis without risk // of the client modifying the contents - HashMap> partsForTopic = new HashMap>(); - HashMap> partsForNode = new HashMap>(); + HashMap> partsForTopic = new HashMap<>(); + HashMap> partsForNode = new HashMap<>(); for (Node n : this.nodes) { partsForNode.put(n.id(), new ArrayList()); } @@ -68,30 +71,31 @@ public final class Cluster { psNode.add(p); } } - this.partitionsByTopic = new HashMap>(partsForTopic.size()); - this.availablePartitionsByTopic = new HashMap>(partsForTopic.size()); + this.partitionsByTopic = new HashMap<>(partsForTopic.size()); + this.availablePartitionsByTopic = new HashMap<>(partsForTopic.size()); for (Map.Entry> entry : partsForTopic.entrySet()) { String topic = entry.getKey(); List partitionList = entry.getValue(); this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList)); - List availablePartitions = new ArrayList(); + List availablePartitions = new ArrayList<>(); for (PartitionInfo part : partitionList) { if (part.leader() != null) availablePartitions.add(part); } this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions)); } - this.partitionsByNode = new HashMap>(partsForNode.size()); + this.partitionsByNode = new HashMap<>(partsForNode.size()); for (Map.Entry> entry : partsForNode.entrySet()) this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); + this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics); } /** * Create an empty cluster instance with no nodes and no topic-partitions. */ public static Cluster empty() { - return new Cluster(new ArrayList(0), new ArrayList(0)); + return new Cluster(new ArrayList(0), new ArrayList(0), Collections.emptySet()); } /** @@ -104,7 +108,7 @@ public final class Cluster { int nodeId = -1; for (InetSocketAddress address : addresses) nodes.add(new Node(nodeId--, address.getHostName(), address.getPort())); - return new Cluster(nodes, new ArrayList(0)); + return new Cluster(nodes, new ArrayList(0), Collections.emptySet()); } /** @@ -190,6 +194,10 @@ public final class Cluster { return this.partitionsByTopic.keySet(); } + public Set unauthorizedTopics() { + return unauthorizedTopics; + } + @Override public String toString() { return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")"; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java index 2a01e5e7af1..7fc932ddde7 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthorizationException.java @@ -13,7 +13,9 @@ package org.apache.kafka.common.errors; public class AuthorizationException extends ApiException { + public AuthorizationException(String message) { super(message); } + } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java new file mode 100644 index 00000000000..3a767aa5438 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class GroupAuthorizationException extends AuthorizationException { + private final String groupId; + + public GroupAuthorizationException(String groupId) { + super("Not authorized to access group: " + groupId); + this.groupId = groupId; + } + + public String groupId() { + return groupId; + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java new file mode 100644 index 00000000000..b5d33b9e3de --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +import java.util.Collections; +import java.util.Set; + +public class TopicAuthorizationException extends AuthorizationException { + private final Set unauthorizedTopics; + + public TopicAuthorizationException(Set unauthorizedTopics) { + super("Not authorized to access topics: " + unauthorizedTopics); + this.unauthorizedTopics = unauthorizedTopics; + } + + public TopicAuthorizationException(String unauthorizedTopic) { + this(Collections.singleton(unauthorizedTopic)); + } + + public Set unauthorizedTopics() { + return unauthorizedTopics; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index d4eb1f98c32..516e50b2601 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -84,12 +84,16 @@ public enum Errors { new UnknownMemberIdException("The coordinator is not aware of this member.")), INVALID_SESSION_TIMEOUT(26, new ApiException("The session timeout is not within an acceptable range.")), + REBALANCE_IN_PROGRESS(27, + new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")), INVALID_COMMIT_OFFSET_SIZE(28, new ApiException("The committing offset data size is not valid")), - AUTHORIZATION_FAILED(29, - new ApiException("Request is not authorized.")), - REBALANCE_IN_PROGRESS(30, - new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")); + TOPIC_AUTHORIZATION_FAILED(29, + new AuthorizationException("Topic authorization failed.")), + GROUP_AUTHORIZATION_FAILED(30, + new AuthorizationException("Group authorization failed.")), + CLUSTER_AUTHORIZATION_FAILED(31, + new AuthorizationException("Cluster authorization failed.")); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java index c28de70bdf4..3fe014df3fd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java @@ -26,6 +26,15 @@ public class GroupCoordinatorResponse extends AbstractRequestResponse { private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String COORDINATOR_KEY_NAME = "coordinator"; + /** + * Possible error codes: + * + * GROUP_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_GROUP (16) + * GROUP_AUTHORIZATION_FAILED (30) + */ + + // coordinator level field names private static final String NODE_ID_KEY_NAME = "node_id"; private static final String HOST_KEY_NAME = "host"; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java index 48cb4c0f016..a595efb26f7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java @@ -25,12 +25,14 @@ public class HeartbeatResponse extends AbstractRequestResponse { private static final String ERROR_CODE_KEY_NAME = "error_code"; /** - * Possible error code: + * Possible error codes: * * GROUP_COORDINATOR_NOT_AVAILABLE (15) * NOT_COORDINATOR_FOR_GROUP (16) * ILLEGAL_GENERATION (22) * UNKNOWN_MEMBER_ID (25) + * REBALANCE_IN_PROGRESS (27) + * GROUP_AUTHORIZATION_FAILED (30) */ private final short errorCode; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index 0615e5e60d2..c869b1edf7c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -29,7 +29,7 @@ public class JoinGroupResponse extends AbstractRequestResponse { private static final String ERROR_CODE_KEY_NAME = "error_code"; /** - * Possible error code: + * Possible error codes: * * GROUP_LOAD_IN_PROGRESS (14) * GROUP_COORDINATOR_NOT_AVAILABLE (15) @@ -37,6 +37,7 @@ public class JoinGroupResponse extends AbstractRequestResponse { * INCONSISTENT_GROUP_PROTOCOL (23) * UNKNOWN_MEMBER_ID (25) * INVALID_SESSION_TIMEOUT (26) + * GROUP_AUTHORIZATION_FAILED (30) */ private static final String GENERATION_ID_KEY_NAME = "generation_id"; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java index 278e3e8e249..e0ca11770e1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java @@ -30,6 +30,7 @@ public class LeaveGroupResponse extends AbstractRequestResponse { * CONSUMER_COORDINATOR_NOT_AVAILABLE (15) * NOT_COORDINATOR_FOR_CONSUMER (16) * UNKNOWN_CONSUMER_ID (25) + * GROUP_AUTHORIZATION_FAILED (30) */ private final short errorCode; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index f70e8dac70c..cc1771ba7d9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -12,12 +12,6 @@ */ package org.apache.kafka.common.requests; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -27,6 +21,13 @@ import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class MetadataRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id); @@ -56,7 +57,8 @@ public class MetadataRequest extends AbstractRequest { topicErrors.put(topic, Errors.forException(e)); } - Cluster cluster = new Cluster(new ArrayList(), new ArrayList()); + Cluster cluster = new Cluster(Collections.emptyList(), Collections.emptyList(), + Collections.emptySet()); switch (versionId) { case 0: return new MetadataResponse(cluster, topicErrors); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index c8f2d087dfc..eb163dd5a7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -14,9 +14,12 @@ package org.apache.kafka.common.requests; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -159,8 +162,21 @@ public class MetadataResponse extends AbstractRequestResponse { errors.put(topic, Errors.forCode(topicError)); } } + this.errors = errors; - this.cluster = new Cluster(brokers.values(), partitions); + this.cluster = new Cluster(brokers.values(), partitions, unauthorizedTopics(errors)); + } + + private Set unauthorizedTopics(Map topicErrors) { + if (topicErrors.isEmpty()) + return Collections.emptySet(); + + Set unauthorizedTopics = new HashSet<>(); + for (Map.Entry topicErrorEntry : topicErrors.entrySet()) { + if (topicErrorEntry.getValue() == Errors.TOPIC_AUTHORIZATION_FAILED) + unauthorizedTopics.add(topicErrorEntry.getKey()); + } + return unauthorizedTopics; } public Map errors() { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index baea6f9ef0c..4d10a918690 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -39,7 +39,7 @@ public class OffsetCommitResponse extends AbstractRequestResponse { private static final String ERROR_CODE_KEY_NAME = "error_code"; /** - * Possible error code: + * Possible error codes: * * OFFSET_METADATA_TOO_LARGE (12) * GROUP_LOAD_IN_PROGRESS (14) @@ -47,8 +47,10 @@ public class OffsetCommitResponse extends AbstractRequestResponse { * NOT_COORDINATOR_FOR_GROUP (16) * ILLEGAL_GENERATION (22) * UNKNOWN_MEMBER_ID (25) - * COMMITTING_PARTITIONS_NOT_ASSIGNED (27) + * REBALANCE_IN_PROGRESS (27) * INVALID_COMMIT_OFFSET_SIZE (28) + * TOPIC_AUTHORIZATION_FAILED (29) + * GROUP_AUTHORIZATION_FAILED (30) */ private final Map responseData; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index afc5618911d..88d68eae002 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -45,13 +45,15 @@ public class OffsetFetchResponse extends AbstractRequestResponse { public static final String NO_METADATA = ""; /** - * Possible error code: + * Possible error codeS: * * UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0 * GROUP_LOAD_IN_PROGRESS (14) * NOT_COORDINATOR_FOR_GROUP (16) * ILLEGAL_GENERATION (22) * UNKNOWN_MEMBER_ID (25) + * TOPIC_AUTHORIZATION_FAILED (29) + * GROUP_AUTHORIZATION_FAILED (30) */ private final Map responseData; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java index 0eb92f2e2ea..7256bd247bd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java @@ -36,8 +36,8 @@ public class SyncGroupResponse extends AbstractRequestResponse { * NOT_COORDINATOR_FOR_GROUP (16) * ILLEGAL_GENERATION (22) * UNKNOWN_MEMBER_ID (25) - * REBALANCE_IN_PROGRESS (30) - * + * REBALANCE_IN_PROGRESS (27) + * GROUP_AUTHORIZATION_FAILED (30) */ private final short errorCode; diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 55d76084cf5..bd3bbe80589 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -128,7 +128,8 @@ public class MetadataTest { Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList( new PartitionInfo("topic", 0, null, null, null), - new PartitionInfo("topic1", 0, null, null, null))), + new PartitionInfo("topic1", 0, null, null, null)), + Collections.emptySet()), 100); assertArrayEquals("Metadata got updated with wrong set of topics.", @@ -154,7 +155,8 @@ public class MetadataTest { Arrays.asList(new Node(0, "host1", 1000)), Arrays.asList( new PartitionInfo("topic", 0, null, null, null), - new PartitionInfo("topic1", 0, null, null, null))), + new PartitionInfo("topic1", 0, null, null, null)), + Collections.emptySet()), 100); assertEquals("Listener did not update topics list correctly", @@ -179,7 +181,8 @@ public class MetadataTest { Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList( new PartitionInfo("topic", 0, null, null, null), - new PartitionInfo("topic1", 0, null, null, null))), + new PartitionInfo("topic1", 0, null, null, null)), + Collections.emptySet()), 100); metadata.removeListener(listener); @@ -188,7 +191,8 @@ public class MetadataTest { Arrays.asList(new Node(0, "host1", 1000)), Arrays.asList( new PartitionInfo("topic2", 0, null, null, null), - new PartitionInfo("topic3", 0, null, null, null))), + new PartitionInfo("topic3", 0, null, null, null)), + Collections.emptySet()), 100); assertEquals("Listener did not update topics list correctly", diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 8667f2277bb..7fd6d88720a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.OffsetMetadataTooLarge; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; @@ -114,7 +115,6 @@ public class ConsumerCoordinatorTest { "consumer" + groupId, metricTags, time, - requestTimeoutMs, retryBackoffMs, defaultOffsetCommitCallback, autoCommitEnabled, @@ -144,6 +144,24 @@ public class ConsumerCoordinatorTest { assertTrue(future.succeeded()); } + @Test(expected = GroupAuthorizationException.class) + public void testGroupDescribeUnauthorized() { + client.prepareResponse(consumerMetadataResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code())); + coordinator.ensureCoordinatorKnown(); + } + + @Test(expected = GroupAuthorizationException.class) + public void testGroupReadUnauthorized() { + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.>emptyMap(), + Errors.GROUP_AUTHORIZATION_FAILED.code())); + coordinator.ensurePartitionAssignment(); + } + @Test public void testCoordinatorNotAvailable() { client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 1e7a21555d6..87118309d42 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; @@ -146,6 +147,23 @@ public class FetcherTest { fetcher.fetchedRecords(); } + @Test + public void testUnauthorizedTopic() { + subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.seek(tp, 0); + + // resize the limit of the buffer to pretend it is only fetch-size large + fetcher.initFetches(cluster); + client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0)); + consumerClient.poll(0); + try { + fetcher.fetchedRecords(); + fail("fetchedRecords should have thrown"); + } catch (TopicAuthorizationException e) { + assertEquals(Collections.singleton(topicName), e.unauthorizedTopics()); + } + } + @Test public void testFetchDuringRebalance() { subscriptions.subscribe(Arrays.asList(topicName), listener); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java index 7a46c561b70..0a0bdd8e42f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -58,7 +59,7 @@ public class MockProducerTest { public void testPartitioner() throws Exception { PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null); PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null); - Cluster cluster = new Cluster(new ArrayList(0), asList(partitionInfo0, partitionInfo1)); + Cluster cluster = new Cluster(new ArrayList(0), asList(partitionInfo0, partitionInfo1), Collections.emptySet()); MockProducer producer = new MockProducer(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer()); ProducerRecord record = new ProducerRecord(topic, "key", "value"); Future metadata = producer.send(record); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java index 977fa933d83..7a5cef6886a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java @@ -16,6 +16,7 @@ import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.util.Collections; import java.util.List; import org.apache.kafka.clients.producer.Partitioner; @@ -36,7 +37,7 @@ public class DefaultPartitionerTest { private List partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes), new PartitionInfo(topic, 2, node1, nodes, nodes), new PartitionInfo(topic, 0, node0, nodes, nodes)); - private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions); + private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.emptySet()); @Test public void testKeyPartitionIsStable() { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 132b83b1927..4674a9140e7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -63,7 +63,7 @@ public class RecordAccumulatorTest { private byte[] key = "key".getBytes(); private byte[] value = "value".getBytes(); private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); - private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); + private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.emptySet()); private Metrics metrics = new Metrics(time); Map metricTags = new LinkedHashMap(); private final long maxBlockTimeMs = 1000; @@ -314,7 +314,7 @@ public class RecordAccumulatorTest { accum.append(tp1, key, value, null, 0); Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); - Cluster cluster = new Cluster(new ArrayList(), new ArrayList()); + Cluster cluster = new Cluster(new ArrayList(), new ArrayList(), Collections.emptySet()); now = time.milliseconds(); List expiredBatches = accum.abortExpiredBatches(60, cluster, now); assertEquals(1, expiredBatches.size()); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 761b9dbb218..5ee11d2f1fe 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -254,7 +254,8 @@ public class RequestResponseTest { replicas[0] = node; Node[] isr = new Node[1]; isr[0] = node; - Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr))); + Cluster cluster = new Cluster(Arrays.asList(node), Arrays.asList(new PartitionInfo("topic1", 1, node, replicas, isr)), + Collections.emptySet()); Map errors = new HashMap(); errors.put("topic2", Errors.LEADER_NOT_AVAILABLE); diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index ccf3a5f5f72..0fb59e654b3 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -20,6 +20,7 @@ import static java.util.Arrays.asList; import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Random; @@ -54,7 +55,7 @@ public class TestUtils { List parts = new ArrayList(); for (int i = 0; i < partitions; i++) parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); - return new Cluster(asList(ns), parts); + return new Cluster(asList(ns), parts, Collections.emptySet()); } /** diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java index 9fdbac78117..d11165cedf8 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java @@ -81,7 +81,6 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos metricGrpPrefix, metricTags, time, - requestTimeoutMs, retryBackoffMs); this.restUrl = restUrl; this.configStorage = configStorage; diff --git a/core/src/main/scala/kafka/common/AuthorizationException.scala b/core/src/main/scala/kafka/common/AuthorizationException.scala index 12ee0fe9fbc..55919a59896 100644 --- a/core/src/main/scala/kafka/common/AuthorizationException.scala +++ b/core/src/main/scala/kafka/common/AuthorizationException.scala @@ -20,6 +20,17 @@ package kafka.common * Exception thrown when a principal is not authorized to perform an operation. * @param message */ -class AuthorizationException(message: String) extends RuntimeException(message) { +abstract class AuthorizationException(message: String) extends RuntimeException(message) { +} + +class TopicAuthorizationException(message: String) extends AuthorizationException(message) { + def this() = this(null) +} + +class GroupAuthorizationException(message: String) extends AuthorizationException(message) { + def this() = this(null) +} + +class ClusterAuthorizationException(message: String) extends AuthorizationException(message) { def this() = this(null) } diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 6f53fac2d8a..e0ebe940703 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -57,10 +57,11 @@ object ErrorMapping { // 24: UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY // 25: UNKNOWN_CONSUMER_ID // 26: INVALID_SESSION_TIMEOUT - // 27: COMMITTING_PARTITIONS_NOT_ASSIGNED + // 27: REBALANCE_IN_PROGRESS // 28: INVALID_COMMIT_OFFSET_SIZE - val AuthorizationCode: Short = 29 - // 30: REBALANCE_IN_PROGRESS + val TopicAuthorizationCode: Short = 29 + val GroupAuthorizationCode: Short = 30 + val ClusterAuthorizationCode: Short = 31 private val exceptionToCode = Map[Class[Throwable], Short]( @@ -83,7 +84,9 @@ object ErrorMapping { classOf[MessageSetSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSetSizeTooLargeCode, classOf[NotEnoughReplicasException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasCode, classOf[NotEnoughReplicasAfterAppendException].asInstanceOf[Class[Throwable]] -> NotEnoughReplicasAfterAppendCode, - classOf[AuthorizationException].asInstanceOf[Class[Throwable]] -> AuthorizationCode + classOf[TopicAuthorizationException].asInstanceOf[Class[Throwable]] -> TopicAuthorizationCode, + classOf[GroupAuthorizationException].asInstanceOf[Class[Throwable]] -> GroupAuthorizationCode, + classOf[ClusterAuthorizationException].asInstanceOf[Class[Throwable]] -> ClusterAuthorizationCode ).withDefaultValue(UnknownCode) /* invert the mapping */ diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala index ceb6348d381..bb397229bba 100644 --- a/core/src/main/scala/kafka/security/auth/ResourceType.scala +++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala @@ -16,25 +16,28 @@ */ package kafka.security.auth -import kafka.common.{BaseEnum, KafkaException} +import kafka.common.{ErrorMapping, BaseEnum, KafkaException} /** * ResourceTypes. */ -sealed trait ResourceType extends BaseEnum +sealed trait ResourceType extends BaseEnum { def errorCode: Short } case object Cluster extends ResourceType { val name = "Cluster" + val errorCode = ErrorMapping.ClusterAuthorizationCode } case object Topic extends ResourceType { val name = "Topic" + val errorCode = ErrorMapping.TopicAuthorizationCode } case object Group extends ResourceType { val name = "Group" + val errorCode = ErrorMapping.GroupAuthorizationCode } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 0a2e0b9dd77..21434f76b1b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -183,6 +183,14 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetCommitRequest(request: RequestChannel.Request) { val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + // reject the request immediately if not authorized to the group + if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) { + val errors = offsetCommitRequest.requestInfo.mapValues(_ => ErrorMapping.GroupAuthorizationCode) + val response = OffsetCommitResponse(errors, offsetCommitRequest.correlationId) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response))) + return + } + // filter non-exist topics val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) => !metadataCache.contains(topicAndPartition.topic) @@ -191,13 +199,12 @@ class KafkaApis(val requestChannel: RequestChannel, val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { case (topicAndPartition, offsetMetadata) => - authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) && - authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId)) + authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) } // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) { - val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.AuthorizationCode) + val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.TopicAuthorizationCode) mergedCommitStatus.foreach { case (topicAndPartition, errorCode) => // we only print warnings for known errors here; only replica manager could see an unknown @@ -298,7 +305,7 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { var errorInResponse = false - val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.AuthorizationCode, -1)) + val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1)) mergedResponseStatus.foreach { case (topicAndPartition, status) => // we only print warnings for known errors here; if it is unknown, it will cause @@ -379,7 +386,7 @@ class KafkaApis(val requestChannel: RequestChannel, case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) } - val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.AuthorizationCode, -1, MessageSet.Empty)) + val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => FetchResponsePartitionData(ErrorMapping.TopicAuthorizationCode, -1, MessageSet.Empty)) // the callback for sending a fetch response def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) { @@ -442,7 +449,7 @@ class KafkaApis(val requestChannel: RequestChannel, case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) } - val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.AuthorizationCode, Nil)) + val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.TopicAuthorizationCode, Nil)) val responseMap = authorizedRequestInfo.map(elem => { val (topicAndPartition, partitionOffsetRequestInfo) = elem @@ -614,7 +621,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.AuthorizationCode)) + val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.TopicAuthorizationCode)) val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol) val brokers = metadataCache.getAliveBrokers @@ -630,12 +637,19 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetFetchRequest(request: RequestChannel.Request) { val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] - val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition => - authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) && - authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)) + // reject the request immediately if not authorized to the group + if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) { + val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.GroupAuthorizationCode) + val response = OffsetFetchResponse(offsetFetchRequest.requestInfo.map{ _ -> authorizationError}.toMap) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response))) + return } - val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.AuthorizationCode) + val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition => + authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic)) + } + + val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.TopicAuthorizationCode) val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap val response = if (offsetFetchRequest.versionId == 0) { @@ -659,9 +673,7 @@ class KafkaApis(val requestChannel: RequestChannel, } }) - val unauthorizedTopics = unauthorizedTopicPartitions.map( topicAndPartition => - (topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,ErrorMapping.AuthorizationCode))) - OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedTopics, offsetFetchRequest.correlationId) + OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedStatus, offsetFetchRequest.correlationId) } else { // version 1 reads offsets from Kafka; val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap @@ -683,7 +695,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responseHeader = new ResponseHeader(request.header.correlationId) if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) { - val responseBody = new GroupCoordinatorResponse(Errors.AUTHORIZATION_FAILED.code, Node.noNode) + val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } else { val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId) @@ -716,7 +728,7 @@ class KafkaApis(val requestChannel: RequestChannel, val groups = describeRequest.groupIds().asScala.map { case groupId => if (!authorize(request.session, Describe, new Resource(Group, groupId))) { - groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.AUTHORIZATION_FAILED) + groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.GROUP_AUTHORIZATION_FAILED) } else { val (error, summary) = coordinator.handleDescribeGroup(groupId) val members = summary.members.map { member => @@ -738,7 +750,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responseHeader = new ResponseHeader(request.header.correlationId) val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) { - ListGroupsResponse.fromError(Errors.AUTHORIZATION_FAILED) + ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED) } else { val (error, groups) = coordinator.handleListGroups() val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } @@ -766,7 +778,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) { val responseBody = new JoinGroupResponse( - ErrorMapping.AuthorizationCode, + ErrorMapping.GroupAuthorizationCode, JoinGroupResponse.UNKNOWN_GENERATION_ID, JoinGroupResponse.UNKNOWN_PROTOCOL, JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId @@ -801,7 +813,7 @@ class KafkaApis(val requestChannel: RequestChannel, } if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) { - sendResponseCallback(Array[Byte](), ErrorMapping.AuthorizationCode) + sendResponseCallback(Array[Byte](), ErrorMapping.GroupAuthorizationCode) } else { coordinator.handleSyncGroup( syncGroupRequest.groupId(), @@ -826,7 +838,7 @@ class KafkaApis(val requestChannel: RequestChannel, } if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) { - val heartbeatResponse = new HeartbeatResponse(ErrorMapping.AuthorizationCode) + val heartbeatResponse = new HeartbeatResponse(ErrorMapping.GroupAuthorizationCode) requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, heartbeatResponse))) } else { @@ -877,7 +889,7 @@ class KafkaApis(val requestChannel: RequestChannel, } if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) { - val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.AuthorizationCode) + val leaveGroupResponse = new LeaveGroupResponse(ErrorMapping.GroupAuthorizationCode) requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, leaveGroupResponse))) } else { // let the coordinator to handle leave-group @@ -897,7 +909,7 @@ class KafkaApis(val requestChannel: RequestChannel, def authorizeClusterAction(request: RequestChannel.Request): Unit = { if (!authorize(request.session, ClusterAction, Resource.ClusterResource)) - throw new AuthorizationException(s"Request $request is not authorized.") + throw new ClusterAuthorizationException(s"Request $request is not authorized.") } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index e363e27db11..c8ca2a3fd73 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -10,15 +10,14 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package integration.kafka.api +package kafka.api import java.io.{DataInputStream, DataOutputStream} import java.net.Socket import java.nio.ByteBuffer import java.util.concurrent.ExecutionException -import java.util.{ArrayList, Properties} +import java.util.{ArrayList, Collections, Properties} -import kafka.api.RequestKeys import kafka.cluster.EndPoint import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.coordinator.GroupCoordinator @@ -26,15 +25,13 @@ import kafka.integration.KafkaServerTestHarness import kafka.security.auth._ import kafka.server.KafkaConfig import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer} +import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{ApiException, AuthorizationException, TimeoutException} -import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} -import org.apache.kafka.common.requests.FetchRequest.PartitionData -import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState +import org.apache.kafka.common.errors._ +import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.common.{TopicPartition, requests} import org.junit.Assert._ import org.junit.{After, Assert, Before, Test} @@ -76,42 +73,40 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { val endPoint = new EndPoint("localhost", 0, SecurityProtocol.PLAINTEXT) - var RequestKeyToRequest: mutable.LinkedHashMap[Short, AbstractRequest] = null - val RequestKeyToResponseDeserializer: Map[Short, Class[_ <: Any]] = - Map(RequestKeys.MetadataKey -> classOf[MetadataResponse], - RequestKeys.ProduceKey -> classOf[ProduceResponse], - RequestKeys.FetchKey -> classOf[FetchResponse], - RequestKeys.OffsetsKey -> classOf[ListOffsetResponse], - RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse], - RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse], - RequestKeys.GroupCoordinatorKey -> classOf[GroupCoordinatorResponse], - RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse], + Map(RequestKeys.MetadataKey -> classOf[requests.MetadataResponse], + RequestKeys.ProduceKey -> classOf[requests.ProduceResponse], + RequestKeys.FetchKey -> classOf[requests.FetchResponse], + RequestKeys.OffsetsKey -> classOf[requests.ListOffsetResponse], + RequestKeys.OffsetCommitKey -> classOf[requests.OffsetCommitResponse], + RequestKeys.OffsetFetchKey -> classOf[requests.OffsetFetchResponse], + RequestKeys.GroupCoordinatorKey -> classOf[requests.GroupCoordinatorResponse], + RequestKeys.UpdateMetadataKey -> classOf[requests.UpdateMetadataResponse], RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse], RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse], RequestKeys.HeartbeatKey -> classOf[HeartbeatResponse], RequestKeys.LeaveGroupKey -> classOf[LeaveGroupResponse], - RequestKeys.LeaderAndIsrKey -> classOf[LeaderAndIsrResponse], - RequestKeys.StopReplicaKey -> classOf[StopReplicaResponse], - RequestKeys.ControlledShutdownKey -> classOf[ControlledShutdownResponse] + RequestKeys.LeaderAndIsrKey -> classOf[requests.LeaderAndIsrResponse], + RequestKeys.StopReplicaKey -> classOf[requests.StopReplicaResponse], + RequestKeys.ControlledShutdownKey -> classOf[requests.ControlledShutdownResponse] ) val RequestKeyToErrorCode = Map[Short, (Nothing) => Short]( - RequestKeys.MetadataKey -> ((resp: MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()), - RequestKeys.ProduceKey -> ((resp: ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode), - RequestKeys.FetchKey -> ((resp: FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), - RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), - RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2), - RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), - RequestKeys.GroupCoordinatorKey -> ((resp: GroupCoordinatorResponse) => resp.errorCode()), - RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()), + RequestKeys.MetadataKey -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()), + RequestKeys.ProduceKey -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode), + RequestKeys.FetchKey -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), + RequestKeys.OffsetsKey -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), + RequestKeys.OffsetCommitKey -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2), + RequestKeys.OffsetFetchKey -> ((resp: requests.OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), + RequestKeys.GroupCoordinatorKey -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()), + RequestKeys.UpdateMetadataKey -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()), RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()), RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()), RequestKeys.HeartbeatKey -> ((resp: HeartbeatResponse) => resp.errorCode()), RequestKeys.LeaveGroupKey -> ((resp: LeaveGroupResponse) => resp.errorCode()), - RequestKeys.LeaderAndIsrKey -> ((resp: LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2), - RequestKeys.StopReplicaKey -> ((resp: StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2), - RequestKeys.ControlledShutdownKey -> ((resp: ControlledShutdownResponse) => resp.errorCode()) + RequestKeys.LeaderAndIsrKey -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2), + RequestKeys.StopReplicaKey -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2), + RequestKeys.ControlledShutdownKey -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode()) ) val RequestKeysToAcls = Map[Short, Map[Resource, Set[Acl]]]( @@ -155,41 +150,6 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { servers.head.consumerCoordinator.offsetsTopicConfigs) // create the test topic with all the brokers as replicas TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers) - - val joinReq = new JoinGroupRequest(group, 30000, JoinGroupRequest.UNKNOWN_MEMBER_ID, "consumer", - List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava) - - //we have to get a join call so the group is created and we get back a memberId - addAndVerifyAcls(GroupReadAcl(groupResource), groupResource) - val socket = new Socket("localhost", servers.head.boundPort()) - val joinResponse = sendRequestAndVerifyResponseErrorCode(socket, RequestKeys.JoinGroupKey, joinReq, ErrorMapping.NoError).asInstanceOf[JoinGroupResponse] - val memberId = joinResponse.memberId() - - //remove group acls - removeAndVerifyAcls(GroupReadAcl(groupResource), groupResource) - - RequestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest]( - RequestKeys.MetadataKey -> new MetadataRequest(List(topic).asJava), - RequestKeys.ProduceKey -> new ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava), - RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava), - RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava), - RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava), - RequestKeys.GroupCoordinatorKey -> new GroupCoordinatorRequest(group), - RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue, - Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, - Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava), - RequestKeys.JoinGroupKey -> new JoinGroupRequest(group, 30000, memberId, "consumer", - List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava), - RequestKeys.SyncGroupKey -> new SyncGroupRequest(group, 1, memberId, Map(memberId -> ByteBuffer.wrap("test".getBytes())).asJava), - RequestKeys.OffsetCommitKey -> new OffsetCommitRequest(group, 1, memberId, 1000, Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava), - RequestKeys.HeartbeatKey -> new HeartbeatRequest(group, 1, memberId), - RequestKeys.LeaveGroupKey -> new LeaveGroupRequest(group, memberId), - RequestKeys.LeaderAndIsrKey -> new LeaderAndIsrRequest(brokerId, Int.MaxValue, - Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, - Set(new LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava), - RequestKeys.StopReplicaKey -> new StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava), - RequestKeys.ControlledShutdownKey -> new ControlledShutdownRequest(brokerId) - ) } @After @@ -198,108 +158,346 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { super.tearDown() } + private def createMetadataRequest = { + new requests.MetadataRequest(List(topic).asJava) + } + + private def createProduceRequest = { + new requests.ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava) + } + + private def createFetchRequest = { + new requests.FetchRequest(5000, 100, Map(tp -> new requests.FetchRequest.PartitionData(0, 100)).asJava) + } + + private def createListOffsetsRequest = { + new requests.ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava) + } + + private def createOffsetFetchRequest = { + new requests.OffsetFetchRequest(group, List(tp).asJava) + } + + private def createGroupCoordinatorRequest = { + new requests.GroupCoordinatorRequest(group) + } + + private def createUpdateMetadataRequest = { + val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava + val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, + Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava + new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers) + } + + private def createJoinGroupRequest = { + new JoinGroupRequest(group, 30000, "", "consumer", + List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava) + } + + private def createSyncGroupRequest = { + new SyncGroupRequest(group, 1, "", Map[String, ByteBuffer]().asJava) + } + + private def createOffsetCommitRequest = { + new requests.OffsetCommitRequest(group, 1, "", 1000, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava) + } + + private def createHeartbeatRequest = { + new HeartbeatRequest(group, 1, "") + } + + private def createLeaveGroupRequest = { + new LeaveGroupRequest(group, "") + } + + private def createLeaderAndIsrRequest = { + new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue, + Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, + Set(new requests.LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava) + } + + private def createStopReplicaRequest = { + new requests.StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava) + } + + private def createControlledShutdownRequest = { + new requests.ControlledShutdownRequest(brokerId) + } + @Test def testAuthorization() { + val requestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest]( + RequestKeys.MetadataKey -> createMetadataRequest, + RequestKeys.ProduceKey -> createProduceRequest, + RequestKeys.FetchKey -> createFetchRequest, + RequestKeys.OffsetsKey -> createListOffsetsRequest, + RequestKeys.OffsetFetchKey -> createOffsetFetchRequest, + RequestKeys.GroupCoordinatorKey -> createGroupCoordinatorRequest, + RequestKeys.UpdateMetadataKey -> createUpdateMetadataRequest, + RequestKeys.JoinGroupKey -> createJoinGroupRequest, + RequestKeys.SyncGroupKey -> createSyncGroupRequest, + RequestKeys.OffsetCommitKey -> createOffsetCommitRequest, + RequestKeys.HeartbeatKey -> createHeartbeatRequest, + RequestKeys.LeaveGroupKey -> createLeaveGroupRequest, + RequestKeys.LeaderAndIsrKey -> createLeaderAndIsrRequest, + RequestKeys.StopReplicaKey -> createStopReplicaRequest, + RequestKeys.ControlledShutdownKey -> createControlledShutdownRequest + ) + val socket = new Socket("localhost", servers.head.boundPort()) - for ((key, request) <- RequestKeyToRequest) { + for ((key, request) <- requestKeyToRequest) { removeAllAcls - - sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.AuthorizationCode) - + val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet + sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = false) for ((resource, acls) <- RequestKeysToAcls(key)) addAndVerifyAcls(acls, resource) - - sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.NoError) + sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = true) } } - @Test - def testProduceNeedsAuthorization() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) - try { - sendRecords(numRecords, tp) - Assert.fail("should have thrown exception") - } catch { - case e: ApiException => Assert.assertEquals(Errors.AUTHORIZATION_FAILED.exception().getMessage, e.getMessage) - } - } - - @Test - def testOnlyWritePermissionAllowsWritingToProducer() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + @Test + def testProduceWithNoTopicAccess() { + try { sendRecords(numRecords, tp) + fail("sendRecords should have thrown") + } catch { + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } + } - @Test - def testCreatePermissionNeededForWritingToNonExistentTopic() { - val newTopic = "newTopic" - val topicPartition = new TopicPartition(newTopic, 0) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic)) - try { - sendRecords(numRecords, topicPartition) - Assert.fail("should have thrown exception") - } catch { - case e: TimeoutException => - //TODO Need to update the producer so it actually throws the server side of exception. - case e: Exception => Assert.fail(s"Only timeout exception should be thrown but $e thrown") - } + @Test + def testProduceWithTopicDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + try { + sendRecords(numRecords, tp) + fail("sendRecords should have thrown") + } catch { + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) + } + } - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create), - new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource) + @Test + def testProduceWithTopicRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + try { + sendRecords(numRecords, tp) + fail("sendRecords should have thrown") + } catch { + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) + } + } + + @Test + def testProduceWithTopicWrite() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(numRecords, tp) + } + + @Test + def testCreatePermissionNeededForWritingToNonExistentTopic() { + val newTopic = "newTopic" + val topicPartition = new TopicPartition(newTopic, 0) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic)) + try { sendRecords(numRecords, topicPartition) + Assert.fail("should have thrown exception") + } catch { + case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } - @Test - def testConsumerNeedsAuthorization() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) - //TODO: Ideally we would want to test that when consumerGroup permission is not present we still get an AuthorizationException - //but the consumer fetcher currently waits forever for the consumer metadata to become available. - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - sendRecords(1, tp) - try { - this.consumers.head.assign(List(tp).asJava) - consumeRecords(this.consumers.head) - Assert.fail("should have thrown exception") - } catch { - case e: AuthorizationException => Assert.assertEquals("Not authorized to read from topic-0", e.getMessage) - } - } + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + sendRecords(numRecords, topicPartition) + } - @Test - def testAllowingReadOnTopicAndGroupAllowsReading() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - sendRecords(1, tp) + @Test(expected = classOf[AuthorizationException]) + def testConsumeWithNoAccess(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + this.consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + } + + @Test + def testConsumeWithNoGroupAccess(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + try { this.consumers.head.assign(List(tp).asJava) consumeRecords(this.consumers.head) + Assert.fail("should have thrown exception") + } catch { + case e: GroupAuthorizationException => assertEquals(group, e.groupId()) + } + } + + @Test + def testConsumeWithNoTopicAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + try { + this.consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + Assert.fail("should have thrown exception") + } catch { + case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + } + } + + @Test + def testConsumeWithTopicDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + try { + this.consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + Assert.fail("should have thrown exception") + } catch { + case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + } + } + + @Test + def testConsumeWithTopicWrite() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + try { + this.consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + Assert.fail("should have thrown exception") + } catch { + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + } + } + + @Test + def testConsumeWithTopicAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + this.consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + } + + @Test + def testCreatePermissionNeededToReadFromNonExistentTopic() { + val newTopic = "newTopic" + val topicPartition = new TopicPartition(newTopic, 0) + val newTopicResource = new Resource(Topic, newTopic) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource) + addAndVerifyAcls(GroupReadAcl(groupResource), groupResource) + addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource) + try { + this.consumers(0).assign(List(topicPartition).asJava) + consumeRecords(this.consumers(0)) + Assert.fail("should have thrown exception") + } catch { + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()); } -// TODO: The following test goes into an infinite loop as consumer waits for consumer metadata to be propogated for ever. -// @Test -// def testCreatePermissionNeededToReadFromNonExistentTopic() { -// val newTopic = "newTopic" -// val topicPartition = new TopicPartition(newTopic, 0) -// val newTopicResource = new Resource(Topic, newTopic) -// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource) -// addAndVerifyAcls(GroupReadAcl(groupResource), groupResource) -// addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource) -// try { -// this.consumers(0).assign(List(topicPartition).asJava) -// consumeRecords(this.consumers(0)) -// Assert.fail("should have thrown exception") -// } catch { -// //checking for the message and type to ensure whenever these things are fixed on client side the test starts failing. -// case e: ApiException => Assert.assertEquals(e.getMessage, "Request is not authorized.") -// } -// -// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource) -// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) -// -// sendRecords(numRecords, topicPartition) -// consumeRecords(this.consumers(0)) -// } + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + + sendRecords(numRecords, topicPartition) + consumeRecords(this.consumers(0), topic = newTopic, part = 0) + } + + @Test(expected = classOf[AuthorizationException]) + def testCommitWithNoAccess() { + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test(expected = classOf[TopicAuthorizationException]) + def testCommitWithNoTopicAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test(expected = classOf[TopicAuthorizationException]) + def testCommitWithTopicWrite() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test(expected = classOf[TopicAuthorizationException]) + def testCommitWithTopicDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test(expected = classOf[GroupAuthorizationException]) + def testCommitWithNoGroupAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test + def testCommitWithTopicAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test(expected = classOf[AuthorizationException]) + def testOffsetFetchWithNoAccess() { + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.position(tp) + } + + @Test(expected = classOf[GroupAuthorizationException]) + def testOffsetFetchWithNoGroupAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.position(tp) + } + + @Test(expected = classOf[TopicAuthorizationException]) + def testOffsetFetchWithNoTopicAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.position(tp) + } + + @Test + def testOffsetFetchTopicDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.position(tp) + } + + @Test + def testOffsetFetchWithTopicAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.position(tp) + } def removeAllAcls() = { servers.head.apis.authorizer.get.getAcls().keys.foreach { resource => @@ -308,7 +506,11 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { } } - def sendRequestAndVerifyResponseErrorCode(socket: Socket, key: Short, request: AbstractRequest, expectedErrorCode: Short): AbstractRequestResponse = { + def sendRequestAndVerifyResponseErrorCode(socket: Socket, + key: Short, + request: AbstractRequest, + resources: Set[ResourceType], + isAuthorized: Boolean): AbstractRequestResponse = { val header = new RequestHeader(key, "client", 1) val body = request.toStruct @@ -323,7 +525,14 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { ResponseHeader.parse(resp) val response = RequestKeyToResponseDeserializer(key).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse] - Assert.assertEquals(s"$key failed", expectedErrorCode, RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response)) + val errorCode = RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response) + + val possibleErrorCodes = resources.map(_.errorCode) + if (isAuthorized) + assertFalse(s"${ApiKeys.forId(key)} should be allowed", possibleErrorCodes.contains(errorCode)) + else + assertTrue(s"${ApiKeys.forId(key)} should be forbidden", possibleErrorCodes.contains(errorCode)) + response } @@ -364,8 +573,11 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { } - private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int = 1, startingOffset: Int = - 0) { + private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], + numRecords: Int = 1, + startingOffset: Int = 0, + topic: String = topic, + part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 50 var iters = 0 diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java index d43fc531a4a..2a5ca9b1d09 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java @@ -25,6 +25,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,7 +43,7 @@ public class DefaultPartitionGrouperTest { new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos); + private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.emptySet()); @Test public void testGrouping() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index d8141d1b2f2..909df13b3e1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -77,7 +77,7 @@ public class StreamThreadTest { new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos); + private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.emptySet()); PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"));