KAFKA-18781: Extend RefreshRetriableException related exceptions (#19136)

- Extended derived exceptions described in
[KIP-1050](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=309496816#KIP1050:ConsistenterrorhandlingforTransactions-RefreshRetriableException)
to include the new RefreshRetriableException in base hierarchy.
- Added unit tests to validate the hierarchy of the derived exceptions
in relevant scenarios.

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Kaushik Raina 2025-03-14 21:41:31 +05:30 committed by GitHub
parent f50a17fa8d
commit c32c167e04
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 43 additions and 3 deletions

View File

@ -23,7 +23,7 @@ package org.apache.kafka.common.errors;
* In the context of the transactional coordinator, this error will be returned if the underlying transactional log
* is under replicated or if an append to the log times out.
*/
public class CoordinatorNotAvailableException extends RetriableException {
public class CoordinatorNotAvailableException extends RefreshRetriableException {
public static final CoordinatorNotAvailableException INSTANCE = new CoordinatorNotAvailableException();
private static final long serialVersionUID = 1L;

View File

@ -19,7 +19,7 @@ package org.apache.kafka.common.errors;
/**
* An exception that may indicate the client's metadata is out of date
*/
public abstract class InvalidMetadataException extends RetriableException {
public abstract class InvalidMetadataException extends RefreshRetriableException {
private static final long serialVersionUID = 1L;

View File

@ -23,7 +23,7 @@ package org.apache.kafka.common.errors;
* In the context of the transactional coordinator, it returns this error when it receives a transactional
* request with a transactionalId the coordinator doesn't own.
*/
public class NotCoordinatorException extends RetriableException {
public class NotCoordinatorException extends RefreshRetriableException {
private static final long serialVersionUID = 1L;

View File

@ -22,7 +22,20 @@ package org.apache.kafka.common.errors;
* The request can be modified or updated with fresh metadata before being retried.
*/
public abstract class RefreshRetriableException extends RetriableException {
private static final long serialVersionUID = 1L;
public RefreshRetriableException(String message, Throwable cause) {
super(message, cause);
}
public RefreshRetriableException(String message) {
super(message);
}
public RefreshRetriableException(Throwable cause) {
super(cause);
}
public RefreshRetriableException() {
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.errors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -48,4 +49,30 @@ public class TransactionExceptionHierarchyTest {
assertFalse(RefreshRetriableException.class.isAssignableFrom(exceptionClass),
exceptionClass.getSimpleName() + " should NOT extend RefreshRetriableException");
}
/**
* Verifies that RefreshRetriableException extends RetriableException.
*/
@Test
void testRefreshRetriableException() {
assertTrue(RetriableException.class.isAssignableFrom(RefreshRetriableException.class),
"RefreshRetriableException should extend RetriableException");
}
/**
* Verifies that the given exception class extends `RefreshRetriableException`
*
* @param exceptionClass the exception class to check
*/
@ParameterizedTest
@ValueSource(classes = {
UnknownTopicOrPartitionException.class,
NotLeaderOrFollowerException.class,
NotCoordinatorException.class,
CoordinatorNotAvailableException.class
})
void testRefreshRetriableExceptionHierarchy(Class<? extends Exception> exceptionClass) {
assertTrue(RefreshRetriableException.class.isAssignableFrom(exceptionClass),
exceptionClass.getSimpleName() + " should extend RefreshRetriableException");
}
}