mirror of https://github.com/apache/kafka.git
KAFKA-19042 Move PlaintextConsumerCommitTest to client-integration-tests module (#19389)
Use Java to rewrite `PlaintextConsumerCommitTest` by new test infra and move it to client-integration-tests module. Reviewers: PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
c493d89334
commit
6573b4ace1
|
@ -0,0 +1,594 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.clients.consumer;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.Producer;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.apache.kafka.common.internals.Topic;
|
||||||
|
import org.apache.kafka.common.test.ClusterInstance;
|
||||||
|
import org.apache.kafka.common.test.TestUtils;
|
||||||
|
import org.apache.kafka.common.test.api.ClusterConfigProperty;
|
||||||
|
import org.apache.kafka.common.test.api.ClusterTest;
|
||||||
|
import org.apache.kafka.common.test.api.ClusterTestDefaults;
|
||||||
|
import org.apache.kafka.common.test.api.Type;
|
||||||
|
import org.apache.kafka.test.MockConsumerInterceptor;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment;
|
||||||
|
import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
|
||||||
|
import static org.apache.kafka.clients.ClientsTestUtils.sendRecords;
|
||||||
|
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
|
||||||
|
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
|
||||||
|
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
|
||||||
|
import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
|
||||||
|
import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.BROKER_COUNT;
|
||||||
|
import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_PARTITIONS;
|
||||||
|
import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_REPLICATION;
|
||||||
|
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
|
||||||
|
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
|
||||||
|
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
@ClusterTestDefaults(
|
||||||
|
types = {Type.KRAFT},
|
||||||
|
brokers = BROKER_COUNT,
|
||||||
|
serverProperties = {
|
||||||
|
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = OFFSETS_TOPIC_PARTITIONS),
|
||||||
|
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = OFFSETS_TOPIC_REPLICATION),
|
||||||
|
@ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
public class PlaintextConsumerCommitTest {
|
||||||
|
|
||||||
|
public static final int BROKER_COUNT = 3;
|
||||||
|
public static final String OFFSETS_TOPIC_PARTITIONS = "1";
|
||||||
|
public static final String OFFSETS_TOPIC_REPLICATION = "3";
|
||||||
|
private final ClusterInstance cluster;
|
||||||
|
private final String topic = "topic";
|
||||||
|
private final TopicPartition tp = new TopicPartition(topic, 0);
|
||||||
|
private final TopicPartition tp1 = new TopicPartition(topic, 1);
|
||||||
|
|
||||||
|
public PlaintextConsumerCommitTest(ClusterInstance clusterInstance) {
|
||||||
|
this.cluster = clusterInstance;
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setup() throws InterruptedException {
|
||||||
|
cluster.createTopic(topic, 2, (short) BROKER_COUNT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClassicConsumerAutoCommitOnClose() throws InterruptedException {
|
||||||
|
testAutoCommitOnClose(GroupProtocol.CLASSIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAsyncConsumerAutoCommitOnClose() throws InterruptedException {
|
||||||
|
testAutoCommitOnClose(GroupProtocol.CONSUMER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testAutoCommitOnClose(GroupProtocol groupProtocol) throws InterruptedException {
|
||||||
|
try (var consumer = createConsumer(groupProtocol, true)) {
|
||||||
|
sendRecords(cluster, tp, 1000);
|
||||||
|
|
||||||
|
consumer.subscribe(List.of(topic));
|
||||||
|
awaitAssignment(consumer, Set.of(tp, tp1));
|
||||||
|
// should auto-commit sought positions before closing
|
||||||
|
consumer.seek(tp, 300);
|
||||||
|
consumer.seek(tp1, 500);
|
||||||
|
}
|
||||||
|
|
||||||
|
// now we should see the committed positions from another consumer
|
||||||
|
try (var anotherConsumer = createConsumer(groupProtocol, true)) {
|
||||||
|
assertEquals(300, anotherConsumer.committed(Set.of(tp)).get(tp).offset());
|
||||||
|
assertEquals(500, anotherConsumer.committed(Set.of(tp1)).get(tp1).offset());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClassicConsumerAutoCommitOnCloseAfterWakeup() throws InterruptedException {
|
||||||
|
testAutoCommitOnCloseAfterWakeup(GroupProtocol.CLASSIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAsyncConsumerAutoCommitOnCloseAfterWakeup() throws InterruptedException {
|
||||||
|
testAutoCommitOnCloseAfterWakeup(GroupProtocol.CONSUMER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testAutoCommitOnCloseAfterWakeup(GroupProtocol groupProtocol) throws InterruptedException {
|
||||||
|
try (var consumer = createConsumer(groupProtocol, true)) {
|
||||||
|
sendRecords(cluster, tp, 1000);
|
||||||
|
|
||||||
|
consumer.subscribe(List.of(topic));
|
||||||
|
awaitAssignment(consumer, Set.of(tp, tp1));
|
||||||
|
|
||||||
|
// should auto-commit sought positions before closing
|
||||||
|
consumer.seek(tp, 300);
|
||||||
|
consumer.seek(tp1, 500);
|
||||||
|
|
||||||
|
// wakeup the consumer before closing to simulate trying to break a poll
|
||||||
|
// loop from another thread
|
||||||
|
consumer.wakeup();
|
||||||
|
}
|
||||||
|
|
||||||
|
// now we should see the committed positions from another consumer
|
||||||
|
try (var anotherConsumer = createConsumer(groupProtocol, true)) {
|
||||||
|
assertEquals(300, anotherConsumer.committed(Set.of(tp)).get(tp).offset());
|
||||||
|
assertEquals(500, anotherConsumer.committed(Set.of(tp1)).get(tp1).offset());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClassicConsumerCommitMetadata() throws InterruptedException {
|
||||||
|
testCommitMetadata(GroupProtocol.CLASSIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAsyncConsumerCommitMetadata() throws InterruptedException {
|
||||||
|
testCommitMetadata(GroupProtocol.CONSUMER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testCommitMetadata(GroupProtocol groupProtocol) throws InterruptedException {
|
||||||
|
try (var consumer = createConsumer(groupProtocol, true)) {
|
||||||
|
consumer.assign(List.of(tp));
|
||||||
|
// sync commit
|
||||||
|
var syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo");
|
||||||
|
consumer.commitSync(Map.of(tp, syncMetadata));
|
||||||
|
assertEquals(syncMetadata, consumer.committed(Set.of(tp)).get(tp));
|
||||||
|
|
||||||
|
// async commit
|
||||||
|
var asyncMetadata = new OffsetAndMetadata(10, "bar");
|
||||||
|
sendAndAwaitAsyncCommit(consumer, Map.of(tp, asyncMetadata));
|
||||||
|
assertEquals(asyncMetadata, consumer.committed(Set.of(tp)).get(tp));
|
||||||
|
|
||||||
|
// handle null metadata
|
||||||
|
var nullMetadata = new OffsetAndMetadata(5, null);
|
||||||
|
consumer.commitSync(Map.of(tp, nullMetadata));
|
||||||
|
assertEquals(nullMetadata, consumer.committed(Set.of(tp)).get(tp));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClassicConsumerAsyncCommit() throws InterruptedException {
|
||||||
|
testAsyncCommit(GroupProtocol.CLASSIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAsyncConsumerAsyncCommit() throws InterruptedException {
|
||||||
|
testAsyncCommit(GroupProtocol.CONSUMER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testAsyncCommit(GroupProtocol groupProtocol) throws InterruptedException {
|
||||||
|
// Ensure the __consumer_offsets topic is created to prevent transient issues,
|
||||||
|
// such as RetriableCommitFailedException during async offset commits.
|
||||||
|
cluster.createTopic(
|
||||||
|
Topic.GROUP_METADATA_TOPIC_NAME,
|
||||||
|
Integer.parseInt(OFFSETS_TOPIC_PARTITIONS),
|
||||||
|
Short.parseShort(OFFSETS_TOPIC_REPLICATION)
|
||||||
|
);
|
||||||
|
try (var consumer = createConsumer(groupProtocol, false)) {
|
||||||
|
consumer.assign(List.of(tp));
|
||||||
|
|
||||||
|
var callback = new CountConsumerCommitCallback();
|
||||||
|
var count = 5;
|
||||||
|
for (var i = 1; i <= count; i++)
|
||||||
|
consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(i)), callback);
|
||||||
|
|
||||||
|
TestUtils.waitForCondition(() -> {
|
||||||
|
consumer.poll(Duration.ofMillis(100));
|
||||||
|
return callback.successCount >= count || callback.lastError.isPresent();
|
||||||
|
}, "Failed to observe commit callback before timeout");
|
||||||
|
|
||||||
|
assertEquals(Optional.empty(), callback.lastError);
|
||||||
|
assertEquals(count, callback.successCount);
|
||||||
|
assertEquals(new OffsetAndMetadata(count), consumer.committed(Set.of(tp)).get(tp));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClassicConsumerAutoCommitIntercept() throws InterruptedException {
|
||||||
|
testAutoCommitIntercept(GroupProtocol.CLASSIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAsyncConsumerAutoCommitIntercept() throws InterruptedException {
|
||||||
|
testAutoCommitIntercept(GroupProtocol.CONSUMER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testAutoCommitIntercept(GroupProtocol groupProtocol) throws InterruptedException {
|
||||||
|
var topic2 = "topic2";
|
||||||
|
cluster.createTopic(topic2, 2, (short) 3);
|
||||||
|
var numRecords = 100;
|
||||||
|
try (var producer = cluster.producer();
|
||||||
|
// create consumer with interceptor
|
||||||
|
Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(
|
||||||
|
GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT),
|
||||||
|
ENABLE_AUTO_COMMIT_CONFIG, "true",
|
||||||
|
INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor"
|
||||||
|
))
|
||||||
|
) {
|
||||||
|
// produce records
|
||||||
|
for (var i = 0; i < numRecords; i++) {
|
||||||
|
producer.send(new ProducerRecord<>(tp.topic(), tp.partition(), ("key " + i).getBytes(), ("value " + i).getBytes()));
|
||||||
|
}
|
||||||
|
|
||||||
|
var rebalanceListener = new ConsumerRebalanceListener() {
|
||||||
|
@Override
|
||||||
|
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
||||||
|
// keep partitions paused in this test so that we can verify the commits based on specific seeks
|
||||||
|
consumer.pause(partitions);
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||||||
|
// No-op
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
changeConsumerSubscriptionAndValidateAssignment(
|
||||||
|
consumer,
|
||||||
|
List.of(topic),
|
||||||
|
Set.of(tp, tp1),
|
||||||
|
rebalanceListener
|
||||||
|
);
|
||||||
|
consumer.seek(tp, 10);
|
||||||
|
consumer.seek(tp1, 20);
|
||||||
|
|
||||||
|
// change subscription to trigger rebalance
|
||||||
|
var commitCountBeforeRebalance = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
|
||||||
|
var expectedAssignment = Set.of(tp, tp1, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1));
|
||||||
|
changeConsumerSubscriptionAndValidateAssignment(
|
||||||
|
consumer,
|
||||||
|
List.of(topic, topic2),
|
||||||
|
expectedAssignment,
|
||||||
|
rebalanceListener
|
||||||
|
);
|
||||||
|
|
||||||
|
// after rebalancing, we should have reset to the committed positions
|
||||||
|
var committed1 = consumer.committed(Set.of(tp));
|
||||||
|
assertEquals(10, committed1.get(tp).offset());
|
||||||
|
var committed2 = consumer.committed(Set.of(tp1));
|
||||||
|
assertEquals(20, committed2.get(tp1).offset());
|
||||||
|
|
||||||
|
// In both CLASSIC and CONSUMER protocols, interceptors are executed in poll and close.
|
||||||
|
// However, in the CONSUMER protocol, the assignment may be changed outside a poll, so
|
||||||
|
// we need to poll once to ensure the interceptor is called.
|
||||||
|
if (groupProtocol == GroupProtocol.CONSUMER) {
|
||||||
|
consumer.poll(Duration.ZERO);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance);
|
||||||
|
|
||||||
|
// verify commits are intercepted on close
|
||||||
|
var commitCountBeforeClose = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
|
||||||
|
consumer.close();
|
||||||
|
assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeClose);
|
||||||
|
producer.close();
|
||||||
|
// cleanup
|
||||||
|
MockConsumerInterceptor.resetCounters();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClassicConsumerCommitSpecifiedOffsets() throws InterruptedException {
|
||||||
|
testCommitSpecifiedOffsets(GroupProtocol.CLASSIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAsyncConsumerCommitSpecifiedOffsets() throws InterruptedException {
|
||||||
|
testCommitSpecifiedOffsets(GroupProtocol.CONSUMER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testCommitSpecifiedOffsets(GroupProtocol groupProtocol) throws InterruptedException {
|
||||||
|
try (Producer<byte[], byte[]> producer = cluster.producer();
|
||||||
|
var consumer = createConsumer(groupProtocol, false)
|
||||||
|
) {
|
||||||
|
sendRecords(producer, tp, 5, System.currentTimeMillis());
|
||||||
|
sendRecords(producer, tp1, 7, System.currentTimeMillis());
|
||||||
|
|
||||||
|
consumer.assign(List.of(tp, tp1));
|
||||||
|
|
||||||
|
var pos1 = consumer.position(tp);
|
||||||
|
var pos2 = consumer.position(tp1);
|
||||||
|
|
||||||
|
consumer.commitSync(Map.of(tp, new OffsetAndMetadata(3L)));
|
||||||
|
|
||||||
|
assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
|
||||||
|
assertNull(consumer.committed(Collections.singleton(tp1)).get(tp1));
|
||||||
|
|
||||||
|
// Positions should not change
|
||||||
|
assertEquals(pos1, consumer.position(tp));
|
||||||
|
assertEquals(pos2, consumer.position(tp1));
|
||||||
|
|
||||||
|
consumer.commitSync(Map.of(tp1, new OffsetAndMetadata(5L)));
|
||||||
|
|
||||||
|
assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
|
||||||
|
assertEquals(5, consumer.committed(Set.of(tp1)).get(tp1).offset());
|
||||||
|
|
||||||
|
// Using async should pick up the committed changes after commit completes
|
||||||
|
sendAndAwaitAsyncCommit(consumer, Map.of(tp1, new OffsetAndMetadata(7L)));
|
||||||
|
assertEquals(7, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClassicConsumerAutoCommitOnRebalance() throws InterruptedException {
|
||||||
|
testAutoCommitOnRebalance(GroupProtocol.CLASSIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAsyncConsumerAutoCommitOnRebalance() throws InterruptedException {
|
||||||
|
testAutoCommitOnRebalance(GroupProtocol.CONSUMER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testAutoCommitOnRebalance(GroupProtocol groupProtocol) throws InterruptedException {
|
||||||
|
var topic2 = "topic2";
|
||||||
|
cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
|
||||||
|
try (var consumer = createConsumer(groupProtocol, true)) {
|
||||||
|
sendRecords(cluster, tp, 1000);
|
||||||
|
|
||||||
|
var rebalanceListener = new ConsumerRebalanceListener() {
|
||||||
|
@Override
|
||||||
|
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
|
||||||
|
// keep partitions paused in this test so that we can verify the commits based on specific seeks
|
||||||
|
consumer.pause(partitions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||||||
|
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
consumer.subscribe(List.of(topic), rebalanceListener);
|
||||||
|
awaitAssignment(consumer, Set.of(tp, tp1));
|
||||||
|
|
||||||
|
consumer.seek(tp, 300);
|
||||||
|
consumer.seek(tp1, 500);
|
||||||
|
// change subscription to trigger rebalance
|
||||||
|
consumer.subscribe(List.of(topic, topic2), rebalanceListener);
|
||||||
|
|
||||||
|
var newAssignment = Set.of(tp, tp1, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1));
|
||||||
|
awaitAssignment(consumer, newAssignment);
|
||||||
|
|
||||||
|
// after rebalancing, we should have reset to the committed positions
|
||||||
|
assertEquals(300, consumer.committed(Set.of(tp)).get(tp).offset());
|
||||||
|
assertEquals(500, consumer.committed(Set.of(tp1)).get(tp1).offset());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClassicConsumerSubscribeAndCommitSync() throws InterruptedException {
|
||||||
|
testSubscribeAndCommitSync(GroupProtocol.CLASSIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAsyncConsumerSubscribeAndCommitSync() throws InterruptedException {
|
||||||
|
testSubscribeAndCommitSync(GroupProtocol.CONSUMER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testSubscribeAndCommitSync(GroupProtocol groupProtocol) throws InterruptedException {
|
||||||
|
// This test ensure that the member ID is propagated from the group coordinator when the
|
||||||
|
// assignment is received into a subsequent offset commit
|
||||||
|
try (var consumer = createConsumer(groupProtocol, false)) {
|
||||||
|
assertEquals(0, consumer.assignment().size());
|
||||||
|
consumer.subscribe(List.of(topic));
|
||||||
|
awaitAssignment(consumer, Set.of(tp, tp1));
|
||||||
|
|
||||||
|
consumer.seek(tp, 0);
|
||||||
|
consumer.commitSync();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testClassicConsumerPositionAndCommit() throws InterruptedException {
|
||||||
|
testPositionAndCommit(GroupProtocol.CLASSIC);
|
||||||
|
}
|
||||||
|
|
||||||
|
@ClusterTest
|
||||||
|
public void testAsyncConsumerPositionAndCommit() throws InterruptedException {
|
||||||
|
testPositionAndCommit(GroupProtocol.CONSUMER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testPositionAndCommit(GroupProtocol groupProtocol) throws InterruptedException {
|
||||||
|
try (Producer<byte[], byte[]> producer = cluster.producer();
|
||||||
|
var consumer = createConsumer(groupProtocol, false);
|
||||||
|
var otherConsumer = createConsumer(groupProtocol, false)
|
||||||
|
) {
|
||||||
|
var startingTimestamp = System.currentTimeMillis();
|
||||||
|
sendRecords(producer, tp, 5, startingTimestamp);
|
||||||
|
|
||||||
|
var topicPartition = new TopicPartition(topic, 15);
|
||||||
|
assertNull(consumer.committed(Collections.singleton(topicPartition)).get(topicPartition));
|
||||||
|
|
||||||
|
// position() on a partition that we aren't subscribed to throws an exception
|
||||||
|
assertThrows(IllegalStateException.class, () -> consumer.position(topicPartition));
|
||||||
|
|
||||||
|
consumer.assign(List.of(tp));
|
||||||
|
|
||||||
|
assertEquals(0L, consumer.position(tp), "position() on a partition that we are subscribed to should reset the offset");
|
||||||
|
consumer.commitSync();
|
||||||
|
assertEquals(0L, consumer.committed(Set.of(tp)).get(tp).offset());
|
||||||
|
consumeAndVerifyRecords(consumer, tp, 5, 0, 0, startingTimestamp);
|
||||||
|
assertEquals(5L, consumer.position(tp), "After consuming 5 records, position should be 5");
|
||||||
|
consumer.commitSync();
|
||||||
|
assertEquals(5L, consumer.committed(Set.of(tp)).get(tp).offset(), "Committed offset should be returned");
|
||||||
|
|
||||||
|
startingTimestamp = System.currentTimeMillis();
|
||||||
|
sendRecords(producer, tp, 1, startingTimestamp);
|
||||||
|
|
||||||
|
// another consumer in the same group should get the same position
|
||||||
|
otherConsumer.assign(List.of(tp));
|
||||||
|
consumeAndVerifyRecords(otherConsumer, tp, 1, 5, 0, startingTimestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: This only works in the new consumer, but should be fixed for the old consumer as well
|
||||||
|
@ClusterTest
|
||||||
|
public void testCommitAsyncCompletedBeforeConsumerCloses() {
|
||||||
|
// This is testing the contract that asynchronous offset commit are completed before the consumer
|
||||||
|
// is closed, even when no commit sync is performed as part of the close (due to auto-commit
|
||||||
|
// disabled, or simply because there are no consumed offsets).
|
||||||
|
try (Producer<byte[], byte[]> producer = cluster.producer(Map.of(ProducerConfig.ACKS_CONFIG, "all"));
|
||||||
|
var consumer = createConsumer(GroupProtocol.CONSUMER, false)
|
||||||
|
) {
|
||||||
|
sendRecords(producer, tp, 3, System.currentTimeMillis());
|
||||||
|
sendRecords(producer, tp1, 3, System.currentTimeMillis());
|
||||||
|
consumer.assign(List.of(tp, tp1));
|
||||||
|
|
||||||
|
// Try without looking up the coordinator first
|
||||||
|
var cb = new CountConsumerCommitCallback();
|
||||||
|
consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), cb);
|
||||||
|
consumer.commitAsync(Map.of(tp1, new OffsetAndMetadata(1L)), cb);
|
||||||
|
|
||||||
|
consumer.close();
|
||||||
|
assertEquals(2, cb.successCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: This only works in the new consumer, but should be fixed for the old consumer as well
|
||||||
|
@ClusterTest
|
||||||
|
public void testCommitAsyncCompletedBeforeCommitSyncReturns() {
|
||||||
|
// This is testing the contract that asynchronous offset commits sent previously with the
|
||||||
|
// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
|
||||||
|
// `commitSync` (given that it does not time out).
|
||||||
|
try (Producer<byte[], byte[]> producer = cluster.producer();
|
||||||
|
var consumer = createConsumer(GroupProtocol.CONSUMER, false)
|
||||||
|
) {
|
||||||
|
sendRecords(producer, tp, 3, System.currentTimeMillis());
|
||||||
|
sendRecords(producer, tp1, 3, System.currentTimeMillis());
|
||||||
|
|
||||||
|
consumer.assign(List.of(tp, tp1));
|
||||||
|
|
||||||
|
// Try without looking up the coordinator first
|
||||||
|
var cb = new CountConsumerCommitCallback();
|
||||||
|
consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), cb);
|
||||||
|
consumer.commitSync(Map.of());
|
||||||
|
|
||||||
|
assertEquals(1, consumer.committed(Set.of(tp)).get(tp).offset());
|
||||||
|
assertEquals(1, cb.successCount);
|
||||||
|
|
||||||
|
// Try with coordinator known
|
||||||
|
consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(2L)), cb);
|
||||||
|
consumer.commitSync(Map.of(tp1, new OffsetAndMetadata(2L)));
|
||||||
|
|
||||||
|
assertEquals(2, consumer.committed(Set.of(tp)).get(tp).offset());
|
||||||
|
assertEquals(2, consumer.committed(Set.of(tp1)).get(tp1).offset());
|
||||||
|
assertEquals(2, cb.successCount);
|
||||||
|
|
||||||
|
// Try with empty sync commit
|
||||||
|
consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(3L)), cb);
|
||||||
|
consumer.commitSync(Map.of());
|
||||||
|
|
||||||
|
assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
|
||||||
|
assertEquals(2, consumer.committed(Set.of(tp1)).get(tp1).offset());
|
||||||
|
assertEquals(3, cb.successCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol, boolean enableAutoCommit) {
|
||||||
|
return cluster.consumer(Map.of(
|
||||||
|
GROUP_ID_CONFIG, "test-group",
|
||||||
|
GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT),
|
||||||
|
ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendAndAwaitAsyncCommit(
|
||||||
|
Consumer<byte[], byte[]> consumer,
|
||||||
|
Map<TopicPartition, OffsetAndMetadata> offsetsOpt
|
||||||
|
) throws InterruptedException {
|
||||||
|
var commitCallback = new RetryCommitCallback(consumer, offsetsOpt);
|
||||||
|
|
||||||
|
commitCallback.sendAsyncCommit();
|
||||||
|
TestUtils.waitForCondition(() -> {
|
||||||
|
consumer.poll(Duration.ofMillis(100));
|
||||||
|
return commitCallback.isComplete;
|
||||||
|
}, "Failed to observe commit callback before timeout"
|
||||||
|
);
|
||||||
|
|
||||||
|
assertEquals(Optional.empty(), commitCallback.error);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class RetryCommitCallback implements OffsetCommitCallback {
|
||||||
|
private boolean isComplete = false;
|
||||||
|
private Optional<Exception> error = Optional.empty();
|
||||||
|
|
||||||
|
private final Consumer<byte[], byte[]> consumer;
|
||||||
|
private final Map<TopicPartition, OffsetAndMetadata> offsetsOpt;
|
||||||
|
|
||||||
|
public RetryCommitCallback(
|
||||||
|
Consumer<byte[], byte[]> consumer,
|
||||||
|
Map<TopicPartition, OffsetAndMetadata> offsetsOpt
|
||||||
|
) {
|
||||||
|
this.consumer = consumer;
|
||||||
|
this.offsetsOpt = offsetsOpt;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
|
||||||
|
if (exception instanceof RetriableCommitFailedException) {
|
||||||
|
sendAsyncCommit();
|
||||||
|
} else {
|
||||||
|
isComplete = true;
|
||||||
|
error = Optional.ofNullable(exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void sendAsyncCommit() {
|
||||||
|
consumer.commitAsync(offsetsOpt, this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class CountConsumerCommitCallback implements OffsetCommitCallback {
|
||||||
|
private int successCount = 0;
|
||||||
|
private Optional<Exception> lastError = Optional.empty();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
|
||||||
|
if (exception == null) {
|
||||||
|
successCount += 1;
|
||||||
|
} else {
|
||||||
|
lastError = Optional.of(exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void changeConsumerSubscriptionAndValidateAssignment(
|
||||||
|
Consumer<byte[], byte[]> consumer,
|
||||||
|
List<String> topicsToSubscribe,
|
||||||
|
Set<TopicPartition> expectedAssignment,
|
||||||
|
ConsumerRebalanceListener rebalanceListener
|
||||||
|
) throws InterruptedException {
|
||||||
|
consumer.subscribe(topicsToSubscribe, rebalanceListener);
|
||||||
|
awaitAssignment(consumer, expectedAssignment);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,371 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
|
|
||||||
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
|
|
||||||
* License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
|
|
||||||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations under the License.
|
|
||||||
*/
|
|
||||||
package kafka.api
|
|
||||||
|
|
||||||
import kafka.utils.{TestInfoUtils, TestUtils}
|
|
||||||
import org.apache.kafka.clients.consumer._
|
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord
|
|
||||||
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
|
|
||||||
import org.apache.kafka.common.TopicPartition
|
|
||||||
import org.apache.kafka.test.MockConsumerInterceptor
|
|
||||||
import org.junit.jupiter.api.Assertions._
|
|
||||||
import org.junit.jupiter.api.Timeout
|
|
||||||
import org.junit.jupiter.params.ParameterizedTest
|
|
||||||
import org.junit.jupiter.params.provider.MethodSource
|
|
||||||
|
|
||||||
import java.time.Duration
|
|
||||||
import java.util
|
|
||||||
import java.util.Optional
|
|
||||||
import scala.jdk.CollectionConverters._
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Integration tests for the consumer that covers the logic related to committing offsets.
|
|
||||||
*/
|
|
||||||
@Timeout(600)
|
|
||||||
class PlaintextConsumerCommitTest extends AbstractConsumerTest {
|
|
||||||
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
|
||||||
def testAutoCommitOnClose(groupProtocol: String): Unit = {
|
|
||||||
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
|
|
||||||
val consumer = createConsumer()
|
|
||||||
|
|
||||||
val numRecords = 10000
|
|
||||||
val producer = createProducer()
|
|
||||||
sendRecords(producer, numRecords, tp)
|
|
||||||
|
|
||||||
consumer.subscribe(List(topic).asJava)
|
|
||||||
awaitAssignment(consumer, Set(tp, tp2))
|
|
||||||
|
|
||||||
// should auto-commit sought positions before closing
|
|
||||||
consumer.seek(tp, 300)
|
|
||||||
consumer.seek(tp2, 500)
|
|
||||||
consumer.close()
|
|
||||||
|
|
||||||
// now we should see the committed positions from another consumer
|
|
||||||
val anotherConsumer = createConsumer()
|
|
||||||
assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
|
|
||||||
assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
|
||||||
def testAutoCommitOnCloseAfterWakeup(groupProtocol: String): Unit = {
|
|
||||||
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
|
|
||||||
val consumer = createConsumer()
|
|
||||||
|
|
||||||
val numRecords = 10000
|
|
||||||
val producer = createProducer()
|
|
||||||
sendRecords(producer, numRecords, tp)
|
|
||||||
|
|
||||||
consumer.subscribe(List(topic).asJava)
|
|
||||||
awaitAssignment(consumer, Set(tp, tp2))
|
|
||||||
|
|
||||||
// should auto-commit sought positions before closing
|
|
||||||
consumer.seek(tp, 300)
|
|
||||||
consumer.seek(tp2, 500)
|
|
||||||
|
|
||||||
// wakeup the consumer before closing to simulate trying to break a poll
|
|
||||||
// loop from another thread
|
|
||||||
consumer.wakeup()
|
|
||||||
consumer.close()
|
|
||||||
|
|
||||||
// now we should see the committed positions from another consumer
|
|
||||||
val anotherConsumer = createConsumer()
|
|
||||||
assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
|
|
||||||
assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
|
||||||
def testCommitMetadata(groupProtocol: String): Unit = {
|
|
||||||
val consumer = createConsumer()
|
|
||||||
consumer.assign(List(tp).asJava)
|
|
||||||
|
|
||||||
// sync commit
|
|
||||||
val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
|
|
||||||
consumer.commitSync(Map((tp, syncMetadata)).asJava)
|
|
||||||
assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp))
|
|
||||||
|
|
||||||
// async commit
|
|
||||||
val asyncMetadata = new OffsetAndMetadata(10, "bar")
|
|
||||||
sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
|
|
||||||
assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp))
|
|
||||||
|
|
||||||
// handle null metadata
|
|
||||||
val nullMetadata = new OffsetAndMetadata(5, null)
|
|
||||||
consumer.commitSync(Map(tp -> nullMetadata).asJava)
|
|
||||||
assertEquals(nullMetadata, consumer.committed(Set(tp).asJava).get(tp))
|
|
||||||
}
|
|
||||||
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
|
||||||
def testAsyncCommit(groupProtocol: String): Unit = {
|
|
||||||
val consumer = createConsumer()
|
|
||||||
consumer.assign(List(tp).asJava)
|
|
||||||
|
|
||||||
val callback = new CountConsumerCommitCallback
|
|
||||||
val count = 5
|
|
||||||
|
|
||||||
for (i <- 1 to count)
|
|
||||||
consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava, callback)
|
|
||||||
|
|
||||||
TestUtils.pollUntilTrue(consumer, () => callback.successCount >= count || callback.lastError.isDefined,
|
|
||||||
"Failed to observe commit callback before timeout", waitTimeMs = 10000)
|
|
||||||
|
|
||||||
assertEquals(None, callback.lastError)
|
|
||||||
assertEquals(count, callback.successCount)
|
|
||||||
assertEquals(new OffsetAndMetadata(count), consumer.committed(Set(tp).asJava).get(tp))
|
|
||||||
}
|
|
||||||
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
|
||||||
def testAutoCommitIntercept(groupProtocol: String): Unit = {
|
|
||||||
val topic2 = "topic2"
|
|
||||||
createTopic(topic2, 2, brokerCount)
|
|
||||||
|
|
||||||
// produce records
|
|
||||||
val numRecords = 100
|
|
||||||
val testProducer = createProducer(keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
|
|
||||||
(0 until numRecords).map { i =>
|
|
||||||
testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i", s"value $i"))
|
|
||||||
}.foreach(_.get)
|
|
||||||
|
|
||||||
// create consumer with interceptor
|
|
||||||
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
|
|
||||||
this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
|
|
||||||
val testConsumer = createConsumer(keyDeserializer = new StringDeserializer, valueDeserializer = new StringDeserializer)
|
|
||||||
val rebalanceListener = new ConsumerRebalanceListener {
|
|
||||||
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
|
|
||||||
// keep partitions paused in this test so that we can verify the commits based on specific seeks
|
|
||||||
testConsumer.pause(partitions)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {}
|
|
||||||
}
|
|
||||||
changeConsumerSubscriptionAndValidateAssignment(testConsumer, List(topic), Set(tp, tp2), rebalanceListener)
|
|
||||||
testConsumer.seek(tp, 10)
|
|
||||||
testConsumer.seek(tp2, 20)
|
|
||||||
|
|
||||||
// change subscription to trigger rebalance
|
|
||||||
val commitCountBeforeRebalance = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
|
|
||||||
changeConsumerSubscriptionAndValidateAssignment(testConsumer,
|
|
||||||
List(topic, topic2),
|
|
||||||
Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)),
|
|
||||||
rebalanceListener)
|
|
||||||
|
|
||||||
// after rebalancing, we should have reset to the committed positions
|
|
||||||
assertEquals(10, testConsumer.committed(Set(tp).asJava).get(tp).offset)
|
|
||||||
assertEquals(20, testConsumer.committed(Set(tp2).asJava).get(tp2).offset)
|
|
||||||
|
|
||||||
// In both CLASSIC and CONSUMER protocols, interceptors are executed in poll and close.
|
|
||||||
// However, in the CONSUMER protocol, the assignment may be changed outside of a poll, so
|
|
||||||
// we need to poll once to ensure the interceptor is called.
|
|
||||||
if (groupProtocol.toUpperCase == GroupProtocol.CONSUMER.name) {
|
|
||||||
testConsumer.poll(Duration.ZERO)
|
|
||||||
}
|
|
||||||
|
|
||||||
assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance)
|
|
||||||
|
|
||||||
// verify commits are intercepted on close
|
|
||||||
val commitCountBeforeClose = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
|
|
||||||
testConsumer.close()
|
|
||||||
assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeClose)
|
|
||||||
testProducer.close()
|
|
||||||
|
|
||||||
// cleanup
|
|
||||||
MockConsumerInterceptor.resetCounters()
|
|
||||||
}
|
|
||||||
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
|
||||||
def testCommitSpecifiedOffsets(groupProtocol: String): Unit = {
|
|
||||||
val producer = createProducer()
|
|
||||||
sendRecords(producer, numRecords = 5, tp)
|
|
||||||
sendRecords(producer, numRecords = 7, tp2)
|
|
||||||
|
|
||||||
val consumer = createConsumer()
|
|
||||||
consumer.assign(List(tp, tp2).asJava)
|
|
||||||
|
|
||||||
val pos1 = consumer.position(tp)
|
|
||||||
val pos2 = consumer.position(tp2)
|
|
||||||
consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
|
|
||||||
assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
|
|
||||||
assertNull(consumer.committed(Set(tp2).asJava).get(tp2))
|
|
||||||
|
|
||||||
// Positions should not change
|
|
||||||
assertEquals(pos1, consumer.position(tp))
|
|
||||||
assertEquals(pos2, consumer.position(tp2))
|
|
||||||
consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
|
|
||||||
assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
|
|
||||||
assertEquals(5, consumer.committed(Set(tp2).asJava).get(tp2).offset)
|
|
||||||
|
|
||||||
// Using async should pick up the committed changes after commit completes
|
|
||||||
sendAndAwaitAsyncCommit(consumer, Some(Map(tp2 -> new OffsetAndMetadata(7L))))
|
|
||||||
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
|
||||||
def testAutoCommitOnRebalance(groupProtocol: String): Unit = {
|
|
||||||
val topic2 = "topic2"
|
|
||||||
createTopic(topic2, 2, brokerCount)
|
|
||||||
|
|
||||||
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
|
|
||||||
val consumer = createConsumer()
|
|
||||||
|
|
||||||
val numRecords = 10000
|
|
||||||
val producer = createProducer()
|
|
||||||
sendRecords(producer, numRecords, tp)
|
|
||||||
|
|
||||||
val rebalanceListener = new ConsumerRebalanceListener {
|
|
||||||
override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
|
|
||||||
// keep partitions paused in this test so that we can verify the commits based on specific seeks
|
|
||||||
consumer.pause(partitions)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {}
|
|
||||||
}
|
|
||||||
|
|
||||||
consumer.subscribe(List(topic).asJava, rebalanceListener)
|
|
||||||
|
|
||||||
awaitAssignment(consumer, Set(tp, tp2))
|
|
||||||
|
|
||||||
consumer.seek(tp, 300)
|
|
||||||
consumer.seek(tp2, 500)
|
|
||||||
|
|
||||||
// change subscription to trigger rebalance
|
|
||||||
consumer.subscribe(List(topic, topic2).asJava, rebalanceListener)
|
|
||||||
|
|
||||||
val newAssignment = Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
|
|
||||||
awaitAssignment(consumer, newAssignment)
|
|
||||||
|
|
||||||
// after rebalancing, we should have reset to the committed positions
|
|
||||||
assertEquals(300, consumer.committed(Set(tp).asJava).get(tp).offset)
|
|
||||||
assertEquals(500, consumer.committed(Set(tp2).asJava).get(tp2).offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
|
||||||
def testSubscribeAndCommitSync(groupProtocol: String): Unit = {
|
|
||||||
// This test ensure that the member ID is propagated from the group coordinator when the
|
|
||||||
// assignment is received into a subsequent offset commit
|
|
||||||
val consumer = createConsumer()
|
|
||||||
assertEquals(0, consumer.assignment.size)
|
|
||||||
consumer.subscribe(List(topic).asJava)
|
|
||||||
awaitAssignment(consumer, Set(tp, tp2))
|
|
||||||
|
|
||||||
consumer.seek(tp, 0)
|
|
||||||
|
|
||||||
consumer.commitSync()
|
|
||||||
}
|
|
||||||
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
|
||||||
def testPositionAndCommit(groupProtocol: String): Unit = {
|
|
||||||
val producer = createProducer()
|
|
||||||
var startingTimestamp = System.currentTimeMillis()
|
|
||||||
sendRecords(producer, numRecords = 5, tp, startingTimestamp = startingTimestamp)
|
|
||||||
|
|
||||||
val topicPartition = new TopicPartition(topic, 15)
|
|
||||||
val consumer = createConsumer()
|
|
||||||
assertNull(consumer.committed(Set(topicPartition).asJava).get(topicPartition))
|
|
||||||
|
|
||||||
// position() on a partition that we aren't subscribed to throws an exception
|
|
||||||
assertThrows(classOf[IllegalStateException], () => consumer.position(topicPartition))
|
|
||||||
|
|
||||||
consumer.assign(List(tp).asJava)
|
|
||||||
|
|
||||||
assertEquals(0L, consumer.position(tp), "position() on a partition that we are subscribed to should reset the offset")
|
|
||||||
consumer.commitSync()
|
|
||||||
assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset)
|
|
||||||
consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 0, startingTimestamp = startingTimestamp)
|
|
||||||
assertEquals(5L, consumer.position(tp), "After consuming 5 records, position should be 5")
|
|
||||||
consumer.commitSync()
|
|
||||||
assertEquals(5L, consumer.committed(Set(tp).asJava).get(tp).offset, "Committed offset should be returned")
|
|
||||||
|
|
||||||
startingTimestamp = System.currentTimeMillis()
|
|
||||||
sendRecords(producer, numRecords = 1, tp, startingTimestamp = startingTimestamp)
|
|
||||||
|
|
||||||
// another consumer in the same group should get the same position
|
|
||||||
val otherConsumer = createConsumer()
|
|
||||||
otherConsumer.assign(List(tp).asJava)
|
|
||||||
consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: This only works in the new consumer, but should be fixed for the old consumer as well
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
|
|
||||||
def testCommitAsyncCompletedBeforeConsumerCloses(groupProtocol: String): Unit = {
|
|
||||||
// This is testing the contract that asynchronous offset commit are completed before the consumer
|
|
||||||
// is closed, even when no commit sync is performed as part of the close (due to auto-commit
|
|
||||||
// disabled, or simply because there are no consumed offsets).
|
|
||||||
val producer = createProducer()
|
|
||||||
sendRecords(producer, numRecords = 3, tp)
|
|
||||||
sendRecords(producer, numRecords = 3, tp2)
|
|
||||||
|
|
||||||
val consumer = createConsumer()
|
|
||||||
consumer.assign(List(tp, tp2).asJava)
|
|
||||||
|
|
||||||
// Try without looking up the coordinator first
|
|
||||||
val cb = new CountConsumerCommitCallback
|
|
||||||
consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
|
|
||||||
consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb)
|
|
||||||
consumer.close()
|
|
||||||
assertEquals(2, cb.successCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: This only works in the new consumer, but should be fixed for the old consumer as well
|
|
||||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
|
||||||
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
|
|
||||||
def testCommitAsyncCompletedBeforeCommitSyncReturns(groupProtocol: String): Unit = {
|
|
||||||
// This is testing the contract that asynchronous offset commits sent previously with the
|
|
||||||
// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of
|
|
||||||
// `commitSync` (given that it does not time out).
|
|
||||||
val producer = createProducer()
|
|
||||||
sendRecords(producer, numRecords = 3, tp)
|
|
||||||
sendRecords(producer, numRecords = 3, tp2)
|
|
||||||
|
|
||||||
val consumer = createConsumer()
|
|
||||||
consumer.assign(List(tp, tp2).asJava)
|
|
||||||
|
|
||||||
// Try without looking up the coordinator first
|
|
||||||
val cb = new CountConsumerCommitCallback
|
|
||||||
consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb)
|
|
||||||
consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
|
|
||||||
assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
|
|
||||||
assertEquals(1, cb.successCount)
|
|
||||||
|
|
||||||
// Try with coordinator known
|
|
||||||
consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb)
|
|
||||||
consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava)
|
|
||||||
assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
|
|
||||||
assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
|
|
||||||
assertEquals(2, cb.successCount)
|
|
||||||
|
|
||||||
// Try with empty sync commit
|
|
||||||
consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava, cb)
|
|
||||||
consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
|
|
||||||
assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
|
|
||||||
assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
|
|
||||||
assertEquals(3, cb.successCount)
|
|
||||||
}
|
|
||||||
|
|
||||||
def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V],
|
|
||||||
topicsToSubscribe: List[String],
|
|
||||||
expectedAssignment: Set[TopicPartition],
|
|
||||||
rebalanceListener: ConsumerRebalanceListener): Unit = {
|
|
||||||
consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
|
|
||||||
awaitAssignment(consumer, expectedAssignment)
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue