KAFKA-17894: Implemented broker topic metrics for Share Group 1/N (KIP-1103) (#18444)

The PR implements the BrokerTopicMetrics defined in KIP-1103.

The PR also corrected the share-acknowledgement-rate and share-acknowledgement-count metrics defined in KIP-932 as they are moved to BrokerTopicMetrics, necessary changes to KIP-932 broker metrics will be done once we complete KIP-1103.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
This commit is contained in:
Apoorv Mittal 2025-01-24 17:34:54 +00:00 committed by GitHub
parent 2f1bf2f2ab
commit 70eab7778d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 504 additions and 87 deletions

View File

@ -92,6 +92,9 @@
<allow pkg="org.apache.kafka.network" /> <allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.storage.internals.log" /> <allow pkg="org.apache.kafka.storage.internals.log" />
</subpackage> </subpackage>
<subpackage name="share">
<allow pkg="org.apache.kafka.storage.log.metrics" />
</subpackage>
</subpackage> </subpackage>
<subpackage name="security"> <subpackage name="security">

View File

@ -45,7 +45,7 @@
files="(KafkaClusterTestKit).java"/> files="(KafkaClusterTestKit).java"/>
<suppress checks="NPathComplexity" files="TestKitNodes.java"/> <suppress checks="NPathComplexity" files="TestKitNodes.java"/>
<suppress checks="JavaNCSS" <suppress checks="JavaNCSS"
files="(RemoteLogManagerTest|SharePartitionTest).java"/> files="(RemoteLogManagerTest|SharePartitionManagerTest|SharePartitionTest).java"/>
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/> <suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
<suppress checks="CyclomaticComplexity" files="SharePartition.java"/> <suppress checks="CyclomaticComplexity" files="SharePartition.java"/>

View File

@ -60,6 +60,7 @@ import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -68,13 +69,17 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
/** /**
* The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions. * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
@ -140,6 +145,11 @@ public class SharePartitionManager implements AutoCloseable {
*/ */
private final ShareGroupMetrics shareGroupMetrics; private final ShareGroupMetrics shareGroupMetrics;
/**
* The broker topic stats is used to record the broker topic metrics for share group.
*/
private final BrokerTopicStats brokerTopicStats;
/** /**
* The max fetch records is the maximum number of records that can be fetched by a share fetch request. * The max fetch records is the maximum number of records that can be fetched by a share fetch request.
*/ */
@ -155,7 +165,8 @@ public class SharePartitionManager implements AutoCloseable {
int maxFetchRecords, int maxFetchRecords,
Persister persister, Persister persister,
GroupConfigManager groupConfigManager, GroupConfigManager groupConfigManager,
Metrics metrics Metrics metrics,
BrokerTopicStats brokerTopicStats
) { ) {
this(replicaManager, this(replicaManager,
time, time,
@ -167,7 +178,8 @@ public class SharePartitionManager implements AutoCloseable {
maxFetchRecords, maxFetchRecords,
persister, persister,
groupConfigManager, groupConfigManager,
metrics metrics,
brokerTopicStats
); );
} }
@ -182,7 +194,8 @@ public class SharePartitionManager implements AutoCloseable {
int maxFetchRecords, int maxFetchRecords,
Persister persister, Persister persister,
GroupConfigManager groupConfigManager, GroupConfigManager groupConfigManager,
Metrics metrics Metrics metrics,
BrokerTopicStats brokerTopicStats
) { ) {
this(replicaManager, this(replicaManager,
time, time,
@ -196,7 +209,8 @@ public class SharePartitionManager implements AutoCloseable {
maxFetchRecords, maxFetchRecords,
persister, persister,
groupConfigManager, groupConfigManager,
metrics metrics,
brokerTopicStats
); );
} }
@ -213,7 +227,8 @@ public class SharePartitionManager implements AutoCloseable {
int maxFetchRecords, int maxFetchRecords,
Persister persister, Persister persister,
GroupConfigManager groupConfigManager, GroupConfigManager groupConfigManager,
Metrics metrics Metrics metrics,
BrokerTopicStats brokerTopicStats
) { ) {
this.replicaManager = replicaManager; this.replicaManager = replicaManager;
this.time = time; this.time = time;
@ -227,6 +242,7 @@ public class SharePartitionManager implements AutoCloseable {
this.groupConfigManager = groupConfigManager; this.groupConfigManager = groupConfigManager;
this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time); this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
this.maxFetchRecords = maxFetchRecords; this.maxFetchRecords = maxFetchRecords;
this.brokerTopicStats = brokerTopicStats;
} }
/** /**
@ -252,7 +268,7 @@ public class SharePartitionManager implements AutoCloseable {
partitionMaxBytes.keySet(), groupId, fetchParams); partitionMaxBytes.keySet(), groupId, fetchParams);
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>(); CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, batchSize, maxFetchRecords)); processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, batchSize, maxFetchRecords, brokerTopicStats));
return future; return future;
} }
@ -274,9 +290,11 @@ public class SharePartitionManager implements AutoCloseable {
) { ) {
log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}", log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}",
acknowledgeTopics.keySet(), groupId); acknowledgeTopics.keySet(), groupId);
this.shareGroupMetrics.shareAcknowledgement();
Map<TopicIdPartition, CompletableFuture<Throwable>> futures = new HashMap<>(); Map<TopicIdPartition, CompletableFuture<Throwable>> futures = new HashMap<>();
// Track the topics for which we have received an acknowledgement for metrics.
Set<String> topics = new HashSet<>();
acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> { acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
topics.add(topicIdPartition.topic());
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition); SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey); SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
if (sharePartition != null) { if (sharePartition != null) {
@ -302,7 +320,13 @@ public class SharePartitionManager implements AutoCloseable {
} }
}); });
return mapAcknowledgementFutures(futures); // Update the metrics for the topics for which we have received an acknowledgement.
topics.forEach(topic -> {
brokerTopicStats.allTopicsStats().totalShareAcknowledgementRequestRate().mark();
brokerTopicStats.topicStats(topic).totalShareAcknowledgementRequestRate().mark();
});
return mapAcknowledgementFutures(futures, Optional.of(failedShareAcknowledgeMetricsHandler()));
} }
/** /**
@ -363,14 +387,19 @@ public class SharePartitionManager implements AutoCloseable {
} }
}); });
return mapAcknowledgementFutures(futuresMap); return mapAcknowledgementFutures(futuresMap, Optional.empty());
} }
private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap) { private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(
Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap,
Optional<Consumer<Set<String>>> failedMetricsHandler
) {
CompletableFuture<Void> allFutures = CompletableFuture.allOf( CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futuresMap.values().toArray(new CompletableFuture[0])); futuresMap.values().toArray(new CompletableFuture[0]));
return allFutures.thenApply(v -> { return allFutures.thenApply(v -> {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>(); Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>();
// Keep the set as same topic might appear multiple times. Multiple partitions can fail for same topic.
Set<String> failedTopics = new HashSet<>();
futuresMap.forEach((topicIdPartition, future) -> { futuresMap.forEach((topicIdPartition, future) -> {
ShareAcknowledgeResponseData.PartitionData partitionData = new ShareAcknowledgeResponseData.PartitionData() ShareAcknowledgeResponseData.PartitionData partitionData = new ShareAcknowledgeResponseData.PartitionData()
.setPartitionIndex(topicIdPartition.partition()); .setPartitionIndex(topicIdPartition.partition());
@ -378,9 +407,11 @@ public class SharePartitionManager implements AutoCloseable {
if (t != null) { if (t != null) {
partitionData.setErrorCode(Errors.forException(t).code()) partitionData.setErrorCode(Errors.forException(t).code())
.setErrorMessage(t.getMessage()); .setErrorMessage(t.getMessage());
failedTopics.add(topicIdPartition.topic());
} }
result.put(topicIdPartition, partitionData); result.put(topicIdPartition, partitionData);
}); });
failedMetricsHandler.ifPresent(handler -> handler.accept(failedTopics));
return result; return result;
}); });
} }
@ -554,7 +585,10 @@ public class SharePartitionManager implements AutoCloseable {
List<DelayedShareFetchKey> delayedShareFetchWatchKeys = new ArrayList<>(); List<DelayedShareFetchKey> delayedShareFetchWatchKeys = new ArrayList<>();
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
// Track the topics for which we have received a share fetch request for metrics.
Set<String> topics = new HashSet<>();
for (TopicIdPartition topicIdPartition : shareFetch.partitionMaxBytes().keySet()) { for (TopicIdPartition topicIdPartition : shareFetch.partitionMaxBytes().keySet()) {
topics.add(topicIdPartition.topic());
SharePartitionKey sharePartitionKey = sharePartitionKey( SharePartitionKey sharePartitionKey = sharePartitionKey(
shareFetch.groupId(), shareFetch.groupId(),
topicIdPartition topicIdPartition
@ -598,6 +632,12 @@ public class SharePartitionManager implements AutoCloseable {
sharePartitions.put(topicIdPartition, sharePartition); sharePartitions.put(topicIdPartition, sharePartition);
} }
// Update the metrics for the topics for which we have received a share fetch request.
topics.forEach(topic -> {
brokerTopicStats.allTopicsStats().totalShareFetchRequestRate().mark();
brokerTopicStats.topicStats(topic).totalShareFetchRequestRate().mark();
});
// If all the partitions in the request errored out, then complete the fetch request with an exception. // If all the partitions in the request errored out, then complete the fetch request with an exception.
if (shareFetch.errorInAllPartitions()) { if (shareFetch.errorInAllPartitions()) {
shareFetch.maybeComplete(Collections.emptyMap()); shareFetch.maybeComplete(Collections.emptyMap());
@ -695,6 +735,21 @@ public class SharePartitionManager implements AutoCloseable {
} }
} }
/**
* The handler to update the failed share acknowledge request metrics.
*
* @return A Consumer that updates the failed share acknowledge request metrics.
*/
private Consumer<Set<String>> failedShareAcknowledgeMetricsHandler() {
return failedTopics -> {
// Update failed share acknowledge request metric.
failedTopics.forEach(topic -> {
brokerTopicStats.allTopicsStats().failedShareAcknowledgementRequestRate().mark();
brokerTopicStats.topicStats(topic).failedShareAcknowledgementRequestRate().mark();
});
};
}
/** /**
* The SharePartitionListener is used to listen for partition events. The share partition is associated with * The SharePartitionListener is used to listen for partition events. The share partition is associated with
* the topic-partition, we need to handle the partition events for the share partition. * the topic-partition, we need to handle the partition events for the share partition.
@ -759,10 +814,6 @@ public class SharePartitionManager implements AutoCloseable {
public static final String METRICS_GROUP_NAME = "share-group-metrics"; public static final String METRICS_GROUP_NAME = "share-group-metrics";
public static final String SHARE_ACK_SENSOR = "share-acknowledgement-sensor";
public static final String SHARE_ACK_RATE = "share-acknowledgement-rate";
public static final String SHARE_ACK_COUNT = "share-acknowledgement-count";
public static final String RECORD_ACK_SENSOR_PREFIX = "record-acknowledgement"; public static final String RECORD_ACK_SENSOR_PREFIX = "record-acknowledgement";
public static final String RECORD_ACK_RATE = "record-acknowledgement-rate"; public static final String RECORD_ACK_RATE = "record-acknowledgement-rate";
public static final String RECORD_ACK_COUNT = "record-acknowledgement-count"; public static final String RECORD_ACK_COUNT = "record-acknowledgement-count";
@ -775,7 +826,6 @@ public class SharePartitionManager implements AutoCloseable {
public static final Map<Byte, String> RECORD_ACKS_MAP = new HashMap<>(); public static final Map<Byte, String> RECORD_ACKS_MAP = new HashMap<>();
private final Time time; private final Time time;
private final Sensor shareAcknowledgementSensor;
private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>(); private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
private final Sensor partitionLoadTimeSensor; private final Sensor partitionLoadTimeSensor;
@ -787,18 +837,6 @@ public class SharePartitionManager implements AutoCloseable {
public ShareGroupMetrics(Metrics metrics, Time time) { public ShareGroupMetrics(Metrics metrics, Time time) {
this.time = time; this.time = time;
shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
shareAcknowledgementSensor.add(new Meter(
metrics.metricName(
SHARE_ACK_RATE,
METRICS_GROUP_NAME,
"Rate of acknowledge requests."),
metrics.metricName(
SHARE_ACK_COUNT,
METRICS_GROUP_NAME,
"The number of acknowledge requests.")));
for (Map.Entry<Byte, String> entry : RECORD_ACKS_MAP.entrySet()) { for (Map.Entry<Byte, String> entry : RECORD_ACKS_MAP.entrySet()) {
recordAcksSensorMap.put(entry.getKey(), metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX, entry.getValue()))); recordAcksSensorMap.put(entry.getKey(), metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX, entry.getValue())));
recordAcksSensorMap.get(entry.getKey()) recordAcksSensorMap.get(entry.getKey())
@ -828,10 +866,6 @@ public class SharePartitionManager implements AutoCloseable {
new Max()); new Max());
} }
void shareAcknowledgement() {
shareAcknowledgementSensor.record();
}
void recordAcknowledgement(byte ackType) { void recordAcknowledgement(byte ackType) {
// unknown ack types (such as gaps for control records) are intentionally ignored // unknown ack types (such as gaps for control records) are intentionally ignored
if (recordAcksSensorMap.containsKey(ackType)) { if (recordAcksSensorMap.containsKey(ackType)) {

View File

@ -440,7 +440,8 @@ class BrokerServer(
config.shareGroupConfig.shareFetchMaxFetchRecords, config.shareGroupConfig.shareFetchMaxFetchRecords,
persister, persister,
groupConfigManager, groupConfigManager,
metrics metrics,
brokerTopicStats
) )
dataPlaneRequestProcessor = new KafkaApis( dataPlaneRequestProcessor = new KafkaApis(

View File

@ -45,6 +45,7 @@ import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -93,6 +94,7 @@ public class DelayedShareFetchTest {
private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), private static final FetchParams FETCH_PARAMS = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(),
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK,
Optional.empty(), true); Optional.empty(), true);
private static final BrokerTopicStats BROKER_TOPIC_STATS = new BrokerTopicStats();
private Timer mockTimer; private Timer mockTimer;
@ -128,7 +130,8 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp1, sp1); sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(false); when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false);
@ -169,7 +172,8 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch( ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true); when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false);
@ -230,7 +234,8 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch( ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true); when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false);
@ -284,7 +289,8 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp1, sp1); sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true); when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false);
@ -333,7 +339,7 @@ public class DelayedShareFetchTest {
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(false); when(sp0.canAcquireRecords()).thenReturn(false);
when(sp1.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false);
@ -378,7 +384,8 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp1, sp1); sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true); when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false);
@ -422,7 +429,7 @@ public class DelayedShareFetchTest {
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
when(sp0.maybeAcquireFetchLock()).thenReturn(true); when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(false); when(sp0.canAcquireRecords()).thenReturn(false);
@ -480,7 +487,8 @@ public class DelayedShareFetchTest {
sharePartitions1.put(tp2, sp2); sharePartitions1.put(tp2, sp2);
ShareFetch shareFetch1 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch1 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes1, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes1, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, replicaManager.localBrokerId(), "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
@ -509,7 +517,8 @@ public class DelayedShareFetchTest {
partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES); partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES); partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES);
ShareFetch shareFetch2 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch2 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes2, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes2, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@ -567,7 +576,7 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch( ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch) .withShareFetchData(shareFetch)
@ -623,7 +632,8 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch( ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true); when(sp0.canAcquireRecords()).thenReturn(true);
when(sp0.acquire(any(), anyInt(), anyInt(), any())).thenReturn( when(sp0.acquire(any(), anyInt(), anyInt(), any())).thenReturn(
@ -687,7 +697,8 @@ public class DelayedShareFetchTest {
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1); mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch) .withShareFetchData(shareFetch)
@ -717,7 +728,8 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp0, sp0); sharePartitions.put(tp0, sp0);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch) .withShareFetchData(shareFetch)
@ -747,7 +759,7 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch( ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); future, partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
// partitionMaxBytesStrategy.maxBytes() function throws an exception // partitionMaxBytesStrategy.maxBytes() function throws an exception
PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class); PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class);
@ -818,7 +830,8 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp4, sp4); sharePartitions.put(tp4, sp4);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn( when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
@ -920,7 +933,8 @@ public class DelayedShareFetchTest {
sharePartitions.put(tp4, sp4); sharePartitions.put(tp4, sp4);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn( when(sp0.acquire(anyString(), anyInt(), anyInt(), any(FetchPartitionData.class))).thenReturn(
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
@ -995,7 +1009,8 @@ public class DelayedShareFetchTest {
ShareFetch shareFetch = new ShareFetch( ShareFetch shareFetch = new ShareFetch(
new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch) .withShareFetchData(shareFetch)

View File

@ -37,6 +37,7 @@ import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData; import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -75,6 +76,7 @@ public class ShareFetchUtilsTest {
private static final BiConsumer<SharePartitionKey, Throwable> EXCEPTION_HANDLER = (key, exception) -> { private static final BiConsumer<SharePartitionKey, Throwable> EXCEPTION_HANDLER = (key, exception) -> {
// No-op // No-op
}; };
private static final BrokerTopicStats BROKER_TOPIC_STATS = new BrokerTopicStats();
@Test @Test
public void testProcessFetchResponse() { public void testProcessFetchResponse() {
@ -104,7 +106,7 @@ public class ShareFetchUtilsTest {
sharePartitions.put(tp1, sp1); sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId, ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100, BROKER_TOPIC_STATS);
MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
new SimpleRecord("0".getBytes(), "v".getBytes()), new SimpleRecord("0".getBytes(), "v".getBytes()),
@ -167,7 +169,7 @@ public class ShareFetchUtilsTest {
sharePartitions.put(tp1, sp1); sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId, ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId,
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100, BROKER_TOPIC_STATS);
Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>(); Map<TopicIdPartition, FetchPartitionData> responseData = new HashMap<>();
responseData.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, responseData.put(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L,
@ -209,7 +211,7 @@ public class ShareFetchUtilsTest {
sharePartitions.put(tp1, sp1); sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100, BROKER_TOPIC_STATS);
ReplicaManager replicaManager = mock(ReplicaManager.class); ReplicaManager replicaManager = mock(ReplicaManager.class);
@ -300,7 +302,7 @@ public class ShareFetchUtilsTest {
sharePartitions.put(tp0, sp0); sharePartitions.put(tp0, sp0);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 100, BROKER_TOPIC_STATS);
ReplicaManager replicaManager = mock(ReplicaManager.class); ReplicaManager replicaManager = mock(ReplicaManager.class);
@ -372,7 +374,7 @@ public class ShareFetchUtilsTest {
Uuid memberId = Uuid.randomUuid(); Uuid memberId = Uuid.randomUuid();
// Set max fetch records to 10 // Set max fetch records to 10
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId.toString(), ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId.toString(),
new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 10); new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE, 10, BROKER_TOPIC_STATS);
MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE, MemoryRecords records1 = MemoryRecords.withRecords(Compression.NONE,
new SimpleRecord("0".getBytes(), "v".getBytes()), new SimpleRecord("0".getBytes(), "v".getBytes()),

View File

@ -82,6 +82,8 @@ import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils; import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -157,6 +159,7 @@ public class SharePartitionManagerTest {
private Timer mockTimer; private Timer mockTimer;
private ReplicaManager mockReplicaManager; private ReplicaManager mockReplicaManager;
private BrokerTopicStats brokerTopicStats;
private static final List<TopicIdPartition> EMPTY_PART_LIST = Collections.unmodifiableList(new ArrayList<>()); private static final List<TopicIdPartition> EMPTY_PART_LIST = Collections.unmodifiableList(new ArrayList<>());
@ -164,6 +167,7 @@ public class SharePartitionManagerTest {
public void setUp() { public void setUp() {
mockTimer = new SystemTimerReaper("sharePartitionManagerTestReaper", mockTimer = new SystemTimerReaper("sharePartitionManagerTestReaper",
new SystemTimer("sharePartitionManagerTestTimer")); new SystemTimer("sharePartitionManagerTestTimer"));
brokerTopicStats = new BrokerTopicStats();
mockReplicaManager = mock(ReplicaManager.class); mockReplicaManager = mock(ReplicaManager.class);
Partition partition = mockPartition(); Partition partition = mockPartition();
when(mockReplicaManager.getPartitionOrException(Mockito.any())).thenReturn(partition); when(mockReplicaManager.getPartitionOrException(Mockito.any())).thenReturn(partition);
@ -172,6 +176,7 @@ public class SharePartitionManagerTest {
@AfterEach @AfterEach
public void tearDown() throws Exception { public void tearDown() throws Exception {
mockTimer.close(); mockTimer.close();
brokerTopicStats.close();
} }
@Test @Test
@ -680,7 +685,10 @@ public class SharePartitionManagerTest {
Time time = new MockTime(); Time time = new MockTime();
ShareSessionCache cache = new ShareSessionCache(10, 1000); ShareSessionCache cache = new ShareSessionCache(10, 1000);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache).withTime(time).build(); .withCache(cache)
.withTime(time)
.build();
Uuid tpId0 = Uuid.randomUuid(); Uuid tpId0 = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0)); TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1)); TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
@ -1069,6 +1077,7 @@ public class SharePartitionManagerTest {
.withTime(time) .withTime(time)
.withMetrics(metrics) .withMetrics(metrics)
.withTimer(mockTimer) .withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@ -1101,6 +1110,14 @@ public class SharePartitionManagerTest {
assertTrue(metrics.metrics().containsKey(metric)); assertTrue(metrics.metrics().containsKey(metric));
test.accept((Double) metrics.metrics().get(metric).metricValue()); test.accept((Double) metrics.metrics().get(metric).metricValue());
}); });
// Should have 6 total fetches, 3 fetches for topic foo (though 4 partitions but 3 fetches) and 3
// fetches for topic bar (though 3 partitions but 3 fetches).
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(6, 0, 0, 0),
Map.of("foo", new TopicMetrics(3, 0, 0, 0), "bar", new TopicMetrics(3, 0, 0, 0))
);
} }
@Test @Test
@ -1138,6 +1155,7 @@ public class SharePartitionManagerTest {
.withTime(time) .withTime(time)
.withReplicaManager(mockReplicaManager) .withReplicaManager(mockReplicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
SharePartition sp0 = mock(SharePartition.class); SharePartition sp0 = mock(SharePartition.class);
@ -1231,6 +1249,7 @@ public class SharePartitionManagerTest {
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager) .withReplicaManager(mockReplicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
@ -1240,6 +1259,12 @@ public class SharePartitionManagerTest {
any(), any(), any(ReplicaQuota.class), anyBoolean()); any(), any(), any(ReplicaQuota.class), anyBoolean());
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result = future.join(); Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result = future.join();
assertEquals(0, result.size()); assertEquals(0, result.size());
// Should have 1 fetch recorded and no failed as the fetch did complete without error.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(1, 0, 0, 0),
Map.of("foo", new TopicMetrics(1, 0, 0, 0))
);
} }
@Test @Test
@ -1261,6 +1286,7 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(mockReplicaManager) .withReplicaManager(mockReplicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
@ -1271,6 +1297,12 @@ public class SharePartitionManagerTest {
// even though the maxInFlightMessages limit is exceeded, replicaManager.readFromLog should be called // even though the maxInFlightMessages limit is exceeded, replicaManager.readFromLog should be called
Mockito.verify(mockReplicaManager, times(1)).readFromLog( Mockito.verify(mockReplicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean()); any(), any(), any(ReplicaQuota.class), anyBoolean());
// Should have 1 fetch recorded and no failed as the fetch did complete.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(1, 0, 0, 0),
Map.of("foo", new TopicMetrics(1, 0, 0, 0))
);
} }
@Test @Test
@ -1323,6 +1355,7 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withCache(cache) .withCache(cache)
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture = CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture =
@ -1341,6 +1374,12 @@ public class SharePartitionManagerTest {
assertEquals(4, result.get(tp3).partitionIndex()); assertEquals(4, result.get(tp3).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp3).errorMessage()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp3).errorMessage());
// Shouldn't have any metrics for fetch and acknowledge.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(0, 0, 0, 0),
Map.of()
);
} }
@Test @Test
@ -1454,8 +1493,11 @@ public class SharePartitionManagerTest {
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>(); Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp); partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build(); .withPartitionCacheMap(partitionCacheMap)
.withBrokerTopicStats(brokerTopicStats)
.build();
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>(); Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();
acknowledgeTopics.put(tp, Arrays.asList( acknowledgeTopics.put(tp, Arrays.asList(
@ -1469,6 +1511,12 @@ public class SharePartitionManagerTest {
assertTrue(result.containsKey(tp)); assertTrue(result.containsKey(tp));
assertEquals(0, result.get(tp).partitionIndex()); assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.NONE.code(), result.get(tp).errorCode()); assertEquals(Errors.NONE.code(), result.get(tp).errorCode());
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(0, 0, 1, 0),
Map.of("foo", new TopicMetrics(0, 0, 1, 0))
);
} }
@Test @Test
@ -1495,7 +1543,10 @@ public class SharePartitionManagerTest {
Metrics metrics = new Metrics(); Metrics metrics = new Metrics();
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).withMetrics(metrics).build(); .withPartitionCacheMap(partitionCacheMap)
.withMetrics(metrics)
.withBrokerTopicStats(brokerTopicStats)
.build();
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>(); Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();
acknowledgeTopics.put(tp1, Arrays.asList( acknowledgeTopics.put(tp1, Arrays.asList(
@ -1525,14 +1576,6 @@ public class SharePartitionManagerTest {
assertEquals(Errors.NONE.code(), result.get(tp3).errorCode()); assertEquals(Errors.NONE.code(), result.get(tp3).errorCode());
Map<MetricName, Consumer<Double>> expectedMetrics = new HashMap<>(); Map<MetricName, Consumer<Double>> expectedMetrics = new HashMap<>();
expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.SHARE_ACK_COUNT, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME),
val -> assertEquals(val, 1.0)
);
expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.SHARE_ACK_RATE, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME),
val -> assertTrue(val > 0)
);
expectedMetrics.put( expectedMetrics.put(
metrics.metricName(SharePartitionManager.ShareGroupMetrics.RECORD_ACK_COUNT, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME, metrics.metricName(SharePartitionManager.ShareGroupMetrics.RECORD_ACK_COUNT, SharePartitionManager.ShareGroupMetrics.METRICS_GROUP_NAME,
Collections.singletonMap(SharePartitionManager.ShareGroupMetrics.ACK_TYPE, AcknowledgeType.ACCEPT.toString())), Collections.singletonMap(SharePartitionManager.ShareGroupMetrics.ACK_TYPE, AcknowledgeType.ACCEPT.toString())),
@ -1567,6 +1610,13 @@ public class SharePartitionManagerTest {
assertTrue(metrics.metrics().containsKey(metric)); assertTrue(metrics.metrics().containsKey(metric));
test.accept((Double) metrics.metrics().get(metric).metricValue()); test.accept((Double) metrics.metrics().get(metric).metricValue());
}); });
// Should have 3 successful acknowledgement and 1 successful acknowledgement per topic.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(0, 0, 3, 0),
Map.of(tp1.topic(), new TopicMetrics(0, 0, 1, 0), tp2.topic(), new TopicMetrics(0, 0, 1, 0), tp3.topic(), new TopicMetrics(0, 0, 1, 0))
);
} }
@Test @Test
@ -1581,7 +1631,9 @@ public class SharePartitionManagerTest {
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>(); Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp); partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build(); .withPartitionCacheMap(partitionCacheMap)
.withBrokerTopicStats(brokerTopicStats)
.build();
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>(); Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();
acknowledgeTopics.put(tp, Arrays.asList( acknowledgeTopics.put(tp, Arrays.asList(
@ -1596,6 +1648,12 @@ public class SharePartitionManagerTest {
assertEquals(0, result.get(tp).partitionIndex()); assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp).errorMessage()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp).errorMessage());
// Should have 1 acknowledge recorded and 1 failed.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(0, 0, 1, 1),
Map.of(tp.topic(), new TopicMetrics(0, 0, 1, 1))
);
} }
@Test @Test
@ -1611,7 +1669,9 @@ public class SharePartitionManagerTest {
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>(); Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp); partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).build(); .withPartitionCacheMap(partitionCacheMap)
.withBrokerTopicStats(brokerTopicStats)
.build();
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>(); Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();
acknowledgeTopics.put(tp, Arrays.asList( acknowledgeTopics.put(tp, Arrays.asList(
@ -1627,6 +1687,12 @@ public class SharePartitionManagerTest {
assertEquals(0, result.get(tp).partitionIndex()); assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp).errorCode()); assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp).errorCode());
assertEquals("Member is not the owner of batch record", result.get(tp).errorMessage()); assertEquals("Member is not the owner of batch record", result.get(tp).errorMessage());
// Should have 1 acknowledge recorded and 1 failed.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(0, 0, 1, 1),
Map.of(tp.topic(), new TopicMetrics(0, 0, 1, 1))
);
} }
@Test @Test
@ -1635,7 +1701,9 @@ public class SharePartitionManagerTest {
String memberId = Uuid.randomUuid().toString(); String memberId = Uuid.randomUuid().toString();
TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 3)); TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 3));
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build(); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withBrokerTopicStats(brokerTopicStats)
.build();
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>(); Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();
acknowledgeTopics.put(tp, Arrays.asList( acknowledgeTopics.put(tp, Arrays.asList(
@ -1650,6 +1718,12 @@ public class SharePartitionManagerTest {
assertEquals(3, result.get(tp).partitionIndex()); assertEquals(3, result.get(tp).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp).errorMessage()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp).errorMessage());
// Should have 1 acknowledge recorded and 1 failed.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(0, 0, 1, 1),
Map.of(tp.topic(), new TopicMetrics(0, 0, 1, 1))
);
} }
@Test @Test
@ -1666,7 +1740,6 @@ public class SharePartitionManagerTest {
SharePartition sp1 = mock(SharePartition.class); SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class); SharePartition sp2 = mock(SharePartition.class);
// mocked share partitions sp1 and sp2 can be acquired once there is an acknowledgement for it. // mocked share partitions sp1 and sp2 can be acquired once there is an acknowledgement for it.
doAnswer(invocation -> { doAnswer(invocation -> {
when(sp1.canAcquireRecords()).thenReturn(true); when(sp1.canAcquireRecords()).thenReturn(true);
@ -1688,7 +1761,8 @@ public class SharePartitionManagerTest {
new CompletableFuture<>(), new CompletableFuture<>(),
partitionMaxBytes, partitionMaxBytes,
BATCH_SIZE, BATCH_SIZE,
100); 100,
brokerTopicStats);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
@ -1702,6 +1776,8 @@ public class SharePartitionManagerTest {
when(sp1.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false);
when(sp2.maybeAcquireFetchLock()).thenReturn(true); when(sp2.maybeAcquireFetchLock()).thenReturn(true);
when(sp2.canAcquireRecords()).thenReturn(false); when(sp2.canAcquireRecords()).thenReturn(false);
when(sp1.acquire(anyString(), anyInt(), anyInt(), any())).thenReturn(ShareAcquiredRecords.empty());
when(sp2.acquire(anyString(), anyInt(), anyInt(), any())).thenReturn(ShareAcquiredRecords.empty());
List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>(); List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>();
partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
@ -1710,6 +1786,7 @@ public class SharePartitionManagerTest {
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager) .withReplicaManager(mockReplicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
@ -1747,6 +1824,12 @@ public class SharePartitionManagerTest {
Mockito.verify(sp2, times(0)).nextFetchOffset(); Mockito.verify(sp2, times(0)).nextFetchOffset();
assertTrue(delayedShareFetch.lock().tryLock()); assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock(); delayedShareFetch.lock().unlock();
// Should have 1 acknowledge recorded as other topic is acknowledgement request is not sent.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(0, 0, 1, 0),
Map.of(tp1.topic(), new TopicMetrics(0, 0, 1, 0))
);
} }
@Test @Test
@ -1792,7 +1875,8 @@ public class SharePartitionManagerTest {
new CompletableFuture<>(), new CompletableFuture<>(),
partitionMaxBytes, partitionMaxBytes,
BATCH_SIZE, BATCH_SIZE,
100); 100,
brokerTopicStats);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
@ -1814,6 +1898,7 @@ public class SharePartitionManagerTest {
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager) .withReplicaManager(mockReplicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
@ -1848,6 +1933,12 @@ public class SharePartitionManagerTest {
Mockito.verify(sp2, times(0)).nextFetchOffset(); Mockito.verify(sp2, times(0)).nextFetchOffset();
assertTrue(delayedShareFetch.lock().tryLock()); assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock(); delayedShareFetch.lock().unlock();
// Should have 1 acknowledge recorded as other 2 topics acknowledgement request is not sent.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(0, 0, 1, 0),
Map.of(tp3.topic(), new TopicMetrics(0, 0, 1, 0))
);
} }
@Test @Test
@ -1891,7 +1982,8 @@ public class SharePartitionManagerTest {
new CompletableFuture<>(), new CompletableFuture<>(),
partitionMaxBytes, partitionMaxBytes,
BATCH_SIZE, BATCH_SIZE,
100); 100,
brokerTopicStats);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
@ -1999,7 +2091,8 @@ public class SharePartitionManagerTest {
new CompletableFuture<>(), new CompletableFuture<>(),
partitionMaxBytes, partitionMaxBytes,
BATCH_SIZE, BATCH_SIZE,
100); 100,
brokerTopicStats);
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
@ -2079,7 +2172,10 @@ public class SharePartitionManagerTest {
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(mockReplicaManager).withTimer(mockTimer) .withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
@ -2097,6 +2193,12 @@ public class SharePartitionManagerTest {
assertFalse(pendingInitializationFuture.isDone()); assertFalse(pendingInitializationFuture.isDone());
// Complete the pending initialization future. // Complete the pending initialization future.
pendingInitializationFuture.complete(null); pendingInitializationFuture.complete(null);
// Should have 1 fetch recorded.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(1, 0, 0, 0),
Map.of(tp0.topic(), new TopicMetrics(1, 0, 0, 0))
);
} }
@Test @Test
@ -2125,7 +2227,10 @@ public class SharePartitionManagerTest {
mockReplicaManagerDelayedShareFetch(mockReplicaManager, shareFetchPurgatorySpy); mockReplicaManagerDelayedShareFetch(mockReplicaManager, shareFetchPurgatorySpy);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(mockReplicaManager).withTimer(mockTimer) .withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
// Send 3 requests for share fetch for same share partition. // Send 3 requests for share fetch for same share partition.
@ -2163,6 +2268,12 @@ public class SharePartitionManagerTest {
// Verify that replica manager fetch is not called. // Verify that replica manager fetch is not called.
Mockito.verify(mockReplicaManager, times(0)).readFromLog( Mockito.verify(mockReplicaManager, times(0)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean()); any(), any(), any(ReplicaQuota.class), anyBoolean());
// Should have 3 fetch recorded.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(3, 0, 0, 0),
Map.of(tp0.topic(), new TopicMetrics(3, 0, 0, 0))
);
} }
@Test @Test
@ -2183,7 +2294,10 @@ public class SharePartitionManagerTest {
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(mockReplicaManager).withTimer(mockTimer) .withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager)
.withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
// Return LeaderNotAvailableException to simulate initialization failure. // Return LeaderNotAvailableException to simulate initialization failure.
@ -2284,6 +2398,13 @@ public class SharePartitionManagerTest {
validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Runtime exception"); validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Runtime exception");
Mockito.verify(sp0, times(6)).markFenced(); Mockito.verify(sp0, times(6)).markFenced();
assertTrue(partitionCacheMap.isEmpty()); assertTrue(partitionCacheMap.isEmpty());
// Should have 7 fetch recorded and 6 failures as 1 fetch was waiting on initialization and
// didn't error out.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(7, 6, 0, 0),
Map.of(tp0.topic(), new TopicMetrics(7, 6, 0, 0))
);
} }
@ -2301,6 +2422,7 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
@ -2311,6 +2433,12 @@ public class SharePartitionManagerTest {
DELAYED_SHARE_FETCH_TIMEOUT_MS, DELAYED_SHARE_FETCH_TIMEOUT_MS,
() -> "Processing for delayed share fetch request not finished."); () -> "Processing for delayed share fetch request not finished.");
validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance"); validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
// Should have 1 fetch recorded and 1 failure.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(1, 1, 0, 0),
Map.of(tp0.topic(), new TopicMetrics(1, 1, 0, 0))
);
} }
@Test @Test
@ -2334,6 +2462,7 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager) .withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
// Validate when exception is thrown. // Validate when exception is thrown.
@ -2356,6 +2485,12 @@ public class SharePartitionManagerTest {
() -> "Processing for delayed share fetch request not finished."); () -> "Processing for delayed share fetch request not finished.");
validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER); validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER);
assertTrue(partitionCacheMap.isEmpty()); assertTrue(partitionCacheMap.isEmpty());
// Should have 2 fetch recorded and 2 failure.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(2, 2, 0, 0),
Map.of(tp0.topic(), new TopicMetrics(2, 2, 0, 0))
);
} }
@Test @Test
@ -2407,6 +2542,7 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(replicaManager) .withReplicaManager(replicaManager)
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
// Validate when exception is thrown. // Validate when exception is thrown.
@ -2429,6 +2565,13 @@ public class SharePartitionManagerTest {
Mockito.verify(replicaManager, times(0)).completeDelayedShareFetchRequest(any()); Mockito.verify(replicaManager, times(0)).completeDelayedShareFetchRequest(any());
Mockito.verify(replicaManager, times(1)).readFromLog( Mockito.verify(replicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean()); any(), any(), any(ReplicaQuota.class), anyBoolean());
// Should have 1 fetch recorded and 1 failure as single topic has multiple partition fetch
// and failure.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(1, 1, 0, 0),
Map.of(tp0.topic(), new TopicMetrics(1, 1, 0, 0))
);
} }
@Test @Test
@ -2456,6 +2599,7 @@ public class SharePartitionManagerTest {
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager) .withReplicaManager(mockReplicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
@ -2472,6 +2616,12 @@ public class SharePartitionManagerTest {
BATCH_SIZE, partitionMaxBytes); BATCH_SIZE, partitionMaxBytes);
validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception"); validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
assertTrue(partitionCacheMap.isEmpty()); assertTrue(partitionCacheMap.isEmpty());
// Should have 2 fetch recorded and 2 failures.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(2, 2, 0, 0),
Map.of(tp0.topic(), new TopicMetrics(2, 2, 0, 0))
);
} }
@Test @Test
@ -2512,6 +2662,7 @@ public class SharePartitionManagerTest {
.withPartitionCacheMap(partitionCacheMap) .withPartitionCacheMap(partitionCacheMap)
.withReplicaManager(mockReplicaManager) .withReplicaManager(mockReplicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
@ -2534,6 +2685,14 @@ public class SharePartitionManagerTest {
BATCH_SIZE, partitionMaxBytes); BATCH_SIZE, partitionMaxBytes);
validateShareFetchFutureException(future, List.of(tp0, tp1), Errors.FENCED_STATE_EPOCH, "Fenced exception again"); validateShareFetchFutureException(future, List.of(tp0, tp1), Errors.FENCED_STATE_EPOCH, "Fenced exception again");
assertTrue(partitionCacheMap.isEmpty()); assertTrue(partitionCacheMap.isEmpty());
// Should have 4 fetch recorded (2 fetch and 2 topics) and 3 failures as sp1 was not acquired
// in first fetch and shall have empty response. Similarly, tp0 should record 2 failures and
// tp1 should record 1 failure.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(4, 3, 0, 0),
Map.of(tp0.topic(), new TopicMetrics(2, 2, 0, 0), tp1.topic(), new TopicMetrics(2, 1, 0, 0))
);
} }
@Test @Test
@ -2554,12 +2713,20 @@ public class SharePartitionManagerTest {
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
.withReplicaManager(mockReplicaManager) .withReplicaManager(mockReplicaManager)
.withTimer(mockTimer) .withTimer(mockTimer)
.withBrokerTopicStats(brokerTopicStats)
.build(); .build();
sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, BATCH_SIZE, sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, BATCH_SIZE,
partitionMaxBytes); partitionMaxBytes);
// Validate that the listener is registered. // Validate that the listener is registered.
verify(mockReplicaManager, times(2)).maybeAddListener(any(), any()); verify(mockReplicaManager, times(2)).maybeAddListener(any(), any());
// The share partition initialization should error out as further mocks are not provided, the
// metrics should mark fetch as failed.
validateBrokerTopicStatsMetrics(
brokerTopicStats,
new TopicMetrics(2, 2, 0, 0),
Map.of(tp0.topic(), new TopicMetrics(1, 1, 0, 0), tp1.topic(), new TopicMetrics(1, 1, 0, 0))
);
} }
@Test @Test
@ -2727,6 +2894,28 @@ public class SharePartitionManagerTest {
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
} }
private void validateBrokerTopicStatsMetrics(
BrokerTopicStats brokerTopicStats,
TopicMetrics expectedAllTopicMetrics,
Map<String, TopicMetrics> expectedTopicMetrics
) {
if (expectedAllTopicMetrics != null) {
assertEquals(expectedAllTopicMetrics.totalShareFetchRequestCount, brokerTopicStats.allTopicsStats().totalShareFetchRequestRate().count());
assertEquals(expectedAllTopicMetrics.failedShareFetchRequestCount, brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count());
assertEquals(expectedAllTopicMetrics.totalShareAcknowledgementRequestCount, brokerTopicStats.allTopicsStats().totalShareAcknowledgementRequestRate().count());
assertEquals(expectedAllTopicMetrics.failedShareAcknowledgementRequestCount, brokerTopicStats.allTopicsStats().failedShareAcknowledgementRequestRate().count());
}
// Validate tracked topic metrics.
assertEquals(expectedTopicMetrics.size(), brokerTopicStats.numTopics());
expectedTopicMetrics.forEach((topic, metrics) -> {
BrokerTopicMetrics topicMetrics = brokerTopicStats.topicStats(topic);
assertEquals(metrics.totalShareFetchRequestCount, topicMetrics.totalShareFetchRequestRate().count());
assertEquals(metrics.failedShareFetchRequestCount, topicMetrics.failedShareFetchRequestRate().count());
assertEquals(metrics.totalShareAcknowledgementRequestCount, topicMetrics.totalShareAcknowledgementRequestRate().count());
assertEquals(metrics.failedShareAcknowledgementRequestCount, topicMetrics.failedShareAcknowledgementRequestRate().count());
});
}
static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) { static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>(); List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
@ -2762,6 +2951,13 @@ public class SharePartitionManagerTest {
}).when(replicaManager).addDelayedShareFetchRequest(any(), any()); }).when(replicaManager).addDelayedShareFetchRequest(any(), any());
} }
private record TopicMetrics(
long totalShareFetchRequestCount,
long failedShareFetchRequestCount,
long totalShareAcknowledgementRequestCount,
long failedShareAcknowledgementRequestCount
) { }
static class SharePartitionManagerBuilder { static class SharePartitionManagerBuilder {
private ReplicaManager replicaManager = mock(ReplicaManager.class); private ReplicaManager replicaManager = mock(ReplicaManager.class);
private Time time = new MockTime(); private Time time = new MockTime();
@ -2770,6 +2966,7 @@ public class SharePartitionManagerTest {
private Persister persister = new NoOpShareStatePersister(); private Persister persister = new NoOpShareStatePersister();
private Timer timer = new MockTimer(); private Timer timer = new MockTimer();
private Metrics metrics = new Metrics(); private Metrics metrics = new Metrics();
private BrokerTopicStats brokerTopicStats;
private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) { private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
this.replicaManager = replicaManager; this.replicaManager = replicaManager;
@ -2806,6 +3003,11 @@ public class SharePartitionManagerTest {
return this; return this;
} }
private SharePartitionManagerBuilder withBrokerTopicStats(BrokerTopicStats brokerTopicStats) {
this.brokerTopicStats = brokerTopicStats;
return this;
}
public static SharePartitionManagerBuilder builder() { public static SharePartitionManagerBuilder builder() {
return new SharePartitionManagerBuilder(); return new SharePartitionManagerBuilder();
} }
@ -2822,7 +3024,8 @@ public class SharePartitionManagerTest {
MAX_FETCH_RECORDS, MAX_FETCH_RECORDS,
persister, persister,
mock(GroupConfigManager.class), mock(GroupConfigManager.class),
metrics); metrics,
brokerTopicStats);
} }
} }
} }

View File

@ -21,14 +21,15 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/** /**
* The ShareFetch class is used to store the fetch parameters for a share fetch request. * The ShareFetch class is used to store the fetch parameters for a share fetch request.
@ -64,6 +65,11 @@ public class ShareFetch {
* The maximum number of records that can be fetched for the request. * The maximum number of records that can be fetched for the request.
*/ */
private final int maxFetchRecords; private final int maxFetchRecords;
/**
* The handler to update the failed share fetch metrics.
*/
private final BrokerTopicStats brokerTopicStats;
/** /**
* The partitions that had an error during the fetch. * The partitions that had an error during the fetch.
*/ */
@ -76,7 +82,8 @@ public class ShareFetch {
CompletableFuture<Map<TopicIdPartition, PartitionData>> future, CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
Map<TopicIdPartition, Integer> partitionMaxBytes, Map<TopicIdPartition, Integer> partitionMaxBytes,
int batchSize, int batchSize,
int maxFetchRecords int maxFetchRecords,
BrokerTopicStats brokerTopicStats
) { ) {
this.fetchParams = fetchParams; this.fetchParams = fetchParams;
this.groupId = groupId; this.groupId = groupId;
@ -85,6 +92,7 @@ public class ShareFetch {
this.partitionMaxBytes = partitionMaxBytes; this.partitionMaxBytes = partitionMaxBytes;
this.batchSize = batchSize; this.batchSize = batchSize;
this.maxFetchRecords = maxFetchRecords; this.maxFetchRecords = maxFetchRecords;
this.brokerTopicStats = brokerTopicStats;
} }
public String groupId() { public String groupId() {
@ -174,10 +182,9 @@ public class ShareFetch {
if (isCompleted()) { if (isCompleted()) {
return; return;
} }
Map<TopicIdPartition, PartitionData> response = topicIdPartitions.stream().collect( Map<TopicIdPartition, PartitionData> response = new HashMap<>();
Collectors.toMap(tp -> tp, tp -> new PartitionData() // Add the exception to erroneous partitions to track the error.
.setErrorCode(Errors.forException(throwable).code()) topicIdPartitions.forEach(tp -> addErroneous(tp, throwable));
.setErrorMessage(throwable.getMessage())));
// Add any erroneous partitions to the response. // Add any erroneous partitions to the response.
addErroneousToResponse(response); addErroneousToResponse(response);
future.complete(response); future.complete(response);
@ -202,11 +209,19 @@ public class ShareFetch {
private synchronized void addErroneousToResponse(Map<TopicIdPartition, PartitionData> response) { private synchronized void addErroneousToResponse(Map<TopicIdPartition, PartitionData> response) {
if (erroneous != null) { if (erroneous != null) {
// Track the failed topics for metrics.
Set<String> erroneousTopics = new HashSet<>();
erroneous.forEach((topicIdPartition, throwable) -> { erroneous.forEach((topicIdPartition, throwable) -> {
erroneousTopics.add(topicIdPartition.topic());
response.put(topicIdPartition, new PartitionData() response.put(topicIdPartition, new PartitionData()
.setPartitionIndex(topicIdPartition.partition())
.setErrorCode(Errors.forException(throwable).code()) .setErrorCode(Errors.forException(throwable).code())
.setErrorMessage(throwable.getMessage())); .setErrorMessage(throwable.getMessage()));
}); });
erroneousTopics.forEach(topic -> {
brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().mark();
brokerTopicStats.topicStats(topic).failedShareFetchRequestRate().mark();
});
} }
} }
} }

View File

@ -19,10 +19,15 @@ package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -38,11 +43,23 @@ public class ShareFetchTest {
private static final String MEMBER_ID = "memberId"; private static final String MEMBER_ID = "memberId";
private static final int BATCH_SIZE = 500; private static final int BATCH_SIZE = 500;
private BrokerTopicStats brokerTopicStats;
@BeforeEach
public void setUp() {
brokerTopicStats = new BrokerTopicStats();
}
@AfterEach
public void tearDown() throws Exception {
brokerTopicStats.close();
}
@Test @Test
public void testErrorInAllPartitions() { public void testErrorInAllPartitions() {
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(), ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(),
Map.of(topicIdPartition, 10), BATCH_SIZE, 100); Map.of(topicIdPartition, 10), BATCH_SIZE, 100, brokerTopicStats);
assertFalse(shareFetch.errorInAllPartitions()); assertFalse(shareFetch.errorInAllPartitions());
shareFetch.addErroneous(topicIdPartition, new RuntimeException()); shareFetch.addErroneous(topicIdPartition, new RuntimeException());
@ -54,7 +71,8 @@ public class ShareFetchTest {
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(), ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(),
Map.of(topicIdPartition0, 10, topicIdPartition1, 10), BATCH_SIZE, 100); Map.of(topicIdPartition0, 10, topicIdPartition1, 10), BATCH_SIZE, 100,
brokerTopicStats);
assertFalse(shareFetch.errorInAllPartitions()); assertFalse(shareFetch.errorInAllPartitions());
shareFetch.addErroneous(topicIdPartition0, new RuntimeException()); shareFetch.addErroneous(topicIdPartition0, new RuntimeException());
@ -69,7 +87,8 @@ public class ShareFetchTest {
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(), ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(),
Map.of(topicIdPartition0, 10, topicIdPartition1, 10), BATCH_SIZE, 100); Map.of(topicIdPartition0, 10, topicIdPartition1, 10), BATCH_SIZE, 100,
brokerTopicStats);
Set<TopicIdPartition> result = shareFetch.filterErroneousTopicPartitions(Set.of(topicIdPartition0, topicIdPartition1)); Set<TopicIdPartition> result = shareFetch.filterErroneousTopicPartitions(Set.of(topicIdPartition0, topicIdPartition1));
// No erroneous partitions, hence all partitions should be returned. // No erroneous partitions, hence all partitions should be returned.
assertEquals(2, result.size()); assertEquals(2, result.size());
@ -88,4 +107,100 @@ public class ShareFetchTest {
assertTrue(result.isEmpty()); assertTrue(result.isEmpty());
} }
@Test
public void testMaybeCompleteWithErroneousTopicPartitions() {
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, future,
Map.of(topicIdPartition0, 10, topicIdPartition1, 10), BATCH_SIZE, 100, brokerTopicStats);
// Add both erroneous partition and complete request.
shareFetch.addErroneous(topicIdPartition0, new RuntimeException());
shareFetch.addErroneous(topicIdPartition1, new RuntimeException());
shareFetch.maybeComplete(Map.of());
assertEquals(2, future.join().size());
assertTrue(future.join().containsKey(topicIdPartition0));
assertTrue(future.join().containsKey(topicIdPartition1));
// Validate failed share fetch request metrics, though 2 partitions failed but only 1 topic failed.
assertEquals(1, brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats("foo").failedShareFetchRequestRate().count());
}
@Test
public void testMaybeCompleteWithPartialErroneousTopicPartitions() {
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, future,
Map.of(topicIdPartition0, 10, topicIdPartition1, 10), BATCH_SIZE, 100, brokerTopicStats);
// Add an erroneous partition and complete request.
shareFetch.addErroneous(topicIdPartition0, new RuntimeException());
shareFetch.maybeComplete(Map.of());
assertTrue(future.isDone());
assertEquals(1, future.join().size());
assertTrue(future.join().containsKey(topicIdPartition0));
// Validate failed share fetch request metrics, 1 topic partition failed and 1 succeeded.
assertEquals(1, brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats("foo").failedShareFetchRequestRate().count());
}
@Test
public void testMaybeCompleteWithException() {
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, future,
Map.of(topicIdPartition0, 10, topicIdPartition1, 10), BATCH_SIZE, 100, brokerTopicStats);
shareFetch.maybeCompleteWithException(List.of(topicIdPartition0, topicIdPartition1), new RuntimeException());
assertEquals(2, future.join().size());
assertTrue(future.join().containsKey(topicIdPartition0));
assertTrue(future.join().containsKey(topicIdPartition1));
// Validate failed share fetch request metrics, though 2 partitions failed but only 1 topic failed.
assertEquals(1, brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats("foo").failedShareFetchRequestRate().count());
}
@Test
public void testMaybeCompleteWithExceptionPartialFailure() {
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, future,
Map.of(topicIdPartition0, 10, topicIdPartition1, 10, topicIdPartition2, 10), BATCH_SIZE, 100, brokerTopicStats);
shareFetch.maybeCompleteWithException(List.of(topicIdPartition0, topicIdPartition2), new RuntimeException());
assertEquals(2, future.join().size());
assertTrue(future.join().containsKey(topicIdPartition0));
// Validate failed share fetch request metrics, 1 topic partition failed and 1 succeeded.
assertEquals(2, brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats("foo").failedShareFetchRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats("foo1").failedShareFetchRequestRate().count());
}
@Test
public void testMaybeCompleteWithExceptionWithExistingErroneousTopicPartition() {
TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, future,
Map.of(topicIdPartition0, 10, topicIdPartition1, 10), BATCH_SIZE, 100, brokerTopicStats);
shareFetch.addErroneous(topicIdPartition0, new RuntimeException());
shareFetch.maybeCompleteWithException(List.of(topicIdPartition1), new RuntimeException());
assertEquals(2, future.join().size());
assertTrue(future.join().containsKey(topicIdPartition0));
assertTrue(future.join().containsKey(topicIdPartition1));
// Validate failed share fetch request metrics, though 2 partitions failed but only 1 topic failed.
assertEquals(1, brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count());
assertEquals(1, brokerTopicStats.topicStats("foo").failedShareFetchRequestRate().count());
}
} }

