MINOR: Updated name from messages to records for consistency in share partition (#20416)

Minor PR to update name of maxInFlightMessages to maxInFlightRecords to
maintain consistency in share partition related classes.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Apoorv Mittal 2025-08-28 13:52:04 +01:00 committed by GitHub
parent 2cc66f12c3
commit 6956417a3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 58 additions and 58 deletions

View File

@ -175,11 +175,11 @@ public class SharePartition {
private final AtomicReference<Uuid> fetchLock;
/**
* The max in-flight messages is used to limit the number of records that can be in-flight at any
* given time. The max in-flight messages is used to prevent the consumer from fetching too many
* The max in-flight records is used to limit the number of records that can be in-flight at any
* given time. The max in-flight records is used to prevent the consumer from fetching too many
* records from the leader and running out of memory.
*/
private final int maxInFlightMessages;
private final int maxInFlightRecords;
/**
* The max delivery count is used to limit the number of times a record can be delivered to the
@ -305,7 +305,7 @@ public class SharePartition {
String groupId,
TopicIdPartition topicIdPartition,
int leaderEpoch,
int maxInFlightMessages,
int maxInFlightRecords,
int maxDeliveryCount,
int defaultRecordLockDurationMs,
Timer timer,
@ -315,7 +315,7 @@ public class SharePartition {
GroupConfigManager groupConfigManager,
SharePartitionListener listener
) {
this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs,
this(groupId, topicIdPartition, leaderEpoch, maxInFlightRecords, maxDeliveryCount, defaultRecordLockDurationMs,
timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY, listener,
new SharePartitionMetrics(groupId, topicIdPartition.topic(), topicIdPartition.partition()));
}
@ -326,7 +326,7 @@ public class SharePartition {
String groupId,
TopicIdPartition topicIdPartition,
int leaderEpoch,
int maxInFlightMessages,
int maxInFlightRecords,
int maxDeliveryCount,
int defaultRecordLockDurationMs,
Timer timer,
@ -341,7 +341,7 @@ public class SharePartition {
this.groupId = groupId;
this.topicIdPartition = topicIdPartition;
this.leaderEpoch = leaderEpoch;
this.maxInFlightMessages = maxInFlightMessages;
this.maxInFlightRecords = maxInFlightRecords;
this.maxDeliveryCount = maxDeliveryCount;
this.cachedState = new ConcurrentSkipListMap<>();
this.lock = new ReentrantReadWriteLock();
@ -1302,7 +1302,7 @@ public class SharePartition {
/**
* Checks if the records can be acquired for the share partition. The records can be acquired if
* the number of records in-flight is less than the max in-flight messages. Or if the fetch is
* the number of records in-flight is less than the max in-flight records. Or if the fetch is
* to happen somewhere in between the record states cached in the share partition i.e. re-acquire
* the records that are already fetched before.
*
@ -1312,7 +1312,7 @@ public class SharePartition {
if (nextFetchOffset() != endOffset() + 1) {
return true;
}
return numInFlightRecords() < maxInFlightMessages;
return numInFlightRecords() < maxInFlightRecords;
}
/**
@ -1492,7 +1492,7 @@ public class SharePartition {
/**
* The method calculates the last offset and maximum records to acquire. The adjustment is needed
* to ensure that the records acquired do not exceed the maximum in-flight messages limit.
* to ensure that the records acquired do not exceed the maximum in-flight records limit.
*
* @param fetchOffset The offset from which the records are fetched.
* @param maxFetchRecords The maximum number of records to acquire.
@ -1500,16 +1500,16 @@ public class SharePartition {
* @return LastOffsetAndMaxRecords object, containing the last offset to acquire and the maximum records to acquire.
*/
private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset, int maxFetchRecords, long lastOffset) {
// There can always be records fetched exceeding the max in-flight messages limit. Hence,
// we need to check if the share partition has reached the max in-flight messages limit
// There can always be records fetched exceeding the max in-flight records limit. Hence,
// we need to check if the share partition has reached the max in-flight records limit
// and only acquire limited records.
int maxRecordsToAcquire;
long lastOffsetToAcquire = lastOffset;
lock.readLock().lock();
try {
int inFlightRecordsCount = numInFlightRecords();
// Take minimum of maxFetchRecords and remaining capacity to fill max in-flight messages limit.
maxRecordsToAcquire = Math.min(maxFetchRecords, maxInFlightMessages - inFlightRecordsCount);
// Take minimum of maxFetchRecords and remaining capacity to fill max in-flight records limit.
maxRecordsToAcquire = Math.min(maxFetchRecords, maxInFlightRecords - inFlightRecordsCount);
// If the maxRecordsToAcquire is less than or equal to 0, then ideally (check exists to not
// fetch records for share partitions which are at capacity) the fetch must be happening
// in-between the in-flight batches i.e. some in-flight records have been released (marked
@ -1522,15 +1522,15 @@ public class SharePartition {
if (maxRecordsToAcquire <= 0) {
if (fetchOffset <= endOffset()) {
// Adjust the max records to acquire to the capacity available to fill the max
// in-flight messages limit. This can happen when the fetch is happening in-between
// the in-flight batches and the share partition has reached the max in-flight messages limit.
// in-flight records limit. This can happen when the fetch is happening in-between
// the in-flight batches and the share partition has reached the max in-flight records limit.
maxRecordsToAcquire = Math.min(maxFetchRecords, (int) (endOffset() - fetchOffset + 1));
// Adjust the last offset to acquire to the endOffset of the share partition.
lastOffsetToAcquire = endOffset();
} else {
// The share partition is already at max in-flight messages, hence cannot acquire more records.
log.debug("Share partition {}-{} has reached max in-flight messages limit: {}. Cannot acquire more records, inflight records count: {}",
groupId, topicIdPartition, maxInFlightMessages, inFlightRecordsCount);
// The share partition is already at max in-flight records, hence cannot acquire more records.
log.debug("Share partition {}-{} has reached max in-flight records limit: {}. Cannot acquire more records, inflight records count: {}",
groupId, topicIdPartition, maxInFlightRecords, inFlightRecordsCount);
}
}
} finally {
@ -1558,13 +1558,13 @@ public class SharePartition {
firstAcquiredOffset = endOffset;
}
// Check how many messages can be acquired from the batch.
// Check how many records can be acquired from the batch.
long lastAcquiredOffset = lastOffset;
if (maxFetchRecords < lastAcquiredOffset - firstAcquiredOffset + 1) {
// The max messages to acquire is less than the complete available batches hence
// The max records to acquire is less than the complete available batches hence
// limit the acquired records. The last offset shall be the batches last offset
// which falls under the max messages limit. As the max fetch records is the soft
// limit, the last offset can be higher than the max messages.
// which falls under the max records limit. As the max fetch records is the soft
// limit, the last offset can be higher than the max records.
lastAcquiredOffset = lastOffsetFromBatchWithRequestOffset(batches, firstAcquiredOffset + maxFetchRecords - 1);
// If the initial read gap offset window is active then it's not guaranteed that the
// batches align on batch boundaries. Hence, reset to last offset itself if the batch's
@ -2193,7 +2193,7 @@ public class SharePartition {
a) Only full batches can be removed from the cachedState, For example if there is batch (0-99)
and 0-49 records are acknowledged (ACCEPT or REJECT), the first 50 records will not be removed
from the cachedState. Instead, the startOffset will be moved to 50, but the batch will only
be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
be removed once all the records (0-99) are acknowledged (ACCEPT or REJECT).
*/
// Since only a subMap will be removed, we need to find the first and last keys of that subMap

View File

@ -122,9 +122,9 @@ public class SharePartitionManager implements AutoCloseable {
private final Timer timer;
/**
* The max in flight messages is the maximum number of messages that can be in flight at any one time per share-partition.
* The max in flight records is the maximum number of records that can be in flight at any one time per share-partition.
*/
private final int maxInFlightMessages;
private final int maxInFlightRecords;
/**
* The max delivery count is the maximum number of times a message can be delivered before it is considered to be archived.
@ -156,7 +156,7 @@ public class SharePartitionManager implements AutoCloseable {
ShareSessionCache cache,
int defaultRecordLockDurationMs,
int maxDeliveryCount,
int maxInFlightMessages,
int maxInFlightRecords,
long remoteFetchMaxWaitMs,
Persister persister,
GroupConfigManager groupConfigManager,
@ -168,7 +168,7 @@ public class SharePartitionManager implements AutoCloseable {
new SharePartitionCache(),
defaultRecordLockDurationMs,
maxDeliveryCount,
maxInFlightMessages,
maxInFlightRecords,
remoteFetchMaxWaitMs,
persister,
groupConfigManager,
@ -184,7 +184,7 @@ public class SharePartitionManager implements AutoCloseable {
SharePartitionCache partitionCache,
int defaultRecordLockDurationMs,
int maxDeliveryCount,
int maxInFlightMessages,
int maxInFlightRecords,
long remoteFetchMaxWaitMs,
Persister persister,
GroupConfigManager groupConfigManager,
@ -199,7 +199,7 @@ public class SharePartitionManager implements AutoCloseable {
new SystemTimerReaper("share-group-lock-timeout-reaper",
new SystemTimer("share-group-lock-timeout")),
maxDeliveryCount,
maxInFlightMessages,
maxInFlightRecords,
remoteFetchMaxWaitMs,
persister,
groupConfigManager,
@ -217,7 +217,7 @@ public class SharePartitionManager implements AutoCloseable {
int defaultRecordLockDurationMs,
Timer timer,
int maxDeliveryCount,
int maxInFlightMessages,
int maxInFlightRecords,
long remoteFetchMaxWaitMs,
Persister persister,
GroupConfigManager groupConfigManager,
@ -231,7 +231,7 @@ public class SharePartitionManager implements AutoCloseable {
this.defaultRecordLockDurationMs = defaultRecordLockDurationMs;
this.timer = timer;
this.maxDeliveryCount = maxDeliveryCount;
this.maxInFlightMessages = maxInFlightMessages;
this.maxInFlightRecords = maxInFlightRecords;
this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
this.persister = persister;
this.groupConfigManager = groupConfigManager;
@ -710,7 +710,7 @@ public class SharePartitionManager implements AutoCloseable {
sharePartitionKey.groupId(),
sharePartitionKey.topicIdPartition(),
leaderEpoch,
maxInFlightMessages,
maxInFlightRecords,
maxDeliveryCount,
defaultRecordLockDurationMs,
timer,

View File

@ -120,7 +120,7 @@ public class SharePartitionTest {
private static final TopicIdPartition TOPIC_ID_PARTITION = new TopicIdPartition(Uuid.randomUuid(), 0, "test-topic");
private static final String MEMBER_ID = "member-1";
private static final Time MOCK_TIME = new MockTime();
private static final short MAX_IN_FLIGHT_MESSAGES = 200;
private static final short MAX_IN_FLIGHT_RECORDS = 200;
private static final int ACQUISITION_LOCK_TIMEOUT_MS = 100;
private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS = 120;
private static final int BATCH_SIZE = 500;
@ -2181,14 +2181,14 @@ public class SharePartitionTest {
}
@Test
public void testAcquireWithMaxInFlightMessagesAndTryAcquireNewBatch() {
public void testAcquireWithMaxInFlightRecordsAndTryAcquireNewBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.withMaxInflightMessages(20)
.withMaxInflightRecords(20)
.build();
// Acquire records, all 10 records should be acquired as within maxInflightMessages limit.
// Acquire records, all 10 records should be acquired as within maxInflightRecords limit.
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
MEMBER_ID,
BATCH_SIZE,
@ -2197,7 +2197,7 @@ public class SharePartitionTest {
fetchPartitionData(memoryRecords(10, 0), 0),
FETCH_ISOLATION_HWM),
10);
// Validate all 10 records will be acquired as the maxInFlightMessages is 20.
// Validate all 10 records will be acquired as the maxInFlightRecords is 20.
assertArrayEquals(expectedAcquiredRecord(0, 9, 1).toArray(), acquiredRecordsList.toArray());
assertEquals(10, sharePartition.nextFetchOffset());
@ -2212,7 +2212,7 @@ public class SharePartitionTest {
MemoryRecords records = MemoryRecords.readableRecords(buffer);
// Acquire records, should be acquired till maxInFlightMessages i.e. 20 records. As second batch
// Acquire records, should be acquired till maxInFlightRecords i.e. 20 records. As second batch
// is ending at 24 offset, hence additional 15 records will be acquired.
acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
MEMBER_ID,
@ -2243,11 +2243,11 @@ public class SharePartitionTest {
}
@Test
public void testAcquireWithMaxInFlightMessagesAndReleaseLastOffset() {
public void testAcquireWithMaxInFlightRecordsAndReleaseLastOffset() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withState(SharePartitionState.ACTIVE)
.withSharePartitionMetrics(sharePartitionMetrics)
.withMaxInflightMessages(20)
.withMaxInflightRecords(20)
.build();
// Create 4 batches of records.
@ -2260,7 +2260,7 @@ public class SharePartitionTest {
buffer.flip();
MemoryRecords records = MemoryRecords.readableRecords(buffer);
// Acquire records, should be acquired till maxInFlightMessages i.e. 20 records till 29 offset.
// Acquire records, should be acquired till maxInFlightRecords i.e. 20 records till 29 offset.
List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
MEMBER_ID,
BATCH_SIZE,
@ -2270,7 +2270,7 @@ public class SharePartitionTest {
FETCH_ISOLATION_HWM),
20);
// Validate 3 batches are fetched and fourth batch should be skipped. Max in-flight messages
// Validate 3 batches are fetched and fourth batch should be skipped. Max in-flight records
// limit is reached.
assertArrayEquals(expectedAcquiredRecord(10, 29, 1).toArray(), acquiredRecordsList.toArray());
assertEquals(30, sharePartition.nextFetchOffset());
@ -2360,29 +2360,29 @@ public class SharePartitionTest {
@Test
public void testCanAcquireRecordsWithEmptyCache() {
SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(1).build();
SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightRecords(1).build();
assertTrue(sharePartition.canAcquireRecords());
}
@Test
public void testCanAcquireRecordsWithCachedDataAndLimitNotReached() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightMessages(6)
.withMaxInflightRecords(6)
.withState(SharePartitionState.ACTIVE)
.build();
fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
// Limit not reached as only 6 in-flight messages is the limit.
// Limit not reached as only 6 in-flight records is the limit.
assertTrue(sharePartition.canAcquireRecords());
}
@Test
public void testCanAcquireRecordsWithCachedDataAndLimitReached() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightMessages(1)
.withMaxInflightRecords(1)
.withState(SharePartitionState.ACTIVE)
.build();
fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
// Limit reached as only one in-flight message is the limit.
// Limit reached as only one in-flight record is the limit.
assertFalse(sharePartition.canAcquireRecords());
}
@ -6582,7 +6582,7 @@ public class SharePartitionTest {
@Test
public void testMaybeUpdateCachedStateWhenAcknowledgementsFromBeginningForBatchSubset() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightMessages(20)
.withMaxInflightRecords(20)
.withState(SharePartitionState.ACTIVE)
.build();
@ -6608,7 +6608,7 @@ public class SharePartitionTest {
@Test
public void testMaybeUpdateCachedStateWhenAcknowledgementsFromBeginningForEntireBatch() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightMessages(20)
.withMaxInflightRecords(20)
.withState(SharePartitionState.ACTIVE)
.build();
fetchAcquiredRecords(sharePartition, memoryRecords(15, 0), 15);
@ -6633,7 +6633,7 @@ public class SharePartitionTest {
@Test
public void testMaybeUpdateCachedStateWhenAcknowledgementsInBetween() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightMessages(20)
.withMaxInflightRecords(20)
.withState(SharePartitionState.ACTIVE)
.build();
@ -6662,7 +6662,7 @@ public class SharePartitionTest {
@Test
public void testMaybeUpdateCachedStateWhenAllRecordsInCachedStateAreAcknowledged() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightMessages(20)
.withMaxInflightRecords(20)
.withState(SharePartitionState.ACTIVE)
.build();
@ -6684,7 +6684,7 @@ public class SharePartitionTest {
@Test
public void testMaybeUpdateCachedStateMultipleAcquisitionsAndAcknowledgements() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightMessages(100)
.withMaxInflightRecords(100)
.withState(SharePartitionState.ACTIVE)
.build();
@ -7160,7 +7160,7 @@ public class SharePartitionTest {
@Test
public void testNextFetchOffsetWithMultipleConsumers() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightMessages(100)
.withMaxInflightRecords(100)
.withState(SharePartitionState.ACTIVE)
.build();
MemoryRecords records1 = memoryRecords(3, 0);
@ -8900,7 +8900,7 @@ public class SharePartitionTest {
private int defaultAcquisitionLockTimeoutMs = 30000;
private int maxDeliveryCount = MAX_DELIVERY_COUNT;
private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
private int maxInflightRecords = MAX_IN_FLIGHT_RECORDS;
private Persister persister = new NoOpStatePersister();
private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
@ -8909,8 +8909,8 @@ public class SharePartitionTest {
private Time time = MOCK_TIME;
private SharePartitionMetrics sharePartitionMetrics = Mockito.mock(SharePartitionMetrics.class);
private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) {
this.maxInflightMessages = maxInflightMessages;
private SharePartitionBuilder withMaxInflightRecords(int maxInflightRecords) {
this.maxInflightRecords = maxInflightRecords;
return this;
}
@ -8959,7 +8959,7 @@ public class SharePartitionTest {
}
public SharePartition build() {
return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, maxInflightMessages, maxDeliveryCount,
return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, maxInflightRecords, maxDeliveryCount,
defaultAcquisitionLockTimeoutMs, mockTimer, time, persister, replicaManager, groupConfigManager,
state, Mockito.mock(SharePartitionListener.class), sharePartitionMetrics);
}