MINOR: Change exceptions for few error codes in SharePartition. (#20020)
CI / build (push) Waiting to run Details

* The `SharePartition` class wraps the errors received from
`PersisterStateManager` to be sent to the client.
* In this PR, we are categorizing the errors a bit better.
* Some exception messages in `PersisterStateManager` have been updated
to show the share partition key.
* Tests have been updated wherever needed.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal
 <apoorvmittal10@gmail.com>
This commit is contained in:
Sushant Mahajan 2025-06-23 23:57:15 +05:30 committed by GitHub
parent 4fedffd282
commit 3d4407ff9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 28 additions and 20 deletions

View File

@ -28,7 +28,6 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@ -502,8 +501,8 @@ public class SharePartition {
if (partitionData.errorCode() != Errors.NONE.code()) {
KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
logError(String.format("Failed to initialize the share partition: %s-%s. Exception occurred: %s.",
groupId, topicIdPartition, partitionData), ex);
maybeLogError(String.format("Failed to initialize the share partition: %s-%s. Exception occurred: %s.",
groupId, topicIdPartition, partitionData), Errors.forCode(partitionData.errorCode()), ex);
throwable = ex;
return;
}
@ -2325,8 +2324,8 @@ public class SharePartition {
PartitionErrorData partitionData = state.partitions().get(0);
if (partitionData.errorCode() != Errors.NONE.code()) {
KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
logError(String.format("Failed to write the share group state for share partition: %s-%s due to exception",
groupId, topicIdPartition), ex);
maybeLogError(String.format("Failed to write the share group state for share partition: %s-%s due to exception",
groupId, topicIdPartition), Errors.forCode(partitionData.errorCode()), ex);
future.completeExceptionally(ex);
return;
}
@ -2344,12 +2343,8 @@ public class SharePartition {
new GroupIdNotFoundException(errorMessage);
case UNKNOWN_TOPIC_OR_PARTITION ->
new UnknownTopicOrPartitionException(errorMessage);
case FENCED_STATE_EPOCH ->
new FencedStateEpochException(errorMessage);
case FENCED_LEADER_EPOCH ->
case FENCED_LEADER_EPOCH, FENCED_STATE_EPOCH ->
new NotLeaderOrFollowerException(errorMessage);
case NETWORK_EXCEPTION ->
new NetworkException(errorMessage);
default ->
new UnknownServerException(errorMessage);
};
@ -2578,11 +2573,11 @@ public class SharePartition {
return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
}
private void logError(String message, Throwable e) {
if (e instanceof NetworkException) {
log.debug(message, e);
private void maybeLogError(String message, Errors receivedError, Throwable wrappedException) {
if (receivedError == Errors.NETWORK_EXCEPTION) {
log.debug(message, wrappedException);
} else {
log.error(message, e);
log.error(message, wrappedException);
}
}

View File

@ -28,7 +28,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
@ -758,7 +757,7 @@ public class SharePartitionTest {
result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(FencedStateEpochException.class, result);
assertFutureThrows(NotLeaderOrFollowerException.class, result);
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
// Mock FENCED_LEADER_EPOCH error.
@ -788,6 +787,20 @@ public class SharePartitionTest {
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(UnknownServerException.class, result);
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
// Mock NETWORK_EXCEPTION error.
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
PartitionFactory.newPartitionAllData(0, 5, 10L, Errors.NETWORK_EXCEPTION.code(), Errors.NETWORK_EXCEPTION.message(),
List.of())))));
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(UnknownServerException.class, result);
assertEquals(SharePartitionState.FAILED, sharePartition.partitionState());
}
@Test
@ -5564,7 +5577,7 @@ public class SharePartitionTest {
result = sharePartition.writeShareGroupState(anyList());
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(FencedStateEpochException.class, result);
assertFutureThrows(NotLeaderOrFollowerException.class, result);
// Mock Write state RPC to return error response, FENCED_LEADER_EPOCH.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(

View File

@ -402,14 +402,14 @@ public class PersisterStateManager {
log.debug("Response for RPC {} with key {} is invalid - {}.", name(), this.partitionKey, response);
if (response.wasDisconnected()) {
errorConsumer.accept(Errors.NETWORK_EXCEPTION, new NetworkException("Server response indicates disconnect."));
errorConsumer.accept(Errors.NETWORK_EXCEPTION, new NetworkException(String.format("Server response for %s indicates disconnect.", this.partitionKey)));
return Optional.of(Errors.NETWORK_EXCEPTION);
} else if (response.wasTimedOut()) {
log.error("Response for RPC {} with key {} timed out - {}.", name(), this.partitionKey, response);
errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new NetworkException("Server response indicates timeout."));
errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new NetworkException(String.format("Server response for %s indicates timeout.", this.partitionKey)));
return Optional.of(Errors.REQUEST_TIMED_OUT);
} else {
errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Server did not provide any response."));
errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException(String.format("Server did not provide any response for %s.", this.partitionKey)));
return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
}
}