mirror of https://github.com/apache/kafka.git
Compare commits
90 Commits
e14101ef06 ... 4305cfd51f
Commits in this range:

4305cfd51f, f660b15e52, 4a5aa37169, 5215030a4c, 68f5bc19d3, 2e5a982c1d, 4985c7de17, e7b53865fd,
91409bc9d3, d939675641, b04b0a2c73, 19ce01afa3, 5be72105ba, fa784e5f25, cac80ef38f, b5f4c8683c,
dcbe761869, ddf042306c, f4eb1619fd, 767316ba60, eace3ee1d6, 7ca4cc835e, e1cf7b7056, a8ccdb6f48,
9fb9ee9e6e, 461ffdd9b0, 2b2f70c36e, a233e90d5d, 0256bdf274, f6864a3ac4, 5041a36e6a, 5d1a34d27f,
e9dbc61bff, bc660d6462, 7ba60474c0, cc49db81ba, ab42fca4f4, 0aed4aff89, 5c99d81b18, 72472c5fc6,
164dfdd5ff, a157071a0d, 8235ed2256, f45b70e688, bfcd7ec0f8, 52c08455c1, 5af8fc4357, 985bbd7582,
1f1ae24538, 2aaca8db9d, 8b33f081ce, 91e881f2b4, 2302427116, b1937702d2, 2d21fa0fdf, 1e52282c41,
1570f6997f, abaa4dc639, f40a4ac27f, 71120224f4, 81598844bd, 81b707e745, 702b25753b, 99304db9e8,
9d65fa22f5, 56062f5b01, ea99a13021, 18f4fa11f3, 2c3547e06a, 6775aacc2c, ae0ddcc4c0, 0ac19f96b9,
00f9069c98, 524782cd79, c6a7923280, 784aad2d4c, 529aab3316, 3e0b920399, 40f6754810, aaefbeff32,
d253b847e6, 464d5bafb9, 5e794ce079, 09f8cb57cf, dbc4773a34, d3fa910d10, 9fd7e5870a, d4802c78e3,
b5d7d01dbc, 34932e2222

@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.InconsistentClusterIdException;
-import org.apache.kafka.common.test.KafkaClusterTestKit;
-import org.apache.kafka.common.test.TestKitNodes;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@Tag("integration")
-public class ReconfigurableQuorumIntegrationTest {
-
-    static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
-        var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
-        return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-            .setClusterId("test-cluster")
-            .setNumBrokerNodes(1)
-            .setNumControllerNodes(3)
-            .build();
-
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                controllerNode.id(),
-                controllerNode.metadataDirectoryId()
-            );
-        }
-
-        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
-                }, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
-
-                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                admin.removeRaftVoter(
-                    3000,
-                    dirId,
-                    new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
-                }, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
-
-                admin.addRaftVoter(
-                    3000,
-                    dirId,
-                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
-                    new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-            }
-        }
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-            .setClusterId("test-cluster")
-            .setNumBrokerNodes(1)
-            .setNumControllerNodes(3)
-            .build();
-
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                controllerNode.id(),
-                controllerNode.metadataDirectoryId()
-            );
-        }
-
-        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                var removeFuture = admin.removeRaftVoter(
-                    3000,
-                    dirId,
-                    new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
-
-                var addFuture = admin.addRaftVoter(
-                    3000,
-                    dirId,
-                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
-                    new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
-            }
-        }
-    }
-}

@@ -122,7 +122,7 @@ public class ConsumerIntegrationTest {
             }
         });

-        TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1,
+        TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).count() == 1,
                 5000,
                 "failed to poll data");
     }

@@ -266,10 +266,11 @@ public class ConsumerIntegrationTest {
         consumer1.subscribe(List.of(topic));
         consumer2.subscribe(List.of(topic));

+        Duration pollTimeout = Duration.ofMillis(100);
         TestUtils.waitForCondition(() -> {
-            consumer0.poll(Duration.ofMillis(1000));
-            consumer1.poll(Duration.ofMillis(1000));
-            consumer2.poll(Duration.ofMillis(1000));
+            consumer0.poll(pollTimeout);
+            consumer1.poll(pollTimeout);
+            consumer2.poll(pollTimeout);
             return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
                     consumer1.assignment().isEmpty() &&
                     consumer2.assignment().isEmpty();

@@ -284,9 +285,9 @@ public class ConsumerIntegrationTest {
         );
         clusterInstance.waitTopicCreation(topic, 3);
         TestUtils.waitForCondition(() -> {
-            consumer0.poll(Duration.ofMillis(1000));
-            consumer1.poll(Duration.ofMillis(1000));
-            consumer2.poll(Duration.ofMillis(1000));
+            consumer0.poll(pollTimeout);
+            consumer1.poll(pollTimeout);
+            consumer2.poll(pollTimeout);
             return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
                     consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
                     consumer2.assignment().isEmpty();

@@ -301,9 +302,9 @@ public class ConsumerIntegrationTest {
         );
         clusterInstance.waitTopicCreation(topic, 6);
         TestUtils.waitForCondition(() -> {
-            consumer0.poll(Duration.ofMillis(1000));
-            consumer1.poll(Duration.ofMillis(1000));
-            consumer2.poll(Duration.ofMillis(1000));
+            consumer0.poll(pollTimeout);
+            consumer1.poll(pollTimeout);
+            consumer2.poll(pollTimeout);
             return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
                     consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
                     consumer2.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4), new TopicPartition(topic, 5)));

@@ -325,9 +326,9 @@ public class ConsumerIntegrationTest {
                 new TopicPartition(topic, 5), Optional.of(new NewPartitionReassignment(List.of(0)))
         )).all().get();
         TestUtils.waitForCondition(() -> {
-            consumer0.poll(Duration.ofMillis(1000));
-            consumer1.poll(Duration.ofMillis(1000));
-            consumer2.poll(Duration.ofMillis(1000));
+            consumer0.poll(pollTimeout);
+            consumer1.poll(pollTimeout);
+            consumer2.poll(pollTimeout);
             return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 5))) &&
                     consumer1.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4))) &&
                     consumer2.assignment().equals(Set.of(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));
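All four hunks above make the same change: a single long `poll()` becomes a short poll repeated inside `TestUtils.waitForCondition`, so the total wait is bounded by the condition's timeout rather than by any one poll call. A minimal standalone sketch of that wait-loop shape (the helper name, backoff, and timeouts are illustrative, not Kafka's actual `TestUtils` API):

```java
import java.util.function.BooleanSupplier;

final class WaitUntil {
    // Re-check a condition until it holds or the deadline passes.
    static void waitUntil(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("condition not met within " + timeoutMs + " ms");
            }
            Thread.sleep(50); // brief pause between attempts
        }
    }
}

// Usage mirroring the hunks above (hypothetical wiring):
//   waitUntil(() -> consumer.poll(Duration.ofMillis(100)).count() == 1, 5000);
```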

@@ -283,11 +283,13 @@ public class PlaintextConsumerCommitTest {
         // In both CLASSIC and CONSUMER protocols, interceptors are executed in poll and close.
         // However, in the CONSUMER protocol, the assignment may be changed outside a poll, so
         // we need to poll once to ensure the interceptor is called.
         if (groupProtocol == GroupProtocol.CONSUMER) {
             consumer.poll(Duration.ZERO);
         }

-        assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance);
+        TestUtils.waitForCondition(
+            () -> {
+                consumer.poll(Duration.ZERO);
+                return MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance;
+            },
+            "Consumer.poll() did not invoke onCommit() before timeout elapse"
+        );

         // verify commits are intercepted on close
         var commitCountBeforeClose = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
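The new wait loop exists because, as the comment notes, the CONSUMER protocol may change the assignment outside `poll()`, so the commit interceptor only fires once a later poll runs. A sketch of how such a counting interceptor can be built (an illustrative stand-in for `MockConsumerInterceptor`, not the actual Kafka test class):

```java
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingCommitInterceptor implements ConsumerInterceptor<byte[], byte[]> {
    public static final AtomicInteger ON_COMMIT_COUNT = new AtomicInteger();

    @Override
    public ConsumerRecords<byte[], byte[]> onConsume(ConsumerRecords<byte[], byte[]> records) {
        return records; // pass records through untouched
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        ON_COMMIT_COUNT.incrementAndGet(); // invoked from poll()/close()
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```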
@@ -66,7 +66,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.IntStream;

 import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT;

@@ -810,7 +809,7 @@ public class PlaintextConsumerTest {
         // Create a consumer and consumer some messages.
         var listener = new TestConsumerReassignmentListener();
         consumer.subscribe(List.of(TOPIC, topic2), listener);
-        var records = awaitNonEmptyRecords(consumer, TP);
+        var records = ConsumerPollTestUtils.waitForRecords(consumer);
         assertEquals(1, listener.callsToAssigned, "should be assigned once");

         // Verify the metric exist.

@@ -877,7 +876,7 @@ public class PlaintextConsumerTest {
         // Create a consumer and consumer some messages.
         var listener = new TestConsumerReassignmentListener();
         consumer.subscribe(List.of(TOPIC, topic2), listener);
-        var records = awaitNonEmptyRecords(consumer, TP);
+        var records = ConsumerPollTestUtils.waitForRecords(consumer);
         assertEquals(1, listener.callsToAssigned, "should be assigned once");

         // Verify the metric exist.

@@ -944,7 +943,7 @@ public class PlaintextConsumerTest {
         sendRecords(producer, tp2, numMessages, System.currentTimeMillis());

         consumer.assign(List.of(TP));
-        var records = awaitNonEmptyRecords(consumer, TP);
+        var records = ConsumerPollTestUtils.waitForRecords(consumer);

         // Verify the metric exist.
         Map<String, String> tags = Map.of(

@@ -958,7 +957,7 @@ public class PlaintextConsumerTest {
         assertEquals((double) records.count(), fetchLead.metricValue(), "The lead should be " + records.count());

         consumer.assign(List.of(tp2));
-        awaitNonEmptyRecords(consumer, tp2);
+        ConsumerPollTestUtils.waitForRecords(consumer);
         assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)));
     }
 }

@@ -999,7 +998,7 @@ public class PlaintextConsumerTest {
         sendRecords(producer, tp2, numMessages, System.currentTimeMillis());

         consumer.assign(List.of(TP));
-        var records = awaitNonEmptyRecords(consumer, TP);
+        var records = ConsumerPollTestUtils.waitForRecords(consumer);

         // Verify the metric exist.
         Map<String, String> tags = Map.of(

@@ -1014,7 +1013,7 @@ public class PlaintextConsumerTest {
         var expectedLag = numMessages - records.count();
         assertEquals(expectedLag, (double) fetchLag.metricValue(), EPSILON, "The lag should be " + expectedLag);
         consumer.assign(List.of(tp2));
-        awaitNonEmptyRecords(consumer, tp2);
+        ConsumerPollTestUtils.waitForRecords(consumer);
         assertNull(consumer.metrics().get(new MetricName(TP + ".records-lag", "consumer-fetch-manager-metrics", "", tags)));
         assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)));
     }

@@ -1058,7 +1057,7 @@ public class PlaintextConsumerTest {
         sendRecords(producer, tp2, numMessages, System.currentTimeMillis());

         consumer.assign(List.of(TP));
-        awaitNonEmptyRecords(consumer, TP);
+        ConsumerPollTestUtils.waitForRecords(consumer);

         // Verify the metric exist.
         Map<String, String> tags = Map.of(

@@ -1203,12 +1202,21 @@ public class PlaintextConsumerTest {
         consumer3.assign(List.of(TP));
         consumer3.seek(TP, 1);

-        var numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count();
+        TestUtils.waitForCondition(
+            () -> consumer1.poll(Duration.ofMillis(5000)).count() == 3,
+            "consumer1 did not consume from earliest offset"
+        );
         assertThrows(InvalidGroupIdException.class, consumer1::commitSync);
         assertThrows(InvalidGroupIdException.class, () -> consumer2.committed(Set.of(TP)));

-        var numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count();
-        var numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count();
+        TestUtils.waitForCondition(
+            () -> consumer2.poll(Duration.ofMillis(5000)).count() == 0,
+            "Expected consumer2 to consume from latest offset"
+        );
+        TestUtils.waitForCondition(
+            () -> consumer3.poll(Duration.ofMillis(5000)).count() == 2,
+            "Expected consumer3 to consume from offset 1"
+        );

         consumer1.unsubscribe();
         consumer2.unsubscribe();

@@ -1217,14 +1225,6 @@ public class PlaintextConsumerTest {
         assertTrue(consumer1.assignment().isEmpty());
         assertTrue(consumer2.assignment().isEmpty());
         assertTrue(consumer3.assignment().isEmpty());

         consumer1.close();
         consumer2.close();
         consumer3.close();
-
-        assertEquals(3, numRecords1, "Expected consumer1 to consume from earliest offset");
-        assertEquals(0, numRecords2, "Expected consumer2 to consume from latest offset");
-        assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1");
     }
 }

@@ -1654,7 +1654,7 @@ public class PlaintextConsumerTest {
         consumer.subscribe(List.of(testTopic));

         // This is here to allow the consumer time to settle the group membership/assignment.
-        awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
+        ConsumerPollTestUtils.waitForRecords(consumer);

         // Keep track of the last time the poll is invoked to ensure the deltas between invocations don't
         // exceed the delay threshold defined above.

@@ -1674,24 +1674,6 @@ public class PlaintextConsumerTest {
         }
     }

-    private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
-            Consumer<byte[], byte[]> consumer,
-            TopicPartition tp
-    ) throws Exception {
-        AtomicReference<ConsumerRecords<byte[], byte[]>> result = new AtomicReference<>();
-
-        TestUtils.waitForCondition(() -> {
-            var polledRecords = consumer.poll(Duration.ofSeconds(10));
-            boolean hasRecords = !polledRecords.isEmpty();
-            if (hasRecords) {
-                result.set(polledRecords);
-            }
-            return hasRecords;
-        }, "Timed out waiting for non-empty records from topic " + tp.topic() + " partition " + tp.partition());
-
-        return result.get();
-    }
-
     public static class SerializerImpl implements Serializer<byte[]> {

         private final ByteArraySerializer serializer = new ByteArraySerializer();
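Each of these hunks swaps the file-local `awaitNonEmptyRecords` helper (deleted above) for a shared `ConsumerPollTestUtils.waitForRecords`. Presumably the shared utility wraps the same poll-until-non-empty loop; a sketch of what such a helper looks like, reconstructed from the deleted method rather than from the real `ConsumerPollTestUtils` (the explicit timeout parameter is an assumption):

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.time.Duration;

final class PollUtilsSketch {
    // Poll until at least one record arrives or the deadline passes.
    static <K, V> ConsumerRecords<K, V> waitForRecords(Consumer<K, V> consumer, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                return records;
            }
        }
        throw new IllegalStateException("Timed out waiting for non-empty records");
    }
}
```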
@@ -41,6 +41,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;

@@ -59,7 +60,6 @@ import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;

@@ -325,8 +325,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     // Init value is needed to avoid NPE in case of exception raised in the constructor
     private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();

-    // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates
-    private boolean cachedSubscriptionHasAllFetchPositions;
+    private AsyncPollEvent inflightPoll;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
     private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
     private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;

@@ -464,7 +463,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
             metadata,
             subscriptions,
-            requestManagersSupplier);
+            requestManagersSupplier
+        );
         this.applicationEventHandler = applicationEventHandlerFactory.build(
             logContext,
             time,

@@ -623,7 +623,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             new RebalanceCallbackMetricsManager(metrics)
         );
         ApiVersions apiVersions = new ApiVersions();
-        Supplier<NetworkClientDelegate> networkClientDelegateSupplier = () -> new NetworkClientDelegate(
+        Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(
             time,
             config,
             logContext,

@@ -833,22 +833,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }

         do {
-            PollEvent event = new PollEvent(timer.currentTimeMs());
-            // Make sure to let the background thread know that we are still polling.
-            // This will trigger async auto-commits of consumed positions when hitting
-            // the interval time or reconciling new assignments
-            applicationEventHandler.add(event);
-            // Wait for reconciliation and auto-commit to be triggered, to ensure all commit requests
-            // retrieve the positions to commit before proceeding with fetching new records
-            ConsumerUtils.getResult(event.reconcileAndAutoCommit(), defaultApiTimeoutMs.toMillis());
-
             // We must not allow wake-ups between polling for fetches and returning the records.
             // If the polled fetches are not empty the consumed position has already been updated in the polling
             // of the fetches. A wakeup between returned fetches and returning records would lead to never
             // returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.
             wakeupTrigger.maybeTriggerWakeup();

-            updateAssignmentMetadataIfNeeded(timer);
+            checkInflightPoll(timer);
             final Fetch<K, V> fetch = pollForFetches(timer);
             if (!fetch.isEmpty()) {
                 // before returning the fetched records, we can send off the next round of fetches

@@ -876,6 +867,71 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
     }

+    /**
+     * {@code checkInflightPoll()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is
+     * called when no event is currently processing, it will start a new event processing asynchronously. A check
+     * is made during each invocation to see if the <em>inflight</em> event has completed. If it has, it will be
+     * processed accordingly.
+     */
+    public void checkInflightPoll(Timer timer) {
+        boolean newlySubmittedEvent = false;
+
+        if (inflightPoll == null) {
+            inflightPoll = new AsyncPollEvent(calculateDeadlineMs(timer), time.milliseconds());
+            newlySubmittedEvent = true;
+
+            if (log.isTraceEnabled()) {
+                log.trace(
+                    "Submitting new inflight event {} with {} remaining on timer",
+                    inflightPoll,
+                    timer.remainingMs()
+                );
+            }
+
+            applicationEventHandler.add(inflightPoll);
+        }
+
+        try {
+            // Note: this is calling user-supplied code, so make sure that any errors thrown here are caught and
+            // the inflight event is cleared.
+            offsetCommitCallbackInvoker.executeCallbacks();
+            processBackgroundEvents();
+
+            if (inflightPoll.isComplete()) {
+                Optional<KafkaException> errorOpt = inflightPoll.error();
+
+                // The async poll event has completed, either successfully or not. In either case, clear out the
+                // inflight request.
+                log.trace("Inflight event {} completed, clearing", inflightPoll);
+                inflightPoll = null;
+
+                if (errorOpt.isPresent()) {
+                    throw errorOpt.get();
+                }
+            } else if (!newlySubmittedEvent) {
+                if (timer.isExpired()) {
+                    // The inflight event is expired...
+                    log.trace("Inflight event {} expired without completing, clearing", inflightPoll);
+                    inflightPoll = null;
+                } else {
+                    if (log.isTraceEnabled()) {
+                        log.trace(
+                            "Inflight event {} is incomplete with {} remaining on timer",
+                            inflightPoll,
+                            timer.remainingMs()
+                        );
+                    }
+                }
+            }
+        } catch (Throwable t) {
+            // If an exception is hit, bubble it up to the user but make sure to clear out the inflight request
+            // because the error effectively renders it complete.
+            log.debug("Inflight event {} failed due to {}, clearing", inflightPoll, String.valueOf(t));
+            inflightPoll = null;
+            throw ConsumerUtils.maybeWrapAsKafkaException(t);
+        }
+    }
+
     /**
      * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
      * partitions.
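`checkInflightPoll()` boils down to a reusable shape: at most one inflight background task, submitted on demand and checked for completion on every call without blocking. A condensed, generic sketch of that lifecycle (for intuition only; the real method also drains offset-commit callbacks and background events first, as shown above):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Submit-once/check-later: each call submits if nothing is inflight, then
// checks for completion; completed, expired, or failed work is cleared.
final class InflightTracker<T> {
    private CompletableFuture<T> inflight;

    T check(Supplier<CompletableFuture<T>> submitter, boolean timerExpired) {
        boolean newlySubmitted = false;
        if (inflight == null) {
            inflight = submitter.get();   // start a new round of background work
            newlySubmitted = true;
        }
        if (inflight.isDone()) {
            CompletableFuture<T> done = inflight;
            inflight = null;              // clear before surfacing result or error
            return done.join();           // throws if the background work failed
        }
        if (timerExpired && !newlySubmitted) {
            inflight = null;              // expired without completing; next call resubmits
        }
        return null;                      // still inflight; caller proceeds without a result
    }
}
```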
@@ -1771,15 +1827,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             return fetch;
         }

-        // send any new fetches (won't resend pending fetches)
-        sendFetches(timer);
-
         // We do not want to be stuck blocking in poll if we are missing some positions
         // since the offset lookup may be backing off after a failure

-        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
-        // updateAssignmentMetadataIfNeeded before this method.
-        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
+        if (pollTimeout > retryBackoffMs) {
             pollTimeout = retryBackoffMs;
         }

@@ -1809,19 +1859,9 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
      * of the {@link #fetchBuffer}, converting it to a well-formed {@link CompletedFetch}, validating that it and
      * the internal {@link SubscriptionState state} are correct, and then converting it all into a {@link Fetch}
      * for returning.
-     *
-     * <p/>
-     *
-     * This method will {@link ConsumerNetworkThread#wakeup() wake up the network thread} before returning. This is
-     * done as an optimization so that the <em>next round of data can be pre-fetched</em>.
      */
     private Fetch<K, V> collectFetch() {
-        final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
-
-        // Notify the network thread to wake up and start the next round of fetching.
-        applicationEventHandler.wakeupNetworkThread();
-
-        return fetch;
+        return fetchCollector.collectFetch(fetchBuffer);
     }

     /**

@@ -1834,11 +1874,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
      * defined
      */
     private boolean updateFetchPositions(final Timer timer) {
-        cachedSubscriptionHasAllFetchPositions = false;
         try {
             CheckAndUpdatePositionsEvent checkAndUpdatePositionsEvent = new CheckAndUpdatePositionsEvent(calculateDeadlineMs(timer));
             wakeupTrigger.setActiveTask(checkAndUpdatePositionsEvent.future());
-            cachedSubscriptionHasAllFetchPositions = applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
+            applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);
         } catch (TimeoutException e) {
             return false;
         } finally {

@@ -1856,41 +1895,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         return groupMetadata.get().isPresent();
     }

-    /**
-     * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests}.
-     *
-     * <p/>
-     *
-     * This method takes the following steps to maintain compatibility with the {@link ClassicKafkaConsumer} method
-     * of the same name:
-     *
-     * <ul>
-     *     <li>
-     *         The method will wait for confirmation of the request creation before continuing.
-     *     </li>
-     *     <li>
-     *         The method will throw exceptions encountered during request creation to the user <b>immediately</b>.
-     *     </li>
-     *     <li>
-     *         The method will suppress {@link TimeoutException}s that occur while waiting for the confirmation.
-     *         Timeouts during request creation are a byproduct of this consumer's thread communication mechanisms.
-     *         That exception type isn't thrown in the request creation step of the {@link ClassicKafkaConsumer}.
-     *         Additionally, timeouts will not impact the logic of {@link #pollForFetches(Timer) blocking requests}
-     *         as it can handle requests that are created after the timeout.
-     *     </li>
-     * </ul>
-     *
-     * @param timer Timer used to bound how long the consumer waits for the requests to be created, which in practice
-     *              is used to avoid using {@link Long#MAX_VALUE} to wait "forever"
-     */
-    private void sendFetches(Timer timer) {
-        try {
-            applicationEventHandler.addAndGet(new CreateFetchRequestsEvent(calculateDeadlineMs(timer)));
-        } catch (TimeoutException swallow) {
-            // Can be ignored, per above comments.
-        }
-    }
-
     /**
      * This method signals the background thread to {@link CreateFetchRequestsEvent create fetch requests} for the
      * pre-fetch case, i.e. right before {@link #poll(Duration)} exits. In the pre-fetch case, the application thread
@@ -20,9 +20,9 @@ import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.MetadataErrorNotifiableEvent;
 import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.requests.AbstractRequest;

@@ -40,6 +40,7 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Supplier;

@@ -193,10 +194,13 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
             try {
                 if (event instanceof CompletableEvent) {
                     applicationEventReaper.add((CompletableEvent<?>) event);
-                    // Check if there are any metadata errors and fail the CompletableEvent if an error is present.
-                    // This call is meant to handle "immediately completed events" which may not enter the awaiting state,
-                    // so metadata errors need to be checked and handled right away.
-                    maybeFailOnMetadataError(List.of((CompletableEvent<?>) event));
                 }
+                // Check if there are any metadata errors and fail the CompletableEvent if an error is present.
+                // This call is meant to handle "immediately completed events" which may not enter the awaiting state,
+                // so metadata errors need to be checked and handled right away.
+                if (event instanceof MetadataErrorNotifiableEvent) {
+                    if (maybeFailOnMetadataError(List.of(event)))
+                        continue;
+                }
                 applicationEventProcessor.process(event);
             } catch (Throwable t) {

@@ -368,18 +372,27 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
     /**
      * If there is a metadata error, complete all uncompleted events that require subscription metadata.
      */
-    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
-        List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new ArrayList<>();
+    private boolean maybeFailOnMetadataError(List<?> events) {
+        List<MetadataErrorNotifiableEvent> filteredEvents = new ArrayList<>();

-        for (CompletableEvent<?> ce : events) {
-            if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata())
-                subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce);
+        for (Object obj : events) {
+            if (obj instanceof MetadataErrorNotifiableEvent) {
+                filteredEvents.add((MetadataErrorNotifiableEvent) obj);
+            }
         }

-        if (subscriptionMetadataEvent.isEmpty())
-            return;
-        networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
-            subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError))
-        );
+        // Don't get-and-clear the metadata error if there are no events that will be notified.
+        if (filteredEvents.isEmpty())
+            return false;
+
+        Optional<Exception> andClearMetadataError = networkClientDelegate.getAndClearMetadataError();
+
+        if (andClearMetadataError.isPresent()) {
+            Exception metadataError = andClearMetadataError.get();
+            filteredEvents.forEach(e -> e.onMetadataError(metadataError));
+            return true;
+        } else {
+            return false;
+        }
     }
 }
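This refactor replaces the old `requireSubscriptionMetadata()` flag with a callback interface, so each event type decides for itself how a metadata error completes it. Judging from the implementations elsewhere in this diff (`AbstractTopicMetadataEvent`, `CheckAndUpdatePositionsEvent`, `AsyncPollEvent`), the interface presumably reduces to a single method; a sketch:

```java
// Sketch of the notification contract implied by this diff; the real
// MetadataErrorNotifiableEvent may carry additional documentation or members.
interface MetadataErrorNotifiable {
    // Called by the network thread when a metadata error should fail this event.
    void onMetadataError(Exception metadataError);
}
```

Future-backed events fail their future (as `AbstractTopicMetadataEvent` does below), while `AsyncPollEvent` instead records the error in its volatile completion state.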
@@ -210,6 +210,30 @@ public class NetworkClientDelegate implements AutoCloseable {
             }
             ClientRequest request = makeClientRequest(r, node, currentTimeMs);
             if (!client.ready(node, currentTimeMs)) {
+                AuthenticationException authenticationException = client.authenticationException(node);
+
+                // The client may not be ready because it hit an unrecoverable authentication error. In that case, there's
+                // no benefit from retrying, so propagate the error here.
+                if (authenticationException != null) {
+                    request.callback().onComplete(
+                        new ClientResponse(
+                            request.makeHeader(
+                                request.requestBuilder().latestAllowedVersion()
+                            ),
+                            request.callback(),
+                            request.destination(),
+                            request.createdTimeMs(),
+                            currentTimeMs,
+                            true,
+                            null,
+                            authenticationException,
+                            null
+                        )
+                    );
+
+                    return false;
+                }
+
                 // enqueue the request again if the node isn't ready yet. The request will be handled in the next iteration
                 // of the event loop
                 log.debug("Node is not ready, handle the request in the next event loop: node={}, request={}", node, r);

@@ -471,4 +495,33 @@ public class NetworkClientDelegate implements AutoCloseable {
             }
         };
     }

+    /**
+     * Creates a {@link Supplier} for deferred creation during invocation by
+     * {@link ConsumerNetworkThread}.
+     */
+    public static Supplier<NetworkClientDelegate> supplier(final Time time,
+                                                           final ConsumerConfig config,
+                                                           final LogContext logContext,
+                                                           final KafkaClient client,
+                                                           final Metadata metadata,
+                                                           final BackgroundEventHandler backgroundEventHandler,
+                                                           final boolean notifyMetadataErrorsViaErrorQueue,
+                                                           final AsyncConsumerMetrics asyncConsumerMetrics) {
+        return new CachedSupplier<>() {
+            @Override
+            protected NetworkClientDelegate create() {
+                return new NetworkClientDelegate(
+                    time,
+                    config,
+                    logContext,
+                    client,
+                    metadata,
+                    backgroundEventHandler,
+                    notifyMetadataErrorsViaErrorQueue,
+                    asyncConsumerMetrics
+                );
+            }
+        };
+    }
 }
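The supplier returned here is a `CachedSupplier`, so the `NetworkClientDelegate` is built lazily on the `ConsumerNetworkThread`'s first `get()` call and the same instance is reused afterwards. A sketch of that memoizing-supplier shape (assumed from its usage in this diff, not copied from Kafka's internal `CachedSupplier`):

```java
import java.util.function.Supplier;

// Defer creation to the first get() call, then hand back the cached instance.
abstract class MemoizingSupplier<T> implements Supplier<T> {
    private T result;

    protected abstract T create();

    @Override
    public synchronized T get() {
        if (result == null) {
            result = create();
        }
        return result;
    }
}
```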
@@ -50,6 +50,16 @@ public class OffsetCommitCallbackInvoker {
         }
     }

+    /**
+     * Returns the current size of the queue. Used by the background thread to determine if it needs to <i>pause</i>
+     * itself to return to the application thread for processing.
+     *
+     * @return Current size of queue
+     */
+    public int size() {
+        return callbackQueue.size();
+    }
+
     public void enqueueUserCallbackInvocation(final OffsetCommitCallback callback,
                                               final Map<TopicPartition, OffsetAndMetadata> offsets,
                                               final Exception exception) {
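Per the javadoc, the background thread samples this size (together with the matching `BackgroundEventHandler.size()` added later in this diff) to decide whether to yield so the application thread can drain callbacks. A small illustrative model of that check (queue types and names are stand-ins, not the actual fields):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Stand-ins for the two application-thread queues the background thread samples.
final class PauseCheckSketch {
    final ConcurrentLinkedQueue<Runnable> callbackQueue = new ConcurrentLinkedQueue<>();
    final ConcurrentLinkedQueue<Object> backgroundEventQueue = new ConcurrentLinkedQueue<>();

    // Mirrors the new size() accessors: the background thread pauses new work
    // while the application thread still has callbacks or events to drain.
    boolean shouldPause() {
        return callbackQueue.size() > 0 || backgroundEventQueue.size() > 0;
    }
}
```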
@@ -38,13 +38,13 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
+import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
 import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;

@@ -384,7 +384,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
             backgroundEventQueue, time, asyncConsumerMetrics);

         final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
-                () -> new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
+                NetworkClientDelegate.supplier(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);

         GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
             config,

@@ -583,7 +583,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {

         do {
             // Make sure the network thread can tell the application is actively polling
-            applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
+            applicationEventHandler.add(new SharePollEvent(timer.currentTimeMs()));

             processBackgroundEvents();

@@ -21,14 +21,14 @@ import org.apache.kafka.common.PartitionInfo;
 import java.util.List;
 import java.util.Map;

-public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
+public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> implements MetadataErrorNotifiableEvent {

     protected AbstractTopicMetadataEvent(final Type type, final long deadlineMs) {
         super(type, deadlineMs);
     }

     @Override
-    public boolean requireSubscriptionMetadata() {
-        return true;
+    public void onMetadataError(Exception metadataError) {
+        future().completeExceptionally(metadataError);
     }
 }
@@ -28,14 +28,14 @@ import java.util.Objects;
 public abstract class ApplicationEvent {

     public enum Type {
-        COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
+        COMMIT_ASYNC, COMMIT_SYNC, ASYNC_POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
         LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA,
         TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
         UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
         CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
         COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, STOP_FIND_COORDINATOR_ON_CLOSE,
         PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
-        SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
+        SHARE_POLL, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
         SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
         SHARE_ACKNOWLEDGE_ON_CLOSE,
         SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION,
|
@ -20,11 +20,15 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|||
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
|
||||
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
|
||||
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
|
||||
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
|
||||
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
|
||||
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
|
||||
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
|
||||
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
|
||||
import org.apache.kafka.clients.consumer.internals.RequestManagers;
|
||||
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
|
||||
import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
|
||||
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
|
||||
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
|
||||
import org.apache.kafka.common.Cluster;
|
||||
import org.apache.kafka.common.IsolationLevel;
|
||||
|
@ -45,6 +49,7 @@ import java.util.Map;
|
|||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -53,6 +58,7 @@ import java.util.stream.Collectors;
|
|||
* An {@link EventProcessor} that is created and executes in the {@link ConsumerNetworkThread network thread}
|
||||
* which processes {@link ApplicationEvent application events} generated by the application thread.
|
||||
*/
|
||||
@SuppressWarnings({"ClassFanOutComplexity"})
|
||||
public class ApplicationEventProcessor implements EventProcessor<ApplicationEvent> {
|
||||
|
||||
private final Logger log;
|
||||
|
@ -76,6 +82,14 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
@Override
|
||||
public void process(ApplicationEvent event) {
|
||||
switch (event.type()) {
|
||||
case ASYNC_POLL:
|
||||
process((AsyncPollEvent) event);
|
||||
return;
|
||||
|
||||
case SHARE_POLL:
|
||||
process((SharePollEvent) event);
|
||||
return;
|
||||
|
||||
case COMMIT_ASYNC:
|
||||
process((AsyncCommitEvent) event);
|
||||
return;
|
||||
|
@ -84,10 +98,6 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
process((SyncCommitEvent) event);
|
||||
return;
|
||||
|
||||
case POLL:
|
||||
process((PollEvent) event);
|
||||
return;
|
||||
|
||||
case FETCH_COMMITTED_OFFSETS:
|
||||
process((FetchCommittedOffsetsEvent) event);
|
||||
return;
|
||||
|
@ -217,35 +227,15 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
}
|
||||
}
|
||||
|
||||
private void process(final PollEvent event) {
|
||||
// Trigger a reconciliation that can safely commit offsets if needed to rebalance,
|
||||
// as we're processing before any new fetching starts in the app thread
|
||||
private void process(final SharePollEvent event) {
|
||||
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
|
||||
consumerMembershipManager.maybeReconcile(true));
|
||||
if (requestManagers.commitRequestManager.isPresent()) {
|
||||
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
|
||||
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
|
||||
// all commit request generation points have been passed,
|
||||
// so it's safe to notify the app thread could proceed and start fetching
|
||||
event.markReconcileAndAutoCommitComplete();
|
||||
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
hrm.membershipManager().onConsumerPoll();
|
||||
hrm.resetPollTimer(event.pollTimeMs());
|
||||
});
|
||||
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
hrm.membershipManager().onConsumerPoll();
|
||||
hrm.resetPollTimer(event.pollTimeMs());
|
||||
});
|
||||
} else {
|
||||
// safe to unblock - no auto-commit risk here:
|
||||
// 1. commitRequestManager is not present
|
||||
// 2. shareConsumer has no auto-commit mechanism
|
||||
event.markReconcileAndAutoCommitComplete();
|
||||
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
hrm.membershipManager().onConsumerPoll();
|
||||
hrm.resetPollTimer(event.pollTimeMs());
|
||||
});
|
||||
}
|
||||
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
ShareMembershipManager membershipManager = hrm.membershipManager();
|
||||
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
|
||||
membershipManager.onConsumerPoll();
|
||||
hrm.resetPollTimer(event.pollTimeMs());
|
||||
});
|
||||
}
|
||||
|
||||
private void process(final CreateFetchRequestsEvent event) {
|
||||
|
@ -352,7 +342,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
if (subscriptions.subscribe(event.topics(), event.listener())) {
|
||||
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
|
||||
}
|
||||
requestManagers.streamsMembershipManager.get().onSubscriptionUpdated();
|
||||
requestManagers.streamsGroupHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
|
||||
event.future().complete(null);
|
||||
} catch (Exception e) {
|
||||
event.future().completeExceptionally(e);
|
||||
|
@ -375,7 +365,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
try {
|
||||
subscriptions.subscribe(event.pattern(), event.listener());
|
||||
metadata.requestUpdateForNewTopics();
|
||||
updatePatternSubscription(metadata.fetch());
|
||||
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
ConsumerMembershipManager membershipManager = hrm.membershipManager();
|
||||
updatePatternSubscription(membershipManager::onSubscriptionUpdated, metadata.fetch());
|
||||
});
|
||||
event.future().complete(null);
|
||||
} catch (Exception e) {
|
||||
event.future().completeExceptionally(e);
|
||||
|
@ -409,13 +402,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
* This will make the consumer send the updated subscription on the next poll.
|
||||
*/
|
||||
private void process(final UpdatePatternSubscriptionEvent event) {
|
||||
if (!subscriptions.hasPatternSubscription()) {
|
||||
return;
|
||||
}
|
||||
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
|
||||
this.metadataVersionSnapshot = metadata.updateVersion();
|
||||
updatePatternSubscription(metadata.fetch());
|
||||
}
|
||||
requestManagers.consumerMembershipManager.ifPresent(mm -> maybeUpdatePatternSubscription(mm::onSubscriptionUpdated));
|
||||
event.future().complete(null);
|
||||
}
|
||||
|
||||
|
@ -726,6 +713,75 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
|
||||
}
|
||||
|
||||
private void process(final AsyncPollEvent event) {
|
||||
log.trace("Processing poll logic for {}", event);
|
||||
|
||||
// Trigger a reconciliation that can safely commit offsets if needed to rebalance,
|
||||
// as we're processing before any new fetching starts
|
||||
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
|
||||
consumerMembershipManager.maybeReconcile(true));
|
||||
|
||||
if (requestManagers.commitRequestManager.isPresent()) {
|
||||
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
|
||||
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
|
||||
|
||||
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
ConsumerMembershipManager membershipManager = hrm.membershipManager();
|
||||
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
|
||||
membershipManager.onConsumerPoll();
|
||||
hrm.resetPollTimer(event.pollTimeMs());
|
||||
});
|
||||
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
StreamsMembershipManager membershipManager = hrm.membershipManager();
|
||||
maybeUpdatePatternSubscription(membershipManager::onSubscriptionUpdated);
|
||||
membershipManager.onConsumerPoll();
|
||||
hrm.resetPollTimer(event.pollTimeMs());
|
||||
});
|
||||
}
|
||||
|
||||
log.trace("Processing check and update positions logic for {}", event);
|
||||
CompletableFuture<Boolean> updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
|
||||
|
||||
updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
|
||||
if (maybeCompleteAsyncPollEventExceptionally(event, updatePositionsError))
|
||||
return;
|
||||
|
||||
log.trace("Processing create fetch requests logic for {}", event);
|
||||
|
||||
// Create a fetch request if there's no data in the FetchBuffer.
|
||||
requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> {
|
||||
if (maybeCompleteAsyncPollEventExceptionally(event, fetchError))
|
||||
return;
|
||||
|
||||
event.completeSuccessfully();
|
||||
log.trace("Completed event processing for {}", event);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* If there's an error to report to the user, the current event will be completed and this method will
|
||||
* return {@code true}. Otherwise, it will return {@code false}.
|
||||
*/
|
||||
private boolean maybeCompleteAsyncPollEventExceptionally(AsyncPollEvent event, Throwable t) {
|
||||
if (t == null)
|
||||
return false;
|
||||
|
||||
if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) {
|
||||
log.trace("Ignoring timeout for {}: {}", event, t.getMessage());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (t instanceof CompletionException) {
|
||||
t = t.getCause();
|
||||
}
|
||||
|
||||
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
|
||||
event.completeExceptionally(e);
|
||||
log.trace("Failing event processing for {}", event, e);
|
||||
return true;
|
||||
}
|
||||
|
||||
private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) {
|
||||
return (value, exception) -> {
|
||||
if (exception != null)
|
||||
|
@ -757,6 +813,16 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
};
|
||||
}
|
||||
|
||||
private void maybeUpdatePatternSubscription(OnSubscriptionUpdatedCallback callback) {
|
||||
if (!subscriptions.hasPatternSubscription()) {
|
||||
return;
|
||||
}
|
||||
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
|
||||
this.metadataVersionSnapshot = metadata.updateVersion();
|
||||
updatePatternSubscription(callback, metadata.fetch());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function evaluates the regex that the consumer subscribed to
|
||||
* against the list of topic names from metadata, and updates
|
||||
|
@ -764,26 +830,26 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
*
|
||||
* @param cluster Cluster from which we get the topics
|
||||
*/
|
||||
private void updatePatternSubscription(Cluster cluster) {
|
||||
if (requestManagers.consumerHeartbeatRequestManager.isEmpty()) {
|
||||
log.warn("Group membership manager not present when processing a subscribe event");
|
||||
return;
|
||||
}
|
||||
private void updatePatternSubscription(OnSubscriptionUpdatedCallback callback, Cluster cluster) {
|
||||
final Set<String> topicsToSubscribe = cluster.topics().stream()
|
||||
.filter(subscriptions::matchesSubscribedPattern)
|
||||
.collect(Collectors.toSet());
|
||||
if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
|
||||
this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();
|
||||
|
||||
}
|
||||
// Join the group if not already part of it, or just send the updated subscription
|
||||
// to the broker on the next poll. Note that this is done even if no topics matched
|
||||
// the regex, to ensure the member joins the group if needed (with empty subscription).
|
||||
requestManagers.consumerHeartbeatRequestManager.get().membershipManager().onSubscriptionUpdated();
|
||||
callback.onSubscriptionUpdated();
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
int metadataVersionSnapshot() {
|
||||
return metadataVersionSnapshot;
|
||||
}
|
||||
|
||||
private interface OnSubscriptionUpdatedCallback {
|
||||
|
||||
void onSubscriptionUpdated();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,100 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.common.KafkaException;

import java.time.Duration;
import java.util.Optional;

/**
 * This class represents the non-blocking event that executes logic functionally equivalent to the following:
 *
 * <ul>
 *     <li>Polling</li>
 *     <li>{@link CheckAndUpdatePositionsEvent}</li>
 *     <li>{@link CreateFetchRequestsEvent}</li>
 * </ul>
 *
 * {@link AsyncKafkaConsumer#poll(Duration)} is implemented using a non-blocking design to ensure performance is
 * at the same level as {@link ClassicKafkaConsumer#poll(Duration)}. The event is submitted in {@code poll()}, but
 * there are no blocking waits for the "result" of the event. Checks are made for the result at certain points, but
 * they do not block. The logic for the previously-mentioned events is executed sequentially on the background thread.
 */
public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNotifiableEvent {

    private final long deadlineMs;
    private final long pollTimeMs;
    private volatile KafkaException error;
    private volatile boolean isComplete;

    /**
     * Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic.
     *
     * @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the
     *                   {@link Duration} passed to {@link Consumer#poll(Duration)}
     * @param pollTimeMs Time, in milliseconds, at which point the event was created
     */
    public AsyncPollEvent(long deadlineMs, long pollTimeMs) {
        super(Type.ASYNC_POLL);
        this.deadlineMs = deadlineMs;
        this.pollTimeMs = pollTimeMs;
    }

    public long deadlineMs() {
        return deadlineMs;
    }

    public long pollTimeMs() {
        return pollTimeMs;
    }

    public Optional<KafkaException> error() {
        return Optional.ofNullable(error);
    }

    public boolean isComplete() {
        return isComplete;
    }

    public void completeSuccessfully() {
        isComplete = true;
    }

    public void completeExceptionally(KafkaException e) {
        isComplete = true;
        this.error = e;
    }

    @Override
    public void onMetadataError(Exception metadataError) {
        completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError));
    }

    @Override
    protected String toStringBase() {
        return super.toStringBase() +
            ", deadlineMs=" + deadlineMs +
            ", pollTimeMs=" + pollTimeMs +
            ", error=" + error +
            ", isComplete=" + isComplete;
    }
}
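For orientation, a rough sketch of how the application thread might drive this event is shown below. The helper names (applicationEventHandler, fetchBuffer.drain()) are stand-ins rather than the actual AsyncKafkaConsumer members, and the real poll() loop in this change is more involved:

// Sketch only, not part of this change: submit one AsyncPollEvent per poll()
// and check, but never block on, its completion state.
private <K, V> ConsumerRecords<K, V> pollOnce(Duration timeout) {
    long pollTimeMs = System.currentTimeMillis();
    AsyncPollEvent event = new AsyncPollEvent(pollTimeMs + timeout.toMillis(), pollTimeMs);
    applicationEventHandler.add(event);    // hand off to the background thread

    while (!event.isComplete() && System.currentTimeMillis() < event.deadlineMs()) {
        event.error().ifPresent(e -> { throw e; });           // surface background failures without waiting
        ConsumerRecords<K, V> records = fetchBuffer.drain();  // hypothetical non-blocking buffer check
        if (!records.isEmpty())
            return records;
    }
    return ConsumerRecords.empty();
}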
@ -45,6 +45,16 @@ public class BackgroundEventHandler {
    this.asyncConsumerMetrics = asyncConsumerMetrics;
}

/**
 * Returns the current size of the queue. Used by the background thread to determine if it needs to <i>pause</i>
 * itself to return to the application thread for processing.
 *
 * @return Current size of queue
 */
public int size() {
    return backgroundEventQueue.size();
}

/**
 * Add a {@link BackgroundEvent} to the handler.
 *
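To illustrate the pause decision described in the size() javadoc, a minimal sketch follows; the MAX_BACKGROUND_EVENTS threshold is hypothetical and the actual background-thread logic is not part of this hunk:

// Sketch only: inside the background thread's run loop, yield back to the
// application thread once its outbound queue has work pending.
if (backgroundEventHandler.size() >= MAX_BACKGROUND_EVENTS) {
    return;    // pause this iteration; the application thread will drain the queue
}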
@ -17,6 +17,7 @@

package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;

@ -30,7 +31,7 @@ import java.time.Duration;
 * The event completes with a boolean indicating if all assigned partitions have valid fetch positions
 * (based on {@link SubscriptionState#hasAllFetchPositions()}).
 */
public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> {
public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Boolean> implements MetadataErrorNotifiableEvent {

    public CheckAndUpdatePositionsEvent(long deadlineMs) {
        super(Type.CHECK_AND_UPDATE_POSITIONS, deadlineMs);

@ -39,11 +40,11 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent<Bo
    /**
     * Indicates that this event requires subscription metadata to be present
     * for its execution. This is used to ensure that metadata errors are
     * handled correctly during the {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#poll(Duration) poll}
     * or {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer#position(TopicPartition) position} process.
     * handled correctly during the {@link Consumer#poll(Duration) poll}
     * or {@link Consumer#position(TopicPartition) position} process.
     */
    @Override
    public boolean requireSubscriptionMetadata() {
        return true;
    public void onMetadataError(Exception metadataError) {
        future().completeExceptionally(metadataError);
    }
}
@ -52,8 +52,4 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent im
protected String toStringBase() {
    return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs;
}

public boolean requireSubscriptionMetadata() {
    return false;
}
}
@ -32,7 +32,7 @@ import java.util.Map;
 * {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than
 * or equal to the target timestamp)
 */
public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> {
public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> implements MetadataErrorNotifiableEvent {
    private final Map<TopicPartition, Long> timestampsToSearch;
    private final boolean requireTimestamps;

@ -65,8 +65,8 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti
}

@Override
public boolean requireSubscriptionMetadata() {
    return true;
public void onMetadataError(Exception metadataError) {
    future().completeExceptionally(metadataError);
}

@Override
@ -0,0 +1,56 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals.events;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;

/**
 * This interface is used for events that need to be notified when
 * {@link NetworkClientDelegate#getAndClearMetadataError()} returns an error.
 */
public interface MetadataErrorNotifiableEvent {

    /**
     * The background thread detects metadata errors on every call to {@link NetworkClientDelegate#poll(long, long)}.
     * {@link NetworkClientDelegate} calls {@link Metadata#maybeThrowAnyException()} and stores the result.
     * The presence of a metadata error is checked in the {@link ConsumerNetworkThread}'s loop by calling
     * {@link NetworkClientDelegate#getAndClearMetadataError()}. There are two places in the loop in which the
     * metadata error is checked:
     *
     * <ul>
     *     <li>
     *         At the very top of the {@link ConsumerNetworkThread}'s loop, the {@link ApplicationEventHandler}'s
     *         queue is drained. Before processing each event via
     *         {@link ApplicationEventProcessor#process(ApplicationEvent)}, if a metadata error occurred, this method
     *         will be invoked on the event if it implements this interface.
     *         <p/>
     *         <em>Note</em>: for an event on which this method is invoked, it will <em>not</em> be passed to the
     *         {@link ApplicationEventProcessor#process(ApplicationEvent)} method.
     *     </li>
     *     <li>
     *         At the very bottom of the {@link ConsumerNetworkThread}'s loop, the {@link CompletableEventReaper}
     *         is executed and any outstanding event is returned. If a metadata error occurred, this method
     *         will be invoked on all unexpired events that implement this interface.
     *     </li>
     * </ul>
     *
     * @param metadataError Error that originally came from {@link Metadata#maybeThrowAnyException()}
     */
    void onMetadataError(Exception metadataError);
}
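A sketch of the first notification point described in the javadoc above, using hypothetical local names (drainedEvents, processor) and assuming getAndClearMetadataError() returns an Optional, which this diff does not show:

// Sketch only: at the top of the ConsumerNetworkThread loop.
Optional<Exception> metadataError = networkClientDelegate.getAndClearMetadataError();

for (ApplicationEvent event : drainedEvents) {
    if (metadataError.isPresent() && event instanceof MetadataErrorNotifiableEvent) {
        // Notified events are deliberately not passed on to process().
        ((MetadataErrorNotifiableEvent) event).onMetadataError(metadataError.get());
        continue;
    }
    processor.process(event);
}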
@ -16,28 +16,12 @@
 */
package org.apache.kafka.clients.consumer.internals.events;

import java.util.concurrent.CompletableFuture;

public class PollEvent extends ApplicationEvent {
public class SharePollEvent extends ApplicationEvent {

    private final long pollTimeMs;

    /**
     * A future that represents the completion of reconciliation and auto-commit
     * processing.
     * This future is completed when all commit request generation points have
     * been passed, including:
     * <ul>
     *     <li>auto-commit on rebalance</li>
     *     <li>auto-commit on the interval</li>
     * </ul>
     * Once completed, it signals that it's safe for the consumer to proceed with
     * fetching new records.
     */
    private final CompletableFuture<Void> reconcileAndAutoCommit = new CompletableFuture<>();

    public PollEvent(final long pollTimeMs) {
        super(Type.POLL);
    public SharePollEvent(final long pollTimeMs) {
        super(Type.SHARE_POLL);
        this.pollTimeMs = pollTimeMs;
    }

@ -45,14 +29,6 @@ public class PollEvent extends ApplicationEvent {
    return pollTimeMs;
}

public CompletableFuture<Void> reconcileAndAutoCommit() {
    return reconcileAndAutoCommit;
}

public void markReconcileAndAutoCommitComplete() {
    reconcileAndAutoCommit.complete(null);
}

@Override
public String toStringBase() {
    return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.TestUtils;

import java.time.Duration;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;

/**
 * This class provides utilities for tests to wait for a call to {@link Consumer#poll(Duration)} to produce a
 * result (error, records, specific condition, etc.). This is mostly due to the subtle difference in behavior
 * of the non-blocking {@link AsyncKafkaConsumer}. A single pass of {@link AsyncKafkaConsumer#poll(Duration)}
 * may not be sufficient to provide an immediate result.
 */
public class ConsumerPollTestUtils {

    /**
     * Wait up to {@link TestUtils#DEFAULT_MAX_WAIT_MS} to return records from the given {@link Consumer}.
     */
    public static <T> ConsumerRecords<T, T> waitForRecords(Consumer<?, ?> consumer) {
        Timer timer = Time.SYSTEM.timer(DEFAULT_MAX_WAIT_MS);

        while (timer.notExpired()) {
            @SuppressWarnings("unchecked")
            ConsumerRecords<T, T> records = (ConsumerRecords<T, T>) consumer.poll(Duration.ofMillis(1000));

            if (!records.isEmpty())
                return records;

            timer.update();
        }

        throw new TimeoutException("no records to return");
    }

    /**
     * Wait up to {@link TestUtils#DEFAULT_MAX_WAIT_MS} for the {@link Consumer} to produce the side effect
     * that causes {@link Supplier condition} to evaluate to {@code true}.
     */
    public static void waitForCondition(Consumer<?, ?> consumer,
                                        Supplier<Boolean> testCondition,
                                        String conditionDetails) {
        try {
            TestUtils.waitForCondition(
                () -> {
                    consumer.poll(Duration.ZERO);
                    return testCondition.get();
                },
                conditionDetails
            );
        } catch (InterruptedException e) {
            throw new InterruptException(e);
        }
    }

    /**
     * Wait up to {@link TestUtils#DEFAULT_MAX_WAIT_MS} for the {@link Consumer} to throw an exception that,
     * when tested against the {@link Function condition}, will evaluate to {@code true}.
     */
    public static void waitForException(Consumer<?, ?> consumer,
                                        Function<Throwable, Boolean> testCondition,
                                        String conditionDetails) {
        try {
            TestUtils.waitForCondition(
                () -> {
                    try {
                        consumer.poll(Duration.ZERO);
                        return false;
                    } catch (Throwable t) {
                        return testCondition.apply(t);
                    }
                },
                conditionDetails
            );
        } catch (InterruptedException e) {
            throw new InterruptException(e);
        }
    }

}
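The tests updated later in this change use these helpers roughly as follows (illustrative fragment; the consumer and the expected error type are test-specific):

// Poll repeatedly until records arrive, instead of assuming a single poll() pass suffices.
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);

// Or poll until the background thread surfaces a specific failure.
ConsumerPollTestUtils.waitForException(
    consumer,
    AuthenticationException.class::isInstance,
    "expected an authentication error from poll()"
);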
@ -935,7 +935,6 @@ public class KafkaConsumerTest {

@ParameterizedTest
@EnumSource(GroupProtocol.class)
@SuppressWarnings("unchecked")
public void verifyNoCoordinatorLookupForManualAssignmentWithSeek(GroupProtocol groupProtocol) {
    ConsumerMetadata metadata = createMetadata(subscription);
    MockClient client = new MockClient(time, metadata);

@ -951,7 +950,7 @@
client.prepareResponse(listOffsetsResponse(Map.of(tp0, 50L)));
client.prepareResponse(fetchResponse(tp0, 50L, 5));

ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size());

@ -1045,8 +1044,7 @@

}, fetchResponse(tp0, 50L, 5));

@SuppressWarnings("unchecked")
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(5, records.count());
assertEquals(Set.of(tp0), records.partitions());
assertEquals(1, records.nextOffsets().size());

@ -1065,7 +1063,7 @@

@ParameterizedTest
@EnumSource(value = GroupProtocol.class)
public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) throws InterruptedException {
public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {
    SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
    ConsumerMetadata metadata = createMetadata(subscription);
    MockClient client = new MockClient(time, metadata);

@ -1081,15 +1079,14 @@
true, groupId, groupInstanceId, false);
consumer.assign(List.of(tp0));

if (groupProtocol == GroupProtocol.CONSUMER) {
    // New consumer poll(ZERO) needs to wait for the offset fetch event added by a call to poll, to be processed
    // by the background thread, so it can realize there are no committed offsets and then
    // throw the NoOffsetForPartitionException
    assertPollEventuallyThrows(consumer, NoOffsetForPartitionException.class,
        "Consumer was not able to update fetch positions on continuous calls with 0 timeout");
} else {
    assertThrows(NoOffsetForPartitionException.class, () -> consumer.poll(Duration.ZERO));
}
// Consumer.poll(0) needs to wait for the offset fetch event added by a call to poll, to be processed
// by the background thread, so it can realize there are no committed offsets and then
// throw the NoOffsetForPartitionException.
ConsumerPollTestUtils.waitForException(
    consumer,
    NoOffsetForPartitionException.class::isInstance,
    "Consumer was not able to update fetch positions on continuous calls with 0 timeout"
);
}

@ParameterizedTest

@ -1731,7 +1728,6 @@

@ParameterizedTest
@EnumSource(GroupProtocol.class)
@SuppressWarnings("unchecked")
public void testManualAssignmentChangeWithAutoCommitEnabled(GroupProtocol groupProtocol) {
    ConsumerMetadata metadata = createMetadata(subscription);
    MockClient client = new MockClient(time, metadata);

@ -1766,7 +1762,7 @@
client.prepareResponse(listOffsetsResponse(Map.of(tp0, 10L)));
client.prepareResponse(fetchResponse(tp0, 10L, 1));

ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(100));
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);

assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0));

@ -1825,8 +1821,7 @@
client.prepareResponse(listOffsetsResponse(Map.of(tp0, 10L)));
client.prepareResponse(fetchResponse(tp0, 10L, 1));

@SuppressWarnings("unchecked")
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size());

@ -2121,7 +2116,7 @@
time.sleep(heartbeatIntervalMs);
Thread.sleep(heartbeatIntervalMs);
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ZERO);
final ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertFalse(records.isEmpty());
assertFalse(records.nextOffsets().isEmpty());
}

@ -2271,19 +2266,18 @@

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testPollAuthenticationFailure(GroupProtocol groupProtocol) throws InterruptedException {
public void testPollAuthenticationFailure(GroupProtocol groupProtocol) {
    final KafkaConsumer<String, String> consumer = consumerWithPendingAuthenticationError(groupProtocol);
    consumer.subscribe(Set.of(topic));

if (groupProtocol == GroupProtocol.CONSUMER) {
    // New consumer poll(ZERO) needs to wait for the event added by a call to poll, to be processed
    // by the background thread, so it can realize there is authentication fail and then
    // throw the AuthenticationException
    assertPollEventuallyThrows(consumer, AuthenticationException.class,
        "this consumer was not able to discover metadata errors during continuous polling.");
} else {
    assertThrows(AuthenticationException.class, () -> consumer.poll(Duration.ZERO));
}
// Consumer.poll(0) needs to wait for the event added by a call to poll, to be processed
// by the background thread, so it can realize there is an authentication failure and then
// throw the AuthenticationException.
ConsumerPollTestUtils.waitForException(
    consumer,
    AuthenticationException.class::isInstance,
    "this consumer was not able to discover metadata errors during continuous polling."
);
}

// TODO: this test triggers a bug with the CONSUMER group protocol implementation.

@ -2655,7 +2649,6 @@

@ParameterizedTest
@EnumSource(GroupProtocol.class)
@SuppressWarnings("unchecked")
public void testCurrentLag(GroupProtocol groupProtocol) throws InterruptedException {
    final ConsumerMetadata metadata = createMetadata(subscription);
    final MockClient client = new MockClient(time, metadata);

@ -2670,9 +2663,11 @@
consumer.assign(Set.of(tp0));

// poll once to update with the current metadata
consumer.poll(Duration.ofMillis(0));
TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
    "No metadata requests sent");
ConsumerPollTestUtils.waitForCondition(
    consumer,
    () -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
    "No metadata requests sent"
);
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));

// no error for no current position

@ -2685,13 +2680,12 @@
}
// poll once again, which should send the list-offset request
consumer.seek(tp0, 50L);
consumer.poll(Duration.ofMillis(0));
// requests: list-offset, fetch
TestUtils.waitForCondition(() -> {
    boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
    boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
    return hasListOffsetRequest && hasFetchRequest;
}, "No list-offset & fetch request sent");
// requests: list-offset
ConsumerPollTestUtils.waitForCondition(
    consumer,
    () -> requestGenerated(client, ApiKeys.LIST_OFFSETS),
    "No list-offset sent"
);

// no error for no end offset (so unknown lag)
assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));

@ -2700,7 +2694,12 @@
// and hence next call would return correct lag result
ClientRequest listOffsetRequest = findRequest(client, ApiKeys.LIST_OFFSETS);
client.respondToRequest(listOffsetRequest, listOffsetsResponse(Map.of(tp0, 90L)));
consumer.poll(Duration.ofMillis(0));
// requests: fetch
ConsumerPollTestUtils.waitForCondition(
    consumer,
    () -> requestGenerated(client, ApiKeys.FETCH),
    "No fetch sent"
);

// For AsyncKafkaConsumer, subscription state is updated in background, so the result will eventually be updated.
TestUtils.waitForCondition(() -> {

@ -2715,7 +2714,7 @@
final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
client.respondToRequest(fetchRequest, fetchResponse(Map.of(tp0, fetchInfo)));

final ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
final ConsumerRecords<String, String> records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(5, records.count());
assertEquals(55L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size());

@ -3194,27 +3193,14 @@
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(Set.of(invalidTopicName), getConsumerRebalanceListener(consumer));

if (groupProtocol == GroupProtocol.CONSUMER) {
    // New consumer poll(ZERO) needs to wait for the event added by a call to poll, to be processed
    // by the background thread, so it can realize there is invalid topics and then
    // throw the InvalidTopicException
    assertPollEventuallyThrows(consumer, InvalidTopicException.class,
        "Consumer was not able to update fetch positions on continuous calls with 0 timeout");
} else {
    assertThrows(InvalidTopicException.class, () -> consumer.poll(Duration.ZERO));
}
}

private static <T extends Throwable> void assertPollEventuallyThrows(KafkaConsumer<?, ?> consumer,
        Class<T> expectedException, String errMsg) throws InterruptedException {
    TestUtils.waitForCondition(() -> {
        try {
            consumer.poll(Duration.ZERO);
            return false;
        } catch (Throwable exception) {
            return expectedException.isInstance(exception);
        }
    }, errMsg);
// Consumer.poll(0) needs to wait for the event added by a call to poll, to be processed
// by the background thread, so it can realize there is an invalid topic and then
// throw the InvalidTopicException.
ConsumerPollTestUtils.waitForException(
    consumer,
    InvalidTopicException.class::isInstance,
    "Consumer was not able to update fetch positions on continuous calls with 0 timeout"
);
}

@ParameterizedTest

@ -3654,7 +3640,11 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) {
service.execute(() -> consumer.poll(Duration.ofSeconds(5)));
try {
    TimeUnit.SECONDS.sleep(1);
    assertThrows(ConcurrentModificationException.class, () -> consumer.poll(Duration.ZERO));
    ConsumerPollTestUtils.waitForException(
        consumer,
        t -> t instanceof ConcurrentModificationException,
        "Consumer did not throw ConcurrentModificationException within timeout"
    );
    client.wakeup();
    consumer.wakeup();
} finally {
@ -19,8 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;

@ -61,7 +61,7 @@ public class ApplicationEventHandlerTest {
    asyncConsumerMetrics
)) {
    // add event
    applicationEventHandler.add(new PollEvent(time.milliseconds()));
    applicationEventHandler.add(new AsyncPollEvent(time.milliseconds() + 10, time.milliseconds()));
    verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
}
}
@ -22,6 +22,7 @@ import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPollTestUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitEvent;

@ -43,13 +45,11 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableApplication
import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;

@ -112,6 +112,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

@ -154,6 +155,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;

@ -424,7 +426,7 @@ public class AsyncKafkaConsumerTest {

consumer.wakeup();

markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}

@ -444,7 +446,7 @@
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));

markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}

@ -468,7 +470,7 @@
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));

markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
// since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
// the previously ignored wake-up should not be ignored in the next call

@ -505,9 +507,12 @@

completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList(topicName), listener);
markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
assertTrue(callbackExecuted.get());
completeAsyncPollEventSuccessfully();
ConsumerPollTestUtils.waitForCondition(
    consumer,
    callbackExecuted::get,
    "Consumer.poll() did not execute callback within timeout"
);
}

@Test

@ -527,7 +532,7 @@
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(tp));

markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ZERO);

assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));

@ -673,8 +678,12 @@
completeAssignmentChangeEventSuccessfully();
consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
markReconcileAndAutoCommitCompleteForPollEvent();
assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback);
completeAsyncPollEventSuccessfully();
ConsumerPollTestUtils.waitForCondition(
    consumer,
    () -> callback.invoked == 1 && callback.exception == null,
    "Consumer.poll() did not execute the callback once (without error) in allotted timeout"
);
}

@Test

@ -1455,7 +1464,7 @@
int expectedRevokedCount,
int expectedAssignedCount,
int expectedLostCount,
Optional<RuntimeException> expectedException
Optional<RuntimeException> expectedExceptionOpt
) {
consumer = newConsumer();
CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener(

@ -1473,13 +1482,18 @@
backgroundEventQueue.add(e);
}

markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
// This will trigger the background event queue to process our background event message.
// If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll.
if (expectedException.isPresent()) {
    Exception exception = assertThrows(expectedException.get().getClass(), () -> consumer.poll(Duration.ZERO));
    assertEquals(expectedException.get().getMessage(), exception.getMessage());
    assertEquals(expectedException.get().getCause(), exception.getCause());
if (expectedExceptionOpt.isPresent()) {
    Exception expectedException = expectedExceptionOpt.get();
    ConsumerPollTestUtils.waitForException(
        consumer,
        t -> Objects.equals(t.getClass(), expectedException.getClass()) &&
            Objects.equals(t.getMessage(), expectedException.getMessage()) &&
            Objects.equals(t.getCause(), expectedException.getCause()),
        "Consumer.poll() did not throw the expected exception " + expectedException
    );
} else {
    when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
    assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));

@ -1543,10 +1557,12 @@
backgroundEventQueue.add(errorEvent);
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));

assertEquals(expectedException.getMessage(), exception.getMessage());
completeAsyncPollEventSuccessfully();
ConsumerPollTestUtils.waitForException(
    consumer,
    t -> t.getMessage().equals(expectedException.getMessage()),
    "Consumer.poll() did not fail with expected exception " + expectedException + " within timeout"
);
}

@Test

@ -1562,10 +1578,12 @@
backgroundEventQueue.add(errorEvent2);
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));

assertEquals(expectedException1.getMessage(), exception.getMessage());
completeAsyncPollEventSuccessfully();
ConsumerPollTestUtils.waitForException(
    consumer,
    t -> t.getMessage().equals(expectedException1.getMessage()),
    "Consumer.poll() did not fail with expected exception " + expectedException1 + " within timeout"
);
assertTrue(backgroundEventQueue.isEmpty());
}

@ -1645,10 +1663,9 @@

completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList("topic1"));
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class));
verify(applicationEventHandler).add(any(CreateFetchRequestsEvent.class));
verify(applicationEventHandler, atLeastOnce()).add(any(AsyncPollEvent.class));
}

private Properties requiredConsumerConfigAndGroupId(final String groupId) {

@ -1664,11 +1681,8 @@

completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(new TopicPartition("t1", 1)));
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ZERO);

verify(applicationEventHandler, atLeast(1))
    .addAndGet(ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class));
}

@Test

@ -1701,7 +1715,7 @@
).when(fetchCollector).collectFetch(any(FetchBuffer.class));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);

markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
// And then poll for up to 10000ms, which should return 2 records without timing out
ConsumerRecords<?, ?> returnedRecords = consumer.poll(Duration.ofMillis(10000));
assertEquals(2, returnedRecords.count());

@ -1805,7 +1819,7 @@
// interrupt the thread and call poll
try {
    Thread.currentThread().interrupt();
    markReconcileAndAutoCommitCompleteForPollEvent();
    completeAsyncPollEventSuccessfully();
    assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
} finally {
    // clear interrupted state again since this thread may be reused by JUnit

@ -1837,8 +1851,13 @@
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList("topic"));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
markReconcileAndAutoCommitCompleteForPollEvent();
consumer.poll(Duration.ZERO);
completeAsyncPollEventSuccessfully();

ConsumerPollTestUtils.waitForCondition(
    consumer,
    () -> backgroundEventReaper.size() == 0,
    "Consumer.poll() did not reap background events within timeout"
);
verify(backgroundEventReaper).reap(time.milliseconds());
}

@ -1900,7 +1919,7 @@
completeUnsubscribeApplicationEventSuccessfully();

consumer.assign(singleton(new TopicPartition("topic1", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
completeAsyncPollEventSuccessfully();
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));

@ -1908,7 +1927,6 @@

consumer.subscribe(Pattern.compile("t*"));
consumer.poll(Duration.ZERO);
verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class));
}

@Test

@ -2275,11 +2293,11 @@
}
}

private void markReconcileAndAutoCommitCompleteForPollEvent() {
private void completeAsyncPollEventSuccessfully() {
    doAnswer(invocation -> {
        PollEvent event = invocation.getArgument(0);
        event.markReconcileAndAutoCommitComplete();
        AsyncPollEvent event = invocation.getArgument(0);
        event.completeSuccessfully();
        return null;
    }).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
    }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
}
}
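A test that needs the failure path could follow the same doAnswer pattern; a possible variant (not part of this change) might look like:

// Sketch only: complete the submitted AsyncPollEvent with an error instead.
private void completeAsyncPollEventExceptionally(KafkaException error) {
    doAnswer(invocation -> {
        AsyncPollEvent event = invocation.getArgument(0);
        event.completeExceptionally(error);
        return null;
    }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
}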
@ -18,8 +18,8 @@ package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Metrics;

@ -258,7 +258,7 @@ public class ConsumerNetworkThreadTest {
)) {
    consumerNetworkThread.initializeResources();

    PollEvent event = new PollEvent(0);
    AsyncPollEvent event = new AsyncPollEvent(10, 0);
    event.setEnqueuedMs(time.milliseconds());
    applicationEventQueue.add(event);
    asyncConsumerMetrics.recordApplicationEventQueueSize(1);
@ -24,11 +24,11 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;

@ -680,7 +680,7 @@ public class ShareConsumerImplTest {
consumer.subscribe(subscriptionTopic);

consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(PollEvent.class));
verify(applicationEventHandler).add(any(SharePollEvent.class));
verify(applicationEventHandler).addAndGet(any(ShareSubscriptionChangeEvent.class));

completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
@ -171,7 +171,7 @@ public class ApplicationEventProcessorTest {

private static Stream<Arguments> applicationEvents() {
    return Stream.of(
        Arguments.of(new PollEvent(100)),
        Arguments.of(new AsyncPollEvent(calculateDeadlineMs(12345, 100), 100)),
        Arguments.of(new CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
        Arguments.of(new CheckAndUpdatePositionsEvent(500)),
        Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),

@ -265,12 +265,12 @@ public class ApplicationEventProcessorTest {

@Test
public void testPollEvent() {
    PollEvent event = new PollEvent(12345);
    AsyncPollEvent event = new AsyncPollEvent(12346, 12345);

    setupProcessor(true);
    when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
    when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(new CompletableFuture<>());
    processor.process(event);
    assertTrue(event.reconcileAndAutoCommit().isDone());
    verify(commitRequestManager).updateTimerAndMaybeCommit(12345);
    verify(membershipManager).onConsumerPoll();
    verify(heartbeatRequestManager).resetPollTimer(12345);
@ -1358,7 +1358,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {

val consumer = createConsumer()
consumer.assign(java.util.List.of(tp))
assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer))
assertThrows(classOf[AuthorizationException], () => consumeRecords(consumer))
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@ -35,7 +35,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRecords, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfigs, TopicConfig}

@ -568,8 +568,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
try {
  consumer.assign(util.Set.of(tp))
  consumer.seekToBeginning(util.Set.of(tp))
  val records = consumer.poll(time.Duration.ofSeconds(3))
  assertEquals(expectedNumber, records.count())
  def verifyRecordCount(records: ConsumerRecords[Array[Byte], Array[Byte]]): Boolean = {
    expectedNumber == records.count()
  }
  TestUtils.pollRecordsUntilTrue(
    consumer,
    verifyRecordCount,
    s"Consumer.poll() did not return the expected number of records ($expectedNumber) within the timeout",
    pollTimeoutMs = 3000
  )
} finally consumer.close()
}
@ -4585,7 +4592,9 @@
prepareRecords(testTopicName)

// Producer sends messages
for (i <- 1 to 20) {
val numRecords = 20

for (i <- 1 to numRecords) {
  TestUtils.waitUntilTrue(() => {
    val producerRecord = producer.send(
      new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))

@ -4594,18 +4603,28 @@
}, "Fail to produce record to topic")
}

val consumerConfig = new Properties()
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val streams = createStreamsGroup(
  configOverrides = consumerConfig,
  inputTopic = testTopicName,
  streamsGroupId = streamsGroupId,
)

try {
  TestUtils.waitUntilTrue(() => {
    streams.poll(JDuration.ofMillis(100L))
    !streams.assignment().isEmpty
  }, "Consumer not assigned to partitions")
  var counter = 0

  def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
    counter += records.count()
    counter >= numRecords
  }
  TestUtils.pollRecordsUntilTrue(
    streams,
    verifyRecordCount,
    s"Consumer did not consume the expected number of records ($numRecords) within the timeout"
  )

  streams.poll(JDuration.ofMillis(1000L))
  streams.commitSync()

  TestUtils.waitUntilTrue(() => {
@ -4645,7 +4664,9 @@
prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)
// Producer sends messages
for (i <- 1 to 20) {
val numRecords = 20

for (i <- 1 to numRecords) {
  TestUtils.waitUntilTrue(() => {
    val producerRecord = producer.send(
      new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))

@ -4654,18 +4675,28 @@
}, "Fail to produce record to topic")
}

val consumerConfig = new Properties()
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val streams = createStreamsGroup(
  configOverrides = consumerConfig,
  inputTopic = testTopicName,
  streamsGroupId = streamsGroupId,
)

try {
  TestUtils.waitUntilTrue(() => {
    streams.poll(JDuration.ofMillis(100L))
    !streams.assignment().isEmpty
  }, "Consumer not assigned to partitions")
  var counter = 0

  def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
    counter += records.count()
    counter >= numRecords
  }
  TestUtils.pollRecordsUntilTrue(
    streams,
    verifyRecordCount,
    s"Consumer did not consume the expected number of records ($numRecords) within the timeout"
  )

  streams.poll(JDuration.ofMillis(1000L))
  streams.commitSync()

  // List streams group offsets
@ -4722,7 +4753,9 @@
prepareRecords(testTopicName)

// Producer sends messages
for (i <- 1 to 20) {
val numRecords = 20

for (i <- 1 to numRecords) {
  TestUtils.waitUntilTrue(() => {
    val producerRecord = producer.send(
      new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))

@ -4731,18 +4764,28 @@
}, "Fail to produce record to topic")
}

val consumerConfig = new Properties()
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val streams = createStreamsGroup(
  configOverrides = consumerConfig,
  inputTopic = testTopicName,
  streamsGroupId = streamsGroupId,
)

try {
  TestUtils.waitUntilTrue(() => {
    streams.poll(JDuration.ofMillis(100L))
    !streams.assignment().isEmpty
  }, "Consumer not assigned to partitions")
  var counter = 0

  def verifyRecordCount(records: ConsumerRecords[Nothing, Nothing]): Boolean = {
    counter += records.count()
    counter >= numRecords
  }
  TestUtils.pollRecordsUntilTrue(
    streams,
    verifyRecordCount,
    s"Consumer did not consume the expected number of records ($numRecords) within the timeout"
  )

  streams.poll(JDuration.ofMillis(1000L))
  streams.commitSync()

  // List streams group offsets
@ -145,13 +145,27 @@ class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
}

private def verifyConsumerWithAuthenticationFailure(consumer: Consumer[Array[Byte], Array[Byte]]): Unit = {
  verifyAuthenticationException(consumer.poll(Duration.ofMillis(1000)))
  val startMs = System.currentTimeMillis
  TestUtils.pollUntilException(
    consumer,
    _ => true,
    s"Consumer.poll() did not throw an exception within the timeout",
    pollTimeoutMs = 1000
  )
  val elapsedMs = System.currentTimeMillis - startMs
  assertTrue(elapsedMs <= 5000, s"Poll took too long, elapsed=$elapsedMs")
  verifyAuthenticationException(consumer.partitionsFor(topic))

  createClientCredential()
  val producer = createProducer()
  verifyWithRetry(sendOneRecord(producer))()
  verifyWithRetry(consumer.poll(Duration.ofMillis(1000)))(_.count == 1)
  TestUtils.waitUntilTrue(() => {
    try {
      consumer.poll(Duration.ofMillis(1000)).count() == 1
    } catch {
      case _: Throwable => false
    }
  }, msg = s"Consumer.poll() did not read the expected number of records within the timeout")
}

@Test
@ -19,7 +19,6 @@

package kafka.server

import java.net.InetSocketAddress
import java.time.Duration
import java.util.Properties
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import javax.security.auth.login.LoginContext

@ -185,7 +184,12 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
consumer.assign(java.util.List.of(tp))

val startMs = System.currentTimeMillis()
assertThrows(classOf[SaslAuthenticationException], () => consumer.poll(Duration.ofMillis(50)))
TestUtils.pollUntilException(
  consumer,
  t => t.isInstanceOf[SaslAuthenticationException],
  "Consumer.poll() did not trigger a SaslAuthenticationException within timeout",
  pollTimeoutMs = 50
)
val endMs = System.currentTimeMillis()
require(endMs - startMs < failedAuthenticationDelayMs, "Failed authentication must not be delayed on the client")
consumer.close()
@ -690,6 +690,21 @@ object TestUtils extends Logging {
}, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
}

def pollUntilException(consumer: Consumer[_, _],
                       action: Throwable => Boolean,
                       msg: => String,
                       waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS,
                       pollTimeoutMs: Long = 100): Unit = {
  waitUntilTrue(() => {
    try {
      consumer.poll(Duration.ofMillis(pollTimeoutMs))
      false
    } catch {
      case t: Throwable => action(t)
    }
  }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
}

def pollRecordsUntilTrue[K, V](consumer: Consumer[K, V],
                               action: ConsumerRecords[K, V] => Boolean,
                               msg: => String,
@ -15,13 +15,16 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.server;
|
||||
package org.apache.kafka.server;
|
||||
|
||||
import org.apache.kafka.clients.admin.AddRaftVoterOptions;
|
||||
import org.apache.kafka.clients.admin.Admin;
|
||||
import org.apache.kafka.clients.admin.FeatureMetadata;
|
||||
import org.apache.kafka.clients.admin.QuorumInfo;
|
||||
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
|
||||
import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.errors.InconsistentClusterIdException;
|
||||
import org.apache.kafka.common.test.KafkaClusterTestKit;
|
||||
import org.apache.kafka.common.test.TestKitNodes;
|
||||
import org.apache.kafka.common.test.api.TestKitDefaults;
|
||||
|
@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
|
|||
import org.apache.kafka.server.common.KRaftVersion;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
|
||||
|
@@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
    static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
        FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
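The body of checkKRaftVersions is truncated by the hunk boundary above. A plausible completion, assuming the helper asserts the finalized kraft.version feature level — this is a sketch, not the patch's code:

// Sketch only: assumed completion of checkKRaftVersions.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.server.common.KRaftVersion;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

final class CheckKRaftVersionsSketch {
    static void checkKRaftVersions(Admin admin, short finalized) throws Exception {
        FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get();
        if (finalized > 0) {
            // Once kraft.version is finalized, it must be reported at the expected level.
            assertTrue(featureMetadata.finalizedFeatures().containsKey(KRaftVersion.FEATURE_NAME));
            assertEquals(finalized,
                featureMetadata.finalizedFeatures().get(KRaftVersion.FEATURE_NAME).maxVersionLevel());
        } else {
            // Level 0 means the feature has never been finalized.
            assertFalse(featureMetadata.finalizedFeatures().containsKey(KRaftVersion.FEATURE_NAME));
        }
    }
}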
@@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
        ).build()) {
            cluster.format();
            cluster.startup();
-           try (Admin admin = Admin.create(cluster.clientProperties())) {
+           try (var admin = Admin.create(cluster.clientProperties())) {
                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                    checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel());
                });
@@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
        ).setStandalone(true).build()) {
            cluster.format();
            cluster.startup();
-           try (Admin admin = Admin.create(cluster.clientProperties())) {
+           try (var admin = Admin.create(cluster.clientProperties())) {
                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                    checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel());
                });
@@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
        ) {
            cluster.format();
            cluster.startup();
-           try (Admin admin = Admin.create(cluster.clientProperties())) {
+           try (var admin = Admin.create(cluster.clientProperties())) {
                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                    Map<Integer, Uuid> voters = findVoterDirs(admin);
                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
        ) {
            cluster.format();
            cluster.startup();
-           try (Admin admin = Admin.create(cluster.clientProperties())) {
+           try (var admin = Admin.create(cluster.clientProperties())) {
                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                    Map<Integer, Uuid> voters = findVoterDirs(admin);
                    assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet());
@@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
        ) {
            cluster.format();
            cluster.startup();
-           try (Admin admin = Admin.create(cluster.clientProperties())) {
+           try (var admin = Admin.create(cluster.clientProperties())) {
                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                    Map<Integer, Uuid> voters = findVoterDirs(admin);
                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
        ) {
            cluster.format();
            cluster.startup();
-           try (Admin admin = Admin.create(cluster.clientProperties())) {
+           try (var admin = Admin.create(cluster.clientProperties())) {
                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                    Map<Integer, Uuid> voters = findVoterDirs(admin);
                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
                }
            }
        }
    }

    @Test
    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
        final var nodes = new TestKitNodes.Builder()
            .setClusterId("test-cluster")
            .setNumBrokerNodes(1)
            .setNumControllerNodes(3)
            .build();

        final Map<Integer, Uuid> initialVoters = new HashMap<>();
        for (final var controllerNode : nodes.controllerNodes().values()) {
            initialVoters.put(
                controllerNode.id(),
                controllerNode.metadataDirectoryId()
            );
        }

        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
            cluster.format();
            cluster.startup();
            try (var admin = Admin.create(cluster.clientProperties())) {
                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                    Map<Integer, Uuid> voters = findVoterDirs(admin);
                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
                    for (int replicaId : new int[] {3000, 3001, 3002}) {
                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
                    }
                });

                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
                admin.removeRaftVoter(
                    3000,
                    dirId,
                    new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
                ).all().get();
                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                    Map<Integer, Uuid> voters = findVoterDirs(admin);
                    assertEquals(Set.of(3001, 3002), voters.keySet());
                    for (int replicaId : new int[] {3001, 3002}) {
                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
                    }
                });

                admin.addRaftVoter(
                    3000,
                    dirId,
                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
                    new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
                ).all().get();
            }
        }
    }
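These tests call findVoterDirs, whose body falls outside the hunks shown here. A plausible sketch of such a helper, built on the public describeMetadataQuorum Admin API — the shape is assumed, not copied from the patch:

// Assumed shape of the helper used above: maps each voter's replica id
// to the directory id of its metadata log.
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.common.Uuid;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

final class FindVoterDirsSketch {
    static Map<Integer, Uuid> findVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
        QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
        Map<Integer, Uuid> voterDirs = new HashMap<>();
        for (QuorumInfo.ReplicaState voter : quorumInfo.voters()) {
            voterDirs.put(voter.replicaId(), voter.replicaDirectoryId());
        }
        return voterDirs;
    }
}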
    @Test
    public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
        final var nodes = new TestKitNodes.Builder()
            .setClusterId("test-cluster")
            .setNumBrokerNodes(1)
            .setNumControllerNodes(3)
            .build();

        final Map<Integer, Uuid> initialVoters = new HashMap<>();
        for (final var controllerNode : nodes.controllerNodes().values()) {
            initialVoters.put(
                controllerNode.id(),
                controllerNode.metadataDirectoryId()
            );
        }

        try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
            cluster.format();
            cluster.startup();
            try (var admin = Admin.create(cluster.clientProperties())) {
                Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
                var removeFuture = admin.removeRaftVoter(
                    3000,
                    dirId,
                    new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
                ).all();
                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);

                var addFuture = admin.addRaftVoter(
                    3000,
                    dirId,
                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
                    new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
                ).all();
                TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
            }
        }
    }
}