diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index 45baa20266b..d66d34fddf9 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -100,6 +100,20 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { } } + protected def deleteTopic( + topic: String + ): Unit = { + val admin = cluster.admin() + try { + admin + .deleteTopics(TopicCollection.ofTopicNames(List(topic).asJava)) + .all() + .get() + } finally { + admin.close() + } + } + protected def createTopicAndReturnLeaders( topic: String, numPartitions: Int = 1, diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala index 8b949c5d3e0..326f02ec7e2 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala @@ -527,4 +527,88 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB ) } } + + @ClusterTest + def testFetchOffsetWithRecreatedTopic(): Unit = { + // There are two ways to ensure that committed of recreated topics are not returned. + // 1) When a topic is deleted, GroupCoordinatorService#onPartitionsDeleted is called to + // delete all its committed offsets. + // 2) Since version 10 of the OffsetCommit API, the topic id is stored alongside the + // committed offset. When it is queried, it is only returned iff the topic id of + // committed offset matches the requested one. + // The test tests both conditions but not in a deterministic way as they race + // against each others. + + createOffsetsTopic() + + // Create the topic. + var topicId = createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId, memberEpoch) = joinConsumerGroup("grp", true) + + // Commit offsets. + for (partitionId <- 0 to 2) { + commitOffset( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + topic = "foo", + topicId = topicId, + partition = partitionId, + offset = 100L + partitionId, + expectedError = Errors.NONE, + version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) + ) + } + + // Delete topic. + deleteTopic("foo") + + // Recreate topic. + topicId = createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Start from version 10 because fetching topic id is not supported before. + for (version <- 10 to ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) { + assertEquals( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("grp") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setTopicId(topicId) + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(-1L), + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(1) + .setCommittedOffset(-1L), + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(2) + .setCommittedOffset(-1L) + ).asJava) + ).asJava), + fetchOffsets( + group = new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("grp") + .setMemberId(memberId) + .setMemberEpoch(memberEpoch) + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setTopicId(topicId) + .setPartitionIndexes(List[Integer](0, 1, 2).asJava) + ).asJava), + requireStable = true, + version = version.toShort + ) + ) + } + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 1f67d23246c..ffba6dee2ce 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -906,7 +906,7 @@ public class OffsetMetadataManager { .setCommittedOffset(INVALID_OFFSET) .setCommittedLeaderEpoch(-1) .setMetadata("")); - } else if (offsetAndMetadata == null) { + } else if (isOffsetInvalid(offsetAndMetadata, topic.topicId())) { topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() .setPartitionIndex(partitionIndex) .setCommittedOffset(INVALID_OFFSET) @@ -927,6 +927,14 @@ public class OffsetMetadataManager { .setTopics(topicResponses); } + private static boolean isOffsetInvalid(OffsetAndMetadata offsetAndMetadata, Uuid expectedTopicId) { + return offsetAndMetadata == null || isMismatchedTopicId(offsetAndMetadata.topicId, expectedTopicId); + } + + private static boolean isMismatchedTopicId(Uuid actual, Uuid expected) { + return !actual.equals(Uuid.ZERO_UUID) && !expected.equals(Uuid.ZERO_UUID) && !actual.equals(expected); + } + /** * Fetch all offsets for a given Group. * diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index d3ddef15773..7eac3d25890 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -441,6 +441,28 @@ public class OffsetMetadataManagerTest { long offset, int leaderEpoch, long commitTimestamp + ) { + commitOffset( + producerId, + groupId, + Uuid.ZERO_UUID, + topic, + partition, + offset, + leaderEpoch, + commitTimestamp + ); + } + + public void commitOffset( + long producerId, + String groupId, + Uuid topicId, + String topic, + int partition, + long offset, + int leaderEpoch, + long commitTimestamp ) { replay(producerId, GroupCoordinatorRecordHelpers.newOffsetCommitRecord( groupId, @@ -452,7 +474,7 @@ public class OffsetMetadataManagerTest { "metadata", commitTimestamp, OptionalLong.empty(), - Uuid.ZERO_UUID + topicId ) )); } @@ -1827,6 +1849,73 @@ public class OffsetMetadataManagerTest { ), context.fetchOffsets("group", request, Long.MAX_VALUE)); } + @Test + public void testFetchOffsetsWithRecreatedTopic() { + Uuid fooId1 = Uuid.randomUuid(); + Uuid fooId2 = Uuid.randomUuid(); + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); + + context.commitOffset( + RecordBatch.NO_PRODUCER_ID, + "group", + fooId1, + "foo", + 0, + 100L, + 1, + context.time.milliseconds() + ); + + context.commitOffset( + RecordBatch.NO_PRODUCER_ID, + "group", + fooId1, + "foo", + 1, + 100L, + 1, + context.time.milliseconds() + ); + + // Request with the correct topic id. + var request = List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setTopicId(fooId1) + .setPartitionIndexes(List.of(0, 1)) + ); + + assertEquals(List.of( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setTopicId(fooId1) + .setPartitions(List.of( + mkOffsetPartitionResponse(0, 100L, 1, "metadata"), + mkOffsetPartitionResponse(1, 100L, 1, "metadata") + )) + ), context.fetchOffsets("group", request, Long.MAX_VALUE)); + + // Request with the incorrect topic id. + request = List.of( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setTopicId(fooId2) + .setPartitionIndexes(List.of(0, 1)) + ); + + assertEquals(List.of( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setTopicId(fooId2) + .setPartitions(List.of( + mkInvalidOffsetPartitionResponse(0), + mkInvalidOffsetPartitionResponse(1) + )) + ), context.fetchOffsets("group", request, Long.MAX_VALUE)); + } + @Test public void testFetchOffsetsAtDifferentCommittedOffset() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();