KAFKA-18086: Enable propagation of the error message when writing state (#17980)

* KAFKA-18086: Enable propagation of the error message when writing state

* Propagate the error message in the writing state when calling SharePartitionManager.acknowledge and SharePartitionManager.releaseSession, and add corresponding tests to verify that the expected error message is propagated.

* Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Yung 2024-12-06 01:48:26 +08:00 committed by GitHub
parent 50b6953661
commit fa54065298
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 35 additions and 16 deletions

View File

@ -272,20 +272,20 @@ public class SharePartitionManager implements AutoCloseable {
log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}", log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}",
acknowledgeTopics.keySet(), groupId); acknowledgeTopics.keySet(), groupId);
this.shareGroupMetrics.shareAcknowledgement(); this.shareGroupMetrics.shareAcknowledgement();
Map<TopicIdPartition, CompletableFuture<Errors>> futures = new HashMap<>(); Map<TopicIdPartition, CompletableFuture<Throwable>> futures = new HashMap<>();
acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> { acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition); SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey); SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
if (sharePartition != null) { if (sharePartition != null) {
CompletableFuture<Errors> future = new CompletableFuture<>(); CompletableFuture<Throwable> future = new CompletableFuture<>();
sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> { sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> {
if (throwable != null) { if (throwable != null) {
handleFencedSharePartitionException(sharePartitionKey, throwable); handleFencedSharePartitionException(sharePartitionKey, throwable);
future.complete(Errors.forException(throwable)); future.complete(throwable);
return; return;
} }
acknowledgePartitionBatches.forEach(batch -> batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement)); acknowledgePartitionBatches.forEach(batch -> batch.acknowledgeTypes().forEach(this.shareGroupMetrics::recordAcknowledgement));
future.complete(Errors.NONE); future.complete(null);
}); });
// If we have an acknowledgement completed for a topic-partition, then we should check if // If we have an acknowledgement completed for a topic-partition, then we should check if
@ -295,7 +295,7 @@ public class SharePartitionManager implements AutoCloseable {
futures.put(topicIdPartition, future); futures.put(topicIdPartition, future);
} else { } else {
futures.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION)); futures.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
} }
}); });
@ -303,9 +303,16 @@ public class SharePartitionManager implements AutoCloseable {
futures.values().toArray(new CompletableFuture[0])); futures.values().toArray(new CompletableFuture[0]));
return allFutures.thenApply(v -> { return allFutures.thenApply(v -> {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>(); Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>();
futures.forEach((topicIdPartition, future) -> result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData() futures.forEach((topicIdPartition, future) -> {
.setPartitionIndex(topicIdPartition.partition()) ShareAcknowledgeResponseData.PartitionData partitionData = new ShareAcknowledgeResponseData.PartitionData()
.setErrorCode(future.join().code()))); .setPartitionIndex(topicIdPartition.partition());
Throwable t = future.join();
if (t != null) {
partitionData.setErrorCode(Errors.forException(t).code())
.setErrorMessage(t.getMessage());
}
result.put(topicIdPartition, partitionData);
});
return result; return result;
}); });
} }
@ -342,22 +349,22 @@ public class SharePartitionManager implements AutoCloseable {
return CompletableFuture.completedFuture(Collections.emptyMap()); return CompletableFuture.completedFuture(Collections.emptyMap());
} }
Map<TopicIdPartition, CompletableFuture<Errors>> futuresMap = new HashMap<>(); Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap = new HashMap<>();
topicIdPartitions.forEach(topicIdPartition -> { topicIdPartitions.forEach(topicIdPartition -> {
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition); SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey); SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
if (sharePartition == null) { if (sharePartition == null) {
log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition); log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition);
futuresMap.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION)); futuresMap.put(topicIdPartition, CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
} else { } else {
CompletableFuture<Errors> future = new CompletableFuture<>(); CompletableFuture<Throwable> future = new CompletableFuture<>();
sharePartition.releaseAcquiredRecords(memberId).whenComplete((result, throwable) -> { sharePartition.releaseAcquiredRecords(memberId).whenComplete((result, throwable) -> {
if (throwable != null) { if (throwable != null) {
handleFencedSharePartitionException(sharePartitionKey, throwable); handleFencedSharePartitionException(sharePartitionKey, throwable);
future.complete(Errors.forException(throwable)); future.complete(throwable);
return; return;
} }
future.complete(Errors.NONE); future.complete(null);
}); });
// If we have a release acquired request completed for a topic-partition, then we should check if // If we have a release acquired request completed for a topic-partition, then we should check if
// there is a pending share fetch request for the topic-partition and complete it. // there is a pending share fetch request for the topic-partition and complete it.
@ -372,9 +379,16 @@ public class SharePartitionManager implements AutoCloseable {
futuresMap.values().toArray(new CompletableFuture[0])); futuresMap.values().toArray(new CompletableFuture[0]));
return allFutures.thenApply(v -> { return allFutures.thenApply(v -> {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>(); Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>();
futuresMap.forEach((topicIdPartition, future) -> result.put(topicIdPartition, new ShareAcknowledgeResponseData.PartitionData() futuresMap.forEach((topicIdPartition, future) -> {
.setPartitionIndex(topicIdPartition.partition()) ShareAcknowledgeResponseData.PartitionData partitionData = new ShareAcknowledgeResponseData.PartitionData()
.setErrorCode(future.join().code()))); .setPartitionIndex(topicIdPartition.partition());
Throwable t = future.join();
if (t != null) {
partitionData.setErrorCode(Errors.forException(t).code())
.setErrorMessage(t.getMessage());
}
result.put(topicIdPartition, partitionData);
});
return result; return result;
}); });
} }

