KAFKA-18733: Updating share group record acks metric (2/N) (#18924)

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-02-17 18:12:58 +00:00 committed by GitHub
parent 98a7ce5caa
commit 06ce3e890b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 74 additions and 5 deletions

View File

@ -305,7 +305,15 @@ public class SharePartitionManager implements AutoCloseable {
future.complete(throwable);
return;
}
acknowledgePartitionBatches.forEach(batch -> batch.acknowledgeTypes().forEach(shareGroupMetrics::recordAcknowledgement));
acknowledgePartitionBatches.forEach(batch -> {
// Client can either send a single entry in acknowledgeTypes which represents
// the state of the complete batch or can send individual offsets state.
if (batch.acknowledgeTypes().size() == 1) {
shareGroupMetrics.recordAcknowledgement(batch.acknowledgeTypes().get(0), batch.lastOffset() - batch.firstOffset() + 1);
} else {
batch.acknowledgeTypes().forEach(shareGroupMetrics::recordAcknowledgement);
}
});
future.complete(null);
});

View File

@ -1571,9 +1571,9 @@ public class SharePartitionManagerTest {
assertEquals(0, result.get(tp3).partitionIndex());
assertEquals(Errors.NONE.code(), result.get(tp3).errorCode());
assertEquals(2, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).count());
assertEquals(2, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).count());
assertEquals(2, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).count());
assertEquals(42, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).count());
assertEquals(35, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).count());
assertEquals(18, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).count());
assertTrue(shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).meanRate() > 0);
assertTrue(shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).meanRate() > 0);
assertTrue(shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).meanRate() > 0);
@ -1587,6 +1587,57 @@ public class SharePartitionManagerTest {
shareGroupMetrics.close();
}
@Test
public void testAcknowledgeIndividualOffsets() throws Exception {
String groupId = "grp";
String memberId = Uuid.randomUuid().toString();
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
List<ShareAcknowledgementBatch> ack1 = List.of(
new ShareAcknowledgementBatch(12, 12, List.of((byte) 1)));
List<ShareAcknowledgementBatch> ack2 = List.of(
new ShareAcknowledgementBatch(15, 20, List.of((byte) 2, (byte) 3, (byte) 2, (byte) 2, (byte) 3, (byte) 2)));
when(sp1.acknowledge(memberId, ack1)).thenReturn(CompletableFuture.completedFuture(null));
when(sp2.acknowledge(memberId, ack2)).thenReturn(CompletableFuture.completedFuture(null));
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withShareGroupMetrics(shareGroupMetrics)
.withBrokerTopicStats(brokerTopicStats)
.build();
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = Map.of(tp1, ack1, tp2, ack2);
CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> resultFuture =
sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = resultFuture.join();
assertEquals(2, result.size());
assertTrue(result.containsKey(tp1));
assertTrue(result.containsKey(tp2));
assertEquals(0, result.get(tp1).partitionIndex());
assertEquals(Errors.NONE.code(), result.get(tp1).errorCode());
assertEquals(0, result.get(tp2).partitionIndex());
assertEquals(Errors.NONE.code(), result.get(tp2).errorCode());
assertEquals(1, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).count());
assertEquals(4, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).count());
assertEquals(2, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).count());
assertTrue(shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).meanRate() > 0);
assertTrue(shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).meanRate() > 0);
assertTrue(shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).meanRate() > 0);
shareGroupMetrics.close();
}
@Test
public void testAcknowledgeIncorrectGroupId() {
String groupId = "grp";
@ -1598,9 +1649,11 @@ public class SharePartitionManagerTest {
Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
partitionCacheMap.put(new SharePartitionKey(groupId, tp), sp);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
sharePartitionManager = SharePartitionManagerBuilder.builder()
.withPartitionCacheMap(partitionCacheMap)
.withBrokerTopicStats(brokerTopicStats)
.withShareGroupMetrics(shareGroupMetrics)
.build();
Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<>();
@ -1616,6 +1669,10 @@ public class SharePartitionManagerTest {
assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp).errorMessage());
// No metric should be recorded as acknowledge failed.
assertEquals(0, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).count());
assertEquals(0, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).count());
assertEquals(0, shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).count());
// Should have 1 acknowledge recorded and 1 failed.
validateBrokerTopicStatsMetrics(
brokerTopicStats,

View File

@ -62,9 +62,13 @@ public class ShareGroupMetrics implements AutoCloseable {
}
public void recordAcknowledgement(byte ackType) {
recordAcknowledgement(ackType, 1);
}
public void recordAcknowledgement(byte ackType, long count) {
// unknown ack types (such as gaps for control records) are intentionally ignored
if (recordAcknowledgementMeterMap.containsKey(ackType)) {
recordAcknowledgementMeterMap.get(ackType).mark();
recordAcknowledgementMeterMap.get(ackType).mark(count);
}
}