KAFKA-18034: CommitRequestManager should fail pending requests on fatal coordinator errors (#18548)

Reviewers: Lianet Magrans <lmagrans@confluent.io>, Kirk True <ktrue@confluent.io>
This commit is contained in:
Ken Huang 2025-01-31 00:22:54 +08:00 committed by GitHub
parent be96807ac8
commit 4b29fd6383
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 75 additions and 49 deletions

View File

@ -161,6 +161,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
public NetworkClientDelegate.PollResult poll(long currentTimeMs) { public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) { if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) {
membershipManager().onHeartbeatRequestSkipped(); membershipManager().onHeartbeatRequestSkipped();
maybePropagateCoordinatorFatalErrorEvent();
return NetworkClientDelegate.PollResult.EMPTY; return NetworkClientDelegate.PollResult.EMPTY;
} }
pollTimer.update(currentTimeMs); pollTimer.update(currentTimeMs);
@ -263,6 +264,11 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
pollTimer.reset(maxPollIntervalMs); pollTimer.reset(maxPollIntervalMs);
} }
/**
 * Takes (and clears) the fatal error currently held by the coordinator request manager and,
 * when one is present, hands it to the background event handler wrapped in an {@link ErrorEvent}.
 * Does nothing when no fatal error is stored.
 */
