mirror of https://github.com/apache/kafka.git
KAFKA-19597: Stop the RSM after closing the remote-log reader threads to handle requests gracefully (#20342)
During shutdown, when the RSM closes first, then the ongoing requests might throw an error. To handle the ongoing requests gracefully, closing the RSM after closing the remote-log reader thread pools. Reviewers: Satish Duggana <satishd@apache.org>
This commit is contained in:
parent
0da9cacffa
commit
f0c3d93104
|
@ -2038,9 +2038,6 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
|
||||||
leaderCopyRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
|
leaderCopyRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
|
||||||
leaderExpirationRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
|
leaderExpirationRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
|
||||||
followerRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
|
followerRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
|
||||||
Utils.closeQuietly(remoteStorageManagerPlugin, "remoteStorageManagerPlugin");
|
|
||||||
Utils.closeQuietly(remoteLogMetadataManagerPlugin, "remoteLogMetadataManagerPlugin");
|
|
||||||
Utils.closeQuietly(indexCache, "RemoteIndexCache");
|
|
||||||
|
|
||||||
rlmCopyThreadPool.close();
|
rlmCopyThreadPool.close();
|
||||||
rlmExpirationThreadPool.close();
|
rlmExpirationThreadPool.close();
|
||||||
|
@ -2050,10 +2047,13 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
|
||||||
} finally {
|
} finally {
|
||||||
removeMetrics();
|
removeMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
leaderCopyRLMTasks.clear();
|
leaderCopyRLMTasks.clear();
|
||||||
leaderExpirationRLMTasks.clear();
|
leaderExpirationRLMTasks.clear();
|
||||||
followerRLMTasks.clear();
|
followerRLMTasks.clear();
|
||||||
|
|
||||||
|
Utils.closeQuietly(indexCache, "RemoteIndexCache");
|
||||||
|
Utils.closeQuietly(remoteLogMetadataManagerPlugin, "remoteLogMetadataManagerPlugin");
|
||||||
|
Utils.closeQuietly(remoteStorageManagerPlugin, "remoteStorageManagerPlugin");
|
||||||
closed = true;
|
closed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1769,9 +1769,9 @@ public class RemoteLogManagerTest {
|
||||||
void testIdempotentClose() throws IOException {
|
void testIdempotentClose() throws IOException {
|
||||||
remoteLogManager.close();
|
remoteLogManager.close();
|
||||||
remoteLogManager.close();
|
remoteLogManager.close();
|
||||||
InOrder inorder = inOrder(remoteStorageManager, remoteLogMetadataManager);
|
InOrder inorder = inOrder(remoteLogMetadataManager, remoteStorageManager);
|
||||||
inorder.verify(remoteStorageManager, times(1)).close();
|
|
||||||
inorder.verify(remoteLogMetadataManager, times(1)).close();
|
inorder.verify(remoteLogMetadataManager, times(1)).close();
|
||||||
|
inorder.verify(remoteStorageManager, times(1)).close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue