MINOR: Reduce logging in persister. (#19998)

* A few logs in `PersisterStateManager` were noisy and did not add much value.
* To reduce log pollution, they have been moved to debug level (a condensed sketch of the pattern appears below).
* Added a debug log in `DefaultStatePersister` to track state and leader epochs.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Andrew Schofield <aschofield@confluent.io>
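
The change is anchored by a small `logError` helper in `SharePartition` that routes `NetworkException` failures, which are typically transient, to debug while keeping every other failure at error. Below is a condensed, self-contained sketch of that pattern; the class name, messages, and `main` driver are illustrative only, and the real helper is private to `SharePartition` as shown in the diff further down.

import org.apache.kafka.common.errors.NetworkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersisterLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(PersisterLoggingSketch.class);

    // Transient network failures are expected while the share coordinator is unavailable,
    // so they are logged at debug; every other failure stays at error.
    static void logError(String message, Throwable e) {
        if (e instanceof NetworkException) {
            log.debug(message, e);
        } else {
            log.error(message, e);
        }
    }

    public static void main(String[] args) {
        logError("Failed to write the share group state for share partition: group1-topic-0",
            new NetworkException("Server response indicates disconnect."));   // logged at debug
        logError("Failed to initialize the share partition: group1-topic-0",
            new IllegalStateException("unexpected state"));                    // logged at error
    }
}

The same reasoning drives the `PersisterStateManager` changes: retriable find-coordinator and state-RPC errors drop from warn to debug, while exhausted-retry paths continue to log at error.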
Sushant Mahajan 2025-06-20 18:23:46 +05:30 committed by GitHub
parent 659ace836c
commit d5e2ecae95
3 changed files with 32 additions and 17 deletions

SharePartition.java

@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -501,8 +502,8 @@ public class SharePartition {
 if (partitionData.errorCode() != Errors.NONE.code()) {
 KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
-log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.",
-groupId, topicIdPartition, partitionData);
+logError(String.format("Failed to initialize the share partition: %s-%s. Exception occurred: %s.",
+groupId, topicIdPartition, partitionData), ex);
 throwable = ex;
 return;
 }
@@ -2076,7 +2077,7 @@
 lock.writeLock().lock();
 try {
 if (exception != null) {
-log.error("Failed to write state to persister for the share partition: {}-{}",
+log.debug("Failed to write state to persister for the share partition: {}-{}",
 groupId, topicIdPartition, exception);
 updatedStates.forEach(state -> state.completeStateTransition(false));
 future.completeExceptionally(exception);
@@ -2324,8 +2325,8 @@
 PartitionErrorData partitionData = state.partitions().get(0);
 if (partitionData.errorCode() != Errors.NONE.code()) {
 KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
-log.error("Failed to write the share group state for share partition: {}-{} due to exception",
-groupId, topicIdPartition, ex);
+logError(String.format("Failed to write the share group state for share partition: %s-%s due to exception",
+groupId, topicIdPartition), ex);
 future.completeExceptionally(ex);
 return;
 }
@@ -2347,6 +2348,8 @@
 new FencedStateEpochException(errorMessage);
 case FENCED_LEADER_EPOCH ->
 new NotLeaderOrFollowerException(errorMessage);
+case NETWORK_EXCEPTION ->
+new NetworkException(errorMessage);
 default ->
 new UnknownServerException(errorMessage);
 };
@@ -2423,7 +2426,7 @@
 if (!stateBatches.isEmpty()) {
 writeShareGroupState(stateBatches).whenComplete((result, exception) -> {
 if (exception != null) {
-log.error("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}",
+log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}",
 groupId, topicIdPartition, memberId, exception);
 }
 // Even if write share group state RPC call fails, we will still go ahead with the state transition.
@@ -2575,6 +2578,14 @@
 return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
 }
+private void logError(String message, Throwable e) {
+if (e instanceof NetworkException) {
+log.debug(message, e);
+} else {
+log.error(message, e);
+}
+}
 /**
 * This function filters out the offsets present in the acquired records list that are also a part of batches that need to be archived.
 * It follows an iterative refinement of acquired records to eliminate batches to be archived.

DefaultStatePersister.java

@@ -139,6 +139,9 @@ public class DefaultStatePersister implements Persister {
 .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
 .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>());
+log.debug("{}-{}-{}: stateEpoch - {}, leaderEpoch - {}.",
+groupId, topicData.topicId(), partitionData.partition(), partitionData.stateEpoch(), partitionData.leaderEpoch());
 handlers.add(
 stateManager.new WriteStateHandler(
 groupId,

PersisterStateManager.java

@@ -374,7 +374,7 @@ public class PersisterStateManager {
 // We don't know if FIND_COORD or actual REQUEST. Let's err on side of request.
 if (response == null) {
-requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Did not receive any response"));
+requestErrorResponse(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Did not receive any response (response = null)"));
 sender.wakeup();
 return;
 }
@@ -399,16 +399,17 @@
 return Optional.empty();
 }
-log.error("Response for RPC {} with key {} is invalid - {}.", name(), this.partitionKey, response);
+log.debug("Response for RPC {} with key {} is invalid - {}.", name(), this.partitionKey, response);
 if (response.wasDisconnected()) {
-errorConsumer.accept(Errors.NETWORK_EXCEPTION, null);
+errorConsumer.accept(Errors.NETWORK_EXCEPTION, new NetworkException("Server response indicates disconnect."));
 return Optional.of(Errors.NETWORK_EXCEPTION);
 } else if (response.wasTimedOut()) {
-errorConsumer.accept(Errors.REQUEST_TIMED_OUT, null);
+log.error("Response for RPC {} with key {} timed out - {}.", name(), this.partitionKey, response);
+errorConsumer.accept(Errors.REQUEST_TIMED_OUT, new NetworkException("Server response indicates timeout."));
 return Optional.of(Errors.REQUEST_TIMED_OUT);
 } else {
-errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Did not receive any response"));
+errorConsumer.accept(Errors.UNKNOWN_SERVER_ERROR, new NetworkException("Server did not provide any response."));
 return Optional.of(Errors.UNKNOWN_SERVER_ERROR);
 }
 }
@@ -453,7 +454,7 @@
 case COORDINATOR_NOT_AVAILABLE: // retriable error codes
 case COORDINATOR_LOAD_IN_PROGRESS:
 case NOT_COORDINATOR:
-log.warn("Received retriable error in find coordinator for {} using key {}: {}", name(), partitionKey(), error.message());
+log.debug("Received retriable error in find coordinator for {} using key {}: {}", name(), partitionKey(), error.message());
 if (!findCoordBackoff.canAttempt()) {
 log.error("Exhausted max retries to find coordinator for {} using key {} without success.", name(), partitionKey());
 findCoordinatorErrorResponse(error, new Exception("Exhausted max retries to find coordinator without success."));
@@ -580,7 +581,7 @@
 case COORDINATOR_NOT_AVAILABLE:
 case COORDINATOR_LOAD_IN_PROGRESS:
 case NOT_COORDINATOR:
-log.warn("Received retriable error in initialize state RPC for key {}: {}", partitionKey(), error.message());
+log.debug("Received retriable error in initialize state RPC for key {}: {}", partitionKey(), error.message());
 if (!initializeStateBackoff.canAttempt()) {
 log.error("Exhausted max retries for initialize state RPC for key {} without success.", partitionKey());
 requestErrorResponse(error, new Exception("Exhausted max retries to complete initialize state RPC without success."));
@@ -738,7 +739,7 @@
 case COORDINATOR_NOT_AVAILABLE:
 case COORDINATOR_LOAD_IN_PROGRESS:
 case NOT_COORDINATOR:
-log.warn("Received retriable error in write state RPC for key {}: {}", partitionKey(), error.message());
+log.debug("Received retriable error in write state RPC for key {}: {}", partitionKey(), error.message());
 if (!writeStateBackoff.canAttempt()) {
 log.error("Exhausted max retries for write state RPC for key {} without success.", partitionKey());
 requestErrorResponse(error, new Exception("Exhausted max retries to complete write state RPC without success."));
@@ -880,7 +881,7 @@
 case COORDINATOR_NOT_AVAILABLE:
 case COORDINATOR_LOAD_IN_PROGRESS:
 case NOT_COORDINATOR:
-log.warn("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message());
+log.debug("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message());
 if (!readStateBackoff.canAttempt()) {
 log.error("Exhausted max retries for read state RPC for key {} without success.", partitionKey());
 requestErrorResponse(error, new Exception("Exhausted max retries to complete read state RPC without success."));
@@ -1022,7 +1023,7 @@
 case COORDINATOR_NOT_AVAILABLE:
 case COORDINATOR_LOAD_IN_PROGRESS:
 case NOT_COORDINATOR:
-log.warn("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message());
+log.debug("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message());
 if (!readStateSummaryBackoff.canAttempt()) {
 log.error("Exhausted max retries for read state summary RPC for key {} without success.", partitionKey());
 requestErrorResponse(error, new Exception("Exhausted max retries to complete read state summary RPC without success."));
@@ -1161,7 +1162,7 @@
 case COORDINATOR_NOT_AVAILABLE:
 case COORDINATOR_LOAD_IN_PROGRESS:
 case NOT_COORDINATOR:
-log.warn("Received retriable error in delete state RPC for key {}: {}", partitionKey(), error.message());
+log.debug("Received retriable error in delete state RPC for key {}: {}", partitionKey(), error.message());
 if (!deleteStateBackoff.canAttempt()) {
 log.error("Exhausted max retries for delete state RPC for key {} without success.", partitionKey());
 requestErrorResponse(error, new Exception("Exhausted max retries to complete delete state RPC without success."));