KAFKA-19154; Offset Fetch API should return INVALID_OFFSET if requested topic id does not match persisted one (#19744)
CI / build (push) Waiting to run Details

This patch updates the OffsetFetch API to ensure that a committed offset
is returned iff the requested topic id matches the persisted one; the
invalid offset is returned otherwise.

Reviewers: Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
David Jacot 2025-05-27 16:15:06 +02:00 committed by GitHub
parent d9233d2f16
commit 25031373da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 197 additions and 2 deletions

View File

@ -100,6 +100,20 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
}
}
protected def deleteTopic(
topic: String
): Unit = {
val admin = cluster.admin()
try {
admin
.deleteTopics(TopicCollection.ofTopicNames(List(topic).asJava))
.all()
.get()
} finally {
admin.close()
}
}
protected def createTopicAndReturnLeaders(
topic: String,
numPartitions: Int = 1,

View File

@ -527,4 +527,88 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
)
}
}
@ClusterTest
def testFetchOffsetWithRecreatedTopic(): Unit = {
// There are two ways to ensure that committed of recreated topics are not returned.
// 1) When a topic is deleted, GroupCoordinatorService#onPartitionsDeleted is called to
// delete all its committed offsets.
// 2) Since version 10 of the OffsetCommit API, the topic id is stored alongside the
// committed offset. When it is queried, it is only returned iff the topic id of
// committed offset matches the requested one.
// The test tests both conditions but not in a deterministic way as they race
// against each others.
createOffsetsTopic()
// Create the topic.
var topicId = createTopic(
topic = "foo",
numPartitions = 3
)
// Join the consumer group. Note that we don't heartbeat here so we must use
// a session long enough for the duration of the test.
val (memberId, memberEpoch) = joinConsumerGroup("grp", true)
// Commit offsets.
for (partitionId <- 0 to 2) {
commitOffset(
groupId = "grp",
memberId = memberId,
memberEpoch = memberEpoch,
topic = "foo",
topicId = topicId,
partition = partitionId,
offset = 100L + partitionId,
expectedError = Errors.NONE,
version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
)
}
// Delete topic.
deleteTopic("foo")
// Recreate topic.
topicId = createTopic(
topic = "foo",
numPartitions = 3
)
// Start from version 10 because fetching topic id is not supported before.
for (version <- 10 to ApiKeys.OFFSET_FETCH.latestVersion(isUnstableApiEnabled)) {
assertEquals(
new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId("grp")
.setTopics(List(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setTopicId(topicId)
.setPartitions(List(
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(0)
.setCommittedOffset(-1L),
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(1)
.setCommittedOffset(-1L),
new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(2)
.setCommittedOffset(-1L)
).asJava)
).asJava),
fetchOffsets(
group = new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId("grp")
.setMemberId(memberId)
.setMemberEpoch(memberEpoch)
.setTopics(List(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setTopicId(topicId)
.setPartitionIndexes(List[Integer](0, 1, 2).asJava)
).asJava),
requireStable = true,
version = version.toShort
)
)
}
}
}

View File

@ -906,7 +906,7 @@ public class OffsetMetadataManager {
.setCommittedOffset(INVALID_OFFSET)
.setCommittedLeaderEpoch(-1)
.setMetadata(""));
} else if (offsetAndMetadata == null) {
} else if (isOffsetInvalid(offsetAndMetadata, topic.topicId())) {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partitionIndex)
.setCommittedOffset(INVALID_OFFSET)
@ -927,6 +927,14 @@ public class OffsetMetadataManager {
.setTopics(topicResponses);
}
private static boolean isOffsetInvalid(OffsetAndMetadata offsetAndMetadata, Uuid expectedTopicId) {
return offsetAndMetadata == null || isMismatchedTopicId(offsetAndMetadata.topicId, expectedTopicId);
}
private static boolean isMismatchedTopicId(Uuid actual, Uuid expected) {
return !actual.equals(Uuid.ZERO_UUID) && !expected.equals(Uuid.ZERO_UUID) && !actual.equals(expected);
}
/**
* Fetch all offsets for a given Group.
*

View File

@ -441,6 +441,28 @@ public class OffsetMetadataManagerTest {
long offset,
int leaderEpoch,
long commitTimestamp
) {
commitOffset(
producerId,
groupId,
Uuid.ZERO_UUID,
topic,
partition,
offset,
leaderEpoch,
commitTimestamp
);
}
public void commitOffset(
long producerId,
String groupId,
Uuid topicId,
String topic,
int partition,
long offset,
int leaderEpoch,
long commitTimestamp
) {
replay(producerId, GroupCoordinatorRecordHelpers.newOffsetCommitRecord(
groupId,
@ -452,7 +474,7 @@ public class OffsetMetadataManagerTest {
"metadata",
commitTimestamp,
OptionalLong.empty(),
Uuid.ZERO_UUID
topicId
)
));
}
@ -1827,6 +1849,73 @@ public class OffsetMetadataManagerTest {
), context.fetchOffsets("group", request, Long.MAX_VALUE));
}
@Test
public void testFetchOffsetsWithRecreatedTopic() {
Uuid fooId1 = Uuid.randomUuid();
Uuid fooId2 = Uuid.randomUuid();
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
context.commitOffset(
RecordBatch.NO_PRODUCER_ID,
"group",
fooId1,
"foo",
0,
100L,
1,
context.time.milliseconds()
);
context.commitOffset(
RecordBatch.NO_PRODUCER_ID,
"group",
fooId1,
"foo",
1,
100L,
1,
context.time.milliseconds()
);
// Request with the correct topic id.
var request = List.of(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setTopicId(fooId1)
.setPartitionIndexes(List.of(0, 1))
);
assertEquals(List.of(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setTopicId(fooId1)
.setPartitions(List.of(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 100L, 1, "metadata")
))
), context.fetchOffsets("group", request, Long.MAX_VALUE));
// Request with the incorrect topic id.
request = List.of(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setTopicId(fooId2)
.setPartitionIndexes(List.of(0, 1))
);
assertEquals(List.of(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setTopicId(fooId2)
.setPartitions(List.of(
mkInvalidOffsetPartitionResponse(0),
mkInvalidOffsetPartitionResponse(1)
))
), context.fetchOffsets("group", request, Long.MAX_VALUE));
}
@Test
public void testFetchOffsetsAtDifferentCommittedOffset() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();