View File

@ -1328,9 +1328,11 @@ public class SharePartitionManagerTest {
assertEquals(Errors.NONE.code(), result.get(tp1).errorCode()); assertEquals(Errors.NONE.code(), result.get(tp1).errorCode());
assertEquals(2, result.get(tp2).partitionIndex()); assertEquals(2, result.get(tp2).partitionIndex());
assertEquals(Errors.INVALID_RECORD_STATE.code(), result.get(tp2).errorCode()); assertEquals(Errors.INVALID_RECORD_STATE.code(), result.get(tp2).errorCode());
assertEquals("Unable to release acquired records for the batch", result.get(tp2).errorMessage());
// tp3 was not a part of partitionCacheMap. // tp3 was not a part of partitionCacheMap.
assertEquals(4, result.get(tp3).partitionIndex()); assertEquals(4, result.get(tp3).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp3).errorCode());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp3).errorMessage());
} }
@Test @Test
@ -1585,6 +1587,7 @@ public class SharePartitionManagerTest {
assertTrue(result.containsKey(tp)); assertTrue(result.containsKey(tp));
assertEquals(0, result.get(tp).partitionIndex()); assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp).errorMessage());
} }
@Test @Test
@ -1615,6 +1618,7 @@ public class SharePartitionManagerTest {
assertTrue(result.containsKey(tp)); assertTrue(result.containsKey(tp));
assertEquals(0, result.get(tp).partitionIndex()); assertEquals(0, result.get(tp).partitionIndex());
assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp).errorCode()); assertEquals(Errors.INVALID_REQUEST.code(), result.get(tp).errorCode());
assertEquals("Member is not the owner of batch record", result.get(tp).errorMessage());
} }
@Test @Test
@ -1637,6 +1641,7 @@ public class SharePartitionManagerTest {
assertTrue(result.containsKey(tp)); assertTrue(result.containsKey(tp));
assertEquals(3, result.get(tp).partitionIndex()); assertEquals(3, result.get(tp).partitionIndex());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode()); assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), result.get(tp).errorCode());
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), result.get(tp).errorMessage());
} }
@Test @Test