View File

@ -45,6 +45,10 @@ public final class BrokerTopicMetrics {
public static final String PRODUCE_MESSAGE_CONVERSIONS_PER_SEC = "ProduceMessageConversionsPerSec"; public static final String PRODUCE_MESSAGE_CONVERSIONS_PER_SEC = "ProduceMessageConversionsPerSec";
public static final String REASSIGNMENT_BYTES_IN_PER_SEC = "ReassignmentBytesInPerSec"; public static final String REASSIGNMENT_BYTES_IN_PER_SEC = "ReassignmentBytesInPerSec";
public static final String REASSIGNMENT_BYTES_OUT_PER_SEC = "ReassignmentBytesOutPerSec"; public static final String REASSIGNMENT_BYTES_OUT_PER_SEC = "ReassignmentBytesOutPerSec";
public static final String TOTAL_SHARE_FETCH_REQUESTS_PER_SEC = "TotalShareFetchRequestsPerSec";
public static final String FAILED_SHARE_FETCH_REQUESTS_PER_SEC = "FailedShareFetchRequestsPerSec";
public static final String TOTAL_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC = "TotalShareAcknowledgementRequestsPerSec";
public static final String FAILED_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC = "FailedShareAcknowledgementRequestsPerSec";
// These following topics are for LogValidator for better debugging on failed records // These following topics are for LogValidator for better debugging on failed records
public static final String NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC = "NoKeyCompactedTopicRecordsPerSec"; public static final String NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC = "NoKeyCompactedTopicRecordsPerSec";
public static final String INVALID_MAGIC_NUMBER_RECORDS_PER_SEC = "InvalidMagicNumberRecordsPerSec"; public static final String INVALID_MAGIC_NUMBER_RECORDS_PER_SEC = "InvalidMagicNumberRecordsPerSec";
@ -75,8 +79,12 @@ public final class BrokerTopicMetrics {
metricTypeMap.put(BYTES_REJECTED_PER_SEC, new MeterWrapper(BYTES_REJECTED_PER_SEC, "bytes")); metricTypeMap.put(BYTES_REJECTED_PER_SEC, new MeterWrapper(BYTES_REJECTED_PER_SEC, "bytes"));
metricTypeMap.put(FAILED_PRODUCE_REQUESTS_PER_SEC, new MeterWrapper(FAILED_PRODUCE_REQUESTS_PER_SEC, "requests")); metricTypeMap.put(FAILED_PRODUCE_REQUESTS_PER_SEC, new MeterWrapper(FAILED_PRODUCE_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(FAILED_FETCH_REQUESTS_PER_SEC, new MeterWrapper(FAILED_FETCH_REQUESTS_PER_SEC, "requests")); metricTypeMap.put(FAILED_FETCH_REQUESTS_PER_SEC, new MeterWrapper(FAILED_FETCH_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(FAILED_SHARE_FETCH_REQUESTS_PER_SEC, new MeterWrapper(FAILED_SHARE_FETCH_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(FAILED_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC, new MeterWrapper(FAILED_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(TOTAL_PRODUCE_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_PRODUCE_REQUESTS_PER_SEC, "requests")); metricTypeMap.put(TOTAL_PRODUCE_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_PRODUCE_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(TOTAL_FETCH_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_FETCH_REQUESTS_PER_SEC, "requests")); metricTypeMap.put(TOTAL_FETCH_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_FETCH_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(TOTAL_SHARE_FETCH_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_SHARE_FETCH_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(TOTAL_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC, new MeterWrapper(TOTAL_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC, "requests"));
metricTypeMap.put(FETCH_MESSAGE_CONVERSIONS_PER_SEC, new MeterWrapper(FETCH_MESSAGE_CONVERSIONS_PER_SEC, "requests")); metricTypeMap.put(FETCH_MESSAGE_CONVERSIONS_PER_SEC, new MeterWrapper(FETCH_MESSAGE_CONVERSIONS_PER_SEC, "requests"));
metricTypeMap.put(PRODUCE_MESSAGE_CONVERSIONS_PER_SEC, new MeterWrapper(PRODUCE_MESSAGE_CONVERSIONS_PER_SEC, "requests")); metricTypeMap.put(PRODUCE_MESSAGE_CONVERSIONS_PER_SEC, new MeterWrapper(PRODUCE_MESSAGE_CONVERSIONS_PER_SEC, "requests"));
metricTypeMap.put(NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC, new MeterWrapper(NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC, "requests")); metricTypeMap.put(NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC, new MeterWrapper(NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC, "requests"));
@ -190,6 +198,14 @@ public final class BrokerTopicMetrics {
return metricTypeMap.get(FAILED_FETCH_REQUESTS_PER_SEC).meter(); return metricTypeMap.get(FAILED_FETCH_REQUESTS_PER_SEC).meter();
} }
public Meter failedShareFetchRequestRate() {
return metricTypeMap.get(FAILED_SHARE_FETCH_REQUESTS_PER_SEC).meter();
}
public Meter failedShareAcknowledgementRequestRate() {
return metricTypeMap.get(FAILED_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC).meter();
}
public Meter totalProduceRequestRate() { public Meter totalProduceRequestRate() {
return metricTypeMap.get(TOTAL_PRODUCE_REQUESTS_PER_SEC).meter(); return metricTypeMap.get(TOTAL_PRODUCE_REQUESTS_PER_SEC).meter();
} }
@ -198,6 +214,14 @@ public final class BrokerTopicMetrics {
return metricTypeMap.get(TOTAL_FETCH_REQUESTS_PER_SEC).meter(); return metricTypeMap.get(TOTAL_FETCH_REQUESTS_PER_SEC).meter();
} }
public Meter totalShareFetchRequestRate() {
return metricTypeMap.get(TOTAL_SHARE_FETCH_REQUESTS_PER_SEC).meter();
}
public Meter totalShareAcknowledgementRequestRate() {
return metricTypeMap.get(TOTAL_SHARE_ACKNOWLEDGEMENTS_REQUESTS_PER_SEC).meter();
}
public Meter fetchMessageConversionsRate() { public Meter fetchMessageConversionsRate() {
return metricTypeMap.get(FETCH_MESSAGE_CONVERSIONS_PER_SEC).meter(); return metricTypeMap.get(FETCH_MESSAGE_CONVERSIONS_PER_SEC).meter();
} }

View File

@ -263,4 +263,9 @@ public class BrokerTopicStats implements AutoCloseable {
public BrokerTopicMetrics allTopicsStats() { public BrokerTopicMetrics allTopicsStats() {
return allTopicsStats; return allTopicsStats;
} }
// Visible for testing, should only be required for testing.
public int numTopics() {
return stats.size();
}
} }