KAFKA-17470: CommitRequestManager should record failed request only once even if multiple errors in response (#17109)

Reviewers: Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
TengYao Chi 2024-09-10 03:52:32 +08:00 committed by GitHub
parent 4d182d12f6
commit 92672d1df8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 93 additions and 6 deletions

View File

@ -616,7 +616,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests);
}
private class OffsetCommitRequestState extends RetriableRequestState {
class OffsetCommitRequestState extends RetriableRequestState {
private Map<TopicPartition, OffsetAndMetadata> offsets;
private final String groupId;
private final Optional<String> groupInstanceId;
@ -711,6 +711,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
long currentTimeMs = response.receivedTimeMs();
OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody();
Set<String> unauthorizedTopics = new HashSet<>();
boolean failedRequestRegistered = false;
for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
@ -723,7 +724,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
continue;
}
onFailedAttempt(currentTimeMs);
if (!failedRequestRegistered) {
onFailedAttempt(currentTimeMs);
failedRequestRegistered = true;
}
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
return;
@ -1042,14 +1047,19 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
response.partitionDataMap(groupId);
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(responseData.size());
Set<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<>();
boolean failedRequestRegistered = false;
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
TopicPartition tp = entry.getKey();
OffsetFetchResponse.PartitionData partitionData = entry.getValue();
if (partitionData.hasError()) {
onFailedAttempt(currentTimeMs);
Errors error = partitionData.error;
log.debug("Failed to fetch offset for partition {}: {}", tp, error.message());
if (!failedRequestRegistered) {
onFailedAttempt(currentTimeMs);
failedRequestRegistered = true;
}
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not exist"));
return;

View File

@ -79,6 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
@ -900,6 +901,33 @@ public class CommitRequestManagerTest {
assertFutureThrows(commitResult, RetriableCommitFailedException.class);
}
@ParameterizedTest
@MethodSource("offsetCommitExceptionSupplier")
public void testOffsetCommitSingleFailedAttemptPerRequestWhenPartitionErrors(final Errors error) {
CommitRequestManager commitRequestManager = create(true, 100);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(1));
offsets.put(new TopicPartition("t1", 1), new OffsetAndMetadata(2));
offsets.put(new TopicPartition("t1", 2), new OffsetAndMetadata(3));
commitRequestManager.commitSync(offsets, time.milliseconds() + defaultApiTimeoutMs);
NetworkClientDelegate.PollResult res = commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
res.unsentRequests.get(0).handler().onComplete(mockOffsetCommitResponse("topic", (short) 1, error, offsets.size()));
CommitRequestManager.OffsetCommitRequestState commitRequest = commitRequestManager.pendingRequests.unsentOffsetCommits.peek();
if (error.exception() instanceof RetriableException) {
assertNotNull(commitRequest);
assertEquals(1, commitRequest.numAttempts, "Only one failed attempt should be registered, even if the response contains multiple partition errors");
time.sleep(retryBackoffMs);
res = commitRequestManager.poll(time.milliseconds());
res.unsentRequests.get(0).handler().onComplete(mockOffsetCommitResponse("topic", (short) 1, error, offsets.size()));
assertEquals(2, commitRequest.numAttempts, "Only one failed attempt should be registered, even if the response contains multiple partition errors");
} else assertNull(commitRequest);
}
@Test
public void testEnsureBackoffRetryOnOffsetCommitRequestTimeout() {
CommitRequestManager commitRequestManager = create(true, 100);
@ -1205,6 +1233,8 @@ public class CommitRequestManagerTest {
final Errors error
) {
futures.forEach(f -> assertFalse(f.isDone()));
assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.get(0).numAttempts,
"Only one failed attempt should be registered, even if the response contains multiple partition errors");
// The manager should backoff before retry
time.sleep(retryBackoffMs);
@ -1212,6 +1242,8 @@ public class CommitRequestManagerTest {
assertEquals(1, poll.unsentRequests.size());
futures.forEach(f -> assertFalse(f.isDone()));
mimicResponse(error, poll);
assertEquals(2, commitRequestManager.pendingRequests.unsentOffsetFetches.get(0).numAttempts,
"Only one failed attempt should be registered, even if the response contains multiple partition errors");
// Sleep util timeout
time.sleep(defaultApiTimeoutMs);
@ -1294,8 +1326,10 @@ public class CommitRequestManagerTest {
Set<TopicPartition> partitions = new HashSet<>();
TopicPartition tp1 = new TopicPartition("t1", 2);
TopicPartition tp2 = new TopicPartition("t2", 3);
TopicPartition tp3 = new TopicPartition("t3", 4);
partitions.add(tp1);
partitions.add(tp2);
partitions.add(tp3);
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.fetchOffsets(partitions, deadlineMs);
@ -1307,6 +1341,7 @@ public class CommitRequestManagerTest {
HashMap<TopicPartition, OffsetFetchResponse.PartitionData> topicPartitionData = new HashMap<>();
topicPartitionData.put(tp1, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", error));
topicPartitionData.put(tp2, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE));
topicPartitionData.put(tp3, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", error));
res.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(
res.unsentRequests.get(0),
@ -1464,13 +1499,14 @@ public class CommitRequestManagerTest {
}
public ClientResponse mockOffsetCommitResponse(String topic,
private ClientResponse mockOffsetCommitResponse(String topic,
int partition,
short apiKeyVersion,
Errors error) {
return mockOffsetCommitResponse(topic, partition, apiKeyVersion, time.milliseconds(), time.milliseconds(), error);
}
public ClientResponse mockOffsetCommitResponse(String topic,
private ClientResponse mockOffsetCommitResponse(String topic,
int partition,
short apiKeyVersion,
long createdTimeMs,
@ -1499,7 +1535,40 @@ public class CommitRequestManagerTest {
);
}
public ClientResponse mockOffsetCommitResponseDisconnected(String topic, int partition,
private ClientResponse mockOffsetCommitResponse(String topic,
short apiKeyVersion,
Errors error,
int partitionSize) {
return mockOffsetCommitResponse(topic, apiKeyVersion, time.milliseconds(), time.milliseconds(), error, partitionSize);
}
private ClientResponse mockOffsetCommitResponse(String topic,
short apiKeyVersion,
long createdTimeMs,
long receivedTimeMs,
Errors error,
int partitionSize) {
OffsetCommitResponseData responseData = new OffsetCommitResponseData()
.setTopics(Collections.singletonList(
new OffsetCommitResponseData.OffsetCommitResponseTopic()
.setName(topic)
.setPartitions(mockOffsetCommitResponseWithPartitionErrors(error, partitionSize))));
OffsetCommitResponse response = mock(OffsetCommitResponse.class);
when(response.data()).thenReturn(responseData);
return new ClientResponse(
new RequestHeader(ApiKeys.OFFSET_COMMIT, apiKeyVersion, "", 1),
null,
"-1",
createdTimeMs,
receivedTimeMs,
false,
null,
null,
new OffsetCommitResponse(responseData)
);
}
private ClientResponse mockOffsetCommitResponseDisconnected(String topic, int partition,
short apiKeyVersion,
NetworkClientDelegate.UnsentRequest unsentRequest) {
OffsetCommitResponseData responseData = new OffsetCommitResponseData()
@ -1553,4 +1622,12 @@ public class CommitRequestManagerTest {
name,
CONSUMER_COORDINATOR_METRICS));
}
private List<OffsetCommitResponseData.OffsetCommitResponsePartition> mockOffsetCommitResponseWithPartitionErrors(Errors error, int partitionSize) {
List<OffsetCommitResponseData.OffsetCommitResponsePartition> partitions = new ArrayList<>(partitionSize);
for (int i = 0; i < partitionSize; i++) {
partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition().setErrorCode(error.code()).setPartitionIndex(i));
}
return partitions;
}
}