private void maybePropagateCoordinatorFatalErrorEvent() {
    coordinatorRequestManager.getAndClearFatalError()
        .map(ErrorEvent::new)
        .ifPresent(errorEvent -> backgroundEventHandler.add(errorEvent));
}
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) { private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) {
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
heartbeatRequestState.onSendAttempt(currentTimeMs); heartbeatRequestState.onSendAttempt(currentTimeMs);

View File

@ -176,9 +176,11 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
*/ */
@Override @Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
// poll only when the coordinator node is known. // poll when the coordinator node is known and fatal error is not present
if (coordinatorRequestManager.coordinator().isEmpty()) if (coordinatorRequestManager.coordinator().isEmpty()) {
pendingRequests.maybeFailOnCoordinatorFatalError();
return EMPTY; return EMPTY;
}
if (closing) { if (closing) {
return drainPendingOffsetCommitRequests(); return drainPendingOffsetCommitRequests();
@ -1246,6 +1248,16 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
clearAll(); clearAll();
return res; return res;
} }
/**
 * Checks whether the coordinator request manager currently reports a fatal error and, if so,
 * fails every unsent offset commit and offset fetch with that error. The error is only read
 * here, not cleared.
 */
private void maybeFailOnCoordinatorFatalError() {
    coordinatorRequestManager.fatalError().ifPresent(this::failAllUnsentOnFatalError);
}

/**
 * Completes the future of each unsent commit and fetch request exceptionally with the given
 * cause, then clears the pending request collections via {@code clearAll()}.
 *
 * @param cause the coordinator fatal error to report to the pending requests
 */
private void failAllUnsentOnFatalError(Throwable cause) {
    log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", cause);
    unsentOffsetCommits.forEach(pendingCommit -> pendingCommit.future.completeExceptionally(cause));
    unsentOffsetFetches.forEach(pendingFetch -> pendingFetch.future.completeExceptionally(cause));
    clearAll();
}
} }
/** /**

View File

@ -16,8 +16,6 @@
*/ */
package org.apache.kafka.clients.consumer.internals; package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.DisconnectException;
@ -53,7 +51,6 @@ import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.
public class CoordinatorRequestManager implements RequestManager { public class CoordinatorRequestManager implements RequestManager {
private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000; private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
private final Logger log; private final Logger log;
private final BackgroundEventHandler backgroundEventHandler;
private final String groupId; private final String groupId;
private final RequestState coordinatorRequestState; private final RequestState coordinatorRequestState;
@ -61,17 +58,21 @@ public class CoordinatorRequestManager implements RequestManager {
private long totalDisconnectedMin = 0; private long totalDisconnectedMin = 0;
private boolean closing = false; private boolean closing = false;
private Node coordinator; private Node coordinator;
// Holds the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take
// appropriate actions.
// For example:
// - AbstractHeartbeatRequestManager propagates the error event to the application thread.
// - CommitRequestManager fails pending requests.
private Optional<Throwable> fatalError = Optional.empty();
public CoordinatorRequestManager( public CoordinatorRequestManager(
final LogContext logContext, final LogContext logContext,
final long retryBackoffMs, final long retryBackoffMs,
final long retryBackoffMaxMs, final long retryBackoffMaxMs,
final BackgroundEventHandler errorHandler,
final String groupId final String groupId
) { ) {
Objects.requireNonNull(groupId); Objects.requireNonNull(groupId);
this.log = logContext.logger(this.getClass()); this.log = logContext.logger(this.getClass());
this.backgroundEventHandler = errorHandler;
this.groupId = groupId; this.groupId = groupId;
this.coordinatorRequestState = new RequestState( this.coordinatorRequestState = new RequestState(
logContext, logContext,
@ -120,6 +121,7 @@ public class CoordinatorRequestManager implements RequestManager {
); );
return unsentRequest.whenComplete((clientResponse, throwable) -> { return unsentRequest.whenComplete((clientResponse, throwable) -> {
getAndClearFatalError();
if (clientResponse != null) { if (clientResponse != null) {
FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody(); FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody();
onResponse(clientResponse.receivedTimeMs(), response); onResponse(clientResponse.receivedTimeMs(), response);
@ -206,12 +208,12 @@ public class CoordinatorRequestManager implements RequestManager {
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) { if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage()); log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId); KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException)); fatalError = Optional.of(groupAuthorizationException);
return; return;
} }
log.warn("FindCoordinator request failed due to fatal exception", exception); log.warn("FindCoordinator request failed due to fatal exception", exception);
backgroundEventHandler.add(new ErrorEvent(exception)); fatalError = Optional.of(exception);
} }
/** /**
@ -250,4 +252,14 @@ public class CoordinatorRequestManager implements RequestManager {
public Optional<Node> coordinator() { public Optional<Node> coordinator() {
return Optional.ofNullable(this.coordinator); return Optional.ofNullable(this.coordinator);
} }
/**
 * Returns the fatal error currently stored by this manager and resets the stored value to empty,
 * so a subsequent call returns an empty {@link Optional} until a new fatal error is recorded.
 *
 * @return the fatal error that was stored before this call, or an empty {@link Optional} if none
 */
public Optional<Throwable> getAndClearFatalError() {
    final Optional<Throwable> previousError = this.fatalError;
    this.fatalError = Optional.empty();
    return previousError;
}
/**
 * Returns the fatal error currently stored by this manager without clearing it.
 *
 * @return the stored fatal error, or an empty {@link Optional} if none is stored
 */
public Optional<Throwable> fatalError() {
    return this.fatalError;
}
} }

View File

@ -194,7 +194,6 @@ public class RequestManagers implements Closeable {
logContext, logContext,
retryBackoffMs, retryBackoffMs,
retryBackoffMaxMs, retryBackoffMaxMs,
backgroundEventHandler,
groupRebalanceConfig.groupId); groupRebalanceConfig.groupId);
commitRequestManager = new CommitRequestManager( commitRequestManager = new CommitRequestManager(
time, time,
@ -295,7 +294,6 @@ public class RequestManagers implements Closeable {
logContext, logContext,
retryBackoffMs, retryBackoffMs,
retryBackoffMaxMs, retryBackoffMaxMs,
backgroundEventHandler,
groupRebalanceConfig.groupId); groupRebalanceConfig.groupId);
ShareMembershipManager shareMembershipManager = new ShareMembershipManager( ShareMembershipManager shareMembershipManager = new ShareMembershipManager(
logContext, logContext,

View File

@ -2239,7 +2239,7 @@ public class KafkaConsumerTest {
// by the background thread, so it can realize there is authentication fail and then // by the background thread, so it can realize there is authentication fail and then
// throw the AuthenticationException // throw the AuthenticationException
assertPollEventuallyThrows(consumer, AuthenticationException.class, assertPollEventuallyThrows(consumer, AuthenticationException.class,
"he consumer was not able to discover metadata errors during continuous polling."); "this consumer was not able to discover metadata errors during continuous polling.");
} else { } else {
assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO)); assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
} }

View File

@ -1499,6 +1499,23 @@ public class CommitRequestManagerTest {
assertEquals("topic", data.topics().get(0).name()); assertEquals("topic", data.topics().get(0).name());
} }
/**
 * Queues an offset fetch while a coordinator is known, then makes the coordinator unknown with a
 * fatal error reported, and verifies that poll() returns an empty result and leaves no pending
 * requests behind.
 */
@Test
public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
    CommitRequestManager commitRequestManager = create(true, 100);

