KAFKA-18783 : Extend InvalidConfigurationException related exceptions (#19731)

## Summary
Extend InvalidConfigurationException related exceptions
  - `AuthenticationException`
  - `AuthorizationException`
  - `ClusterAuthorizationException`
  - `TransactionalIdAuthorizationException`
  - `UnsupportedVersionException`
  - `UnsupportedForMessageFormatException`
  - `InvalidRecordException`
  - `InvalidRequiredAcksException`
  - `RecordBatchTooLargeException`
  - `InvalidTopicException`
  - `TopicAuthorizationException`
  - `GroupAuthorizationException`

## Testing
- Added unit tests in `TransactionExceptionHierarchyTest` to verify:
  - All configuration-related exceptions properly extend
`InvalidConfigurationException`

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Kaushik Raina 2025-05-21 05:01:33 +05:30 committed by GitHub
parent 37fdbb3575
commit 896f283e1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 53 additions and 18 deletions

View File

@ -16,9 +16,9 @@
*/ */
package org.apache.kafka.common; package org.apache.kafka.common;
import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.InvalidConfigurationException;
public class InvalidRecordException extends ApiException { public class InvalidRecordException extends InvalidConfigurationException {
private static final long serialVersionUID = 1; private static final long serialVersionUID = 1;

View File

@ -32,7 +32,7 @@ import javax.net.ssl.SSLException;
* <li>{@link SslAuthenticationException} if SSL handshake failed due to any {@link SSLException}. * <li>{@link SslAuthenticationException} if SSL handshake failed due to any {@link SSLException}.
* </ul> * </ul>
*/ */
public class AuthenticationException extends ApiException { public class AuthenticationException extends InvalidConfigurationException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
public class AuthorizationException extends ApiException { public class AuthorizationException extends InvalidConfigurationException {
public AuthorizationException(String message) { public AuthorizationException(String message) {
super(message); super(message);

View File

@ -20,12 +20,20 @@ public class InvalidConfigurationException extends ApiException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public InvalidConfigurationException(String message) {
super(message);
}
public InvalidConfigurationException(String message, Throwable cause) { public InvalidConfigurationException(String message, Throwable cause) {
super(message, cause); super(message, cause);
} }
public InvalidConfigurationException(String message) {
super(message);
}
public InvalidConfigurationException(Throwable cause) {
super(cause);
}
public InvalidConfigurationException() {
super();
}
} }

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
public class InvalidReplicationFactorException extends ApiException { public class InvalidReplicationFactorException extends InvalidConfigurationException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
public class InvalidRequiredAcksException extends ApiException { public class InvalidRequiredAcksException extends InvalidConfigurationException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public InvalidRequiredAcksException(String message) { public InvalidRequiredAcksException(String message) {

View File

@ -27,7 +27,7 @@ import java.util.Set;
* *
* @see UnknownTopicOrPartitionException * @see UnknownTopicOrPartitionException
*/ */
public class InvalidTopicException extends ApiException { public class InvalidTopicException extends InvalidConfigurationException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private final Set<String> invalidTopics; private final Set<String> invalidTopics;

View File

@ -19,7 +19,7 @@ package org.apache.kafka.common.errors;
/** /**
* This record batch is larger than the maximum allowable size * This record batch is larger than the maximum allowable size
*/ */
public class RecordBatchTooLargeException extends ApiException { public class RecordBatchTooLargeException extends InvalidConfigurationException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -20,7 +20,7 @@ package org.apache.kafka.common.errors;
* The message format version does not support the requested function. For example, if idempotence is * The message format version does not support the requested function. For example, if idempotence is
* requested and the topic is using a message format older than 0.11.0.0, then this error will be returned. * requested and the topic is using a message format older than 0.11.0.0, then this error will be returned.
*/ */
public class UnsupportedForMessageFormatException extends ApiException { public class UnsupportedForMessageFormatException extends InvalidConfigurationException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public UnsupportedForMessageFormatException(String message) { public UnsupportedForMessageFormatException(String message) {

View File

@ -28,7 +28,7 @@ import java.util.Map;
* is raised from {@link org.apache.kafka.clients.consumer.KafkaConsumer#offsetsForTimes(Map)}, it would * is raised from {@link org.apache.kafka.clients.consumer.KafkaConsumer#offsetsForTimes(Map)}, it would
* be possible to revert to alternative logic to set the consumer's position. * be possible to revert to alternative logic to set the consumer's position.
*/ */
public class UnsupportedVersionException extends ApiException { public class UnsupportedVersionException extends InvalidConfigurationException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public UnsupportedVersionException(String message, Throwable cause) { public UnsupportedVersionException(String message, Throwable cause) {

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
import org.apache.kafka.common.InvalidRecordException;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
@ -94,4 +96,29 @@ public class TransactionExceptionHierarchyTest {
assertTrue(ApplicationRecoverableException.class.isAssignableFrom(exceptionClass), assertTrue(ApplicationRecoverableException.class.isAssignableFrom(exceptionClass),
exceptionClass.getSimpleName() + " should extend ApplicationRecoverableException"); exceptionClass.getSimpleName() + " should extend ApplicationRecoverableException");
} }
/**
* Verifies that the given exception class extends `InvalidConfigurationException`
*
* @param exceptionClass the exception class to check
*/
@ParameterizedTest
@ValueSource(classes = {
AuthenticationException.class,
AuthorizationException.class,
ClusterAuthorizationException.class,
TransactionalIdAuthorizationException.class,
UnsupportedVersionException.class,
UnsupportedForMessageFormatException.class,
InvalidRecordException.class,
InvalidRequiredAcksException.class,
RecordBatchTooLargeException.class,
InvalidTopicException.class,
TopicAuthorizationException.class,
GroupAuthorizationException.class
})
void testInvalidConfigurationExceptionHierarchy(Class<? extends Exception> exceptionClass) {
assertTrue(InvalidConfigurationException.class.isAssignableFrom(exceptionClass),
exceptionClass.getSimpleName() + " should extend InvalidConfigurationException");
}
} }

View File

@ -86,13 +86,13 @@ object ConfigCommand extends Logging {
opts.checkArgs() opts.checkArgs()
processCommand(opts) processCommand(opts)
} catch { } catch {
case e @ (_: IllegalArgumentException | _: InvalidConfigurationException | _: OptionException) => case e: UnsupportedVersionException =>
logger.debug(s"Failed config command with args '${args.mkString(" ")}'", e) logger.debug(s"Unsupported API encountered in server when executing config command with args '${args.mkString(" ")}'")
System.err.println(e.getMessage) System.err.println(e.getMessage)
Exit.exit(1) Exit.exit(1)
case e: UnsupportedVersionException => case e @ (_: IllegalArgumentException | _: InvalidConfigurationException | _: OptionException) =>
logger.debug(s"Unsupported API encountered in server when executing config command with args '${args.mkString(" ")}'") logger.debug(s"Failed config command with args '${args.mkString(" ")}'", e)
System.err.println(e.getMessage) System.err.println(e.getMessage)
Exit.exit(1) Exit.exit(1)