Revert "KAFKA-18034: CommitRequestManager should fail pending requests on fatal coordinator errors (#18050)" (#18544)

This reverts commit 70d6312a3a.

Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
Ismael Juma 2025-01-15 00:16:47 -08:00 committed by GitHub
parent 1672a4bc27
commit f3a93551fa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 38 additions and 77 deletions

View File

@ -161,7 +161,6 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
public NetworkClientDelegate.PollResult poll(long currentTimeMs) { public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) { if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) {
membershipManager().onHeartbeatRequestSkipped(); membershipManager().onHeartbeatRequestSkipped();
maybePropagateCoordinatorFatalErrorEvent();
return NetworkClientDelegate.PollResult.EMPTY; return NetworkClientDelegate.PollResult.EMPTY;
} }
pollTimer.update(currentTimeMs); pollTimer.update(currentTimeMs);
@ -264,11 +263,6 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
pollTimer.reset(maxPollIntervalMs); pollTimer.reset(maxPollIntervalMs);
} }
private void maybePropagateCoordinatorFatalErrorEvent() {
coordinatorRequestManager.fatalError()
.ifPresent(fatalError -> backgroundEventHandler.add(new ErrorEvent(fatalError)));
}
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) { private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, final boolean ignoreResponse) {
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
heartbeatRequestState.onSendAttempt(currentTimeMs); heartbeatRequestState.onSendAttempt(currentTimeMs);

View File

@ -176,11 +176,9 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
*/ */
@Override @Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
// poll when the coordinator node is known and fatal error is not present // poll only when the coordinator node is known.
if (coordinatorRequestManager.coordinator().isEmpty()) { if (coordinatorRequestManager.coordinator().isEmpty())
pendingRequests.maybeFailOnCoordinatorFatalError();
return EMPTY; return EMPTY;
}
if (closing) { if (closing) {
return drainPendingOffsetCommitRequests(); return drainPendingOffsetCommitRequests();
@ -1248,16 +1246,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
clearAll(); clearAll();
return res; return res;
} }
private void maybeFailOnCoordinatorFatalError() {
coordinatorRequestManager.fatalError().ifPresent(error -> {
log.warn("Failing all unsent commit requests and offset fetches because of coordinator fatal error. ", error);
unsentOffsetCommits.forEach(request -> request.future.completeExceptionally(error));
unsentOffsetFetches.forEach(request -> request.future.completeExceptionally(error));
clearAll();
}
);
}
} }
/** /**

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.kafka.clients.consumer.internals; package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.DisconnectException;
@ -51,27 +53,24 @@ import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.
public class CoordinatorRequestManager implements RequestManager { public class CoordinatorRequestManager implements RequestManager {
private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000; private static final long COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS = 60 * 1000;
private final Logger log; private final Logger log;
private final BackgroundEventHandler backgroundEventHandler;
private final String groupId; private final String groupId;
private final RequestState coordinatorRequestState; private final RequestState coordinatorRequestState;
private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
private long totalDisconnectedMin = 0; private long totalDisconnectedMin = 0;
private Node coordinator; private Node coordinator;
// Hold the latest fatal error received. It is exposed so that managers requiring a coordinator can access it and take
// appropriate actions.
// For example:
// - AbstractHeartbeatRequestManager propagates the error event to the application thread.
// - CommitRequestManager fail pending requests.
private Optional<Throwable> fatalError = Optional.empty();
public CoordinatorRequestManager( public CoordinatorRequestManager(
final LogContext logContext, final LogContext logContext,
final long retryBackoffMs, final long retryBackoffMs,
final long retryBackoffMaxMs, final long retryBackoffMaxMs,
final BackgroundEventHandler errorHandler,
final String groupId final String groupId
) { ) {
Objects.requireNonNull(groupId); Objects.requireNonNull(groupId);
this.log = logContext.logger(this.getClass()); this.log = logContext.logger(this.getClass());
this.backgroundEventHandler = errorHandler;
this.groupId = groupId; this.groupId = groupId;
this.coordinatorRequestState = new RequestState( this.coordinatorRequestState = new RequestState(
logContext, logContext,
@ -115,7 +114,6 @@ public class CoordinatorRequestManager implements RequestManager {
); );
return unsentRequest.whenComplete((clientResponse, throwable) -> { return unsentRequest.whenComplete((clientResponse, throwable) -> {
clearFatalError();
if (clientResponse != null) { if (clientResponse != null) {
FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody(); FindCoordinatorResponse response = (FindCoordinatorResponse) clientResponse.responseBody();
onResponse(clientResponse.receivedTimeMs(), response); onResponse(clientResponse.receivedTimeMs(), response);
@ -202,12 +200,12 @@ public class CoordinatorRequestManager implements RequestManager {
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) { if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage()); log.debug("FindCoordinator request failed due to authorization error {}", exception.getMessage());
KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId); KafkaException groupAuthorizationException = GroupAuthorizationException.forGroupId(this.groupId);
fatalError = Optional.of(groupAuthorizationException); backgroundEventHandler.add(new ErrorEvent(groupAuthorizationException));
return; return;
} }
log.warn("FindCoordinator request failed due to fatal exception", exception); log.warn("FindCoordinator request failed due to fatal exception", exception);
fatalError = Optional.of(exception); backgroundEventHandler.add(new ErrorEvent(exception));
} }
/** /**
@ -246,12 +244,4 @@ public class CoordinatorRequestManager implements RequestManager {
public Optional<Node> coordinator() { public Optional<Node> coordinator() {
return Optional.ofNullable(this.coordinator); return Optional.ofNullable(this.coordinator);
} }
private void clearFatalError() {
this.fatalError = Optional.empty();
}
public Optional<Throwable> fatalError() {
return fatalError;
}
} }

View File

@ -194,6 +194,7 @@ public class RequestManagers implements Closeable {
logContext, logContext,
retryBackoffMs, retryBackoffMs,
retryBackoffMaxMs, retryBackoffMaxMs,
backgroundEventHandler,
groupRebalanceConfig.groupId); groupRebalanceConfig.groupId);
commitRequestManager = new CommitRequestManager( commitRequestManager = new CommitRequestManager(
time, time,
@ -294,6 +295,7 @@ public class RequestManagers implements Closeable {
logContext, logContext,
retryBackoffMs, retryBackoffMs,
retryBackoffMaxMs, retryBackoffMaxMs,
backgroundEventHandler,
groupRebalanceConfig.groupId); groupRebalanceConfig.groupId);
ShareMembershipManager shareMembershipManager = new ShareMembershipManager( ShareMembershipManager shareMembershipManager = new ShareMembershipManager(
logContext, logContext,

View File

@ -2239,7 +2239,7 @@ public class KafkaConsumerTest {
// by the background thread, so it can realize there is authentication fail and then // by the background thread, so it can realize there is authentication fail and then
// throw the AuthenticationException // throw the AuthenticationException
assertPollEventuallyThrows(consumer, AuthenticationException.class, assertPollEventuallyThrows(consumer, AuthenticationException.class,
"this consumer was not able to discover metadata errors during continuous polling."); "he consumer was not able to discover metadata errors during continuous polling.");
} else { } else {
assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO)); assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
} }

View File

@ -1499,23 +1499,6 @@ public class CommitRequestManagerTest {
assertEquals("topic", data.topics().get(0).name()); assertEquals("topic", data.topics().get(0).name());
} }
@Test
public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
CommitRequestManager commitRequestManager = create(true, 100);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
commitRequestManager.fetchOffsets(Collections.singleton(new TopicPartition("test", 0)), 200);
assertEquals(1, commitRequestManager.pendingRequests.unsentOffsetFetches.size());
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
when(coordinatorRequestManager.fatalError())
.thenReturn(Optional.of(new GroupAuthorizationException("Group authorization exception")));
assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200));
assertEmptyPendingRequests(commitRequestManager);
}
private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) { private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManager) {
assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty()); assertTrue(commitRequestManager.pendingRequests.inflightOffsetFetches.isEmpty());
assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty()); assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());

View File

@ -18,7 +18,9 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
@ -33,8 +35,6 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -49,7 +49,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoInteractions;
public class CoordinatorRequestManagerTest { public class CoordinatorRequestManagerTest {
@ -189,10 +191,23 @@ public class CoordinatorRequestManagerTest {
} }
@Test @Test
public void testBackoffAfterFatalError() { public void testPropagateAndBackoffAfterFatalError() {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID); CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED); expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);
verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
if (!(backgroundEvent instanceof ErrorEvent))
return false;
RuntimeException exception = ((ErrorEvent) backgroundEvent).error();
if (!(exception instanceof GroupAuthorizationException))
return false;
GroupAuthorizationException groupAuthException = (GroupAuthorizationException) exception;
return groupAuthException.groupId().equals(GROUP_ID);
}));
time.sleep(RETRY_BACKOFF_MS - 1); time.sleep(RETRY_BACKOFF_MS - 1);
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests); assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
@ -239,22 +254,6 @@ public class CoordinatorRequestManagerTest {
assertEquals(1, res2.unsentRequests.size()); assertEquals(1, res2.unsentRequests.size());
} }
@ParameterizedTest
@EnumSource(value = Errors.class, names = {"NONE", "COORDINATOR_NOT_AVAILABLE"})
public void testClearFatalErrorWhenReceivingSuccessfulResponse(Errors error) {
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager, Errors.GROUP_AUTHORIZATION_FAILED);
assertTrue(coordinatorManager.fatalError().isPresent());
time.sleep(RETRY_BACKOFF_MS);
// there are no successful responses, so the fatal error should persist
assertTrue(coordinatorManager.fatalError().isPresent());
// receiving a successful response should clear the fatal error
expectFindCoordinatorRequest(coordinatorManager, error);
assertTrue(coordinatorManager.fatalError().isEmpty());
}
private void expectFindCoordinatorRequest( private void expectFindCoordinatorRequest(
CoordinatorRequestManager coordinatorManager, CoordinatorRequestManager coordinatorManager,
Errors error Errors error
@ -274,6 +273,7 @@ public class CoordinatorRequestManagerTest {
new LogContext(), new LogContext(),
RETRY_BACKOFF_MS, RETRY_BACKOFF_MS,
RETRY_BACKOFF_MS, RETRY_BACKOFF_MS,
this.backgroundEventHandler,
groupId groupId
); );
} }

View File

@ -1272,7 +1272,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = { def testCommitWithNoAccess(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer() val consumer = createConsumer()
assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava)) assertThrows(classOf[GroupAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava))
@ -1309,7 +1309,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { def testCommitWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)
val consumer = createConsumer() val consumer = createConsumer()
@ -1335,7 +1335,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
} }
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034"))
def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = { def testOffsetFetchWithNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic) createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource) addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), topicResource)

View File

@ -441,4 +441,8 @@ object QuorumTestHarness {
// The following is for tests that only work with the classic group protocol because of relying on Zookeeper // The following is for tests that only work with the classic group protocol because of relying on Zookeeper
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT))) def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit: java.util.stream.Stream[Arguments] = stream.Stream.of(Arguments.of("zk", GroupProtocol.CLASSIC.name.toLowerCase(Locale.ROOT)))
// The following parameter groups are to *temporarily* avoid bugs with the CONSUMER group protocol Consumer
// implementation that would otherwise cause tests to fail.
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034: stream.Stream[Arguments] = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
} }