mirror of https://github.com/apache/kafka.git
KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout (#14778)
KIP-1018, part1, Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
parent
7ddfa64759
commit
02c794dfd3
|
@ -198,7 +198,10 @@ public class ConsumerConfig extends AbstractConfig {
|
|||
* <code>fetch.max.wait.ms</code>
|
||||
*/
|
||||
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
|
||||
private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
|
||||
private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before " +
|
||||
"answering the fetch request there isn't sufficient data to immediately satisfy the requirement given by " +
|
||||
"fetch.min.bytes. This config is used only for local log fetch. To tune the remote fetch maximum wait " +
|
||||
"time, please refer to 'remote.fetch.max.wait.ms' broker config";
|
||||
public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500;
|
||||
|
||||
/** <code>metadata.max.age.ms</code> */
|
||||
|
|
|
@ -35,12 +35,13 @@ import scala.collection._
|
|||
class DelayedRemoteFetch(remoteFetchTask: Future[Void],
|
||||
remoteFetchResult: CompletableFuture[RemoteLogReadResult],
|
||||
remoteFetchInfo: RemoteStorageFetchInfo,
|
||||
remoteFetchMaxWaitMs: Long,
|
||||
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
|
||||
fetchParams: FetchParams,
|
||||
localReadResults: Seq[(TopicIdPartition, LogReadResult)],
|
||||
replicaManager: ReplicaManager,
|
||||
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
|
||||
extends DelayedOperation(fetchParams.maxWaitMs) {
|
||||
extends DelayedOperation(remoteFetchMaxWaitMs) {
|
||||
|
||||
if (fetchParams.isFromFollower) {
|
||||
throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams")
|
||||
|
|
|
@ -1479,7 +1479,8 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
return Some(createLogReadResult(e))
|
||||
}
|
||||
|
||||
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo,
|
||||
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs()
|
||||
val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
|
||||
fetchPartitionStatus, params, logReadResults, this, responseCallback)
|
||||
|
||||
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
|
||||
|
|
|
@ -40,6 +40,7 @@ class DelayedRemoteFetchTest {
|
|||
private val fetchOffset = 500L
|
||||
private val logStartOffset = 0L
|
||||
private val currentLeaderEpoch = Optional.of[Integer](10)
|
||||
private val remoteFetchMaxWaitMs = 500
|
||||
|
||||
private val fetchStatus = FetchPartitionStatus(
|
||||
startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
|
||||
|
@ -64,8 +65,8 @@ class DelayedRemoteFetchTest {
|
|||
val leaderLogStartOffset = 10
|
||||
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
|
||||
|
||||
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
|
||||
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
|
||||
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
|
||||
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
|
||||
|
||||
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
|
||||
.thenReturn(mock(classOf[Partition]))
|
||||
|
@ -100,8 +101,8 @@ class DelayedRemoteFetchTest {
|
|||
val leaderLogStartOffset = 10
|
||||
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
|
||||
val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
|
||||
assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
|
||||
Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
|
||||
assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
|
||||
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback))
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -124,8 +125,8 @@ class DelayedRemoteFetchTest {
|
|||
|
||||
val logReadInfo = buildReadResult(Errors.NONE)
|
||||
|
||||
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
|
||||
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
|
||||
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
|
||||
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
|
||||
|
||||
// delayed remote fetch should still be able to complete
|
||||
assertTrue(delayedRemoteFetch.tryComplete())
|
||||
|
@ -155,8 +156,8 @@ class DelayedRemoteFetchTest {
|
|||
// build a read result with error
|
||||
val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
|
||||
|
||||
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
|
||||
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
|
||||
val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
|
||||
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
|
||||
|
||||
assertTrue(delayedRemoteFetch.tryComplete())
|
||||
assertTrue(delayedRemoteFetch.isCompleted)
|
||||
|
@ -184,8 +185,8 @@ class DelayedRemoteFetchTest {
|
|||
val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false)
|
||||
val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
|
||||
|
||||
val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
|
||||
Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
|
||||
val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs,
|
||||
Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
|
||||
|
||||
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
|
||||
.thenReturn(mock(classOf[Partition]))
|
||||
|
|
|
@ -54,7 +54,6 @@ public final class RemoteLogManagerConfig {
|
|||
"implementation. For example this value can be `rlmm.config.`.";
|
||||
public static final String DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config.";
|
||||
|
||||
|
||||
public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = "remote.log.storage.system.enable";
|
||||
public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether to enable tiered storage functionality in a broker or not. Valid values " +
|
||||
"are `true` or `false` and the default value is false. When it is true broker starts all the services required for the tiered storage functionality.";
|
||||
|
@ -185,10 +184,15 @@ public final class RemoteLogManagerConfig {
|
|||
"The default value is 1 second.";
|
||||
public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1;
|
||||
|
||||
public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms";
|
||||
public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request";
|
||||
public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
|
||||
|
||||
public static final ConfigDef CONFIG_DEF = new ConfigDef();
|
||||
|
||||
static {
|
||||
CONFIG_DEF.define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
|
||||
CONFIG_DEF
|
||||
.define(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
|
||||
BOOLEAN,
|
||||
DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE,
|
||||
null,
|
||||
|
@ -345,7 +349,13 @@ public final class RemoteLogManagerConfig {
|
|||
DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS,
|
||||
atLeast(1),
|
||||
MEDIUM,
|
||||
REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC);
|
||||
REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_DOC)
|
||||
.define(REMOTE_FETCH_MAX_WAIT_MS_PROP,
|
||||
INT,
|
||||
DEFAULT_REMOTE_FETCH_MAX_WAIT_MS,
|
||||
atLeast(1),
|
||||
MEDIUM,
|
||||
REMOTE_FETCH_MAX_WAIT_MS_DOC);
|
||||
}
|
||||
|
||||
private final boolean enableRemoteStorageSystem;
|
||||
|
@ -375,6 +385,7 @@ public final class RemoteLogManagerConfig {
|
|||
private final long remoteLogManagerFetchMaxBytesPerSecond;
|
||||
private final int remoteLogManagerFetchNumQuotaSamples;
|
||||
private final int remoteLogManagerFetchQuotaWindowSizeSeconds;
|
||||
private final int remoteFetchMaxWaitMs;
|
||||
|
||||
public RemoteLogManagerConfig(AbstractConfig config) {
|
||||
this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
|
||||
|
@ -407,7 +418,8 @@ public final class RemoteLogManagerConfig {
|
|||
config.getInt(REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP),
|
||||
config.getLong(REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP),
|
||||
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP),
|
||||
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP));
|
||||
config.getInt(REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP),
|
||||
config.getInt(REMOTE_FETCH_MAX_WAIT_MS_PROP));
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
|
@ -437,8 +449,8 @@ public final class RemoteLogManagerConfig {
|
|||
int remoteLogManagerCopyQuotaWindowSizeSeconds,
|
||||
long remoteLogManagerFetchMaxBytesPerSecond,
|
||||
int remoteLogManagerFetchNumQuotaSamples,
|
||||
int remoteLogManagerFetchQuotaWindowSizeSeconds
|
||||
) {
|
||||
int remoteLogManagerFetchQuotaWindowSizeSeconds,
|
||||
int remoteFetchMaxWaitMs) {
|
||||
this.enableRemoteStorageSystem = enableRemoteStorageSystem;
|
||||
this.remoteStorageManagerClassName = remoteStorageManagerClassName;
|
||||
this.remoteStorageManagerClassPath = remoteStorageManagerClassPath;
|
||||
|
@ -466,6 +478,7 @@ public final class RemoteLogManagerConfig {
|
|||
this.remoteLogManagerFetchMaxBytesPerSecond = remoteLogManagerFetchMaxBytesPerSecond;
|
||||
this.remoteLogManagerFetchNumQuotaSamples = remoteLogManagerFetchNumQuotaSamples;
|
||||
this.remoteLogManagerFetchQuotaWindowSizeSeconds = remoteLogManagerFetchQuotaWindowSizeSeconds;
|
||||
this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
|
||||
}
|
||||
|
||||
public boolean enableRemoteStorageSystem() {
|
||||
|
@ -576,6 +589,9 @@ public final class RemoteLogManagerConfig {
|
|||
return remoteLogManagerFetchQuotaWindowSizeSeconds;
|
||||
}
|
||||
|
||||
public int remoteFetchMaxWaitMs() {
|
||||
return remoteFetchMaxWaitMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
|
@ -608,12 +624,14 @@ public final class RemoteLogManagerConfig {
|
|||
&& remoteLogManagerCopyQuotaWindowSizeSeconds == that.remoteLogManagerCopyQuotaWindowSizeSeconds
|
||||
&& remoteLogManagerFetchMaxBytesPerSecond == that.remoteLogManagerFetchMaxBytesPerSecond
|
||||
&& remoteLogManagerFetchNumQuotaSamples == that.remoteLogManagerFetchNumQuotaSamples
|
||||
&& remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds;
|
||||
&& remoteLogManagerFetchQuotaWindowSizeSeconds == that.remoteLogManagerFetchQuotaWindowSizeSeconds
|
||||
&& remoteFetchMaxWaitMs == that.remoteFetchMaxWaitMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath,
|
||||
return Objects.hash(
|
||||
enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath,
|
||||
remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
|
||||
remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize,
|
||||
remoteLogManagerCopierThreadPoolSize, remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs,
|
||||
|
@ -621,7 +639,7 @@ public final class RemoteLogManagerConfig {
|
|||
remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps,
|
||||
remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond,
|
||||
remoteLogManagerCopyNumQuotaSamples, remoteLogManagerCopyQuotaWindowSizeSeconds, remoteLogManagerFetchMaxBytesPerSecond,
|
||||
remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds);
|
||||
remoteLogManagerFetchNumQuotaSamples, remoteLogManagerFetchQuotaWindowSizeSeconds, remoteFetchMaxWaitMs);
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
|
|
@ -95,7 +95,8 @@ public class RemoteLogManagerConfigTest {
|
|||
1,
|
||||
Long.MAX_VALUE,
|
||||
11,
|
||||
1);
|
||||
1,
|
||||
500);
|
||||
}
|
||||
|
||||
private Map<String, Object> extractProps(RemoteLogManagerConfig remoteLogManagerConfig) {
|
||||
|
@ -189,7 +190,8 @@ public class RemoteLogManagerConfigTest {
|
|||
1,
|
||||
Long.MAX_VALUE,
|
||||
11,
|
||||
1);
|
||||
1,
|
||||
500);
|
||||
|
||||
assertNotEquals(config1.hashCode(), config3.hashCode());
|
||||
assertNotEquals(config1, config3);
|
||||
|
|
Loading…
Reference in New Issue