mirror of https://github.com/apache/kafka.git
KAFKA-17730 ReplicaFetcherThreadBenchmark is broken (#18382)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
64b8b4a632
commit
7275dc129e
|
@ -17,11 +17,8 @@
|
||||||
|
|
||||||
package org.apache.kafka.jmh.fetcher;
|
package org.apache.kafka.jmh.fetcher;
|
||||||
|
|
||||||
import kafka.cluster.AlterPartitionListener;
|
|
||||||
import kafka.cluster.DelayedOperations;
|
|
||||||
import kafka.cluster.Partition;
|
import kafka.cluster.Partition;
|
||||||
import kafka.log.LogManager;
|
import kafka.log.LogManager;
|
||||||
import kafka.server.AlterPartitionManager;
|
|
||||||
import kafka.server.BrokerBlockingSender;
|
import kafka.server.BrokerBlockingSender;
|
||||||
import kafka.server.FailedPartitions;
|
import kafka.server.FailedPartitions;
|
||||||
import kafka.server.InitialFetchState;
|
import kafka.server.InitialFetchState;
|
||||||
|
@ -36,10 +33,8 @@ import kafka.server.ReplicaQuota;
|
||||||
import kafka.server.builders.LogManagerBuilder;
|
import kafka.server.builders.LogManagerBuilder;
|
||||||
import kafka.server.builders.ReplicaManagerBuilder;
|
import kafka.server.builders.ReplicaManagerBuilder;
|
||||||
import kafka.server.metadata.MockConfigRepository;
|
import kafka.server.metadata.MockConfigRepository;
|
||||||
import kafka.server.metadata.ZkMetadataCache;
|
|
||||||
import kafka.utils.Pool;
|
import kafka.utils.Pool;
|
||||||
import kafka.utils.TestUtils;
|
import kafka.utils.TestUtils;
|
||||||
import kafka.zk.KafkaZkClient;
|
|
||||||
|
|
||||||
import org.apache.kafka.clients.FetchSessionHandler;
|
import org.apache.kafka.clients.FetchSessionHandler;
|
||||||
import org.apache.kafka.common.TopicIdPartition;
|
import org.apache.kafka.common.TopicIdPartition;
|
||||||
|
@ -49,7 +44,6 @@ import org.apache.kafka.common.message.FetchResponseData;
|
||||||
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
|
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
|
||||||
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
|
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
|
||||||
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
|
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
|
||||||
import org.apache.kafka.common.message.UpdateMetadataRequestData;
|
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.protocol.ApiKeys;
|
import org.apache.kafka.common.protocol.ApiKeys;
|
||||||
import org.apache.kafka.common.protocol.Errors;
|
import org.apache.kafka.common.protocol.Errors;
|
||||||
|
@ -57,11 +51,9 @@ import org.apache.kafka.common.record.BaseRecords;
|
||||||
import org.apache.kafka.common.record.RecordsSend;
|
import org.apache.kafka.common.record.RecordsSend;
|
||||||
import org.apache.kafka.common.requests.FetchRequest;
|
import org.apache.kafka.common.requests.FetchRequest;
|
||||||
import org.apache.kafka.common.requests.FetchResponse;
|
import org.apache.kafka.common.requests.FetchResponse;
|
||||||
import org.apache.kafka.common.requests.UpdateMetadataRequest;
|
|
||||||
import org.apache.kafka.common.utils.LogContext;
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
import org.apache.kafka.server.BrokerFeatures;
|
|
||||||
import org.apache.kafka.server.common.MetadataVersion;
|
import org.apache.kafka.server.common.MetadataVersion;
|
||||||
import org.apache.kafka.server.common.OffsetAndEpoch;
|
import org.apache.kafka.server.common.OffsetAndEpoch;
|
||||||
import org.apache.kafka.server.network.BrokerEndPoint;
|
import org.apache.kafka.server.network.BrokerEndPoint;
|
||||||
|
@ -91,20 +83,21 @@ import org.openjdk.jmh.annotations.Warmup;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import scala.Option;
|
import scala.Option;
|
||||||
import scala.collection.Iterator;
|
import scala.collection.Iterator;
|
||||||
import scala.collection.Map;
|
import scala.collection.Map;
|
||||||
|
import scala.jdk.javaapi.CollectionConverters;
|
||||||
|
|
||||||
|
import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_1;
|
||||||
|
|
||||||
@State(Scope.Benchmark)
|
@State(Scope.Benchmark)
|
||||||
@Fork(value = 1)
|
@Fork(value = 1)
|
||||||
|
@ -113,7 +106,6 @@ import scala.collection.Map;
|
||||||
@BenchmarkMode(Mode.AverageTime)
|
@BenchmarkMode(Mode.AverageTime)
|
||||||
@OutputTimeUnit(TimeUnit.NANOSECONDS)
|
@OutputTimeUnit(TimeUnit.NANOSECONDS)
|
||||||
public class ReplicaFetcherThreadBenchmark {
|
public class ReplicaFetcherThreadBenchmark {
|
||||||
private final File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
|
|
||||||
private final KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
|
private final KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
|
||||||
private final Pool<TopicPartition, Partition> pool = new Pool<>(Option.empty());
|
private final Pool<TopicPartition, Partition> pool = new Pool<>(Option.empty());
|
||||||
private final Metrics metrics = new Metrics();
|
private final Metrics metrics = new Metrics();
|
||||||
|
@ -127,18 +119,16 @@ public class ReplicaFetcherThreadBenchmark {
|
||||||
|
|
||||||
@Setup(Level.Trial)
|
@Setup(Level.Trial)
|
||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
if (!logDir.mkdir())
|
|
||||||
throw new IOException("error creating test directory");
|
|
||||||
|
|
||||||
scheduler.startup();
|
scheduler.startup();
|
||||||
Properties props = new Properties();
|
KafkaConfig config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
|
||||||
props.put("zookeeper.connect", "127.0.0.1:9999");
|
0, true, true, 9092, Option.empty(), Option.empty(),
|
||||||
KafkaConfig config = new KafkaConfig(props);
|
Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1,
|
||||||
|
(short) 1, false));
|
||||||
LogConfig logConfig = createLogConfig();
|
LogConfig logConfig = createLogConfig();
|
||||||
|
|
||||||
BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
|
BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
|
||||||
LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
|
LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(config.logDirs().size());
|
||||||
List<File> logDirs = Collections.singletonList(logDir);
|
List<File> logDirs = CollectionConverters.asJava(config.logDirs()).stream().map(File::new).collect(Collectors.toList());
|
||||||
logManager = new LogManagerBuilder().
|
logManager = new LogManagerBuilder().
|
||||||
setLogDirs(logDirs).
|
setLogDirs(logDirs).
|
||||||
setInitialOfflineDirs(Collections.emptyList()).
|
setInitialOfflineDirs(Collections.emptyList()).
|
||||||
|
@ -159,10 +149,21 @@ public class ReplicaFetcherThreadBenchmark {
|
||||||
setKeepPartitionMetadataFile(true).
|
setKeepPartitionMetadataFile(true).
|
||||||
build();
|
build();
|
||||||
|
|
||||||
|
replicaManager = new ReplicaManagerBuilder().
|
||||||
|
setConfig(config).
|
||||||
|
setMetrics(metrics).
|
||||||
|
setTime(new MockTime()).
|
||||||
|
setScheduler(scheduler).
|
||||||
|
setLogManager(logManager).
|
||||||
|
setQuotaManagers(Mockito.mock(QuotaFactory.QuotaManagers.class)).
|
||||||
|
setBrokerTopicStats(brokerTopicStats).
|
||||||
|
setMetadataCache(MetadataCache.kRaftMetadataCache(config.nodeId(), () -> KRAFT_VERSION_1)).
|
||||||
|
setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
|
||||||
|
setAlterPartitionManager(TestUtils.createAlterIsrManager()).
|
||||||
|
build();
|
||||||
|
|
||||||
LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> initialFetched = new LinkedHashMap<>();
|
LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> initialFetched = new LinkedHashMap<>();
|
||||||
HashMap<String, Uuid> topicIds = new HashMap<>();
|
|
||||||
scala.collection.mutable.Map<TopicPartition, InitialFetchState> initialFetchStates = new scala.collection.mutable.HashMap<>();
|
scala.collection.mutable.Map<TopicPartition, InitialFetchState> initialFetchStates = new scala.collection.mutable.HashMap<>();
|
||||||
List<UpdateMetadataRequestData.UpdateMetadataPartitionState> updatePartitionState = new ArrayList<>();
|
|
||||||
for (int i = 0; i < partitionCount; i++) {
|
for (int i = 0; i < partitionCount; i++) {
|
||||||
TopicPartition tp = new TopicPartition("topic", i);
|
TopicPartition tp = new TopicPartition("topic", i);
|
||||||
|
|
||||||
|
@ -176,15 +177,10 @@ public class ReplicaFetcherThreadBenchmark {
|
||||||
.setReplicas(replicas)
|
.setReplicas(replicas)
|
||||||
.setIsNew(true);
|
.setIsNew(true);
|
||||||
|
|
||||||
AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class);
|
OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Optional.of(0L);
|
||||||
OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
|
Partition partition = replicaManager.createPartition(tp);
|
||||||
Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Optional.of(0L));
|
|
||||||
AlterPartitionManager isrChannelManager = Mockito.mock(AlterPartitionManager.class);
|
|
||||||
Partition partition = new Partition(tp, 100, MetadataVersion.latestTesting(),
|
|
||||||
0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(topicId, tp),
|
|
||||||
Mockito.mock(MetadataCache.class), logManager, isrChannelManager, topicId);
|
|
||||||
|
|
||||||
partition.makeFollower(partitionState, offsetCheckpoints, topicId, Option.empty());
|
partition.makeFollower(partitionState, checkpoints, topicId, Option.empty());
|
||||||
pool.put(tp, partition);
|
pool.put(tp, partition);
|
||||||
initialFetchStates.put(tp, new InitialFetchState(topicId, new BrokerEndPoint(3, "host", 3000), 0, 0));
|
initialFetchStates.put(tp, new InitialFetchState(topicId, new BrokerEndPoint(3, "host", 3000), 0, 0));
|
||||||
BaseRecords fetched = new BaseRecords() {
|
BaseRecords fetched = new BaseRecords() {
|
||||||
|
@ -203,39 +199,8 @@ public class ReplicaFetcherThreadBenchmark {
|
||||||
.setLastStableOffset(0)
|
.setLastStableOffset(0)
|
||||||
.setLogStartOffset(0)
|
.setLogStartOffset(0)
|
||||||
.setRecords(fetched));
|
.setRecords(fetched));
|
||||||
|
|
||||||
updatePartitionState.add(
|
|
||||||
new UpdateMetadataRequestData.UpdateMetadataPartitionState()
|
|
||||||
.setTopicName("topic")
|
|
||||||
.setPartitionIndex(i)
|
|
||||||
.setControllerEpoch(0)
|
|
||||||
.setLeader(0)
|
|
||||||
.setLeaderEpoch(0)
|
|
||||||
.setIsr(replicas)
|
|
||||||
.setZkVersion(1)
|
|
||||||
.setReplicas(replicas));
|
|
||||||
}
|
}
|
||||||
UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(),
|
|
||||||
0, 0, 0, updatePartitionState, Collections.emptyList(), topicIds).build();
|
|
||||||
|
|
||||||
// TODO: fix to support raft
|
|
||||||
ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
|
|
||||||
config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty());
|
|
||||||
metadataCache.updateMetadata(0, updateMetadataRequest);
|
|
||||||
|
|
||||||
replicaManager = new ReplicaManagerBuilder().
|
|
||||||
setConfig(config).
|
|
||||||
setMetrics(metrics).
|
|
||||||
setTime(new MockTime()).
|
|
||||||
setZkClient(Mockito.mock(KafkaZkClient.class)).
|
|
||||||
setScheduler(scheduler).
|
|
||||||
setLogManager(logManager).
|
|
||||||
setQuotaManagers(Mockito.mock(QuotaFactory.QuotaManagers.class)).
|
|
||||||
setBrokerTopicStats(brokerTopicStats).
|
|
||||||
setMetadataCache(metadataCache).
|
|
||||||
setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
|
|
||||||
setAlterPartitionManager(TestUtils.createAlterIsrManager()).
|
|
||||||
build();
|
|
||||||
replicaQuota = new ReplicaQuota() {
|
replicaQuota = new ReplicaQuota() {
|
||||||
@Override
|
@Override
|
||||||
public boolean isQuotaExceeded() {
|
public boolean isQuotaExceeded() {
|
||||||
|
@ -266,7 +231,9 @@ public class ReplicaFetcherThreadBenchmark {
|
||||||
replicaManager.shutdown(false);
|
replicaManager.shutdown(false);
|
||||||
logManager.shutdown(-1L);
|
logManager.shutdown(-1L);
|
||||||
scheduler.shutdown();
|
scheduler.shutdown();
|
||||||
Utils.delete(logDir);
|
for (File dir : CollectionConverters.asJava(logManager.liveLogDirs())) {
|
||||||
|
Utils.delete(dir);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Benchmark
|
@Benchmark
|
||||||
|
@ -275,18 +242,6 @@ public class ReplicaFetcherThreadBenchmark {
|
||||||
return fetcher.fetcherStats().requestRate().count();
|
return fetcher.fetcherStats().requestRate().count();
|
||||||
}
|
}
|
||||||
|
|
||||||
// avoid mocked DelayedOperations to avoid mocked class affecting benchmark results
|
|
||||||
private static class DelayedOperationsMock extends DelayedOperations {
|
|
||||||
DelayedOperationsMock(Option<Uuid> topicId, TopicPartition topicPartition) {
|
|
||||||
super(topicId, topicPartition, null, null, null, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int numDelayedDelete() {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static LogConfig createLogConfig() {
|
private static LogConfig createLogConfig() {
|
||||||
return new LogConfig(new Properties());
|
return new LogConfig(new Properties());
|
||||||
}
|
}
|
||||||
|
@ -318,7 +273,7 @@ public class ReplicaFetcherThreadBenchmark {
|
||||||
replicaManager,
|
replicaManager,
|
||||||
replicaQuota,
|
replicaQuota,
|
||||||
config::interBrokerProtocolVersion,
|
config::interBrokerProtocolVersion,
|
||||||
() -> -1
|
() -> -1L
|
||||||
) {
|
) {
|
||||||
@Override
|
@Override
|
||||||
public OffsetAndEpoch fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
|
public OffsetAndEpoch fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
|
||||||
|
|
Loading…
Reference in New Issue