diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java index 69070f65e93..0b3e5a5ff08 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java @@ -60,13 +60,20 @@ public class ShareCoordinatorOffsetsManager { * * @param key - represents {@link SharePartitionKey} whose offset needs updating * @param offset - represents the latest partition offset for provided key + * @param isDelete - true if the offset is for a tombstone record */ - public void updateState(SharePartitionKey key, long offset) { + public void updateState(SharePartitionKey key, long offset, boolean isDelete) { lastRedundantOffset.set(Math.min(lastRedundantOffset.get(), offset)); offsets.put(key, offset); Optional redundantOffset = findRedundantOffset(); redundantOffset.ifPresent(lastRedundantOffset::set); + + // If the share partition is deleted, we should not hold onto its offset in our calculations + // as there is nothing beyond deletion which is going to update its state. + if (isDelete) { + offsets.remove(key); + } } private Optional findRedundantOffset() { diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index 76a654de4c1..9d52780faa5 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -266,7 +266,7 @@ public class ShareCoordinatorShard implements CoordinatorShard expectedOffset; + final boolean isDelete; - private TestTuple(SharePartitionKey key, long offset, Optional expectedOffset) { + private TestTuple(SharePartitionKey key, long offset, Optional expectedOffset, boolean isDelete) { this.key = key; this.offset = offset; this.expectedOffset = expectedOffset; + this.isDelete = isDelete; } static TestTuple instance(SharePartitionKey key, long offset, Optional expectedOffset) { - return new TestTuple(key, offset, expectedOffset); + return new TestTuple(key, offset, expectedOffset, false); + } + + static TestTuple instance(SharePartitionKey key, long offset, Optional expectedOffset, boolean isDelete) { + return new TestTuple(key, offset, expectedOffset, isDelete); } } @@ -96,19 +106,35 @@ public class ShareCoordinatorOffsetsManagerTest { static Stream generateNoRedundantStateCases() { return Stream.of( new ShareOffsetTestHolder( - "no redundant state single key", + "no redundant state single key.", List.of( ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)) ) ), new ShareOffsetTestHolder( - "no redundant state multiple keys", + "no redundant state single key with delete.", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L), true) + ) + ), + + new ShareOffsetTestHolder( + "no redundant state multiple keys.", List.of( ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.of(10L)), ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.of(10L)) ) + ), + + new ShareOffsetTestHolder( + "no redundant state multiple keys with delete.", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L), true), + ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.of(11L), true), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.of(13L), true) + ) ) ); } @@ -116,7 +142,7 @@ public class ShareCoordinatorOffsetsManagerTest { static Stream generateRedundantStateCases() { return Stream.of( new ShareOffsetTestHolder( - "redundant state single key", + "redundant state single key.", List.of( ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L, Optional.of(11L)), @@ -125,7 +151,7 @@ public class ShareCoordinatorOffsetsManagerTest { ), new ShareOffsetTestHolder( - "redundant state multiple keys", + "redundant state multiple keys.", // KEY1: 10 17 // KEY2: 11 16 // KEY3: 15 @@ -136,6 +162,20 @@ public class ShareCoordinatorOffsetsManagerTest { ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, Optional.of(10L)), // KEY2 11 redundant but should not be returned ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, Optional.of(15L)) ) + ), + + new ShareOffsetTestHolder( + "redundant state multiple keys with delete.", + // KEY1: 10 17 + // KEY2: 11 16 + // KEY3: 15 + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L), true), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, Optional.of(10L)), // KEY2 11 redundant but should not be returned + ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, Optional.of(16L)) // Because we have removed KEY3 from calculation + ) ) ); @@ -144,7 +184,7 @@ public class ShareCoordinatorOffsetsManagerTest { static Stream generateComplexCases() { return Stream.of( new ShareOffsetTestHolder( - "redundant state reverse key order", + "redundant state reverse key order.", // Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1. List.of( ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), @@ -156,6 +196,18 @@ public class ShareCoordinatorOffsetsManagerTest { ) ), + new ShareOffsetTestHolder( + "redundant state reverse key order with delete.", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L, Optional.of(10L), true), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L, Optional.of(10L), true), + ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L, Optional.of(25L)) // Because KEY2 and KEY3 are gone. + ) + ), + new ShareOffsetTestHolder( "redundant state infrequently written partition.", List.of( @@ -170,6 +222,20 @@ public class ShareCoordinatorOffsetsManagerTest { ShareOffsetTestHolder.TestTuple.instance(KEY3, 28L, Optional.of(10L)), ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, Optional.of(27L)) ) + ), + + new ShareOffsetTestHolder( + "redundant state infrequently written partition with delete.", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 18L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 20L, Optional.of(10L), true), //KEY3 no longer party to calculation + ShareOffsetTestHolder.TestTuple.instance(KEY2, 22L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 27L, Optional.of(10L), true), //KEY2 no longer party to calculation + ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, Optional.of(30L)) + ) ) ); } @@ -179,7 +245,7 @@ public class ShareCoordinatorOffsetsManagerTest { public void testUpdateStateNoRedundantState(ShareOffsetTestHolder holder) { if (holder.shouldRun) { holder.tuples.forEach(tuple -> { - manager.updateState(tuple.key, tuple.offset); + manager.updateState(tuple.key, tuple.offset, tuple.isDelete); assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName); }); } @@ -190,7 +256,7 @@ public class ShareCoordinatorOffsetsManagerTest { public void testUpdateStateRedundantState(ShareOffsetTestHolder holder) { if (holder.shouldRun) { holder.tuples.forEach(tuple -> { - manager.updateState(tuple.key, tuple.offset); + manager.updateState(tuple.key, tuple.offset, tuple.isDelete); assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName); }); } @@ -201,9 +267,9 @@ public class ShareCoordinatorOffsetsManagerTest { public void testUpdateStateComplexCases(ShareOffsetTestHolder holder) { if (holder.shouldRun) { holder.tuples.forEach(tuple -> { - manager.updateState(tuple.key, tuple.offset); + manager.updateState(tuple.key, tuple.offset, tuple.isDelete); assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName); }); } } -} +} \ No newline at end of file