KAFKA-17367: Share coordinator persistent batch merging algorithm. [3/N] (#17149)

This patch introduces a merging algorithm for persistent state batches in the share coordinator. 

The algorithm first removes any expired batches (those whose lastOffset is before startOffset) and places the rest in a sorted set. It then repeatedly finds pairs of batches that overlap (or are contiguous with the same delivery state and count), combines them while preserving the relative priority of any intersecting sub-ranges, and places the resulting batches back into the set. The algorithm terminates when no more mergeable pairs can be found.
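As an illustration, a minimal usage sketch of the combiner introduced here (the class and method names are those added in this patch; the offsets and delivery values are hypothetical):

import java.util.Collections;
import java.util.List;
import org.apache.kafka.coordinator.share.PersisterStateBatchCombiner;
import org.apache.kafka.server.share.PersisterStateBatch;

// Two overlapping batches with the same delivery state but different delivery counts.
List<PersisterStateBatch> soFar = Collections.singletonList(new PersisterStateBatch(100L, 110L, (byte) 0, (short) 1));
List<PersisterStateBatch> incoming = Collections.singletonList(new PersisterStateBatch(105L, 115L, (byte) 0, (short) 2));
// A startOffset of -1 disables pruning, so only merging is applied.
List<PersisterStateBatch> merged = new PersisterStateBatchCombiner(soFar, incoming, -1L).combineStateBatches();
// merged => [(100-104, state 0, count 1), (105-115, state 0, count 2)]: the higher delivery count wins on the overlap.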

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Arthur <mumrah@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
Sushant Mahajan 2024-10-02 21:00:51 +05:30 committed by GitHub
parent 12a16ecf28
commit 7b7eb6243f
11 changed files with 1036 additions and 219 deletions


@ -62,6 +62,8 @@
<allow pkg="org.apache.kafka.server.util.timer" />
<allow pkg="org.apache.kafka.timeline" />
<allow pkg="org.junit.jupiter.api" />
<allow pkg="org.junit.jupiter.params" />
<allow pkg="org.junit.jupiter.provider" />
<allow pkg="org.mockito" />
<allow pkg="org.slf4j" />
<subpackage name="generated">


@ -341,6 +341,12 @@
<suppress checks="NPathComplexity"
files="CoordinatorRuntime.java"/>
<!-- share coordinator -->
<suppress checks="NPathComplexity"
files="ShareCoordinatorShard.java"/>
<suppress checks="CyclomaticComplexity"
files="ShareCoordinatorShard.java"/>
<!-- storage -->
<suppress checks="CyclomaticComplexity"
files="(LogLoader|LogValidator|RemoteLogManagerConfig|RemoteLogManager).java"/>


@ -1,100 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.server.share.PersisterStateBatch;
import java.util.Objects;
/**
* This is a helper class which overrides the equals and hashcode
* methods to only focus on the first and last offset fields of the
* state batch. This is useful when combining batches.
*/
public class PersisterOffsetsStateBatch {
private final PersisterStateBatch delegate;
public PersisterOffsetsStateBatch(
long firstOffset,
long lastOffset,
byte deliveryState,
short deliveryCount
) {
delegate = new PersisterStateBatch(firstOffset, lastOffset, deliveryState, deliveryCount);
}
public long firstOffset() {
return delegate.firstOffset();
}
public long lastOffset() {
return delegate.lastOffset();
}
public byte deliveryState() {
return delegate.deliveryState();
}
public short deliveryCount() {
return delegate.deliveryCount();
}
public static PersisterOffsetsStateBatch from(WriteShareGroupStateRequestData.StateBatch batch) {
return new PersisterOffsetsStateBatch(
batch.firstOffset(),
batch.lastOffset(),
batch.deliveryState(),
batch.deliveryCount()
);
}
public static PersisterOffsetsStateBatch from(ShareUpdateValue.StateBatch batch) {
return new PersisterOffsetsStateBatch(
batch.firstOffset(),
batch.lastOffset(),
batch.deliveryState(),
batch.deliveryCount()
);
}
@Override
public boolean equals(Object o) {
if (!(o instanceof PersisterOffsetsStateBatch)) {
return false;
}
PersisterOffsetsStateBatch that = (PersisterOffsetsStateBatch) o;
return this.firstOffset() == that.firstOffset() && this.lastOffset() == that.lastOffset();
}
@Override
public int hashCode() {
return Objects.hash(firstOffset(), lastOffset());
}
@Override
public String toString() {
return "PersisterOffsetsStateBatch(" +
"firstOffset=" + firstOffset() + "," +
"lastOffset=" + lastOffset() + "," +
"deliveryState=" + deliveryState() + "," +
"deliveryCount=" + deliveryCount() +
")";
}
}


@ -0,0 +1,402 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.share;
import org.apache.kafka.server.share.PersisterStateBatch;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
public class PersisterStateBatchCombiner {
private List<PersisterStateBatch> combinedBatchList; // link between pruning and merging
private final long startOffset;
private TreeSet<PersisterStateBatch> sortedBatches;
private List<PersisterStateBatch> finalBatchList; // final list is built here
public PersisterStateBatchCombiner(
List<PersisterStateBatch> batchesSoFar,
List<PersisterStateBatch> newBatches,
long startOffset
) {
initializeCombinedList(batchesSoFar, newBatches);
int estimatedResultSize = (combinedBatchList.size() * 3) / 2; // heuristic size - 50% overallocation
finalBatchList = new ArrayList<>(estimatedResultSize);
this.startOffset = startOffset;
}
private void initializeCombinedList(List<PersisterStateBatch> batchesSoFar, List<PersisterStateBatch> newBatches) {
boolean soFarEmpty = batchesSoFar == null || batchesSoFar.isEmpty();
boolean newBatchesEmpty = newBatches == null || newBatches.isEmpty();
if (soFarEmpty && newBatchesEmpty) {
combinedBatchList = new ArrayList<>();
} else if (soFarEmpty) {
combinedBatchList = new ArrayList<>(newBatches); // new list as the original one could be unmodifiable
} else if (newBatchesEmpty) {
combinedBatchList = new ArrayList<>(batchesSoFar); // new list as the original one could be unmodifiable
} else {
combinedBatchList = new ArrayList<>(batchesSoFar.size() + newBatches.size());
combinedBatchList.addAll(batchesSoFar);
combinedBatchList.addAll(newBatches);
}
}
/**
* Algorithm: Merge current state batches and new batches into a single non-overlapping batch list.
* Input: batchesSoFar, newBatches, startOffset
* Output: combined list with non-overlapping batches (finalBatchList)
* <p>
* - Add both currentBatches and newBatches into a single list combinedBatchList
* - if combinedBatchList.size() <= 1 return combinedBatchList
* <p>
* - Remove/prune any batches from the combinedBatchList:
* - if batch.lastOffset < startOffset then remove the batch from combinedBatchList
* - else if batch.firstOffset >= startOffset then keep the whole batch
* - else (batch.firstOffset < startOffset <= batch.lastOffset) keep only the [startOffset, batch.lastOffset] part and discard the rest.
* <p>
* - create a TreeSet sortedBatches from the pruned combinedBatchList
* - find the first 2 mergeable batches in the sortedBatches set, say, prev and candidate.
* - remove any non-mergeable batches from sortedBatches encountered during the find operation and add them to the finalBatchList
* - repeat until no mergeable pair is found:
* - based on the offset overlap and any difference in delivery state between the batches, combine them or
* create new batches, as required, and add them to sortedBatches.
* - find the next 2 mergeable batches in the sortedBatches set, say, prev and candidate.
* - remove any non-mergeable batches from sortedBatches encountered during the find operation and add them to the finalBatchList
* - done
* - return the finalBatchList
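* <p>
* Illustrative example (hypothetical offsets): with batchesSoFar = [(100-110, state=0, count=1)],
* newBatches = [(111-115, state=0, count=1)] and startOffset = -1, the two batches are contiguous
* with the same delivery info and merge into a single batch [(100-115, state=0, count=1)].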
*
* @return list of {@link PersisterStateBatch} representing non-overlapping combined batches
*/
public List<PersisterStateBatch> combineStateBatches() {
pruneBatches();
mergeBatches();
return finalBatchList;
}
private void mergeBatches() {
if (combinedBatchList.size() < 2) {
finalBatchList = combinedBatchList;
return;
}
sortedBatches = new TreeSet<>(combinedBatchList);
MergeCandidatePair overlapState = getMergeCandidatePair();
while (overlapState != MergeCandidatePair.EMPTY) {
PersisterStateBatch prev = overlapState.prev();
PersisterStateBatch candidate = overlapState.candidate();
// remove both previous and candidate for easier
// assessment about adding batches to sortedBatches
sortedBatches.remove(prev);
sortedBatches.remove(candidate);
if (compareBatchDeliveryInfo(candidate, prev) == 0) { // same state and overlap or contiguous
// overlap and same state (prev.firstOffset <= candidate.firstOffset) due to sort
// covers:
// case: 1 2 3 4 5 6 7 (contiguous)
// prev: ------ ------- ------- ------- ------- -------- -------
// candidate: ------ ---- ---------- --- ---- ------- -------
handleSameStateMerge(prev, candidate); // pair can be contiguous or overlapping
} else {
// If we reach here then it is guaranteed that the batch pair is overlapping and
// non-contiguous because getMergeCandidatePair only returns contiguous pair if
// the constituents have the same delivery count and state.
// covers:
// case: 1 2* 3 4 5 6 7*
// prev: ------ ------- ------- ------- ------- -------- ------
// candidate: ------ ---- --------- ---- ---- ------- -------
// max batches: 1 2 2 3 2 2 2
// min batches: 1 1 1 1 1 2 1
// * not possible with treeset
handleDiffStateOverlap(prev, candidate);
}
overlapState = getMergeCandidatePair();
}
finalBatchList.addAll(sortedBatches); // some non-overlapping batches might have remained
}
/**
* Compares the non-offset state of 2 batches, i.e. the deliveryCount and deliveryState.
* <p>
* Follows the standard compareTo contract: x < y => -int, x > y => +int, x == y => 0.
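* <p>
* For example (hypothetical values): a batch with deliveryCount = 2 compares greater than one with
* deliveryCount = 1 regardless of deliveryState, since deliveryCount is compared first.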
*
* @param b1 - {@link PersisterStateBatch} to compare
* @param b2 - {@link PersisterStateBatch} to compare
* @return int representing comparison result.
*/
private int compareBatchDeliveryInfo(PersisterStateBatch b1, PersisterStateBatch b2) {
int deltaCount = Short.compare(b1.deliveryCount(), b2.deliveryCount());
// Delivery state could be:
// 0 - AVAILABLE (non-terminal)
// 1 - ACQUIRED - should not be persisted yet
// 2 - ACKNOWLEDGED (terminal)
// 3 - ARCHIVING - not implemented in KIP-932 - non-terminal - leads only to ARCHIVED
// 4 - ARCHIVED (terminal)
if (deltaCount == 0) { // same delivery count
return Byte.compare(b1.deliveryState(), b2.deliveryState());
}
return deltaCount;
}
/**
* Accepts a sorted set of state batches and finds the first 2 batches which can be merged.
* Mergeable implies that the batches have some offsets in common, or that they are contiguous with the same delivery state and count.
* <p>
* Any non-mergeable batches preceding the first mergeable pair are removed from sortedBatches.
* For example:
* ----- ---- ----- ----- -----
* ------
* <---------------> <-------->
* non-overlapping 1st overlapping pair
*
* @return object representing the overlap state
*/
private MergeCandidatePair getMergeCandidatePair() {
if (sortedBatches == null || sortedBatches.isEmpty()) {
return MergeCandidatePair.EMPTY;
}
Iterator<PersisterStateBatch> iter = sortedBatches.iterator();
PersisterStateBatch prev = iter.next();
List<PersisterStateBatch> nonOverlapping = new ArrayList<>(sortedBatches.size());
while (iter.hasNext()) {
PersisterStateBatch candidate = iter.next();
if (candidate.firstOffset() <= prev.lastOffset() || // overlap
prev.lastOffset() + 1 == candidate.firstOffset() && compareBatchDeliveryInfo(prev, candidate) == 0) { // contiguous
updateBatchContainers(nonOverlapping);
return new MergeCandidatePair(
prev,
candidate
);
}
nonOverlapping.add(prev);
prev = candidate;
}
updateBatchContainers(nonOverlapping);
return MergeCandidatePair.EMPTY;
}
private void updateBatchContainers(List<PersisterStateBatch> nonOverlappingBatches) {
nonOverlappingBatches.forEach(sortedBatches::remove);
finalBatchList.addAll(nonOverlappingBatches);
}
/**
* Prunes the {@link PersisterStateBatch} entries in combinedBatchList against the startOffset:
* - lastOffset < startOffset => the batch is removed
* - firstOffset >= startOffset => the batch is preserved as-is
* - startOffset falls within the batch => the part of the batch before startOffset is removed and
* the part from startOffset onwards is preserved.
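* <p>
* For example (hypothetical offsets): with startOffset = 120, a batch spanning [100, 130] is trimmed
* to [120, 130], while a batch spanning [100, 110] is dropped entirely.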
*/
private void pruneBatches() {
if (startOffset != -1) {
List<PersisterStateBatch> retainedBatches = new ArrayList<>(combinedBatchList.size());
combinedBatchList.forEach(batch -> {
if (batch.lastOffset() < startOffset) {
// batch is expired, skip current iteration
// -------
// | -> start offset
return;
}
if (batch.firstOffset() >= startOffset) {
// complete batch is valid
// ---------
// | -> start offset
retainedBatches.add(batch);
} else {
// start offset intersects batch
// ---------
// | -> start offset
retainedBatches.add(new PersisterStateBatch(startOffset, batch.lastOffset(), batch.deliveryState(), batch.deliveryCount()));
}
});
// update the instance variable
combinedBatchList = retainedBatches;
}
}
private void handleSameStateMerge(PersisterStateBatch prev, PersisterStateBatch candidate) {
sortedBatches.add(new PersisterStateBatch(
prev.firstOffset(),
// cover cases
// prev: ------ -------- ---------
// candidate: --- ---------- -----
Math.max(candidate.lastOffset(), prev.lastOffset()),
prev.deliveryState(),
prev.deliveryCount()
));
}
private void handleDiffStateOverlap(PersisterStateBatch prev, PersisterStateBatch candidate) {
if (candidate.firstOffset() == prev.firstOffset()) {
handleDiffStateOverlapFirstOffsetAligned(prev, candidate);
} else { // candidate.firstOffset() > prev.firstOffset()
handleDiffStateOverlapFirstOffsetNotAligned(prev, candidate);
}
}
private void handleDiffStateOverlapFirstOffsetAligned(PersisterStateBatch prev, PersisterStateBatch candidate) {
if (candidate.lastOffset() == prev.lastOffset()) { // case 1
// candidate can never have lower or equal priority
// since sortedBatches order takes that into account.
// -------
// -------
sortedBatches.add(candidate);
} else {
// case 2 is not possible with TreeSet. It is symmetric to case 3.
// case 3
// --------
// -----------
if (compareBatchDeliveryInfo(candidate, prev) < 0) {
sortedBatches.add(prev);
sortedBatches.add(new PersisterStateBatch(
prev.lastOffset() + 1,
candidate.lastOffset(),
candidate.deliveryState(),
candidate.deliveryCount()
));
} else {
// candidate priority is >= prev
sortedBatches.add(candidate);
}
}
}
private void handleDiffStateOverlapFirstOffsetNotAligned(PersisterStateBatch prev, PersisterStateBatch candidate) {
if (candidate.lastOffset() < prev.lastOffset()) { // case 4
handleDiffStateOverlapPrevSwallowsCandidate(prev, candidate);
} else if (candidate.lastOffset() == prev.lastOffset()) { // case 5
handleDiffStateOverlapLastOffsetAligned(prev, candidate);
} else { // case 6
handleDiffStateOverlapCandidateOffsetsLarger(prev, candidate);
}
}
private void handleDiffStateOverlapPrevSwallowsCandidate(PersisterStateBatch prev, PersisterStateBatch candidate) {
// --------
// ----
if (compareBatchDeliveryInfo(candidate, prev) < 0) {
sortedBatches.add(prev);
} else {
sortedBatches.add(new PersisterStateBatch(
prev.firstOffset(),
candidate.firstOffset() - 1,
prev.deliveryState(),
prev.deliveryCount()
));
sortedBatches.add(candidate);
sortedBatches.add(new PersisterStateBatch(
candidate.lastOffset() + 1,
prev.lastOffset(),
prev.deliveryState(),
prev.deliveryCount()
));
}
}
private void handleDiffStateOverlapLastOffsetAligned(PersisterStateBatch prev, PersisterStateBatch candidate) {
// --------
// -----
if (compareBatchDeliveryInfo(candidate, prev) < 0) {
sortedBatches.add(prev);
} else {
sortedBatches.add(new PersisterStateBatch(
prev.firstOffset(),
candidate.firstOffset() - 1,
prev.deliveryState(),
prev.deliveryCount()
));
sortedBatches.add(candidate);
}
}
private void handleDiffStateOverlapCandidateOffsetsLarger(PersisterStateBatch prev, PersisterStateBatch candidate) {
// -------
// -------
if (compareBatchDeliveryInfo(candidate, prev) < 0) {
sortedBatches.add(prev);
sortedBatches.add(new PersisterStateBatch(
prev.lastOffset() + 1,
candidate.lastOffset(),
candidate.deliveryState(),
candidate.deliveryCount()
));
} else {
// candidate has higher priority
sortedBatches.add(new PersisterStateBatch(
prev.firstOffset(),
candidate.firstOffset() - 1,
prev.deliveryState(),
prev.deliveryCount()
));
sortedBatches.add(candidate);
}
}
/**
* Holder class for intermediate state
* used in the batch merge algorithm.
*/
static class MergeCandidatePair {
private final PersisterStateBatch prev;
private final PersisterStateBatch candidate;
public static final MergeCandidatePair EMPTY = new MergeCandidatePair(null, null);
public MergeCandidatePair(
PersisterStateBatch prev,
PersisterStateBatch candidate
) {
this.prev = prev;
this.candidate = candidate;
}
public PersisterStateBatch prev() {
return prev;
}
public PersisterStateBatch candidate() {
return candidate;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof MergeCandidatePair)) return false;
MergeCandidatePair that = (MergeCandidatePair) o;
return Objects.equals(prev, that.prev) && Objects.equals(candidate, that.candidate);
}
@Override
public int hashCode() {
return Objects.hash(prev, candidate);
}
}
}


@ -48,19 +48,16 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.config.ShareCoordinatorConfig;
import org.apache.kafka.server.share.PartitionFactory;
import org.apache.kafka.server.share.PersisterStateBatch;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord> {
@ -78,6 +75,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
public static final Exception NULL_TOPIC_ID = new Exception("The topic id cannot be null.");
public static final Exception NEGATIVE_PARTITION_ID = new Exception("The partition id cannot be a negative number.");
public static final Exception UNEXPECTED_CURRENT_STATE = new Exception("Unexpected current state was found.");
public static class Builder implements CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> {
private ShareCoordinatorConfig config;
@ -262,7 +260,6 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* @param request - WriteShareGroupStateRequestData for a single key
* @return CoordinatorResult(records, response)
*/
@SuppressWarnings("NPathComplexity")
public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> writeState(
WriteShareGroupStateRequestData request
) {
@ -274,79 +271,106 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return error.get();
}
String groupId = request.groupId();
WriteShareGroupStateRequestData.WriteStateData topicData = request.topics().get(0);
WriteShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), partitionData.partition());
SharePartitionKey key = SharePartitionKey.getInstance(groupId, topicData.topicId(), partitionData.partition());
List<CoordinatorRecord> recordList;
CoordinatorRecord record = generateShareStateRecord(partitionData, key);
// build successful response if record is correctly created
WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData()
.setResults(
Collections.singletonList(
WriteShareGroupStateResponse.toResponseWriteStateResult(key.topicId(),
Collections.singletonList(
WriteShareGroupStateResponse.toResponsePartitionResult(
key.partition()
))
))
);
return new CoordinatorResult<>(Collections.singletonList(record), responseData);
}
/**
* Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
* <p>
* if no snapshot has been created for the key => create a new ShareSnapshot record
* else if number of ShareUpdate records for key >= max allowed per snapshot per key => create a new ShareSnapshot record
* else create a new ShareUpdate record
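* <p>
* For example, once the number of ShareUpdate records accumulated since the last ShareSnapshot for a key
* reaches shareCoordinatorSnapshotUpdateRecordsPerSnapshot, the write being handled produces a new
* ShareSnapshot record with the snapshot epoch incremented by 1.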
*
* @param partitionData - Represents the data which should be written into the share state record.
* @param key - The {@link SharePartitionKey} object.
* @return {@link CoordinatorRecord} representing ShareSnapshot or ShareUpdate
*/
private CoordinatorRecord generateShareStateRecord(
WriteShareGroupStateRequestData.PartitionData partitionData,
SharePartitionKey key
) {
if (!shareStateMap.containsKey(key)) {
// since this is the first time we are getting a write request, we should be creating a share snapshot record
recordList = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
groupId, topicData.topicId(), partitionData.partition(), ShareGroupOffset.fromRequest(partitionData)
));
// Since this is the first time we are getting a write request for key, we should be creating a share snapshot record.
// The incoming partition data could have overlapping state batches, so we must merge them.
return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
key.groupId(), key.topicId(), partitionData.partition(),
new ShareGroupOffset.Builder()
.setSnapshotEpoch(0)
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
.setStateEpoch(partitionData.stateEpoch())
.setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
.build());
} else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? shareStateMap.get(key).leaderEpoch() : partitionData.leaderEpoch();
int newStateEpoch = partitionData.stateEpoch() == -1 ? shareStateMap.get(key).stateEpoch() : partitionData.stateEpoch();
long newStartOffset = partitionData.startOffset() == -1 ? shareStateMap.get(key).startOffset() : partitionData.startOffset();
ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? currentState.leaderEpoch() : partitionData.leaderEpoch();
int newStateEpoch = partitionData.stateEpoch() == -1 ? currentState.stateEpoch() : partitionData.stateEpoch();
long newStartOffset = partitionData.startOffset() == -1 ? currentState.startOffset() : partitionData.startOffset();
// Since the number of update records for this share part key exceeds snapshotUpdateRecordsPerSnapshot,
// we should be creating a share snapshot record.
List<PersisterOffsetsStateBatch> batchesToAdd = combineStateBatches(
shareStateMap.get(key).stateBatchAsSet(),
partitionData.stateBatches().stream()
.map(PersisterOffsetsStateBatch::from)
.collect(Collectors.toCollection(LinkedHashSet::new)),
newStartOffset);
recordList = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
groupId, topicData.topicId(), partitionData.partition(),
// The incoming partition data could have overlapping state batches, so we must merge them.
return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
key.groupId(), key.topicId(), partitionData.partition(),
new ShareGroupOffset.Builder()
.setSnapshotEpoch(currentState.snapshotEpoch() + 1) // we must increment snapshot epoch as this is new snapshot
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateEpoch(newStateEpoch)
.setStateBatches(batchesToAdd)
.build()));
.setStateBatches(mergeBatches(currentState.stateBatches(), partitionData, newStartOffset))
.build());
} else {
// share snapshot is present and number of share snapshot update records < snapshotUpdateRecordsPerSnapshot
recordList = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
groupId, topicData.topicId(), partitionData.partition(), ShareGroupOffset.fromRequest(partitionData, shareStateMap.get(key).snapshotEpoch())
));
ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
// Share snapshot is present and number of share snapshot update records < snapshotUpdateRecordsPerSnapshot
// so create a share update record.
// The incoming partition data could have overlapping state batches, so we must merge them.
return ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
key.groupId(), key.topicId(), partitionData.partition(),
new ShareGroupOffset.Builder()
.setSnapshotEpoch(currentState.snapshotEpoch()) // use same snapshotEpoch as last share snapshot
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
.setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
.build());
}
}
List<CoordinatorRecord> validRecords = new LinkedList<>();
WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData();
for (CoordinatorRecord record : recordList) { // should be single record
if (!(record.key().message() instanceof ShareSnapshotKey) && !(record.key().message() instanceof ShareUpdateKey)) {
continue;
}
SharePartitionKey mapKey = null;
boolean shouldIncSnapshotEpoch = false;
if (record.key().message() instanceof ShareSnapshotKey) {
ShareSnapshotKey recordKey = (ShareSnapshotKey) record.key().message();
responseData.setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult(
recordKey.topicId(), Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult(
recordKey.partition())))));
mapKey = SharePartitionKey.getInstance(recordKey.groupId(), recordKey.topicId(), recordKey.partition());
shouldIncSnapshotEpoch = true;
} else if (record.key().message() instanceof ShareUpdateKey) {
ShareUpdateKey recordKey = (ShareUpdateKey) record.key().message();
responseData.setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult(
recordKey.topicId(), Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult(
recordKey.partition())))));
mapKey = SharePartitionKey.getInstance(recordKey.groupId(), recordKey.topicId(), recordKey.partition());
private List<PersisterStateBatch> mergeBatches(
List<PersisterStateBatch> soFar,
WriteShareGroupStateRequestData.PartitionData partitionData) {
return mergeBatches(soFar, partitionData, partitionData.startOffset());
}
if (shareStateMap.containsKey(mapKey) && shouldIncSnapshotEpoch) {
ShareGroupOffset oldValue = shareStateMap.get(mapKey);
((ShareSnapshotValue) record.value().message()).setSnapshotEpoch(oldValue.snapshotEpoch() + 1); // increment the snapshot epoch
}
validRecords.add(record); // this will have updated snapshot epoch and on replay the value will trickle down to the map
}
return new CoordinatorResult<>(validRecords, responseData);
private List<PersisterStateBatch> mergeBatches(
List<PersisterStateBatch> soFar,
WriteShareGroupStateRequestData.PartitionData partitionData,
long startOffset) {
return new PersisterStateBatchCombiner(
soFar,
partitionData.stateBatches().stream()
.map(PersisterStateBatch::from)
.collect(Collectors.toList()),
startOffset
)
.combineStateBatches();
}
/**
@ -527,7 +551,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue newData) {
// snapshot epoch should be same as last share snapshot
// state epoch is not present
Set<PersisterOffsetsStateBatch> currentBatches = soFar.stateBatchAsSet();
List<PersisterStateBatch> currentBatches = soFar.stateBatches();
long newStartOffset = newData.startOffset() == -1 ? soFar.startOffset() : newData.startOffset();
int newLeaderEpoch = newData.leaderEpoch() == -1 ? soFar.leaderEpoch() : newData.leaderEpoch();
@ -536,41 +560,13 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.setStateEpoch(soFar.stateEpoch())
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateBatches(combineStateBatches(currentBatches, newData.stateBatches().stream()
.map(PersisterOffsetsStateBatch::from)
.collect(Collectors.toCollection(LinkedHashSet::new)), newStartOffset))
.setStateBatches(new PersisterStateBatchCombiner(currentBatches, newData.stateBatches().stream()
.map(ShareCoordinatorShard::toPersisterStateBatch)
.collect(Collectors.toList()), newStartOffset)
.combineStateBatches())
.build();
}
/**
* Util method which takes in 2 collections containing {@link PersisterOffsetsStateBatch}
* and the startOffset.
* It removes all batches from the 1st collection which have the same first and last offset
* as the batches in 2nd collection. It then creates a final list of batches which contains the
* former result and all the batches in the 2nd collection. In set notation (A - B) U B (we prefer batches in B
* which have same first and last offset in A).
* Finally, it removes any batches where the lastOffset < startOffset, if the startOffset > -1.
* @param currentBatch - collection containing current soft state of batches
* @param newBatch - collection containing batches in incoming request
* @param startOffset - startOffset to consider when removing old batches.
* @return List containing combined batches
*/
private static List<PersisterOffsetsStateBatch> combineStateBatches(
Collection<PersisterOffsetsStateBatch> currentBatch,
Collection<PersisterOffsetsStateBatch> newBatch,
long startOffset
) {
currentBatch.removeAll(newBatch);
List<PersisterOffsetsStateBatch> batchesToAdd = new LinkedList<>(currentBatch);
batchesToAdd.addAll(newBatch);
// Any batches where the last offset is < the current start offset
// are now expired. We should remove them from the persister.
if (startOffset != -1) {
batchesToAdd.removeIf(batch -> batch.lastOffset() < startOffset);
}
return batchesToAdd;
}
private static ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
@ -578,4 +574,20 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return apiMessageAndVersion.message();
}
}
/**
* Util function to convert a state batch of type {@link ShareUpdateValue.StateBatch}
* to {@link PersisterStateBatch}.
*
* @param batch - The object representing {@link ShareUpdateValue.StateBatch}
* @return {@link PersisterStateBatch}
*/
private static PersisterStateBatch toPersisterStateBatch(ShareUpdateValue.StateBatch batch) {
return new PersisterStateBatch(
batch.firstOffset(),
batch.lastOffset(),
batch.deliveryState(),
batch.deliveryCount()
);
}
}


@ -20,12 +20,12 @@ package org.apache.kafka.coordinator.share;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.server.share.PersisterStateBatch;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -37,13 +37,13 @@ public class ShareGroupOffset {
private final int stateEpoch;
private final int leaderEpoch;
private final long startOffset;
private final List<PersisterOffsetsStateBatch> stateBatches;
private final List<PersisterStateBatch> stateBatches;
private ShareGroupOffset(int snapshotEpoch,
int stateEpoch,
int leaderEpoch,
long startOffset,
List<PersisterOffsetsStateBatch> stateBatches) {
List<PersisterStateBatch> stateBatches) {
this.snapshotEpoch = snapshotEpoch;
this.stateEpoch = stateEpoch;
this.leaderEpoch = leaderEpoch;
@ -67,16 +67,16 @@ public class ShareGroupOffset {
return startOffset;
}
public List<PersisterOffsetsStateBatch> stateBatches() {
public List<PersisterStateBatch> stateBatches() {
return Collections.unmodifiableList(stateBatches);
}
private static PersisterOffsetsStateBatch toPersisterOffsetsStateBatch(ShareSnapshotValue.StateBatch stateBatch) {
return new PersisterOffsetsStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount());
private static PersisterStateBatch toPersisterOffsetsStateBatch(ShareSnapshotValue.StateBatch stateBatch) {
return new PersisterStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount());
}
private static PersisterOffsetsStateBatch toPersisterOffsetsStateBatch(ShareUpdateValue.StateBatch stateBatch) {
return new PersisterOffsetsStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount());
private static PersisterStateBatch toPersisterOffsetsStateBatch(ShareUpdateValue.StateBatch stateBatch) {
return new PersisterStateBatch(stateBatch.firstOffset(), stateBatch.lastOffset(), stateBatch.deliveryState(), stateBatch.deliveryCount());
}
public static ShareGroupOffset fromRecord(ShareSnapshotValue record) {
@ -99,11 +99,11 @@ public class ShareGroupOffset {
data.leaderEpoch(),
data.startOffset(),
data.stateBatches().stream()
.map(PersisterOffsetsStateBatch::from)
.map(PersisterStateBatch::from)
.collect(Collectors.toList()));
}
public Set<PersisterOffsetsStateBatch> stateBatchAsSet() {
public LinkedHashSet<PersisterStateBatch> stateBatchAsSet() {
return new LinkedHashSet<>(stateBatches);
}
@ -112,7 +112,7 @@ public class ShareGroupOffset {
private int stateEpoch;
private int leaderEpoch;
private long startOffset;
private List<PersisterOffsetsStateBatch> stateBatches;
private List<PersisterStateBatch> stateBatches;
public Builder setSnapshotEpoch(int snapshotEpoch) {
this.snapshotEpoch = snapshotEpoch;
@ -134,7 +134,7 @@ public class ShareGroupOffset {
return this;
}
public Builder setStateBatches(List<PersisterOffsetsStateBatch> stateBatches) {
public Builder setStateBatches(List<PersisterStateBatch> stateBatches) {
this.stateBatches = stateBatches;
return this;
}


@ -0,0 +1,445 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.share;
import org.apache.kafka.server.share.PersisterStateBatch;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class PersisterStateBatchCombinerTest {
static class BatchTestHolder {
final String testName;
final List<PersisterStateBatch> batchesSoFar;
final List<PersisterStateBatch> newBatches;
final List<PersisterStateBatch> expectedResult;
final long startOffset;
final boolean shouldRun;
BatchTestHolder(
String testName,
List<PersisterStateBatch> batchesSoFar,
List<PersisterStateBatch> newBatches,
List<PersisterStateBatch> expectedResult,
long startOffset
) {
this(testName, batchesSoFar, newBatches, expectedResult, startOffset, false);
}
BatchTestHolder(
String testName,
List<PersisterStateBatch> batchesSoFar,
List<PersisterStateBatch> newBatches,
List<PersisterStateBatch> expectedResult,
long startOffset,
boolean shouldRun
) {
this.testName = testName;
this.batchesSoFar = batchesSoFar;
this.newBatches = newBatches;
this.expectedResult = expectedResult;
this.startOffset = startOffset;
this.shouldRun = shouldRun;
}
static List<PersisterStateBatch> singleBatch(
long firstOffset,
long lastOffset,
int deliveryState,
int deliveryCount
) {
return Collections.singletonList(
new PersisterStateBatch(firstOffset, lastOffset, (byte) deliveryState, (short) deliveryCount)
);
}
static class MultiBatchBuilder {
private final List<PersisterStateBatch> batchList = new LinkedList<>();
MultiBatchBuilder addBatch(
long firstOffset,
long lastOffset,
int deliveryState,
int deliveryCount
) {
batchList.add(new PersisterStateBatch(firstOffset, lastOffset, (byte) deliveryState, (short) deliveryCount));
return this;
}
List<PersisterStateBatch> build() {
return batchList;
}
}
static MultiBatchBuilder multiBatch() {
return new MultiBatchBuilder();
}
@Override
public String toString() {
return this.testName;
}
}
private static Stream<BatchTestHolder> generatorCornerCases() {
return Stream.of(
new BatchTestHolder(
"Current batches with start offset midway are pruned.",
BatchTestHolder.singleBatch(100, 130, 0, 1),
Collections.emptyList(),
BatchTestHolder.singleBatch(120, 130, 0, 1),
120
),
new BatchTestHolder(
"New batches with start offset midway are pruned.",
Collections.emptyList(),
BatchTestHolder.singleBatch(100, 130, 0, 1),
BatchTestHolder.singleBatch(120, 130, 0, 1),
120
),
new BatchTestHolder(
"Both current and new batches empty.",
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
120
)
);
}
private static Stream<BatchTestHolder> generatorSameState() {
return Stream.of(
// same state
new BatchTestHolder(
"Same state. batchSoFar and newBatch have same first and last offset.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(100, 110, 0, 1),
-1
),
new BatchTestHolder(
"Same state. batchSoFar and newBatch have same first offset, newBatch last offset strictly smaller.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(100, 105, 0, 1),
BatchTestHolder.singleBatch(100, 110, 0, 1),
-1
),
new BatchTestHolder(
"Same state. batchSoFar and newBatch have same first offset, newBatch last offset strictly larger.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(100, 115, 0, 1),
BatchTestHolder.singleBatch(100, 115, 0, 1),
-1
),
new BatchTestHolder(
"Same state. newBatch first offset strictly larger and last offset strictly smaller than batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(105, 108, 0, 1),
BatchTestHolder.singleBatch(100, 110, 0, 1),
-1,
true
),
new BatchTestHolder(
"Same state. newBatch first offset strictly larger and last offset equal to batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(105, 110, 0, 1),
BatchTestHolder.singleBatch(100, 110, 0, 1),
-1
),
new BatchTestHolder(
"Same state. newBatch first offset strictly larger and last offset strictly larger than batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(105, 115, 0, 1),
BatchTestHolder.singleBatch(100, 115, 0, 1),
-1
),
new BatchTestHolder(
"Same state. newBatch first offset is batchSoFar first offset + 1 (contiguous).",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(111, 115, 0, 1),
BatchTestHolder.singleBatch(100, 115, 0, 1),
-1
)
);
}
private static Stream<BatchTestHolder> generatorComplex() {
return Stream.of(
new BatchTestHolder(
"Handle overlapping batches in newBatches, same state",
BatchTestHolder.multiBatch()
.addBatch(100, 110, 0, 1)
.addBatch(121, 130, 0, 1)
.build(),
BatchTestHolder.multiBatch()
.addBatch(111, 119, 2, 2)
.addBatch(116, 123, 2, 2) // overlap with first batch
.build(), // newBatches merge to [(111-123, 2, 2)]
BatchTestHolder.multiBatch()
.addBatch(100, 110, 0, 1)
.addBatch(111, 123, 2, 2)
.build(),
-1
),
new BatchTestHolder(
"Handle overlapping batches in newBatches, different priority.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.multiBatch()
.addBatch(101, 105, 1, 2)
.addBatch(101, 115, 2, 2)
.addBatch(101, 120, 3, 2)
.build(),
BatchTestHolder.multiBatch()
.addBatch(100, 100, 0, 1)
.addBatch(101, 120, 3, 2)
.build(),
-1
),
new BatchTestHolder(
"Handle overlapping batches in newBatches, with pruning.",
BatchTestHolder.multiBatch()
.addBatch(100, 110, 0, 1) // should get removed
.addBatch(121, 130, 0, 1)
.build(),
BatchTestHolder.multiBatch()
.addBatch(111, 119, 2, 2)
.addBatch(116, 123, 2, 2) // overlap with first batch; newBatches merge to [(111-123, 2, 2)]
.build(),
BatchTestHolder.multiBatch()
.addBatch(120, 123, 2, 2)
.addBatch(124, 130, 0, 1)
.build(),
120
),
new BatchTestHolder(
"Multiple higher state batch updates.",
BatchTestHolder.singleBatch(111, 120, 0, 1),
BatchTestHolder.multiBatch()
.addBatch(111, 113, 0, 2)
.addBatch(114, 114, 2, 1)
.addBatch(115, 119, 0, 2)
.build(),
BatchTestHolder.multiBatch()
.addBatch(111, 113, 0, 2)
.addBatch(114, 114, 2, 1)
.addBatch(115, 119, 0, 2)
.addBatch(120, 120, 0, 1)
.build(),
-1
)
);
}
private static Stream<BatchTestHolder> generatorDifferentStates() {
return Stream.of(
// different states
new BatchTestHolder(
"newBatch higher state. newBatch first offset and last offset match batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(100, 110, 0, 2),
BatchTestHolder.singleBatch(100, 110, 0, 2),
-1
),
new BatchTestHolder(
"newBatch lower state. newBatch first offset and last offset match batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 3),
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(100, 110, 0, 3),
-1
),
new BatchTestHolder(
"newBatch higher state. newBatch first offset same and last offset smaller than batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(100, 105, 0, 2),
BatchTestHolder.multiBatch()
.addBatch(100, 105, 0, 2)
.addBatch(106, 110, 0, 1)
.build(),
-1
),
new BatchTestHolder(
"newBatch lower state. newBatch first offset same and last offset smaller than batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 3),
BatchTestHolder.singleBatch(100, 105, 0, 1),
BatchTestHolder.singleBatch(100, 110, 0, 3),
-1
),
new BatchTestHolder(
"newBatch higher state. newBatch first offset same and last offset strictly larger than batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(100, 115, 0, 2),
BatchTestHolder.singleBatch(100, 115, 0, 2),
-1
),
new BatchTestHolder(
"newBatch lower state. newBatch first offset same and last offset strictly larger than batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 3),
BatchTestHolder.singleBatch(100, 115, 0, 1),
BatchTestHolder.multiBatch()
.addBatch(100, 110, 0, 3)
.addBatch(111, 115, 0, 1)
.build(),
-1
),
new BatchTestHolder(
"newBatch higher state. newBatch first offset strictly larger and last offset strictly smaller than batchSoFar.",
BatchTestHolder.singleBatch(100, 115, 0, 1),
BatchTestHolder.singleBatch(105, 110, 1, 1),
BatchTestHolder.multiBatch()
.addBatch(100, 104, 0, 1)
.addBatch(105, 110, 1, 1)
.addBatch(111, 115, 0, 1)
.build(),
-1
),
new BatchTestHolder(
"newBatch lower state. newBatch first offset strictly larger and last offset strictly smaller than batchSoFar.",
BatchTestHolder.singleBatch(100, 115, 1, 1),
BatchTestHolder.singleBatch(105, 110, 0, 1),
BatchTestHolder.singleBatch(100, 115, 1, 1),
-1
),
new BatchTestHolder(
"newBatch higher state. newBatch first offset strictly larger and last offset same as batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(105, 110, 0, 2),
BatchTestHolder.multiBatch()
.addBatch(100, 104, 0, 1)
.addBatch(105, 110, 0, 2)
.build(),
-1
),
new BatchTestHolder(
"newBatch lower state. newBatch first offset strictly larger and last offset same as batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 2),
BatchTestHolder.singleBatch(105, 110, 0, 1),
BatchTestHolder.singleBatch(100, 110, 0, 2),
-1
),
new BatchTestHolder(
"newBatch higher state. newBatch first and last offsets strictly larger than batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 1),
BatchTestHolder.singleBatch(105, 115, 0, 2),
BatchTestHolder.multiBatch()
.addBatch(100, 104, 0, 1)
.addBatch(105, 115, 0, 2)
.build(),
-1
),
new BatchTestHolder(
"newBatch lower state. newBatch first and last offsets strictly larger than batchSoFar.",
BatchTestHolder.singleBatch(100, 110, 0, 2),
BatchTestHolder.singleBatch(105, 115, 0, 1),
BatchTestHolder.multiBatch()
.addBatch(100, 110, 0, 2)
.addBatch(111, 115, 0, 1)
.build(),
-1
)
);
}
@ParameterizedTest
@MethodSource("generatorDifferentStates")
public void testStateBatchCombineDifferentStates(BatchTestHolder test) {
if (test.shouldRun) {
assertEquals(test.expectedResult,
new PersisterStateBatchCombiner(
test.batchesSoFar,
test.newBatches,
test.startOffset)
.combineStateBatches(),
test.testName
);
}
}
@ParameterizedTest
@MethodSource("generatorSameState")
public void testStateBatchCombineSameState(BatchTestHolder test) {
if (test.shouldRun) {
assertEquals(test.expectedResult,
new PersisterStateBatchCombiner(
test.batchesSoFar,
test.newBatches,
test.startOffset)
.combineStateBatches(),
test.testName
);
}
}
@ParameterizedTest
@MethodSource("generatorComplex")
public void testStateBatchCombineComplexCases(BatchTestHolder test) {
if (test.shouldRun) {
assertEquals(test.expectedResult,
new PersisterStateBatchCombiner(
test.batchesSoFar,
test.newBatches,
test.startOffset)
.combineStateBatches(),
test.testName
);
}
}
@ParameterizedTest
@MethodSource("generatorCornerCases")
public void testStateBatchCombineCornerCases(BatchTestHolder test) {
if (test.shouldRun) {
assertEquals(test.expectedResult,
new PersisterStateBatchCombiner(
test.batchesSoFar,
test.newBatches,
test.startOffset)
.combineStateBatches(),
test.testName
);
}
}
}


@ -23,6 +23,7 @@ import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.PersisterStateBatch;
import org.junit.jupiter.api.Test;
@ -36,7 +37,7 @@ public class ShareCoordinatorRecordHelpersTest {
String groupId = "test-group";
Uuid topicId = Uuid.randomUuid();
int partitionId = 1;
PersisterOffsetsStateBatch batch = new PersisterOffsetsStateBatch(1L, 10L, (byte) 0, (short) 1);
PersisterStateBatch batch = new PersisterStateBatch(1L, 10L, (byte) 0, (short) 1);
CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
groupId,
topicId,
@ -79,7 +80,7 @@ public class ShareCoordinatorRecordHelpersTest {
String groupId = "test-group";
Uuid topicId = Uuid.randomUuid();
int partitionId = 1;
PersisterOffsetsStateBatch batch = new PersisterOffsetsStateBatch(1L, 10L, (byte) 0, (short) 1);
PersisterStateBatch batch = new PersisterStateBatch(1L, 10L, (byte) 0, (short) 1);
CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
groupId,
topicId,


@ -40,6 +40,7 @@ import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.config.ShareCoordinatorConfig;
import org.apache.kafka.server.share.PersisterStateBatch;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.timeline.SnapshotRegistry;
@ -53,7 +54,6 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@ -312,7 +312,9 @@ class ShareCoordinatorShardTest {
assertEquals(incrementalUpdate.snapshotEpoch(), combinedState.snapshotEpoch());
assertEquals(incrementalUpdate.leaderEpoch(), combinedState.leaderEpoch());
assertEquals(incrementalUpdate.startOffset(), combinedState.startOffset());
assertTrue(combinedState.stateBatchAsSet().containsAll(incrementalUpdate.stateBatchAsSet()));
// the batches should have combined to 1 since same state
assertEquals(Collections.singletonList(new PersisterStateBatch(0, 20, (byte) 0, (short) 1)),
combinedState.stateBatches());
assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
}
@ -773,8 +775,8 @@ class ShareCoordinatorShardTest {
.setStateEpoch(0)
.setSnapshotEpoch(2) // since 2nd share snapshot
.setStateBatches(Arrays.asList(
new PersisterOffsetsStateBatch(110, 119, (byte) 1, (short) 2), // b2 not lost
new PersisterOffsetsStateBatch(120, 129, (byte) 2, (short) 1)
new PersisterStateBatch(110, 119, (byte) 1, (short) 2), // b2 not lost
new PersisterStateBatch(120, 129, (byte) 2, (short) 1)
))
.build();
List<CoordinatorRecord> expectedRecordsFinal = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(


@ -25,11 +25,11 @@ import java.util.Objects;
/**
* This class contains the information for a single batch of state information for use by the {@link Persister}.
*/
public class PersisterStateBatch {
public class PersisterStateBatch implements Comparable {
private final long firstOffset;
private final long lastOffset;
private final byte deliveryState;
private final short deliveryCount;
private final byte deliveryState;
public PersisterStateBatch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) {
this.firstOffset = firstOffset;
@ -77,13 +77,13 @@ public class PersisterStateBatch {
PersisterStateBatch that = (PersisterStateBatch) o;
return firstOffset == that.firstOffset &&
lastOffset == that.lastOffset &&
deliveryState == that.deliveryState &&
deliveryCount == that.deliveryCount;
deliveryCount == that.deliveryCount &&
deliveryState == that.deliveryState;
}
@Override
public int hashCode() {
return Objects.hash(firstOffset, lastOffset, deliveryState, deliveryCount);
return Objects.hash(firstOffset, lastOffset, deliveryCount, deliveryState);
}
@Override
@ -91,8 +91,47 @@ public class PersisterStateBatch {
return "PersisterStateBatch(" +
"firstOffset=" + firstOffset + "," +
"lastOffset=" + lastOffset + "," +
"deliveryState=" + deliveryState + "," +
"deliveryCount=" + deliveryCount +
"deliveryCount=" + deliveryCount + "," +
"deliveryState=" + deliveryState +
")";
}
/**
* Compares 2 PersisterStateBatches in various dimensions.
* The priority of the dimensions are:
* - firstOffset
* - lastOffset
* - deliveryCount
* - deliveryState
* <p>
* Not all dimensions are checked in every case: the first dimension
* comparison with a non-zero result determines the return value.
* <p>
* If the 2 objects are equal, all 4 dimension comparisons must be 0.
* <p>
* This method allows PersisterStateBatch objects to be stored in containers
* which take a Comparator argument, or to be used with the sort algorithms
* in the Java standard library.
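* <p>
* For example (hypothetical values): (100-110, count=1, state=0) compares less than
* (100-110, count=2, state=0), since the offsets tie and deliveryCount is compared next.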
*
* @param o - object representing another PersisterStateBatch
* @return a negative integer, zero, or a positive integer as this batch is smaller than, equal to, or larger than the argument.
*/
@Override
public int compareTo(Object o) {
PersisterStateBatch that = (PersisterStateBatch) o;
int deltaFirst = Long.compare(this.firstOffset(), that.firstOffset());
if (deltaFirst == 0) {
int deltaLast = Long.compare(this.lastOffset(), that.lastOffset());
if (deltaLast == 0) {
int deltaCount = this.deliveryCount() - that.deliveryCount();
if (deltaCount == 0) {
return Byte.compare(this.deliveryState(), that.deliveryState());
}
return deltaCount;
}
return deltaLast;
}
return deltaFirst;
}
}


@ -69,6 +69,14 @@ public class SharePartitionKey {
return new SharePartitionKey(groupId, topicId, partition);
}
public String asCoordinatorKey() {
return asCoordinatorKey(groupId(), topicId(), partition());
}
public static String asCoordinatorKey(String groupId, Uuid topicId, int partition) {
return String.format("%s:%s:%d", groupId, topicId, partition);
}
@Override
public boolean equals(final Object obj) {
if (this == obj)