diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 3ca11cbab1f..2c330076733 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -884,8 +884,7 @@ public class SharePartition { CompletableFuture future = new CompletableFuture<>(); Throwable throwable = null; - List updatedStates = new ArrayList<>(); - List stateBatches = new ArrayList<>(); + List persisterBatches = new ArrayList<>(); lock.writeLock().lock(); try { // Avoided using enhanced for loop as need to check if the last batch have offsets @@ -925,8 +924,7 @@ public class SharePartition { batch, recordStateMap, subMap, - updatedStates, - stateBatches + persisterBatches ); if (ackThrowable.isPresent()) { @@ -939,7 +937,7 @@ public class SharePartition { } // If the acknowledgement is successful then persist state, complete the state transition // and update the cached state for start offset. Else rollback the state transition. - rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches); + rollbackOrProcessStateUpdates(future, throwable, persisterBatches); return future; } @@ -955,8 +953,7 @@ public class SharePartition { CompletableFuture future = new CompletableFuture<>(); Throwable throwable = null; - List updatedStates = new ArrayList<>(); - List stateBatches = new ArrayList<>(); + List persisterBatches = new ArrayList<>(); lock.writeLock().lock(); try { @@ -975,14 +972,14 @@ public class SharePartition { } if (inFlightBatch.offsetState() != null) { - Optional releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForPerOffsetBatch(memberId, inFlightBatch, recordState, updatedStates, stateBatches); + Optional releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForPerOffsetBatch(memberId, inFlightBatch, recordState, persisterBatches); if (releaseAcquiredRecordsThrowable.isPresent()) { throwable = releaseAcquiredRecordsThrowable.get(); break; } continue; } - Optional releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForCompleteBatch(memberId, inFlightBatch, recordState, updatedStates, stateBatches); + Optional releaseAcquiredRecordsThrowable = releaseAcquiredRecordsForCompleteBatch(memberId, inFlightBatch, recordState, persisterBatches); if (releaseAcquiredRecordsThrowable.isPresent()) { throwable = releaseAcquiredRecordsThrowable.get(); break; @@ -993,7 +990,7 @@ public class SharePartition { } // If the release acquired records is successful then persist state, complete the state transition // and update the cached state for start offset. Else rollback the state transition. - rollbackOrProcessStateUpdates(future, throwable, updatedStates, stateBatches); + rollbackOrProcessStateUpdates(future, throwable, persisterBatches); return future; } @@ -1004,8 +1001,7 @@ public class SharePartition { private Optional releaseAcquiredRecordsForPerOffsetBatch(String memberId, InFlightBatch inFlightBatch, RecordState recordState, - List updatedStates, - List stateBatches) { + List persisterBatches) { log.trace("Offset tracked batch record found, batch: {} for the share partition: {}-{}", inFlightBatch, groupId, topicIdPartition); @@ -1032,10 +1028,9 @@ public class SharePartition { return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the offset")); } - // Successfully updated the state of the offset. - updatedStates.add(updateResult); - stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(), - updateResult.state().id(), (short) updateResult.deliveryCount())); + // Successfully updated the state of the offset and created a persister state batch for write to persister. + persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(), + offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount()))); // Do not update the next fetch offset as the offset has not completed the transition yet. } } @@ -1045,8 +1040,7 @@ public class SharePartition { private Optional releaseAcquiredRecordsForCompleteBatch(String memberId, InFlightBatch inFlightBatch, RecordState recordState, - List updatedStates, - List stateBatches) { + List persisterBatches) { // Check if member id is the owner of the batch. if (!inFlightBatch.batchMemberId().equals(memberId) && !inFlightBatch.batchMemberId().equals(EMPTY_MEMBER_ID)) { @@ -1072,10 +1066,9 @@ public class SharePartition { return Optional.of(new InvalidRecordStateException("Unable to release acquired records for the batch")); } - // Successfully updated the state of the batch. - updatedStates.add(updateResult); - stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), - updateResult.state().id(), (short) updateResult.deliveryCount())); + // Successfully updated the state of the batch and created a persister state batch for write to persister. + persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(inFlightBatch.firstOffset(), + inFlightBatch.lastOffset(), updateResult.state().id(), (short) updateResult.deliveryCount()))); // Do not update the next fetch offset as the batch has not completed the transition yet. } return Optional.empty(); @@ -1826,8 +1819,7 @@ public class SharePartition { ShareAcknowledgementBatch batch, Map recordStateMap, NavigableMap subMap, - final List updatedStates, - List stateBatches + List persisterBatches ) { Optional throwable; lock.writeLock().lock(); @@ -1889,11 +1881,11 @@ public class SharePartition { } throwable = acknowledgePerOffsetBatchRecords(memberId, batch, inFlightBatch, - recordStateMap, updatedStates, stateBatches); + recordStateMap, persisterBatches); } else { // The in-flight batch is a full match hence change the state of the complete batch. throwable = acknowledgeCompleteBatch(batch, inFlightBatch, - recordStateMap.get(batch.firstOffset()), updatedStates, stateBatches); + recordStateMap.get(batch.firstOffset()), persisterBatches); } if (throwable.isPresent()) { @@ -1930,8 +1922,7 @@ public class SharePartition { ShareAcknowledgementBatch batch, InFlightBatch inFlightBatch, Map recordStateMap, - List updatedStates, - List stateBatches + List persisterBatches ) { lock.writeLock().lock(); try { @@ -1995,10 +1986,9 @@ public class SharePartition { return Optional.of(new InvalidRecordStateException( "Unable to acknowledge records for the batch")); } - // Successfully updated the state of the offset. - updatedStates.add(updateResult); - stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(), - updateResult.state().id(), (short) updateResult.deliveryCount())); + // Successfully updated the state of the offset and created a persister state batch for write to persister. + persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(offsetState.getKey(), + offsetState.getKey(), updateResult.state().id(), (short) updateResult.deliveryCount()))); // Do not update the nextFetchOffset as the offset has not completed the transition yet. } } finally { @@ -2011,8 +2001,7 @@ public class SharePartition { ShareAcknowledgementBatch batch, InFlightBatch inFlightBatch, RecordState recordState, - List updatedStates, - List stateBatches + List persisterBatches ) { lock.writeLock().lock(); try { @@ -2044,11 +2033,9 @@ public class SharePartition { new InvalidRecordStateException("Unable to acknowledge records for the batch")); } - // Successfully updated the state of the batch. - updatedStates.add(updateResult); - stateBatches.add( - new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(), - updateResult.state().id(), (short) updateResult.deliveryCount())); + // Successfully updated the state of the batch and created a persister state batch for write to persister. + persisterBatches.add(new PersisterBatch(updateResult, new PersisterStateBatch(inFlightBatch.firstOffset(), + inFlightBatch.lastOffset(), updateResult.state().id(), (short) updateResult.deliveryCount()))); // Do not update the next fetch offset as the batch has not completed the transition yet. } finally { lock.writeLock().unlock(); @@ -2090,8 +2077,7 @@ public class SharePartition { void rollbackOrProcessStateUpdates( CompletableFuture future, Throwable throwable, - List updatedStates, - List stateBatches + List persisterBatches ) { lock.writeLock().lock(); try { @@ -2099,9 +2085,9 @@ public class SharePartition { // Log in DEBUG to avoid flooding of logs for a faulty client. log.debug("Request failed for updating state, rollback any changed state" + " for the share partition: {}-{}", groupId, topicIdPartition); - updatedStates.forEach(state -> { - state.completeStateTransition(false); - if (state.state() == RecordState.AVAILABLE) { + persisterBatches.forEach(persisterBatch -> { + persisterBatch.updatedState.completeStateTransition(false); + if (persisterBatch.updatedState.state() == RecordState.AVAILABLE) { updateFindNextFetchOffset(true); } }); @@ -2109,7 +2095,7 @@ public class SharePartition { return; } - if (stateBatches.isEmpty() && updatedStates.isEmpty()) { + if (persisterBatches.isEmpty()) { future.complete(null); return; } @@ -2117,47 +2103,48 @@ public class SharePartition { lock.writeLock().unlock(); } - writeShareGroupState(stateBatches).whenComplete((result, exception) -> { - // There can be a pending delayed share fetch requests for the share partition which are waiting - // on the startOffset to move ahead, hence track if the state is updated in the cache. If - // yes, then notify the delayed share fetch purgatory to complete the pending requests. - boolean cacheStateUpdated = false; - lock.writeLock().lock(); - try { - if (exception != null) { - log.debug("Failed to write state to persister for the share partition: {}-{}", - groupId, topicIdPartition, exception); - // In case of failure when transition state is rolled back then it should be rolled - // back to ACQUIRED state, unless acquisition lock for the state has expired. - updatedStates.forEach(state -> { - state.completeStateTransition(false); - if (state.state() == RecordState.AVAILABLE) { + writeShareGroupState(persisterBatches.stream().map(PersisterBatch::stateBatch).toList()) + .whenComplete((result, exception) -> { + // There can be a pending delayed share fetch requests for the share partition which are waiting + // on the startOffset to move ahead, hence track if the state is updated in the cache. If + // yes, then notify the delayed share fetch purgatory to complete the pending requests. + boolean cacheStateUpdated = false; + lock.writeLock().lock(); + try { + if (exception != null) { + log.debug("Failed to write state to persister for the share partition: {}-{}", + groupId, topicIdPartition, exception); + // In case of failure when transition state is rolled back then it should be rolled + // back to ACQUIRED state, unless acquisition lock for the state has expired. + persisterBatches.forEach(persisterBatch -> { + persisterBatch.updatedState.completeStateTransition(false); + if (persisterBatch.updatedState.state() == RecordState.AVAILABLE) { + updateFindNextFetchOffset(true); + } + }); + future.completeExceptionally(exception); + return; + } + + log.trace("State change request successful for share partition: {}-{}", + groupId, topicIdPartition); + persisterBatches.forEach(persisterBatch -> { + persisterBatch.updatedState.completeStateTransition(true); + if (persisterBatch.updatedState.state() == RecordState.AVAILABLE) { updateFindNextFetchOffset(true); } }); - future.completeExceptionally(exception); - return; + // Update the cached state and start and end offsets after acknowledging/releasing the acquired records. + cacheStateUpdated = maybeUpdateCachedStateAndOffsets(); + future.complete(null); + } finally { + lock.writeLock().unlock(); + // Maybe complete the delayed share fetch request if the state has been changed in cache + // which might have moved start offset ahead. Hence, the pending delayed share fetch + // request can be completed. The call should be made outside the lock to avoid deadlock. + maybeCompleteDelayedShareFetchRequest(cacheStateUpdated); } - - log.trace("State change request successful for share partition: {}-{}", - groupId, topicIdPartition); - updatedStates.forEach(state -> { - state.completeStateTransition(true); - if (state.state() == RecordState.AVAILABLE) { - updateFindNextFetchOffset(true); - } - }); - // Update the cached state and start and end offsets after acknowledging/releasing the acquired records. - cacheStateUpdated = maybeUpdateCachedStateAndOffsets(); - future.complete(null); - } finally { - lock.writeLock().unlock(); - // Maybe complete the delayed share fetch request if the state has been changed in cache - // which might have moved start offset ahead. Hence, the pending delayed share fetch - // request can be completed. The call should be made outside the lock to avoid deadlock. - maybeCompleteDelayedShareFetchRequest(cacheStateUpdated); - } - }); + }); } private boolean maybeUpdateCachedStateAndOffsets() { @@ -2929,6 +2916,15 @@ public class SharePartition { } } + /** + * PersisterBatch class is used to record the state updates for a batch or an offset. + * It contains the updated in-flight state and the persister state batch to be sent to persister. + */ + private record PersisterBatch( + InFlightState updatedState, + PersisterStateBatch stateBatch + ) { } + /** * LastOffsetAndMaxRecords class is used to track the last offset to acquire and the maximum number * of records that can be acquired in a fetch request.