mirror of https://github.com/apache/kafka.git
KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (#15820)
- Added the integration of the quota manager to throttle copy requests to the remote storage. Reference KIP-956 - Added unit-tests for the copy throttling logic. Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
This commit is contained in:
parent
2fa2c72581
commit
23fe71d579
|
@ -38,6 +38,7 @@
|
|||
<allow pkg="org.apache.kafka.common" />
|
||||
<allow pkg="org.mockito" class="AssignmentsManagerTest"/>
|
||||
<allow pkg="org.apache.kafka.server"/>
|
||||
<allow pkg="org.opentest4j" class="RemoteLogManagerTest"/>
|
||||
<!-- see KIP-544 for why KafkaYammerMetrics should be used instead of the global default yammer metrics registry
|
||||
https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable -->
|
||||
<disallow class="com.yammer.metrics.Metrics" />
|
||||
|
|
|
@ -97,6 +97,7 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -123,6 +124,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
@ -160,6 +163,8 @@ public class RemoteLogManager implements Closeable {
|
|||
|
||||
private final RemoteLogMetadataManager remoteLogMetadataManager;
|
||||
|
||||
private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);
|
||||
private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition();
|
||||
private final RLMQuotaManager rlmCopyQuotaManager;
|
||||
private final RLMQuotaManager rlmFetchQuotaManager;
|
||||
|
||||
|
@ -250,6 +255,13 @@ public class RemoteLogManager implements Closeable {
|
|||
remoteStorageReaderThreadPool.removeMetrics();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the timeout for the RLM Tasks to wait for the quota to be available
|
||||
*/
|
||||
Duration quotaTimeout() {
|
||||
return Duration.ofSeconds(1);
|
||||
}
|
||||
|
||||
RLMQuotaManager createRLMCopyQuotaManager() {
|
||||
return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$,
|
||||
"Tracking copy byte-rate for Remote Log Manager", time);
|
||||
|
@ -763,6 +775,23 @@ public class RemoteLogManager implements Closeable {
|
|||
isCancelled(), isLeader());
|
||||
return;
|
||||
}
|
||||
|
||||
copyQuotaManagerLock.lock();
|
||||
try {
|
||||
while (rlmCopyQuotaManager.isQuotaExceeded()) {
|
||||
logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
|
||||
// If the thread gets interrupted while waiting, the InterruptedException is thrown
|
||||
// back to the caller. It's important to note that the task being executed is already
|
||||
// cancelled before the executing thread is interrupted. The caller is responsible
|
||||
// for handling the exception gracefully by checking if the task is already cancelled.
|
||||
boolean ignored = copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes());
|
||||
// Signal waiting threads to check the quota again
|
||||
copyQuotaManagerLockCondition.signalAll();
|
||||
} finally {
|
||||
copyQuotaManagerLock.unlock();
|
||||
}
|
||||
copyLogSegment(log, candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import com.yammer.metrics.core.MetricName;
|
|||
import kafka.cluster.EndPoint;
|
||||
import kafka.cluster.Partition;
|
||||
import kafka.log.UnifiedLog;
|
||||
import kafka.log.remote.quota.RLMQuotaManager;
|
||||
import kafka.log.remote.quota.RLMQuotaManagerConfig;
|
||||
import kafka.server.BrokerTopicStats;
|
||||
import kafka.server.KafkaConfig;
|
||||
|
@ -87,6 +88,7 @@ import org.mockito.ArgumentCaptor;
|
|||
import org.mockito.InOrder;
|
||||
import org.mockito.MockedConstruction;
|
||||
import org.mockito.Mockito;
|
||||
import org.opentest4j.AssertionFailedError;
|
||||
import scala.Option;
|
||||
import scala.collection.JavaConverters;
|
||||
|
||||
|
@ -101,6 +103,7 @@ import java.io.UncheckedIOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -146,6 +149,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
|
|||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
|
@ -153,6 +157,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
|
|||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
|
@ -187,6 +192,7 @@ public class RemoteLogManagerTest {
|
|||
|
||||
private final RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
|
||||
private final RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
|
||||
private final RLMQuotaManager rlmCopyQuotaManager = mock(RLMQuotaManager.class);
|
||||
private RemoteLogManagerConfig remoteLogManagerConfig = null;
|
||||
|
||||
private BrokerTopicStats brokerTopicStats = null;
|
||||
|
@ -230,6 +236,12 @@ public class RemoteLogManagerTest {
|
|||
public RemoteLogMetadataManager createRemoteLogMetadataManager() {
|
||||
return remoteLogMetadataManager;
|
||||
}
|
||||
public RLMQuotaManager createRLMCopyQuotaManager() {
|
||||
return rlmCopyQuotaManager;
|
||||
}
|
||||
public Duration quotaTimeout() {
|
||||
return Duration.ofMillis(100);
|
||||
}
|
||||
@Override
|
||||
long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
|
||||
return 0L;
|
||||
|
@ -2735,6 +2747,204 @@ public class RemoteLogManagerTest {
|
|||
}
|
||||
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {false, true})
|
||||
public void testCopyQuota(boolean quotaExceeded) throws Exception {
|
||||
RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
|
||||
|
||||
if (quotaExceeded) {
|
||||
// Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
|
||||
assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
|
||||
|
||||
// Verify the highest offset in remote storage is updated only once
|
||||
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
|
||||
verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
|
||||
// Verify the highest offset in remote storage was -1L before the copy started
|
||||
assertEquals(-1L, capture.getValue());
|
||||
} else {
|
||||
// Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
|
||||
assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
|
||||
|
||||
// Verify quota check was performed
|
||||
verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
|
||||
// Verify bytes to copy was recorded with the quota manager
|
||||
verify(rlmCopyQuotaManager, times(1)).record(10);
|
||||
|
||||
// Verify the highest offset in remote storage is updated
|
||||
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
|
||||
verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
|
||||
List<Long> capturedValues = capture.getAllValues();
|
||||
// Verify the highest offset in remote storage was -1L before the copy
|
||||
assertEquals(-1L, capturedValues.get(0).longValue());
|
||||
// Verify it was updated to 149L after the copy
|
||||
assertEquals(149L, capturedValues.get(1).longValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRLMShutdownDuringQuotaExceededScenario() throws Exception {
|
||||
remoteLogManager.startup();
|
||||
setupRLMTask(true);
|
||||
remoteLogManager.onLeadershipChange(
|
||||
Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
|
||||
// Ensure the copy operation is waiting for quota to be available
|
||||
TestUtils.waitForCondition(() -> {
|
||||
verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
|
||||
return true;
|
||||
}, "Quota exceeded check did not happen");
|
||||
// Verify RLM is able to shut down
|
||||
assertTimeoutPreemptively(Duration.ofMillis(100), () -> remoteLogManager.close());
|
||||
}
|
||||
|
||||
// helper method to set up a RemoteLogManager.RLMTask for testing copy quota behaviour
|
||||
private RemoteLogManager.RLMTask setupRLMTask(boolean quotaExceeded) throws RemoteStorageException, IOException {
|
||||
long oldSegmentStartOffset = 0L;
|
||||
long nextSegmentStartOffset = 150L;
|
||||
|
||||
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
|
||||
|
||||
// leader epoch preparation
|
||||
checkpoint.write(totalEpochEntries);
|
||||
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
|
||||
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
|
||||
when(mockLog.parentDir()).thenReturn("dir1");
|
||||
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
|
||||
|
||||
// create 2 log segments, with 0 and 150 as log start offset
|
||||
LogSegment oldSegment = mock(LogSegment.class);
|
||||
LogSegment activeSegment = mock(LogSegment.class);
|
||||
|
||||
File tempFile = TestUtils.tempFile();
|
||||
FileRecords fileRecords = mock(FileRecords.class);
|
||||
when(fileRecords.file()).thenReturn(tempFile);
|
||||
when(fileRecords.sizeInBytes()).thenReturn(10);
|
||||
|
||||
// Set up the segment that is eligible for copy
|
||||
when(oldSegment.log()).thenReturn(fileRecords);
|
||||
when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
|
||||
when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
|
||||
|
||||
// set up the active segment
|
||||
when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
|
||||
|
||||
when(mockLog.activeSegment()).thenReturn(activeSegment);
|
||||
when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
|
||||
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
|
||||
|
||||
File mockProducerSnapshotIndex = TestUtils.tempFile();
|
||||
ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
|
||||
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
|
||||
|
||||
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
|
||||
when(mockLog.lastStableOffset()).thenReturn(250L);
|
||||
|
||||
File tempDir = TestUtils.tempDirectory();
|
||||
OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
|
||||
TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
|
||||
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
|
||||
txnFile.createNewFile();
|
||||
TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
|
||||
when(oldSegment.timeIndex()).thenReturn(timeIdx);
|
||||
when(oldSegment.offsetIndex()).thenReturn(idx);
|
||||
when(oldSegment.txnIndex()).thenReturn(txnIndex);
|
||||
|
||||
CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
|
||||
dummyFuture.complete(null);
|
||||
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
|
||||
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
|
||||
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
|
||||
|
||||
when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
|
||||
doNothing().when(rlmCopyQuotaManager).record(anyInt());
|
||||
|
||||
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
|
||||
task.convertToLeader(2);
|
||||
return task;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopyThrottling() throws Exception {
|
||||
long oldestSegmentStartOffset = 0L;
|
||||
|
||||
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
|
||||
|
||||
// leader epoch preparation
|
||||
checkpoint.write(totalEpochEntries);
|
||||
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint, scheduler);
|
||||
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
|
||||
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
|
||||
|
||||
// create 3 log segments
|
||||
LogSegment segmentToCopy = mock(LogSegment.class);
|
||||
LogSegment segmentToThrottle = mock(LogSegment.class);
|
||||
LogSegment activeSegment = mock(LogSegment.class);
|
||||
|
||||
File tempFile = TestUtils.tempFile();
|
||||
FileRecords fileRecords = mock(FileRecords.class);
|
||||
when(fileRecords.file()).thenReturn(tempFile);
|
||||
when(fileRecords.sizeInBytes()).thenReturn(10);
|
||||
|
||||
// set up the segment that will be copied
|
||||
when(segmentToCopy.log()).thenReturn(fileRecords);
|
||||
when(segmentToCopy.baseOffset()).thenReturn(oldestSegmentStartOffset);
|
||||
when(segmentToCopy.readNextOffset()).thenReturn(100L);
|
||||
|
||||
// set up the segment that will not be copied because of hitting quota
|
||||
when(segmentToThrottle.log()).thenReturn(fileRecords);
|
||||
when(segmentToThrottle.baseOffset()).thenReturn(100L);
|
||||
when(segmentToThrottle.readNextOffset()).thenReturn(150L);
|
||||
|
||||
// set up the active segment
|
||||
when(activeSegment.log()).thenReturn(fileRecords);
|
||||
when(activeSegment.baseOffset()).thenReturn(150L);
|
||||
|
||||
when(mockLog.activeSegment()).thenReturn(activeSegment);
|
||||
when(mockLog.logStartOffset()).thenReturn(oldestSegmentStartOffset);
|
||||
when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segmentToCopy, segmentToThrottle, activeSegment)));
|
||||
|
||||
File mockProducerSnapshotIndex = TestUtils.tempFile();
|
||||
ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
|
||||
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
|
||||
|
||||
when(mockLog.producerStateManager()).thenReturn(mockStateManager);
|
||||
when(mockLog.lastStableOffset()).thenReturn(250L);
|
||||
|
||||
File tempDir = TestUtils.tempDirectory();
|
||||
OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get();
|
||||
TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1500).get();
|
||||
File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldestSegmentStartOffset, "");
|
||||
txnFile.createNewFile();
|
||||
TransactionIndex txnIndex = new TransactionIndex(oldestSegmentStartOffset, txnFile);
|
||||
when(segmentToCopy.timeIndex()).thenReturn(timeIdx);
|
||||
when(segmentToCopy.offsetIndex()).thenReturn(idx);
|
||||
when(segmentToCopy.txnIndex()).thenReturn(txnIndex);
|
||||
|
||||
CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
|
||||
dummyFuture.complete(null);
|
||||
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
|
||||
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
|
||||
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
|
||||
|
||||
// After the first call, isQuotaExceeded should return true
|
||||
when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true);
|
||||
doNothing().when(rlmCopyQuotaManager).record(anyInt());
|
||||
|
||||
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
|
||||
task.convertToLeader(2);
|
||||
|
||||
// Verify that the copy operation times out, since the second segment cannot be copied due to quota being exceeded
|
||||
assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
|
||||
|
||||
// Verify the highest offset in remote storage is updated corresponding to the only segment that was copied
|
||||
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
|
||||
verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
|
||||
List<Long> capturedValues = capture.getAllValues();
|
||||
// Verify the highest offset in remote storage was -1L before the copy
|
||||
assertEquals(-1L, capturedValues.get(0).longValue());
|
||||
// Verify it was updated to 99L after the copy
|
||||
assertEquals(99L, capturedValues.get(1).longValue());
|
||||
}
|
||||
|
||||
private Partition mockPartition(TopicIdPartition topicIdPartition) {
|
||||
TopicPartition tp = topicIdPartition.topicPartition();
|
||||
Partition partition = mock(Partition.class);
|
||||
|
|
Loading…
Reference in New Issue