KAFKA-19693: Added PersisterBatch record in Share Partition which includes updatedState and stateBatch (#20507)

The method rollbackOrProcessStateUpdates in SharePartition previously
received two separate lists: updatedStates (InFlightState) and
stateBatches (PersisterStateBatch). This PR introduces a new private
nested record called `PersisterBatch` which encompasses both of these
objects, so that a single list is passed instead.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Chirag Wadhwa 2025-09-09 15:51:42 +05:30 committed by GitHub
parent 620a01b74b
commit d5e624e918
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 78 additions and 82 deletions

View File

@ -884,8 +884,7 @@ public class SharePartition {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
Throwable throwable = null; Throwable throwable = null;
List<InFlightState> updatedStates = new ArrayList<>(); List<PersisterBatch> persisterBatches = new ArrayList<>();
List<PersisterStateBatch> stateBatches = new ArrayList<>();
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
// Avoided using enhanced for loop as need to check if the last batch have offsets // Avoided using enhanced for loop as need to check if the last batch have offsets
@ -925,8 +924,7 @@ public class SharePartition {
batch, batch,
recordStateMap, recordStateMap,
subMap, subMap,
updatedStates, persisterBatches
stateBatches
); );
if (ackThrowable.isPresent()) { if (ackThrowable.isPresent()) {
@ -939,7 +937,7 @@ public class SharePartition {
} }
// If the acknowledgement is successful then persist state, complete the state transition // If the acknowledgement is successful then persist state, complete the state transition
// and update the cached state for start offset. Else rollback the state transition. // and update the cached state for start offset. Else rollback the state transition.
rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches); rollbackOrProcessStateUpdates(future, throwable, persisterBatches);
return future; return future;
} }
@ -955,8 +953,7 @@ public class SharePartition {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
Throwable throwable = null; Throwable throwable = null;
List<InFlightState> updatedStates = new ArrayList<>(); List<PersisterBatch> persisterBatches = new ArrayList<>();
List<PersisterStateBatch> stateBatches = new ArrayList<>();
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
@ -975,14 +972,14 @@ public class SharePartition {
} }
if (inFlightBatch.offsetState() != null) { if (inFlightBatch.offsetState() != null) {
Optional<Throwable> releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForPerOffsetBatch(memberId, inFlightBatch, recordState, updatedStates, stateBatches); Optional<Throwable> releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForPerOffsetBatch(memberId, inFlightBatch, recordState, persisterBatches);
if (releaseAcquiredRecordsThrowable.isPresent()) { if (releaseAcquiredRecordsThrowable.isPresent()) {
throwable = releaseAcquiredRecordsThrowable.get(); throwable = releaseAcquiredRecordsThrowable.get();
break; break;
} }
continue; continue;
} }
Optional<Throwable> releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForCompleteBatch(memberId, inFlightBatch, recordState, updatedStates, stateBatches); Optional<Throwable> releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForCompleteBatch(memberId, inFlightBatch, recordState, persisterBatches);
if (releaseAcquiredRecordsThrowable.isPresent()) { if (releaseAcquiredRecordsThrowable.isPresent()) {
throwable = releaseAcquiredRecordsThrowable.get(); throwable = releaseAcquiredRecordsThrowable.get();
break; break;
@ -993,7 +990,7 @@ public class SharePartition {
} }
// If the release acquired records is successful then persist state, complete the state transition // If the release acquired records is successful then persist state, complete the state transition
// and update the cached state for start offset. Else rollback the state transition. // and update the cached state for start offset. Else rollback the state transition.
rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches); rollbackOrProcessStateUpdates(future, throwable, persisterBatches);
return future; return future;
} }
@ -1004,8 +1001,7 @@ public class SharePartition {
private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String memberId, private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String memberId,
InFlightBatch inFlightBatch, InFlightBatch inFlightBatch,
RecordState recordState, RecordState recordState,
List<InFlightState> updatedStates, List<PersisterBatch> persisterBatches) {
List<PersisterStateBatch> stateBatches) {
log.trace("Offset tracked batch record found, batch: {} for the share partition: {}-{}", inFlightBatch, log.trace("Offset tracked batch record found, batch: {} for the share partition: {}-{}", inFlightBatch,
groupId, topicIdPartition); groupId, topicIdPartition);
@ -1032,10 +1028,9 @@ public class SharePartition {
return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the offset")); return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the offset"));
} }
// Successfully updated the state of the offset. // Successfully updated the state of the offset and created a persister state batch for write to persister.
updatedStates.add(updateResult); persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(),
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(), offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount())));
updateResult.state().id(), (short) updateResult.deliveryCount()));
// Do not update the next fetch offset as the offset has not completed the transition yet. // Do not update the next fetch offset as the offset has not completed the transition yet.
} }
} }
@ -1045,8 +1040,7 @@ public class SharePartition {
private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String memberId, private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String memberId,
InFlightBatch inFlightBatch, InFlightBatch inFlightBatch,
RecordState recordState, RecordState recordState,
List<InFlightState> updatedStates, List<PersisterBatch> persisterBatches) {
List<PersisterStateBatch> stateBatches) {
// Check if member id is the owner of the batch. // Check if member id is the owner of the batch.
if (!inFlightBatch.batchMemberId().equals(memberId) && !inFlightBatch.batchMemberId().equals(EMPTY_MEMBER_ID)) { if (!inFlightBatch.batchMemberId().equals(memberId) && !inFlightBatch.batchMemberId().equals(EMPTY_MEMBER_ID)) {
@ -1072,10 +1066,9 @@ public class SharePartition {
return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the batch")); return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the batch"));
} }
// Successfully updated the state of the batch. // Successfully updated the state of the batch and created a persister state batch for write to persister.
updatedStates.add(updateResult); persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(inFlightBatch.firstOffset(),
stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), inFlightBatch.lastOffset(), updateResult.state().id(), (short) updateResult.deliveryCount())));
updateResult.state().id(), (short) updateResult.deliveryCount()));
// Do not update the next fetch offset as the batch has not completed the transition yet. // Do not update the next fetch offset as the batch has not completed the transition yet.
} }
return Optional.empty(); return Optional.empty();
@ -1826,8 +1819,7 @@ public class SharePartition {
ShareAcknowledgementBatch batch, ShareAcknowledgementBatch batch,
Map<Long, RecordState> recordStateMap, Map<Long, RecordState> recordStateMap,
NavigableMap<Long, InFlightBatch> subMap, NavigableMap<Long, InFlightBatch> subMap,
final List<InFlightState> updatedStates, List<PersisterBatch> persisterBatches
List<PersisterStateBatch> stateBatches
) { ) {
Optional<Throwable> throwable; Optional<Throwable> throwable;
lock.writeLock().lock(); lock.writeLock().lock();
@ -1889,11 +1881,11 @@ public class SharePartition {
} }
throwable = acknowledgePerOffsetBatchRecords(memberId, batch, inFlightBatch, throwable = acknowledgePerOffsetBatchRecords(memberId, batch, inFlightBatch,
recordStateMap, updatedStates, stateBatches); recordStateMap, persisterBatches);
} else { } else {
// The in-flight batch is a full match hence change the state of the complete batch. // The in-flight batch is a full match hence change the state of the complete batch.
throwable = acknowledgeCompleteBatch(batch, inFlightBatch, throwable = acknowledgeCompleteBatch(batch, inFlightBatch,
recordStateMap.get(batch.firstOffset()), updatedStates, stateBatches); recordStateMap.get(batch.firstOffset()), persisterBatches);
} }
if (throwable.isPresent()) { if (throwable.isPresent()) {
@ -1930,8 +1922,7 @@ public class SharePartition {
ShareAcknowledgementBatch batch, ShareAcknowledgementBatch batch,
InFlightBatch inFlightBatch, InFlightBatch inFlightBatch,
Map<Long, RecordState> recordStateMap, Map<Long, RecordState> recordStateMap,
List<InFlightState> updatedStates, List<PersisterBatch> persisterBatches
List<PersisterStateBatch> stateBatches
) { ) {
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
@ -1995,10 +1986,9 @@ public class SharePartition {
return Optional.of(new InvalidRecordStateException( return Optional.of(new InvalidRecordStateException(
"Unable to acknowledge records for the batch")); "Unable to acknowledge records for the batch"));
} }
// Successfully updated the state of the offset. // Successfully updated the state of the offset and created a persister state batch for write to persister.
updatedStates.add(updateResult); persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(),
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(), offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount())));
updateResult.state().id(), (short) updateResult.deliveryCount()));
// Do not update the nextFetchOffset as the offset has not completed the transition yet. // Do not update the nextFetchOffset as the offset has not completed the transition yet.
} }
} finally { } finally {
@ -2011,8 +2001,7 @@ public class SharePartition {
ShareAcknowledgementBatch batch, ShareAcknowledgementBatch batch,
InFlightBatch inFlightBatch, InFlightBatch inFlightBatch,
RecordState recordState, RecordState recordState,
List<InFlightState> updatedStates, List<PersisterBatch> persisterBatches
List<PersisterStateBatch> stateBatches
) { ) {
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
@ -2044,11 +2033,9 @@ public class SharePartition {
new InvalidRecordStateException("Unable to acknowledge records for the batch")); new InvalidRecordStateException("Unable to acknowledge records for the batch"));
} }
// Successfully updated the state of the batch. // Successfully updated the state of the batch and created a persister state batch for write to persister.
updatedStates.add(updateResult); persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(inFlightBatch.firstOffset(),
stateBatches.add( inFlightBatch.lastOffset(), updateResult.state().id(), (short) updateResult.deliveryCount())));
new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
updateResult.state().id(), (short) updateResult.deliveryCount()));
// Do not update the next fetch offset as the batch has not completed the transition yet. // Do not update the next fetch offset as the batch has not completed the transition yet.
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
@ -2090,8 +2077,7 @@ public class SharePartition {
void rollbackOrProcessStateUpdates( void rollbackOrProcessStateUpdates(
CompletableFuture<Void> future, CompletableFuture<Void> future,
Throwable throwable, Throwable throwable,
List<InFlightState> updatedStates, List<PersisterBatch> persisterBatches
List<PersisterStateBatch> stateBatches
) { ) {
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
@ -2099,9 +2085,9 @@ public class SharePartition {
// Log in DEBUG to avoid flooding of logs for a faulty client. // Log in DEBUG to avoid flooding of logs for a faulty client.
log.debug("Request failed for updating state, rollback any changed state" log.debug("Request failed for updating state, rollback any changed state"
+ " for the share partition: {}-{}", groupId, topicIdPartition); + " for the share partition: {}-{}", groupId, topicIdPartition);
updatedStates.forEach(state -> { persisterBatches.forEach(persisterBatch -> {
state.completeStateTransition(false); persisterBatch.updatedState.completeStateTransition(false);
if (state.state() == RecordState.AVAILABLE) { if (persisterBatch.updatedState.state() == RecordState.AVAILABLE) {
updateFindNextFetchOffset(true); updateFindNextFetchOffset(true);
} }
}); });
@ -2109,7 +2095,7 @@ public class SharePartition {
return; return;
} }
if (stateBatches.isEmpty() && updatedStates.isEmpty()) { if (persisterBatches.isEmpty()) {
future.complete(null); future.complete(null);
return; return;
} }
@ -2117,47 +2103,48 @@ public class SharePartition {
lock.writeLock().unlock(); lock.writeLock().unlock();
} }
writeShareGroupState(stateBatches).whenComplete((result, exception) -> { writeShareGroupState(persisterBatches.stream().map(PersisterBatch::stateBatch).toList())
// There can be a pending delayed share fetch requests for the share partition which are waiting .whenComplete((result, exception) -> {
// on the startOffset to move ahead, hence track if the state is updated in the cache. If // There can be a pending delayed share fetch requests for the share partition which are waiting
// yes, then notify the delayed share fetch purgatory to complete the pending requests. // on the startOffset to move ahead, hence track if the state is updated in the cache. If
boolean cacheStateUpdated = false; // yes, then notify the delayed share fetch purgatory to complete the pending requests.
lock.writeLock().lock(); boolean cacheStateUpdated = false;
try { lock.writeLock().lock();
if (exception != null) { try {
log.debug("Failed to write state to persister for the share partition: {}-{}", if (exception != null) {
groupId, topicIdPartition, exception); log.debug("Failed to write state to persister for the share partition: {}-{}",
// In case of failure when transition state is rolled back then it should be rolled groupId, topicIdPartition, exception);
// back to ACQUIRED state, unless acquisition lock for the state has expired. // In case of failure when transition state is rolled back then it should be rolled
updatedStates.forEach(state -> { // back to ACQUIRED state, unless acquisition lock for the state has expired.
state.completeStateTransition(false); persisterBatches.forEach(persisterBatch -> {
if (state.state() == RecordState.AVAILABLE) { persisterBatch.updatedState.completeStateTransition(false);
if (persisterBatch.updatedState.state() == RecordState.AVAILABLE) {
updateFindNextFetchOffset(true);
}
});
future.completeExceptionally(exception);
return;
}
log.trace("State change request successful for share partition: {}-{}",
groupId, topicIdPartition);
persisterBatches.forEach(persisterBatch -> {
persisterBatch.updatedState.completeStateTransition(true);
if (persisterBatch.updatedState.state() == RecordState.AVAILABLE) {
updateFindNextFetchOffset(true); updateFindNextFetchOffset(true);
} }
}); });
future.completeExceptionally(exception); // Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
return; cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
future.complete(null);
} finally {
lock.writeLock().unlock();
// Maybe complete the delayed share fetch request if the state has been changed in cache
// which might have moved start offset ahead. Hence, the pending delayed share fetch
// request can be completed. The call should be made outside the lock to avoid deadlock.
maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
} }
});
log.trace("State change request successful for share partition: {}-{}",
groupId, topicIdPartition);
updatedStates.forEach(state -> {
state.completeStateTransition(true);
if (state.state() == RecordState.AVAILABLE) {
updateFindNextFetchOffset(true);
}
});
// Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
future.complete(null);
} finally {
lock.writeLock().unlock();
// Maybe complete the delayed share fetch request if the state has been changed in cache
// which might have moved start offset ahead. Hence, the pending delayed share fetch
// request can be completed. The call should be made outside the lock to avoid deadlock.
maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
}
});
} }
private boolean maybeUpdateCachedStateAndOffsets() { private boolean maybeUpdateCachedStateAndOffsets() {
@ -2929,6 +2916,15 @@ public class SharePartition {
} }
} }
/**
 * PersisterBatch class is used to record the state updates for a batch or an offset.
 * It contains the updated in-flight state and the persister state batch to be sent to persister.
 * Pairing the two keeps each in-flight state together with the persister state batch
 * that was created for it, instead of tracking them in two separate lists.
 *
 * @param updatedState the in-flight state whose pending transition is completed (on a successful
 *                     persister write) or rolled back (on failure) via completeStateTransition
 * @param stateBatch   the persister state batch passed to writeShareGroupState for this update
 */
private record PersisterBatch(
InFlightState updatedState,
PersisterStateBatch stateBatch
) { }
/** /**
* LastOffsetAndMaxRecords class is used to track the last offset to acquire and the maximum number * LastOffsetAndMaxRecords class is used to track the last offset to acquire and the maximum number
* of records that can be acquired in a fetch request. * of records that can be acquired in a fetch request.