diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index e15ee1fc66b..72a03f812ab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -189,8 +189,9 @@ public abstract class AbstractCoordinator implements Closeable {
      * cleanup from the previous generation (such as committing offsets for the consumer)
      * @param generation The previous generation or -1 if there was none
      * @param memberId The identifier of this member in the previous group or "" if there was none
+     * @return true if the onJoinPrepare async commit succeeded, false otherwise
      */
-    protected abstract void onJoinPrepare(int generation, String memberId);
+    protected abstract boolean onJoinPrepare(int generation, String memberId);
 
     /**
      * Perform assignment for the group. This is used by the leader to push state to all the members
@@ -421,7 +422,12 @@ public abstract class AbstractCoordinator implements Closeable {
                 // need to set the flag before calling onJoinPrepare since the user callback may throw
                 // exception, in which case upon retry we should not retry onJoinPrepare either.
                 needsJoinPrepare = false;
-                onJoinPrepare(generation.generationId, generation.memberId);
+                // onJoinPrepare returns false when it is still waiting for the offset commit to complete
+                if (!onJoinPrepare(generation.generationId, generation.memberId)) {
+                    needsJoinPrepare = true;
+                    // should not initiateJoinGroup if needsJoinPrepare is still true
+                    return false;
+                }
             }
 
             final RequestFuture<ByteBuffer> future = initiateJoinGroup();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 354818a270b..533f3cb40f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -706,10 +706,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     @Override
-    protected void onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        // commit offsets prior to rebalance if auto-commit enabled
-        maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        boolean onJoinPrepareAsyncCommitCompleted = false;
+        // async commit offsets prior to rebalance if auto-commit enabled
+        RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // return true when
+        // 1. future is null, which means no commit request was sent, so it is still considered completed
+        // 2. the offset commit completed
+        // 3. the offset commit failed with a non-retriable exception
+        if (future == null)
+            onJoinPrepareAsyncCommitCompleted = true;
+        else if (future.succeeded())
+            onJoinPrepareAsyncCommitCompleted = true;
+        else if (future.failed() && !future.isRetriable()) {
+            log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
+            onJoinPrepareAsyncCommitCompleted = true;
+        }
+
         // the generation / member-id can possibly be reset by the heartbeat thread
         // upon getting errors or heartbeat timeouts; in this case whatever is previously
@@ -765,6 +779,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (exception != null) {
             throw new KafkaException("User rebalance callback throws an error", exception);
         }
+        return onJoinPrepareAsyncCommitCompleted;
     }
 
     @Override
@@ -925,7 +940,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             // we do not need to re-enable wakeups since we are closing already
             client.disableWakeups();
             try {
-                maybeAutoCommitOffsetsSync(timer);
+                maybeAutoCommitOffsetsAsync();
                 while (pendingAsyncCommits.get() > 0 && timer.notExpired()) {
                     ensureCoordinatorReady(timer);
                     client.poll(timer);
@@ -952,11 +967,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
-    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
+    public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();
 
+        RequestFuture<Void> future = null;
         if (!coordinatorUnknown()) {
-            doCommitOffsetsAsync(offsets, callback);
+            future = doCommitOffsetsAsync(offsets, callback);
         } else {
             // we don't know the current coordinator, so try to find it and then send the commit
             // or fail (we don't want recursive retries which can cause offset commits to arrive
@@ -986,9 +1002,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // Note that commits are treated as heartbeats by the coordinator, so there is no need to
         // explicitly allow heartbeats through delayed task execution.
         client.pollNoWakeup();
+        return future;
     }
 
-    private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
+    private RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
         final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
         future.addListener(new RequestFutureListener<Void>() {
@@ -1012,6 +1029,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 }
             }
         });
+        return future;
     }
 
     /**
@@ -1064,16 +1082,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             nextAutoCommitTimer.update(now);
             if (nextAutoCommitTimer.isExpired()) {
                 nextAutoCommitTimer.reset(autoCommitIntervalMs);
-                doAutoCommitOffsetsAsync();
+                autoCommitOffsetsAsync();
             }
         }
     }
 
-    private void doAutoCommitOffsetsAsync() {
+    private RequestFuture<Void> autoCommitOffsetsAsync() {
         Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
         log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
 
-        commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
+        return commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
             if (exception != null) {
                 if (exception instanceof RetriableCommitFailedException) {
                     log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
@@ -1088,22 +1106,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         });
     }
 
-    private void maybeAutoCommitOffsetsSync(Timer timer) {
-        if (autoCommitEnabled) {
-            Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
-            try {
-                log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets);
-                if (!commitOffsetsSync(allConsumedOffsets, timer))
-                    log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets);
-            } catch (WakeupException | InterruptException e) {
-                log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets);
-                // rethrow wakeups since they are triggered by the user
-                throw e;
-            } catch (Exception e) {
-                // consistent with async auto-commit failures, we do not propagate the exception
-                log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage());
-            }
-        }
+    private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
+        if (autoCommitEnabled)
+            return autoCommitOffsetsAsync();
+        return null;
     }
 
     private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index f8b86b83cd9..27c108bcdac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -2182,7 +2182,7 @@ public class KafkaConsumerTest {
         }
 
         // accessing closed consumer is illegal
-        consumer.close(Duration.ofSeconds(5));
+        consumer.close(Duration.ZERO);
         assertThrows(IllegalStateException.class, consumer::groupMetadata);
     }
 
@@ -2766,6 +2766,7 @@ public class KafkaConsumerTest {
 
     @Test
     public void testClosingConsumerUnregistersConsumerMetrics() {
+        Time time = new MockTime(1L);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
         initMetadata(client, Collections.singletonMap(topic, 1));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 0ed6273082b..b54e44eef67 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -1632,8 +1632,9 @@ public class AbstractCoordinatorTest {
         }
 
         @Override
-        protected void onJoinPrepare(int generation, String memberId) {
+        protected boolean onJoinPrepare(int generation, String memberId) {
             onJoinPrepareInvokes++;
+            return true;
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 1962c94820f..904d770f704 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1171,6 +1171,132 @@ public abstract class ConsumerCoordinatorTest {
         assertFalse(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
+            subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
+            client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
+                {
+                    put(topic1, 1);
+                    put(topic2, 1);
+                }
+            }));
+            coordinator.maybeUpdateSubscriptionMetadata();
+            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), subscriptions.subscription());
+
+            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+            MetadataResponse deletedMetadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() {
+                {
+                    put(topic1, 1);
+                }
+            });
+            // Instrument the test so that metadata will contain only one topic after next refresh.
+            client.prepareMetadataUpdate(deletedMetadataResponse);
+
+            client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+            client.prepareResponse(body -> {
+                SyncGroupRequest sync = (SyncGroupRequest) body;
+                return sync.data().memberId().equals(consumerId) &&
+                        sync.data().generationId() == 1 &&
+                        sync.groupAssignments().isEmpty();
+            }, syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+            partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
+
+            // This will trigger rebalance.
+            coordinator.poll(time.timer(Long.MAX_VALUE));
+
+            // Make sure that the metadata was refreshed during the rebalance and thus subscriptions now contain only one topic.
+            assertEquals(singleton(topic1), subscriptions.subscription());
+
+            // Refresh the metadata again. Since there have been no changes since the last refresh, it won't trigger
+            // rebalance again.
+            metadata.requestUpdate();
+            consumerClient.poll(time.timer(Long.MAX_VALUE));
+            assertFalse(coordinator.rejoinNeededOrPending());
+        }
+    }
+
+    @Test
+    public void testJoinPrepareWithDisableAutoCommit() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"))) {
+            coordinator.ensureActiveGroup();
+
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            boolean res = coordinator.onJoinPrepare(generationId, memberId);
+
+            assertTrue(res);
+            assertTrue(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testJoinPrepareAndCommitCompleted() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"))) {
+            coordinator.ensureActiveGroup();
+
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            boolean res = coordinator.onJoinPrepare(generationId, memberId);
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+
+            assertTrue(res);
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testJoinPrepareAndCommitWithCoordinatorNotAvailable() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"))) {
+            coordinator.ensureActiveGroup();
+
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
+
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            boolean res = coordinator.onJoinPrepare(generationId, memberId);
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+
+            assertFalse(res);
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertTrue(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testJoinPrepareAndCommitWithUnknownMemberId() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"))) {
+            coordinator.ensureActiveGroup();
+
+            prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            boolean res = coordinator.onJoinPrepare(generationId, memberId);
+            coordinator.invokeCompletedOffsetCommitCallbacks();
+
+            assertTrue(res);
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
     /**
      * Verifies that the consumer re-joins after a metadata change. If JoinGroup fails
      * and metadata reverts to its original value, the consumer should still retry JoinGroup.
@@ -3272,12 +3398,18 @@ public abstract class ConsumerCoordinatorTest {
             OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
             return commitRequest.data().groupId().equals(groupId);
         }, new OffsetCommitResponse(new OffsetCommitResponseData()));
+        if (shouldLeaveGroup)
+            client.prepareResponse(body -> {
+                leaveGroupRequested.set(true);
+                LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
+                return leaveRequest.data().groupId().equals(groupId);
+            }, new LeaveGroupResponse(new LeaveGroupResponseData()
+                    .setErrorCode(Errors.NONE.code())));
         client.prepareResponse(body -> {
-            leaveGroupRequested.set(true);
-            LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
-            return leaveRequest.data().groupId().equals(groupId);
-        }, new LeaveGroupResponse(new LeaveGroupResponseData()
-                .setErrorCode(Errors.NONE.code())));
+            commitRequested.set(true);
+            OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
+            return commitRequest.data().groupId().equals(groupId);
+        }, new OffsetCommitResponse(new OffsetCommitResponseData()));
 
         coordinator.close();
         assertTrue(commitRequested.get(), "Commit not requested");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 425213fd94b..1d85b28605d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -218,7 +218,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
     }
 
     @Override
-    protected void onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(int generation, String memberId) {
         log.info("Rebalance started");
         leaderState(null);
         final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
@@ -230,6 +230,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
             log.debug("Cooperative rebalance triggered. Keeping assignment {} until it's " +
                     "explicitly revoked.", localAssignmentSnapshot);
         }
+        return true;
     }
 
     @Override
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 90748e7ff88..f675ef5314a 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -288,7 +288,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
    servers.foreach(server => killBroker(server.config.brokerId))
    val closeTimeout = 2000
    val future1 = submitCloseAndValidate(consumer1, closeTimeout, None, Some(closeTimeout))
-    val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, Some(requestTimeout), Some(requestTimeout))
+    val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(requestTimeout))
    future1.get
    future2.get
  }
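
For context, the following is a minimal, self-contained sketch of the control flow this patch introduces, not Kafka code: all names below (JoinPrepareSketch, SimpleCoordinator, CommitFuture) are illustrative. It shows onJoinPrepare() reporting whether an asynchronous offset commit has resolved, and the join path deferring instead of blocking when it has not. Note that the real patch re-issues maybeAutoCommitOffsetsAsync() on each retry, whereas this sketch caches a single in-flight future to stay short.

// Sketch only: simplified stand-ins for Kafka's coordinator and RequestFuture<Void>.
import java.util.concurrent.atomic.AtomicReference;

public class JoinPrepareSketch {

    /** Simplified stand-in for an asynchronous commit result. */
    static final class CommitFuture {
        enum State { PENDING, SUCCEEDED, FAILED_RETRIABLE, FAILED_FATAL }
        final AtomicReference<State> state = new AtomicReference<>(State.PENDING);
        boolean succeeded()   { return state.get() == State.SUCCEEDED; }
        boolean failed()      { return state.get() == State.FAILED_RETRIABLE || state.get() == State.FAILED_FATAL; }
        boolean isRetriable() { return state.get() == State.FAILED_RETRIABLE; }
    }

    static class SimpleCoordinator {
        private boolean needsJoinPrepare = true;
        CommitFuture inFlightCommit;

        /** Mirrors the new boolean contract: true once the async commit has resolved. */
        boolean onJoinPrepare() {
            if (inFlightCommit == null)
                inFlightCommit = sendCommitAsync();   // fire the commit without blocking
            if (inFlightCommit.succeeded())
                return true;                          // commit finished, safe to join
            if (inFlightCommit.failed() && !inFlightCommit.isRetriable())
                return true;                          // fatal error: give up on the commit, still join
            return false;                             // pending or retriable: prepare again on the next poll
        }

        /** Mirrors the joinGroupIfNeeded() change: bail out and retry instead of blocking. */
        boolean joinGroupIfNeeded() {
            if (needsJoinPrepare) {
                needsJoinPrepare = false;
                if (!onJoinPrepare()) {
                    needsJoinPrepare = true;          // keep the prepare step pending
                    return false;                     // caller polls again later
                }
            }
            // ... a JoinGroup request would be sent here ...
            return true;
        }

        CommitFuture sendCommitAsync() {
            return new CommitFuture();                // real code would send an OffsetCommit request
        }
    }

    public static void main(String[] args) {
        SimpleCoordinator coordinator = new SimpleCoordinator();
        // First attempt: the commit is still pending, so the join is deferred.
        System.out.println("joined = " + coordinator.joinGroupIfNeeded());
        // Simulate the commit response arriving, then retry.
        coordinator.inFlightCommit.state.set(CommitFuture.State.SUCCEEDED);
        System.out.println("joined = " + coordinator.joinGroupIfNeeded());
    }
}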