    // With a known coordinator, the fetch stays queued as an unsent request.
    when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
    TopicPartition partition = new TopicPartition("test", 0);
    commitRequestManager.fetchOffsets(Collections.singleton(partition), 200);
    assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.size());

    // Coordinator becomes unknown and a fatal error is reported.
    when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
    GroupAuthorizationException authorizationError = new GroupAuthorizationException("Group authorization exception");
    when(coordinatorRequestManager.fatalError()).thenReturn(Optional.of(authorizationError));

    NetworkClientDelegate.PollResult pollResult = commitRequestManager.poll(200);
    assertEquals(NetworkClientDelegate.PollResult.EMPTY, pollResult);
    assertEmptyPendingRequests(commitRequestManager);
}
private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) { private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) {
assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty()); assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty());
assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty()); assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());

View File

@ -18,9 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
@ -35,6 +33,8 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -49,9 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoInteractions;
public class CoordinatorRequestManagerTest { public class CoordinatorRequestManagerTest {
@ -191,23 +189,10 @@ public class CoordinatorRequestManagerTest {
} }
@Test @Test
public void testPropagateAndBackoffAfterFatalError() { public void testBackoffAfterFatalError() {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);
verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
if (!(backgroundEvent instanceof ErrorEvent))
return false;
RuntimeException exception = ((ErrorEvent) backgroundEvent).error();
if (!(exception instanceof GroupAuthorizationException))
return false;
GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception;
return groupAuthException.groupId().equals(GROUP_ID);
}));
time.sleep(RETRY_BACKOFF_MS - 1); time.sleep(RETRY_BACKOFF_MS - 1);
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
@ -254,19 +239,20 @@ public class CoordinatorRequestManagerTest {
assertEquals(1, res2.unsentRequests.size()); assertEquals(1, res2.unsentRequests.size());
} }
@Test @ParameterizedTest
public void testSignalOnClose() { @EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"})
public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager, Errors.NONE); expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);
assertTrue(coordinatorManager.coordinator().isPresent()); assertTrue(coordinatorManager.fatalError().isPresent());
coordinatorManager.markCoordinatorUnknown("coordinator changed", time.milliseconds());
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
coordinatorManager.signalClose();
time.sleep(RETRY_BACKOFF_MS - 1);
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
time.sleep(RETRY_BACKOFF_MS); time.sleep(RETRY_BACKOFF_MS);
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests, // there are no successful responses, so the fatal error should persist
"Should not generate find coordinator request during close"); assertTrue(coordinatorManager.fatalError().isPresent());
// receiving a successful response should clear the fatal error
expectFindCoordinatorRequest(coordinatorManager, error);
assertTrue(coordinatorManager.fatalError().isEmpty());
} }
private void expectFindCoordinatorRequest( private void expectFindCoordinatorRequest(
@ -288,7 +274,6 @@ public class CoordinatorRequestManagerTest {
new LogContext(), new LogContext(),
RETRY_BACKOFF_MS, RETRY_BACKOFF_MS,
RETRY_BACKOFF_MS, RETRY_BACKOFF_MS,
this.backgroundEventHandler,
groupId groupId
); );
} }

View File

@ -1294,7 +1294,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = { def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer() val consumer = createConsumer()
assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava))
@ -1331,7 +1331,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
val consumer = createConsumer() val consumer = createConsumer()
@ -1357,7 +1357,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034")) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic) createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)

View File

@ -435,8 +435,4 @@ object QuorumTestHarness {
Arguments.of("kraft", GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT)) Arguments.of("kraft", GroupProtocol.CONSUMER.name.toLowerCase(Locale.ROOT))
) )
} }
// The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
// implementation that would otherwise cause tests to fail.
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
} }