KAFKA-17796: Persist higher leaderEpoch in read state call. (#17580)

This PR adds code to the ShareCoordinatorService.readState method to issue a runtime.scheduleWriteOperation call when the incoming read state request holds a valid leaderEpoch value (not -1), so that a higher leader epoch observed on read is persisted by the share coordinator.

Co-authored-by: TaiJu Wu <tjwu1217@gmail.com>

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
Sushant Mahajan authored on 2024-12-07 14:32:03 +05:30, committed by GitHub
parent 24385a89cf
commit 42f74a1c3a
4 changed files with 287 additions and 166 deletions
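In outline, a read can now also persist a newer leader epoch before the response is returned. The sketch below is a simplified, self-contained model of that decision; the class and method names are illustrative only and are not the Kafka types touched in this diff. A stale epoch is rejected, a request with no epoch (-1) or an already-recorded epoch stays a pure read, and any other epoch is persisted.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Simplified model of the epoch handling added by this PR; names are illustrative.
    final class LeaderEpochOnRead {
        private final Map<String, Integer> leaderEpochByPartition = new HashMap<>();

        // Returns the new epoch to persist, or empty when the read needs no write.
        Optional<Integer> maybePersist(String sharePartitionKey, int requestLeaderEpoch) {
            Integer recorded = leaderEpochByPartition.get(sharePartitionKey);
            if (recorded != null && requestLeaderEpoch != -1 && requestLeaderEpoch < recorded) {
                // Mirrors the FENCED_LEADER_EPOCH error path: a stale reader is rejected.
                throw new IllegalStateException("Request leader epoch smaller than last recorded.");
            }
            if (requestLeaderEpoch == -1 || (recorded != null && recorded == requestLeaderEpoch)) {
                return Optional.empty(); // Pure read; no record is generated.
            }
            // A higher (or first) epoch was seen on read: persist it, as the shard does by
            // generating a share snapshot record and replaying it into the soft state.
            leaderEpochByPartition.put(sharePartitionKey, requestLeaderEpoch);
            return Optional.of(requestLeaderEpoch);
        }
    }

Because a read can now generate a record, the service schedules it through runtime.scheduleWriteOperation rather than the read path, as the comments in the diff below explain.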

File: ShareCoordinatorService.java

@@ -338,7 +338,7 @@ public class ShareCoordinatorService implements ShareCoordinator {
});
});
- // Combine all futures into a single CompletableFuture<Void>
+ // Combine all futures into a single CompletableFuture<Void>.
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureMap.values().stream()
.flatMap(partMap -> partMap.values().stream()).toArray(CompletableFuture[]::new));
@@ -349,7 +349,7 @@ public class ShareCoordinatorService implements ShareCoordinator {
(topicId, topicEntry) -> {
List<WriteShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
topicEntry.forEach(
- // map of partition id -> responses from api
+ // Map of partition id -> responses from api.
(partitionId, responseFut) -> {
// This is the future returned by runtime.scheduleWriteOperation which returns when the
// operation has completed including error information. When this line executes, the future
@@ -363,8 +363,8 @@ public class ShareCoordinatorService implements ShareCoordinator {
}
);
- // time taken for write
- // at this point all futures are completed written above.
+ // Time taken for write.
+ // At this point all futures are completed written above.
shareCoordinatorMetrics.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME,
time.hiResClockMs() - startTimeMs);
@@ -379,7 +379,7 @@ public class ShareCoordinatorService implements ShareCoordinator {
// A map to store the futures for each topicId and partition.
Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateResponseData>>> futureMap = new HashMap<>();
- // Send an empty response if topic data is empty
+ // Send an empty response if topic data is empty.
if (isEmpty(request.topics())) {
log.error("Topic Data is empty: {}", request);
return CompletableFuture.completedFuture(
@@ -387,7 +387,7 @@ public class ShareCoordinatorService implements ShareCoordinator {
);
}
- // Send an empty response if partition data is empty for any topic
+ // Send an empty response if partition data is empty for any topic.
for (ReadShareGroupStateRequestData.ReadStateData topicData : request.topics()) {
if (isEmpty(topicData.partitions())) {
log.error("Partition Data for topic {} is empty: {}", topicData.topicId(), request);
@@ -397,7 +397,7 @@ public class ShareCoordinatorService implements ShareCoordinator {
}
}
- // Send an empty response if groupId is invalid
+ // Send an empty response if groupId is invalid.
if (isGroupIdEmpty(groupId)) {
log.error("Group id must be specified and non-empty: {}", request);
return CompletableFuture.completedFuture(
@@ -405,7 +405,7 @@ public class ShareCoordinatorService implements ShareCoordinator {
);
}
- // Send an empty response if the coordinator is not active
+ // Send an empty response if the coordinator is not active.
if (!isActive.get()) {
return CompletableFuture.completedFuture(
generateErrorReadStateResponse(
@@ -421,43 +421,55 @@ public class ShareCoordinatorService implements ShareCoordinator {
// be looping over the keys below and constructing new ReadShareGroupStateRequestData objects to pass
// onto the shard method.
- request.topics().forEach(topicData -> {
+ // It is possible that a read state request contains a leaderEpoch which is the higher than seen so
+ // far, for a specific share partition. Hence, for each read request - we must check for this
+ // and update the state appropriately.
+ for (ReadShareGroupStateRequestData.ReadStateData topicData : request.topics()) {
Uuid topicId = topicData.topicId();
- topicData.partitions().forEach(partitionData -> {
- // Request object containing information of a single topic partition
+ for (ReadShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
+ SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition());
ReadShareGroupStateRequestData requestForCurrentPartition = new ReadShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(partitionData))));
- SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition());
- // Scheduling a runtime read operation to read share partition state from the coordinator in memory state
- CompletableFuture<ReadShareGroupStateResponseData> future = runtime.scheduleReadOperation(
- "read-share-group-state",
- topicPartitionFor(coordinatorKey),
- (coordinator, offset) -> coordinator.readState(requestForCurrentPartition, offset)
- ).exceptionally(exception -> handleOperationException(
- "read-share-group-state",
- request,
- exception,
- (error, message) -> ReadShareGroupStateResponse.toErrorResponseData(
- topicData.topicId(),
- partitionData.partition(),
- error,
- "Unable to read share group state: " + exception.getMessage()
- ),
- log
- ));
- futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
- .put(partitionData.partition(), future);
- });
- });
- // Combine all futures into a single CompletableFuture<Void>
+ // We are issuing a scheduleWriteOperation even though the request is of read type since
+ // we might want to update the leader epoch, if it is the highest seen so far for the specific
+ // share partition. In that case, we require the strong consistency offered by scheduleWriteOperation.
+ // At the time of writing, read after write consistency for the readState and writeState requests
+ // is not guaranteed.
+ CompletableFuture<ReadShareGroupStateResponseData> readFuture = runtime.scheduleWriteOperation(
+ "read-update-leader-epoch-state",
+ topicPartitionFor(coordinatorKey),
+ Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+ coordinator -> coordinator.readStateAndMaybeUpdateLeaderEpoch(requestForCurrentPartition)
+ ).exceptionally(readException ->
+ handleOperationException(
+ "read-update-leader-epoch-state",
+ request,
+ readException,
+ (error, message) -> ReadShareGroupStateResponse.toErrorResponseData(
+ topicData.topicId(),
+ partitionData.partition(),
+ error,
+ "Unable to read share group state: " + readException.getMessage()
+ ),
+ log
+ ));
+ futureMap.computeIfAbsent(topicId, k -> new HashMap<>())
+ .put(partitionData.partition(), readFuture);
+ }
+ }
+ // Combine all futures into a single CompletableFuture<Void>.
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureMap.values().stream()
.flatMap(map -> map.values().stream()).toArray(CompletableFuture[]::new));
- // Transform the combined CompletableFuture<Void> into CompletableFuture<ReadShareGroupStateResponseData>
+ // Transform the combined CompletableFuture<Void> into CompletableFuture<ReadShareGroupStateResponseData>.
return combinedFuture.thenApply(v -> {
List<ReadShareGroupStateResponseData.ReadStateResult> readStateResult = new ArrayList<>(futureMap.size());
futureMap.forEach(
@@ -465,7 +477,7 @@ public class ShareCoordinatorService implements ShareCoordinator {
List<ReadShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
topicEntry.forEach(
(partitionId, responseFut) -> {
- // responseFut would already be completed by now since we have used
+ // ResponseFut would already be completed by now since we have used
// CompletableFuture::allOf to create a combined future from the future map.
partitionResults.add(
responseFut.getNow(null).results().get(0).partitions().get(0)

File: ShareCoordinatorShard.java

@@ -201,7 +201,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
handleShareUpdate((ShareUpdateKey) key.message(), (ShareUpdateValue) messageOrNull(value));
break;
default:
- // noop
+ // Noop
}
}
@@ -225,11 +225,11 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
SharePartitionKey mapKey = SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
- // share update does not hold state epoch information.
+ // Share update does not hold state epoch information.
ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
- // this is an incremental snapshot
- // so, we need to apply it to our current soft state
+ // This is an incremental snapshot,
+ // so we need to apply it to our current soft state.
shareStateMap.compute(mapKey, (k, v) -> v == null ? offsetRecord : merge(v, value));
snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 0 : v + 1);
}
@@ -268,8 +268,8 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
public CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> writeState(
WriteShareGroupStateRequestData request
) {
- // records to write (with both key and value of snapshot type), response to caller
- // only one key will be there in the request by design
+ // Records to write (with both key and value of snapshot type), response to caller
+ // only one key will be there in the request by design.
metricsShard.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
Optional<CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord>> error = maybeGetWriteStateError(request);
if (error.isPresent()) {
@@ -296,6 +296,88 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return new CoordinatorResult<>(Collections.singletonList(record), responseData);
}
+ /**
+ * Method reads data from the soft state and if needed updates the leader epoch.
+ * It can happen that a read state call for a share partition has a higher leaderEpoch
+ * value than seen so far.
+ * In case an update is not required, empty record list will be generated along with a success response.
+ * @param request - represents ReadShareGroupStateRequestData
+ * @return CoordinatorResult object
+ */
+ public CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> readStateAndMaybeUpdateLeaderEpoch(
+ ReadShareGroupStateRequestData request
+ ) {
+ // Only one key will be there in the request by design.
+ Optional<ReadShareGroupStateResponseData> error = maybeGetReadStateError(request);
+ if (error.isPresent()) {
+ return new CoordinatorResult<>(Collections.emptyList(), error.get());
+ }
+ ReadShareGroupStateRequestData.ReadStateData topicData = request.topics().get(0);
+ ReadShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
+ Uuid topicId = topicData.topicId();
+ int partitionId = partitionData.partition();
+ int leaderEpoch = partitionData.leaderEpoch();
+ SharePartitionKey key = SharePartitionKey.getInstance(request.groupId(), topicId, partitionId);
+ ReadShareGroupStateResponseData responseData = null;
+ if (!shareStateMap.containsKey(key)) {
+ // Leader epoch update might be needed
+ responseData = ReadShareGroupStateResponse.toResponseData(
+ topicId,
+ partitionId,
+ PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_STATE_EPOCH,
+ Collections.emptyList()
+ );
+ } else {
+ // Leader epoch update might be needed
+ ShareGroupOffset offsetValue = shareStateMap.get(key);
+ List<ReadShareGroupStateResponseData.StateBatch> stateBatches = (offsetValue.stateBatches() != null && !offsetValue.stateBatches().isEmpty()) ?
+ offsetValue.stateBatches().stream()
+ .map(
+ stateBatch -> new ReadShareGroupStateResponseData.StateBatch()
+ .setFirstOffset(stateBatch.firstOffset())
+ .setLastOffset(stateBatch.lastOffset())
+ .setDeliveryState(stateBatch.deliveryState())
+ .setDeliveryCount(stateBatch.deliveryCount())
+ ).collect(Collectors.toList()) : Collections.emptyList();
+ responseData = ReadShareGroupStateResponse.toResponseData(
+ topicId,
+ partitionId,
+ offsetValue.startOffset(),
+ offsetValue.stateEpoch(),
+ stateBatches
+ );
+ }
+ // Optimization in case leaderEpoch update is not required.
+ if (leaderEpoch == -1 ||
+ (leaderEpochMap.get(key) != null && leaderEpochMap.get(key) == leaderEpoch)) {
+ return new CoordinatorResult<>(Collections.emptyList(), responseData);
+ }
+ // It is OK to info log this since this reaching this codepoint should be quite infrequent.
+ log.info("Read with leader epoch update call for key {} having new leader epoch {}.", key, leaderEpoch);
+ // Recording the sensor here as above if condition will not produce any record.
+ metricsShard.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+ // Generate record with leaderEpoch info.
+ WriteShareGroupStateRequestData.PartitionData writePartitionData = new WriteShareGroupStateRequestData.PartitionData()
+ .setPartition(partitionId)
+ .setLeaderEpoch(leaderEpoch)
+ .setStateBatches(Collections.emptyList())
+ .setStartOffset(responseData.results().get(0).partitions().get(0).startOffset())
+ .setStateEpoch(responseData.results().get(0).partitions().get(0).stateEpoch());
+ CoordinatorRecord record = generateShareStateRecord(writePartitionData, key);
+ return new CoordinatorResult<>(Collections.singletonList(record), responseData);
+ }
/**
* Util method to generate a ShareSnapshot or ShareUpdate type record for a key, based on various conditions.
* <p>
@@ -335,14 +417,14 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
key.groupId(), key.topicId(), partitionData.partition(),
new ShareGroupOffset.Builder()
- .setSnapshotEpoch(currentState.snapshotEpoch() + 1) // we must increment snapshot epoch as this is new snapshot
+ .setSnapshotEpoch(currentState.snapshotEpoch() + 1) // We must increment snapshot epoch as this is new snapshot.
.setStartOffset(newStartOffset)
.setLeaderEpoch(newLeaderEpoch)
.setStateEpoch(newStateEpoch)
.setStateBatches(mergeBatches(currentState.stateBatches(), partitionData, newStartOffset))
.build());
} else {
- ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
+ ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true.
// Share snapshot is present and number of share snapshot update records < snapshotUpdateRecordsPerSnapshot
// so create a share update record.
@@ -350,7 +432,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
key.groupId(), key.topicId(), partitionData.partition(),
new ShareGroupOffset.Builder()
- .setSnapshotEpoch(currentState.snapshotEpoch()) // use same snapshotEpoch as last share snapshot
+ .setSnapshotEpoch(currentState.snapshotEpoch()) // Use same snapshotEpoch as last share snapshot.
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
.setStateBatches(mergeBatches(Collections.emptyList(), partitionData))
@@ -378,70 +460,6 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.combineStateBatches();
}
- /**
- * This method finds the ShareSnapshotValue record corresponding to the requested topic partition from the
- * in-memory state of coordinator shard, the shareStateMap.
- * <p>
- * This method as called by the ShareCoordinatorService will be provided with
- * the request data which covers only key i.e. group1:topic1:partition1. The implementation
- * below was done keeping this in mind.
- *
- * @param request - WriteShareGroupStateRequestData for a single key
- * @param offset - offset to read from the __share_group_state topic partition
- * @return CoordinatorResult(records, response)
- */
- public ReadShareGroupStateResponseData readState(ReadShareGroupStateRequestData request, Long offset) {
- // records to read (with the key of snapshot type), response to caller
- // only one key will be there in the request by design
- Optional<ReadShareGroupStateResponseData> error = maybeGetReadStateError(request, offset);
- if (error.isPresent()) {
- return error.get();
- }
- Uuid topicId = request.topics().get(0).topicId();
- int partition = request.topics().get(0).partitions().get(0).partition();
- int leaderEpoch = request.topics().get(0).partitions().get(0).leaderEpoch();
- SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partition);
- if (!shareStateMap.containsKey(coordinatorKey)) {
- return ReadShareGroupStateResponse.toResponseData(
- topicId,
- partition,
- PartitionFactory.UNINITIALIZED_START_OFFSET,
- PartitionFactory.DEFAULT_STATE_EPOCH,
- Collections.emptyList()
- );
- }
- ShareGroupOffset offsetValue = shareStateMap.get(coordinatorKey, offset);
- if (offsetValue == null) {
- // Returning an error response as the snapshot value was not found
- return ReadShareGroupStateResponse.toErrorResponseData(
- topicId,
- partition,
- Errors.UNKNOWN_SERVER_ERROR,
- "Data not found for topic {}, partition {} for group {}, in the in-memory state of share coordinator"
- );
- }
- List<ReadShareGroupStateResponseData.StateBatch> stateBatches = (offsetValue.stateBatches() != null && !offsetValue.stateBatches().isEmpty()) ?
- offsetValue.stateBatches().stream().map(
- stateBatch -> new ReadShareGroupStateResponseData.StateBatch()
- .setFirstOffset(stateBatch.firstOffset())
- .setLastOffset(stateBatch.lastOffset())
- .setDeliveryState(stateBatch.deliveryState())
- .setDeliveryCount(stateBatch.deliveryCount())
- ).collect(java.util.stream.Collectors.toList()) : Collections.emptyList();
- // Updating the leader map with the new leader epoch
- leaderEpochMap.put(coordinatorKey, leaderEpoch);
- // Returning the successfully retrieved snapshot value
- return ReadShareGroupStateResponse.toResponseData(topicId, partition, offsetValue.startOffset(), offsetValue.stateEpoch(), stateBatches);
- }
private Optional<CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord>> maybeGetWriteStateError(
WriteShareGroupStateRequestData request
) {
@@ -461,11 +479,11 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
}
SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, topicId, partitionId);
- if (leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > partitionData.leaderEpoch()) {
+ if (partitionData.leaderEpoch() != -1 && leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > partitionData.leaderEpoch()) {
log.error("Request leader epoch smaller than last recorded.");
return Optional.of(getWriteErrorResponse(Errors.FENCED_LEADER_EPOCH, null, topicId, partitionId));
}
- if (stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) > partitionData.stateEpoch()) {
+ if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) > partitionData.stateEpoch()) {
log.error("Request state epoch smaller than last recorded.");
return Optional.of(getWriteErrorResponse(Errors.FENCED_STATE_EPOCH, null, topicId, partitionId));
}
@@ -482,7 +500,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
return Optional.empty();
}
- private Optional<ReadShareGroupStateResponseData> maybeGetReadStateError(ReadShareGroupStateRequestData request, Long offset) {
+ private Optional<ReadShareGroupStateResponseData> maybeGetReadStateError(ReadShareGroupStateRequestData request) {
String groupId = request.groupId();
ReadShareGroupStateRequestData.ReadStateData topicData = request.topics().get(0);
ReadShareGroupStateRequestData.PartitionData partitionData = topicData.partitions().get(0);
@@ -503,7 +521,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
}
SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, topicId, partitionId);
- if (leaderEpochMap.containsKey(mapKey, offset) && leaderEpochMap.get(mapKey, offset) > partitionData.leaderEpoch()) {
+ if (leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > partitionData.leaderEpoch()) {
log.error("Request leader epoch id is smaller than last recorded.");
return Optional.of(ReadShareGroupStateResponse.toErrorResponseData(topicId, partitionId, Errors.FENCED_LEADER_EPOCH, Errors.FENCED_LEADER_EPOCH.message()));
}
@@ -554,7 +572,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
}
private static ShareGroupOffset merge(ShareGroupOffset soFar, ShareUpdateValue newData) {
- // snapshot epoch should be same as last share snapshot
+ // Snapshot epoch should be same as last share snapshot.
// state epoch is not present
List<PersisterStateBatch> currentBatches = soFar.stateBatches();
long newStartOffset = newData.startOffset() == -1 ? soFar.startOffset() : newData.startOffset();

File: ShareCoordinatorServiceTest.java

@@ -40,7 +40,6 @@ import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.FutureUtils;
import org.junit.jupiter.api.Test;
- import org.mockito.ArgumentMatchers;
import java.time.Duration;
import java.util.Arrays;
@@ -57,6 +56,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -163,10 +163,10 @@ class ShareCoordinatorServiceTest {
));
when(runtime.scheduleWriteOperation(
- ArgumentMatchers.eq("write-share-group-state"),
- ArgumentMatchers.eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
- ArgumentMatchers.eq(Duration.ofMillis(5000)),
- ArgumentMatchers.any()
+ eq("write-share-group-state"),
+ eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
+ eq(Duration.ofMillis(5000)),
+ any()
))
.thenReturn(CompletableFuture.completedFuture(response1))
.thenReturn(CompletableFuture.completedFuture(response2));
@@ -274,10 +274,11 @@ class ShareCoordinatorServiceTest {
)))
);
- when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("read-share-group-state"),
- ArgumentMatchers.eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
- ArgumentMatchers.any()
+ when(runtime.scheduleWriteOperation(
+ eq("read-update-leader-epoch-state"),
+ eq(new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0)),
+ any(),
+ any()
))
.thenReturn(CompletableFuture.completedFuture(new ReadShareGroupStateResponseData()
.setResults(Collections.singletonList(topicData1))))
@@ -590,7 +591,7 @@ class ShareCoordinatorServiceTest {
Uuid topicId = Uuid.randomUuid();
int partition = 0;
- when(runtime.scheduleReadOperation(any(), any(), any()))
+ when(runtime.scheduleWriteOperation(any(), any(), any(), any()))
.thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
assertEquals(new ReadShareGroupStateResponseData()
@@ -625,7 +626,7 @@ class ShareCoordinatorServiceTest {
Time.SYSTEM
);
- service.startup(() -> 50);
+ service.startup(() -> 1);
String groupId = "group1";
Uuid topicId = Uuid.randomUuid();
@@ -635,7 +636,7 @@ class ShareCoordinatorServiceTest {
assertEquals(Topic.SHARE_GROUP_STATE_TOPIC_NAME, tp.topic());
int expectedPartition = tp.partition();
- // The presence of a topic name should not affect the choice of partition
+ // The presence of a topic name should not affect the choice of partition.
tp = service.topicPartitionFor(new SharePartitionKey(groupId, new TopicIdPartition(topicId, partition, "whatever")));
assertEquals(Topic.SHARE_GROUP_STATE_TOPIC_NAME, tp.topic());
assertEquals(expectedPartition, tp.partition());
@@ -656,22 +657,20 @@ class ShareCoordinatorServiceTest {
Uuid topicId = Uuid.randomUuid();
int partition = 0;
- // inactive shard should throw exception
+ // Inactive shard should throw exception.
assertThrows(CoordinatorNotAvailableException.class, () -> service.partitionFor(SharePartitionKey.getInstance(groupId, topicId, partition)));
- final int numPartitions = 50;
+ final int numPartitions = 1;
service.startup(() -> numPartitions);
final SharePartitionKey key1 = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, partition, null));
- int sharePartitionKey = service.partitionFor(key1);
- assertEquals(Utils.abs(key1.asCoordinatorKey().hashCode()) % numPartitions, sharePartitionKey);
+ assertEquals(Utils.abs(key1.asCoordinatorKey().hashCode()) % numPartitions, service.partitionFor(key1));
- // The presence of a topic name should not affect the choice of partition
+ // The presence of a topic name should not affect the choice of partition.
final SharePartitionKey key2 = new SharePartitionKey(groupId, new TopicIdPartition(topicId, partition, "whatever"));
- sharePartitionKey = service.partitionFor(key2);
- assertEquals(Utils.abs(key2.asCoordinatorKey().hashCode()) % numPartitions, sharePartitionKey);
+ assertEquals(Utils.abs(key2.asCoordinatorKey().hashCode()) % numPartitions, service.partitionFor(key2));
- // asCoordinatorKey does not discriminate on topic name
+ // asCoordinatorKey does not discriminate on topic name.
assertEquals(key1.asCoordinatorKey(), key2.asCoordinatorKey());
}
}

File: ShareCoordinatorShardTest.java

@@ -41,6 +41,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.config.ShareCoordinatorConfig;
import org.apache.kafka.server.share.SharePartitionKey;
+ import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -53,7 +54,9 @@ import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -71,8 +74,8 @@ class ShareCoordinatorShardTest {
public static class ShareCoordinatorShardBuilder {
private final LogContext logContext = new LogContext();
private ShareCoordinatorConfig config = null;
- private CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
- private CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
+ private final CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ private final CoordinatorMetricsShard metricsShard = mock(CoordinatorMetricsShard.class);
private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
private MetadataImage metadataImage = null;
private Map<String, String> configOverrides = new HashMap<>();
@@ -186,7 +189,7 @@ class ShareCoordinatorShardTest {
)
);
- // First replay should populate values in otherwise empty shareStateMap and leaderMap
+ // First replay should populate values in otherwise empty shareStateMap and leaderMap.
shard.replay(offset, producerId, producerEpoch, record1);
assertEquals(groupOffset(record1.value().message()),
@@ -194,7 +197,7 @@ class ShareCoordinatorShardTest {
assertEquals(leaderEpoch, shard.getLeaderMapValue(shareCoordinatorKey));
- // Second replay should update the existing values in shareStateMap and leaderMap
+ // Second replay should update the existing values in shareStateMap and leaderMap.
shard.replay(offset + 1, producerId, producerEpoch, record2);
assertEquals(groupOffset(record2.value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
@@ -298,8 +301,8 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
- // the snapshot epoch here will be 1 since this is a snapshot update record,
- // and it refers to parent share snapshot
+ // The snapshot epoch here will be 1 since this is a snapshot update record,
+ // and it refers to parent share snapshot.
expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request2.topics().get(0).partitions().get(0), 0)
));
@@ -312,7 +315,7 @@ class ShareCoordinatorShardTest {
assertEquals(incrementalUpdate.snapshotEpoch(), combinedState.snapshotEpoch());
assertEquals(incrementalUpdate.leaderEpoch(), combinedState.leaderEpoch());
assertEquals(incrementalUpdate.startOffset(), combinedState.startOffset());
- // the batches should have combined to 1 since same state
+ // The batches should have combined to 1 since same state.
assertEquals(Collections.singletonList(new PersisterStateBatch(0, 20, (byte) 0, (short) 1)),
combinedState.stateBatches());
assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
@@ -418,7 +421,7 @@ class ShareCoordinatorShardTest {
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
- .setLeaderEpoch(3) // lower leader epoch in the second request
+ .setLeaderEpoch(3) // Lower leader epoch in the second request.
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(11)
.setLastOffset(20)
@@ -443,7 +446,7 @@ class ShareCoordinatorShardTest {
result = shard.writeState(request2);
- // Since the leader epoch in the second request was lower than the one in the first request, FENCED_LEADER_EPOCH error is expected
+ // Since the leader epoch in the second request was lower than the one in the first request, FENCED_LEADER_EPOCH error is expected.
expectedData = WriteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, PARTITION, Errors.FENCED_LEADER_EPOCH, Errors.FENCED_LEADER_EPOCH.message());
expectedRecords = Collections.emptyList();
@@ -451,7 +454,7 @@ class ShareCoordinatorShardTest {
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
- // No changes to the leaderMap
+ // No changes to the leaderMap.
assertEquals(5, shard.getLeaderMapValue(shareCoordinatorKey));
}
@@ -483,7 +486,7 @@ class ShareCoordinatorShardTest {
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
- .setStateEpoch(0) // lower state epoch in the second request
+ .setStateEpoch(0) // Lower state epoch in the second request.
.setLeaderEpoch(5)
.setStateBatches(Collections.singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(11)
@@ -509,7 +512,7 @@ class ShareCoordinatorShardTest {
result = shard.writeState(request2);
- // Since the leader epoch in the second request was lower than the one in the first request, FENCED_LEADER_EPOCH error is expected
+ // Since the leader epoch in the second request was lower than the one in the first request, FENCED_LEADER_EPOCH error is expected.
expectedData = WriteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, PARTITION, Errors.FENCED_STATE_EPOCH, Errors.FENCED_STATE_EPOCH.message());
expectedRecords = Collections.emptyList();
@@ -517,7 +520,7 @@ class ShareCoordinatorShardTest {
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
- // No changes to the stateEpochMap
+ // No changes to the stateEpochMap.
assertEquals(1, shard.getStateEpochMapValue(shareCoordinatorKey));
}
@@ -537,7 +540,7 @@ class ShareCoordinatorShardTest {
.setPartition(PARTITION)
.setLeaderEpoch(1)))));
- ReadShareGroupStateResponseData result = shard.readState(request, 0L);
+ CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> result = shard.readStateAndMaybeUpdateLeaderEpoch(request);
assertEquals(ReadShareGroupStateResponse.toResponseData(
TOPIC_ID,
@@ -550,9 +553,9 @@ class ShareCoordinatorShardTest {
.setDeliveryCount((short) 1)
.setDeliveryState((byte) 0)
)
- ), result);
+ ), result.response());
- assertEquals(1, shard.getLeaderMapValue(coordinatorKey));
+ assertEquals(0, shard.getLeaderMapValue(coordinatorKey));
}
@Test
@@ -573,14 +576,14 @@ class ShareCoordinatorShardTest {
.setPartition(partition)
.setLeaderEpoch(5)))));
- ReadShareGroupStateResponseData result = shard.readState(request, 0L);
+ CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> result = shard.readStateAndMaybeUpdateLeaderEpoch(request);
ReadShareGroupStateResponseData expectedData = ReadShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, partition, Errors.INVALID_REQUEST, ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
- assertEquals(expectedData, result);
+ assertEquals(expectedData, result.response());
- // Leader epoch should not be changed because the request failed
+ // Leader epoch should not be changed because the request failed.
assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
}
@@ -602,14 +605,14 @@ class ShareCoordinatorShardTest {
.setPartition(0)
.setLeaderEpoch(5)))));
- ReadShareGroupStateResponseData result = shard.readState(request, 0L);
+ CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> result = shard.readStateAndMaybeUpdateLeaderEpoch(request);
ReadShareGroupStateResponseData expectedData = ReadShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
- assertEquals(expectedData, result);
+ assertEquals(expectedData, result.response());
- // Leader epoch should not be changed because the request failed
+ // Leader epoch should not be changed because the request failed.
assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
}
@@ -619,7 +622,7 @@ class ShareCoordinatorShardTest {
int leaderEpoch = 5;
- writeAndReplayRecord(shard, leaderEpoch); // leaderEpoch in the leaderMap will be 5
+ writeAndReplayRecord(shard, leaderEpoch); // leaderEpoch in the leaderMap will be 5.
SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
@@ -629,9 +632,9 @@ class ShareCoordinatorShardTest {
.setTopicId(TOPIC_ID)
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
- .setLeaderEpoch(3))))); // lower leaderEpoch than the one stored in leaderMap
+ .setLeaderEpoch(3))))); // Lower leaderEpoch than the one stored in leaderMap.
- ReadShareGroupStateResponseData result = shard.readState(request, 0L);
+ CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> result = shard.readStateAndMaybeUpdateLeaderEpoch(request);
ReadShareGroupStateResponseData expectedData = ReadShareGroupStateResponse.toErrorResponseData(
TOPIC_ID,
@@ -639,7 +642,7 @@ class ShareCoordinatorShardTest {
Errors.FENCED_LEADER_EPOCH,
Errors.FENCED_LEADER_EPOCH.message());
- assertEquals(expectedData, result);
+ assertEquals(expectedData, result.response());
assertEquals(leaderEpoch, shard.getLeaderMapValue(shareCoordinatorKey));
}
@@ -677,7 +680,7 @@ class ShareCoordinatorShardTest {
SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
- // set initial state
+ // Set initial state.
WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
@@ -724,7 +727,7 @@ class ShareCoordinatorShardTest {
assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
- // acknowledge b1
+ // Acknowledge b1.
WriteShareGroupStateRequestData requestUpdateB1 = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
@@ -739,14 +742,14 @@ class ShareCoordinatorShardTest {
.setFirstOffset(100)
.setLastOffset(109)
.setDeliveryCount((short) 1)
- .setDeliveryState((byte) 2))) // acked
+ .setDeliveryState((byte) 2))) // Acked
))
));
result = shard.writeState(requestUpdateB1);
shard.replay(0L, 0L, (short) 0, result.records().get(0));
- // ack batch 3 and move start offset
+ // Ack batch 3 and move start offset.
WriteShareGroupStateRequestData requestUpdateStartOffsetAndB3 = new WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
@@ -761,7 +764,7 @@ class ShareCoordinatorShardTest {
.setFirstOffset(120)
.setLastOffset(129)
.setDeliveryCount((short) 1)
- .setDeliveryState((byte) 2))) //acked
+ .setDeliveryState((byte) 2))) //Acked
))
));
@@ -775,7 +778,7 @@ class ShareCoordinatorShardTest {
.setStateEpoch(0)
.setSnapshotEpoch(2) // since 2nd share snapshot
.setStateBatches(Arrays.asList(
new PersisterStateBatch(110, 119, (byte) 1, (short) 2), // b2 not lost
new PersisterStateBatch(120, 129, (byte) 2, (short) 1)
))
.build();
@@ -793,6 +796,95 @@ class ShareCoordinatorShardTest {
verify(shard.getMetricsShard(), times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
}
+ @Test
+ public void testReadStateLeaderEpochUpdateSuccess() {
+ ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+ SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+ ReadShareGroupStateRequestData request = new ReadShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)
+ .setLeaderEpoch(2)
+ ))));
+ CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> result = shard.readStateAndMaybeUpdateLeaderEpoch(request);
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+ ReadShareGroupStateResponseData expectedData = ReadShareGroupStateResponse.toResponseData(
+ TOPIC_ID, PARTITION,
+ PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_STATE_EPOCH,
+ Collections.emptyList());
+ List<CoordinatorRecord> expectedRecords = Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ GROUP_ID, TOPIC_ID, PARTITION, new ShareGroupOffset.Builder()
+ .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+ .setLeaderEpoch(2)
+ .setStateBatches(Collections.emptyList())
+ .setSnapshotEpoch(0)
+ .setStateEpoch(PartitionFactory.DEFAULT_STATE_EPOCH)
+ .build()
+ ));
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+ assertEquals(groupOffset(expectedRecords.get(0).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+ assertEquals(2, shard.getLeaderMapValue(shareCoordinatorKey));
+ verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+ }
+ @Test
+ public void testReadStateLeaderEpochUpdateNoUpdate() {
+ ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+ ReadShareGroupStateRequestData request1 = new ReadShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)
+ .setLeaderEpoch(2)
+ ))));
+ CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> result = shard.readStateAndMaybeUpdateLeaderEpoch(request1);
+ assertFalse(result.records().isEmpty()); // Record generated.
+ // Apply record to update soft state.
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+ ReadShareGroupStateRequestData request2 = new ReadShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)
+ .setLeaderEpoch(-1)
+ ))));
+ CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> result2 = shard.readStateAndMaybeUpdateLeaderEpoch(request2);
+ assertTrue(result2.records().isEmpty()); // Leader epoch -1 - no update.
+ ReadShareGroupStateRequestData request3 = new ReadShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)
+ .setLeaderEpoch(-1)
+ ))));
+ CoordinatorResult<ReadShareGroupStateResponseData, CoordinatorRecord> result3 = shard.readStateAndMaybeUpdateLeaderEpoch(request3);
+ assertTrue(result3.records().isEmpty()); // Same leader epoch - no update.
+ verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+ }
private static ShareGroupOffset groupOffset(ApiMessage record) {
if (record instanceof ShareSnapshotValue) {
return ShareGroupOffset.fromRecord((ShareSnapshotValue) record);