KAFKA-19337: Write state writes snapshot for higher state epoch. (#19843)

- Due to the condition on the number of updates per snapshot in
`generateShareStateRecord`, a share update gets written for a write state
request even if the request carries a state epoch higher than any seen so far.
- A share update record cannot carry the state epoch, so the new, higher
state epoch gets missed.
- This PR remedies the issue and adds a test as proof of the fix (a sketch
of the changed condition is included below).
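
The following is a minimal, hypothetical sketch of the record-type decision after the fix; it is not the actual `ShareCoordinatorShard` code. The class and member names here (`SnapshotDecisionSketch`, `lastSnapshotStateEpoch`, `recordTypeFor`) are illustrative stand-ins for the coordinator's `shareStateMap`/`snapshotUpdateCount` bookkeeping shown in the diff below.

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotDecisionSketch {

    enum RecordType { SHARE_SNAPSHOT, SHARE_UPDATE }

    // Illustrative stand-ins for the coordinator's in-memory bookkeeping.
    private final Map<String, Integer> lastSnapshotStateEpoch = new HashMap<>();
    private final Map<String, Integer> snapshotUpdateCount = new HashMap<>();
    private final int updatesPerSnapshotLimit;

    SnapshotDecisionSketch(int updatesPerSnapshotLimit) {
        this.updatesPerSnapshotLimit = updatesPerSnapshotLimit;
    }

    RecordType recordTypeFor(String sharePartitionKey, int incomingStateEpoch) {
        if (!lastSnapshotStateEpoch.containsKey(sharePartitionKey)) {
            // First write for this key: always a full snapshot.
            return RecordType.SHARE_SNAPSHOT;
        }
        boolean limitReached =
            snapshotUpdateCount.getOrDefault(sharePartitionKey, 0) >= updatesPerSnapshotLimit;
        boolean higherStateEpoch =
            incomingStateEpoch > lastSnapshotStateEpoch.get(sharePartitionKey);
        // Before the fix only limitReached forced a snapshot; a request with a higher
        // state epoch fell through to a share update, which cannot carry the state epoch.
        return (limitReached || higherStateEpoch) ? RecordType.SHARE_SNAPSHOT : RecordType.SHARE_UPDATE;
    }

    public static void main(String[] args) {
        SnapshotDecisionSketch sketch = new SnapshotDecisionSketch(50);
        sketch.lastSnapshotStateEpoch.put("group:topic:0", 0);
        sketch.snapshotUpdateCount.put("group:topic:0", 1);
        System.out.println(sketch.recordTypeFor("group:topic:0", 0)); // SHARE_UPDATE
        System.out.println(sketch.recordTypeFor("group:topic:0", 1)); // SHARE_SNAPSHOT (higher state epoch)
    }
}
```

The actual change is the extra `partitionData.stateEpoch() > shareStateMap.get(key).stateEpoch()` clause in `generateShareStateRecord`, visible in the `ShareCoordinatorShard` hunk below.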

Reviewers: Andrew Schofield <aschofield@confluent.io>
Authored by Sushant Mahajan on 2025-05-29 18:15:54 +05:30, committed by GitHub
parent bd939f56de
commit 13b5627274
4 changed files with 124 additions and 5 deletions

@@ -51,7 +51,7 @@ public class ShareCoordinatorRecordHelpers {
         );
     }
 
-    public static CoordinatorRecord newShareSnapshotUpdateRecord(String groupId, Uuid topicId, int partitionId, ShareGroupOffset offsetData) {
+    public static CoordinatorRecord newShareUpdateRecord(String groupId, Uuid topicId, int partitionId, ShareGroupOffset offsetData) {
         return CoordinatorRecord.record(
             new ShareUpdateKey()
                 .setGroupId(groupId)

@@ -657,6 +657,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         SharePartitionKey key
     ) {
         long timestamp = time.milliseconds();
+        int updatesPerSnapshotLimit = config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot();
         if (!shareStateMap.containsKey(key)) {
             // Since this is the first time we are getting a write request for key, we should be creating a share snapshot record.
             // The incoming partition data could have overlapping state batches, we must merge them
@@ -671,7 +672,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
                 .setCreateTimestamp(timestamp)
                 .setWriteTimestamp(timestamp)
                 .build());
-        } else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
+        } else if (snapshotUpdateCount.getOrDefault(key, 0) >= updatesPerSnapshotLimit || partitionData.stateEpoch() > shareStateMap.get(key).stateEpoch()) {
             ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
             int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? currentState.leaderEpoch() : partitionData.leaderEpoch();
             int newStateEpoch = partitionData.stateEpoch() == -1 ? currentState.stateEpoch() : partitionData.stateEpoch();
@@ -697,7 +698,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
             // Share snapshot is present and number of share snapshot update records < snapshotUpdateRecordsPerSnapshot
             // so create a share update record.
             // The incoming partition data could have overlapping state batches, we must merge them.
-            return ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+            return ShareCoordinatorRecordHelpers.newShareUpdateRecord(
                 key.groupId(), key.topicId(), partitionData.partition(),
                 new ShareGroupOffset.Builder()
                     .setSnapshotEpoch(currentState.snapshotEpoch()) // Use same snapshotEpoch as last share snapshot.

@@ -84,7 +84,7 @@ public class ShareCoordinatorRecordHelpersTest {
         Uuid topicId = Uuid.randomUuid();
         int partitionId = 1;
         PersisterStateBatch batch = new PersisterStateBatch(1L, 10L, (byte) 0, (short) 1);
-        CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+        CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareUpdateRecord(
             groupId,
             topicId,
             partitionId,

@@ -269,6 +269,124 @@ class ShareCoordinatorShardTest {
         verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testWriteStateSequentialRequestsWithHigherStateEpochCreateShareSnapshots() {
+        // Makes 3 requests. First 2 with same state epoch, and 3rd with incremented state epoch.
+        // The test config defines number of updates/snapshot as 50. So, this test proves that
+        // a higher state epoch in a request forces snapshot creation, even if number of share updates
+        // have not breached the updates/snapshots limit.
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        int stateEpoch = 0;
+        SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(0)
+                    .setStateEpoch(stateEpoch)
+                    .setLeaderEpoch(0)
+                    .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+                        .setFirstOffset(0)
+                        .setLastOffset(10)
+                        .setDeliveryCount((short) 1)
+                        .setDeliveryState((byte) 0)))))));
+
+        CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> result = shard.writeState(request);
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+        ));
+
+        assertEquals(0, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch());
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+        assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+        ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+        assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+        verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+
+        // State epoch stays same so share update.
+        request = new WriteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(0)
+                    .setStateEpoch(stateEpoch)
+                    .setLeaderEpoch(0)
+                    .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+                        .setFirstOffset(0)
+                        .setLastOffset(10)
+                        .setDeliveryCount((short) 2)
+                        .setDeliveryState((byte) 0)))))));
+
+        result = shard.writeState(request);
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareUpdateRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+        ));
+
+        // Snapshot epoch did not increase
+        assertEquals(0, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch());
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+        assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+        ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+        assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+        verify(shard.getMetricsShard(), times(2)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+
+        // State epoch incremented so share snapshot.
+        request = new WriteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(0)
+                    .setStateEpoch(stateEpoch + 1) // incremented
+                    .setLeaderEpoch(0)
+                    .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+                        .setFirstOffset(0)
+                        .setLastOffset(10)
+                        .setDeliveryCount((short) 2)
+                        .setDeliveryState((byte) 0)))))));
+
+        result = shard.writeState(request);
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), 1, TIME.milliseconds())
+        ));
+
+        // Snapshot epoch increased.
+        assertEquals(1, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch());
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+        assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), 1, TIME.milliseconds())
+        ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+        assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+        verify(shard.getMetricsShard(), times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+    }
+
     @Test
     public void testSubsequentWriteStateSnapshotEpochUpdatesSuccessfully() {
         ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
@@ -328,7 +446,7 @@ class ShareCoordinatorShardTest {
         expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
         // The snapshot epoch here will be 1 since this is a snapshot update record,
         // and it refers to parent share snapshot.
-        expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+        expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareUpdateRecord(
             GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request2.topics().get(0).partitions().get(0), TIME.milliseconds())
         ));