mirror of https://github.com/apache/kafka.git
KAFKA-15818: ensure leave group on max poll interval (#14873)
Currently, poll interval is not being respected during consumer#poll. When the user stops polling the consumer, we should assume either the consumer is too slow to respond or is already dead. In either case, we should let the group coordinator kick the member out of the group and reassign its partition after the rebalance timeout expires. If the consumer comes back alive, we should send a heartbeat and the member will be fenced and rejoin. (and the partitions will be revoked). This is the same behavior as the current implementation. Reviewers: Lucas Brutschy <lucasbru@apache.org>, Bruno Cadonna <cadonna@apache.org>, Lianet Magrans <lianetmr@gmail.com>
This commit is contained in:
parent
dfc456daa7
commit
5b478aebfd
|
@ -75,7 +75,7 @@ public class HeartbeatRequestManager implements RequestManager {
|
|||
* Time that the group coordinator will wait on member to revoke its partitions. This is provided by the group
|
||||
* coordinator in the heartbeat
|
||||
*/
|
||||
private final int rebalanceTimeoutMs;
|
||||
private final int maxPollIntervalMs;
|
||||
|
||||
/**
|
||||
* CoordinatorRequestManager manages the connection to the group coordinator
|
||||
|
@ -102,6 +102,12 @@ public class HeartbeatRequestManager implements RequestManager {
|
|||
*/
|
||||
private final BackgroundEventHandler backgroundEventHandler;
|
||||
|
||||
/**
|
||||
* Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
|
||||
* sending heartbeat until the next poll.
|
||||
*/
|
||||
private final Timer pollTimer;
|
||||
|
||||
private GroupMetadataUpdateEvent previousGroupMetadataUpdateEvent = null;
|
||||
|
||||
public HeartbeatRequestManager(
|
||||
|
@ -116,17 +122,19 @@ public class HeartbeatRequestManager implements RequestManager {
|
|||
this.logger = logContext.logger(getClass());
|
||||
this.membershipManager = membershipManager;
|
||||
this.backgroundEventHandler = backgroundEventHandler;
|
||||
this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
|
||||
this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
|
||||
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
|
||||
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
|
||||
this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs);
|
||||
this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs);
|
||||
this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
|
||||
retryBackoffMaxMs, rebalanceTimeoutMs);
|
||||
retryBackoffMaxMs, maxPollIntervalMs);
|
||||
this.pollTimer = time.timer(maxPollIntervalMs);
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
HeartbeatRequestManager(
|
||||
final LogContext logContext,
|
||||
final Timer timer,
|
||||
final ConsumerConfig config,
|
||||
final CoordinatorRequestManager coordinatorRequestManager,
|
||||
final MembershipManager membershipManager,
|
||||
|
@ -134,12 +142,13 @@ public class HeartbeatRequestManager implements RequestManager {
|
|||
final HeartbeatRequestState heartbeatRequestState,
|
||||
final BackgroundEventHandler backgroundEventHandler) {
|
||||
this.logger = logContext.logger(this.getClass());
|
||||
this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
|
||||
this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
|
||||
this.coordinatorRequestManager = coordinatorRequestManager;
|
||||
this.heartbeatRequestState = heartbeatRequestState;
|
||||
this.heartbeatState = heartbeatState;
|
||||
this.membershipManager = membershipManager;
|
||||
this.backgroundEventHandler = backgroundEventHandler;
|
||||
this.pollTimer = timer;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -167,20 +176,33 @@ public class HeartbeatRequestManager implements RequestManager {
|
|||
*/
|
||||
@Override
|
||||
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
|
||||
if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) {
|
||||
if (!coordinatorRequestManager.coordinator().isPresent() ||
|
||||
membershipManager.shouldSkipHeartbeat() ||
|
||||
pollTimer.isExpired()) {
|
||||
membershipManager.onHeartbeatRequestSkipped();
|
||||
return NetworkClientDelegate.PollResult.EMPTY;
|
||||
}
|
||||
pollTimer.update(currentTimeMs);
|
||||
if (pollTimer.isExpired()) {
|
||||
logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
|
||||
"was longer than the configured max.poll.interval.ms, which typically implies that " +
|
||||
"the poll loop is spending too much time processing messages. You can address this " +
|
||||
"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
|
||||
"returned in poll() with max.poll.records.");
|
||||
// This should trigger a heartbeat with leave group epoch
|
||||
membershipManager.transitionToStaled();
|
||||
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, true);
|
||||
// We can ignore the leave response because we can join before or after receiving the response.
|
||||
heartbeatRequestState.reset();
|
||||
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
|
||||
}
|
||||
|
||||
boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
|
||||
|
||||
if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
|
||||
return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
|
||||
}
|
||||
|
||||
heartbeatRequestState.onSendAttempt(currentTimeMs);
|
||||
membershipManager.onHeartbeatRequestSent();
|
||||
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest();
|
||||
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
|
||||
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
|
||||
}
|
||||
|
||||
|
@ -199,15 +221,49 @@ public class HeartbeatRequestManager implements RequestManager {
|
|||
return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
|
||||
}
|
||||
|
||||
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
|
||||
/**
|
||||
* When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the
|
||||
* member is in the {@link MemberState#UNSUBSCRIBED} state.
|
||||
*/
|
||||
public void resetPollTimer() {
|
||||
pollTimer.reset(maxPollIntervalMs);
|
||||
}
|
||||
|
||||
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
|
||||
final boolean ignoreResponse) {
|
||||
NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
|
||||
heartbeatRequestState.onSendAttempt(currentTimeMs);
|
||||
membershipManager.onHeartbeatRequestSent();
|
||||
return request;
|
||||
}
|
||||
|
||||
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
|
||||
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
|
||||
new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
|
||||
coordinatorRequestManager.coordinator());
|
||||
if (ignoreResponse)
|
||||
return logResponse(request);
|
||||
else
|
||||
return request.whenComplete((response, exception) -> {
|
||||
if (response != null) {
|
||||
onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
|
||||
} else {
|
||||
onFailure(exception, request.handler().completionTimeMs());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private NetworkClientDelegate.UnsentRequest logResponse(final NetworkClientDelegate.UnsentRequest request) {
|
||||
return request.whenComplete((response, exception) -> {
|
||||
if (response != null) {
|
||||
onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
|
||||
Errors error =
|
||||
Errors.forCode(((ConsumerGroupHeartbeatResponse) response.responseBody()).data().errorCode());
|
||||
if (error == Errors.NONE)
|
||||
logger.debug("GroupHeartbeat responded successfully: {}", response);
|
||||
else
|
||||
logger.error("GroupHeartbeat failed because of {}: {}", error, response);
|
||||
} else {
|
||||
onFailure(exception, request.handler().completionTimeMs());
|
||||
logger.error("GroupHeartbeat failed because of unexpected exception.", exception);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -229,10 +285,10 @@ public class HeartbeatRequestManager implements RequestManager {
|
|||
|
||||
private void onResponse(final ConsumerGroupHeartbeatResponse response, long currentTimeMs) {
|
||||
if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
|
||||
this.heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
|
||||
this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
|
||||
this.heartbeatRequestState.resetTimer();
|
||||
this.membershipManager.onHeartbeatResponseReceived(response.data());
|
||||
heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
|
||||
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
|
||||
heartbeatRequestState.resetTimer();
|
||||
membershipManager.onHeartbeatResponseReceived(response.data());
|
||||
maybeSendGroupMetadataUpdateEvent();
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.kafka.clients.consumer.internals;
|
||||
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
@ -99,7 +100,14 @@ public enum MemberState {
|
|||
* unrecoverable state where the member won't send any requests to the broker and cannot
|
||||
* perform any other transition.
|
||||
*/
|
||||
FATAL;
|
||||
FATAL,
|
||||
|
||||
/**
|
||||
* An intermediate state indicating the consumer is staled because the user has not polled the consumer
|
||||
* within the <code>max.poll.interval.ms</code> time bound; therefore causing the member to leave the
|
||||
* group. The member rejoins on the next poll.
|
||||
*/
|
||||
STALED;
|
||||
|
||||
// Valid state transitions
|
||||
static {
|
||||
|
@ -116,7 +124,7 @@ public enum MemberState {
|
|||
FENCED.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING,
|
||||
PREPARE_LEAVING, LEAVING);
|
||||
|
||||
JOINING.previousValidStates = Arrays.asList(FENCED, UNSUBSCRIBED);
|
||||
JOINING.previousValidStates = Arrays.asList(FENCED, UNSUBSCRIBED, STALED);
|
||||
|
||||
PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING,
|
||||
ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
|
||||
|
@ -124,6 +132,8 @@ public enum MemberState {
|
|||
LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
|
||||
|
||||
UNSUBSCRIBED.previousValidStates = Arrays.asList(LEAVING);
|
||||
|
||||
STALED.previousValidStates = Arrays.asList(JOINING, RECONCILING, ACKNOWLEDGING, STABLE);
|
||||
}
|
||||
|
||||
private List<MemberState> previousValidStates;
|
||||
|
|
|
@ -61,6 +61,11 @@ public interface MembershipManager {
|
|||
*/
|
||||
MemberState state();
|
||||
|
||||
/**
|
||||
* @return True if the member is staled due to expired poll timer.
|
||||
*/
|
||||
boolean isStaled();
|
||||
|
||||
/**
|
||||
* Update member info and transition member state based on a successful heartbeat response.
|
||||
*
|
||||
|
@ -139,4 +144,15 @@ public interface MembershipManager {
|
|||
* Note that list of topics of the subscription is taken from the shared subscription state.
|
||||
*/
|
||||
void onSubscriptionUpdated();
|
||||
|
||||
/**
|
||||
* Transition to the {@link MemberState#JOINING} state to attempt joining a group.
|
||||
*/
|
||||
void transitionToJoining();
|
||||
|
||||
/**
|
||||
* When the user stops polling the consumer and the <code>max.poll.interval.ms</code> timer expires, we transition
|
||||
* the member to STALE.
|
||||
*/
|
||||
void transitionToStaled();
|
||||
}
|
||||
|
|
|
@ -228,7 +228,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
|
|||
* (callbacks executed and heartbeat request to leave is sent out). This will be empty is the
|
||||
* member is not leaving.
|
||||
*/
|
||||
private Optional<CompletableFuture<Void>> leaveGroupInProgress;
|
||||
private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();
|
||||
|
||||
/**
|
||||
* True if the member has registered to be notified when the cluster metadata is updated.
|
||||
|
@ -315,6 +315,11 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
|
|||
return memberEpoch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStaled() {
|
||||
return state == MemberState.STALED;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
@ -481,7 +486,8 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
|
|||
* the user calls the subscribe API, or when the member wants to rejoin after getting fenced.
|
||||
* Visible for testing.
|
||||
*/
|
||||
void transitionToJoining() {
|
||||
@Override
|
||||
public void transitionToJoining() {
|
||||
if (state == MemberState.FATAL) {
|
||||
log.warn("No action taken to join the group with the updated subscription because " +
|
||||
"the member is in FATAL state");
|
||||
|
@ -601,7 +607,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
|
|||
@Override
|
||||
public boolean shouldHeartbeatNow() {
|
||||
MemberState state = state();
|
||||
return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING;
|
||||
return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING || state == MemberState.JOINING;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -610,6 +616,12 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
|
|||
@Override
|
||||
public void onHeartbeatRequestSent() {
|
||||
MemberState state = state();
|
||||
if (isStaled()) {
|
||||
log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
|
||||
transitionToJoining();
|
||||
return;
|
||||
}
|
||||
|
||||
if (state == MemberState.ACKNOWLEDGING) {
|
||||
if (allPendingAssignmentsReconciled()) {
|
||||
transitionTo(MemberState.STABLE);
|
||||
|
@ -655,6 +667,17 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
|
|||
return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the epoch to the leave group epoch and clears the assignments. The member will rejoin with
|
||||
* the existing subscriptions on the next time user polls.
|
||||
*/
|
||||
@Override
|
||||
public void transitionToStaled() {
|
||||
memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
|
||||
currentAssignment.clear();
|
||||
transitionTo(MemberState.STALED);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconcile the assignment that has been received from the server and for which topic names
|
||||
* are resolved, kept in the {@link #assignmentReadyToReconcile}. This will commit if needed,
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.CachedSupplier;
|
|||
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
|
||||
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
|
||||
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
|
||||
import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager;
|
||||
import org.apache.kafka.clients.consumer.internals.MembershipManager;
|
||||
import org.apache.kafka.clients.consumer.internals.RequestManagers;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
|
@ -128,8 +129,8 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent>
|
|||
return;
|
||||
}
|
||||
|
||||
CommitRequestManager manager = requestManagers.commitRequestManager.get();
|
||||
manager.updateAutoCommitTimer(event.pollTimeMs());
|
||||
requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs()));
|
||||
requestManagers.heartbeatRequestManager.ifPresent(HeartbeatRequestManager::resetPollTimer);
|
||||
}
|
||||
|
||||
private void process(final CommitApplicationEvent event) {
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
|
|||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.common.utils.Timer;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.time.Duration;
|
||||
|
@ -80,6 +81,7 @@ public class ConsumerTestBuilder implements Closeable {
|
|||
final FetchConfig fetchConfig;
|
||||
final FetchBuffer fetchBuffer;
|
||||
final Metrics metrics;
|
||||
final Timer pollTimer;
|
||||
final FetchMetricsManager metricsManager;
|
||||
final NetworkClientDelegate networkClientDelegate;
|
||||
final OffsetsRequestManager offsetsRequestManager;
|
||||
|
@ -148,6 +150,7 @@ public class ConsumerTestBuilder implements Closeable {
|
|||
this.subscriptions = spy(createSubscriptionState(config, logContext));
|
||||
this.metadata = spy(new ConsumerMetadata(config, subscriptions, logContext, new ClusterResourceListeners()));
|
||||
this.metricsManager = createFetchMetricsManager(metrics);
|
||||
this.pollTimer = time.timer(groupRebalanceConfig.rebalanceTimeoutMs);
|
||||
|
||||
this.client = new MockClient(time, metadata);
|
||||
MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
|
||||
|
@ -219,6 +222,7 @@ public class ConsumerTestBuilder implements Closeable {
|
|||
gi.heartbeatJitterMs));
|
||||
HeartbeatRequestManager heartbeat = spy(new HeartbeatRequestManager(
|
||||
logContext,
|
||||
pollTimer,
|
||||
config,
|
||||
coordinator,
|
||||
mm,
|
||||
|
|
|
@ -17,8 +17,9 @@
|
|||
package org.apache.kafka.clients.consumer.internals;
|
||||
|
||||
import org.apache.kafka.clients.ClientResponse;
|
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
|
||||
import org.apache.kafka.clients.Metadata;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
|
||||
import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
|
@ -32,7 +33,10 @@ import org.apache.kafka.common.protocol.Errors;
|
|||
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
|
||||
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
|
||||
import org.apache.kafka.common.requests.RequestHeader;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.common.utils.Timer;
|
||||
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
@ -48,28 +52,37 @@ import java.util.Collections;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID;
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS;
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR;
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS;
|
||||
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class HeartbeatRequestManagerTest {
|
||||
private long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;
|
||||
private int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
|
||||
private int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS;
|
||||
private long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS;
|
||||
private static final String DEFAULT_GROUP_ID = "groupId";
|
||||
|
||||
private ConsumerTestBuilder testBuilder;
|
||||
private Time time;
|
||||
private Timer pollTimer;
|
||||
private ConsumerConfig config;
|
||||
private CoordinatorRequestManager coordinatorRequestManager;
|
||||
private SubscriptionState subscriptions;
|
||||
private Metadata metadata;
|
||||
|
@ -99,6 +112,7 @@ public class HeartbeatRequestManagerTest {
|
|||
subscriptions = testBuilder.subscriptions;
|
||||
membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
|
||||
metadata = testBuilder.metadata;
|
||||
config = testBuilder.config;
|
||||
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999)));
|
||||
}
|
||||
|
@ -509,6 +523,41 @@ public class HeartbeatRequestManagerTest {
|
|||
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPollTimerExpiration() {
|
||||
heartbeatRequestManager = createHeartbeatRequestManager();
|
||||
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999)));
|
||||
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
|
||||
|
||||
time.sleep(maxPollIntervalMs);
|
||||
assertLeaveGroup(heartbeatRequestManager);
|
||||
verify(membershipManager).transitionToStaled();
|
||||
|
||||
assertNoHeartbeat(heartbeatRequestManager);
|
||||
heartbeatRequestManager.resetPollTimer();
|
||||
assertTrue(pollTimer.notExpired());
|
||||
assertHeartbeat(heartbeatRequestManager);
|
||||
}
|
||||
|
||||
private void assertHeartbeat(HeartbeatRequestManager hrm) {
|
||||
NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
|
||||
assertEquals(1, pollResult.unsentRequests.size());
|
||||
assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, pollResult.timeUntilNextPollMs);
|
||||
pollResult.unsentRequests.get(0).handler().onComplete(createHeartbeatResponse(pollResult.unsentRequests.get(0),
|
||||
Errors.NONE));
|
||||
}
|
||||
|
||||
private void assertNoHeartbeat(HeartbeatRequestManager hrm) {
|
||||
NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
|
||||
assertEquals(0, pollResult.unsentRequests.size());
|
||||
}
|
||||
|
||||
private NetworkClientDelegate.PollResult assertLeaveGroup(HeartbeatRequestManager hrm) {
|
||||
NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
|
||||
assertEquals(1, pollResult.unsentRequests.size());
|
||||
return pollResult;
|
||||
}
|
||||
|
||||
private void mockStableMember() {
|
||||
membershipManager.onSubscriptionUpdated();
|
||||
// Heartbeat response without assignment to set the state to STABLE.
|
||||
|
@ -591,4 +640,43 @@ public class HeartbeatRequestManagerTest {
|
|||
null,
|
||||
response);
|
||||
}
|
||||
|
||||
private ConsumerConfig config() {
|
||||
Properties prop = new Properties();
|
||||
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
||||
|
||||
prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(maxPollIntervalMs));
|
||||
prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs));
|
||||
prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(retryBackoffMaxMs));
|
||||
prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(heartbeatIntervalMs));
|
||||
return new ConsumerConfig(prop);
|
||||
}
|
||||
|
||||
private HeartbeatRequestManager createHeartbeatRequestManager() {
|
||||
LogContext logContext = new LogContext();
|
||||
coordinatorRequestManager = mock(CoordinatorRequestManager.class);
|
||||
subscriptions = mock(SubscriptionState.class);
|
||||
membershipManager = mock(MembershipManager.class);
|
||||
backgroundEventHandler = mock(BackgroundEventHandler.class);
|
||||
heartbeatState = new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs);
|
||||
heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
|
||||
logContext,
|
||||
time,
|
||||
heartbeatIntervalMs,
|
||||
retryBackoffMs,
|
||||
retryBackoffMaxMs,
|
||||
0);
|
||||
pollTimer = time.timer(maxPollIntervalMs);
|
||||
return new HeartbeatRequestManager(
|
||||
logContext,
|
||||
pollTimer,
|
||||
config(),
|
||||
coordinatorRequestManager,
|
||||
membershipManager,
|
||||
heartbeatState,
|
||||
heartbeatRequestState,
|
||||
backgroundEventHandler);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import java.util.TreeSet;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
|
@ -1044,6 +1045,22 @@ public class MembershipManagerImplTest {
|
|||
assertEquals(0, rebalanceListener.revokedCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransitionToStaled() {
|
||||
MembershipManager membershipManager = memberJoinWithAssignment("topic", Uuid.randomUuid());
|
||||
membershipManager.transitionToStaled();
|
||||
assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
|
||||
assertTrue(membershipManager.currentAssignment().isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHeartbeatSentOnStaledMember() {
|
||||
MembershipManagerImpl membershipManager = createMemberInStableState();
|
||||
membershipManager.transitionToStaled();
|
||||
membershipManager.onHeartbeatRequestSent();
|
||||
assertEquals(MemberState.JOINING, membershipManager.state());
|
||||
}
|
||||
|
||||
private MembershipManagerImpl mockMemberSuccessfullyReceivesAndAcksAssignment(
|
||||
Uuid topicId, String topicName, List<Integer> partitions) {
|
||||
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
|
||||
|
@ -1383,4 +1400,14 @@ public class MembershipManagerImplTest {
|
|||
.setPartitions(Arrays.asList(3, 4, 5))
|
||||
));
|
||||
}
|
||||
|
||||
private MembershipManager memberJoinWithAssignment(String topicName, Uuid topicId) {
|
||||
MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(true);
|
||||
membershipManager.onHeartbeatRequestSent();
|
||||
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
|
||||
receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
|
||||
membershipManager.onHeartbeatRequestSent();
|
||||
assertFalse(membershipManager.currentAssignment().isEmpty());
|
||||
return membershipManager;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -948,6 +948,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
|
|||
// 1 consumer using range assignment
|
||||
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "range-group")
|
||||
this.consumerConfig.setProperty(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range")
|
||||
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000")
|
||||
val consumer = createConsumer()
|
||||
|
||||
// create two new topics, each having 2 partitions
|
||||
|
|
Loading…
Reference in New Issue