mirror of https://github.com/apache/kafka.git
KAFKA-18569: New consumer close may wait on unneeded FindCoordinator (#18590)
Reviewers: Lianet Magrans <lmagrans@confluent.io>, Kirk True <ktrue@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
4eb35a435d
commit
90573b4b53
|
@ -62,6 +62,7 @@ import org.apache.kafka.clients.consumer.internals.events.PollEvent;
|
||||||
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
|
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
|
||||||
import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
|
import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
|
||||||
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
|
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
|
||||||
|
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
|
||||||
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
|
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
|
||||||
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
|
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
|
||||||
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
|
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
|
||||||
|
@ -1333,6 +1334,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
// sequence...
|
// sequence...
|
||||||
swallow(log, Level.ERROR, "Failed to auto-commit offsets",
|
swallow(log, Level.ERROR, "Failed to auto-commit offsets",
|
||||||
() -> autoCommitOnClose(closeTimer), firstException);
|
() -> autoCommitOnClose(closeTimer), firstException);
|
||||||
|
swallow(log, Level.ERROR, "Failed to stop finding coordinator",
|
||||||
|
this::stopFindCoordinatorOnClose, firstException);
|
||||||
swallow(log, Level.ERROR, "Failed to release group assignment",
|
swallow(log, Level.ERROR, "Failed to release group assignment",
|
||||||
() -> runRebalanceCallbacksOnClose(closeTimer), firstException);
|
() -> runRebalanceCallbacksOnClose(closeTimer), firstException);
|
||||||
swallow(log, Level.ERROR, "Failed to leave group while closing consumer",
|
swallow(log, Level.ERROR, "Failed to leave group while closing consumer",
|
||||||
|
@ -1421,6 +1424,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void stopFindCoordinatorOnClose() {
|
||||||
|
if (groupMetadata.get().isEmpty())
|
||||||
|
return;
|
||||||
|
log.debug("Stop finding coordinator during consumer close");
|
||||||
|
applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
|
||||||
|
}
|
||||||
|
|
||||||
// Visible for testing
|
// Visible for testing
|
||||||
void commitSyncAllConsumed(final Timer timer) {
|
void commitSyncAllConsumed(final Timer timer) {
|
||||||
log.debug("Sending synchronous auto-commit on closing");
|
log.debug("Sending synchronous auto-commit on closing");
|
||||||
|
|
|
@ -59,6 +59,7 @@ public class CoordinatorRequestManager implements RequestManager {
|
||||||
private final RequestState coordinatorRequestState;
|
private final RequestState coordinatorRequestState;
|
||||||
private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
|
private long timeMarkedUnknownMs = -1L; // starting logging a warning only after unable to connect for a while
|
||||||
private long totalDisconnectedMin = 0;
|
private long totalDisconnectedMin = 0;
|
||||||
|
private boolean closing = false;
|
||||||
private Node coordinator;
|
private Node coordinator;
|
||||||
|
|
||||||
public CoordinatorRequestManager(
|
public CoordinatorRequestManager(
|
||||||
|
@ -80,6 +81,11 @@ public class CoordinatorRequestManager implements RequestManager {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void signalClose() {
|
||||||
|
closing = true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Poll for the FindCoordinator request.
|
* Poll for the FindCoordinator request.
|
||||||
* If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list.
|
* If we don't need to discover a coordinator, this method will return a PollResult with Long.MAX_VALUE backoff time and an empty list.
|
||||||
|
@ -92,7 +98,7 @@ public class CoordinatorRequestManager implements RequestManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
|
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
|
||||||
if (this.coordinator != null)
|
if (closing || this.coordinator != null)
|
||||||
return EMPTY;
|
return EMPTY;
|
||||||
|
|
||||||
if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
|
if (coordinatorRequestState.canSendRequest(currentTimeMs)) {
|
||||||
|
|
|
@ -34,7 +34,7 @@ public abstract class ApplicationEvent {
|
||||||
TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
|
TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
|
||||||
UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
|
UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
|
||||||
CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
|
CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
|
||||||
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE,
|
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, STOP_FIND_COORDINATOR_ON_CLOSE,
|
||||||
PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
|
PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
|
||||||
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
|
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
|
||||||
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
|
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
|
||||||
|
|
|
@ -148,6 +148,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
||||||
process((LeaveGroupOnCloseEvent) event);
|
process((LeaveGroupOnCloseEvent) event);
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
case STOP_FIND_COORDINATOR_ON_CLOSE:
|
||||||
|
process((StopFindCoordinatorOnCloseEvent) event);
|
||||||
|
return;
|
||||||
|
|
||||||
case CREATE_FETCH_REQUESTS:
|
case CREATE_FETCH_REQUESTS:
|
||||||
process((CreateFetchRequestsEvent) event);
|
process((CreateFetchRequestsEvent) event);
|
||||||
return;
|
return;
|
||||||
|
@ -452,6 +456,13 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
||||||
future.whenComplete(complete(event.future()));
|
future.whenComplete(complete(event.future()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void process(@SuppressWarnings("unused") final StopFindCoordinatorOnCloseEvent event) {
|
||||||
|
requestManagers.coordinatorRequestManager.ifPresent(manager -> {
|
||||||
|
log.debug("Signal CoordinatorRequestManager closing");
|
||||||
|
manager.signalClose();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process event that tells the share consume request manager to fetch more records.
|
* Process event that tells the share consume request manager to fetch more records.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.clients.consumer.internals.events;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This event is raised when the consumer is closing to prevent the CoordinatorRequestManager from
|
||||||
|
* generating FindCoordinator requests. This event ensures that no new coordinator requests
|
||||||
|
* are initiated once the consumer has completed all coordinator-dependent operations and
|
||||||
|
* is in the process of shutting down.
|
||||||
|
*/
|
||||||
|
public class StopFindCoordinatorOnCloseEvent extends ApplicationEvent {
|
||||||
|
public StopFindCoordinatorOnCloseEvent() {
|
||||||
|
super(Type.STOP_FIND_COORDINATOR_ON_CLOSE);
|
||||||
|
}
|
||||||
|
}
|
|
@ -254,6 +254,21 @@ public class CoordinatorRequestManagerTest {
|
||||||
assertEquals(1, res2.unsentRequests.size());
|
assertEquals(1, res2.unsentRequests.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSignalOnClose() {
|
||||||
|
CoordinatorRequestManager coordinatorManager = setupCoordinatorManager(GROUP_ID);
|
||||||
|
expectFindCoordinatorRequest(coordinatorManager, Errors.NONE);
|
||||||
|
assertTrue(coordinatorManager.coordinator().isPresent());
|
||||||
|
coordinatorManager.markCoordinatorUnknown("coordinator changed", time.milliseconds());
|
||||||
|
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
|
||||||
|
coordinatorManager.signalClose();
|
||||||
|
time.sleep(RETRY_BACKOFF_MS - 1);
|
||||||
|
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests);
|
||||||
|
time.sleep(RETRY_BACKOFF_MS);
|
||||||
|
assertEquals(Collections.emptyList(), coordinatorManager.poll(time.milliseconds()).unsentRequests,
|
||||||
|
"Should not generate find coordinator request during close");
|
||||||
|
}
|
||||||
|
|
||||||
private void expectFindCoordinatorRequest(
|
private void expectFindCoordinatorRequest(
|
||||||
CoordinatorRequestManager coordinatorManager,
|
CoordinatorRequestManager coordinatorManager,
|
||||||
Errors error
|
Errors error
|
||||||
|
|
|
@ -413,9 +413,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
|
||||||
* close should terminate immediately without sending leave group.
|
* close should terminate immediately without sending leave group.
|
||||||
*/
|
*/
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
|
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
|
||||||
// TODO: enable for all protocols after fix for not generating/blocking on unneeded
|
|
||||||
// FindCoordinator on close for the new consumer
|
|
||||||
def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
|
def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
|
||||||
val topic = "closetest"
|
val topic = "closetest"
|
||||||
createTopic(topic, 10, brokerCount)
|
createTopic(topic, 10, brokerCount)
|
||||||
|
|
Loading…
Reference in New Issue