diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 3b099eb0766..0fb0346d5f4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -75,7 +75,7 @@ public class HeartbeatRequestManager implements RequestManager {
* Time that the group coordinator will wait on member to revoke its partitions. This is provided by the group
* coordinator in the heartbeat
*/
- private final int rebalanceTimeoutMs;
+ private final int maxPollIntervalMs;
/**
* CoordinatorRequestManager manages the connection to the group coordinator
@@ -102,6 +102,12 @@ public class HeartbeatRequestManager implements RequestManager {
*/
private final BackgroundEventHandler backgroundEventHandler;
+ /**
+ * Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop
+ * sending heartbeat until the next poll.
+ */
+ private final Timer pollTimer;
+
private GroupMetadataUpdateEvent previousGroupMetadataUpdateEvent = null;
public HeartbeatRequestManager(
@@ -116,17 +122,19 @@ public class HeartbeatRequestManager implements RequestManager {
this.logger = logContext.logger(getClass());
this.membershipManager = membershipManager;
this.backgroundEventHandler = backgroundEventHandler;
- this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+ this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
- this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, rebalanceTimeoutMs);
+ this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs);
this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
- retryBackoffMaxMs, rebalanceTimeoutMs);
+ retryBackoffMaxMs, maxPollIntervalMs);
+ this.pollTimer = time.timer(maxPollIntervalMs);
}
// Visible for testing
HeartbeatRequestManager(
final LogContext logContext,
+ final Timer timer,
final ConsumerConfig config,
final CoordinatorRequestManager coordinatorRequestManager,
final MembershipManager membershipManager,
@@ -134,12 +142,13 @@ public class HeartbeatRequestManager implements RequestManager {
final HeartbeatRequestState heartbeatRequestState,
final BackgroundEventHandler backgroundEventHandler) {
this.logger = logContext.logger(this.getClass());
- this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+ this.maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
this.coordinatorRequestManager = coordinatorRequestManager;
this.heartbeatRequestState = heartbeatRequestState;
this.heartbeatState = heartbeatState;
this.membershipManager = membershipManager;
this.backgroundEventHandler = backgroundEventHandler;
+ this.pollTimer = timer;
}
/**
@@ -167,20 +176,33 @@ public class HeartbeatRequestManager implements RequestManager {
*/
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
- if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) {
+ if (!coordinatorRequestManager.coordinator().isPresent() ||
+ membershipManager.shouldSkipHeartbeat() ||
+ pollTimer.isExpired()) {
membershipManager.onHeartbeatRequestSkipped();
return NetworkClientDelegate.PollResult.EMPTY;
}
+ pollTimer.update(currentTimeMs);
+ if (pollTimer.isExpired()) {
+ logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
+ "was longer than the configured max.poll.interval.ms, which typically implies that " +
+ "the poll loop is spending too much time processing messages. You can address this " +
+ "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
+ "returned in poll() with max.poll.records.");
+ // This should trigger a heartbeat with leave group epoch
+ membershipManager.transitionToStaled();
+ NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, true);
+ // We can ignore the leave response because we can join before or after receiving the response.
+ heartbeatRequestState.reset();
+ return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
+ }
boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
-
if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
}
- heartbeatRequestState.onSendAttempt(currentTimeMs);
- membershipManager.onHeartbeatRequestSent();
- NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest();
+ NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
}
@@ -199,15 +221,49 @@ public class HeartbeatRequestManager implements RequestManager {
return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
}
- private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
+ /**
+ * When consumer polls, we need to reset the pollTimer. If the poll timer has expired, we rejoin only when the
+ * member is in the {@link MemberState#UNSUBSCRIBED} state.
+ */
+ public void resetPollTimer() {
+ pollTimer.reset(maxPollIntervalMs);
+ }
+
+ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+ final boolean ignoreResponse) {
+ NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+ heartbeatRequestState.onSendAttempt(currentTimeMs);
+ membershipManager.onHeartbeatRequestSent();
+ return request;
+ }
+
+ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
coordinatorRequestManager.coordinator());
+ if (ignoreResponse)
+ return logResponse(request);
+ else
+ return request.whenComplete((response, exception) -> {
+ if (response != null) {
+ onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+ } else {
+ onFailure(exception, request.handler().completionTimeMs());
+ }
+ });
+ }
+
+ private NetworkClientDelegate.UnsentRequest logResponse(final NetworkClientDelegate.UnsentRequest request) {
return request.whenComplete((response, exception) -> {
if (response != null) {
- onResponse((ConsumerGroupHeartbeatResponse) response.responseBody(), request.handler().completionTimeMs());
+ Errors error =
+ Errors.forCode(((ConsumerGroupHeartbeatResponse) response.responseBody()).data().errorCode());
+ if (error == Errors.NONE)
+ logger.debug("GroupHeartbeat responded successfully: {}", response);
+ else
+ logger.error("GroupHeartbeat failed because of {}: {}", error, response);
} else {
- onFailure(exception, request.handler().completionTimeMs());
+ logger.error("GroupHeartbeat failed because of unexpected exception.", exception);
}
});
}
@@ -229,10 +285,10 @@ public class HeartbeatRequestManager implements RequestManager {
private void onResponse(final ConsumerGroupHeartbeatResponse response, long currentTimeMs) {
if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
- this.heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
- this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
- this.heartbeatRequestState.resetTimer();
- this.membershipManager.onHeartbeatResponseReceived(response.data());
+ heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
+ heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
+ heartbeatRequestState.resetTimer();
+ membershipManager.onHeartbeatResponseReceived(response.data());
maybeSendGroupMetadataUpdateEvent();
return;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
index 088d1ccd346..585440bafca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java
@@ -18,6 +18,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.common.protocol.Errors;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -99,7 +100,14 @@ public enum MemberState {
* unrecoverable state where the member won't send any requests to the broker and cannot
* perform any other transition.
*/
- FATAL;
+ FATAL,
+
+ /**
+ * An intermediate state indicating the consumer is staled because the user has not polled the consumer
+ * within the max.poll.interval.ms
time bound; therefore causing the member to leave the
+ * group. The member rejoins on the next poll.
+ */
+ STALED;
// Valid state transitions
static {
@@ -116,7 +124,7 @@ public enum MemberState {
FENCED.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING,
PREPARE_LEAVING, LEAVING);
- JOINING.previousValidStates = Arrays.asList(FENCED, UNSUBSCRIBED);
+ JOINING.previousValidStates = Arrays.asList(FENCED, UNSUBSCRIBED, STALED);
PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING,
ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
@@ -124,6 +132,8 @@ public enum MemberState {
LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
UNSUBSCRIBED.previousValidStates = Arrays.asList(LEAVING);
+
+ STALED.previousValidStates = Arrays.asList(JOINING, RECONCILING, ACKNOWLEDGING, STABLE);
}
private List previousValidStates;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
index b635eb3a0fd..78127ecb206 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java
@@ -61,6 +61,11 @@ public interface MembershipManager {
*/
MemberState state();
+ /**
+ * @return True if the member is staled due to expired poll timer.
+ */
+ boolean isStaled();
+
/**
* Update member info and transition member state based on a successful heartbeat response.
*
@@ -139,4 +144,15 @@ public interface MembershipManager {
* Note that list of topics of the subscription is taken from the shared subscription state.
*/
void onSubscriptionUpdated();
+
+ /**
+ * Transition to the {@link MemberState#JOINING} state to attempt joining a group.
+ */
+ void transitionToJoining();
+
+ /**
+ * When the user stops polling the consumer and the max.poll.interval.ms
timer expires, we transition
+ * the member to STALE.
+ */
+ void transitionToStaled();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 317126801ec..dfb339680c6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -228,7 +228,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
* (callbacks executed and heartbeat request to leave is sent out). This will be empty is the
* member is not leaving.
*/
- private Optional> leaveGroupInProgress;
+ private Optional> leaveGroupInProgress = Optional.empty();
/**
* True if the member has registered to be notified when the cluster metadata is updated.
@@ -315,6 +315,11 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
return memberEpoch;
}
+ @Override
+ public boolean isStaled() {
+ return state == MemberState.STALED;
+ }
+
/**
* {@inheritDoc}
*/
@@ -481,7 +486,8 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
* the user calls the subscribe API, or when the member wants to rejoin after getting fenced.
* Visible for testing.
*/
- void transitionToJoining() {
+ @Override
+ public void transitionToJoining() {
if (state == MemberState.FATAL) {
log.warn("No action taken to join the group with the updated subscription because " +
"the member is in FATAL state");
@@ -601,7 +607,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
@Override
public boolean shouldHeartbeatNow() {
MemberState state = state();
- return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING;
+ return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING || state == MemberState.JOINING;
}
/**
@@ -610,6 +616,12 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
@Override
public void onHeartbeatRequestSent() {
MemberState state = state();
+ if (isStaled()) {
+ log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch);
+ transitionToJoining();
+ return;
+ }
+
if (state == MemberState.ACKNOWLEDGING) {
if (allPendingAssignmentsReconciled()) {
transitionTo(MemberState.STABLE);
@@ -655,6 +667,17 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
}
+ /**
+ * Sets the epoch to the leave group epoch and clears the assignments. The member will rejoin with
+ * the existing subscriptions on the next time user polls.
+ */
+ @Override
+ public void transitionToStaled() {
+ memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+ currentAssignment.clear();
+ transitionTo(MemberState.STALED);
+ }
+
/**
* Reconcile the assignment that has been received from the server and for which topic names
* are resolved, kept in the {@link #assignmentReadyToReconcile}. This will commit if needed,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 33233561a34..9c0bcdeefbd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.MembershipManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.common.KafkaException;
@@ -128,8 +129,8 @@ public class ApplicationEventProcessor extends EventProcessor
return;
}
- CommitRequestManager manager = requestManagers.commitRequestManager.get();
- manager.updateAutoCommitTimer(event.pollTimeMs());
+ requestManagers.commitRequestManager.ifPresent(m -> m.updateAutoCommitTimer(event.pollTimeMs()));
+ requestManagers.heartbeatRequestManager.ifPresent(HeartbeatRequestManager::resetPollTimer);
}
private void process(final CommitApplicationEvent event) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index 28a68e9f751..ba0410c40b6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
import java.io.Closeable;
import java.time.Duration;
@@ -80,6 +81,7 @@ public class ConsumerTestBuilder implements Closeable {
final FetchConfig fetchConfig;
final FetchBuffer fetchBuffer;
final Metrics metrics;
+ final Timer pollTimer;
final FetchMetricsManager metricsManager;
final NetworkClientDelegate networkClientDelegate;
final OffsetsRequestManager offsetsRequestManager;
@@ -148,6 +150,7 @@ public class ConsumerTestBuilder implements Closeable {
this.subscriptions = spy(createSubscriptionState(config, logContext));
this.metadata = spy(new ConsumerMetadata(config, subscriptions, logContext, new ClusterResourceListeners()));
this.metricsManager = createFetchMetricsManager(metrics);
+ this.pollTimer = time.timer(groupRebalanceConfig.rebalanceTimeoutMs);
this.client = new MockClient(time, metadata);
MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap() {
@@ -219,6 +222,7 @@ public class ConsumerTestBuilder implements Closeable {
gi.heartbeatJitterMs));
HeartbeatRequestManager heartbeat = spy(new HeartbeatRequestManager(
logContext,
+ pollTimer,
config,
coordinator,
mm,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 0496e90b2b4..2d64c9ee2e7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -17,8 +17,9 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.common.KafkaException;
@@ -32,7 +33,10 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -48,28 +52,37 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import java.util.Properties;
import java.util.concurrent.BlockingQueue;
-import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID;
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS;
-import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR;
+import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS;
+import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class HeartbeatRequestManagerTest {
+ private long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;
+ private int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
+ private int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS;
+ private long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS;
+ private static final String DEFAULT_GROUP_ID = "groupId";
private ConsumerTestBuilder testBuilder;
private Time time;
+ private Timer pollTimer;
+ private ConsumerConfig config;
private CoordinatorRequestManager coordinatorRequestManager;
private SubscriptionState subscriptions;
private Metadata metadata;
@@ -99,6 +112,7 @@ public class HeartbeatRequestManagerTest {
subscriptions = testBuilder.subscriptions;
membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
metadata = testBuilder.metadata;
+ config = testBuilder.config;
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999)));
}
@@ -509,6 +523,41 @@ public class HeartbeatRequestManagerTest {
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
}
+ @Test
+ public void testPollTimerExpiration() {
+ heartbeatRequestManager = createHeartbeatRequestManager();
+ when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999)));
+ when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
+
+ time.sleep(maxPollIntervalMs);
+ assertLeaveGroup(heartbeatRequestManager);
+ verify(membershipManager).transitionToStaled();
+
+ assertNoHeartbeat(heartbeatRequestManager);
+ heartbeatRequestManager.resetPollTimer();
+ assertTrue(pollTimer.notExpired());
+ assertHeartbeat(heartbeatRequestManager);
+ }
+
+ private void assertHeartbeat(HeartbeatRequestManager hrm) {
+ NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
+ assertEquals(1, pollResult.unsentRequests.size());
+ assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, pollResult.timeUntilNextPollMs);
+ pollResult.unsentRequests.get(0).handler().onComplete(createHeartbeatResponse(pollResult.unsentRequests.get(0),
+ Errors.NONE));
+ }
+
+ private void assertNoHeartbeat(HeartbeatRequestManager hrm) {
+ NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
+ assertEquals(0, pollResult.unsentRequests.size());
+ }
+
+ private NetworkClientDelegate.PollResult assertLeaveGroup(HeartbeatRequestManager hrm) {
+ NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
+ assertEquals(1, pollResult.unsentRequests.size());
+ return pollResult;
+ }
+
private void mockStableMember() {
membershipManager.onSubscriptionUpdated();
// Heartbeat response without assignment to set the state to STABLE.
@@ -591,4 +640,43 @@ public class HeartbeatRequestManagerTest {
null,
response);
}
+
+ private ConsumerConfig config() {
+ Properties prop = new Properties();
+ prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+
+ prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(maxPollIntervalMs));
+ prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs));
+ prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(retryBackoffMaxMs));
+ prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(heartbeatIntervalMs));
+ return new ConsumerConfig(prop);
+ }
+
+ private HeartbeatRequestManager createHeartbeatRequestManager() {
+ LogContext logContext = new LogContext();
+ coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+ subscriptions = mock(SubscriptionState.class);
+ membershipManager = mock(MembershipManager.class);
+ backgroundEventHandler = mock(BackgroundEventHandler.class);
+ heartbeatState = new HeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, maxPollIntervalMs);
+ heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
+ logContext,
+ time,
+ heartbeatIntervalMs,
+ retryBackoffMs,
+ retryBackoffMaxMs,
+ 0);
+ pollTimer = time.timer(maxPollIntervalMs);
+ return new HeartbeatRequestManager(
+ logContext,
+ pollTimer,
+ config(),
+ coordinatorRequestManager,
+ membershipManager,
+ heartbeatState,
+ heartbeatRequestState,
+ backgroundEventHandler);
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 30b1c1aea26..9f03b130dca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -42,6 +42,7 @@ import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -1044,6 +1045,22 @@ public class MembershipManagerImplTest {
assertEquals(0, rebalanceListener.revokedCount);
}
+ @Test
+ public void testTransitionToStaled() {
+ MembershipManager membershipManager = memberJoinWithAssignment("topic", Uuid.randomUuid());
+ membershipManager.transitionToStaled();
+ assertEquals(LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
+ assertTrue(membershipManager.currentAssignment().isEmpty());
+ }
+
+ @Test
+ public void testHeartbeatSentOnStaledMember() {
+ MembershipManagerImpl membershipManager = createMemberInStableState();
+ membershipManager.transitionToStaled();
+ membershipManager.onHeartbeatRequestSent();
+ assertEquals(MemberState.JOINING, membershipManager.state());
+ }
+
private MembershipManagerImpl mockMemberSuccessfullyReceivesAndAcksAssignment(
Uuid topicId, String topicName, List partitions) {
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
@@ -1383,4 +1400,14 @@ public class MembershipManagerImplTest {
.setPartitions(Arrays.asList(3, 4, 5))
));
}
+
+ private MembershipManager memberJoinWithAssignment(String topicName, Uuid topicId) {
+ MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(true);
+ membershipManager.onHeartbeatRequestSent();
+ when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
+ receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
+ membershipManager.onHeartbeatRequestSent();
+ assertFalse(membershipManager.currentAssignment().isEmpty());
+ return membershipManager;
+ }
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 5631c27721d..b7768097b07 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -948,6 +948,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// 1 consumer using range assignment
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "range-group")
this.consumerConfig.setProperty(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, "range")
+ this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000")
val consumer = createConsumer()
// create two new topics, each having 2 partitions