KAFKA-19523: Gracefully handle error while building remoteLogAuxState (#20201)
CI / build (push) Waiting to run Details

Improve the error handling while building the remote-log-auxiliary state
when a follower node with an empty disk begins to synchronise with the
leader. If the topic has remote storage enabled, then the
ReplicaFetcherThread attempts to build the remote-log-auxiliary state.
Note that building the remote-log-auxiliary state is invoked only when the
leader-log-start-offset is non-zero and the leader-log-start-offset is not
equal to the leader-local-log-start-offset.

When the LeaderAndISR request is received, the
ReplicaManager#becomeLeaderOrFollower invokes 'makeFollowers' first,
followed by the RemoteLogManager#onLeadershipChange call. As a result,
when the ReplicaFetcherThread initiates the
RemoteLogManager#fetchRemoteLogSegmentMetadata call, the partition may not
have been initialized yet, and the call throws a retriable exception.

Introduced RetriableRemoteStorageException to gracefully handle the
error.

After the patch:
```
[2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=1,
fetcherId=0] Could not build remote log auxiliary state for orange-1 due
to error: RemoteLogManager is not ready for partition: orange-1
(kafka.server.ReplicaFetcherThread)
[2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=2,
fetcherId=0] Could not build remote log auxiliary state for orange-0 due
to error: RemoteLogManager is not ready for partition: orange-0
(kafka.server.ReplicaFetcherThread)
```

Reviewers: Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
This commit is contained in:
Kamal Chandraprakash 2025-07-23 19:29:31 +05:30 committed by GitHub
parent 0086f24101
commit 93adaea599
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 110 additions and 2 deletions

View File

@ -35,6 +35,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageNotReadyException;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LogFileUtils;
@ -230,6 +231,10 @@ public class TierStateMachine {
}
}
if (!rlm.isPartitionReady(topicPartition)) {
throw new RemoteStorageNotReadyException("RemoteLogManager is not ready for partition: " + topicPartition);
}
RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
.orElseThrow(() -> buildRemoteStorageException(topicPartition, targetEpoch, currentLeaderEpoch,
leaderLocalLogStartOffset, leaderLogStartOffset));

View File

@ -36,6 +36,7 @@ import org.apache.kafka.server.LeaderEndPoint
import org.apache.kafka.server.ResultWithPartitions
import org.apache.kafka.server.ReplicaState
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.LogAppendInfo
@ -796,7 +797,8 @@ abstract class AbstractFetcherThread(name: String,
onPartitionFenced(topicPartition, leaderEpochInRequest)
case e@(_: UnknownTopicOrPartitionException |
_: UnknownLeaderEpochException |
_: NotLeaderOrFollowerException) =>
_: NotLeaderOrFollowerException |
_: RetriableRemoteStorageException) =>
info(s"Could not build remote log auxiliary state for $topicPartition due to error: ${e.getMessage}")
false
case e: Throwable =>

View File

@ -3717,6 +3717,7 @@ class ReplicaManagerTest {
val storageManager = mock(classOf[RemoteStorageManager])
when(storageManager.fetchIndex(any(), any())).thenReturn(new ByteArrayInputStream("0".getBytes()))
when(remoteLogManager.storageManager()).thenReturn(storageManager)
when(remoteLogManager.isPartitionReady(any())).thenReturn(true)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true)
try {
@ -3775,7 +3776,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(delta, leaderMetadataImage)
// Replicas fetch from the leader periodically, therefore we check that the metric value is increasing
// We expect failedBuildRemoteLogAuxStateRate to increase because there is no remoteLogSegmentMetadata
// We expect failedBuildRemoteLogAuxStateRate to increase because the RemoteLogManager is not ready for the tp0
// when attempting to build log aux state
TestUtils.waitUntilTrue(() => brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count > 0,
"Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.storage;
/**
 * Thrown when a remote storage operation cannot proceed because the remote storage is not ready.
 * This may occur while the remote storage or its metadata layer is initializing, unreachable,
 * or temporarily unavailable.
 * <p>
 * Instances of this exception indicate that the error is retriable: the operation might
 * succeed if attempted again once the remote storage or metadata layer becomes operational.
 */
public class RemoteStorageNotReadyException extends RetriableRemoteStorageException {
    // Declared explicitly, as the parent exception type does, so the serialized form
    // does not depend on a compiler-generated default.
    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with a detail message.
     *
     * @param message description of why the remote storage is not ready
     */
    public RemoteStorageNotReadyException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a detail message and the underlying cause.
     *
     * @param message description of why the remote storage is not ready
     * @param cause   the underlying failure
     */
    public RemoteStorageNotReadyException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates the exception from an underlying cause only.
     *
     * @param cause the underlying failure
     */
    public RemoteStorageNotReadyException(Throwable cause) {
        super(cause);
    }
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.log.remote.storage;
/**
 * Signals a retriable failure of a remote storage operation.
 * <p>
 * It is raised when an operation against a remote storage system fails because of a transient
 * or temporary condition, so that the same operation has a reasonable chance of succeeding
 * when it is retried.
 */
public class RetriableRemoteStorageException extends RemoteStorageException {

    private static final long serialVersionUID = 1L;

    /**
     * Builds the exception from a detail message.
     *
     * @param message description of the transient failure
     */
    public RetriableRemoteStorageException(String message) {
        super(message);
    }

    /**
     * Builds the exception from a detail message and the failure that triggered it.
     *
     * @param message description of the transient failure
     * @param cause   the underlying failure
     */
    public RetriableRemoteStorageException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Builds the exception from the failure that triggered it.
     *
     * @param cause the underlying failure
     */
    public RetriableRemoteStorageException(Throwable cause) {
        super(cause);
    }
}

View File

@ -788,6 +788,15 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
return null;
}
/**
 * Reports whether remote log operations can be served for the given partition.
 * <p>
 * A partition with no topic id recorded in {@code topicIdByPartitionMap} is reported as
 * not ready; otherwise the answer is whatever the remote log metadata manager's
 * {@code isReady} returns for the corresponding {@link TopicIdPartition}.
 *
 * @param partition the topic partition to check
 * @return {@code true} if a topic id is known for the partition and the metadata manager
 *         reports it as ready, {@code false} otherwise
 */
public boolean isPartitionReady(TopicPartition partition) {
    Uuid topicId = topicIdByPartitionMap.get(partition);
    // Short-circuit: the metadata manager is only consulted once a topic id is known.
    return topicId != null
        && remoteLogMetadataManagerPlugin.get().isReady(new TopicIdPartition(topicId, partition));
}
abstract class RLMTask extends CancellableRunnable {
protected final TopicIdPartition topicIdPartition;

View File

@ -3726,6 +3726,18 @@ public class RemoteLogManagerTest {
verifyNoMoreInteractions(remoteStorageManager);
}
@Test
public void testIsPartitionReady() throws InterruptedException {
    var leaderTp = leaderTopicIdPartition.topicPartition();
    var followerTp = followerTopicIdPartition.topicPartition();

    // Before any leadership change is applied, the leader partition is not ready.
    assertFalse(remoteLogManager.isPartitionReady(leaderTp));

    remoteLogManager.onLeadershipChange(
        Set.of(mockPartition(leaderTopicIdPartition)),
        Set.of(mockPartition(followerTopicIdPartition)),
        topicIds
    );

    // After the leadership change, both the leader and the follower partitions are ready.
    assertTrue(remoteLogManager.isPartitionReady(leaderTp));
    assertTrue(remoteLogManager.isPartitionReady(followerTp));
}
@Test
public void testMonitorableRemoteLogStorageManager() throws IOException {
Properties props = new Properties();