diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index e9b4be83b6e..128320c30a8 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.LeaderNotAvailableException; -import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -502,8 +501,8 @@ public class SharePartition { if (partitionData.errorCode() != Errors.NONE.code()) { KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage()); - logError(String.format("Failed to initialize the share partition: %s-%s. Exception occurred: %s.", - groupId, topicIdPartition, partitionData), ex); + maybeLogError(String.format("Failed to initialize the share partition: %s-%s. Exception occurred: %s.", + groupId, topicIdPartition, partitionData), Errors.forCode(partitionData.errorCode()), ex); throwable = ex; return; } @@ -2325,8 +2324,8 @@ public class SharePartition { PartitionErrorData partitionData = state.partitions().get(0); if (partitionData.errorCode() != Errors.NONE.code()) { KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage()); - logError(String.format("Failed to write the share group state for share partition: %s-%s due to exception", - groupId, topicIdPartition), ex); + maybeLogError(String.format("Failed to write the share group state for share partition: %s-%s due to exception", + groupId, topicIdPartition), Errors.forCode(partitionData.errorCode()), ex); future.completeExceptionally(ex); return; } @@ -2344,12 +2343,8 @@ public class SharePartition { new GroupIdNotFoundException(errorMessage); case UNKNOWN_TOPIC_OR_PARTITION -> new UnknownTopicOrPartitionException(errorMessage); - case FENCED_STATE_EPOCH -> - new FencedStateEpochException(errorMessage); - case FENCED_LEADER_EPOCH -> + case FENCED_LEADER_EPOCH, FENCED_STATE_EPOCH -> new NotLeaderOrFollowerException(errorMessage); - case NETWORK_EXCEPTION -> - new NetworkException(errorMessage); default -> new UnknownServerException(errorMessage); }; @@ -2578,11 +2573,11 @@ public class SharePartition { return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive); } - private void logError(String message, Throwable e) { - if (e instanceof NetworkException) { - log.debug(message, e); + private void maybeLogError(String message, Errors receivedError, Throwable wrappedException) { + if (receivedError == Errors.NETWORK_EXCEPTION) { + log.debug(message, wrappedException); } else { - log.error(message, e); + log.error(message, wrappedException); } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index ac8e9ba0fd6..495ea4227a7 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.CoordinatorNotAvailableException; -import org.apache.kafka.common.errors.FencedStateEpochException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidRequestException; @@ -758,7 +757,7 @@ public class SharePartitionTest { result = sharePartition.maybeInitialize(); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(FencedStateEpochException.class, result); + assertFutureThrows(NotLeaderOrFollowerException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); // Mock FENCED_LEADER_EPOCH error. @@ -788,6 +787,20 @@ public class SharePartitionTest { assertTrue(result.isCompletedExceptionally()); assertFutureThrows(UnknownServerException.class, result); assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); + + // Mock NETWORK_EXCEPTION error. + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionAllData(0, 5, 10L, Errors.NETWORK_EXCEPTION.code(), Errors.NETWORK_EXCEPTION.message(), + List.of()))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertTrue(result.isCompletedExceptionally()); + assertFutureThrows(UnknownServerException.class, result); + assertEquals(SharePartitionState.FAILED, sharePartition.partitionState()); } @Test @@ -5564,7 +5577,7 @@ public class SharePartitionTest { result = sharePartition.writeShareGroupState(anyList()); assertTrue(result.isCompletedExceptionally()); - assertFutureThrows(FencedStateEpochException.class, result); + assertFutureThrows(NotLeaderOrFollowerException.class, result); // Mock Write state RPC to return error response, FENCED_LEADER_EPOCH. Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java index eedf66c962e..98b08a3e50a 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java @@ -402,14 +402,14 @@ public class PersisterStateManager { log.debug("Response for RPC {} with key {} is invalid - {}.", name(), this.partitionKey, response); if (response.wasDisconnected()) { - errorConsumer.accept(Errors.NETWORK_EXCEPTION, new NetworkException("Server response indicates disconnect.")); + errorConsumer.accept(Errors.NETWORK_EXCEPTION, new NetworkException(String.format("Server response for %s indicates disconnect.", this.partitionKey))); return Optional.of(Errors.NETWORK_EXCEPTION); } else if (response.wasTimedOut()) { log.error("Response for RPC {} with key {} timed out - {}.", name(), this.partitionKey, response); - errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new NetworkException("Server response indicates timeout.")); + errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new NetworkException(String.format("Server response for %s indicates timeout.", this.partitionKey))); return Optional.of(Errors.REQUEST_TIMED_OUT); } else { - errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Server did not provide any response.")); + errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException(String.format("Server did not provide any response for %s.", this.partitionKey))); return Optional.of(Errors.UNKNOWN_SERVER_ERROR); } }