KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the… (#11340)

Title: KafkaConsumer cannot jump out of the poll method, and cpu and traffic on the broker side increase sharply
description: The local test has been passed, the problem described by jira can be solved

JIRA link : https://issues.apache.org/jira/browse/KAFKA-13310

Reviewers: Luke Chen <showuon@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
RivenSun 2022-02-09 15:05:42 +08:00 committed by GitHub
parent 61b0014ec9
commit 4b468a9d81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 184 additions and 37 deletions

View File

@ -189,8 +189,9 @@ public abstract class AbstractCoordinator implements Closeable {
* cleanup from the previous generation (such as committing offsets for the consumer)
* @param generation The previous generation or -1 if there was none
* @param memberId The identifier of this member in the previous group or "" if there was none
* @return true If onJoinPrepare async commit succeeded, false otherwise
*/
protected abstract void onJoinPrepare(int generation, String memberId);
protected abstract boolean onJoinPrepare(int generation, String memberId);
/**
* Perform assignment for the group. This is used by the leader to push state to all the members
@ -421,7 +422,12 @@ public abstract class AbstractCoordinator implements Closeable {
// need to set the flag before calling onJoinPrepare since the user callback may throw
// exception, in which case upon retry we should not retry onJoinPrepare either.
needsJoinPrepare = false;
onJoinPrepare(generation.generationId, generation.memberId);
// return false when onJoinPrepare is waiting for committing offset
if (!onJoinPrepare(generation.generationId, generation.memberId)) {
needsJoinPrepare = true;
//should not initiateJoinGroup if needsJoinPrepare still is true
return false;
}
}
final RequestFuture<ByteBuffer> future = initiateJoinGroup();

View File

@ -706,10 +706,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
@Override
protected void onJoinPrepare(int generation, String memberId) {
protected boolean onJoinPrepare(int generation, String memberId) {
log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
// commit offsets prior to rebalance if auto-commit enabled
maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
boolean onJoinPrepareAsyncCommitCompleted = false;
// async commit offsets prior to rebalance if auto-commit enabled
RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
// return true when
// 1. future is null, which means no commit request sent, so it is still considered completed
// 2. offset commit completed
// 3. offset commit failed with non-retriable exception
if (future == null)
onJoinPrepareAsyncCommitCompleted = true;
else if (future.succeeded())
onJoinPrepareAsyncCommitCompleted = true;
else if (future.failed() && !future.isRetriable()) {
log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
onJoinPrepareAsyncCommitCompleted = true;
}
// the generation / member-id can possibly be reset by the heartbeat thread
// upon getting errors or heartbeat timeouts; in this case whatever is previously
@ -765,6 +779,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (exception != null) {
throw new KafkaException("User rebalance callback throws an error", exception);
}
return onJoinPrepareAsyncCommitCompleted;
}
@Override
@ -925,7 +940,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// we do not need to re-enable wakeups since we are closing already
client.disableWakeups();
try {
maybeAutoCommitOffsetsSync(timer);
maybeAutoCommitOffsetsAsync();
while (pendingAsyncCommits.get() > 0 && timer.notExpired()) {
ensureCoordinatorReady(timer);
client.poll(timer);
@ -952,11 +967,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
RequestFuture<Void> future = null;
if (!coordinatorUnknown()) {
doCommitOffsetsAsync(offsets, callback);
future = doCommitOffsetsAsync(offsets, callback);
} else {
// we don't know the current coordinator, so try to find it and then send the commit
// or fail (we don't want recursive retries which can cause offset commits to arrive
@ -986,9 +1002,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// Note that commits are treated as heartbeats by the coordinator, so there is no need to
// explicitly allow heartbeats through delayed task execution.
client.pollNoWakeup();
return future;
}
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
private RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
future.addListener(new RequestFutureListener<Void>() {
@ -1012,6 +1029,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
});
return future;
}
/**
@ -1064,16 +1082,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
autoCommitOffsetsAsync();
}
}
}
private void doAutoCommitOffsetsAsync() {
private RequestFuture<Void> autoCommitOffsetsAsync() {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
return commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
if (exception != null) {
if (exception instanceof RetriableCommitFailedException) {
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
@ -1088,22 +1106,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
});
}
private void maybeAutoCommitOffsetsSync(Timer timer) {
if (autoCommitEnabled) {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
try {
log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets);
if (!commitOffsetsSync(allConsumedOffsets, timer))
log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets);
} catch (WakeupException | InterruptException e) {
log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets);
// rethrow wakeups since they are triggered by the user
throw e;
} catch (Exception e) {
// consistent with async auto-commit failures, we do not propagate the exception
log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage());
}
}
private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
if (autoCommitEnabled)
return autoCommitOffsetsAsync();
return null;
}
private class DefaultOffsetCommitCallback implements OffsetCommitCallback {

View File

@ -2182,7 +2182,7 @@ public class KafkaConsumerTest {
}
// accessing closed consumer is illegal
consumer.close(Duration.ofSeconds(5));
consumer.close(Duration.ZERO);
assertThrows(IllegalStateException.class, consumer::groupMetadata);
}
@ -2766,6 +2766,7 @@ public class KafkaConsumerTest {
@Test
public void testClosingConsumerUnregistersConsumerMetrics() {
Time time = new MockTime(1L);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));

View File

@ -1632,8 +1632,9 @@ public class AbstractCoordinatorTest {
}
@Override
protected void onJoinPrepare(int generation, String memberId) {
protected boolean onJoinPrepare(int generation, String memberId) {
onJoinPrepareInvokes++;
return true;
}
@Override

View File

@ -1171,6 +1171,132 @@ public abstract class ConsumerCoordinatorTest {
assertFalse(coordinator.rejoinNeededOrPending());
}
@Test
public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
{
put(topic1, 1);
put(topic2, 1);
}
}));
coordinator.maybeUpdateSubscriptionMetadata();
assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), subscriptions.subscription());
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
MetadataResponse deletedMetadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
{
put(topic1, 1);
}
});
// Instrument the test so that metadata will contain only one topic after next refresh.
client.prepareMetadataUpdate(deletedMetadataResponse);
client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
client.prepareResponse(body -> {
SyncGroupRequest sync = (SyncGroupRequest) body;
return sync.data().memberId().equals(consumerId) &&
sync.data().generationId() == 1 &&
sync.groupAssignments().isEmpty();
}, syncGroupResponse(singletonList(t1p), Errors.NONE));
partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
// This will trigger rebalance.
coordinator.poll(time.timer(Long.MAX_VALUE));
// Make sure that the metadata was refreshed during the rebalance and thus subscriptions now contain only one topic.
assertEquals(singleton(topic1), subscriptions.subscription());
// Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger
// rebalance again.
metadata.requestUpdate();
consumerClient.poll(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
}
}
@Test
public void testJoinPrepareWithDisableAutoCommit() {
try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) {
coordinator.ensureActiveGroup();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
int generationId = 42;
String memberId = "consumer-42";
boolean res = coordinator.onJoinPrepare(generationId, memberId);
assertTrue(res);
assertTrue(client.hasPendingResponses());
assertFalse(client.hasInFlightRequests());
assertFalse(coordinator.coordinatorUnknown());
}
}
@Test
public void testJoinPrepareAndCommitCompleted() {
try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"))) {
coordinator.ensureActiveGroup();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
int generationId = 42;
String memberId = "consumer-42";
boolean res = coordinator.onJoinPrepare(generationId, memberId);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(res);
assertFalse(client.hasPendingResponses());
assertFalse(client.hasInFlightRequests());
assertFalse(coordinator.coordinatorUnknown());
}
}
@Test
public void testJoinPrepareAndCommitWithCoordinatorNotAvailable() {
try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"))) {
coordinator.ensureActiveGroup();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
int generationId = 42;
String memberId = "consumer-42";
boolean res = coordinator.onJoinPrepare(generationId, memberId);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertFalse(res);
assertFalse(client.hasPendingResponses());
assertFalse(client.hasInFlightRequests());
assertTrue(coordinator.coordinatorUnknown());
}
}
@Test
public void testJoinPrepareAndCommitWithUnknownMemberId() {
try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"))) {
coordinator.ensureActiveGroup();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
int generationId = 42;
String memberId = "consumer-42";
boolean res = coordinator.onJoinPrepare(generationId, memberId);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(res);
assertFalse(client.hasPendingResponses());
assertFalse(client.hasInFlightRequests());
assertFalse(coordinator.coordinatorUnknown());
}
}
/**
* Verifies that the consumer re-joins after a metadata change. If JoinGroup fails
* and metadata reverts to its original value, the consumer should still retry JoinGroup.
@ -3272,12 +3398,18 @@ public abstract class ConsumerCoordinatorTest {
OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
return commitRequest.data().groupId().equals(groupId);
}, new OffsetCommitResponse(new OffsetCommitResponseData()));
if (shouldLeaveGroup)
client.prepareResponse(body -> {
leaveGroupRequested.set(true);
LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
return leaveRequest.data().groupId().equals(groupId);
}, new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())));
client.prepareResponse(body -> {
leaveGroupRequested.set(true);
LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
return leaveRequest.data().groupId().equals(groupId);
}, new LeaveGroupResponse(new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())));
commitRequested.set(true);
OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
return commitRequest.data().groupId().equals(groupId);
}, new OffsetCommitResponse(new OffsetCommitResponseData()));
coordinator.close();
assertTrue(commitRequested.get(), "Commit not requested");

View File

@ -218,7 +218,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
}
@Override
protected void onJoinPrepare(int generation, String memberId) {
protected boolean onJoinPrepare(int generation, String memberId) {
log.info("Rebalance started");
leaderState(null);
final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
@ -230,6 +230,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
log.debug("Cooperative rebalance triggered. Keeping assignment {} until it's "
+ "explicitly revoked.", localAssignmentSnapshot);
}
return true;
}
@Override

View File

@ -288,7 +288,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
servers.foreach(server => killBroker(server.config.brokerId))
val closeTimeout = 2000
val future1 = submitCloseAndValidate(consumer1, closeTimeout, None, Some(closeTimeout))
val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, Some(requestTimeout), Some(requestTimeout))
val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(requestTimeout))
future1.get
future2.get
}