From 93adaea59990da36730b6f07675cb5ca9a54ff43 Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Wed, 23 Jul 2025 19:29:31 +0530 Subject: [PATCH] KAFKA-19523: Gracefully handle error while building remoteLogAuxState (#20201) Improve the error handling while building the remote-log-auxiliary state when a follower node with an empty disk begins to synchronise with the leader. If the topic has remote storage enabled, the ReplicaFetcherThread attempts to build the remote-log-auxiliary state. Note that the remote-log-auxiliary state is built only when the leader-log-start-offset is non-zero and the leader-log-start-offset is not equal to the leader-local-log-start-offset. When the LeaderAndISR request is received, ReplicaManager#becomeLeaderOrFollower invokes 'makeFollowers' first, followed by the RemoteLogManager#onLeadershipChange call. As a result, when the ReplicaFetcherThread initiates the RemoteLogManager#fetchRemoteLogSegmentMetadata call, the partition may not have been initialized yet and the call throws a retriable exception. Introduce RetriableRemoteStorageException to gracefully handle the error. 
After the patch: ``` [2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=0] Could not build remote log auxiliary state for orange-1 due to error: RemoteLogManager is not ready for partition: orange-1 (kafka.server.ReplicaFetcherThread) [2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Could not build remote log auxiliary state for orange-0 due to error: RemoteLogManager is not ready for partition: orange-0 (kafka.server.ReplicaFetcherThread) ``` Reviewers: Luke Chen , Satish Duggana --- .../java/kafka/server/TierStateMachine.java | 5 +++ .../kafka/server/AbstractFetcherThread.scala | 4 +- .../kafka/server/ReplicaManagerTest.scala | 3 +- .../RemoteStorageNotReadyException.java | 40 +++++++++++++++++++ .../RetriableRemoteStorageException.java | 39 ++++++++++++++++++ .../log/remote/storage/RemoteLogManager.java | 9 +++++ .../remote/storage/RemoteLogManagerTest.java | 12 ++++++ 7 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java create mode 100644 storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java diff --git a/core/src/main/java/kafka/server/TierStateMachine.java b/core/src/main/java/kafka/server/TierStateMachine.java index ede941907f2..9d8dcafd203 100644 --- a/core/src/main/java/kafka/server/TierStateMachine.java +++ b/core/src/main/java/kafka/server/TierStateMachine.java @@ -35,6 +35,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManager; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageNotReadyException; import 
org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.log.EpochEntry; import org.apache.kafka.storage.internals.log.LogFileUtils; @@ -230,6 +231,10 @@ public class TierStateMachine { } } + if (!rlm.isPartitionReady(topicPartition)) { + throw new RemoteStorageNotReadyException("RemoteLogManager is not ready for partition: " + topicPartition); + } + RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset) .orElseThrow(() -> buildRemoteStorageException(topicPartition, targetEpoch, currentLeaderEpoch, leaderLocalLogStartOffset, leaderLogStartOffset)); diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 03a46f4fba8..8dd621d1950 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -36,6 +36,7 @@ import org.apache.kafka.server.LeaderEndPoint import org.apache.kafka.server.ResultWithPartitions import org.apache.kafka.server.ReplicaState import org.apache.kafka.server.PartitionFetchState +import org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.ShutdownableThread import org.apache.kafka.storage.internals.log.LogAppendInfo @@ -796,7 +797,8 @@ abstract class AbstractFetcherThread(name: String, onPartitionFenced(topicPartition, leaderEpochInRequest) case e@(_: UnknownTopicOrPartitionException | _: UnknownLeaderEpochException | - _: NotLeaderOrFollowerException) => + _: NotLeaderOrFollowerException | + _: RetriableRemoteStorageException) => info(s"Could not build remote log auxiliary state for $topicPartition due to error: ${e.getMessage}") false case e: Throwable => diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 623c282185b..c5393117bdc 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -3717,6 +3717,7 @@ class ReplicaManagerTest { val storageManager = mock(classOf[RemoteStorageManager]) when(storageManager.fetchIndex(any(), any())).thenReturn(new ByteArrayInputStream("0".getBytes())) when(remoteLogManager.storageManager()).thenReturn(storageManager) + when(remoteLogManager.isPartitionReady(any())).thenReturn(true) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true) try { @@ -3775,7 +3776,7 @@ class ReplicaManagerTest { replicaManager.applyDelta(delta, leaderMetadataImage) // Replicas fetch from the leader periodically, therefore we check that the metric value is increasing - // We expect failedBuildRemoteLogAuxStateRate to increase because there is no remoteLogSegmentMetadata + // We expect failedBuildRemoteLogAuxStateRate to increase because the RemoteLogManager is not ready for the tp0 // when attempting to build log aux state TestUtils.waitUntilTrue(() => brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count > 0, "Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count) diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java new file mode 100644 index 00000000000..e36fd307a51 --- /dev/null +++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +/** + * This exception is thrown when a remote storage operation cannot proceed because the remote storage is not ready. + * This may occur in situations where the remote storage (or) metadata layer is initializing, unreachable, + * or temporarily unavailable. + *
<p>
+ * Instances of this exception indicate that the error is retriable, and the operation might + * succeed if attempted again when the remote storage (or) metadata layer becomes operational. + */ +public class RemoteStorageNotReadyException extends RetriableRemoteStorageException { + + public RemoteStorageNotReadyException(String message) { + super(message); + } + + public RemoteStorageNotReadyException(String message, Throwable cause) { + super(message, cause); + } + + public RemoteStorageNotReadyException(Throwable cause) { + super(cause); + } +} diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java new file mode 100644 index 00000000000..de180ebbaa8 --- /dev/null +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +/** + * Represents an exception that indicates a retriable error occurred during remote storage operations. 
+ * This exception is thrown when an operation against a remote storage system has failed due to transient + * or temporary issues, and the operation has a reasonable chance of succeeding if retried. + */ +public class RetriableRemoteStorageException extends RemoteStorageException { + + private static final long serialVersionUID = 1L; + + public RetriableRemoteStorageException(String message) { + super(message); + } + + public RetriableRemoteStorageException(String message, Throwable cause) { + super(message, cause); + } + + public RetriableRemoteStorageException(Throwable cause) { + super(cause); + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 9031de010c8..5ca596ec7b7 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -788,6 +788,15 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { return null; } + public boolean isPartitionReady(TopicPartition partition) { + Uuid uuid = topicIdByPartitionMap.get(partition); + if (uuid == null) { + return false; + } + TopicIdPartition topicIdPartition = new TopicIdPartition(uuid, partition); + return remoteLogMetadataManagerPlugin.get().isReady(topicIdPartition); + } + abstract class RLMTask extends CancellableRunnable { protected final TopicIdPartition topicIdPartition; diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index 182fda9abb9..99d9d43c9a1 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -3726,6 +3726,18 
@@ public class RemoteLogManagerTest { verifyNoMoreInteractions(remoteStorageManager); } + @Test + public void testIsPartitionReady() throws InterruptedException { + assertFalse(remoteLogManager.isPartitionReady(leaderTopicIdPartition.topicPartition())); + remoteLogManager.onLeadershipChange( + Set.of(mockPartition(leaderTopicIdPartition)), + Set.of(mockPartition(followerTopicIdPartition)), + topicIds + ); + assertTrue(remoteLogManager.isPartitionReady(leaderTopicIdPartition.topicPartition())); + assertTrue(remoteLogManager.isPartitionReady(followerTopicIdPartition.topicPartition())); + } + @Test public void testMonitorableRemoteLogStorageManager() throws IOException { Properties props = new Properties();