diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 9a7ef4334c9..ab9f5abc6f6 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -19,9 +19,21 @@ package kafka.server.share; import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InvalidRecordStateException; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.group.share.GroupTopicPartitionData; +import org.apache.kafka.server.group.share.PartitionErrorData; +import org.apache.kafka.server.group.share.PartitionFactory; +import org.apache.kafka.server.group.share.PartitionStateBatchData; +import org.apache.kafka.server.group.share.Persister; +import org.apache.kafka.server.group.share.PersisterStateBatch; +import org.apache.kafka.server.group.share.TopicData; +import org.apache.kafka.server.group.share.WriteShareGroupStateParameters; +import org.apache.kafka.server.group.share.WriteShareGroupStateResult; import org.apache.kafka.server.share.ShareAcknowledgementBatch; import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.server.util.timer.TimerTask; @@ -31,6 +43,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -38,6 +51,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -188,6 +202,11 @@ public class SharePartition { */ private final Time time; + /** + * The persister is used to persist the state of the share partition to disk. + */ + private final Persister persister; + /** * The share partition start offset specifies the partition start offset from which the records * are cached in the cachedState of the sharePartition. @@ -218,6 +237,7 @@ public class SharePartition { int recordLockDurationMs, Timer timer, Time time, + Persister persister, ReplicaManager replicaManager ) { this.groupId = groupId; @@ -231,6 +251,7 @@ public class SharePartition { this.recordLockDurationMs = recordLockDurationMs; this.timer = timer; this.time = time; + this.persister = persister; this.replicaManager = replicaManager; // Initialize the partition. initialize(); @@ -456,10 +477,68 @@ public class SharePartition { ) { log.trace("Acknowledgement batch request for share partition: {}-{}", groupId, topicIdPartition); - CompletableFuture> future = new CompletableFuture<>(); - future.completeExceptionally(new UnsupportedOperationException("Not implemented")); + Throwable throwable = null; + lock.writeLock().lock(); + List updatedStates = new ArrayList<>(); + List stateBatches = new ArrayList<>(); + try { + // Avoided using enhanced for loop as need to check if the last batch have offsets + // in the range. 
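+            // Illustrative note: the index is needed so that fetchSubMapForAcknowledgementBatch
+            // below receives i == acknowledgementBatch.size() - 1, i.e. only the final batch is
+            // validated for a last offset that runs past the acquired records.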
+            for (int i = 0; i < acknowledgementBatch.size(); i++) {
+                ShareAcknowledgementBatch batch = acknowledgementBatch.get(i);
-        return future;
+                // The client can either send a single entry in acknowledgeTypes which represents the state
+                // of the complete batch, or it can send the state for individual offsets.
+                Map<Long, RecordState> recordStateMap;
+                try {
+                    recordStateMap = fetchRecordStateMapForAcknowledgementBatch(batch);
+                } catch (IllegalArgumentException e) {
+                    log.debug("Invalid acknowledge type: {} for share partition: {}-{}",
+                        batch.acknowledgeTypes(), groupId, topicIdPartition);
+                    throwable = new InvalidRequestException("Invalid acknowledge type: " + batch.acknowledgeTypes());
+                    break;
+                }
+
+                if (batch.lastOffset() < startOffset) {
+                    log.trace("All offsets in the acknowledgement batch {} are already archived: {}-{}",
+                        batch, groupId, topicIdPartition);
+                    continue;
+                }
+
+                // Fetch the sub-map from the cached map for the batch to acknowledge. The sub-map can
+                // be a full match, a subset, or span over multiple fetched batches.
+                NavigableMap<Long, InFlightBatch> subMap;
+                try {
+                    subMap = fetchSubMapForAcknowledgementBatch(batch, i == acknowledgementBatch.size() - 1);
+                } catch (InvalidRecordStateException | InvalidRequestException e) {
+                    throwable = e;
+                    break;
+                }
+
+                // Acknowledge the records for the batch.
+                Optional<Throwable> ackThrowable = acknowledgementBatchRecords(
+                    memberId,
+                    batch,
+                    recordStateMap,
+                    subMap,
+                    updatedStates,
+                    stateBatches
+                );
+
+                if (ackThrowable.isPresent()) {
+                    throwable = ackThrowable.get();
+                    break;
+                }
+            }
+
+            // If the acknowledgement is successful then persist the state, complete the state transition
+            // and update the cached state for the start offset. Else roll back the state transition.
+            rollbackOrProcessStateUpdates(throwable, updatedStates, stateBatches);
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        return CompletableFuture.completedFuture(Optional.ofNullable(throwable));
     }
 
     /**
@@ -632,6 +711,524 @@ public class SharePartition {
         return inFlightBatch.firstOffset() >= firstOffsetToCompare && inFlightBatch.lastOffset() <= lastOffsetToCompare;
     }
 
+    /**
+     * Check if the start offset has moved and is within the request's first and last offset.
+     *
+     * @param batchFirstOffset The first offset of the batch.
+     * @param batchLastOffset The last offset of the batch.
+     *
+     * @return True if the start offset has moved and is within the request's first and last offset, false otherwise.
+     */
+    private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batchLastOffset) {
+        return batchFirstOffset < startOffset && batchLastOffset >= startOffset;
+    }
+
+    private Map<Long, RecordState> fetchRecordStateMapForAcknowledgementBatch(
+        ShareAcknowledgementBatch batch) {
+        // The client can either send a single entry in acknowledgeTypes which represents the state
+        // of the complete batch, or it can send the state for individual offsets. Construct a map with
+        // the record state for each offset in the batch; if a single acknowledge type is sent, the map
+        // will have only one entry.
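+        // Illustrative example (values assumed for exposition): a batch with firstOffset 10 and
+        // acknowledgeTypes [1, 2, 0] yields {10=ACKNOWLEDGED, 11=AVAILABLE, 12=ARCHIVED}, whereas
+        // acknowledgeTypes [1] yields the single entry {10=ACKNOWLEDGED}, which callers apply as
+        // the default state for every offset in the batch.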
+        Map<Long, RecordState> recordStateMap = new HashMap<>();
+        for (int index = 0; index < batch.acknowledgeTypes().size(); index++) {
+            recordStateMap.put(batch.firstOffset() + index,
+                fetchRecordState(batch.acknowledgeTypes().get(index)));
+        }
+        return recordStateMap;
+    }
+
+    private static RecordState fetchRecordState(byte acknowledgeType) {
+        switch (acknowledgeType) {
+            case 1 /* ACCEPT */:
+                return RecordState.ACKNOWLEDGED;
+            case 2 /* RELEASE */:
+                return RecordState.AVAILABLE;
+            case 3 /* REJECT */:
+            case 0 /* GAP */:
+                return RecordState.ARCHIVED;
+            default:
+                throw new IllegalArgumentException("Invalid acknowledge type: " + acknowledgeType);
+        }
+    }
+
+    private NavigableMap<Long, InFlightBatch> fetchSubMapForAcknowledgementBatch(
+        ShareAcknowledgementBatch batch,
+        boolean isLastBatch
+    ) {
+        lock.writeLock().lock();
+        try {
+            // Find the floor batch record for the request batch. The request batch could be
+            // for a subset of the batch, e.g. a cached batch of offsets 10-14 and a request batch
+            // of 12-13. Hence, the floor entry is fetched to find the sub-map.
+            Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(batch.firstOffset());
+            if (floorOffset == null) {
+                boolean hasStartOffsetMoved = checkForStartOffsetWithinBatch(batch.firstOffset(), batch.lastOffset());
+                if (hasStartOffsetMoved) {
+                    // If the start offset has moved and is within the request batch, then fetch
+                    // the floor entry from the start offset and acknowledge the cached offsets. Consider
+                    // the case where the start offset has moved from 0 to 10, with cached batches
+                    // of 0 - 5, 5 - 10, 10 - 12, 12 - 15. If the request batch for acknowledgement is 5 - 15,
+                    // then post acquisition lock timeout the cache will have data only from 10 to 15.
+                    // Hence, we need to fetch the floor entry from the start offset.
+                    floorOffset = cachedState.floorEntry(startOffset);
+                } else {
+                    log.debug("Batch record {} not found for share partition: {}-{}", batch, groupId,
+                        topicIdPartition);
+                    throw new InvalidRecordStateException(
+                        "Batch record not found. The request batch offsets are not found in the cache.");
+                }
+            }
+
+            NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset.getKey(), true, batch.lastOffset(), true);
+            // Validate if the request batch has a first offset greater than the last offset of the last
+            // fetched cached batch; if so, the request contains offsets that are past the acquired records.
+            if (subMap.lastEntry().getValue().lastOffset < batch.firstOffset()) {
+                log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", batch, groupId, topicIdPartition);
+                throw new InvalidRequestException("Batch record not found. The first offset in request is past acquired records.");
+            }
+
+            // Validate if the request batch has a last offset greater than the last offset of
+            // the last fetched cached batch; if so, there are offsets in the request that cannot
+            // be found in the fetched batches.
+            if (isLastBatch && batch.lastOffset() > subMap.lastEntry().getValue().lastOffset) {
+                log.debug("Request batch: {} has offsets which are not found for share partition: {}-{}", batch, groupId, topicIdPartition);
+                throw new InvalidRequestException("Batch record not found. 
The last offset in request is past acquired records.");
+            }
+
+            return subMap;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private Optional<Throwable> acknowledgementBatchRecords(
+        String memberId,
+        ShareAcknowledgementBatch batch,
+        Map<Long, RecordState> recordStateMap,
+        NavigableMap<Long, InFlightBatch> subMap,
+        final List<InFlightState> updatedStates,
+        List<PersisterStateBatch> stateBatches
+    ) {
+        Optional<Throwable> throwable;
+        lock.writeLock().lock();
+        try {
+            // The acknowledgement batch is either an exact match for a fetched batch (mostly), a subset,
+            // or spans over multiple fetched batches. The state can also vary per offset within the
+            // fetched batch in the case of a subset or when the client sent individual offset states.
+            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+                InFlightBatch inFlightBatch = entry.getValue();
+
+                // If startOffset has moved ahead of the in-flight batch, skip the batch.
+                if (inFlightBatch.lastOffset() < startOffset) {
+                    log.trace("All offsets in the inflight batch {} are already archived: {}-{}",
+                        inFlightBatch, groupId, topicIdPartition);
+                    continue;
+                }
+
+                // Validate if the requested member id is the owner of the batch.
+                if (inFlightBatch.offsetState() == null) {
+                    throwable = validateAcknowledgementBatchMemberId(memberId, inFlightBatch);
+                    if (throwable.isPresent()) {
+                        return throwable;
+                    }
+                }
+
+                // Determine if the in-flight batch is a full match for the request batch.
+                boolean fullMatch = checkForFullMatch(inFlightBatch, batch.firstOffset(), batch.lastOffset());
+                boolean isPerOffsetClientAck = batch.acknowledgeTypes().size() > 1;
+                boolean hasStartOffsetMoved = checkForStartOffsetWithinBatch(inFlightBatch.firstOffset, inFlightBatch.lastOffset);
+
+                // Maintain state per offset if the inflight batch is not a full match, the
+                // offset state is already managed, the client sent individual offset states, or
+                // the start offset is within this in-flight batch.
+                if (!fullMatch || inFlightBatch.offsetState() != null || isPerOffsetClientAck || hasStartOffsetMoved) {
+                    log.debug("Subset or offset tracked batch record found for acknowledgement,"
+                        + " batch: {}, request offsets - first: {}, last: {}, client per offset"
+                        + " state {} for the share partition: {}-{}", inFlightBatch, batch.firstOffset(),
+                        batch.lastOffset(), isPerOffsetClientAck, groupId, topicIdPartition);
+                    if (inFlightBatch.offsetState() == null) {
+                        // Though the request is a subset of the in-flight batch, the offset
+                        // tracking has not been initialized yet, which means that we can only
+                        // acknowledge a subset of offsets from the in-flight batch if the complete
+                        // batch is currently acquired. Hence, do a pre-check to avoid exploding
+                        // the in-flight offset tracking unnecessarily.
+                        if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
+                            log.debug("The batch is not in the acquired state: {} for share partition: {}-{}",
+                                inFlightBatch, groupId, topicIdPartition);
+                            return Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The subset batch is not in the acquired state."));
+                        }
+                        // The request batch is a subset or per offset state is managed, hence update
+                        // the offset state in the in-flight batch.
+                        inFlightBatch.maybeInitializeOffsetStateUpdate();
+                    }
+
+                    throwable = acknowledgePerOffsetBatchRecords(memberId, batch, inFlightBatch,
+                        recordStateMap, updatedStates, stateBatches);
+                } else {
+                    // The in-flight batch is a full match, hence change the state of the complete batch.
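+                    // Illustrative (offsets assumed): a cached in-flight batch 10-14 acknowledged by a
+                    // request for exactly 10-14 with a single acknowledge type transitions as one unit,
+                    // so no per-offset state map is created.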
+ throwable = acknowledgeCompleteBatch(batch, inFlightBatch, + recordStateMap.get(batch.firstOffset()), updatedStates, stateBatches); + } + + if (throwable.isPresent()) { + return throwable; + } + } + } finally { + lock.writeLock().unlock(); + } + return Optional.empty(); + } + + private Optional validateAcknowledgementBatchMemberId( + String memberId, + InFlightBatch inFlightBatch + ) { + // EMPTY_MEMBER_ID is used to indicate that the batch is not in acquired state. + if (inFlightBatch.batchMemberId().equals(EMPTY_MEMBER_ID)) { + log.debug("The batch is not in the acquired state: {} for share partition: {}-{}. Empty member id for batch.", + inFlightBatch, groupId, topicIdPartition); + return Optional.of(new InvalidRecordStateException("The batch cannot be acknowledged. The batch is not in the acquired state.")); + } + + if (!inFlightBatch.batchMemberId().equals(memberId)) { + log.debug("Member {} is not the owner of batch record {} for share partition: {}-{}", + memberId, inFlightBatch, groupId, topicIdPartition); + return Optional.of(new InvalidRecordStateException("Member is not the owner of batch record")); + } + return Optional.empty(); + } + + private Optional acknowledgePerOffsetBatchRecords( + String memberId, + ShareAcknowledgementBatch batch, + InFlightBatch inFlightBatch, + Map recordStateMap, + List updatedStates, + List stateBatches + ) { + lock.writeLock().lock(); + try { + // Fetch the first record state from the map to be used as default record state in case the + // offset record state is not provided by client. + RecordState recordStateDefault = recordStateMap.get(batch.firstOffset()); + for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) { + + // 1. For the first batch which might have offsets prior to the request base + // offset i.e. cached batch of 10-14 offsets and request batch of 12-13. + // 2. Skip the offsets which are below the start offset of the share partition + if (offsetState.getKey() < batch.firstOffset() || offsetState.getKey() < startOffset) { + continue; + } + + if (offsetState.getKey() > batch.lastOffset()) { + // No further offsets to process. + break; + } + + if (offsetState.getValue().state != RecordState.ACQUIRED) { + log.debug("The offset is not acquired, offset: {} batch: {} for the share" + + " partition: {}-{}", offsetState.getKey(), inFlightBatch, groupId, + topicIdPartition); + return Optional.of(new InvalidRecordStateException( + "The batch cannot be acknowledged. The offset is not acquired.")); + } + + // Check if member id is the owner of the offset. + if (!offsetState.getValue().memberId.equals(memberId)) { + log.debug("Member {} is not the owner of offset: {} in batch: {} for the share" + + " partition: {}-{}", memberId, offsetState.getKey(), inFlightBatch, + groupId, topicIdPartition); + return Optional.of( + new InvalidRecordStateException("Member is not the owner of offset")); + } + + // Determine the record state for the offset. If the per offset record state is not provided + // by the client, then use the batch record state. + RecordState recordState = + recordStateMap.size() > 1 ? 
recordStateMap.get(offsetState.getKey()) :
+                    recordStateDefault;
+                InFlightState updateResult = offsetState.getValue().startStateTransition(
+                    recordState,
+                    false,
+                    this.maxDeliveryCount,
+                    EMPTY_MEMBER_ID
+                );
+                if (updateResult == null) {
+                    log.debug("Unable to acknowledge records for the offset: {} in batch: {}"
+                        + " for the share partition: {}-{}", offsetState.getKey(),
+                        inFlightBatch, groupId, topicIdPartition);
+                    return Optional.of(new InvalidRecordStateException(
+                        "Unable to acknowledge records for the batch"));
+                }
+                // Successfully updated the state of the offset.
+                updatedStates.add(updateResult);
+                stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
+                    updateResult.state.id, (short) updateResult.deliveryCount));
+                // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to the ARCHIVED state.
+                // This should not change the next fetch offset because the record is not available for acquisition.
+                if (recordState == RecordState.AVAILABLE
+                    && updateResult.state != RecordState.ARCHIVED) {
+                    findNextFetchOffset.set(true);
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+        return Optional.empty();
+    }
+
+    private Optional<Throwable> acknowledgeCompleteBatch(
+        ShareAcknowledgementBatch batch,
+        InFlightBatch inFlightBatch,
+        RecordState recordState,
+        List<InFlightState> updatedStates,
+        List<PersisterStateBatch> stateBatches
+    ) {
+        lock.writeLock().lock();
+        try {
+            // The in-flight batch is a full match, hence change the state of the complete batch.
+            log.trace("Acknowledging complete batch record {} for the share partition: {}-{}",
+                batch, groupId, topicIdPartition);
+            if (inFlightBatch.batchState() != RecordState.ACQUIRED) {
+                log.debug("The batch is not in the acquired state: {} for share partition: {}-{}",
+                    inFlightBatch, groupId, topicIdPartition);
+                return Optional.of(new InvalidRecordStateException(
+                    "The batch cannot be acknowledged. The batch is not in the acquired state."));
+            }
+
+            // Change the state of the complete batch since the same state exists for the entire inFlight batch.
+            // The member id is reset to EMPTY_MEMBER_ID irrespective of the acknowledge type as the batch is
+            // either released or moved to a state where member id existence is not important. The member id
+            // is only important when the batch is acquired.
+            InFlightState updateResult = inFlightBatch.startBatchStateTransition(
+                recordState,
+                false,
+                this.maxDeliveryCount,
+                EMPTY_MEMBER_ID
+            );
+            if (updateResult == null) {
+                log.debug("Unable to acknowledge records for the batch: {} with state: {}"
+                    + " for the share partition: {}-{}", inFlightBatch, recordState, groupId,
+                    topicIdPartition);
+                return Optional.of(
+                    new InvalidRecordStateException("Unable to acknowledge records for the batch"));
+            }
+
+            // Successfully updated the state of the batch.
+            updatedStates.add(updateResult);
+            stateBatches.add(
+                new PersisterStateBatch(inFlightBatch.firstOffset, inFlightBatch.lastOffset,
+                    updateResult.state.id, (short) updateResult.deliveryCount));
+
+            // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to the ARCHIVED state.
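+            // (Illustrative, counts assumed: a RELEASE of an offset whose deliveryCount has already
+            // reached maxDeliveryCount resolves to ARCHIVED rather than AVAILABLE, which is why the
+            // guard below also checks updateResult.state.)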
+            // This should not change the nextFetchOffset because the record is not available for acquisition.
+            if (recordState == RecordState.AVAILABLE
+                && updateResult.state != RecordState.ARCHIVED) {
+                findNextFetchOffset.set(true);
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+        return Optional.empty();
+    }
+
+    // Visible for testing
+    void rollbackOrProcessStateUpdates(
+        Throwable throwable,
+        List<InFlightState> updatedStates,
+        List<PersisterStateBatch> stateBatches
+    ) {
+        lock.writeLock().lock();
+        try {
+            if (throwable != null || !isWriteShareGroupStateSuccessful(stateBatches)) {
+                // Log in DEBUG to avoid flooding the logs for a faulty client.
+                log.debug("Request failed for updating state, rollback any changed state"
+                    + " for the share partition: {}-{}", groupId, topicIdPartition);
+                updatedStates.forEach(state -> state.completeStateTransition(false));
+            } else {
+                log.trace("State change request successful for share partition: {}-{}",
+                    groupId, topicIdPartition);
+                updatedStates.forEach(state -> {
+                    state.completeStateTransition(true);
+                    // Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully.
+                    state.cancelAndClearAcquisitionLockTimeoutTask();
+                });
+                // Update the cached state and the start and end offsets after acknowledging/releasing the acquired records.
+                maybeUpdateCachedStateAndOffsets();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void maybeUpdateCachedStateAndOffsets() {
+        lock.writeLock().lock();
+        try {
+            if (!canMoveStartOffset()) {
+                return;
+            }
+
+            // This will help to find the next position for the startOffset.
+            // The new position of startOffset will be lastOffsetAcknowledged + 1.
+            long lastOffsetAcknowledged = findLastOffsetAcknowledged();
+            // If lastOffsetAcknowledged is -1, this means we cannot move the startOffset ahead.
+            if (lastOffsetAcknowledged == -1) {
+                return;
+            }
+
+            // This is true if all records in the cachedState have been acknowledged (either Accept or Reject).
+            // The resulting action should be to empty the cachedState altogether.
+            long lastCachedOffset = cachedState.lastEntry().getValue().lastOffset();
+            if (lastOffsetAcknowledged == lastCachedOffset) {
+                startOffset = lastCachedOffset + 1; // The next offset that will be fetched and acquired in the share partition
+                endOffset = lastCachedOffset + 1;
+                cachedState.clear();
+                // Nothing further to do.
+                return;
+            }
+
+            /*
+            The cachedState contains some records that are yet to be acknowledged, and thus should
+            not be removed. Only a subMap will be removed from the cachedState. The logic to remove
+            batches from the cachedState is as follows:
+            a) Only full batches can be removed from the cachedState. For example, if there is a batch (0-99)
+            and records 0-49 are acknowledged (ACCEPT or REJECT), the first 50 records will not be removed
+            from the cachedState. Instead, the startOffset will be moved to 50, but the batch will only
+            be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
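+
+            An illustrative walkthrough (batch boundaries assumed for exposition): with cached
+            batches (0-9), (10-19), (20-29) and lastOffsetAcknowledged = 14, floorEntry(14) is the
+            (10-19) batch. Since 14 is not that batch's lastOffset, startOffset moves to 15 and only
+            the fully acknowledged (0-9) batch is removed. Were lastOffsetAcknowledged 19 instead,
+            startOffset would move to higherKey(19) = 20 and both (0-9) and (10-19) would be removed.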
+ */ + + // Since only a subMap will be removed, we need to find the first and last keys of that subMap + long firstKeyToRemove = cachedState.firstKey(); + long lastKeyToRemove; + NavigableMap.Entry entry = cachedState.floorEntry(lastOffsetAcknowledged); + if (lastOffsetAcknowledged == entry.getValue().lastOffset()) { + startOffset = cachedState.higherKey(lastOffsetAcknowledged); + lastKeyToRemove = entry.getKey(); + } else { + startOffset = lastOffsetAcknowledged + 1; + if (entry.getKey().equals(cachedState.firstKey())) { + // If the first batch in cachedState has some records yet to be acknowledged, + // then nothing should be removed from cachedState + lastKeyToRemove = -1; + } else { + lastKeyToRemove = cachedState.lowerKey(entry.getKey()); + } + } + + if (lastKeyToRemove != -1) { + NavigableMap subMap = cachedState.subMap(firstKeyToRemove, true, lastKeyToRemove, true); + for (Long key : subMap.keySet()) { + cachedState.remove(key); + } + } + } finally { + lock.writeLock().unlock(); + } + } + + private boolean canMoveStartOffset() { + // The Share Partition Start Offset may be moved after acknowledgement request is complete. + // The following conditions need to be met to move the startOffset: + // 1. When the cachedState is not empty. + // 2. When the acknowledgement type for the records is either ACCEPT or REJECT. + // 3. When all the previous records have been acknowledged (ACCEPT or REJECT). + if (cachedState.isEmpty()) { + return false; + } + + NavigableMap.Entry entry = cachedState.floorEntry(startOffset); + if (entry == null) { + log.error("The start offset: {} is not found in the cached state for share partition: {}-{}." + + " Cannot move the start offset.", startOffset, groupId, topicIdPartition); + return false; + } + RecordState startOffsetState = entry.getValue().offsetState == null ? + entry.getValue().batchState() : + entry.getValue().offsetState().get(startOffset).state(); + return isRecordStateAcknowledged(startOffsetState); + } + + /** + * The record state is considered acknowledged if it is either acknowledged or archived. + * These are terminal states for the record. + * + * @param recordState The record state to check. + * + * @return True if the record state is acknowledged or archived, false otherwise. 
+ */ + private boolean isRecordStateAcknowledged(RecordState recordState) { + return recordState == RecordState.ACKNOWLEDGED || recordState == RecordState.ARCHIVED; + } + + private long findLastOffsetAcknowledged() { + lock.readLock().lock(); + long lastOffsetAcknowledged = -1; + try { + for (NavigableMap.Entry entry : cachedState.entrySet()) { + InFlightBatch inFlightBatch = entry.getValue(); + if (inFlightBatch.offsetState() == null) { + if (!isRecordStateAcknowledged(inFlightBatch.batchState())) { + return lastOffsetAcknowledged; + } + lastOffsetAcknowledged = inFlightBatch.lastOffset(); + } else { + for (Map.Entry offsetState : inFlightBatch.offsetState.entrySet()) { + if (!isRecordStateAcknowledged(offsetState.getValue().state())) { + return lastOffsetAcknowledged; + } + lastOffsetAcknowledged = offsetState.getKey(); + } + } + } + } finally { + lock.readLock().unlock(); + } + return lastOffsetAcknowledged; + } + + // Visible for testing + boolean isWriteShareGroupStateSuccessful(List stateBatches) { + WriteShareGroupStateResult response; + try { + response = persister.writeState(new WriteShareGroupStateParameters.Builder() + .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder() + .setGroupId(this.groupId) + .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(), + Collections.singletonList(PartitionFactory.newPartitionStateBatchData( + topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches)))) + ).build()).build()).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to write the share group state for share partition: {}-{}", groupId, topicIdPartition, e); + throw new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s", + groupId, topicIdPartition), e); + } + + if (response == null || response.topicsData() == null || response.topicsData().size() != 1) { + log.error("Failed to write the share group state for share partition: {}-{}. Invalid state found: {}", + groupId, topicIdPartition, response); + throw new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s", + groupId, topicIdPartition)); + } + + TopicData state = response.topicsData().get(0); + if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1 + || state.partitions().get(0).partition() != topicIdPartition.partition()) { + log.error("Failed to write the share group state for share partition: {}-{}. Invalid topic partition response: {}", + groupId, topicIdPartition, response); + throw new IllegalStateException(String.format("Failed to write the share group state for share partition %s-%s", + groupId, topicIdPartition)); + } + + PartitionErrorData partitionData = state.partitions().get(0); + if (partitionData.errorCode() != Errors.NONE.code()) { + Exception exception = Errors.forCode(partitionData.errorCode()).exception(); + log.error("Failed to write the share group state for share partition: {}-{} due to exception", + groupId, topicIdPartition, exception); + return false; + } + return true; + } + private AcquisitionLockTimerTask scheduleAcquisitionLockTimeout(String memberId, long firstOffset, long lastOffset) { return scheduleAcquisitionLockTimeout(memberId, firstOffset, lastOffset, recordLockDurationMs); } @@ -772,6 +1369,14 @@ public class SharePartition { return batchState.deliveryCount; } + // Visible for testing. 
+ AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() { + if (batchState == null) { + throw new IllegalStateException("The batch state is not available as the offset state is maintained"); + } + return batchState.acquisitionLockTimeoutTask; + } + // Visible for testing. NavigableMap offsetState() { return offsetState; @@ -784,6 +1389,14 @@ public class SharePartition { return batchState.tryUpdateState(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId); } + private InFlightState startBatchStateTransition(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, + String newMemberId) { + if (batchState == null) { + throw new IllegalStateException("The batch state update is not available as the offset state is maintained"); + } + return batchState.startStateTransition(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId); + } + private void maybeInitializeOffsetStateUpdate() { if (offsetState == null) { offsetState = new ConcurrentSkipListMap<>(); @@ -841,6 +1454,9 @@ public class SharePartition { private int deliveryCount; // The member id of the client that is fetching/acknowledging the record. private String memberId; + // The state of the records before the transition. In case we need to revert an in-flight state, we revert the above + // attributes of InFlightState to this state, namely - state, deliveryCount and memberId. + private InFlightState rollbackState; // The timer task for the acquisition lock timeout. private AcquisitionLockTimerTask acquisitionLockTimeoutTask; @@ -856,6 +1472,25 @@ public class SharePartition { this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask; } + // Visible for testing. + RecordState state() { + return state; + } + + // Visible for testing. + int deliveryCount() { + return deliveryCount; + } + + String memberId() { + return memberId; + } + + // Visible for testing. 
+ TimerTask acquisitionLockTimeoutTask() { + return acquisitionLockTimeoutTask; + } + void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) { this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask; } @@ -893,6 +1528,39 @@ public class SharePartition { } } + private InFlightState startStateTransition(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) { + rollbackState = new InFlightState(state, deliveryCount, memberId, acquisitionLockTimeoutTask); + return tryUpdateState(newState, incrementDeliveryCount, maxDeliveryCount, newMemberId); + } + + private void completeStateTransition(boolean commit) { + if (commit) { + rollbackState = null; + return; + } + state = rollbackState.state; + deliveryCount = rollbackState.deliveryCount; + memberId = rollbackState.memberId; + rollbackState = null; + } + + @Override + public int hashCode() { + return Objects.hash(state, deliveryCount, memberId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + InFlightState that = (InFlightState) o; + return state == that.state && deliveryCount == that.deliveryCount && memberId.equals(that.memberId); + } + @Override public String toString() { return "InFlightState(" + diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index f49920cf648..da8b5ec0bb8 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.ShareFetchMetadata; import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.group.share.Persister; import org.apache.kafka.server.share.CachedSharePartition; import org.apache.kafka.server.share.ShareSession; import org.apache.kafka.server.share.ShareSessionCache; @@ -102,15 +103,21 @@ public class SharePartitionManager implements AutoCloseable { */ private final int maxDeliveryCount; + /** + * The persister is used to persist the share partition state. 
+ */ + private final Persister persister; + public SharePartitionManager( ReplicaManager replicaManager, Time time, ShareSessionCache cache, int recordLockDurationMs, int maxDeliveryCount, - int maxInFlightMessages + int maxInFlightMessages, + Persister persister ) { - this(replicaManager, time, cache, new ConcurrentHashMap<>(), recordLockDurationMs, maxDeliveryCount, maxInFlightMessages); + this(replicaManager, time, cache, new ConcurrentHashMap<>(), recordLockDurationMs, maxDeliveryCount, maxInFlightMessages, persister); } SharePartitionManager( @@ -120,7 +127,8 @@ public class SharePartitionManager implements AutoCloseable { Map partitionCacheMap, int recordLockDurationMs, int maxDeliveryCount, - int maxInFlightMessages + int maxInFlightMessages, + Persister persister ) { this.replicaManager = replicaManager; this.time = time; @@ -131,6 +139,7 @@ public class SharePartitionManager implements AutoCloseable { this.recordLockDurationMs = recordLockDurationMs; this.maxDeliveryCount = maxDeliveryCount; this.maxInFlightMessages = maxInFlightMessages; + this.persister = persister; } /** diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index ecbffb1cec6..bf73886c0fa 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -32,6 +32,8 @@ import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.requests.ShareFetchResponse; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.group.share.NoOpShareStatePersister; +import org.apache.kafka.server.group.share.Persister; import org.apache.kafka.server.share.ShareSessionCache; import org.apache.kafka.server.share.ShareSessionKey; import org.junit.jupiter.api.Test; @@ -904,6 +906,7 @@ public class SharePartitionManagerTest { private Time time = new MockTime(); private ShareSessionCache cache = new ShareSessionCache(10, 1000); private Map partitionCacheMap = new HashMap<>(); + private Persister persister = NoOpShareStatePersister.getInstance(); private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) { this.replicaManager = replicaManager; @@ -925,12 +928,17 @@ public class SharePartitionManagerTest { return this; } + private SharePartitionManagerBuilder withShareGroupPersister(Persister persister) { + this.persister = persister; + return this; + } + public static SharePartitionManagerBuilder builder() { return new SharePartitionManagerBuilder(); } public SharePartitionManager build() { - return new SharePartitionManager(replicaManager, time, cache, partitionCacheMap, RECORD_LOCK_DURATION_MS, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES); + return new SharePartitionManager(replicaManager, time, cache, partitionCacheMap, RECORD_LOCK_DURATION_MS, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, persister); } } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 0cef07ba624..ef983a9f241 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -17,10 +17,13 @@ package kafka.server.share; import kafka.server.ReplicaManager; +import kafka.server.share.SharePartition.InFlightState; import kafka.server.share.SharePartition.RecordState; import 
org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.errors.InvalidRecordStateException; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; @@ -28,6 +31,9 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.group.share.NoOpShareStatePersister; +import org.apache.kafka.server.group.share.Persister; +import org.apache.kafka.server.share.ShareAcknowledgementBatch; import org.apache.kafka.server.util.timer.SystemTimer; import org.apache.kafka.server.util.timer.SystemTimerReaper; import org.apache.kafka.server.util.timer.Timer; @@ -40,15 +46,21 @@ import org.mockito.Mockito; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; +import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -317,6 +329,598 @@ public class SharePartitionTest { assertTrue(sharePartition.maybeAcquireFetchLock()); } + @Test + public void testAcknowledgeSingleRecordBatch() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + + MemoryRecords records1 = memoryRecords(1, 0); + MemoryRecords records2 = memoryRecords(1, 1); + + // Another batch is acquired because if there is only 1 batch, and it is acknowledged, the batch will be removed from cachedState + CompletableFuture> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 10, 0, records1, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + + result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 10, 0, records2, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + + CompletableFuture> ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(1, 1, Collections.singletonList((byte) 1)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertFalse(ackResult.join().isPresent()); + + assertEquals(2, sharePartition.nextFetchOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(1L).batchState()); + assertEquals(1, sharePartition.cachedState().get(1L).batchDeliveryCount()); + 
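// The batch was acknowledged as a whole, so no per-offset state map is expected (asserted below). +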
assertNull(sharePartition.cachedState().get(1L).offsetState()); + } + + @Test + public void testAcknowledgeMultipleRecordBatch() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + MemoryRecords records = memoryRecords(10, 5); + + CompletableFuture> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + + CompletableFuture> ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(5, 14, Collections.singletonList((byte) 1)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertFalse(ackResult.join().isPresent()); + + assertEquals(15, sharePartition.nextFetchOffset()); + assertEquals(0, sharePartition.cachedState().size()); + } + + @Test + public void testAcknowledgeMultipleRecordBatchWithGapOffsets() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + MemoryRecords records1 = memoryRecords(2, 5); + // Untracked gap of 3 offsets from 7-9. + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(5, 10); + // Gap from 15-17 offsets. + recordsBuilder.appendWithOffset(18, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); + MemoryRecords records2 = recordsBuilder.build(); + + CompletableFuture> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 30, 0, records1, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), result.join().toArray()); + assertEquals(7, sharePartition.nextFetchOffset()); + + result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 30, 0, records2, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), result.join().toArray()); + assertEquals(19, sharePartition.nextFetchOffset()); + + CompletableFuture> ackResult = sharePartition.acknowledge( + MEMBER_ID, + Arrays.asList( + new ShareAcknowledgementBatch(5, 6, Collections.singletonList((byte) 2)), + new ShareAcknowledgementBatch(10, 18, Arrays.asList( + (byte) 2, (byte) 2, (byte) 2, + (byte) 2, (byte) 2, (byte) 0, + (byte) 0, (byte) 0, (byte) 1 + )))); + assertFalse(ackResult.isCompletedExceptionally()); + assertFalse(ackResult.join().isPresent()); + + assertEquals(5, sharePartition.nextFetchOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState()); + assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(10L).batchState()); + assertNull(sharePartition.cachedState().get(5L).offsetState()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState()); + + // Check cached state. 
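+        // Illustrative mapping of the acknowledge types above: RELEASE (2) moves offsets 10-14 to
+        // AVAILABLE, GAP (0) moves 15-17 to ARCHIVED, and ACCEPT (1) moves 18 to ACKNOWLEDGED.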
+ Map expectedOffsetStateMap = new HashMap<>(); + expectedOffsetStateMap.put(10L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(11L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState()); + } + + @Test + public void testAcknowledgeMultipleSubsetRecordBatchWithGapOffsets() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + MemoryRecords records1 = memoryRecords(2, 5); + // Untracked gap of 3 offsets from 7-9. + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); + // Gap from 12-13 offsets. + recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); + // Gap for 15 offset. + recordsBuilder.appendWithOffset(16, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); + // Gap from 17-19 offsets. + recordsBuilder.appendWithOffset(20, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); + MemoryRecords records2 = recordsBuilder.build(); + + CompletableFuture> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 30, 0, records1, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), result.join().toArray()); + assertEquals(7, sharePartition.nextFetchOffset()); + + result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 30, 0, records2, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), result.join().toArray()); + assertEquals(21, sharePartition.nextFetchOffset()); + + // Acknowledging over subset of both batch with subset of gap offsets. + CompletableFuture> ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(6, 18, Arrays.asList( + // TODO: NOTE - untracked gap of 3 offsets from 7-9 has no effect on acknowledgment + // irrespective of acknowledgement type provided. While acquiring, the log start + // offset should be used to determine such gaps. 
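+                    // The 13 entries below map to offsets 6-18 in order; the entries for offsets
+                    // 7-9 fall in the untracked gap and hence have no cached state to update.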
+ (byte) 1, (byte) 1, (byte) 1, + (byte) 1, (byte) 1, (byte) 1, + (byte) 0, (byte) 0, (byte) 1, + (byte) 0, (byte) 1, (byte) 0, + (byte) 1)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertFalse(ackResult.join().isPresent()); + + assertEquals(21, sharePartition.nextFetchOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(5L).batchState()); + assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(10L).batchState()); + + // Check cached state. + Map expectedOffsetStateMap = new HashMap<>(); + expectedOffsetStateMap.put(5L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(6L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(5L).offsetState()); + + expectedOffsetStateMap.clear(); + expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(12L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(13L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(15L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(16L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(17L, new InFlightState(RecordState.ARCHIVED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(18L, new InFlightState(RecordState.ACKNOWLEDGED, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(19L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(20L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState()); + } + + @Test + public void testAcknowledgeOutOfRangeCachedData() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + // Acknowledge a batch when cache is empty. + CompletableFuture> ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(0, 15, Collections.singletonList((byte) 3)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertTrue(ackResult.join().isPresent()); + assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass()); + + MemoryRecords records = memoryRecords(5, 5); + CompletableFuture> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + // Cached data with offset 5-9 should exist. 
+ assertEquals(1, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(5L)); + + ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(20, 25, Collections.singletonList((byte) 3)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertTrue(ackResult.join().isPresent()); + assertEquals(InvalidRequestException.class, ackResult.join().get().getClass()); + } + + @Test + public void testAcknowledgeWithAnotherMember() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + MemoryRecords records = memoryRecords(5, 5); + CompletableFuture> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + // Cached data with offset 5-9 should exist. + assertEquals(1, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(5L)); + + CompletableFuture> ackResult = sharePartition.acknowledge( + "member-2", + Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 3)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertTrue(ackResult.join().isPresent()); + assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass()); + } + + @Test + public void testAcknowledgeWhenOffsetNotAcquired() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + MemoryRecords records = memoryRecords(5, 5); + CompletableFuture> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + // Cached data with offset 5-9 should exist. + assertEquals(1, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(5L)); + + CompletableFuture> ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 2)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertFalse(ackResult.join().isPresent()); + + // Acknowledge the same batch again but with ACCEPT type. + ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 1)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertTrue(ackResult.join().isPresent()); + assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass()); + + // Re-acquire the same batch and then acknowledge subset with ACCEPT type. + result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + + ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(6, 8, Collections.singletonList((byte) 3)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertFalse(ackResult.join().isPresent()); + + // Re-acknowledge the subset batch with REJECT type. 
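+        // This is expected to fail with InvalidRecordStateException: offsets 6-8 were moved to the
+        // terminal ARCHIVED state by the previous REJECT, so they are no longer acquired.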
+ ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(6, 8, Collections.singletonList((byte) 3)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertTrue(ackResult.join().isPresent()); + assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass()); + } + + @Test + public void testAcknowledgeRollbackWithFullBatchError() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + MemoryRecords records1 = memoryRecords(5, 5); + MemoryRecords records2 = memoryRecords(5, 10); + MemoryRecords records3 = memoryRecords(5, 15); + CompletableFuture> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records1, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + + result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records2, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + + result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records3, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + // Cached data with offset 5-19 should exist. + assertEquals(3, sharePartition.cachedState().size()); + + CompletableFuture> ackResult = sharePartition.acknowledge( + MEMBER_ID, + Arrays.asList( + new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 2)), + new ShareAcknowledgementBatch(10, 14, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(15, 19, Collections.singletonList((byte) 1)), + // Add another batch which should fail the request. + new ShareAcknowledgementBatch(15, 19, Collections.singletonList((byte) 1)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertTrue(ackResult.join().isPresent()); + assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass()); + + // Check the state of the cache. The state should be acquired itself. 
+ assertEquals(3, sharePartition.cachedState().size()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + } + + @Test + public void testAcknowledgeRollbackWithSubsetError() { + SharePartition sharePartition = SharePartitionBuilder.builder().build(); + MemoryRecords records1 = memoryRecords(5, 5); + MemoryRecords records2 = memoryRecords(5, 10); + MemoryRecords records3 = memoryRecords(5, 15); + CompletableFuture> result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records1, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + + result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records2, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + + result = sharePartition.acquire( + MEMBER_ID, + new FetchPartitionData(Errors.NONE, 20, 0, records3, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); + assertFalse(result.isCompletedExceptionally()); + assertEquals(1, result.join().size()); + // Cached data with offset 5-19 should exist. + assertEquals(3, sharePartition.cachedState().size()); + + CompletableFuture> ackResult = sharePartition.acknowledge( + MEMBER_ID, + Arrays.asList( + new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 2)), + new ShareAcknowledgementBatch(10, 14, Collections.singletonList((byte) 1)), + new ShareAcknowledgementBatch(15, 19, Collections.singletonList((byte) 1)), + // Add another batch which should fail the request. + new ShareAcknowledgementBatch(16, 19, Collections.singletonList((byte) 1)))); + assertFalse(ackResult.isCompletedExceptionally()); + assertTrue(ackResult.join().isPresent()); + assertEquals(InvalidRecordStateException.class, ackResult.join().get().getClass()); + + // Check the state of the cache. The state should be acquired itself. + assertEquals(3, sharePartition.cachedState().size()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + // Though the last batch is subset but the offset state map will not be exploded as the batch is + // not in acquired state due to previous batch acknowledgement. 
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
+    }
+
+    @Test
+    public void testAcquireReleasedRecord() {
+        SharePartition sharePartition = SharePartitionBuilder.builder().build();
+        MemoryRecords records = memoryRecords(5, 10);
+
+        CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        List<AcquiredRecords> acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            Collections.singletonList(new ShareAcknowledgementBatch(12, 13, Collections.singletonList((byte) 2))));
+        assertFalse(ackResult.isCompletedExceptionally());
+        assertFalse(ackResult.join().isPresent());
+
+        assertEquals(12, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(10L).batchState());
+
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
+
+        // Send the same fetch request batch again; only the 2 released offsets should be acquired.
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(12, 13, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testAcquireReleasedRecordMultipleBatches() {
+        SharePartition sharePartition = SharePartitionBuilder.builder().build();
+        // First fetch request with 5 records starting from offset 10.
+        MemoryRecords records1 = memoryRecords(5, 10);
+        // Second fetch request with 5 records starting from offset 15.
+        MemoryRecords records2 = memoryRecords(5, 15);
+        // Third fetch request with 5 records starting from offset 23, gap of 3 offsets.
+        MemoryRecords records3 = memoryRecords(5, 23);
+        // Fourth fetch request with 5 records starting from offset 28.
+        MemoryRecords records4 = memoryRecords(5, 28);
+
+        CompletableFuture<List<AcquiredRecords>> result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 40, 3, records1,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+        List<AcquiredRecords> acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 30, 3, records2,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+        acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(20, sharePartition.nextFetchOffset());
+
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 30, 3, records3,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+        acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(28, sharePartition.nextFetchOffset());
+
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 30, 3, records4,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+        acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(33, sharePartition.nextFetchOffset());
+
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(28L).batchState());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+        assertNull(sharePartition.cachedState().get(23L).offsetState());
+        assertNull(sharePartition.cachedState().get(28L).offsetState());
+
+        CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            Collections.singletonList(new ShareAcknowledgementBatch(12, 30, Collections.singletonList((byte) 2))));
+        assertFalse(ackResult.isCompletedExceptionally());
+        assertFalse(ackResult.join().isPresent());
+
+        assertEquals(12, sharePartition.nextFetchOffset());
+        assertEquals(4, sharePartition.cachedState().size());
+        assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(10L).batchState());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+        assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(15L).batchMemberId());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(23L).batchState());
+        assertNull(sharePartition.cachedState().get(23L).offsetState());
+        assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(23L).batchMemberId());
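+        // The release acknowledgement (12-30) only partially covers the 10-14 and 28-32 batches,
+        // so their state is tracked per-offset and batchState() throws for them.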
+        assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(28L).batchState());
+        assertNotNull(sharePartition.cachedState().get(28L).offsetState());
+
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState());
+
+        expectedOffsetStateMap.clear();
+        expectedOffsetStateMap.put(28L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(29L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(30L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(31L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(32L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(28L).offsetState());
+
+        // Send the next batch from offset 12; only 3 records should be acquired.
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 40, 3, records1,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Though the records2 batch is available to acquire, send the records3 batch instead; it
+        // should be acquired but the next fetch offset should not move.
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 40, 3, records3,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(records3, 2).toArray(), acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Acquire partial records from batch 2.
+        MemoryRecords subsetRecords = memoryRecords(2, 17);
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(), acquiredRecordsList.toArray());
+        // Next fetch offset should not move.
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Acquire partial records from the records4 batch to further test whether the next fetch
+        // offset moves accordingly once the complete records2 batch is also acquired.
+        subsetRecords = memoryRecords(1, 28);
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        acquiredRecordsList = result.join();
+        assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(), acquiredRecordsList.toArray());
+        // Next fetch offset should not move.
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Try to acquire the complete records2 batch; though it's already partially acquired,
+        // the next fetch offset should move.
+        result = sharePartition.acquire(
+            MEMBER_ID,
+            new FetchPartitionData(Errors.NONE, 20, 3, records2,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));
+        assertFalse(result.isCompletedExceptionally());
+
+        acquiredRecordsList = result.join();
+        // Offsets 15, 16 and 19 should be acquired.
+        List<AcquiredRecords> expectedAcquiredRecords = expectedAcquiredRecords(15, 16, 2);
+        expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
+        // Next fetch offset should now move as no available records remain before offset 29.
+        assertEquals(29, sharePartition.nextFetchOffset());
+    }
+
     private MemoryRecords memoryRecords(int numOfRecords) {
         return memoryRecords(numOfRecords, 0);
     }
@@ -343,11 +947,23 @@ public class SharePartitionTest {
         return acquiredRecordsList;
     }
 
+    private List<AcquiredRecords> expectedAcquiredRecords(long baseOffset, long lastOffset, int deliveryCount) {
+        List<AcquiredRecords> acquiredRecordsList = new ArrayList<>();
+        for (long i = baseOffset; i <= lastOffset; i++) {
+            acquiredRecordsList.add(new AcquiredRecords()
+                .setFirstOffset(i)
+                .setLastOffset(i)
+                .setDeliveryCount((short) deliveryCount));
+        }
+        return acquiredRecordsList;
+    }
+
     private static class SharePartitionBuilder {
 
         private int acquisitionLockTimeoutMs = 30000;
 
         private int maxDeliveryCount = MAX_DELIVERY_COUNT;
         private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
+        private Persister persister = NoOpShareStatePersister.getInstance();
         private ReplicaManager replicaManager = REPLICA_MANAGER;
 
         private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) {
@@ -355,13 +971,18 @@ public class SharePartitionTest {
             return this;
         }
 
+        private SharePartitionBuilder withPersister(Persister persister) {
+            this.persister = persister;
+            return this;
+        }
+
         public static SharePartitionBuilder builder() {
             return new SharePartitionBuilder();
         }
 
         public SharePartition build() {
             return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, maxInflightMessages, maxDeliveryCount,
-                acquisitionLockTimeoutMs, mockTimer, MOCK_TIME, replicaManager);
+                acquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager);
         }
     }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/group/share/NoOpShareStatePersister.java b/server-common/src/main/java/org/apache/kafka/server/group/share/NoOpShareStatePersister.java
new file mode 100644
index 00000000000..9b9a984de17
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/group/share/NoOpShareStatePersister.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.group.share;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * A no-op singleton implementation of the {@link Persister} interface.
+ */
+public class NoOpShareStatePersister implements Persister {
+
+    private NoOpShareStatePersister() {
+    }
+
+    // Initialization-on-demand holder idiom: the singleton is created lazily and
+    // thread-safely on the first call to getInstance().
+    private static final class InstanceHolder {
+        static final Persister INSTANCE = new NoOpShareStatePersister();
+    }
+
+    public static Persister getInstance() {
+        return InstanceHolder.INSTANCE;
+    }
+
+    @Override
+    public CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) {
+        GroupTopicPartitionData<PartitionStateData> reqData = request.groupTopicPartitionData();
+        List<TopicData<PartitionErrorData>> resultArgs = new ArrayList<>();
+        for (TopicData<PartitionStateData> topicData : reqData.topicsData()) {
+            resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream()
+                .map(partStateData -> PartitionFactory.newPartitionErrorData(partStateData.partition(), PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
+                .collect(Collectors.toList())));
+        }
+        return CompletableFuture.completedFuture(new InitializeShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
+    }
+
+    @Override
+    public CompletableFuture<ReadShareGroupStateResult> readState(ReadShareGroupStateParameters request) {
+        GroupTopicPartitionData<PartitionIdData> reqData = request.groupTopicPartitionData();
+        List<TopicData<PartitionAllData>> resultArgs = new ArrayList<>();
+        // Fetch the topic and partition info from the request and return a valid but default
+        // response (keep the partition id and topic from the request but initialize other
+        // values as defaults).
+        for (TopicData<PartitionIdData> topicData : reqData.topicsData()) {
+            resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream()
+                .map(partitionIdData -> PartitionFactory.newPartitionAllData(
+                    partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE, Collections.emptyList()))
+                .collect(Collectors.toList())));
+        }
+        return CompletableFuture.completedFuture(new ReadShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
+    }
+
+    @Override
+    public CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) {
+        GroupTopicPartitionData<PartitionStateBatchData> reqData = request.groupTopicPartitionData();
+        List<TopicData<PartitionErrorData>> resultArgs = new ArrayList<>();
+        for (TopicData<PartitionStateBatchData> topicData : reqData.topicsData()) {
+            resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream()
+                .map(batch -> PartitionFactory.newPartitionErrorData(batch.partition(), PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
+                .collect(Collectors.toList())));
+        }
+        return CompletableFuture.completedFuture(new WriteShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
+    }
+
+    @Override
+    public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGroupStateParameters request) {
+        GroupTopicPartitionData<PartitionIdData> reqData = request.groupTopicPartitionData();
+        List<TopicData<PartitionErrorData>> resultArgs = new ArrayList<>();
+        for (TopicData<PartitionIdData> topicData : reqData.topicsData()) {
+            resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream()
+                .map(batch -> PartitionFactory.newPartitionErrorData(batch.partition(), PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
+                .collect(Collectors.toList())));
+        }
+        return CompletableFuture.completedFuture(new DeleteShareGroupStateResult.Builder().setTopicsData(resultArgs).build());
+    }
+
+    @Override
+    public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) {
+        GroupTopicPartitionData<PartitionIdData> reqData = request.groupTopicPartitionData();
+        List<TopicData<PartitionStateErrorData>> resultArgs = new ArrayList<>();
+        // Fetch the topic and partition info from the request and return a valid but default
+        // response (keep the partition id and topic from the request but initialize other
+        // values as defaults).
+        for (TopicData<PartitionIdData> topicData : reqData.topicsData()) {
+            resultArgs.add(new TopicData<>(topicData.topicId(), topicData.partitions().stream()
+                .map(partitionIdData -> PartitionFactory.newPartitionStateErrorData(
+                    partitionIdData.partition(), PartitionFactory.DEFAULT_STATE_EPOCH, PartitionFactory.DEFAULT_START_OFFSET, PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
+                .collect(Collectors.toList())));
+        }
+        return CompletableFuture.completedFuture(new ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());
+    }
+
+    @Override
+    public void stop() {
+        // No-op.
+    }
+}
\ No newline at end of file
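Reviewer note (not part of the diff): the tests above build SharePartition through SharePartitionBuilder, which now defaults to NoOpShareStatePersister. A minimal usage sketch follows, assuming the memoryRecords helper, MEMBER_ID constant, and JUnit assertions already present in this test class; the acknowledge-type bytes follow the convention used throughout the tests (1 = accept, 2 = release, 3 = reject).

    // Build a share partition backed by the no-op persister (also the builder default).
    SharePartition sharePartition = SharePartitionBuilder.builder()
        .withPersister(NoOpShareStatePersister.getInstance())
        .build();

    // Acquire a batch of 5 records starting at offset 0.
    sharePartition.acquire(
        MEMBER_ID,
        new FetchPartitionData(Errors.NONE, 5, 0, memoryRecords(5, 0),
            Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false));

    // Accept the batch; the no-op persister reports success for every partition,
    // so the returned future completes with Optional.empty().
    CompletableFuture<Optional<Throwable>> ackResult = sharePartition.acknowledge(
        MEMBER_ID,
        Collections.singletonList(new ShareAcknowledgementBatch(0, 4, Collections.singletonList((byte) 1))));
    assertFalse(ackResult.join().isPresent());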