diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 91a11d488f4..e9b4be83b6e 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.InvalidRecordStateException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -501,8 +502,8 @@ public class SharePartition { if (partitionData.errorCode() != Errors.NONE.code()) { KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage()); - log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.", - groupId, topicIdPartition, partitionData); + logError(String.format("Failed to initialize the share partition: %s-%s. Exception occurred: %s.", + groupId, topicIdPartition, partitionData), ex); throwable = ex; return; } @@ -2076,7 +2077,7 @@ public class SharePartition { lock.writeLock().lock(); try { if (exception != null) { - log.error("Failed to write state to persister for the share partition: {}-{}", + log.debug("Failed to write state to persister for the share partition: {}-{}", groupId, topicIdPartition, exception); updatedStates.forEach(state -> state.completeStateTransition(false)); future.completeExceptionally(exception); @@ -2324,8 +2325,8 @@ public class SharePartition { PartitionErrorData partitionData = state.partitions().get(0); if (partitionData.errorCode() != Errors.NONE.code()) { KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage()); - log.error("Failed to write the share group state for share partition: {}-{} due to exception", - groupId, topicIdPartition, ex); + logError(String.format("Failed to write the share group state for share partition: %s-%s due to exception", + groupId, topicIdPartition), ex); future.completeExceptionally(ex); return; } @@ -2347,6 +2348,8 @@ public class SharePartition { new FencedStateEpochException(errorMessage); case FENCED_LEADER_EPOCH -> new NotLeaderOrFollowerException(errorMessage); + case NETWORK_EXCEPTION -> + new NetworkException(errorMessage); default -> new UnknownServerException(errorMessage); }; @@ -2423,7 +2426,7 @@ public class SharePartition { if (!stateBatches.isEmpty()) { writeShareGroupState(stateBatches).whenComplete((result, exception) -> { if (exception != null) { - log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}", + log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}", groupId, topicIdPartition, memberId, exception); } // Even if write share group state RPC call fails, we will still go ahead with the state transition. @@ -2575,6 +2578,14 @@ public class SharePartition { return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive); } + private void logError(String message, Throwable e) { + if (e instanceof NetworkException) { + log.debug(message, e); + } else { + log.error(message, e); + } + } + /** * This function filters out the offsets present in the acquired records list that are also a part of batches that need to be archived. * It follows an iterative refinement of acquired records to eliminate batches to be archived. diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java index ae8b8c317c3..fe66efa7f7f 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java @@ -139,6 +139,9 @@ public class DefaultStatePersister implements Persister { .computeIfAbsent(topicData.topicId(), k -> new HashMap<>()) .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>()); + log.debug("{}-{}-{}: stateEpoch - {}, leaderEpoch - {}.", + groupId, topicData.topicId(), partitionData.partition(), partitionData.stateEpoch(), partitionData.leaderEpoch()); + handlers.add( stateManager.new WriteStateHandler( groupId, diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java index 9e451b4b1cd..eedf66c962e 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java @@ -374,7 +374,7 @@ public class PersisterStateManager { // We don't know if FIND_COORD or actual REQUEST. Let's err on side of request. if (response == null) { - requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Did not receive any response")); + requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Did not receive any response (response = null)")); sender.wakeup(); return; } @@ -399,16 +399,17 @@ public class PersisterStateManager { return Optional.empty(); } - log.error("Response for RPC {} with key {} is invalid - {}.", name(), this.partitionKey, response); + log.debug("Response for RPC {} with key {} is invalid - {}.", name(), this.partitionKey, response); if (response.wasDisconnected()) { - errorConsumer.accept(Errors.NETWORK_EXCEPTION, null); + errorConsumer.accept(Errors.NETWORK_EXCEPTION, new NetworkException("Server response indicates disconnect.")); return Optional.of(Errors.NETWORK_EXCEPTION); } else if (response.wasTimedOut()) { - errorConsumer.accept(Errors.REQUEST_TIMED_OUT, null); + log.error("Response for RPC {} with key {} timed out - {}.", name(), this.partitionKey, response); + errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new NetworkException("Server response indicates timeout.")); return Optional.of(Errors.REQUEST_TIMED_OUT); } else { - errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Did not receive any response")); + errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Server did not provide any response.")); return Optional.of(Errors.UNKNOWN_SERVER_ERROR); } } @@ -453,7 +454,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: // retriable error codes case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: - log.warn("Received retriable error in find coordinator for {} using key {}: {}", name(), partitionKey(), error.message()); + log.debug("Received retriable error in find coordinator for {} using key {}: {}", name(), partitionKey(), error.message()); if (!findCoordBackoff.canAttempt()) { log.error("Exhausted max retries to find coordinator for {} using key {} without success.", name(), partitionKey()); findCoordinatorErrorResponse(error, new Exception("Exhausted max retries to find coordinator without success.")); @@ -580,7 +581,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: - log.warn("Received retriable error in initialize state RPC for key {}: {}", partitionKey(), error.message()); + log.debug("Received retriable error in initialize state RPC for key {}: {}", partitionKey(), error.message()); if (!initializeStateBackoff.canAttempt()) { log.error("Exhausted max retries for initialize state RPC for key {} without success.", partitionKey()); requestErrorResponse(error, new Exception("Exhausted max retries to complete initialize state RPC without success.")); @@ -738,7 +739,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: - log.warn("Received retriable error in write state RPC for key {}: {}", partitionKey(), error.message()); + log.debug("Received retriable error in write state RPC for key {}: {}", partitionKey(), error.message()); if (!writeStateBackoff.canAttempt()) { log.error("Exhausted max retries for write state RPC for key {} without success.", partitionKey()); requestErrorResponse(error, new Exception("Exhausted max retries to complete write state RPC without success.")); @@ -880,7 +881,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: - log.warn("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message()); + log.debug("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message()); if (!readStateBackoff.canAttempt()) { log.error("Exhausted max retries for read state RPC for key {} without success.", partitionKey()); requestErrorResponse(error, new Exception("Exhausted max retries to complete read state RPC without success.")); @@ -1022,7 +1023,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: - log.warn("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message()); + log.debug("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message()); if (!readStateSummaryBackoff.canAttempt()) { log.error("Exhausted max retries for read state summary RPC for key {} without success.", partitionKey()); requestErrorResponse(error, new Exception("Exhausted max retries to complete read state summary RPC without success.")); @@ -1161,7 +1162,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: - log.warn("Received retriable error in delete state RPC for key {}: {}", partitionKey(), error.message()); + log.debug("Received retriable error in delete state RPC for key {}: {}", partitionKey(), error.message()); if (!deleteStateBackoff.canAttempt()) { log.error("Exhausted max retries for delete state RPC for key {} without success.", partitionKey()); requestErrorResponse(error, new Exception("Exhausted max retries to complete delete state RPC without success."));