mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-16954: fix consumer close to release assignment in background (#16343)
This PR fixes consumer close to avoid updating the subscription state object in the app thread. Now the close simply triggers an UnsubscribeEvent that is handled in the background to trigger callbacks, clear assignment, and send leave heartbeat. Note that after triggering the event, the unsubscribe will continuously process background events until the event completes, to ensure that it allows for callbacks to run in the app thread. The logic around what happens if the unsubscribe fails remain unchanged: close will log, keep the first exception and carry on. It also removes the redundant LeaveOnClose event (it used to do the the exact same thing as the UnsubscribeEvent, both calling membershipMgr.leaveGroup). Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
		
							parent
							
								
									166d9e8059
								
							
						
					
					
						commit
						6c4e777079
					
				|  | @ -53,7 +53,6 @@ import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListe | ||||||
| import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; | import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; | ||||||
| import org.apache.kafka.clients.consumer.internals.events.EventProcessor; | import org.apache.kafka.clients.consumer.internals.events.EventProcessor; | ||||||
| import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; | import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; | ||||||
| import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent; |  | ||||||
| import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; | import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; | ||||||
| import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; | import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; | ||||||
| import org.apache.kafka.clients.consumer.internals.events.PollEvent; | import org.apache.kafka.clients.consumer.internals.events.PollEvent; | ||||||
|  | @ -90,7 +89,6 @@ import org.apache.kafka.common.utils.AppInfoParser; | ||||||
| import org.apache.kafka.common.utils.LogContext; | import org.apache.kafka.common.utils.LogContext; | ||||||
| import org.apache.kafka.common.utils.Time; | import org.apache.kafka.common.utils.Time; | ||||||
| import org.apache.kafka.common.utils.Timer; | import org.apache.kafka.common.utils.Timer; | ||||||
| import org.apache.kafka.common.utils.Utils; |  | ||||||
| import org.slf4j.Logger; | import org.slf4j.Logger; | ||||||
| import org.slf4j.event.Level; | import org.slf4j.event.Level; | ||||||
| 
 | 
 | ||||||
|  | @ -109,7 +107,6 @@ import java.util.Optional; | ||||||
| import java.util.OptionalLong; | import java.util.OptionalLong; | ||||||
| import java.util.Set; | import java.util.Set; | ||||||
| import java.util.SortedSet; | import java.util.SortedSet; | ||||||
| import java.util.TreeSet; |  | ||||||
| import java.util.concurrent.BlockingQueue; | import java.util.concurrent.BlockingQueue; | ||||||
| import java.util.concurrent.CompletableFuture; | import java.util.concurrent.CompletableFuture; | ||||||
| import java.util.concurrent.Future; | import java.util.concurrent.Future; | ||||||
|  | @ -1236,8 +1233,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { | ||||||
|         clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); |         clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); | ||||||
|         closeTimer.update(); |         closeTimer.update(); | ||||||
|         // Prepare shutting down the network thread |         // Prepare shutting down the network thread | ||||||
|         releaseAssignmentAndLeaveGroup(closeTimer, firstException); |         swallow(log, Level.ERROR, "Failed to release assignment before closing consumer", | ||||||
|         closeTimer.update(); |             () -> releaseAssignmentAndLeaveGroup(closeTimer), firstException); | ||||||
|         swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", |         swallow(log, Level.ERROR, "Failed invoking asynchronous commit callback.", | ||||||
|             () -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException); |             () -> awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), firstException); | ||||||
|         if (applicationEventHandler != null) |         if (applicationEventHandler != null) | ||||||
|  | @ -1269,10 +1266,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { | ||||||
|     /** |     /** | ||||||
|      * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: |      * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: | ||||||
|      * 1. autocommit offsets |      * 1. autocommit offsets | ||||||
|      * 2. revoke all partitions |      * 2. release assignment. This is done via a background unsubscribe event that will | ||||||
|      * 3. if partition revocation completes successfully, send leave group |      * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker | ||||||
|      */ |      */ | ||||||
|     void releaseAssignmentAndLeaveGroup(final Timer timer, final AtomicReference<Throwable> firstException) { |     private void releaseAssignmentAndLeaveGroup(final Timer timer) { | ||||||
|         if (!groupMetadata.get().isPresent()) |         if (!groupMetadata.get().isPresent()) | ||||||
|             return; |             return; | ||||||
| 
 | 
 | ||||||
|  | @ -1280,10 +1277,19 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { | ||||||
|             commitSyncAllConsumed(timer); |             commitSyncAllConsumed(timer); | ||||||
| 
 | 
 | ||||||
|         applicationEventHandler.add(new CommitOnCloseEvent()); |         applicationEventHandler.add(new CommitOnCloseEvent()); | ||||||
|         completeQuietly(() -> maybeRevokePartitions(), | 
 | ||||||
|             "Failed to execute callback to release assignment", firstException); |         log.info("Releasing assignment and leaving group before closing consumer"); | ||||||
|         completeQuietly(() -> applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer))), |         UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); | ||||||
|             "Failed to send leave group heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); |         applicationEventHandler.add(unsubscribeEvent); | ||||||
|  |         try { | ||||||
|  |             processBackgroundEvents(unsubscribeEvent.future(), timer); | ||||||
|  |             log.info("Completed releasing assignment and sending leave group to close consumer"); | ||||||
|  |         } catch (TimeoutException e) { | ||||||
|  |             log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " + | ||||||
|  |                 "complete it within {} ms. It will proceed to close.", timer.timeoutMs()); | ||||||
|  |         } finally { | ||||||
|  |             timer.update(); | ||||||
|  |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     // Visible for testing |     // Visible for testing | ||||||
|  | @ -1299,37 +1305,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { | ||||||
|         timer.update(); |         timer.update(); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     // Visible for testing |  | ||||||
|     void maybeRevokePartitions() { |  | ||||||
|         if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) |  | ||||||
|             return; |  | ||||||
|         try { |  | ||||||
|             SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); |  | ||||||
|             droppedPartitions.addAll(subscriptions.assignedPartitions()); |  | ||||||
|             if (subscriptions.rebalanceListener().isPresent()) |  | ||||||
|                 subscriptions.rebalanceListener().get().onPartitionsRevoked(droppedPartitions); |  | ||||||
|         } catch (Exception e) { |  | ||||||
|             throw new KafkaException(e); |  | ||||||
|         } finally { |  | ||||||
|             subscriptions.assignFromSubscribed(Collections.emptySet()); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     // Visible for testing |  | ||||||
|     void completeQuietly(final Utils.ThrowingRunnable function, |  | ||||||
|                          final String msg, |  | ||||||
|                          final AtomicReference<Throwable> firstException) { |  | ||||||
|         try { |  | ||||||
|             function.run(); |  | ||||||
|         } catch (TimeoutException e) { |  | ||||||
|             log.debug("Timeout expired before the {} operation could complete.", msg); |  | ||||||
|             firstException.compareAndSet(null, e); |  | ||||||
|         } catch (Exception e) { |  | ||||||
|             log.error(msg, e); |  | ||||||
|             firstException.compareAndSet(null, e); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     @Override |     @Override | ||||||
|     public void wakeup() { |     public void wakeup() { | ||||||
|         wakeupTrigger.wakeup(); |         wakeupTrigger.wakeup(); | ||||||
|  |  | ||||||
|  | @ -31,7 +31,7 @@ public abstract class ApplicationEvent { | ||||||
|         COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, |         COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, | ||||||
|         LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, |         LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE, | ||||||
|         UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, |         UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, | ||||||
|         COMMIT_ON_CLOSE, LEAVE_ON_CLOSE |         COMMIT_ON_CLOSE | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     private final Type type; |     private final Type type; | ||||||
|  |  | ||||||
|  | @ -32,7 +32,6 @@ import org.slf4j.Logger; | ||||||
| 
 | 
 | ||||||
| import java.util.List; | import java.util.List; | ||||||
| import java.util.Map; | import java.util.Map; | ||||||
| import java.util.Objects; |  | ||||||
| import java.util.concurrent.CompletableFuture; | import java.util.concurrent.CompletableFuture; | ||||||
| import java.util.function.BiConsumer; | import java.util.function.BiConsumer; | ||||||
| import java.util.function.Supplier; | import java.util.function.Supplier; | ||||||
|  | @ -119,10 +118,6 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven | ||||||
|                 process((CommitOnCloseEvent) event); |                 process((CommitOnCloseEvent) event); | ||||||
|                 return; |                 return; | ||||||
| 
 | 
 | ||||||
|             case LEAVE_ON_CLOSE: |  | ||||||
|                 process((LeaveOnCloseEvent) event); |  | ||||||
|                 return; |  | ||||||
| 
 |  | ||||||
|             default: |             default: | ||||||
|                 log.warn("Application event type " + event.type() + " was not expected"); |                 log.warn("Application event type " + event.type() + " was not expected"); | ||||||
|         } |         } | ||||||
|  | @ -268,20 +263,6 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven | ||||||
|         requestManagers.commitRequestManager.get().signalClose(); |         requestManagers.commitRequestManager.get().signalClose(); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     private void process(final LeaveOnCloseEvent event) { |  | ||||||
|         if (!requestManagers.heartbeatRequestManager.isPresent()) { |  | ||||||
|             event.future().complete(null); |  | ||||||
|             return; |  | ||||||
|         } |  | ||||||
|         MembershipManager membershipManager = |  | ||||||
|             Objects.requireNonNull(requestManagers.heartbeatRequestManager.get().membershipManager(), "Expecting " + |  | ||||||
|                 "membership manager to be non-null"); |  | ||||||
|         log.debug("Leaving group before closing"); |  | ||||||
|         CompletableFuture<Void> future = membershipManager.leaveGroup(); |  | ||||||
|         // The future will be completed on heartbeat sent |  | ||||||
|         future.whenComplete(complete(event.future())); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) { |     private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) { | ||||||
|         return (value, exception) -> { |         return (value, exception) -> { | ||||||
|             if (exception != null) |             if (exception != null) | ||||||
|  |  | ||||||
|  | @ -1,24 +0,0 @@ | ||||||
| /* |  | ||||||
|  * Licensed to the Apache Software Foundation (ASF) under one or more |  | ||||||
|  * contributor license agreements. See the NOTICE file distributed with |  | ||||||
|  * this work for additional information regarding copyright ownership. |  | ||||||
|  * The ASF licenses this file to You under the Apache License, Version 2.0 |  | ||||||
|  * (the "License"); you may not use this file except in compliance with |  | ||||||
|  * the License. You may obtain a copy of the License at |  | ||||||
|  * |  | ||||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 |  | ||||||
|  * |  | ||||||
|  * Unless required by applicable law or agreed to in writing, software |  | ||||||
|  * distributed under the License is distributed on an "AS IS" BASIS, |  | ||||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |  | ||||||
|  * See the License for the specific language governing permissions and |  | ||||||
|  * limitations under the License. |  | ||||||
|  */ |  | ||||||
| package org.apache.kafka.clients.consumer.internals.events; |  | ||||||
| 
 |  | ||||||
| public class LeaveOnCloseEvent extends CompletableApplicationEvent<Void> { |  | ||||||
| 
 |  | ||||||
|     public LeaveOnCloseEvent(final long deadlineMs) { |  | ||||||
|         super(Type.LEAVE_ON_CLOSE, deadlineMs); |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  | @ -42,7 +42,6 @@ import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListe | ||||||
| import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; | import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; | ||||||
| import org.apache.kafka.clients.consumer.internals.events.EventProcessor; | import org.apache.kafka.clients.consumer.internals.events.EventProcessor; | ||||||
| import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; | import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; | ||||||
| import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent; |  | ||||||
| import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; | import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; | ||||||
| import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; | import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; | ||||||
| import org.apache.kafka.clients.consumer.internals.events.PollEvent; | import org.apache.kafka.clients.consumer.internals.events.PollEvent; | ||||||
|  | @ -108,7 +107,6 @@ import java.util.regex.Pattern; | ||||||
| import java.util.stream.Stream; | import java.util.stream.Stream; | ||||||
| 
 | 
 | ||||||
| import static java.util.Arrays.asList; | import static java.util.Arrays.asList; | ||||||
| import static java.util.Collections.emptySet; |  | ||||||
| import static java.util.Collections.singleton; | import static java.util.Collections.singleton; | ||||||
| import static java.util.Collections.singletonList; | import static java.util.Collections.singletonList; | ||||||
| import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED; | import static org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED; | ||||||
|  | @ -131,7 +129,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; | ||||||
| import static org.junit.jupiter.api.Assertions.assertTrue; | import static org.junit.jupiter.api.Assertions.assertTrue; | ||||||
| import static org.junit.jupiter.api.Assertions.fail; | import static org.junit.jupiter.api.Assertions.fail; | ||||||
| import static org.mockito.ArgumentMatchers.any; | import static org.mockito.ArgumentMatchers.any; | ||||||
| import static org.mockito.ArgumentMatchers.eq; |  | ||||||
| import static org.mockito.Mockito.atLeast; | import static org.mockito.Mockito.atLeast; | ||||||
| import static org.mockito.Mockito.doAnswer; | import static org.mockito.Mockito.doAnswer; | ||||||
| import static org.mockito.Mockito.doReturn; | import static org.mockito.Mockito.doReturn; | ||||||
|  | @ -139,6 +136,7 @@ import static org.mockito.Mockito.doThrow; | ||||||
| import static org.mockito.Mockito.mock; | import static org.mockito.Mockito.mock; | ||||||
| import static org.mockito.Mockito.mockStatic; | import static org.mockito.Mockito.mockStatic; | ||||||
| import static org.mockito.Mockito.never; | import static org.mockito.Mockito.never; | ||||||
|  | import static org.mockito.Mockito.spy; | ||||||
| import static org.mockito.Mockito.verify; | import static org.mockito.Mockito.verify; | ||||||
| import static org.mockito.Mockito.when; | import static org.mockito.Mockito.when; | ||||||
| import static org.mockito.Mockito.clearInvocations; | import static org.mockito.Mockito.clearInvocations; | ||||||
|  | @ -241,6 +239,7 @@ public class AsyncKafkaConsumerTest { | ||||||
|     @Test |     @Test | ||||||
|     public void testSuccessfulStartupShutdown() { |     public void testSuccessfulStartupShutdown() { | ||||||
|         consumer = newConsumer(); |         consumer = newConsumer(); | ||||||
|  |         completeUnsubscribeApplicationEventSuccessfully(); | ||||||
|         assertDoesNotThrow(() -> consumer.close()); |         assertDoesNotThrow(() -> consumer.close()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -253,6 +252,7 @@ public class AsyncKafkaConsumerTest { | ||||||
|     @Test |     @Test | ||||||
|     public void testFailOnClosedConsumer() { |     public void testFailOnClosedConsumer() { | ||||||
|         consumer = newConsumer(); |         consumer = newConsumer(); | ||||||
|  |         completeUnsubscribeApplicationEventSuccessfully(); | ||||||
|         consumer.close(); |         consumer.close(); | ||||||
|         final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment); |         final IllegalStateException res = assertThrows(IllegalStateException.class, consumer::assignment); | ||||||
|         assertEquals("This consumer has already been closed.", res.getMessage()); |         assertEquals("This consumer has already been closed.", res.getMessage()); | ||||||
|  | @ -945,6 +945,7 @@ public class AsyncKafkaConsumerTest { | ||||||
|     @Test |     @Test | ||||||
|     public void testEnsureShutdownExecutedCommitAsyncCallbacks() { |     public void testEnsureShutdownExecutedCommitAsyncCallbacks() { | ||||||
|         consumer = newConsumer(); |         consumer = newConsumer(); | ||||||
|  |         completeUnsubscribeApplicationEventSuccessfully(); | ||||||
|         MockCommitCallback callback = new MockCommitCallback(); |         MockCommitCallback callback = new MockCommitCallback(); | ||||||
|         completeCommitAsyncApplicationEventSuccessfully(); |         completeCommitAsyncApplicationEventSuccessfully(); | ||||||
|         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); |         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); | ||||||
|  | @ -956,66 +957,45 @@ public class AsyncKafkaConsumerTest { | ||||||
|     @Test |     @Test | ||||||
|     public void testVerifyApplicationEventOnShutdown() { |     public void testVerifyApplicationEventOnShutdown() { | ||||||
|         consumer = newConsumer(); |         consumer = newConsumer(); | ||||||
|  |         completeUnsubscribeApplicationEventSuccessfully(); | ||||||
|         doReturn(null).when(applicationEventHandler).addAndGet(any()); |         doReturn(null).when(applicationEventHandler).addAndGet(any()); | ||||||
|         consumer.close(); |         consumer.close(); | ||||||
|         verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class)); |         verify(applicationEventHandler).add(any(UnsubscribeEvent.class)); | ||||||
|         verify(applicationEventHandler).add(any(CommitOnCloseEvent.class)); |         verify(applicationEventHandler).add(any(CommitOnCloseEvent.class)); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |     @Test | ||||||
|     public void testPartitionRevocationOnClose() { |     public void testUnsubscribeOnClose() { | ||||||
|         MockRebalanceListener listener = new MockRebalanceListener(); |         SubscriptionState subscriptions = mock(SubscriptionState.class); | ||||||
|         SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); |         consumer = spy(newConsumer( | ||||||
|         consumer = newConsumer( |  | ||||||
|             mock(FetchBuffer.class), |             mock(FetchBuffer.class), | ||||||
|             mock(ConsumerInterceptors.class), |             mock(ConsumerInterceptors.class), | ||||||
|             mock(ConsumerRebalanceListenerInvoker.class), |             mock(ConsumerRebalanceListenerInvoker.class), | ||||||
|             subscriptions, |             subscriptions, | ||||||
|             "group-id", |             "group-id", | ||||||
|             "client-id"); |             "client-id")); | ||||||
| 
 |         completeUnsubscribeApplicationEventSuccessfully(); | ||||||
|         consumer.subscribe(singleton("topic"), listener); |  | ||||||
|         subscriptions.assignFromSubscribed(singleton(new TopicPartition("topic", 0))); |  | ||||||
|         consumer.close(Duration.ZERO); |         consumer.close(Duration.ZERO); | ||||||
|         assertTrue(subscriptions.assignedPartitions().isEmpty()); |         verifyUnsubscribeEvent(subscriptions); | ||||||
|         assertEquals(1, listener.revokedCount); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |     @Test | ||||||
|     public void testFailedPartitionRevocationOnClose() { |     public void testFailedPartitionRevocationOnClose() { | ||||||
|         // If rebalance listener failed to execute during close, we still send the leave group, |         // If rebalance listener failed to execute during close, we still send the leave group, | ||||||
|         // and proceed with closing the consumer. |         // and proceed with closing the consumer. | ||||||
|         ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class); |         SubscriptionState subscriptions = mock(SubscriptionState.class); | ||||||
|         SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); |         consumer = spy(newConsumer( | ||||||
|         consumer = newConsumer( |  | ||||||
|             mock(FetchBuffer.class), |             mock(FetchBuffer.class), | ||||||
|             new ConsumerInterceptors<>(Collections.emptyList()), |             new ConsumerInterceptors<>(Collections.emptyList()), | ||||||
|             mock(ConsumerRebalanceListenerInvoker.class), |             mock(ConsumerRebalanceListenerInvoker.class), | ||||||
|             subscriptions, |             subscriptions, | ||||||
|             "group-id", |             "group-id", | ||||||
|             "client-id"); |             "client-id")); | ||||||
|         subscriptions.subscribe(singleton("topic"), Optional.of(listener)); |         doThrow(new KafkaException()).when(consumer).processBackgroundEvents(any(), any()); | ||||||
|         TopicPartition tp = new TopicPartition("topic", 0); |  | ||||||
|         subscriptions.assignFromSubscribed(singleton(tp)); |  | ||||||
|         doThrow(new KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp))); |  | ||||||
|         assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO)); |         assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO)); | ||||||
|         verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class)); |         verifyUnsubscribeEvent(subscriptions); | ||||||
|         verify(listener).onPartitionsRevoked(eq(singleton(tp))); |         // Close operation should carry on even if the unsubscribe fails | ||||||
|         assertEquals(emptySet(), subscriptions.assignedPartitions()); |         verify(applicationEventHandler).close(any(Duration.class)); | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     @Test |  | ||||||
|     public void testCompleteQuietly() { |  | ||||||
|         AtomicReference<Throwable> exception = new AtomicReference<>(); |  | ||||||
|         CompletableFuture<Object> future = CompletableFuture.completedFuture(null); |  | ||||||
|         consumer = newConsumer(); |  | ||||||
|         assertDoesNotThrow(() -> consumer.completeQuietly(() -> future.get(0, TimeUnit.MILLISECONDS), "test", exception)); |  | ||||||
|         assertNull(exception.get()); |  | ||||||
| 
 |  | ||||||
|         assertDoesNotThrow(() -> consumer.completeQuietly(() -> { |  | ||||||
|             throw new KafkaException("Test exception"); |  | ||||||
|         }, "test", exception)); |  | ||||||
|         assertInstanceOf(KafkaException.class, exception.get()); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |     @Test | ||||||
|  | @ -1316,7 +1296,7 @@ public class AsyncKafkaConsumerTest { | ||||||
|             } |             } | ||||||
|             return null; |             return null; | ||||||
|         }).when(applicationEventHandler).add(any()); |         }).when(applicationEventHandler).add(any()); | ||||||
| 
 |         completeUnsubscribeApplicationEventSuccessfully(); | ||||||
|         consumer.close(Duration.ZERO); |         consumer.close(Duration.ZERO); | ||||||
| 
 | 
 | ||||||
|         // A commit was triggered and not completed exceptionally by the wakeup |         // A commit was triggered and not completed exceptionally by the wakeup | ||||||
|  | @ -1354,6 +1334,7 @@ public class AsyncKafkaConsumerTest { | ||||||
|         completeCommitAsyncApplicationEventSuccessfully(); |         completeCommitAsyncApplicationEventSuccessfully(); | ||||||
|         consumer.commitAsync(cb); |         consumer.commitAsync(cb); | ||||||
| 
 | 
 | ||||||
|  |         completeUnsubscribeApplicationEventSuccessfully(); | ||||||
|         assertDoesNotThrow(() -> consumer.close(Duration.ofMillis(10))); |         assertDoesNotThrow(() -> consumer.close(Duration.ofMillis(10))); | ||||||
|         assertEquals(1, cb.invoked); |         assertEquals(1, cb.invoked); | ||||||
|     } |     } | ||||||
|  | @ -1368,6 +1349,7 @@ public class AsyncKafkaConsumerTest { | ||||||
|         consumer = newConsumer(props); |         consumer = newConsumer(props); | ||||||
|         assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); |         assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get()); | ||||||
|         completeCommitSyncApplicationEventSuccessfully(); |         completeCommitSyncApplicationEventSuccessfully(); | ||||||
|  |         completeUnsubscribeApplicationEventSuccessfully(); | ||||||
| 
 | 
 | ||||||
|         consumer.close(Duration.ZERO); |         consumer.close(Duration.ZERO); | ||||||
| 
 | 
 | ||||||
|  | @ -2072,6 +2054,7 @@ public class AsyncKafkaConsumerTest { | ||||||
|     @Test |     @Test | ||||||
|     void testReaperInvokedInClose() { |     void testReaperInvokedInClose() { | ||||||
|         consumer = newConsumer(); |         consumer = newConsumer(); | ||||||
|  |         completeUnsubscribeApplicationEventSuccessfully(); | ||||||
|         consumer.close(); |         consumer.close(); | ||||||
|         verify(backgroundEventReaper).reap(backgroundEventQueue); |         verify(backgroundEventReaper).reap(backgroundEventQueue); | ||||||
|     } |     } | ||||||
|  | @ -2093,6 +2076,18 @@ public class AsyncKafkaConsumerTest { | ||||||
|         verify(backgroundEventReaper).reap(time.milliseconds()); |         verify(backgroundEventReaper).reap(time.milliseconds()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     private void verifyUnsubscribeEvent(SubscriptionState subscriptions) { | ||||||
|  |         // Check that an unsubscribe event was generated, and that the consumer waited for it to | ||||||
|  |         // complete processing background events. | ||||||
|  |         verify(applicationEventHandler).add(any(UnsubscribeEvent.class)); | ||||||
|  |         verify(consumer).processBackgroundEvents(any(), any()); | ||||||
|  | 
 | ||||||
|  |         // The consumer should not clear the assignment in the app thread. The unsubscribe | ||||||
|  |         // event is the one responsible for updating the assignment in the background when it | ||||||
|  |         // completes. | ||||||
|  |         verify(subscriptions, never()).assignFromSubscribed(any()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() { |     private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() { | ||||||
|         final TopicPartition t0 = new TopicPartition("t0", 2); |         final TopicPartition t0 = new TopicPartition("t0", 2); | ||||||
|         final TopicPartition t1 = new TopicPartition("t0", 3); |         final TopicPartition t1 = new TopicPartition("t0", 3); | ||||||
|  |  | ||||||
|  | @ -27,25 +27,18 @@ import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager; | ||||||
| import org.apache.kafka.clients.consumer.internals.RequestManagers; | import org.apache.kafka.clients.consumer.internals.RequestManagers; | ||||||
| import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager; | import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager; | ||||||
| import org.apache.kafka.common.utils.LogContext; | import org.apache.kafka.common.utils.LogContext; | ||||||
| import org.apache.kafka.common.utils.MockTime; |  | ||||||
| import org.apache.kafka.common.utils.Time; |  | ||||||
| import org.junit.jupiter.api.BeforeEach; | import org.junit.jupiter.api.BeforeEach; | ||||||
| import org.junit.jupiter.api.Test; | import org.junit.jupiter.api.Test; | ||||||
| 
 | 
 | ||||||
| import java.util.Collections; | import java.util.Collections; | ||||||
| import java.util.List; | import java.util.List; | ||||||
| import java.util.Optional; | import java.util.Optional; | ||||||
| import java.util.concurrent.CompletableFuture; |  | ||||||
| 
 | 
 | ||||||
| import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; |  | ||||||
| import static org.junit.jupiter.api.Assertions.assertTrue; |  | ||||||
| import static org.mockito.Mockito.doReturn; | import static org.mockito.Mockito.doReturn; | ||||||
| import static org.mockito.Mockito.mock; | import static org.mockito.Mockito.mock; | ||||||
| import static org.mockito.Mockito.verify; | import static org.mockito.Mockito.verify; | ||||||
| import static org.mockito.Mockito.when; |  | ||||||
| 
 | 
 | ||||||
| public class ApplicationEventProcessorTest { | public class ApplicationEventProcessorTest { | ||||||
|     private final Time time = new MockTime(1); |  | ||||||
|     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); |     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); | ||||||
|     private ApplicationEventProcessor processor; |     private ApplicationEventProcessor processor; | ||||||
|     private CommitRequestManager commitRequestManager; |     private CommitRequestManager commitRequestManager; | ||||||
|  | @ -87,16 +80,6 @@ public class ApplicationEventProcessorTest { | ||||||
|         verify(commitRequestManager).signalClose(); |         verify(commitRequestManager).signalClose(); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |  | ||||||
|     public void testPrepClosingLeaveGroupEvent() { |  | ||||||
|         LeaveOnCloseEvent event = new LeaveOnCloseEvent(calculateDeadlineMs(time, 100)); |  | ||||||
|         when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); |  | ||||||
|         when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null)); |  | ||||||
|         processor.process(event); |  | ||||||
|         verify(membershipManager).leaveGroup(); |  | ||||||
|         assertTrue(event.future().isDone()); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() { |     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() { | ||||||
|         return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class)); |         return Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class)); | ||||||
|     } |     } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue