diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java index 7f2792bcc53..651c3033f29 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java @@ -44,6 +44,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import static org.apache.kafka.clients.ClientsTestUtils.TestClusterResourceListenerDeserializer.UPDATE_CONSUMER_COUNT; import static org.apache.kafka.clients.ClientsTestUtils.TestClusterResourceListenerSerializer.UPDATE_PRODUCER_COUNT; @@ -107,6 +108,17 @@ public class ClientsTestUtils { ); } + public static void pollUntilTrue( + Consumer consumer, + Supplier testCondition, + long waitTimeMs, String msg + ) throws InterruptedException { + TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(100)); + return testCondition.get(); + }, waitTimeMs, msg); + } + public static void consumeAndVerifyRecords( Consumer consumer, TopicPartition tp, diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java new file mode 100644 index 00000000000..b12b6dacb4d --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.ClientsTestUtils; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import org.junit.jupiter.api.BeforeEach; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration tests for the consumer that covers logic related to manual assignment. + */ +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = PlaintextConsumerAssignTest.BROKER_COUNT, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "3"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), + } +) +public class PlaintextConsumerAssignTest { + + public static final int BROKER_COUNT = 3; + + private final ClusterInstance clusterInstance; + private final String topic = "topic"; + private final int partition = 0; + TopicPartition tp = new TopicPartition(topic, partition); + + PlaintextConsumerAssignTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } + + @BeforeEach + public void setup() throws InterruptedException { + clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT); + } + + @ClusterTest + public void testClassicAssignAndCommitAsyncNotCommitted() throws Exception { + testAssignAndCommitAsyncNotCommitted(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncAssignAndCommitAsyncNotCommitted() throws Exception { + testAssignAndCommitAsyncNotCommitted(GroupProtocol.CONSUMER); + } + + private void testAssignAndCommitAsyncNotCommitted(GroupProtocol groupProtocol) throws InterruptedException { + int numRecords = 10000; + long startingTimestamp = System.currentTimeMillis(); + CountConsumerCommitCallback cb = new CountConsumerCommitCallback(); + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); + consumer.assign(List.of(tp)); + consumer.commitAsync(cb); + ClientsTestUtils.pollUntilTrue(consumer, () -> cb.successCount >= 1 || cb.lastError.isPresent(), + 10000, "Failed to observe commit callback before timeout"); + Map committedOffset = consumer.committed(Set.of(tp)); + assertNotNull(committedOffset); + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. + assertNull(committedOffset.get(tp)); + assertTrue(consumer.assignment().contains(tp)); + } + } + + @ClusterTest + public void testClassicAssignAndCommitSyncNotCommitted() throws Exception { + testAssignAndCommitSyncNotCommitted(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncAssignAndCommitSyncNotCommitted() { + testAssignAndCommitSyncNotCommitted(GroupProtocol.CONSUMER); + } + + private void testAssignAndCommitSyncNotCommitted(GroupProtocol groupProtocol) { + int numRecords = 10000; + long startingTimestamp = System.currentTimeMillis(); + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); + consumer.assign(List.of(tp)); + consumer.commitSync(); + Map committedOffset = consumer.committed(Set.of(tp)); + assertNotNull(committedOffset); + // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to + // tp. The committed offset should be null. This is intentional. + assertNull(committedOffset.get(tp)); + assertTrue(consumer.assignment().contains(tp)); + } + } + + @ClusterTest + public void testClassicAssignAndCommitSyncAllConsumed() throws Exception { + testAssignAndCommitSyncAllConsumed(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncAssignAndCommitSyncAllConsumed() throws Exception { + testAssignAndCommitSyncAllConsumed(GroupProtocol.CONSUMER); + } + + private void testAssignAndCommitSyncAllConsumed(GroupProtocol groupProtocol) throws InterruptedException { + int numRecords = 10000; + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { + long startingTimestamp = System.currentTimeMillis(); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); + consumer.assign(List.of(tp)); + consumer.seek(tp, 0); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp); + + consumer.commitSync(); + Map committedOffset = consumer.committed(Set.of(tp)); + assertNotNull(committedOffset); + assertNotNull(committedOffset.get(tp)); + assertEquals(numRecords, committedOffset.get(tp).offset()); + } + } + + @ClusterTest + public void testClassicAssignAndConsume() throws InterruptedException { + testAssignAndConsume(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncAssignAndConsume() throws InterruptedException { + testAssignAndConsume(GroupProtocol.CONSUMER); + } + + private void testAssignAndConsume(GroupProtocol groupProtocol) throws InterruptedException { + int numRecords = 10; + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { + long startingTimestamp = System.currentTimeMillis(); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); + consumer.assign(List.of(tp)); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp); + + assertEquals(numRecords, consumer.position(tp)); + } + } + + @ClusterTest + public void testClassicAssignAndConsumeSkippingPosition() throws InterruptedException { + testAssignAndConsumeSkippingPosition(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncAssignAndConsumeSkippingPosition() throws InterruptedException { + testAssignAndConsumeSkippingPosition(GroupProtocol.CONSUMER); + } + + private void testAssignAndConsumeSkippingPosition(GroupProtocol groupProtocol) throws InterruptedException { + int numRecords = 10; + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { + long startingTimestamp = System.currentTimeMillis(); + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); + consumer.assign(List.of(tp)); + int offset = 1; + consumer.seek(tp, offset); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords - offset, offset, offset, startingTimestamp + offset); + + assertEquals(numRecords, consumer.position(tp)); + } + } + + @ClusterTest + public void testClassicAssignAndFetchCommittedOffsets() throws InterruptedException { + testAssignAndFetchCommittedOffsets(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncAssignAndFetchCommittedOffsets() throws InterruptedException { + testAssignAndFetchCommittedOffsets(GroupProtocol.CONSUMER); + } + + private void testAssignAndFetchCommittedOffsets(GroupProtocol groupProtocol) throws InterruptedException { + int numRecords = 100; + long startingTimestamp = System.currentTimeMillis(); + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name, GROUP_ID_CONFIG, "group1"))) { + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); + consumer.assign(List.of(tp)); + // First consumer consumes and commits offsets + consumer.seek(tp, 0); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp); + consumer.commitSync(); + assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset()); + } + + // We should see the committed offsets from another consumer + try (Consumer anotherConsumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name, GROUP_ID_CONFIG, "group1"))) { + anotherConsumer.assign(List.of(tp)); + assertEquals(numRecords, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); + } + } + + @ClusterTest + public void testClassicAssignAndConsumeFromCommittedOffsets() throws InterruptedException { + testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncAssignAndConsumeFromCommittedOffsets() throws InterruptedException { + testAssignAndConsumeFromCommittedOffsets(GroupProtocol.CONSUMER); + } + + private void testAssignAndConsumeFromCommittedOffsets(GroupProtocol groupProtocol) throws InterruptedException { + int numRecords = 100; + int offset = 10; + long startingTimestamp = System.currentTimeMillis(); + Map config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name, GROUP_ID_CONFIG, "group1"); + + try (Consumer consumer = clusterInstance.consumer(config)) { + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); + consumer.assign(List.of(tp)); + consumer.commitSync(Map.of(tp, new OffsetAndMetadata(offset))); + assertEquals(offset, consumer.committed(Set.of(tp)).get(tp).offset()); + } + + // We should see the committed offsets from another consumer + try (Consumer anotherConsumer = clusterInstance.consumer(config)) { + assertEquals(offset, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); + anotherConsumer.assign(List.of(tp)); + ClientsTestUtils.consumeAndVerifyRecords(anotherConsumer, tp, numRecords - offset, offset, offset, startingTimestamp + offset); + } + } + + @ClusterTest + public void testClassicAssignAndRetrievingCommittedOffsetsMultipleTimes() throws InterruptedException { + testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncAssignAndRetrievingCommittedOffsetsMultipleTimes() throws InterruptedException { + testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol.CONSUMER); + } + + private void testAssignAndRetrievingCommittedOffsetsMultipleTimes(GroupProtocol groupProtocol) throws InterruptedException { + int numRecords = 100; + long startingTimestamp = System.currentTimeMillis(); + + try (Consumer consumer = clusterInstance.consumer(Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name))) { + ClientsTestUtils.sendRecords(clusterInstance, tp, numRecords, startingTimestamp); + consumer.assign(List.of(tp)); + + // Consume and commit offsets + consumer.seek(tp, 0); + ClientsTestUtils.consumeAndVerifyRecords(consumer, tp, numRecords, 0, 0, startingTimestamp); + consumer.commitSync(); + + // Check committed offsets twice with same consumer + assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(numRecords, consumer.committed(Set.of(tp)).get(tp).offset()); + } + } + + private static class CountConsumerCommitCallback implements OffsetCommitCallback { + int successCount = 0; + int failCount = 0; + Optional lastError = Optional.empty(); + + public void onComplete(Map offsets, Exception exception) { + if (exception == null) { + successCount += 1; + } else { + failCount += 1; + lastError = Optional.of(exception); + } + } + } +} diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala deleted file mode 100644 index 6673a1e0334..00000000000 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerAssignTest.scala +++ /dev/null @@ -1,203 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import kafka.utils.{TestInfoUtils, TestUtils} -import java.util.Properties -import org.apache.kafka.clients.consumer._ -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Timeout -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource - -/** - * Integration tests for the consumer that covers logic related to manual assignment. - */ -@Timeout(600) -class PlaintextConsumerAssignTest extends AbstractConsumerTest { - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndCommitAsyncNotCommitted(groupProtocol: String): Unit = { - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - val producer = createProducer() - val numRecords = 10000 - val startingTimestamp = System.currentTimeMillis() - val cb = new CountConsumerCommitCallback - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - consumer.assign(java.util.List.of(tp)) - consumer.commitAsync(cb) - TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || cb.lastError.isDefined, - "Failed to observe commit callback before timeout", waitTimeMs = 10000) - val committedOffset = consumer.committed(java.util.Set.of(tp)) - assertNotNull(committedOffset) - // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to - // tp. The committed offset should be null. This is intentional. - assertNull(committedOffset.get(tp)) - assertTrue(consumer.assignment.contains(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndCommitSyncNotCommitted(groupProtocol: String): Unit = { - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - val producer = createProducer() - val numRecords = 10000 - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - consumer.assign(java.util.List.of(tp)) - consumer.commitSync() - val committedOffset = consumer.committed(java.util.Set.of(tp)) - assertNotNull(committedOffset) - // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to - // tp. The committed offset should be null. This is intentional. - assertNull(committedOffset.get(tp)) - assertTrue(consumer.assignment.contains(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = { - val numRecords = 10000 - - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - consumer.assign(java.util.List.of(tp)) - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) - - consumer.commitSync() - val committedOffset = consumer.committed(java.util.Set.of(tp)) - assertNotNull(committedOffset) - assertNotNull(committedOffset.get(tp)) - assertEquals(numRecords, committedOffset.get(tp).offset()) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndConsume(groupProtocol: String): Unit = { - val numRecords = 10 - - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props, - configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - consumer.assign(java.util.List.of(tp)) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) - - assertEquals(numRecords, consumer.position(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = { - val numRecords = 10 - - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props, - configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - consumer.assign(java.util.List.of(tp)) - val offset = 1 - consumer.seek(tp, offset) - consumeAndVerifyRecords(consumer = consumer, numRecords - offset, startingOffset = offset, - startingKeyAndValueIndex = offset, startingTimestamp = startingTimestamp + offset) - - assertEquals(numRecords, consumer.position(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndFetchCommittedOffsets(groupProtocol: String): Unit = { - val numRecords = 100 - val startingTimestamp = System.currentTimeMillis() - val producer = createProducer() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - consumer.assign(java.util.List.of(tp)) - // First consumer consumes and commits offsets - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, - startingTimestamp = startingTimestamp) - consumer.commitSync() - assertEquals(numRecords, consumer.committed(java.util.Set.of(tp)).get(tp).offset) - // We should see the committed offsets from another consumer - val anotherConsumer = createConsumer(configOverrides = props) - anotherConsumer.assign(java.util.List.of(tp)) - assertEquals(numRecords, anotherConsumer.committed(java.util.Set.of(tp)).get(tp).offset) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndConsumeFromCommittedOffsets(groupProtocol: String): Unit = { - val producer = createProducer() - val numRecords = 100 - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords = numRecords, tp, startingTimestamp = startingTimestamp) - - // Commit offset with first consumer - val props = new Properties() - props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1") - val consumer = createConsumer(configOverrides = props) - consumer.assign(java.util.List.of(tp)) - val offset = 10 - consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(offset))) - assertEquals(offset, consumer.committed(java.util.Set.of(tp)).get(tp).offset) - consumer.close() - - // Consume from committed offsets with another consumer in same group - val anotherConsumer = createConsumer(configOverrides = props) - assertEquals(offset, anotherConsumer.committed(java.util.Set.of(tp)).get(tp).offset) - anotherConsumer.assign(java.util.List.of(tp)) - consumeAndVerifyRecords(consumer = anotherConsumer, numRecords - offset, - startingOffset = offset, startingKeyAndValueIndex = offset, - startingTimestamp = startingTimestamp + offset) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAssignAndRetrievingCommittedOffsetsMultipleTimes(groupProtocol: String): Unit = { - val numRecords = 100 - val startingTimestamp = System.currentTimeMillis() - val producer = createProducer() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val props = new Properties() - val consumer = createConsumer(configOverrides = props) - consumer.assign(java.util.List.of(tp)) - - // Consume and commit offsets - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, - startingTimestamp = startingTimestamp) - consumer.commitSync() - - // Check committed offsets twice with same consumer - assertEquals(numRecords, consumer.committed(java.util.Set.of(tp)).get(tp).offset) - assertEquals(numRecords, consumer.committed(java.util.Set.of(tp)).get(tp).offset) - } - -}