MINOR: Fix api exception single argument constructor usage (#6956)

Api exception types usually have a single argument constructor which accepts the exception message. However, some types actually use this constructor to initialize a field. This inconsistency has led to some cases where exception messages were being incorrectly passed to these constructors and interpreted incorrectly. For example, this leads to confusing messages like the following in the log when we hit a GROUP_MAX_SIZE_REACHED error:
```
Attempt to join group failed due to fatal error: Consumer group The consumer group has reached its max size. already has the configured ...
```
This patch fixes the problem by changing these constructors so that the exception message is provided consistently. This affected `GroupAuthorizationException`, `TopicAuthorizationException`, and `GroupMaxSizeReachedException`. 

Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Jason Gustafson 2019-07-17 09:42:57 -07:00 committed by GitHub
parent ecf23b51b0
commit 53b10a37a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 51 additions and 23 deletions

View File

@ -541,9 +541,10 @@ public abstract class AbstractCoordinator implements Closeable {
// log the error and re-throw the exception
log.error("Attempt to join group failed due to fatal error: {}", error.message());
if (error == Errors.GROUP_MAX_SIZE_REACHED) {
future.raise(new GroupMaxSizeReachedException(rebalanceConfig.groupId));
future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId +
" already has the configured maximum number of members."));
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
future.raise(error);
}
@ -633,7 +634,7 @@ public abstract class AbstractCoordinator implements Closeable {
requestRejoin();
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.debug("SyncGroup failed because the group began another rebalance");
future.raise(error);
@ -699,7 +700,7 @@ public abstract class AbstractCoordinator implements Closeable {
}
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
future.raise(error);
@ -923,7 +924,7 @@ public abstract class AbstractCoordinator implements Closeable {
resetGeneration();
future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
}

View File

@ -995,7 +995,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
return;
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
unauthorizedTopics.add(tp.topic());
@ -1090,7 +1090,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
markCoordinatorUnknown();
future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(rebalanceConfig.groupId));
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
}

View File

@ -652,7 +652,7 @@ public class Sender implements Runnable {
} else {
final RuntimeException exception;
if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
exception = new TopicAuthorizationException(batch.topicPartition.topic());
exception = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
else

View File

@ -1302,7 +1302,7 @@ public class TransactionManager {
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
fatalError(error.exception());
} else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(new GroupAuthorizationException(builder.data().key()));
abortableError(GroupAuthorizationException.forGroupId(builder.data().key()));
} else {
fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" +
"unexpected error: %s", coordinatorType, builder.data().key(),
@ -1401,7 +1401,7 @@ public class TransactionManager {
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
fatalError(error.exception());
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
abortableError(GroupAuthorizationException.forGroupId(builder.consumerGroupId()));
} else {
fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
}
@ -1463,7 +1463,7 @@ public class TransactionManager {
// If the topic is unknown or the coordinator is loading, retry with the current coordinator
continue;
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
abortableError(GroupAuthorizationException.forGroupId(builder.consumerGroupId()));
break;
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
|| error == Errors.INVALID_PRODUCER_EPOCH

View File

@ -19,13 +19,27 @@ package org.apache.kafka.common.errors;
public class GroupAuthorizationException extends AuthorizationException {
private final String groupId;
public GroupAuthorizationException(String groupId) {
super("Not authorized to access group: " + groupId);
public GroupAuthorizationException(String message, String groupId) {
super(message);
this.groupId = groupId;
}
public GroupAuthorizationException(String message) {
this(message, null);
}
/**
* Return the group ID that failed authorization. May be null if it is not known
* in the context the exception was raised in.
*
* @return nullable groupId
*/
public String groupId() {
return groupId;
}
public static GroupAuthorizationException forGroupId(String groupId) {
return new GroupAuthorizationException("Not authorized to access group: " + groupId, groupId);
}
}

View File

@ -22,7 +22,7 @@ package org.apache.kafka.common.errors;
public class GroupMaxSizeReachedException extends ApiException {
private static final long serialVersionUID = 1L;
public GroupMaxSizeReachedException(String groupId) {
super("Consumer group " + groupId + " already has the configured maximum number of members.");
public GroupMaxSizeReachedException(String message) {
super(message);
}
}

View File

@ -22,15 +22,25 @@ import java.util.Set;
public class TopicAuthorizationException extends AuthorizationException {
private final Set<String> unauthorizedTopics;
public TopicAuthorizationException(Set<String> unauthorizedTopics) {
super("Not authorized to access topics: " + unauthorizedTopics);
public TopicAuthorizationException(String message, Set<String> unauthorizedTopics) {
super(message);
this.unauthorizedTopics = unauthorizedTopics;
}
public TopicAuthorizationException(String unauthorizedTopic) {
this(Collections.singleton(unauthorizedTopic));
public TopicAuthorizationException(Set<String> unauthorizedTopics) {
this("Not authorized to access topics: " + unauthorizedTopics, unauthorizedTopics);
}
public TopicAuthorizationException(String message) {
this(message, Collections.emptySet());
}
/**
* Get the set of topics which failed authorization. May be empty if the set is not known
* in the context the exception was raised in.
*
* @return possibly empty set of unauthorized topics
*/
public Set<String> unauthorizedTopics() {
return unauthorizedTopics;
}

View File

@ -185,10 +185,8 @@ public enum Errors {
RebalanceInProgressException::new),
INVALID_COMMIT_OFFSET_SIZE(28, "The committing offset data size is not valid.",
InvalidCommitOffsetSizeException::new),
TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.",
TopicAuthorizationException::new),
GROUP_AUTHORIZATION_FAILED(30, "Group authorization failed.",
GroupAuthorizationException::new),
TOPIC_AUTHORIZATION_FAILED(29, "Topic authorization failed.", TopicAuthorizationException::new),
GROUP_AUTHORIZATION_FAILED(30, "Group authorization failed.", GroupAuthorizationException::new),
CLUSTER_AUTHORIZATION_FAILED(31, "Cluster authorization failed.",
ClusterAuthorizationException::new),
INVALID_TIMESTAMP(32, "The timestamp of the message is out of acceptable range.",

View File

@ -24,6 +24,11 @@
<li>The <code>bin/kafka-preferred-replica-election.sh</code> command line tool has been deprecated. It has been replaced by <code>bin/kafka-leader-election.sh</code>.</li>
<li>The methods <code>electPreferredLeaders</code> in the Java <code>AdminClient</code> class have been deprecated in favor of the methods <code>electLeaders</code>.</li>
<li>Scala code leveraging the <code>NewTopic(String, int, short)</code> constructor with literal values will need to explicitly call <code>toShort</code> on the second literal.</li>
<li>The argument in the constructor <code>GroupAuthorizationException(String)</code> is now used to specify an exception message.
Previously it referred to the group that failed authorization. This was done for consistency with other exception types and to
avoid potential misuse. The constructor <code>TopicAuthorizationException(String)</code> which was previously used for a single
unauthorized topic was changed similarly.
</li>
</ul>
<h4><a id="upgrade_2_3_0" href="#upgrade_2_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, 2.0.x or 2.1.x or 2.2.x to 2.3.0</a></h4>