KAFKA-19695: Fix bug in redundant offset calculation. (#20516)

* The `ShareCoordinatorShard` maintains the record offset
information for `SharePartitionKey`s in the
`ShareCoordinatorOffsetsManager` class.
* Replay of `ShareSnapshot`s in the shards are reflected in the offsets
manager including records created due to delete state.
* However, if the share partition delete is due to topic delete, no
record will ever be written for the same `SharePartitionKey` post the
delete tombstone (as topic id will not repeat).
As a result the offset manager will always consider the deleted share
partition's offset as the last redundant one.
* The fix is to make the offset manager aware of the tombstone records
and remove them from the redundant offset calculation.
* Unit tests have been updated for the same.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal
 <apoorvmittal10@gmail.com>
This commit is contained in:
Sushant Mahajan 2025-09-10 22:08:34 +05:30 committed by GitHub
parent 351203873d
commit ff5025a21c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 90 additions and 17 deletions

View File

@@ -60,13 +60,20 @@ public class ShareCoordinatorOffsetsManager {
*
* @param key - represents {@link SharePartitionKey} whose offset needs updating
* @param offset - represents the latest partition offset for provided key
* @param isDelete - true if the offset is for a tombstone record
*/
public void updateState(SharePartitionKey key, long offset) {
public void updateState(SharePartitionKey key, long offset, boolean isDelete) {
lastRedundantOffset.set(Math.min(lastRedundantOffset.get(), offset));
offsets.put(key, offset);
Optional<Long> redundantOffset = findRedundantOffset();
redundantOffset.ifPresent(lastRedundantOffset::set);
// If the share partition is deleted, we should not hold onto its offset in our calculations
// as there is nothing beyond deletion which is going to update its state.
if (isDelete) {
offsets.remove(key);
}
}
private Optional<Long> findRedundantOffset() {

View File

@@ -266,7 +266,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
}
}
offsetsManager.updateState(mapKey, offset);
offsetsManager.updateState(mapKey, offset, value == null);
}
private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value) {

View File

@@ -32,6 +32,7 @@ import java.util.Optional;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public class ShareCoordinatorOffsetsManagerTest {
@@ -48,16 +49,19 @@ public class ShareCoordinatorOffsetsManagerTest {
@Test
public void testUpdateStateAddsToInternalState() {
manager.updateState(KEY1, 0L);
manager.updateState(KEY1, 0L, false);
assertEquals(Optional.empty(), manager.lastRedundantOffset());
manager.updateState(KEY1, 10L);
manager.updateState(KEY1, 10L, false);
assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // [0-9] offsets are redundant.
manager.updateState(KEY2, 15L);
manager.updateState(KEY2, 15L, false);
assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // No update to last redundant after adding 15L so, still 10L.
assertEquals(10L, manager.curState().get(KEY1));
manager.updateState(KEY1, 25L, true);
assertEquals(Optional.of(15L), manager.lastRedundantOffset()); // KEY1 deleted, no longer part of calculation.
assertNull(manager.curState().get(KEY1));
assertEquals(15L, manager.curState().get(KEY2));
}
@@ -66,15 +70,21 @@ public class ShareCoordinatorOffsetsManagerTest {
final SharePartitionKey key;
final long offset;
final Optional<Long> expectedOffset;
final boolean isDelete;
private TestTuple(SharePartitionKey key, long offset, Optional<Long> expectedOffset) {
private TestTuple(SharePartitionKey key, long offset, Optional<Long> expectedOffset, boolean isDelete) {
this.key = key;
this.offset = offset;
this.expectedOffset = expectedOffset;
this.isDelete = isDelete;
}
static TestTuple instance(SharePartitionKey key, long offset, Optional<Long> expectedOffset) {
return new TestTuple(key, offset, expectedOffset);
return new TestTuple(key, offset, expectedOffset, false);
}
static TestTuple instance(SharePartitionKey key, long offset, Optional<Long> expectedOffset, boolean isDelete) {
return new TestTuple(key, offset, expectedOffset, isDelete);
}
}
@@ -96,19 +106,35 @@ public class ShareCoordinatorOffsetsManagerTest {
static Stream<ShareOffsetTestHolder> generateNoRedundantStateCases() {
return Stream.of(
new ShareOffsetTestHolder(
"no redundant state single key",
"no redundant state single key.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L))
)
),
new ShareOffsetTestHolder(
"no redundant state multiple keys",
"no redundant state single key with delete.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L), true)
)
),
new ShareOffsetTestHolder(
"no redundant state multiple keys.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.of(10L))
)
),
new ShareOffsetTestHolder(
"no redundant state multiple keys with delete.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L), true),
ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.of(11L), true),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.of(13L), true)
)
)
);
}
@@ -116,7 +142,7 @@ public class ShareCoordinatorOffsetsManagerTest {
static Stream<ShareOffsetTestHolder> generateRedundantStateCases() {
return Stream.of(
new ShareOffsetTestHolder(
"redundant state single key",
"redundant state single key.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L, Optional.of(11L)),
@@ -125,7 +151,7 @@ public class ShareCoordinatorOffsetsManagerTest {
),
new ShareOffsetTestHolder(
"redundant state multiple keys",
"redundant state multiple keys.",
// KEY1: 10 17
// KEY2: 11 16
// KEY3: 15
@@ -136,6 +162,20 @@ public class ShareCoordinatorOffsetsManagerTest {
ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, Optional.of(10L)), // KEY2 11 redundant but should not be returned
ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, Optional.of(15L))
)
),
new ShareOffsetTestHolder(
"redundant state multiple keys with delete.",
// KEY1: 10 17
// KEY2: 11 16
// KEY3: 15
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L), true),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, Optional.of(10L)), // KEY2 11 redundant but should not be returned
ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, Optional.of(16L)) // Because we have removed KEY3 from calculation
)
)
);
@@ -144,7 +184,7 @@ public class ShareCoordinatorOffsetsManagerTest {
static Stream<ShareOffsetTestHolder> generateComplexCases() {
return Stream.of(
new ShareOffsetTestHolder(
"redundant state reverse key order",
"redundant state reverse key order.",
// Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1.
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
@@ -156,6 +196,18 @@ public class ShareCoordinatorOffsetsManagerTest {
)
),
new ShareOffsetTestHolder(
"redundant state reverse key order with delete.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L, Optional.of(10L), true),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L, Optional.of(10L), true),
ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L, Optional.of(25L)) // Because KEY2 and KEY3 are gone.
)
),
new ShareOffsetTestHolder(
"redundant state infrequently written partition.",
List.of(
@@ -170,6 +222,20 @@ public class ShareCoordinatorOffsetsManagerTest {
ShareOffsetTestHolder.TestTuple.instance(KEY3, 28L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, Optional.of(27L))
)
),
new ShareOffsetTestHolder(
"redundant state infrequently written partition with delete.",
List.of(
ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 18L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY3, 20L, Optional.of(10L), true), //KEY3 no longer party to calculation
ShareOffsetTestHolder.TestTuple.instance(KEY2, 22L, Optional.of(10L)),
ShareOffsetTestHolder.TestTuple.instance(KEY2, 27L, Optional.of(10L), true), //KEY2 no longer party to calculation
ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, Optional.of(30L))
)
)
);
}
@@ -179,7 +245,7 @@ public class ShareCoordinatorOffsetsManagerTest {
public void testUpdateStateNoRedundantState(ShareOffsetTestHolder holder) {
if (holder.shouldRun) {
holder.tuples.forEach(tuple -> {
manager.updateState(tuple.key, tuple.offset);
manager.updateState(tuple.key, tuple.offset, tuple.isDelete);
assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName);
});
}
@@ -190,7 +256,7 @@ public class ShareCoordinatorOffsetsManagerTest {
public void testUpdateStateRedundantState(ShareOffsetTestHolder holder) {
if (holder.shouldRun) {
holder.tuples.forEach(tuple -> {
manager.updateState(tuple.key, tuple.offset);
manager.updateState(tuple.key, tuple.offset, tuple.isDelete);
assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName);
});
}
@@ -201,9 +267,9 @@ public class ShareCoordinatorOffsetsManagerTest {
public void testUpdateStateComplexCases(ShareOffsetTestHolder holder) {
if (holder.shouldRun) {
holder.tuples.forEach(tuple -> {
manager.updateState(tuple.key, tuple.offset);
manager.updateState(tuple.key, tuple.offset, tuple.isDelete);
assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName);
});
}
}
}
}