diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 5a5444e99c7..d5faa6e79da 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -541,9 +541,10 @@ public abstract class AbstractCoordinator implements Closeable { // log the error and re-throw the exception log.error("Attempt to join group failed due to fatal error: {}", error.message()); if (error == Errors.GROUP_MAX_SIZE_REACHED) { - future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId)); + future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId + + " already has the configured maximum number of members.")); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); + future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else { future.raise(error); } @@ -633,7 +634,7 @@ public abstract class AbstractCoordinator implements Closeable { requestRejoin(); if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); + future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { log.debug("SyncGroup failed because the group began another rebalance"); future.raise(error); @@ -699,7 +700,7 @@ public abstract class AbstractCoordinator implements Closeable { } future.complete(null); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); + future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else { log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage()); future.raise(error); @@ -923,7 +924,7 @@ public abstract class AbstractCoordinator implements Closeable { resetGeneration(); future.raise(error); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); + future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else { future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 48d0c95bbdb..4866986d37e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -995,7 +995,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); + future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); return; } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { unauthorizedTopics.add(tp.topic()); @@ -1090,7 +1090,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { markCoordinatorUnknown(); future.raise(error); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - future.raise(new GroupAuthorizationException(rebalanceConfig.groupId)); + future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else { future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 121ddb25595..919be28feaf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -652,7 +652,7 @@ public class Sender implements Runnable { } else { final RuntimeException exception; if (error == Errors.TOPIC_AUTHORIZATION_FAILED) - exception = new TopicAuthorizationException(batch.topicPartition.topic()); + exception = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic())); else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends"); else diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index ea4d9d0fe60..03b023183d1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -1302,7 +1302,7 @@ public class TransactionManager { } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { fatalError(error.exception()); } else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) { - abortableError(new GroupAuthorizationException(builder.data().key())); + abortableError(GroupAuthorizationException.forGroupId(builder.data().key())); } else { fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" + "unexpected error: %s", coordinatorType, builder.data().key(), @@ -1401,7 +1401,7 @@ public class TransactionManager { } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { fatalError(error.exception()); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - abortableError(new GroupAuthorizationException(builder.consumerGroupId())); + abortableError(GroupAuthorizationException.forGroupId(builder.consumerGroupId())); } else { fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message())); } @@ -1463,7 +1463,7 @@ public class TransactionManager { // If the topic is unknown or the coordinator is loading, retry with the current coordinator continue; } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - abortableError(new GroupAuthorizationException(builder.consumerGroupId())); + abortableError(GroupAuthorizationException.forGroupId(builder.consumerGroupId())); break; } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED || error == Errors.INVALID_PRODUCER_EPOCH diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java index c3f0795de43..22eae3b57b4 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupAuthorizationException.java @@ -19,13 +19,27 @@ package org.apache.kafka.common.errors; public class GroupAuthorizationException extends AuthorizationException { private final String groupId; - public GroupAuthorizationException(String groupId) { - super("Not authorized to access group: " + groupId); + public GroupAuthorizationException(String message, String groupId) { + super(message); this.groupId = groupId; } + public GroupAuthorizationException(String message) { + this(message, null); + } + + /** + * Return the group ID that failed authorization. May be null if it is not known + * in the context the exception was raised in. + * + * @return nullable groupId + */ public String groupId() { return groupId; } + public static GroupAuthorizationException forGroupId(String groupId) { + return new GroupAuthorizationException("Not authorized to access group: " + groupId, groupId); + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java index 086b79ff028..85d0c7d25ce 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupMaxSizeReachedException.java @@ -22,7 +22,7 @@ package org.apache.kafka.common.errors; public class GroupMaxSizeReachedException extends ApiException { private static final long serialVersionUID = 1L; - public GroupMaxSizeReachedException(String groupId) { - super("Consumer group " + groupId + " already has the configured maximum number of members."); + public GroupMaxSizeReachedException(String message) { + super(message); } } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java index 6bf260d5cde..e2235f804e1 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java @@ -22,15 +22,25 @@ import java.util.Set; public class TopicAuthorizationException extends AuthorizationException { private final Set unauthorizedTopics; - public TopicAuthorizationException(Set unauthorizedTopics) { - super("Not authorized to access topics: " + unauthorizedTopics); + public TopicAuthorizationException(String message, Set unauthorizedTopics) { + super(message); this.unauthorizedTopics = unauthorizedTopics; } - public TopicAuthorizationException(String unauthorizedTopic) { - this(Collections.singleton(unauthorizedTopic)); + public TopicAuthorizationException(Set unauthorizedTopics) { + this("Not authorized to access topics: " + unauthorizedTopics, unauthorizedTopics); } + public TopicAuthorizationException(String message) { + this(message, Collections.emptySet()); + } + + /** + * Get the set of topics which failed authorization. May be empty if the set is not known + * in the context the exception was raised in. + * + * @return possibly empty set of unauthorized topics + */ public Set unauthorizedTopics() { return unauthorizedTopics; } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 7e39f693269..89bc0515d7d 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -185,10 +185,8 @@ public enum Errors { RebalanceInProgressException::new), INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not valid.", InvalidCommitOffsetSizeException::new), - TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.", - TopicAuthorizationException::new), - GROUP_AUTHORIZATION_FAILED(30, "Group authorization failed.", - GroupAuthorizationException::new), + TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.", TopicAuthorizationException::new), + GROUP_AUTHORIZATION_FAILED(30, "Group authorization failed.", GroupAuthorizationException::new), CLUSTER_AUTHORIZATION_FAILED(31, "Cluster authorization failed.", ClusterAuthorizationException::new), INVALID_TIMESTAMP(32, "The timestamp of the message is out of acceptable range.", diff --git a/docs/upgrade.html b/docs/upgrade.html index e6bdd4a7d93..9d0e73803a6 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -24,6 +24,11 @@
  • The bin/kafka-preferred-replica-election.sh command line tool has been deprecated. It has been replaced by bin/kafka-leader-election.sh.
  • The methods electPreferredLeaders in the Java AdminClient class have been deprecated in favor of the methods electLeaders.
  • Scala code leveraging the NewTopic(String, int, short) constructor with literal values will need to explicitly call toShort on the second literal.
  • +
  • The argument in the constructor GroupAuthorizationException(String) is now used to specify an exception message. + Previously it referred to the group that failed authorization. This was done for consistency with other exception types and to + avoid potential misuse. The constructor TopicAuthorizationException(String) which was previously used for a single + unauthorized topic was changed similarly. +
  • Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0