diff --git a/core/src/main/java/kafka/server/TierStateMachine.java b/core/src/main/java/kafka/server/TierStateMachine.java index ede941907f2..9d8dcafd203 100644 --- a/core/src/main/java/kafka/server/TierStateMachine.java +++ b/core/src/main/java/kafka/server/TierStateMachine.java @@ -35,6 +35,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogManager; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageNotReadyException; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.log.EpochEntry; import org.apache.kafka.storage.internals.log.LogFileUtils; @@ -230,6 +231,10 @@ public class TierStateMachine { } } + if (!rlm.isPartitionReady(topicPartition)) { + throw new RemoteStorageNotReadyException("RemoteLogManager is not ready for partition: " + topicPartition); + } + RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset) .orElseThrow(() -> buildRemoteStorageException(topicPartition, targetEpoch, currentLeaderEpoch, leaderLocalLogStartOffset, leaderLogStartOffset)); diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 03a46f4fba8..8dd621d1950 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -36,6 +36,7 @@ import org.apache.kafka.server.LeaderEndPoint import org.apache.kafka.server.ResultWithPartitions import org.apache.kafka.server.ReplicaState import org.apache.kafka.server.PartitionFetchState +import org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.ShutdownableThread import org.apache.kafka.storage.internals.log.LogAppendInfo @@ -796,7 +797,8 @@ abstract class AbstractFetcherThread(name: String, onPartitionFenced(topicPartition, leaderEpochInRequest) case e@(_: UnknownTopicOrPartitionException | _: UnknownLeaderEpochException | - _: NotLeaderOrFollowerException) => + _: NotLeaderOrFollowerException | + _: RetriableRemoteStorageException) => info(s"Could not build remote log auxiliary state for $topicPartition due to error: ${e.getMessage}") false case e: Throwable => diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 623c282185b..c5393117bdc 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -3717,6 +3717,7 @@ class ReplicaManagerTest { val storageManager = mock(classOf[RemoteStorageManager]) when(storageManager.fetchIndex(any(), any())).thenReturn(new ByteArrayInputStream("0".getBytes())) when(remoteLogManager.storageManager()).thenReturn(storageManager) + when(remoteLogManager.isPartitionReady(any())).thenReturn(true) val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true) try { @@ -3775,7 +3776,7 @@ class ReplicaManagerTest { replicaManager.applyDelta(delta, leaderMetadataImage) // Replicas fetch from the leader periodically, therefore we check that the metric value is increasing - // We expect failedBuildRemoteLogAuxStateRate to increase because there is no remoteLogSegmentMetadata + // We expect failedBuildRemoteLogAuxStateRate to increase because the RemoteLogManager is not ready for the tp0 // when attempting to build log aux state TestUtils.waitUntilTrue(() => brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count > 0, "Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count) diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java new file mode 100644 index 00000000000..e36fd307a51 --- /dev/null +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +/** + * This exception is thrown when a remote storage operation cannot proceed because the remote storage is not ready. + * This may occur in situations where the remote storage (or) metadata layer is initializing, unreachable, + * or temporarily unavailable. + *
+ * Instances of this exception indicate that the error is retriable, and the operation might + * succeed if attempted again when the remote storage (or) metadata layer becomes operational. + */ +public class RemoteStorageNotReadyException extends RetriableRemoteStorageException { + + public RemoteStorageNotReadyException(String message) { + super(message); + } + + public RemoteStorageNotReadyException(String message, Throwable cause) { + super(message, cause); + } + + public RemoteStorageNotReadyException(Throwable cause) { + super(cause); + } +} diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java new file mode 100644 index 00000000000..de180ebbaa8 --- /dev/null +++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +/** + * Represents an exception that indicates a retriable error occurred during remote storage operations. + * This exception is thrown when an operation against a remote storage system has failed due to transient + * or temporary issues, and the operation has a reasonable chance of succeeding if retried. + */ +public class RetriableRemoteStorageException extends RemoteStorageException { + + private static final long serialVersionUID = 1L; + + public RetriableRemoteStorageException(String message) { + super(message); + } + + public RetriableRemoteStorageException(String message, Throwable cause) { + super(message, cause); + } + + public RetriableRemoteStorageException(Throwable cause) { + super(cause); + } +} diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 9031de010c8..5ca596ec7b7 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -788,6 +788,15 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { return null; } + public boolean isPartitionReady(TopicPartition partition) { + Uuid uuid = topicIdByPartitionMap.get(partition); + if (uuid == null) { + return false; + } + TopicIdPartition topicIdPartition = new TopicIdPartition(uuid, partition); + return remoteLogMetadataManagerPlugin.get().isReady(topicIdPartition); + } + abstract class RLMTask extends CancellableRunnable { protected final TopicIdPartition topicIdPartition; diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index 182fda9abb9..99d9d43c9a1 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -3726,6 +3726,18 @@ public class RemoteLogManagerTest { verifyNoMoreInteractions(remoteStorageManager); } + @Test + public void testIsPartitionReady() throws InterruptedException { + assertFalse(remoteLogManager.isPartitionReady(leaderTopicIdPartition.topicPartition())); + remoteLogManager.onLeadershipChange( + Set.of(mockPartition(leaderTopicIdPartition)), + Set.of(mockPartition(followerTopicIdPartition)), + topicIds + ); + assertTrue(remoteLogManager.isPartitionReady(leaderTopicIdPartition.topicPartition())); + assertTrue(remoteLogManager.isPartitionReady(followerTopicIdPartition.topicPartition())); + } + @Test public void testMonitorableRemoteLogStorageManager() throws IOException { Properties props = new Properties();