KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support (#15155)

This patch adds `UNSTABLE_OFFSET_COMMIT` errors support in the new group coordinator. `UNSTABLE_OFFSET_COMMIT` errors for partitions with unstable offset commits. Here unstable means that there are ongoing transactions.

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2024-01-12 05:33:39 -08:00 committed by GitHub
parent e9f2218d94
commit 6b9cb5ccbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 282 additions and 17 deletions

View File

@ -50,6 +50,7 @@ import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;
import java.util.ArrayList;
@ -195,6 +196,11 @@ public class OffsetMetadataManager {
*/
private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
/**
* The open transactions (producer ids) keyed by group.
*/
private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
private class Offsets {
/**
* The offsets keyed by group id, topic name and partition id.
@ -279,6 +285,7 @@ public class OffsetMetadataManager {
this.metrics = metrics;
this.offsets = new Offsets();
this.pendingTransactionalOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
this.openTransactionsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
}
/**
@ -656,6 +663,28 @@ public class OffsetMetadataManager {
return numDeletedOffsets.get();
}
/**
* @return true iff there is at least one pending transactional offset for the given
* group, topic and partition.
*/
private boolean hasPendingTransactionalOffsets(
String groupId,
String topic,
int partition
) {
final TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
if (openTransactions == null) return false;
for (Long producerId : openTransactions) {
Offsets offsets = pendingTransactionalOffsets.get(producerId);
if (offsets != null && offsets.get(groupId, topic, partition) != null) {
return true;
}
}
return false;
}
/**
* Fetch offsets for a given Group.
*
@ -668,6 +697,8 @@ public class OffsetMetadataManager {
OffsetFetchRequestData.OffsetFetchRequestGroup request,
long lastCommittedOffset
) throws ApiException {
final boolean requireStable = lastCommittedOffset == Long.MAX_VALUE;
boolean failAllPartitions = false;
try {
validateOffsetFetch(request, lastCommittedOffset);
@ -691,7 +722,14 @@ public class OffsetMetadataManager {
final OffsetAndMetadata offsetAndMetadata = topicOffsets == null ?
null : topicOffsets.get(partitionIndex, lastCommittedOffset);
if (offsetAndMetadata == null) {
if (requireStable && hasPendingTransactionalOffsets(request.groupId(), topic.name(), partitionIndex)) {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partitionIndex)
.setErrorCode(Errors.UNSTABLE_OFFSET_COMMIT.code())
.setCommittedOffset(INVALID_OFFSET)
.setCommittedLeaderEpoch(-1)
.setMetadata(""));
} else if (offsetAndMetadata == null) {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partitionIndex)
.setCommittedOffset(INVALID_OFFSET)
@ -724,6 +762,8 @@ public class OffsetMetadataManager {
OffsetFetchRequestData.OffsetFetchRequestGroup request,
long lastCommittedOffset
) throws ApiException {
final boolean requireStable = lastCommittedOffset == Long.MAX_VALUE;
try {
validateOffsetFetch(request, lastCommittedOffset);
} catch (GroupIdNotFoundException ex) {
@ -749,11 +789,20 @@ public class OffsetMetadataManager {
final int partition = partitionEntry.getKey();
final OffsetAndMetadata offsetAndMetadata = partitionEntry.getValue();
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partition)
.setCommittedOffset(offsetAndMetadata.offset)
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
.setMetadata(offsetAndMetadata.metadata));
if (requireStable && hasPendingTransactionalOffsets(request.groupId(), topic, partition)) {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partition)
.setErrorCode(Errors.UNSTABLE_OFFSET_COMMIT.code())
.setCommittedOffset(INVALID_OFFSET)
.setCommittedLeaderEpoch(-1)
.setMetadata(""));
} else {
topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partition)
.setCommittedOffset(offsetAndMetadata.offset)
.setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
.setMetadata(offsetAndMetadata.metadata));
}
});
});
}
@ -879,13 +928,17 @@ public class OffsetMetadataManager {
// Otherwise, the transaction offset is stored in the pending transactional
// offsets store. Pending offsets there are moved to the main store when
// the transaction is committed; or removed when the transaction is aborted.
Offsets pendingOffsets = pendingTransactionalOffsets.computeIfAbsent(producerId, __ -> new Offsets());
pendingOffsets.put(
groupId,
topic,
partition,
OffsetAndMetadata.fromRecord(value)
);
pendingTransactionalOffsets
.computeIfAbsent(producerId, __ -> new Offsets())
.put(
groupId,
topic,
partition,
OffsetAndMetadata.fromRecord(value)
);
openTransactionsByGroup
.computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
.add(producerId);
}
} else {
if (offsets.remove(groupId, topic, partition) != null) {
@ -907,9 +960,21 @@ public class OffsetMetadataManager {
) throws RuntimeException {
Offsets pendingOffsets = pendingTransactionalOffsets.remove(producerId);
if (pendingOffsets == null) {
log.debug("Replayed end transaction marker with result {} for producer id {} but " +
"no pending offsets are present. Ignoring it.", result, producerId);
return;
}
pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
if (openTransactions != null) {
openTransactions.remove(producerId);
}
});
if (result == TransactionResult.COMMIT) {
log.debug("Committed transactional offset commits for producer id {}.", producerId);
if (pendingOffsets == null) return;
pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
topicOffsets.forEach((topicName, partitionOffsets) -> {

View File

@ -181,6 +181,12 @@ public class OffsetMetadataManagerTest {
this.offsetMetadataManager = offsetMetadataManager;
}
public void commit() {
long lastCommittedOffset = this.lastCommittedOffset;
this.lastCommittedOffset = lastWrittenOffset;
snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset);
}
public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(
OffsetCommitRequestData request
) {
@ -357,8 +363,14 @@ public class OffsetMetadataManagerTest {
long offset,
int leaderEpoch
) {
commitOffset(groupId, topic, partition, offset, leaderEpoch, time.milliseconds());
commitOffset(
groupId,
topic,
partition,
offset,
leaderEpoch,
time.milliseconds()
);
}
public void commitOffset(
@ -369,7 +381,27 @@ public class OffsetMetadataManagerTest {
int leaderEpoch,
long commitTimestamp
) {
replay(RecordHelpers.newOffsetCommitRecord(
commitOffset(
RecordBatch.NO_PRODUCER_ID,
groupId,
topic,
partition,
offset,
leaderEpoch,
commitTimestamp
);
}
public void commitOffset(
long producerId,
String groupId,
String topic,
int partition,
long offset,
int leaderEpoch,
long commitTimestamp
) {
replay(producerId, RecordHelpers.newOffsetCommitRecord(
groupId,
topic,
partition,
@ -1856,6 +1888,91 @@ public class OffsetMetadataManagerTest {
), context.fetchOffsets("group", request, Long.MAX_VALUE));
}
@Test
public void testFetchOffsetsWithPendingTransactionalOffsets() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
context.commitOffset("group", "foo", 0, 100L, 1);
context.commitOffset("group", "foo", 1, 110L, 1);
context.commitOffset("group", "bar", 0, 200L, 1);
context.commit();
assertEquals(3, context.lastWrittenOffset);
assertEquals(3, context.lastCommittedOffset);
context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());
// Note that bar-1 does not exist in the initial commits. UNSTABLE_OFFSET_COMMIT errors
// must be returned in this case too.
context.commitOffset(10L, "group", "bar", 1, 211L, 1, context.time.milliseconds());
// Always use the same request.
List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Arrays.asList(0, 1)),
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("bar")
.setPartitionIndexes(Arrays.asList(0, 1))
);
// Fetching offsets with "require stable" (Long.MAX_VALUE) should return the committed offset for
// foo-0 and the UNSTABLE_OFFSET_COMMIT error for foo-1, bar-0 and bar-1.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, Errors.UNSTABLE_OFFSET_COMMIT)
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, Errors.UNSTABLE_OFFSET_COMMIT),
mkOffsetPartitionResponse(1, Errors.UNSTABLE_OFFSET_COMMIT)
))
), context.fetchOffsets("group", request, Long.MAX_VALUE));
// Fetching offsets without "require stable" (lastCommittedOffset) should return the committed
// offset for foo-0, foo-1 and bar-0 and the INVALID_OFFSET for bar-1.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 110L, 1, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
mkInvalidOffsetPartitionResponse(1)
))
), context.fetchOffsets("group", request, context.lastCommittedOffset));
// Commit the ongoing transaction.
context.replayEndTransactionMarker(10L, TransactionResult.COMMIT);
// Fetching offsets with "require stable" (Long.MAX_VALUE) should not return any errors now.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 111L, 1, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 201L, 1, "metadata"),
mkOffsetPartitionResponse(1, 211L, 1, "metadata")
))
), context.fetchOffsets("group", request, Long.MAX_VALUE));
}
@Test
public void testGenericGroupFetchAllOffsetsWithDeadGroup() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
@ -1963,6 +2080,80 @@ public class OffsetMetadataManagerTest {
), context.fetchAllOffsets("group", Long.MAX_VALUE));
}
@Test
public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
context.commitOffset("group", "foo", 0, 100L, 1);
context.commitOffset("group", "foo", 1, 110L, 1);
context.commitOffset("group", "bar", 0, 200L, 1);
context.commit();
assertEquals(3, context.lastWrittenOffset);
assertEquals(3, context.lastCommittedOffset);
context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());
// Note that bar-1 does not exist in the initial commits. The API does not return it at all until
// the transaction is committed.
context.commitOffset(10L, "group", "bar", 1, 211L, 1, context.time.milliseconds());
// Fetching offsets with "require stable" (Long.MAX_VALUE) should return the committed offset for
// foo-0 and the UNSTABLE_OFFSET_COMMIT error for foo-1 and bar-0.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, Errors.UNSTABLE_OFFSET_COMMIT)
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, Errors.UNSTABLE_OFFSET_COMMIT)
))
), context.fetchAllOffsets("group", Long.MAX_VALUE));
// Fetching offsets without "require stable" (lastCommittedOffset) should the committed
// offset for the foo-0, foo-1 and bar-0.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 200L, 1, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 110L, 1, "metadata")
))
), context.fetchAllOffsets("group", context.lastCommittedOffset));
// Commit the ongoing transaction.
context.replayEndTransactionMarker(10L, TransactionResult.COMMIT);
// Fetching offsets with "require stable" (Long.MAX_VALUE) should not return any errors now.
assertEquals(Arrays.asList(
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("bar")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 201L, 1, "metadata"),
mkOffsetPartitionResponse(1, 211L, 1, "metadata")
)),
new OffsetFetchResponseData.OffsetFetchResponseTopics()
.setName("foo")
.setPartitions(Arrays.asList(
mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
mkOffsetPartitionResponse(1, 111L, 1, "metadata")
))
), context.fetchAllOffsets("group", Long.MAX_VALUE));
}
@Test
public void testConsumerGroupOffsetFetchWithMemberIdAndEpoch() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
@ -2310,6 +2501,15 @@ public class OffsetMetadataManagerTest {
.setMetadata("");
}
static private OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(int partition, Errors error) {
return new OffsetFetchResponseData.OffsetFetchResponsePartitions()
.setPartitionIndex(partition)
.setErrorCode(error.code())
.setCommittedOffset(INVALID_OFFSET)
.setCommittedLeaderEpoch(-1)
.setMetadata("");
}
@Test
public void testReplay() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();