KAFKA-19131: Adjust remote storage reader thread maximum pool size to avoid illegal argument (#19629)
CI / build (push) Has been cancelled Details

The remote storage reader thread pool use same count for both maximum
and core size. If users adjust the pool size larger than original value,
it throws `IllegalArgumentException`. Updated both value to fix the
issue.

cherry-pick PR: #19532
cherry-pick commit:
965743c35b

---------

Signed-off-by: PoAn Yang <payang@apache.org>

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang
<payang@apache.org>

Co-authored-by: PoAn Yang <payang@apache.org>
This commit is contained in:
Kamal Chandraprakash 2025-05-04 18:53:16 +05:30 committed by GitHub
parent 0832c2ceb1
commit c2068878c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 23 additions and 2 deletions

View File

@ -303,8 +303,15 @@ public class RemoteLogManager implements Closeable {
public void resizeReaderThreadPool(int newSize) {
int currentSize = remoteStorageReaderThreadPool.getCorePoolSize();
int currentMaximumSize = remoteStorageReaderThreadPool.getMaximumPoolSize();
LOGGER.info("Updating remote reader thread pool size from {} to {}", currentSize, newSize);
remoteStorageReaderThreadPool.setCorePoolSize(newSize);
if (newSize > currentMaximumSize) {
remoteStorageReaderThreadPool.setMaximumPoolSize(newSize);
remoteStorageReaderThreadPool.setCorePoolSize(newSize);
} else {
remoteStorageReaderThreadPool.setCorePoolSize(newSize);
remoteStorageReaderThreadPool.setMaximumPoolSize(newSize);
}
}
private void removeMetrics() {
@ -313,6 +320,11 @@ public class RemoteLogManager implements Closeable {
remoteStorageReaderThreadPool.removeMetrics();
}
// Visible for testing
int readerThreadPoolSize() {
return remoteStorageReaderThreadPool.getCorePoolSize();
}
/**
* Returns the timeout for the RLM Tasks to wait for the quota to be available
*/

View File

@ -1156,7 +1156,7 @@ public class RemoteLogManagerTest {
safeLongYammerMetricValue("RemoteLogSizeComputationTime,topic=" + leaderTopic),
safeLongYammerMetricValue("RemoteLogSizeComputationTime")));
remoteLogSizeComputationTimeLatch.countDown();
TestUtils.waitForCondition(
() -> 0 == safeLongYammerMetricValue("RemoteCopyLagBytes") && 0 == safeLongYammerMetricValue("RemoteCopyLagBytes,topic=" + leaderTopic),
String.format("Expected to find 0 for RemoteCopyLagBytes metric value, but found %d for topic 'Leader' and %d for all topics.",
@ -3713,6 +3713,15 @@ public class RemoteLogManagerTest {
verifyNoMoreInteractions(remoteStorageManager);
}
@Test
void testUpdateRemoteStorageReaderThreads() {
assertEquals(10, remoteLogManager.readerThreadPoolSize());
remoteLogManager.resizeReaderThreadPool(6);
assertEquals(6, remoteLogManager.readerThreadPoolSize());
remoteLogManager.resizeReaderThreadPool(12);
assertEquals(12, remoteLogManager.readerThreadPoolSize());
}
private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException {
byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
Compression compression = Compression.NONE;