KAFKA-18494-3: Fix the bug relating to gaps in the share partition cachedState post initialization (#18696)

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>
Chirag Wadhwa 2025-02-05 20:46:25 +05:30 committed by GitHub
parent 0bd1ff936f
commit 01587d09d8
2 changed files with 1202 additions and 9 deletions


@@ -287,6 +287,12 @@ public class SharePartition {
*/
private long endOffset;
/**
* The initial read gap offset tracks whether there are any gaps in the in-flight batches during the initial
* read of the share partition state from the persister.
*/
private InitialReadGapOffset initialReadGapOffset;
/**
* We maintain the latest fetch offset and its metadata to estimate the minBytes requirement more efficiently.
*/
@@ -444,6 +450,11 @@
stateEpoch = partitionData.stateEpoch();
List<PersisterStateBatch> stateBatches = partitionData.stateBatches();
long gapStartOffset = -1;
// The previousBatchLastOffset tracks the last offset of the previous batch. For the first
// batch, which should ideally start at startOffset if there are no gaps, we assume
// previousBatchLastOffset to be startOffset - 1.
long previousBatchLastOffset = startOffset - 1;
for (PersisterStateBatch stateBatch : stateBatches) {
if (stateBatch.firstOffset() < startOffset) {
log.error("Invalid state batch found for the share partition: {}-{}. The base offset: {}"
@@ -452,6 +463,10 @@
throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
return;
}
if (gapStartOffset == -1 && stateBatch.firstOffset() > previousBatchLastOffset + 1) {
gapStartOffset = previousBatchLastOffset + 1;
}
previousBatchLastOffset = stateBatch.lastOffset();
InFlightBatch inFlightBatch = new InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
stateBatch.lastOffset(), RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), null);
cachedState.put(stateBatch.firstOffset(), inFlightBatch);
@@ -462,6 +477,10 @@
// in the cached state are not missed
findNextFetchOffset.set(true);
endOffset = cachedState.lastEntry().getValue().lastOffset();
// initialReadGapOffset is not required if there are no gaps in the read state response
if (gapStartOffset != -1) {
initialReadGapOffset = new InitialReadGapOffset(endOffset, gapStartOffset);
}
// In case the persister read state RPC result contains no AVAILABLE records, we can update cached state
// and start/end offsets.
maybeUpdateCachedStateAndOffsets();
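To make the initialization logic above concrete, here is a minimal, self-contained sketch (an editor's illustration, not part of this diff; Batch and firstGapStart are hypothetical stand-ins for PersisterStateBatch and the inlined loop): given the share partition's startOffset and the persister's state batches sorted by first offset, it finds the first offset not covered by any batch.

import java.util.List;

public class GapScanSketch {
    record Batch(long firstOffset, long lastOffset) { }

    // Returns the first uncovered offset, or -1 if the batches are contiguous from startOffset.
    static long firstGapStart(long startOffset, List<Batch> sortedBatches) {
        long gapStartOffset = -1;
        long previousBatchLastOffset = startOffset - 1; // virtual predecessor of the first batch
        for (Batch b : sortedBatches) {
            if (gapStartOffset == -1 && b.firstOffset() > previousBatchLastOffset + 1) {
                gapStartOffset = previousBatchLastOffset + 1; // first offset missing from the covered prefix
            }
            previousBatchLastOffset = b.lastOffset();
        }
        return gapStartOffset;
    }

    public static void main(String[] args) {
        // startOffset 10; batches cover 10-20 and 31-40, so 21-30 is the first gap.
        System.out.println(firstGapStart(10, List.of(new Batch(10, 20), new Batch(31, 40)))); // 21
        // Contiguous coverage from startOffset onwards: no gap.
        System.out.println(firstGapStart(10, List.of(new Batch(10, 20), new Batch(21, 30)))); // -1
    }
}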
@@ -538,8 +557,20 @@
}
long nextFetchOffset = -1;
long gapStartOffset = isInitialReadGapOffsetWindowActive() ? initialReadGapOffset.gapStartOffset() : -1;
for (Map.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
// Check if there exists any gap in the in-flight batches which needs to be fetched. If
// initialReadGapOffset's endOffset is equal to the share partition's endOffset, then
// only the initial gaps should be considered. Once the share partition's endOffset moves
// past the initial read end offset, all gaps are fetched anyway.
if (isInitialReadGapOffsetWindowActive()) {
if (entry.getKey() > gapStartOffset) {
nextFetchOffset = gapStartOffset;
break;
}
gapStartOffset = entry.getValue().lastOffset() + 1;
}
// Check if the state is maintained per offset or batch. If the offsetState
// is not maintained then the batch state is used to determine the offsets state.
if (entry.getValue().offsetState() == null) {
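The loop above goes on to inspect batch and offset states for AVAILABLE records; the following editor's sketch (not part of this diff; Batch and nextFetchOffset are hypothetical reductions) models only the gap path: while the initial read gap window is active, the first cached batch whose key lies past the gap cursor means the gap itself must be fetched next.

import java.util.Map;
import java.util.TreeMap;

public class NextFetchOffsetSketch {
    record Batch(long firstOffset, long lastOffset) { }

    // cachedState maps each batch's first offset to the batch; gapStartOffset plays the
    // role of initialReadGapOffset.gapStartOffset().
    static long nextFetchOffset(TreeMap<Long, Batch> cachedState, long gapStartOffset) {
        for (Map.Entry<Long, Batch> entry : cachedState.entrySet()) {
            if (entry.getKey() > gapStartOffset) {
                return gapStartOffset; // a gap precedes this batch, so fetch the gap first
            }
            gapStartOffset = entry.getValue().lastOffset() + 1; // next gap can only start after this batch
        }
        return gapStartOffset; // no in-between gap left, fetch past the last batch
    }

    public static void main(String[] args) {
        TreeMap<Long, Batch> cachedState = new TreeMap<>();
        cachedState.put(10L, new Batch(10, 20));
        cachedState.put(31L, new Batch(31, 40));
        System.out.println(nextFetchOffset(cachedState, 21)); // 21: the 21-30 gap is fetched next
    }
}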
@@ -638,6 +669,9 @@
List<AcquiredRecords> result = new ArrayList<>();
// The acquired count is used to track the number of records acquired for the request.
int acquiredCount = 0;
// This tracks the candidate start offset of a gap between the subMap entries. If a gap is
// found, the corresponding offsets are acquired in a separate batch.
long maybeGapStartOffset = baseOffset;
// The fetched records are already part of the in-flight records. The records might
// be available for re-delivery hence try acquiring same. The request batches could
// be an exact match, subset or span over multiple already fetched batches.
@@ -648,6 +682,26 @@
}
InFlightBatch inFlightBatch = entry.getValue();
// If the initialReadGapOffset window is active, we need to treat the gaps within the window as
// acquirable. Once the window is inactive (when we have acquired all the gaps inside the window),
// the remaining gaps are natural (data does not exist at those offsets) and we need not acquire them.
if (isInitialReadGapOffsetWindowActive()) {
// If maybeGapStartOffset is less than the key of the entry, then the fetch happened for a gap in the cachedState.
// Thus, a new batch needs to be acquired for the gap.
if (maybeGapStartOffset < entry.getKey()) {
ShareAcquiredRecords shareAcquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
maybeGapStartOffset, entry.getKey() - 1, batchSize, maxFetchRecords);
result.addAll(shareAcquiredRecords.acquiredRecords());
acquiredCount += shareAcquiredRecords.count();
}
// Set maybeGapStartOffset to the last offset of the current in-flight batch + 1
maybeGapStartOffset = inFlightBatch.lastOffset() + 1;
// If the acquired count has reached the max fetch records then break the loop.
if (acquiredCount >= maxFetchRecords) {
break;
}
}
// Compute if the batch is a full match.
boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastBatch.lastOffset());
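The maybeGapStartOffset walk above can be summarized in this editor's sketch (not part of this diff; Batch, Range, and gapsToAcquire are hypothetical names): scanning the cached batches that overlap the fetch range yields the in-between gap ranges that must be acquired as fresh batches before the existing in-flight batches are re-acquired.

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class AcquireGapSketch {
    record Batch(long firstOffset, long lastOffset) { }
    record Range(long first, long last) { }

    // cachedState is keyed by each batch's first offset; the fetch spans baseOffset..lastOffset.
    static List<Range> gapsToAcquire(TreeMap<Long, Batch> cachedState, long baseOffset, long lastOffset) {
        List<Range> gaps = new ArrayList<>();
        long maybeGapStartOffset = baseOffset;
        for (Batch b : cachedState.subMap(baseOffset, true, lastOffset, true).values()) {
            if (maybeGapStartOffset < b.firstOffset()) {
                gaps.add(new Range(maybeGapStartOffset, b.firstOffset() - 1)); // gap before this batch
            }
            maybeGapStartOffset = b.lastOffset() + 1; // next gap can only start after this batch
        }
        return gaps;
    }

    public static void main(String[] args) {
        TreeMap<Long, Batch> cachedState = new TreeMap<>();
        cachedState.put(10L, new Batch(10, 20));
        cachedState.put(31L, new Batch(31, 40));
        // Fetch covering 10-40: offsets 21-30 lie in a cachedState gap and get their own batch.
        System.out.println(gapsToAcquire(cachedState, 10, 40)); // [Range[first=21, last=30]]
    }
}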
@@ -715,6 +769,9 @@
result.addAll(shareAcquiredRecords.acquiredRecords());
acquiredCount += shareAcquiredRecords.count();
}
if (!result.isEmpty()) {
maybeUpdateReadGapFetchOffset(result.get(result.size() - 1).lastOffset() + 1);
}
return new ShareAcquiredRecords(result, acquiredCount);
} finally {
lock.writeLock().unlock();
@@ -1177,6 +1234,24 @@
};
}
// Method to reduce the window that tracks gaps in the cachedState
private void maybeUpdateReadGapFetchOffset(long offset) {
lock.writeLock().lock();
try {
if (initialReadGapOffset != null) {
if (initialReadGapOffset.endOffset() == endOffset) {
initialReadGapOffset.gapStartOffset(offset);
} else {
// The initial read gap offset is not valid anymore as the end offset has moved
// beyond the initial read gap offset. Hence, reset the initial read gap offset.
initialReadGapOffset = null;
}
}
} finally {
lock.writeLock().unlock();
}
}
private ShareAcquiredRecords acquireNewBatchRecords(
String memberId,
Iterable<? extends RecordBatch> batches,
@@ -1212,7 +1287,15 @@
if (cachedState.firstKey() == firstAcquiredOffset) {
startOffset = firstAcquiredOffset;
}
// If the newly acquired batch is part of a gap in the cachedState, then endOffset should not be updated.
// Ex. if startOffset is 10, endOffset is 30, there is a gap from 10 to 20, and an in-flight batch from 21 to 30.
// In this case, nextFetchOffset evaluates to 10 and the records are fetched. A new batch is acquired from
// 10 to 20, but the endOffset remains at 30.
if (lastAcquiredOffset > endOffset) {
endOffset = lastAcquiredOffset;
}
maybeUpdateReadGapFetchOffset(lastAcquiredOffset + 1);
return new ShareAcquiredRecords(acquiredRecords, (int) (lastAcquiredOffset - firstAcquiredOffset + 1));
} finally {
lock.writeLock().unlock();
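Both call sites above funnel into maybeUpdateReadGapFetchOffset, whose effect can be condensed into this editor's sketch (not part of this diff; the static fields are hypothetical stand-ins for the accessors on InitialReadGapOffset and the share partition): the gap cursor advances while the share partition's endOffset still matches the window's end, and the window is dropped once endOffset moves past it.

public class GapWindowSketch {
    static Long windowEndOffset = 30L; // initialReadGapOffset.endOffset(); null means no window
    static long gapStartOffset = 21L;  // initialReadGapOffset.gapStartOffset()
    static long endOffset = 30L;       // the share partition's endOffset

    static void maybeUpdateReadGapFetchOffset(long offset) {
        if (windowEndOffset != null) {
            if (windowEndOffset == endOffset) {
                gapStartOffset = offset; // still inside the initial window: advance the cursor
            } else {
                windowEndOffset = null;  // endOffset moved beyond the window: remaining gaps are natural
            }
        }
    }

    public static void main(String[] args) {
        maybeUpdateReadGapFetchOffset(26);   // acquired offsets 21-25 of the 21-30 gap
        System.out.println(gapStartOffset);  // 26
        endOffset = 45;                      // a later acquisition moved endOffset past the window
        maybeUpdateReadGapFetchOffset(46);
        System.out.println(windowEndOffset); // null: window retired
    }
}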
@@ -1791,10 +1874,23 @@
long firstKeyToRemove = cachedState.firstKey();
long lastKeyToRemove;
NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
// If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
startOffset = cachedState.higherKey(lastOffsetAcknowledged);
if (isInitialReadGapOffsetWindowActive()) {
// This case arises if there is an acquirable gap after the lastOffsetAcknowledged.
// Ex. the cachedState has the following state batches -> {(0, 10), (11, 20), (31, 40)} and all these batches are acked.
// There is a gap from 21 to 30. Let the initialReadGapOffset.gapStartOffset be 21. In this case,
// lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
// of the next cachedState batch (the next cachedState batch is 31 to 40). There is an acquirable gap in between (21 to 30)
// and the startOffset should be at 21. Hence, we set startOffset to the minimum of initialReadGapOffset.gapStartOffset
// and the higher key of lastOffsetAcknowledged.
startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
}
lastKeyToRemove = entry.getKey();
} else {
// The code will reach this point only if lastOffsetAcknowledged is in the middle of some stateBatch. In this case
// we can simply move the startOffset to the next offset of lastOffsetAcknowledged and need not consider any read gap offsets.
startOffset = lastOffsetAcknowledged + 1;
if (entry.getKey().equals(cachedState.firstKey())) {
// If the first batch in cachedState has some records yet to be acknowledged,
@@ -1825,8 +1921,17 @@
NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(startOffset);
if (entry == null) {
log.error("The start offset: {} is not found in the cached state for share partition: {}-{}."
+ " Cannot move the start offset.", startOffset, groupId, topicIdPartition);
// The start offset is not found in the cached state when there is a gap starting at the start offset.
// For example, if the start offset is 10 and the cached state has batches -> { (21, 30), (31, 40) }.
// This case arises only when the share partition is initialized and the read state response results in
// state batches containing gaps. This situation is possible when, in the previous instance
// of this share partition, the gap offsets were fetched but not acknowledged, while the next batch of offsets
// was fetched as well as acknowledged. In the above example, possibly in the previous instance of the share
// partition, the batch 10-20 was fetched but not acknowledged and the batch 21-30 was fetched and acknowledged.
// Thus, the persister has no clue about what happened with the batch 10-20. During the re-initialization of
// the share partition, the start offset is set to 10 and the cached state has the batch 21-30, resulting in a gap.
log.debug("The start offset: {} is not found in the cached state for share partition: {}-{} " +
"as there is an acquirable gap at the beginning. Cannot move the start offset.", startOffset, groupId, topicIdPartition);
return false;
}
RecordState startOffsetState = entry.getValue().offsetState == null ?
@@ -1835,6 +1940,10 @@
return isRecordStateAcknowledged(startOffsetState);
}
private boolean isInitialReadGapOffsetWindowActive() {
return initialReadGapOffset != null && initialReadGapOffset.endOffset() == endOffset;
}
/**
* The record state is considered acknowledged if it is either acknowledged or archived.
* These are terminal states for the record.
@@ -1847,12 +1956,18 @@
return recordState == RecordState.ACKNOWLEDGED || recordState == RecordState.ARCHIVED;
}
private long findLastOffsetAcknowledged() {
lock.readLock().lock();
// Visible for testing
long findLastOffsetAcknowledged() {
long lastOffsetAcknowledged = -1;
lock.readLock().lock();
try {
for (NavigableMap.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
InFlightBatch inFlightBatch = entry.getValue();
if (isInitialReadGapOffsetWindowActive() && inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) {
return lastOffsetAcknowledged;
}
if (inFlightBatch.offsetState() == null) {
if (!isRecordStateAcknowledged(inFlightBatch.batchState())) {
return lastOffsetAcknowledged;
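The early return above keeps the acknowledged prefix from crossing the gap window. An editor's sketch of the reduced scan (not part of this diff; Batch and the boolean acknowledged flag are hypothetical simplifications of the per-batch/per-offset state checks):

import java.util.TreeMap;

public class LastAckSketch {
    record Batch(long firstOffset, long lastOffset, boolean acknowledged) { }

    // Walk acknowledged batches in order, but stop before the initial-read gap window so the
    // result never crosses unfetched gap offsets.
    static long findLastOffsetAcknowledged(TreeMap<Long, Batch> cachedState, long gapStartOffset) {
        long lastOffsetAcknowledged = -1;
        for (Batch b : cachedState.values()) {
            if (b.lastOffset() >= gapStartOffset) {
                return lastOffsetAcknowledged; // the gap window starts here: stop the scan
            }
            if (!b.acknowledged()) {
                return lastOffsetAcknowledged; // first unacknowledged batch ends the prefix
            }
            lastOffsetAcknowledged = b.lastOffset();
        }
        return lastOffsetAcknowledged;
    }

    public static void main(String[] args) {
        TreeMap<Long, Batch> cachedState = new TreeMap<>();
        cachedState.put(0L, new Batch(0, 10, true));
        cachedState.put(11L, new Batch(11, 20, true));
        cachedState.put(31L, new Batch(31, 40, true));
        // The 21-30 gap is still acquirable, so the scan stops at 20 even though 31-40 is acked.
        System.out.println(findLastOffsetAcknowledged(cachedState, 21)); // 20
    }
}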
@@ -2205,6 +2320,40 @@
return timer;
}
// Visible for testing
InitialReadGapOffset initialReadGapOffset() {
return initialReadGapOffset;
}
/**
* The InitialReadGapOffset class records the start and end offsets of the probable gaps of
* available records which are known neither to the Persister nor to the SharePartition. The share
* partition uses this information to determine the next fetch offset and tries to fetch the
* records in the gap.
*/
// Visible for testing
static class InitialReadGapOffset {
private final long endOffset;
private long gapStartOffset;
InitialReadGapOffset(long endOffset, long gapStartOffset) {
this.endOffset = endOffset;
this.gapStartOffset = gapStartOffset;
}
long endOffset() {
return endOffset;
}
long gapStartOffset() {
return gapStartOffset;
}
void gapStartOffset(long gapStartOffset) {
this.gapStartOffset = gapStartOffset;
}
}
// Visible for testing
final class AcquisitionLockTimerTask extends TimerTask {
private final long expirationMs;

File diff suppressed because it is too large.