Compare commits

...

90 Commits

Author SHA1 Message Date
Kirk True 4305cfd51f
Merge f660b15e52 into 4a5aa37169 2025-10-07 21:21:36 +00:00
Kirk True f660b15e52 Refactor regex subscription evaluation 2025-10-07 14:21:30 -07:00
Chang-Chi Hsu 4a5aa37169
MINOR: Move ReconfigurableQuorumIntegrationTest from core module to server module (#20636)
It moves the `ReconfigurableQuorumIntegrationTest` class to the
`org.apache.kafka.server` package and consolidates two related tests,
`RemoveAndAddVoterWithValidClusterId` and
`RemoveAndAddVoterWithInconsistentClusterId`, into a single file. This
improves code organization and reduces redundancy.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2025-10-08 01:10:58 +08:00
Kirk True 5215030a4c Update AsyncKafkaConsumer.java 2025-10-07 08:57:08 -07:00
Kirk True 68f5bc19d3 Whitespace change 2025-10-07 08:54:06 -07:00
Kirk True 2e5a982c1d Revert change to AsyncKafkaConsumer 2025-10-07 08:52:34 -07:00
Kirk True 4985c7de17 Updates for MetadataErrorNotifiableEvent 2025-10-07 08:52:21 -07:00
Kirk True e7b53865fd Fix stupid typo for poll timeout 2025-10-06 18:24:52 -07:00
Kirk True 91409bc9d3 Make ConsumerIntegrationTest.testRackAwareAssignment() poll more frequently to reconcile assignment changes more quickly 2025-10-06 16:50:31 -07:00
Kirk True d939675641 Added comment to NetworkClientDelegate.doSend() change 2025-10-06 16:49:22 -07:00
Kirk True b04b0a2c73 Removed superfluous imports 2025-10-06 14:43:47 -07:00
Kirk True 19ce01afa3 Removed check-and-update-positions future timeout and removed AsyncPollEvent.Result inner class 2025-10-06 14:41:03 -07:00
Kirk True 5be72105ba Update ApplicationEventProcessor.java 2025-10-06 10:21:06 -07:00
Kirk True fa784e5f25 Refactored AsyncPollEvent to remove State 2025-10-06 10:16:09 -07:00
Kirk True cac80ef38f Reverted minor import change with SaslClientsWithInvalidCredentialsTest 2025-10-06 09:33:49 -07:00
Kirk True b5f4c8683c Minor refactoring, cleanup, extraneous whitespace removal 2025-10-06 09:01:55 -07:00
Kirk True dcbe761869 Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread 2025-10-06 08:53:26 -07:00
Kirk True ddf042306c Revert wakeup() method visibility back to package-private 2025-10-05 21:19:16 -07:00
Kirk True f4eb1619fd Inline trackCheckAndUpdatePositionsForTimeout in process(AsyncPollEvent)
Moved the timeout tracking for check-and-update-positions logic directly into the main event processing flow, eliminating the separate trackCheckAndUpdatePositionsForTimeout method. This streamlines the handling of timeouts and improves code clarity by keeping related logic together.
2025-10-05 21:13:10 -07:00
Kirk True 767316ba60 Refactoring to remove interim callback step 2025-10-05 21:06:08 -07:00
Kirk True eace3ee1d6 Add detailed Javadoc to MetadataErrorNotifiable
Expanded the interface documentation to describe how metadata errors are detected and handled in the ConsumerNetworkThread loop, including when and how the metadataError method is invoked.
2025-10-05 19:26:07 -07:00
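The hook documented here surfaces in the ConsumerNetworkThread diff below as e.onMetadataError(metadataError); a minimal sketch of the interface, inferred from that call site (the exact shape in the branch may differ):

    // Sketch inferred from usage in this PR; not the verbatim interface.
    public interface MetadataErrorNotifiableEvent {

        /**
         * Invoked from the ConsumerNetworkThread loop when a metadata error was recorded,
         * allowing the event to fail itself instead of waiting in the awaiting state.
         */
        void onMetadataError(Exception metadataError);
    }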
Kirk True 7ca4cc835e Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread 2025-10-05 17:03:21 -07:00
Kirk True e1cf7b7056 Replace awaitNonEmptyRecords with waitForRecords in PlaintextConsumerTest 2025-10-03 19:44:58 -07:00
Kirk True a8ccdb6f48 Replace pollRecordsUntilTrue with waitUntilTrue in verifyConsumerWithAuthenticationFailure 2025-10-03 19:30:22 -07:00
Kirk True 9fb9ee9e6e Refactoring 2025-10-03 16:35:46 -07:00
Kirk True 461ffdd9b0 Change debug to trace in ApplicationEventProcessor 2025-10-03 10:41:11 -07:00
Kirk True 2b2f70c36e Using Consumer.poll() helper methods from TestUtils 2025-10-03 10:40:52 -07:00
Kirk True a233e90d5d Remove unused processUpdatePatternSubscriptionEvent method
Deleted the private processUpdatePatternSubscriptionEvent method from ApplicationEventProcessor as it is no longer used.
2025-10-02 21:25:42 -07:00
Kirk True 0256bdf274 Moving the regex subscription check out of the poll path 2025-10-02 15:54:21 -07:00
Kirk True f6864a3ac4 Refactor POLL into ASYNC_POLL and SHARE_POLL 2025-10-02 14:20:15 -07:00
Kirk True 5041a36e6a Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread 2025-10-01 15:46:02 -07:00
Kirk True 5d1a34d27f Relying on reading records to determine group membership for streams group offsets tests
Refactored tests to use a counter for consumed records and added explicit consumer configuration with AUTO_OFFSET_RESET set to 'earliest'. This ensures all produced records are consumed and improves reliability of partition assignment checks.
2025-10-01 13:09:15 -07:00
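The explicit configuration mentioned above follows the standard client-config pattern; an illustrative fragment (broker, group, and topic names are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-offsets-test");      // placeholder group
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");         // ensures all produced records are read
    try (var consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
        consumer.subscribe(java.util.List.of("input-topic"));               // placeholder topic
        // poll and count records until the expected total is reached
    }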
Kirk True e9dbc61bff Notify FetchBuffer from background thread if pausing due to application thread callbacks
FetchBuffer is now provided to CompositePollEventProcessorContext and its supplier, allowing the context to call fetchBuffer.wakeup() after completing an event. The wakeup method in FetchBuffer is made public to support this usage.
2025-10-01 13:08:19 -07:00
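A rough sketch of the hand-off this enables (the method and its context are illustrative; only FetchBuffer.wakeup() is the API named by the commit):

    // Background thread: after completing an event that must pause for application-thread
    // callbacks, wake any poll() that is parked waiting on the fetch buffer.
    void onEventComplete(FetchBuffer fetchBuffer) {
        // ... event completion bookkeeping elided ...
        fetchBuffer.wakeup();   // unblocks the application thread promptly
    }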
Kirk True bc660d6462 Reverting changes made to omit pending-callback partitions from being returned 2025-09-30 16:27:39 -07:00
Kirk True 7ba60474c0 Improve logging and comments in poll event classes
Updated comments in CompositePollEvent for clarity and improved the debug log message in CompositePollEventInvoker to better describe event failure handling.
2025-09-30 16:25:50 -07:00
Kirk True cc49db81ba Update CompositePollEvent.java 2025-09-30 13:35:16 -07:00
Kirk True ab42fca4f4 Fixed streams tests in PlaintextAdminIntegrationTest
Surfaced isPendingCallbacks() at the SubscriptionState API so that partitions that are pending callback invocation are not returned. This is also in agreement with the JavaDoc which states:

"This will give the set of topic partitions currently assigned to the consumer (which may be none if . . . the partitions are in the process of getting reassigned)."
2025-09-30 13:20:05 -07:00
Kirk True 0aed4aff89 Minor refactoring and added documentation
Replaces nextEventType with startingEventType in CompositePollEvent and related classes for improved clarity and correctness. Adds validation for allowed starting event types, updates method names, improves logging, and enhances documentation for event processing context and state transitions.
2025-09-30 10:50:07 -07:00
Kirk True 5c99d81b18 Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread 2025-09-29 21:27:27 -07:00
Kirk True 72472c5fc6 Refactor composite poll event processing and context
Refactors the composite poll event flow to use explicit state transitions and result handling, replacing Optional-based APIs with more direct methods. Introduces clearer state management for CompositePollEvent, updates ApplicationEventProcessor and CompositePollEventProcessorContext to use new completion and error handling methods, and improves documentation and test utilities. Updates tests and utility methods to match the new APIs and behaviors.
2025-09-29 15:59:20 -07:00
Kirk True 164dfdd5ff Remove unused KafkaException import in ConsumerPollTestUtils
Eliminated an unused import of KafkaException from ConsumerPollTestUtils.java to clean up the code.
2025-09-29 13:05:10 -07:00
Kirk True a157071a0d Refactor and cleanup 2025-09-29 13:04:57 -07:00
Kirk True 8235ed2256 Refactoring and clean up 2025-09-29 12:58:07 -07:00
Kirk True f45b70e688 Refactoring 2025-09-29 12:26:11 -07:00
Kirk True bfcd7ec0f8 More clean up and refactoring 2025-09-29 12:16:54 -07:00
Kirk True 52c08455c1 Refactoring and clean up 2025-09-29 11:56:40 -07:00
Kirk True 5af8fc4357 Updates and fixes for a couple of integration tests 2025-09-29 09:21:09 -07:00
Kirk True 985bbd7582 Improve consumer poll reliability in integration tests
Replaced direct assertions on consumer.poll() with TestUtils.waitUntilTrue in several integration tests to ensure expected results or exceptions are observed within a timeout. Also refactored CompositePollEventInvoker to rename 'latest' to 'inflight' for clarity and improved logging. These changes enhance test robustness and code readability.
2025-09-23 18:31:57 -07:00
Kirk True 1f1ae24538 Remove commented-out event processing code
Deleted commented lines for processBackgroundEvents and offsetCommitCallbackInvoker.executeCallbacks in AsyncKafkaConsumer, cleaning up unused code.
2025-09-23 17:27:38 -07:00
Kirk True 2aaca8db9d Move pollForRecords helper method in KafkaConsumerTest
Relocated the pollForRecords() helper method from its previous position to after the testPollIdleRatio method for improved code organization in KafkaConsumerTest.
2025-09-23 17:27:09 -07:00
Kirk True 8b33f081ce Refactor lambda in waitForConsumerPollException call
Replaces single-line lambda with block lambda for clarity in the waitForConsumerPollException call within AsyncKafkaConsumerTest. No functional changes; improves readability.
2025-09-23 17:26:58 -07:00
Kirk True 91e881f2b4 Fix typo in CompositePollPseudoEvent class name
Renamed CompositePollPsuedoEvent to CompositePollPseudoEvent in ApplicationEventProcessor to correct a spelling error and ensure consistency in class naming.
2025-09-23 17:26:48 -07:00
Kirk True 2302427116 Remove debug logging from CompositePollEventInvoker
Eliminated a debug log statement that printed the result and state in CompositePollEventInvoker. This helps reduce unnecessary log output during normal operation.
2025-09-23 17:26:37 -07:00
Kirk True b1937702d2 Refactor CompositePollEventInvoker to standalone class
Moved CompositePollEventInvoker from AsyncKafkaConsumer to its own file for better separation of concerns and testability. Updated AsyncKafkaConsumer to use the new class and refactored constructors accordingly. Enhanced related tests to use new helper methods for polling and exception handling, improving test clarity and reliability.
2025-09-23 16:42:53 -07:00
Kirk True 2d21fa0fdf Refactor poll event handling and metadata error management
Simplifies AsyncKafkaConsumer's CompositePollEventInvoker by removing backoff logic and streamlining state handling. NetworkClientDelegate now uses AtomicReference for metadataError to improve thread safety. ApplicationEventProcessor refines error handling in composite poll events. Updates tests to reflect API changes and exception types.
2025-09-23 13:49:37 -07:00
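The AtomicReference swap mentioned here pairs with the getAndClearMetadataError() call visible in the ConsumerNetworkThread diff below. A minimal sketch of the get-and-clear pattern (the holder class is illustrative):

    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative holder; mirrors the getAndClearMetadataError() contract from the diff.
    class MetadataErrorHolder {
        private final AtomicReference<Exception> metadataError = new AtomicReference<>();

        // The network I/O path records the first error it sees.
        void propagate(Exception e) {
            metadataError.compareAndSet(null, e);
        }

        // Atomically take the error so it is surfaced exactly once.
        Optional<Exception> getAndClearMetadataError() {
            return Optional.ofNullable(metadataError.getAndSet(null));
        }
    }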
Kirk True 1e52282c41 Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread 2025-09-23 10:22:48 -07:00
Kirk True 1570f6997f Handle exceptions in AsyncKafkaConsumer poll event
Wrap and propagate exceptions from resultOrError in AsyncKafkaConsumer, ensuring latest request is cleared when an error occurs. Also clear latest when poll event completes to properly track request lifecycle.
2025-09-22 21:12:06 -07:00
Kirk True abaa4dc639 Reset backoff on event submission in AsyncKafkaConsumer
Sets backoff to -1 when submitting a new application event in AsyncKafkaConsumer. This ensures backoff state is reset for each event submission.
2025-09-22 18:48:46 -07:00
Kirk True f40a4ac27f Refactor consumer record polling in tests
Replaces direct calls to consumer.poll with a new pollForRecords() helper method in multiple test cases. This improves code reuse and reliability by waiting for records to be available, and removes unnecessary suppress warnings and unchecked casts.
2025-09-22 18:48:37 -07:00
Kirk True 71120224f4 Update AsyncKafkaConsumer.java 2025-09-20 16:51:30 -07:00
Kirk True 81598844bd Refactor application thread requirement handling
Introduces AsyncConsumerApplicationThreadRequirement to encapsulate logic for determining when to pause event processing for application thread execution. Updates ApplicationEventProcessor and related classes to use a unified CompositePollApplicationThreadRequirement interface, simplifying constructor signatures and improving code clarity.
2025-09-20 16:44:49 -07:00
Kirk True 81b707e745 Update NetworkClientDelegate.java 2025-09-20 16:19:30 -07:00
Kirk True 702b25753b Revert removal of contains() from CompletableEventReaper 2025-09-20 16:18:38 -07:00
Kirk True 99304db9e8 Remove extra whitespace in NetworkClientDelegate
Cleaned up unnecessary whitespace and blank lines in NetworkClientDelegate.java to improve code readability.
2025-09-20 16:18:19 -07:00
Kirk True 9d65fa22f5
Merge pull request #10 from kirktrue/KAFKA-18376-chain-events-in-background-thread-without-addAndGet 2025-09-20 16:14:23 -07:00
Kirk True 56062f5b01 Update NetworkClientDelegate.java 2025-09-20 16:13:40 -07:00
Kirk True ea99a13021 Handle immediate metadata errors for CompletableEvents
Added logic to check and fail CompletableEvents for metadata errors immediately upon processing, ensuring events that do not enter the awaiting state are handled correctly. Updated related tests to use consistent mocks and reduced poll durations for faster execution.
2025-09-20 16:12:38 -07:00
Kirk True 18f4fa11f3 Remove BackgroundEventHandler from OffsetsRequestManager
Eliminated the BackgroundEventHandler parameter from OffsetsRequestManager and its usages in RequestManagers and related tests. This simplifies the constructor and removes unnecessary dependencies.
2025-09-20 16:08:33 -07:00
Kirk True 2c3547e06a Inject NetworkClientDelegate into ApplicationEventProcessor
Adds NetworkClientDelegate as a dependency to ApplicationEventProcessor and updates AsyncKafkaConsumer and ShareConsumerImpl to supply it. Introduces error handling in composite poll processing using metadata errors from NetworkClientDelegate. Updates related tests to mock the new dependency.
2025-09-20 16:05:44 -07:00
Kirk True 6775aacc2c Refactor poll event handling and metadata error propagation
Refactored AsyncKafkaConsumer and related classes to improve composite poll event handling, including explicit state management and pausing for background event processing or offset commit callbacks. Metadata errors are now optionally propagated via a dedicated error field in NetworkClientDelegate, allowing for more flexible error handling. Updated tests and logging to reflect these changes.
2025-09-20 15:40:48 -07:00
Kirk True ae0ddcc4c0 Add completion tracking to CompositePollEvent
Introduces an AtomicBoolean to track completion state in CompositePollEvent and updates ApplicationEventProcessor to mark events as complete when appropriate. Refactors AsyncKafkaConsumer to use a new CompositePollEventInvoker for polling, replacing prepareFetch, and implements exponential backoff for incomplete events.
2025-09-20 13:41:07 -07:00
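A compact sketch of the two mechanisms this commit describes — completion tracking via AtomicBoolean plus exponential backoff on the application thread. All names here are illustrative, not the branch's actual classes:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative names only; not the branch's actual classes.
    class TrackedEvent {
        private final AtomicBoolean complete = new AtomicBoolean(false);

        void markComplete() { complete.set(true); }      // flipped by the background thread
        boolean isComplete() { return complete.get(); }

        // Application thread: re-check with exponentially growing sleeps until done or timed out.
        void awaitWithBackoff(long timeoutMs) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            long backoffMs = 1;
            while (!isComplete() && System.currentTimeMillis() < deadline) {
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 64); // cap the growth
            }
        }
    }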
Kirk True 0ac19f96b9 Clean up 2025-09-19 14:44:37 -07:00
Kirk True 00f9069c98 Merge branch 'KAFKA-18376-chain-events-in-background-thread' into KAFKA-18376-chain-events-in-background-thread-without-addAndGet 2025-09-19 14:28:25 -07:00
Kirk True 524782cd79 Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread-without-addAndGet 2025-09-19 13:35:28 -07:00
Kirk True c6a7923280 Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread 2025-09-19 13:35:10 -07:00
Kirk True 784aad2d4c Moving toward a non-blocking poll() implementation 2025-09-19 11:22:42 -07:00
Kirk True 529aab3316 Update AsyncKafkaConsumer.java 2025-09-18 08:51:26 -07:00
Kirk True 3e0b920399 Refactor CompositePollEvent to use Blocker for state management
Replaces CompletableFuture-based state handling in CompositePollEvent with a new Blocker class for improved synchronization and exception handling. Updates AsyncKafkaConsumer, WakeupTrigger, ApplicationEventProcessor, and related tests to use Blocker, simplifying event completion and error propagation.
2025-09-17 20:49:53 -07:00
Kirk True 40f6754810 Minor updates for CompletableEventReaper logging 2025-09-17 12:31:11 -07:00
Kirk True aaefbeff32 Clean up logic related to metadata errors that can happen along any step of CompositePollEvent 2025-09-17 12:14:14 -07:00
Kirk True d253b847e6 Work in progress to get past most of the integration test issues 2025-09-16 17:24:50 -07:00
Kirk True 464d5bafb9 Removed the verbose logging 2025-09-16 13:47:15 -07:00
Kirk True 5e794ce079 Inject NetworkClientDelegate via supplier for ApplicationEventProcessor to check metadata errors
Refactors AsyncKafkaConsumer, ShareConsumerImpl, and ApplicationEventProcessor to inject NetworkClientDelegate using a supplier method. Adds a static supplier factory to NetworkClientDelegate for deferred instantiation. Updates related tests and construction logic to support the new dependency injection approach.
2025-09-15 15:57:06 -07:00
Kirk True 09f8cb57cf Add documentation for RequiresApplicationThreadExecution
Added a Javadoc comment to the RequiresApplicationThreadExecution interface to clarify its purpose and usage, specifically regarding the need to interrupt CompositePollEvent processing when requiresApplicationThread() returns true.
2025-09-15 14:50:37 -07:00
Kirk True dbc4773a34 Updates to fix inverted logic in maybeInterruptCompositePoll() 2025-09-15 14:46:46 -07:00
Kirk True d3fa910d10 Minor clean up from design review 2025-09-15 14:06:41 -07:00
Kirk True 9fd7e5870a Merge branch 'trunk' into KAFKA-18376-chain-events-in-background-thread 2025-09-15 10:45:27 -07:00
Kirk True d4802c78e3 Re-enabling tests in AsyncKafkaConsumer 2025-09-10 21:16:46 -07:00
Kirk True b5d7d01dbc [WIP] More work on correctness 2025-09-10 17:22:45 -07:00
Kirk True 34932e2222 [WIP] KAFKA-18376: High CPU load when AsyncKafkaConsumer uses a small max poll value
Introduces CompositePollEvent and CompositePollResult to refactor and streamline the poll event handling in AsyncKafkaConsumer and ApplicationEventProcessor. The new approach enables multi-step polling logic, improves callback and background event processing, and enhances testability. Also adds size methods to BackgroundEventHandler and OffsetCommitCallbackInvoker, disables several tests, and updates related classes to support the new event flow.
2025-09-09 21:58:16 -07:00
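The "multi-step polling logic" can be pictured as a small stage progression; the stage names below are illustrative, drawn from the events named elsewhere in this PR, not the actual enum:

    // Hypothetical illustration of the composite poll's chained steps.
    enum CompositePollStage {
        UPDATE_SUBSCRIPTION_METADATA,   // e.g. regex subscription evaluation
        CHECK_AND_UPDATE_POSITIONS,     // validate or reset fetch positions
        CREATE_FETCH_REQUESTS,          // hand fetch requests to the network layer
        COMPLETE                        // results ready for the application thread
    }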
32 changed files with 938 additions and 516 deletions

File: ReconfigurableQuorumIntegrationTest.java (deleted)

@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.InconsistentClusterIdException;
-import org.apache.kafka.common.test.KafkaClusterTestKit;
-import org.apache.kafka.common.test.TestKitNodes;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@Tag("integration")
-public class ReconfigurableQuorumIntegrationTest {
-    static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
-        var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
-        return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-            .setClusterId("test-cluster")
-            .setNumBrokerNodes(1)
-            .setNumControllerNodes(3)
-            .build();
-
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                controllerNode.id(),
-                controllerNode.metadataDirectoryId()
-            );
-        }
-
-        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
-                }, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
-
-                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                admin.removeRaftVoter(
-                    3000,
-                    dirId,
-                    new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
-                }, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
-
-                admin.addRaftVoter(
-                    3000,
-                    dirId,
-                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
-                    new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-            }
-        }
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-            .setClusterId("test-cluster")
-            .setNumBrokerNodes(1)
-            .setNumControllerNodes(3)
-            .build();
-
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                controllerNode.id(),
-                controllerNode.metadataDirectoryId()
-            );
-        }
-
-        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                var removeFuture = admin.removeRaftVoter(
-                    3000,
-                    dirId,
-                    new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
-
-                var addFuture = admin.addRaftVoter(
-                    3000,
-                    dirId,
-                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
-                    new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
-            }
-        }
-    }
-}

File: ConsumerIntegrationTest.java

@@ -122,7 +122,7 @@ public class ConsumerIntegrationTest {
             }
         });
-        TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1,
+        TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).count() == 1,
             5000,
             "failed to poll data");
     }
@@ -266,10 +266,11 @@ public class ConsumerIntegrationTest {
         consumer1.subscribe(List.of(topic));
         consumer2.subscribe(List.of(topic));
+        Duration pollTimeout = Duration.ofMillis(100);
         TestUtils.waitForCondition(() -> {
-            consumer0.poll(Duration.ofMillis(1000));
-            consumer1.poll(Duration.ofMillis(1000));
-            consumer2.poll(Duration.ofMillis(1000));
+            consumer0.poll(pollTimeout);
+            consumer1.poll(pollTimeout);
+            consumer2.poll(pollTimeout);
             return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
                 consumer1.assignment().isEmpty() &&
                 consumer2.assignment().isEmpty();
@@ -284,9 +285,9 @@ public class ConsumerIntegrationTest {
         );
         clusterInstance.waitTopicCreation(topic, 3);
         TestUtils.waitForCondition(() -> {
-            consumer0.poll(Duration.ofMillis(1000));
-            consumer1.poll(Duration.ofMillis(1000));
-            consumer2.poll(Duration.ofMillis(1000));
+            consumer0.poll(pollTimeout);
+            consumer1.poll(pollTimeout);
+            consumer2.poll(pollTimeout);
             return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
                 consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
                 consumer2.assignment().isEmpty();
@@ -301,9 +302,9 @@ public class ConsumerIntegrationTest {
         );
         clusterInstance.waitTopicCreation(topic, 6);
         TestUtils.waitForCondition(() -> {
-            consumer0.poll(Duration.ofMillis(1000));
-            consumer1.poll(Duration.ofMillis(1000));
-            consumer2.poll(Duration.ofMillis(1000));
+            consumer0.poll(pollTimeout);
+            consumer1.poll(pollTimeout);
+            consumer2.poll(pollTimeout);
             return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
                 consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
                 consumer2.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4), new TopicPartition(topic, 5)));
@@ -325,9 +326,9 @@ public class ConsumerIntegrationTest {
             new TopicPartition(topic, 5), Optional.of(new NewPartitionReassignment(List.of(0)))
         )).all().get();
         TestUtils.waitForCondition(() -> {
-            consumer0.poll(Duration.ofMillis(1000));
-            consumer1.poll(Duration.ofMillis(1000));
-            consumer2.poll(Duration.ofMillis(1000));
+            consumer0.poll(pollTimeout);
+            consumer1.poll(pollTimeout);
+            consumer2.poll(pollTimeout);
             return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 5))) &&
                 consumer1.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4))) &&
                 consumer2.assignment().equals(Set.of(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));

File: PlaintextConsumerCommitTest.java

@@ -283,11 +283,13 @@ public class PlaintextConsumerCommitTest {
         // In both CLASSIC and CONSUMER protocols, interceptors are executed in poll and close.
         // However, in the CONSUMER protocol, the assignment may be changed outside a poll, so
         // we need to poll once to ensure the interceptor is called.
-        if (groupProtocol == GroupProtocol.CONSUMER) {
-            consumer.poll(Duration.ZERO);
-        }
-
-        assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance);
+        TestUtils.waitForCondition(
+            () -> {
+                consumer.poll(Duration.ZERO);
+                return MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance;
+            },
+            "Consumer.poll() did not invoke onCommit() before timeout elapse"
+        );
         // verify commits are intercepted on close
         var commitCountBeforeClose = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();

File: PlaintextConsumerTest.java

@@ -66,7 +66,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.IntStream;
 import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT;
@@ -810,7 +809,7 @@ public class PlaintextConsumerTest {
         // Create a consumer and consumer some messages.
         var listener = new TestConsumerReassignmentListener();
         consumer.subscribe(List.of(TOPIC, topic2), listener);
-        var records = awaitNonEmptyRecords(consumer, TP);
+        var records = ConsumerPollTestUtils.waitForRecords(consumer);
         assertEquals(1, listener.callsToAssigned, "should be assigned once");
         // Verify the metric exist.
@@ -877,7 +876,7 @@ public class PlaintextConsumerTest {
         // Create a consumer and consumer some messages.
         var listener = new TestConsumerReassignmentListener();
         consumer.subscribe(List.of(TOPIC, topic2), listener);
-        var records = awaitNonEmptyRecords(consumer, TP);
+        var records = ConsumerPollTestUtils.waitForRecords(consumer);
         assertEquals(1, listener.callsToAssigned, "should be assigned once");
         // Verify the metric exist.
@@ -944,7 +943,7 @@ public class PlaintextConsumerTest {
         sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
         consumer.assign(List.of(TP));
-        var records = awaitNonEmptyRecords(consumer, TP);
+        var records = ConsumerPollTestUtils.waitForRecords(consumer);
         // Verify the metric exist.
         Map<String, String> tags = Map.of(
@@ -958,7 +957,7 @@ public class PlaintextConsumerTest {
         assertEquals((double) records.count(), fetchLead.metricValue(), "The lead should be " + records.count());
         consumer.assign(List.of(tp2));
-        awaitNonEmptyRecords(consumer, tp2);
+        ConsumerPollTestUtils.waitForRecords(consumer);
         assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)));
     }
 }
@@ -999,7 +998,7 @@ public class PlaintextConsumerTest {
         sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
         consumer.assign(List.of(TP));
-        var records = awaitNonEmptyRecords(consumer, TP);
+        var records = ConsumerPollTestUtils.waitForRecords(consumer);
         // Verify the metric exist.
         Map<String, String> tags = Map.of(
@@ -1014,7 +1013,7 @@ public class PlaintextConsumerTest {
         var expectedLag = numMessages - records.count();
         assertEquals(expectedLag, (double) fetchLag.metricValue(), EPSILON, "The lag should be " + expectedLag);
         consumer.assign(List.of(tp2));
-        awaitNonEmptyRecords(consumer, tp2);
+        ConsumerPollTestUtils.waitForRecords(consumer);
         assertNull(consumer.metrics().get(new MetricName(TP + ".records-lag", "consumer-fetch-manager-metrics", "", tags)));
         assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)));
     }
@@ -1058,7 +1057,7 @@ public class PlaintextConsumerTest {
         sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
         consumer.assign(List.of(TP));
-        awaitNonEmptyRecords(consumer, TP);
+        ConsumerPollTestUtils.waitForRecords(consumer);
         // Verify the metric exist.
         Map<String, String> tags = Map.of(
@@ -1203,12 +1202,21 @@ public class PlaintextConsumerTest {
         consumer3.assign(List.of(TP));
         consumer3.seek(TP, 1);
-        var numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count();
+        TestUtils.waitForCondition(
+            () -> consumer1.poll(Duration.ofMillis(5000)).count() == 3,
+            "consumer1 did not consume from earliest offset"
+        );
         assertThrows(InvalidGroupIdException.class, consumer1::commitSync);
         assertThrows(InvalidGroupIdException.class, () -> consumer2.committed(Set.of(TP)));
-        var numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count();
-        var numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count();
+        TestUtils.waitForCondition(
+            () -> consumer2.poll(Duration.ofMillis(5000)).count() == 0,
+            "Expected consumer2 to consume from latest offset"
+        );
+        TestUtils.waitForCondition(
+            () -> consumer3.poll(Duration.ofMillis(5000)).count() == 2,
+            "Expected consumer3 to consume from offset 1"
+        );
         consumer1.unsubscribe();
         consumer2.unsubscribe();
@@ -1217,14 +1225,6 @@ public class PlaintextConsumerTest {
         assertTrue(consumer1.assignment().isEmpty());
         assertTrue(consumer2.assignment().isEmpty());
         assertTrue(consumer3.assignment().isEmpty());
-        consumer1.close();
-        consumer2.close();
-        consumer3.close();
-        assertEquals(3, numRecords1, "Expected consumer1 to consume from earliest offset");
-        assertEquals(0, numRecords2, "Expected consumer2 to consume from latest offset");
-        assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1");
     }
 }
@@ -1654,7 +1654,7 @@ public class PlaintextConsumerTest {
         consumer.subscribe(List.of(testTopic));
         // This is here to allow the consumer time to settle the group membership/assignment.
-        awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
+        ConsumerPollTestUtils.waitForRecords(consumer);
         // Keep track of the last time the poll is invoked to ensure the deltas between invocations don't
         // exceed the delay threshold defined above.
@@ -1674,24 +1674,6 @@ public class PlaintextConsumerTest {
         }
     }
-    private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
-        Consumer<byte[], byte[]> consumer,
-        TopicPartition tp
-    ) throws Exception {
-        AtomicReference<ConsumerRecords<byte[], byte[]>> result = new AtomicReference<>();
-        TestUtils.waitForCondition(() -> {
-            var polledRecords = consumer.poll(Duration.ofSeconds(10));
-            boolean hasRecords = !polledRecords.isEmpty();
-            if (hasRecords) {
-                result.set(polledRecords);
-            }
-            return hasRecords;
-        }, "Timed out waiting for non-empty records from topic " + tp.topic() + " partition " + tp.partition());
-        return result.get();
-    }
     public static class SerializerImpl implements Serializer<byte[]> {
         private final ByteArraySerializer serializer = new ByteArraySerializer();

File: AsyncKafkaConsumer.java

@@ -41,6 +41,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
@@ -59,7 +60,6 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
@@ -325,8 +325,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     // Init value is needed to avoid NPE in case of exception raised in the constructor
     private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
-    // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
-    private boolean cachedSubscriptionHasAllFetchPositions;
+    private AsyncPollEvent inflightPoll;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
     private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
     private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
@@ -464,7 +463,8 @@
         final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
             metadata,
             subscriptions,
-            requestManagersSupplier);
+            requestManagersSupplier
+        );
         this.applicationEventHandler = applicationEventHandlerFactory.build(
             logContext,
             time,
@@ -623,7 +623,7 @@
             new RebalanceCallbackMetricsManager(metrics)
         );
         ApiVersions apiVersions = new ApiVersions();
-        Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(
+        Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(
             time,
             config,
             logContext,
@@ -833,22 +833,13 @@
         }
         do {
-            PollEvent event = new PollEvent(timer.currentTimeMs());
-            // Make sure to let the background thread know that we are still polling.
-            // This will trigger async auto-commits of consumed positions when hitting
-            // the interval time or reconciling new assignments
-            applicationEventHandler.add(event);
-            // Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
-            // retrieve the positions to commit before proceeding with fetching new records
-            ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis());
             // We must not allow wake-ups between polling for fetches and returning the records.
             // If the polled fetches are not empty the consumed position has already been updated in the polling
             // of the fetches. A wakeup between returned fetches and returning records would lead to never
             // returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.
             wakeupTrigger.maybeTriggerWakeup();
-            updateAssignmentMetadataIfNeeded(timer);
+            checkInflightPoll(timer);
             final Fetch<K, V> fetch = pollForFetches(timer);
             if (!fetch.isEmpty()) {
                 // before returning the fetched records, we can send off the next round of fetches
@@ -876,6 +867,71 @@
         }
     }
+
+    /**
+     * {@code checkInflightPoll()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is
+     * called when no event is currently processing, it will start a new event processing asynchronously. A check
+     * is made during each invocation to see if the <em>inflight</em> event has completed. If it has, it will be
+     * processed accordingly.
+     */
+    public void checkInflightPoll(Timer timer) {
+        boolean newlySubmittedEvent = false;
+
+        if (inflightPoll == null) {
+            inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer), time.milliseconds());
+            newlySubmittedEvent = true;
+
+            if (log.isTraceEnabled()) {
+                log.trace(
+                    "Submitting new inflight event {} with {} remaining on timer",
+                    inflightPoll,
+                    timer.remainingMs()
+                );
+            }
+
+            applicationEventHandler.add(inflightPoll);
+        }
+
+        try {
+            // Note: this is calling user-supplied code, so make sure that any errors thrown here are caught and
+            // the inflight event is cleared.
+            offsetCommitCallbackInvoker.executeCallbacks();
+            processBackgroundEvents();
+
+            if (inflightPoll.isComplete()) {
+                Optional<KafkaException> errorOpt = inflightPoll.error();
+
+                // The async poll event has completed, either successfully or not. In either case, clear out the
+                // inflight request.
+                log.trace("Inflight event {} completed, clearing", inflightPoll);
+                inflightPoll = null;
+
+                if (errorOpt.isPresent()) {
+                    throw errorOpt.get();
+                }
+            } else if (!newlySubmittedEvent) {
+                if (timer.isExpired()) {
+                    // The inflight event is expired...
+                    log.trace("Inflight event {} expired without completing, clearing", inflightPoll);
+                    inflightPoll = null;
+                } else {
+                    if (log.isTraceEnabled()) {
+                        log.trace(
+                            "Inflight event {} is incomplete with {} remaining on timer",
+                            inflightPoll,
+                            timer.remainingMs()
+                        );
+                    }
+                }
+            }
+        } catch (Throwable t) {
+            // If an exception is hit, bubble it up to the user but make sure to clear out the inflight request
+            // because the error effectively renders it complete.
+            log.debug("Inflight event {} failed due to {}, clearing", inflightPoll, String.valueOf(t));
+            inflightPoll = null;
+            throw ConsumerUtils.maybeWrapAsKafkaException(t);
+        }
+    }
+
     /**
      * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
      * partitions.
@@ -1771,15 +1827,9 @@
             return fetch;
         }
-        // send any new fetches (won't resend pending fetches)
-        sendFetches(timer);
         // We do not want to be stuck blocking in poll if we are missing some positions
         // since the offset lookup may be backing off after a failure
-        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
-        // updateAssignmentMetadataIfNeeded before this method.
-        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+        if (pollTimeout > retryBackoffMs) {
             pollTimeout = retryBackoffMs;
         }
@@ -1809,19 +1859,9 @@
      * of the {@link #fetchBuffer}, converting it to a well-formed {@link CompletedFetch}, validating that it and
      * the internal {@link SubscriptionState state} are correct, and then converting it all into a {@link Fetch}
      * for returning.
-     *
-     * <p/>
-     *
-     * This method will {@link ConsumerNetworkThread#wakeup() wake up the network thread} before returning. This is
-     * done as an optimization so that the <em>next round of data can be pre-fetched</em>.
      */
     private Fetch<K, V> collectFetch() {
-        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
-        // Notify the network thread to wake up and start the next round of fetching.
-        applicationEventHandler.wakeupNetworkThread();
-        return fetch;
+        return fetchCollector.collectFetch(fetchBuffer);
     }
/** /**
@ -1834,11 +1874,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
* defined * defined
*/ */
private boolean updateFetchPositions(final Timer timer) { private boolean updateFetchPositions(final Timer timer) {
cachedSubscriptionHasAllFetchPositions = false;
try { try {
CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer)); CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future()); wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent); applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
} catch (TimeoutException e) { } catch (TimeoutException e) {
return false; return false;
} finally { } finally {
@@ -1856,41 +1895,6 @@
         return groupMetadata.get().isPresent();
     }
-    /**
-     * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}.
-     *
-     * <p/>
-     *
-     * This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer} method
-     * of the same name:
-     *
-     * <ul>
-     *     <li>
-     *         The method will wait for confirmation of the request creation before continuing.
-     *     </li>
-     *     <li>
-     *         The method will throw exceptions encountered during request creation to the user <b>immediately</b>.
-     *     </li>
-     *     <li>
-     *         The method will suppress {@link TimeoutException}s that occur while waiting for the confirmation.
-     *         Timeouts during request creation are a byproduct of this consumer's thread communication mechanisms.
-     *         That exception type isn't thrown in the request creation step of the {@link ClassicKafkaConsumer}.
-     *         Additionally, timeouts will not impact the logic of {@link #pollForFetches(Timer) blocking requests}
-     *         as it can handle requests that are created after the timeout.
-     *     </li>
-     * </ul>
-     *
-     * @param timer Timer used to bound how long the consumer waits for the requests to be created, which in practice
-     *              is used to avoid using {@link Long#MAX_VALUE} to wait "forever"
-     */
-    private void sendFetches(Timer timer) {
-        try {
-            applicationEventHandler.addAndGet(new CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
-        } catch (TimeoutException swallow) {
-            // Can be ignored, per above comments.
-        }
-    }
     /**
      * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the
      * pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the pre-fetch case, the application thread

File: ConsumerNetworkThread.java

@@ -20,9 +20,9 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.MetadataErrorNotifiableEvent;
 import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -40,6 +40,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Supplier;
@@ -193,10 +194,13 @@
             try {
                 if (event instanceof CompletableEvent) {
                     applicationEventReaper.add((CompletableEvent<?>) event);
+                }

                 // Check if there are any metadata errors and fail the CompletableEvent if an error is present.
                 // This call is meant to handle "immediately completed events" which may not enter the awaiting state,
                 // so metadata errors need to be checked and handled right away.
-                maybeFailOnMetadataError(List.of((CompletableEvent<?>) event));
+                if (event instanceof MetadataErrorNotifiableEvent) {
+                    if (maybeFailOnMetadataError(List.of(event)))
+                        continue;
                 }

                 applicationEventProcessor.process(event);
             } catch (Throwable t) {
@@ -368,18 +372,27 @@
     /**
      * If there is a metadata error, complete all uncompleted events that require subscription metadata.
      */
-    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
-        List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new ArrayList<>();
-        for (CompletableEvent<?> ce : events) {
-            if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata())
-                subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce);
+    private boolean maybeFailOnMetadataError(List<?> events) {
+        List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();
+        for (Object obj : events) {
+            if (obj instanceof MetadataErrorNotifiableEvent) {
+                filteredEvents.add((MetadataErrorNotifiableEvent) obj);
+            }
         }
-        if (subscriptionMetadataEvent.isEmpty())
-            return;
-        networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
-            subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError))
-        );
+        // Don't get-and-clear the metadata error if there are no events that will be notified.
+        if (filteredEvents.isEmpty())
+            return false;
+
+        Optional<Exception> andClearMetadataError = networkClientDelegate.getAndClearMetadataError();
+
+        if (andClearMetadataError.isPresent()) {
+            Exception metadataError = andClearMetadataError.get();
+            filteredEvents.forEach(e -> e.onMetadataError(metadataError));
+            return true;
+        } else {
+            return false;
+        }
     }
 }

File: NetworkClientDelegate.java

@@ -210,6 +210,30 @@
             }
             ClientRequest request = makeClientRequest(r, node, currentTimeMs);
             if (!client.ready(node, currentTimeMs)) {
+                AuthenticationException authenticationException = client.authenticationException(node);
+
+                // The client may not be ready because it hit an unrecoverable authentication error. In that case, there's
+                // no benefit from retrying, so propagate the error here.
+                if (authenticationException != null) {
+                    request.callback().onComplete(
+                        new ClientResponse(
+                            request.makeHeader(
+                                request.requestBuilder().latestAllowedVersion()
+                            ),
+                            request.callback(),
+                            request.destination(),
+                            request.createdTimeMs(),
+                            currentTimeMs,
+                            true,
+                            null,
+                            authenticationException,
+                            null
+                        )
+                    );
+
+                    return false;
+                }
+
                 // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration
                 // of the event loop
                 log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r);
@@ -471,4 +495,33 @@
             }
         };
     }
+
+    /**
+     * Creates a {@link Supplier} for deferred creation during invocation by
+     * {@link ConsumerNetworkThread}.
+     */
+    public static Supplier<NetworkClientDelegate> supplier(final Time time,
+                                                           final ConsumerConfig config,
+                                                           final LogContext logContext,
+                                                           final KafkaClient client,
+                                                           final Metadata metadata,
+                                                           final BackgroundEventHandler backgroundEventHandler,
+                                                           final boolean notifyMetadataErrorsViaErrorQueue,
+                                                           final AsyncConsumerMetrics asyncConsumerMetrics) {
+        return new CachedSupplier<>() {
+            @Override
+            protected NetworkClientDelegate create() {
+                return new NetworkClientDelegate(
+                    time,
+                    config,
+                    logContext,
+                    client,
+                    metadata,
+                    backgroundEventHandler,
+                    notifyMetadataErrorsViaErrorQueue,
+                    asyncConsumerMetrics
+                );
+            }
+        };
+    }
 }
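The supplier above builds on the internal CachedSupplier helper; its apparent contract — create once on first get(), then reuse — can be sketched as follows (simplified; not the actual internal class):

    import java.util.function.Supplier;

    // Simplified sketch of the deferred, one-time creation contract.
    abstract class CachedSupplierSketch<T> implements Supplier<T> {
        private T result;

        protected abstract T create();

        @Override
        public T get() {
            if (result == null)
                result = create();   // created lazily on first use
            return result;
        }
    }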

File: OffsetCommitCallbackInvoker.java

@@ -50,6 +50,16 @@ public class OffsetCommitCallbackInvoker {
         }
     }

+    /**
+     * Returns the current size of the queue. Used by the background thread to determine if it needs to <i>pause</i>
+     * itself to return to the application thread for processing.
+     *
+     * @return Current size of queue
+     */
+    public int size() {
+        return callbackQueue.size();
+    }
+
     public void enqueueUserCallbackInvocation(final OffsetCommitCallback callback,
                                               final Map<TopicPartition, OffsetAndMetadata> offsets,
                                               final Exception exception) {
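A sketch of the pause check the Javadoc alludes to; this exact condition is an assumption about the background thread's loop, not code in this hunk:

// Hypothetical check in the background thread's loop:
boolean callbacksPending = offsetCommitCallbackInvoker.size() > 0
    || backgroundEventHandler.size() > 0;

if (callbacksPending) {
    // Yield this iteration so Consumer.poll() on the application thread
    // can invoke the queued user callbacks.
}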


@@ -38,13 +38,13 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
+import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
 import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -384,7 +384,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
                 backgroundEventQueue, time, asyncConsumerMetrics);
         final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
-                () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
+                NetworkClientDelegate.supplier(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);

         GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
                 config,
@@ -583,7 +583,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
         do {
             // Make sure the network thread can tell the application is actively polling
-            applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
+            applicationEventHandler.add(new SharePollEvent(timer.currentTimeMs()));

             processBackgroundEvents();


@@ -21,14 +21,14 @@ import org.apache.kafka.common.PartitionInfo;
 import java.util.List;
 import java.util.Map;

-public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
+public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> implements MetadataErrorNotifiableEvent {

     protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) {
         super(type, deadlineMs);
     }

     @Override
-    public boolean requireSubscriptionMetadata() {
-        return true;
+    public void onMetadataError(Exception metadataError) {
+        future().completeExceptionally(metadataError);
     }
 }


@@ -28,14 +28,14 @@ import java.util.Objects;
 public abstract class ApplicationEvent {

     public enum Type {
-        COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
+        COMMIT_ASYNC, COMMIT_SYNC, ASYNC_POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
         LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA,
         TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
         UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
         CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
         COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, STOP_FIND_COORDINATOR_ON_CLOSE,
         PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
-        SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
+        SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, SHARE_POLL,
         SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
         SHARE_ACKNOWLEDGE_ON_CLOSE,
         SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION,


@@ -20,11 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.Acknowledgements;
 import org.apache.kafka.clients.consumer.internals.CachedSupplier;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
+import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
 import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
+import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
+import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
@@ -45,6 +49,7 @@ import java.util.Map;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -53,6 +58,7 @@ import java.util.stream.Collectors;
  * An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread}
  * which processes {@link ApplicationEvent application events} generated by the application thread.
  */
+@SuppressWarnings({"ClassFanOutComplexity"})
 public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {

     private final Logger log;
@@ -76,6 +82,14 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
     @Override
     public void process(ApplicationEvent event) {
         switch (event.type()) {
+            case ASYNC_POLL:
+                process((AsyncPollEvent) event);
+                return;
+
+            case SHARE_POLL:
+                process((SharePollEvent) event);
+                return;
+
             case COMMIT_ASYNC:
                 process((AsyncCommitEvent) event);
                 return;
@@ -84,10 +98,6 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
                 process((SyncCommitEvent) event);
                 return;

-            case POLL:
-                process((PollEvent) event);
-                return;
-
             case FETCH_COMMITTED_OFFSETS:
                 process((FetchCommittedOffsetsEvent) event);
                 return;
@@ -217,36 +227,16 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
         }
     }

-    private void process(final PollEvent event) {
-        // Trigger a reconciliation that can safely commit offsets if needed to rebalance,
-        // as we're processing before any new fetching starts in the app thread
+    private void process(final SharePollEvent event) {
         requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
                 consumerMembershipManager.maybeReconcile(true));
-        if (requestManagers.commitRequestManager.isPresent()) {
-            CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
-            commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
-            // all commit request generation points have been passed,
-            // so it's safe to notify the app thread could proceed and start fetching
-            event.markReconcileAndAutoCommitComplete();
-            requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
-                hrm.membershipManager().onConsumerPoll();
-                hrm.resetPollTimer(event.pollTimeMs());
-            });
-            requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
-                hrm.membershipManager().onConsumerPoll();
-                hrm.resetPollTimer(event.pollTimeMs());
-            });
-        } else {
-            // safe to unblock - no auto-commit risk here:
-            // 1. commitRequestManager is not present
-            // 2. shareConsumer has no auto-commit mechanism
-            event.markReconcileAndAutoCommitComplete();
-            requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
-                hrm.membershipManager().onConsumerPoll();
-                hrm.resetPollTimer(event.pollTimeMs());
-            });
-        }
+
+        requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
+            ShareMembershipManager membershipManager = hrm.membershipManager();
+            maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
+            membershipManager.onConsumerPoll();
+            hrm.resetPollTimer(event.pollTimeMs());
+        });
     }

     private void process(final CreateFetchRequestsEvent event) {
         CompletableFuture<Void> future = requestManagers.fetchRequestManager.createFetchRequests();
@@ -352,7 +342,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
             if (subscriptions.subscribe(event.topics(), event.listener())) {
                 this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
             }
-            requestManagers.streamsMembershipManager.get().onSubscriptionUpdated();
+            requestManagers.streamsGroupHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
             event.future().complete(null);
         } catch (Exception e) {
             event.future().completeExceptionally(e);
@@ -375,7 +365,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
         try {
             subscriptions.subscribe(event.pattern(), event.listener());
             metadata.requestUpdateForNewTopics();
-            updatePatternSubscription(metadata.fetch());
+            requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+                ConsumerMembershipManager membershipManager = hrm.membershipManager();
+                updatePatternSubscription(membershipManager::onSubscriptionUpdated, metadata.fetch());
+            });
             event.future().complete(null);
         } catch (Exception e) {
             event.future().completeExceptionally(e);
@@ -409,13 +402,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
      * This will make the consumer send the updated subscription on the next poll.
      */
     private void process(final UpdatePatternSubscriptionEvent event) {
-        if (!subscriptions.hasPatternSubscription()) {
-            return;
-        }
-        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
-            this.metadataVersionSnapshot = metadata.updateVersion();
-            updatePatternSubscription(metadata.fetch());
-        }
+        requestManagers.consumerMembershipManager.ifPresent(mm -> maybeUpdatePatternSubscription(mm::onSubscriptionUpdated));
         event.future().complete(null);
     }
@@ -726,6 +713,75 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
         requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
     }

+    private void process(final AsyncPollEvent event) {
+        log.trace("Processing poll logic for {}", event);
+
+        // Trigger a reconciliation that can safely commit offsets if needed to rebalance,
+        // as we're processing before any new fetching starts
+        requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+                consumerMembershipManager.maybeReconcile(true));
+
+        if (requestManagers.commitRequestManager.isPresent()) {
+            CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
+            commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
+
+            requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+                ConsumerMembershipManager membershipManager = hrm.membershipManager();
+                maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
+                membershipManager.onConsumerPoll();
+                hrm.resetPollTimer(event.pollTimeMs());
+            });
+            requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
+                StreamsMembershipManager membershipManager = hrm.membershipManager();
+                maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
+                membershipManager.onConsumerPoll();
+                hrm.resetPollTimer(event.pollTimeMs());
+            });
+        }
+
+        log.trace("Processing check and update positions logic for {}", event);
+        CompletableFuture<Boolean> updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
+
+        updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
+            if (maybeCompleteAsyncPollEventExceptionally(event, updatePositionsError))
+                return;
+
+            log.trace("Processing create fetch requests logic for {}", event);
+
+            // Create a fetch request if there's no data in the FetchBuffer.
+            requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> {
+                if (maybeCompleteAsyncPollEventExceptionally(event, fetchError))
+                    return;
+
+                event.completeSuccessfully();
+                log.trace("Completed event processing for {}", event);
+            });
+        });
+    }
+
+    /**
+     * If there's an error to report to the user, the current event will be completed and this method will
+     * return {@code true}. Otherwise, it will return {@code false}.
+     */
+    private boolean maybeCompleteAsyncPollEventExceptionally(AsyncPollEvent event, Throwable t) {
+        if (t == null)
+            return false;
+
+        if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) {
+            log.trace("Ignoring timeout for {}: {}", event, t.getMessage());
+            return false;
+        }
+
+        if (t instanceof CompletionException) {
+            t = t.getCause();
+        }
+
+        KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+        event.completeExceptionally(e);
+        log.trace("Failing event processing for {}", event, e);
+        return true;
+    }
+
     private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) {
         return (value, exception) -> {
             if (exception != null)
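One subtlety worth calling out: errors that arrive through a dependent CompletableFuture stage are wrapped in CompletionException, which is why the unwrap above precedes maybeWrapAsKafkaException. A small self-contained illustration of that standard-library behavior:

import java.util.concurrent.CompletableFuture;

public class CompletionExceptionDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> root = new CompletableFuture<>();
        root.completeExceptionally(new IllegalStateException("boom"));

        // A dependent stage observes the failure wrapped in CompletionException;
        // the original error is available via getCause().
        root.thenApply(v -> v).whenComplete((v, t) -> {
            System.out.println(t.getClass().getSimpleName());            // CompletionException
            System.out.println(t.getCause().getClass().getSimpleName()); // IllegalStateException
        });
    }
}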
@@ -757,6 +813,16 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
         };
     }

+    private void maybeUpdatePatternSubscription(OnSubscriptionUpdatedCallback callback) {
+        if (!subscriptions.hasPatternSubscription()) {
+            return;
+        }
+
+        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+            this.metadataVersionSnapshot = metadata.updateVersion();
+            updatePatternSubscription(callback, metadata.fetch());
+        }
+    }
+
     /**
      * This function evaluates the regex that the consumer subscribed to
      * against the list of topic names from metadata, and updates
@@ -764,26 +830,26 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
      *
      * @param cluster Cluster from which we get the topics
      */
-    private void updatePatternSubscription(Cluster cluster) {
-        if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
-            log.warn("Group membership manager not present when processing a subscribe event");
-            return;
-        }
+    private void updatePatternSubscription(OnSubscriptionUpdatedCallback callback, Cluster cluster) {
         final Set<String> topicsToSubscribe = cluster.topics().stream()
                 .filter(subscriptions::matchesSubscribedPattern)
                 .collect(Collectors.toSet());
         if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
             this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
         }

         // Join the group if not already part of it, or just send the updated subscription
         // to the broker on the next poll. Note that this is done even if no topics matched
         // the regex, to ensure the member joins the group if needed (with empty subscription).
-        requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
+        callback.onSubscriptionUpdated();
     }

     // Visible for testing
     int metadataVersionSnapshot() {
         return metadataVersionSnapshot;
     }
+
+    private interface OnSubscriptionUpdatedCallback {
+        void onSubscriptionUpdated();
+    }
 }


@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+import org.apache.kafka.common.KafkaException;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * This class represents the non-blocking event that executes logic functionally equivalent to the following:
+ *
+ * <ul>
+ *     <li>Polling</li>
+ *     <li>{@link CheckAndUpdatePositionsEvent}</li>
+ *     <li>{@link CreateFetchRequestsEvent}</li>
+ * </ul>
+ *
+ * {@link AsyncKafkaConsumer#poll(Duration)} is implemented using a non-blocking design to ensure performance is
+ * at the same level as {@link ClassicKafkaConsumer#poll(Duration)}. The event is submitted in {@code poll()}, but
+ * there are no blocking waits for the "result" of the event. Checks are made for the result at certain points, but
+ * they do not block. The logic for the previously-mentioned events is executed sequentially on the background thread.
+ */
+public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNotifiableEvent {
+
+    private final long deadlineMs;
+    private final long pollTimeMs;
+    private volatile KafkaException error;
+    private volatile boolean isComplete;
+
+    /**
+     * Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic.
+     *
+     * @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the
+     *                   {@link Duration} passed to {@link Consumer#poll(Duration)}
+     * @param pollTimeMs Time, in milliseconds, at which point the event was created
+     */
+    public AsyncPollEvent(long deadlineMs, long pollTimeMs) {
+        super(Type.ASYNC_POLL);
+        this.deadlineMs = deadlineMs;
+        this.pollTimeMs = pollTimeMs;
+    }
+
+    public long deadlineMs() {
+        return deadlineMs;
+    }
+
+    public long pollTimeMs() {
+        return pollTimeMs;
+    }
+
+    public Optional<KafkaException> error() {
+        return Optional.ofNullable(error);
+    }
+
+    public boolean isComplete() {
+        return isComplete;
+    }
+
+    public void completeSuccessfully() {
+        isComplete = true;
+    }
+
+    public void completeExceptionally(KafkaException e) {
+        // Set the error before publishing completion; both fields are volatile, so a reader
+        // that observes isComplete == true is guaranteed to also observe the error.
+        this.error = e;
+        isComplete = true;
+    }
+
+    @Override
+    public void onMetadataError(Exception metadataError) {
+        completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError));
+    }
+
+    @Override
+    protected String toStringBase() {
+        return super.toStringBase() +
+                ", deadlineMs=" + deadlineMs +
+                ", pollTimeMs=" + pollTimeMs +
+                ", error=" + error +
+                ", isComplete=" + isComplete;
+    }
+}
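A hedged sketch of the application-thread side of this contract; the surrounding poll loop is assumed, and calculateDeadlineMs stands in for however the deadline is derived from the caller's timeout:

// Illustrative only: submit the event once, then check non-blockingly each pass.
AsyncPollEvent event = new AsyncPollEvent(calculateDeadlineMs(timer), timer.currentTimeMs());
applicationEventHandler.add(event);

// Later in the poll loop:
if (event.isComplete()) {
    KafkaException e = event.error().orElse(null);

    if (e != null)
        throw e;   // surface the background failure to the caller

    // otherwise fall through and return whatever fetched data is buffered
}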


@@ -45,6 +45,16 @@ public class BackgroundEventHandler {
         this.asyncConsumerMetrics = asyncConsumerMetrics;
     }

+    /**
+     * Returns the current size of the queue. Used by the background thread to determine if it needs to <i>pause</i>
+     * itself to return to the application thread for processing.
+     *
+     * @return Current size of queue
+     */
+    public int size() {
+        return backgroundEventQueue.size();
+    }
+
     /**
      * Add a {@link BackgroundEvent} to the handler.
      *


@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals.events;

+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.TopicPartition;
@@ -30,7 +31,7 @@ import java.time.Duration;
  * The event completes with a boolean indicating if all assigned partitions have valid fetch positions
  * (based on {@link SubscriptionState#hasAllFetchPositions()}).
  */
-public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> {
+public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> implements MetadataErrorNotifiableEvent {

     public CheckAndUpdatePositionsEvent(long deadlineMs) {
         super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);
@@ -39,11 +40,11 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> {
     /**
      * Indicates that this event requires subscription metadata to be present
      * for its execution. This is used to ensure that metadata errors are
-     * handled correctly during the {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#poll(Duration) poll}
-     * or {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#position(TopicPartition) position} process.
+     * handled correctly during the {@link Consumer#poll(Duration) poll}
+     * or {@link Consumer#position(TopicPartition) position} process.
      */
     @Override
-    public boolean requireSubscriptionMetadata() {
-        return true;
+    public void onMetadataError(Exception metadataError) {
+        future().completeExceptionally(metadataError);
     }
 }


@@ -52,8 +52,4 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent implements CompletableEvent<T> {
     protected String toStringBase() {
         return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs;
     }
-
-    public boolean requireSubscriptionMetadata() {
-        return false;
-    }
 }


@@ -32,7 +32,7 @@ import java.util.Map;
  * {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than
  * or equals to the target timestamp)
  */
-public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> {
+public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> implements MetadataErrorNotifiableEvent {

     private final Map<TopicPartition, Long> timestampsToSearch;
     private final boolean requireTimestamps;
@@ -65,8 +65,8 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> {
     }

     @Override
-    public boolean requireSubscriptionMetadata() {
-        return true;
+    public void onMetadataError(Exception metadataError) {
+        future().completeExceptionally(metadataError);
     }

     @Override


@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
+
+/**
+ * This interface is used by events that need to be notified when
+ * {@link NetworkClientDelegate#getAndClearMetadataError()} reports an error.
+ */
+public interface MetadataErrorNotifiableEvent {
+
+    /**
+     * The background thread detects metadata errors on every call to {@link NetworkClientDelegate#poll(long, long)}.
+     * {@link NetworkClientDelegate} calls {@link Metadata#maybeThrowAnyException()} and stores the result.
+     * The presence of a metadata error is checked in the {@link ConsumerNetworkThread}'s loop by calling
+     * {@link NetworkClientDelegate#getAndClearMetadataError()}. There are two places in the loop at which the
+     * metadata error is checked:
+     *
+     * <ul>
+     *     <li>
+     *         At the very top of the {@link ConsumerNetworkThread}'s loop, the {@link ApplicationEventHandler}'s
+     *         queue is drained. Before processing each event via
+     *         {@link ApplicationEventProcessor#process(ApplicationEvent)}, if a metadata error occurred, this method
+     *         will be invoked on the event if it implements this interface.
+     *         <p/>
+     *         <em>Note</em>: an event on which this method is invoked will <em>not</em> be passed to the
+     *         {@link ApplicationEventProcessor#process(ApplicationEvent)} method.
+     *     </li>
+     *     <li>
+     *         At the very bottom of the {@link ConsumerNetworkThread}'s loop, the {@link CompletableEventReaper}
+     *         is executed and any outstanding event is returned. If a metadata error occurred, this method
+     *         will be invoked on each unexpired event that implements this interface.
+     *     </li>
+     * </ul>
+     *
+     * @param metadataError Error that originally came from {@link Metadata#maybeThrowAnyException()}
+     */
+    void onMetadataError(Exception metadataError);
+}
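A minimal sketch of what an implementation looks like; the event class here is hypothetical, but the pattern mirrors the completable events elsewhere in this change:

// Hypothetical event type, for illustration only.
public class ExampleMetadataSensitiveEvent extends CompletableApplicationEvent<Void>
        implements MetadataErrorNotifiableEvent {

    public ExampleMetadataSensitiveEvent(final long deadlineMs) {
        super(Type.TOPIC_METADATA, deadlineMs); // an existing Type, reused purely for the sketch
    }

    @Override
    public void onMetadataError(Exception metadataError) {
        // Fail the pending future so the application thread observes the error.
        future().completeExceptionally(metadataError);
    }
}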


@@ -16,28 +16,12 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;

-import java.util.concurrent.CompletableFuture;
-
-public class PollEvent extends ApplicationEvent {
+public class SharePollEvent extends ApplicationEvent {

     private final long pollTimeMs;

-    /**
-     * A future that represents the completion of reconciliation and auto-commit
-     * processing.
-     * This future is completed when all commit request generation points have
-     * been passed, including:
-     * <ul>
-     *     <li>auto-commit on rebalance</li>
-     *     <li>auto-commit on the interval</li>
-     * </ul>
-     * Once completed, it signals that it's safe for the consumer to proceed with
-     * fetching new records.
-     */
-    private final CompletableFuture<Void> reconcileAndAutoCommit = new CompletableFuture<>();
-
-    public PollEvent(final long pollTimeMs) {
-        super(Type.POLL);
+    public SharePollEvent(final long pollTimeMs) {
+        super(Type.SHARE_POLL);
         this.pollTimeMs = pollTimeMs;
     }
@@ -45,14 +29,6 @@ public class PollEvent extends ApplicationEvent {
         return pollTimeMs;
     }

-    public CompletableFuture<Void> reconcileAndAutoCommit() {
-        return reconcileAndAutoCommit;
-    }
-
-    public void markReconcileAndAutoCommitComplete() {
-        reconcileAndAutoCommit.complete(null);
-    }
-
     @Override
     public String toStringBase() {
         return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;


@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.test.TestUtils;
+
+import java.time.Duration;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+
+/**
+ * This class provides utilities for tests to wait for a call to {@link Consumer#poll(Duration)} to produce a
+ * result (error, records, specific condition, etc.). This is mostly due to the subtle difference in behavior
+ * of the non-blocking {@link AsyncKafkaConsumer}. A single pass of {@link AsyncKafkaConsumer#poll(Duration)}
+ * may not be sufficient to provide an immediate result.
+ */
+public class ConsumerPollTestUtils {
+
+    /**
+     * Wait up to {@link TestUtils#DEFAULT_MAX_WAIT_MS} to return records from the given {@link Consumer}.
+     */
+    public static <T> ConsumerRecords<T, T> waitForRecords(Consumer<?, ?> consumer) {
+        Timer timer = Time.SYSTEM.timer(DEFAULT_MAX_WAIT_MS);
+
+        while (timer.notExpired()) {
+            @SuppressWarnings("unchecked")
+            ConsumerRecords<T, T> records = (ConsumerRecords<T, T>) consumer.poll(Duration.ofMillis(1000));
+
+            if (!records.isEmpty())
+                return records;
+
+            timer.update();
+        }
+
+        throw new TimeoutException("no records to return");
+    }
+
+    /**
+     * Wait up to {@link TestUtils#DEFAULT_MAX_WAIT_MS} for the {@link Consumer} to produce the side effect
+     * that causes the {@link Supplier condition} to evaluate to {@code true}.
+     */
+    public static void waitForCondition(Consumer<?, ?> consumer,
+                                        Supplier<Boolean> testCondition,
+                                        String conditionDetails) {
+        try {
+            TestUtils.waitForCondition(
+                () -> {
+                    consumer.poll(Duration.ZERO);
+                    return testCondition.get();
+                },
+                conditionDetails
+            );
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+    }
+
+    /**
+     * Wait up to {@link TestUtils#DEFAULT_MAX_WAIT_MS} for the {@link Consumer} to throw an exception that,
+     * when tested against the {@link Function condition}, will evaluate to {@code true}.
+     */
+    public static void waitForException(Consumer<?, ?> consumer,
+                                        Function<Throwable, Boolean> testCondition,
+                                        String conditionDetails) {
+        try {
+            TestUtils.waitForCondition(
+                () -> {
+                    try {
+                        consumer.poll(Duration.ZERO);
+                        return false;
+                    } catch (Throwable t) {
+                        return testCondition.apply(t);
+                    }
+                },
+                conditionDetails
+            );
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+    }
+}
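For instance, a test that previously cast the result of a single poll can be written as below; the consumer setup is assumed, and the same pattern appears throughout the test changes that follow:

// Hypothetical test fragment:
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(5, records.count());

ConsumerPollTestUtils.waitForException(
    consumer,
    AuthenticationException.class::isInstance,
    "consumer did not surface the authentication error in time"
);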


@@ -935,7 +935,6 @@ public class KafkaConsumerTest {
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
-    @SuppressWarnings("unchecked")
     public void verifyNoCoordinatorLookupForManualAssignmentWithSeek(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -951,7 +950,7 @@ public class KafkaConsumerTest {
         client.prepareResponse(listOffsetsResponse(Map.of(tp0, 50L)));
         client.prepareResponse(fetchResponse(tp0, 50L, 5));

-        ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
+        ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
         assertEquals(5, records.count());
         assertEquals(55L, consumer.position(tp0));
         assertEquals(1, records.nextOffsets().size());
@@ -1045,8 +1044,7 @@ public class KafkaConsumerTest {
         }, fetchResponse(tp0, 50L, 5));

-        @SuppressWarnings("unchecked")
-        ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
+        ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
         assertEquals(5, records.count());
         assertEquals(Set.of(tp0), records.partitions());
         assertEquals(1, records.nextOffsets().size());
@@ -1065,7 +1063,7 @@ public class KafkaConsumerTest {
     @ParameterizedTest
     @EnumSource(value = GroupProtocol.class)
-    public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) throws InterruptedException {
+    public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {
         SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -1081,15 +1079,14 @@ public class KafkaConsumerTest {
                 true, groupId, groupInstanceId, false);
         consumer.assign(List.of(tp0));

-        if (groupProtocol == GroupProtocol.CONSUMER) {
-            // New consumer poll(ZERO) needs to wait for the offset fetch event added by a call to poll, to be processed
-            // by the background thread, so it can realize there are no committed offsets and then
-            // throw the NoOffsetForPartitionException
-            assertPollEventuallyThrows(consumer, NoOffsetForPartitionException.class,
-                "Consumer was not able to update fetch positions on continuous calls with 0 timeout");
-        } else {
-            assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));
-        }
+        // Consumer.poll(0) needs to wait for the offset fetch event added by a call to poll, to be processed
+        // by the background thread, so it can realize there are no committed offsets and then
+        // throw the NoOffsetForPartitionException.
+        ConsumerPollTestUtils.waitForException(
+            consumer,
+            NoOffsetForPartitionException.class::isInstance,
+            "Consumer was not able to update fetch positions on continuous calls with 0 timeout"
+        );
     }

     @ParameterizedTest
@@ -1731,7 +1728,6 @@ public class KafkaConsumerTest {
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
-    @SuppressWarnings("unchecked")
     public void testManualAssignmentChangeWithAutoCommitEnabled(GroupProtocol groupProtocol) {
         ConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
@@ -1766,7 +1762,7 @@ public class KafkaConsumerTest {
         client.prepareResponse(listOffsetsResponse(Map.of(tp0, 10L)));
         client.prepareResponse(fetchResponse(tp0, 10L, 1));

-        ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(100));
+        ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
         assertEquals(1, records.count());
         assertEquals(11L, consumer.position(tp0));
@@ -1825,8 +1821,7 @@ public class KafkaConsumerTest {
         client.prepareResponse(listOffsetsResponse(Map.of(tp0, 10L)));
         client.prepareResponse(fetchResponse(tp0, 10L, 1));

-        @SuppressWarnings("unchecked")
-        ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
+        ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
         assertEquals(1, records.count());
         assertEquals(11L, consumer.position(tp0));
         assertEquals(1, records.nextOffsets().size());
@@ -2121,7 +2116,7 @@ public class KafkaConsumerTest {
         time.sleep(heartbeatIntervalMs);
         Thread.sleep(heartbeatIntervalMs);
         consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
-        final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ZERO);
+        final ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
         assertFalse(records.isEmpty());
         assertFalse(records.nextOffsets().isEmpty());
     }
@@ -2271,19 +2266,18 @@ public class KafkaConsumerTest {
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
-    public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws InterruptedException {
+    public void testPollAuthenticationFailure(GroupProtocol groupProtocol) {
         final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
         consumer.subscribe(Set.of(topic));

-        if (groupProtocol == GroupProtocol.CONSUMER) {
-            // New consumer poll(ZERO) needs to wait for the event added by a call to poll, to be processed
-            // by the background thread, so it can realize there is authentication fail and then
-            // throw the AuthenticationException
-            assertPollEventuallyThrows(consumer, AuthenticationException.class,
-                "this consumer was not able to discover metadata errors during continuous polling.");
-        } else {
-            assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
-        }
+        // Consumer.poll(0) needs to wait for the event added by a call to poll, to be processed
+        // by the background thread, so it can detect the authentication failure and then
+        // throw the AuthenticationException.
+        ConsumerPollTestUtils.waitForException(
+            consumer,
+            AuthenticationException.class::isInstance,
+            "this consumer was not able to discover metadata errors during continuous polling."
+        );
     }

     // TODO: this test triggers a bug with the CONSUMER group protocol implementation.
@@ -2655,7 +2649,6 @@ public class KafkaConsumerTest {
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
-    @SuppressWarnings("unchecked")
     public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedException {
         final ConsumerMetadata metadata = createMetadata(subscription);
         final MockClient client = new MockClient(time, metadata);
@@ -2670,9 +2663,11 @@ public class KafkaConsumerTest {
         consumer.assign(Set.of(tp0));

         // poll once to update with the current metadata
-        consumer.poll(Duration.ofMillis(0));
-        TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
-            "No metadata requests sent");
+        ConsumerPollTestUtils.waitForCondition(
+            consumer,
+            () -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
+            "No metadata requests sent"
+        );
         client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));

         // no error for no current position
@@ -2685,13 +2680,12 @@ public class KafkaConsumerTest {
         }

         // poll once again, which should send the list-offset request
         consumer.seek(tp0, 50L);
-        consumer.poll(Duration.ofMillis(0));
-        // requests: list-offset, fetch
-        TestUtils.waitForCondition(() -> {
-            boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
-            boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
-            return hasListOffsetRequest && hasFetchRequest;
-        }, "No list-offset & fetch request sent");
+        // requests: list-offset
+        ConsumerPollTestUtils.waitForCondition(
+            consumer,
+            () -> requestGenerated(client, ApiKeys.LIST_OFFSETS),
+            "No list-offset sent"
+        );

         // no error for no end offset (so unknown lag)
         assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
@@ -2700,7 +2694,12 @@ public class KafkaConsumerTest {
         // and hence next call would return correct lag result
         ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS);
         client.respondToRequest(listOffsetRequest, listOffsetsResponse(Map.of(tp0, 90L)));
-        consumer.poll(Duration.ofMillis(0));
+        // requests: fetch
+        ConsumerPollTestUtils.waitForCondition(
+            consumer,
+            () -> requestGenerated(client, ApiKeys.FETCH),
+            "No fetch sent"
+        );

         // For AsyncKafkaConsumer, subscription state is updated in background, so the result will eventually be updated.
         TestUtils.waitForCondition(() -> {
@@ -2715,7 +2714,7 @@ public class KafkaConsumerTest {
         final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
         client.respondToRequest(fetchRequest, fetchResponse(Map.of(tp0, fetchInfo)));

-        final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
+        final ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
         assertEquals(5, records.count());
         assertEquals(55L, consumer.position(tp0));
         assertEquals(1, records.nextOffsets().size());
@@ -3194,27 +3193,14 @@ public class KafkaConsumerTest {
         KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
         consumer.subscribe(Set.of(invalidTopicName), getConsumerRebalanceListener(consumer));

-        if (groupProtocol == GroupProtocol.CONSUMER) {
-            // New consumer poll(ZERO) needs to wait for the event added by a call to poll, to be processed
-            // by the background thread, so it can realize there is invalid topics and then
-            // throw the InvalidTopicException
-            assertPollEventuallyThrows(consumer, InvalidTopicException.class,
-                "Consumer was not able to update fetch positions on continuous calls with 0 timeout");
-        } else {
-            assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ZERO));
-        }
-    }
-
-    private static <T extends Throwable> void assertPollEventuallyThrows(KafkaConsumer<?, ?> consumer,
-        Class<T> expectedException, String errMsg) throws InterruptedException {
-        TestUtils.waitForCondition(() -> {
-            try {
-                consumer.poll(Duration.ZERO);
-                return false;
-            } catch (Throwable exception) {
-                return expectedException.isInstance(exception);
-            }
-        }, errMsg);
+        // Consumer.poll(0) needs to wait for the event added by a call to poll, to be processed
+        // by the background thread, so it can realize there are invalid topics and then
+        // throw the InvalidTopicException.
+        ConsumerPollTestUtils.waitForException(
+            consumer,
+            InvalidTopicException.class::isInstance,
+            "Consumer was not able to update fetch positions on continuous calls with 0 timeout"
+        );
     }

     @ParameterizedTest
@@ -3654,7 +3640,11 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) {
         service.execute(() -> consumer.poll(Duration.ofSeconds(5)));
         try {
             TimeUnit.SECONDS.sleep(1);
-            assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ZERO));
+            ConsumerPollTestUtils.waitForException(
+                consumer,
+                t -> t instanceof ConcurrentModificationException,
+                "Consumer did not throw ConcurrentModificationException within timeout"
+            );
             client.wakeup();
             consumer.wakeup();
         } finally {
View File

@ -19,8 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.LogContext;
@ -61,7 +61,7 @@ public class ApplicationEventHandlerTest {
asyncConsumerMetrics asyncConsumerMetrics
)) { )) {
// add event // add event
applicationEventHandler.add(new PollEvent(time.milliseconds())); applicationEventHandler.add(new AsyncPollEvent(time.milliseconds() + 10, time.milliseconds()));
verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1); verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
} }
} }


@@ -22,6 +22,7 @@ import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.ConsumerPollTestUtils;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
@@ -43,13 +45,11 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
-import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
@@ -112,6 +112,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -154,6 +155,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -424,7 +426,7 @@ public class AsyncKafkaConsumerTest {
         consumer.wakeup();
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
@@ -444,7 +446,7 @@
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
     }
@@ -468,7 +470,7 @@
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         // since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
         assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
         // the previously ignored wake-up should not be ignored in the next call
@@ -505,9 +507,12 @@
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(Collections.singletonList(topicName), listener);
-        markReconcileAndAutoCommitCompleteForPollEvent();
-        consumer.poll(Duration.ZERO);
-        assertTrue(callbackExecuted.get());
+        completeAsyncPollEventSuccessfully();
+        ConsumerPollTestUtils.waitForCondition(
+            consumer,
+            callbackExecuted::get,
+            "Consumer.poll() did not execute callback within timeout"
+        );
     }

     @Test
@@ -527,7 +532,7 @@
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(tp));
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         consumer.poll(Duration.ZERO);
         assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@@ -673,8 +678,12 @@
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
-        markReconcileAndAutoCommitCompleteForPollEvent();
-        assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback);
+        completeAsyncPollEventSuccessfully();
+        ConsumerPollTestUtils.waitForCondition(
+            consumer,
+            () -> callback.invoked == 1 && callback.exception == null,
+            "Consumer.poll() did not execute the callback once (without error) in the allotted timeout"
+        );
     }

     @Test
@@ -1455,7 +1464,7 @@
         int expectedRevokedCount,
         int expectedAssignedCount,
         int expectedLostCount,
-        Optional<RuntimeException> expectedException
+        Optional<RuntimeException> expectedExceptionOpt
     ) {
         consumer = newConsumer();
         CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener(
@@ -1473,13 +1482,18 @@
             backgroundEventQueue.add(e);
         }

-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         // This will trigger the background event queue to process our background event message.
         // If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll.
-        if (expectedException.isPresent()) {
-            Exception exception = assertThrows(expectedException.get().getClass(), () -> consumer.poll(Duration.ZERO));
-            assertEquals(expectedException.get().getMessage(), exception.getMessage());
-            assertEquals(expectedException.get().getCause(), exception.getCause());
+        if (expectedExceptionOpt.isPresent()) {
+            Exception expectedException = expectedExceptionOpt.get();
+            ConsumerPollTestUtils.waitForException(
+                consumer,
+                t -> Objects.equals(t.getClass(), expectedException.getClass()) &&
+                     Objects.equals(t.getMessage(), expectedException.getMessage()) &&
+                     Objects.equals(t.getCause(), expectedException.getCause()),
+                "Consumer.poll() did not throw the expected exception " + expectedException
+            );
         } else {
             when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
             assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@@ -1543,10 +1557,12 @@
         backgroundEventQueue.add(errorEvent);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singletonList(new TopicPartition("topic", 0)));
-        markReconcileAndAutoCommitCompleteForPollEvent();
-        final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
-
-        assertEquals(expectedException.getMessage(), exception.getMessage());
+        completeAsyncPollEventSuccessfully();
+        ConsumerPollTestUtils.waitForException(
+            consumer,
+            t -> t.getMessage().equals(expectedException.getMessage()),
+            "Consumer.poll() did not fail with expected exception " + expectedException + " within timeout"
+        );
     }

     @Test
@@ -1562,10 +1578,12 @@
         backgroundEventQueue.add(errorEvent2);
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singletonList(new TopicPartition("topic", 0)));
-        markReconcileAndAutoCommitCompleteForPollEvent();
-        final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
-
-        assertEquals(expectedException1.getMessage(), exception.getMessage());
+        completeAsyncPollEventSuccessfully();
+        ConsumerPollTestUtils.waitForException(
+            consumer,
+            t -> t.getMessage().equals(expectedException1.getMessage()),
+            "Consumer.poll() did not fail with expected exception " + expectedException1 + " within timeout"
+        );
         assertTrue(backgroundEventQueue.isEmpty());
     }
@@ -1645,10 +1663,9 @@
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(singletonList("topic1"));
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         consumer.poll(Duration.ofMillis(100));
-        verify(applicationEventHandler).add(any(PollEvent.class));
-        verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
+        verify(applicationEventHandler, atLeastOnce()).add(any(AsyncPollEvent.class));
     }

     private Properties requiredConsumerConfigAndGroupId(final String groupId) {
@@ -1664,11 +1681,8 @@
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singleton(new TopicPartition("t1", 1)));
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         consumer.poll(Duration.ZERO);
-
-        verify(applicationEventHandler, atLeast(1))
-            .addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class));
     }

     @Test
@@ -1701,7 +1715,7 @@
         ).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         // And then poll for up to 10000ms, which should return 2 records without timing out
         ConsumerRecords<?, ?> returnedRecords = consumer.poll(Duration.ofMillis(10000));
         assertEquals(2, returnedRecords.count());
@@ -1805,7 +1819,7 @@
         // interrupt the thread and call poll
         try {
             Thread.currentThread().interrupt();
-            markReconcileAndAutoCommitCompleteForPollEvent();
+            completeAsyncPollEventSuccessfully();
             assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
         } finally {
             // clear interrupted state again since this thread may be reused by JUnit
@@ -1837,8 +1851,13 @@
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(Collections.singletonList("topic"));
         when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
-        markReconcileAndAutoCommitCompleteForPollEvent();
-        consumer.poll(Duration.ZERO);
+        completeAsyncPollEventSuccessfully();
+
+        ConsumerPollTestUtils.waitForCondition(
+            consumer,
+            () -> backgroundEventReaper.size() == 0,
+            "Consumer.poll() did not reap background events within timeout"
+        );
         verify(backgroundEventReaper).reap(time.milliseconds());
     }
@@ -1900,7 +1919,7 @@
         completeUnsubscribeApplicationEventSuccessfully();
         consumer.assign(singleton(new TopicPartition("topic1", 0)));
-        markReconcileAndAutoCommitCompleteForPollEvent();
+        completeAsyncPollEventSuccessfully();
         consumer.poll(Duration.ZERO);

         verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
@@ -1908,7 +1927,6 @@
         consumer.subscribe(Pattern.compile("t*"));
         consumer.poll(Duration.ZERO);
-        verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class));
     }

     @Test
@@ -2275,11 +2293,11 @@
         }
     }

-    private void markReconcileAndAutoCommitCompleteForPollEvent() {
+    private void completeAsyncPollEventSuccessfully() {
         doAnswer(invocation -> {
-            PollEvent event = invocation.getArgument(0);
-            event.markReconcileAndAutoCommitComplete();
+            AsyncPollEvent event = invocation.getArgument(0);
+            event.completeSuccessfully();
             return null;
-        }).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
+        }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
     }
 }
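
Note: ConsumerPollTestUtils is referenced throughout the test changes above but its implementation is not part of this excerpt. A minimal sketch of what a waitForCondition helper of that shape might look like, assuming it simply drives Consumer.poll() in a loop until the condition holds; the class name, signature, and timeout below are assumptions, not the actual class:

    import java.time.Duration;
    import java.util.function.Supplier;

    import org.apache.kafka.clients.consumer.Consumer;

    // Hypothetical sketch; the real ConsumerPollTestUtils may differ.
    final class ConsumerPollTestUtilsSketch {
        private static final long DEFAULT_MAX_WAIT_MS = 15_000; // assumed default

        // Drive poll() repeatedly until the condition becomes true, or fail with the message.
        static void waitForCondition(Consumer<?, ?> consumer, Supplier<Boolean> condition, String message) {
            long deadlineMs = System.currentTimeMillis() + DEFAULT_MAX_WAIT_MS;
            while (System.currentTimeMillis() < deadlineMs) {
                consumer.poll(Duration.ZERO);
                if (condition.get())
                    return;
            }
            throw new AssertionError(message);
        }
    }

This mirrors the intent of the rewritten tests: with the poll path now asynchronous, a single poll() call is no longer guaranteed to observe a callback or background-event side effect, so the assertions poll until the effect is visible or a timeout elapses.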


@@ -18,8 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -258,7 +258,7 @@ public class ConsumerNetworkThreadTest {
         )) {
             consumerNetworkThread.initializeResources();

-            PollEvent event = new PollEvent(0);
+            AsyncPollEvent event = new AsyncPollEvent(10, 0);
             event.setEnqueuedMs(time.milliseconds());
             applicationEventQueue.add(event);
             asyncConsumerMetrics.recordApplicationEventQueueSize(1);


@@ -24,11 +24,11 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
+import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
 import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
@@ -680,7 +680,7 @@ public class ShareConsumerImplTest {
         consumer.subscribe(subscriptionTopic);
         consumer.poll(Duration.ofMillis(100));

-        verify(applicationEventHandler).add(any(PollEvent.class));
+        verify(applicationEventHandler).add(any(SharePollEvent.class));
         verify(applicationEventHandler).addAndGet(any(ShareSubscriptionChangeEvent.class));

         completeShareAcknowledgeOnCloseApplicationEventSuccessfully();


@@ -171,7 +171,7 @@ public class ApplicationEventProcessorTest {
     private static Stream<Arguments> applicationEvents() {
         return Stream.of(
-            Arguments.of(new PollEvent(100)),
+            Arguments.of(new AsyncPollEvent(calculateDeadlineMs(12345, 100), 100)),
             Arguments.of(new CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
             Arguments.of(new CheckAndUpdatePositionsEvent(500)),
             Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
@@ -265,12 +265,12 @@
     @Test
     public void testPollEvent() {
-        PollEvent event = new PollEvent(12345);
+        AsyncPollEvent event = new AsyncPollEvent(12346, 12345);

         setupProcessor(true);
         when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(new CompletableFuture<>());
         processor.process(event);
-        assertTrue(event.reconcileAndAutoCommit().isDone());
         verify(commitRequestManager).updateTimerAndMaybeCommit(12345);
         verify(membershipManager).onConsumerPoll();
         verify(heartbeatRequestManager).resetPollTimer(12345);
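
The new AsyncPollEvent is constructed above as new AsyncPollEvent(deadlineMs, pollTimeMs), and the Mockito stub in AsyncKafkaConsumerTest calls completeSuccessfully() on it. A rough sketch of the shape those usages imply; the field names, completion flag, and accessors are illustrative assumptions, not the class from this branch:

    // Hypothetical shape inferred from test usage only.
    public class AsyncPollEventSketch {
        private final long deadlineMs;      // absolute time at which the poll gives up
        private final long pollTimeMs;      // timestamp of the Consumer.poll() call
        private volatile boolean complete;  // set once the background thread finishes

        public AsyncPollEventSketch(long deadlineMs, long pollTimeMs) {
            this.deadlineMs = deadlineMs;
            this.pollTimeMs = pollTimeMs;
        }

        // Called by the background thread (or a test stub) when poll work succeeds.
        public void completeSuccessfully() {
            complete = true;
        }

        public boolean isComplete() {
            return complete;
        }

        public long deadlineMs() {
            return deadlineMs;
        }

        public long pollTimeMs() {
            return pollTimeMs;
        }
    }

Under this reading, the removed assertTrue(event.reconcileAndAutoCommit().isDone()) makes sense: the refactor drops the per-phase future in favor of a single completion signal on the event itself.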


@@ -1358,7 +1358,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
     val consumer = createConsumer()
     consumer.assign(java.util.List.of(tp))
-    assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer))
+    assertThrows(classOf[AuthorizationException], () => consumeRecords(consumer))
   }

   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)


@@ -35,7 +35,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer
-import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
+import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRecords, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfigs, TopicConfig}
@@ -568,8 +568,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     try {
       consumer.assign(util.Set.of(tp))
       consumer.seekToBeginning(util.Set.of(tp))
-      val records = consumer.poll(time.Duration.ofSeconds(3))
-      assertEquals(expectedNumber, records.count())
+      def verifyRecordCount(records: ConsumerRecords[Array[Byte], Array[Byte]]): Boolean = {
+        expectedNumber == records.count()
+      }
+      TestUtils.pollRecordsUntilTrue(
+        consumer,
+        verifyRecordCount,
+        s"Consumer.poll() did not return the expected number of records ($expectedNumber) within the timeout",
+        pollTimeoutMs = 3000
+      )
     } finally consumer.close()
   }
@@ -4585,7 +4592,9 @@
     prepareRecords(testTopicName)

     // Producer sends messages
-    for (i <- 1 to 20) {
+    val numRecords = 20
+    for (i <- 1 to numRecords) {
       TestUtils.waitUntilTrue(() => {
         val producerRecord = producer.send(
           new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4594,18 +4603,28 @@
       }, "Fail to produce record to topic")
     }

+    val consumerConfig = new Properties()
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
     val streams = createStreamsGroup(
+      configOverrides = consumerConfig,
       inputTopic = testTopicName,
       streamsGroupId = streamsGroupId,
     )

     try {
-      TestUtils.waitUntilTrue(() => {
-        streams.poll(JDuration.ofMillis(100L))
-        !streams.assignment().isEmpty
-      }, "Consumer not assigned to partitions")
+      var counter = 0
+      def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
+        counter += records.count()
+        counter >= numRecords
+      }
+      TestUtils.pollRecordsUntilTrue(
+        streams,
+        verifyRecordCount,
+        "Streams consumer did not consume the expected number of records within the timeout"
+      )
+      streams.poll(JDuration.ofMillis(1000L))
       streams.commitSync()

       TestUtils.waitUntilTrue(() => {
@@ -4645,7 +4664,9 @@
     prepareTopics(List(testTopicName), testNumPartitions)
     prepareRecords(testTopicName)

     // Producer sends messages
-    for (i <- 1 to 20) {
+    val numRecords = 20
+    for (i <- 1 to numRecords) {
       TestUtils.waitUntilTrue(() => {
         val producerRecord = producer.send(
           new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4654,18 +4675,28 @@
       }, "Fail to produce record to topic")
     }

+    val consumerConfig = new Properties()
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
     val streams = createStreamsGroup(
+      configOverrides = consumerConfig,
       inputTopic = testTopicName,
       streamsGroupId = streamsGroupId,
     )

     try {
-      TestUtils.waitUntilTrue(() => {
-        streams.poll(JDuration.ofMillis(100L))
-        !streams.assignment().isEmpty
-      }, "Consumer not assigned to partitions")
+      var counter = 0
+      def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
+        counter += records.count()
+        counter >= numRecords
+      }
+      TestUtils.pollRecordsUntilTrue(
+        streams,
+        verifyRecordCount,
+        "Streams consumer did not consume the expected number of records within the timeout"
+      )
+      streams.poll(JDuration.ofMillis(1000L))
       streams.commitSync()

       // List streams group offsets
@@ -4722,7 +4753,9 @@
     prepareRecords(testTopicName)

     // Producer sends messages
-    for (i <- 1 to 20) {
+    val numRecords = 20
+    for (i <- 1 to numRecords) {
       TestUtils.waitUntilTrue(() => {
         val producerRecord = producer.send(
           new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
@@ -4731,18 +4764,28 @@
       }, "Fail to produce record to topic")
     }

+    val consumerConfig = new Properties()
+    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
     val streams = createStreamsGroup(
+      configOverrides = consumerConfig,
       inputTopic = testTopicName,
       streamsGroupId = streamsGroupId,
     )

     try {
-      TestUtils.waitUntilTrue(() => {
-        streams.poll(JDuration.ofMillis(100L))
-        !streams.assignment().isEmpty
-      }, "Consumer not assigned to partitions")
+      var counter = 0
+      def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
+        counter += records.count()
+        counter >= numRecords
+      }
+      TestUtils.pollRecordsUntilTrue(
+        streams,
+        verifyRecordCount,
+        "Streams consumer did not consume the expected number of records within the timeout"
+      )
+      streams.poll(JDuration.ofMillis(1000L))
       streams.commitSync()

       // List streams group offsets


@@ -145,13 +145,27 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
   }

   private def verifyConsumerWithAuthenticationFailure(consumer: Consumer[Array[Byte], Array[Byte]]): Unit = {
-    verifyAuthenticationException(consumer.poll(Duration.ofMillis(1000)))
+    val startMs = System.currentTimeMillis
+    TestUtils.pollUntilException(
+      consumer,
+      _ => true,
+      "Consumer.poll() did not throw an exception within the timeout",
+      pollTimeoutMs = 1000
+    )
+    val elapsedMs = System.currentTimeMillis - startMs
+    assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
     verifyAuthenticationException(consumer.partitionsFor(topic))

     createClientCredential()
     val producer = createProducer()
     verifyWithRetry(sendOneRecord(producer))()
-    verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
+    TestUtils.waitUntilTrue(() => {
+      try {
+        consumer.poll(Duration.ofMillis(1000)).count() == 1
+      } catch {
+        case _: Throwable => false
+      }
+    }, msg = "Consumer.poll() did not read the expected number of records within the timeout")
   }

   @Test


@@ -19,7 +19,6 @@
 package kafka.server

 import java.net.InetSocketAddress
-import java.time.Duration
 import java.util.Properties
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import javax.security.auth.login.LoginContext
@@ -185,7 +184,12 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
     consumer.assign(java.util.List.of(tp))
     val startMs = System.currentTimeMillis()
-    assertThrows(classOf[SaslAuthenticationException], () => consumer.poll(Duration.ofMillis(50)))
+    TestUtils.pollUntilException(
+      consumer,
+      t => t.isInstanceOf[SaslAuthenticationException],
+      "Consumer.poll() did not trigger a SaslAuthenticationException within timeout",
+      pollTimeoutMs = 50
+    )
     val endMs = System.currentTimeMillis()
     require(endMs - startMs < failedAuthenticationDelayMs, "Failed authentication must not be delayed on the client")
     consumer.close()


@@ -690,6 +690,21 @@ object TestUtils extends Logging {
     }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
   }

+  def pollUntilException(consumer: Consumer[_, _],
+                         action: Throwable => Boolean,
+                         msg: => String,
+                         waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS,
+                         pollTimeoutMs: Long = 100): Unit = {
+    waitUntilTrue(() => {
+      try {
+        consumer.poll(Duration.ofMillis(pollTimeoutMs))
+        false
+      } catch {
+        case t: Throwable => action(t)
+      }
+    }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
+  }
+
   def pollRecordsUntilTrue[K, V](consumer: Consumer[K, V],
                                  action: ConsumerRecords[K, V] => Boolean,
                                  msg: => String,
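
The new pollUntilException helper keeps calling poll() and discards successful results until a thrown Throwable satisfies the predicate, reusing waitUntilTrue with pause = 0 so poll() itself provides the pacing. The Java-side ConsumerPollTestUtils.waitForException used in AsyncKafkaConsumerTest presumably follows the same shape; a hedged Java sketch of that pattern (the class name, signature, and timeout are assumptions):

    import java.time.Duration;
    import java.util.function.Predicate;

    import org.apache.kafka.clients.consumer.Consumer;

    // Hypothetical Java analogue of the Scala pollUntilException helper above.
    final class PollUntilExceptionSketch {
        static void waitForException(Consumer<?, ?> consumer, Predicate<Throwable> matcher, String message) {
            long deadlineMs = System.currentTimeMillis() + 15_000; // assumed default wait
            while (System.currentTimeMillis() < deadlineMs) {
                try {
                    consumer.poll(Duration.ofMillis(100)); // keep polling until something is thrown
                } catch (Throwable t) {
                    if (matcher.test(t))
                        return; // the expected failure surfaced
                }
            }
            throw new AssertionError(message);
        }
    }

Retrying like this matters under the async poll model: exceptions raised in the background thread are surfaced on a later poll() call rather than on the one that triggered the work.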


@@ -15,13 +15,16 @@
  * limitations under the License.
  */
-package kafka.server;
+package org.apache.kafka.server;

+import org.apache.kafka.clients.admin.AddRaftVoterOptions;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.FeatureMetadata;
 import org.apache.kafka.clients.admin.QuorumInfo;
 import org.apache.kafka.clients.admin.RaftVoterEndpoint;
+import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
 import org.apache.kafka.common.test.api.TestKitDefaults;
@@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.test.TestUtils;

+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;

 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;

@@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;

+@Tag("integration")
 public class ReconfigurableQuorumIntegrationTest {

     static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
         FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
@@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ).build()) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
                 });
@@ -88,7 +94,7 @@
         ).setStandalone(true).build()) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
                 });
@@ -126,7 +132,7 @@
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -161,7 +167,7 @@
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
@@ -200,7 +206,7 @@
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -238,7 +244,7 @@
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -249,4 +255,95 @@
             }
         }
     }
+
+    @Test
+    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
+        final var nodes = new TestKitNodes.Builder()
+            .setClusterId("test-cluster")
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(3)
+            .build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+
+                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                admin.removeRaftVoter(
+                    3000,
+                    dirId,
+                    new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+                ).all().get();
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+
+                admin.addRaftVoter(
+                    3000,
+                    dirId,
+                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
+                    new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+                ).all().get();
+            }
+        }
+    }
+
+    @Test
+    public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
+        final var nodes = new TestKitNodes.Builder()
+            .setClusterId("test-cluster")
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(3)
+            .build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                var removeFuture = admin.removeRaftVoter(
+                    3000,
+                    dirId,
+                    new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+                ).all();
+                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
+
+                var addFuture = admin.addRaftVoter(
+                    3000,
+                    dirId,
+                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
+                    new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+                ).all();
+                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
+            }
+        }
+    }
 }