KAFKA-19517: Include control records in LoadSummary#numRecords (#20206)

## Summary
jira: https://issues.apache.org/jira/browse/KAFKA-19517
Ensure `LoadSummary#numRecords` counts all records, including control
batches, to maintain consistency with numBytes.

## Test
`testLoading` now verifies `numRecords`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, TengYao Chi
 <frankvicky@apache.org>
This commit is contained in:
Logan Zhu 2025-07-21 15:12:18 +08:00 committed by GitHub
parent 50598191dc
commit 24d03a18ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 6 additions and 1 deletions

View File

@ -214,6 +214,8 @@ public class CoordinatorLoaderImpl<T> implements CoordinatorLoader<T> {
for (MutableRecordBatch batch : memoryRecords.batches()) { for (MutableRecordBatch batch : memoryRecords.batches()) {
if (batch.isControlBatch()) { if (batch.isControlBatch()) {
for (Record record : batch) { for (Record record : batch) {
loadStats.numRecords++;
ControlRecordType controlRecord = ControlRecordType.parse(record.key()); ControlRecordType controlRecord = ControlRecordType.parse(record.key());
if (controlRecord == ControlRecordType.COMMIT) { if (controlRecord == ControlRecordType.COMMIT) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {

View File

@ -183,7 +183,10 @@ class CoordinatorLoaderImplTest {
when(log.read(8L, 1000, FetchIsolation.LOG_END, true)) when(log.read(8L, 1000, FetchIsolation.LOG_END, true))
.thenReturn(readResult5); .thenReturn(readResult5);
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)); CoordinatorLoader.LoadSummary summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
assertNotNull(summary);
// Includes 7 normal + 2 control (COMMIT, ABORT)
assertEquals(9, summary.numRecords());
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1")); verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2")); verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));