KAFKA-18914 Migrate ConsumerRebootstrapTest to use new test infra (#19154)

Migrate ConsumerRebootstrapTest to the new test infra and remove the old
Scala test.

The PR changed three things.
* Migrated `ConsumerRebootstrapTest` to new test infra and removed the
old Scala test.
* Updated the original test case to cover rebootstrap scenarios.
* Integrated `ConsumerRebootstrapTest` into `ClientRebootstrapTest` in
the `client-integration-tests` module.
* Removed the `RebootstrapTest.scala`.

Default `ConsumerRebootstrap` config:
> properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"rebootstrap");

properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"300000");

properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG,
"10000");

properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
"30000");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L");
properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG,
"1000L");

The test case for the consumer with enabled rebootstrap
![Screenshot 2025-03-22 at 9 48
13 PM](https://github.com/user-attachments/assets/8470549f-a24c-43fa-ae44-789cbf422a63)


The test case for the consumer with disabled rebootstrap
![Screenshot 2025-03-22 at 9 47
22 PM](https://github.com/user-attachments/assets/0a183464-6a74-449f-8e71-d641a6ea5bb1)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
ClarkChen 2025-03-26 01:53:42 +08:00 committed by GitHub
parent 80d99ea2ba
commit 1547204baa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 129 additions and 219 deletions

View File

@ -17,9 +17,14 @@
package org.apache.kafka.clients;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
@ -38,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class ClientRebootstrapTest {
private static final String TOPIC = "topic";
private static final int PARTITIONS = 1;
private static final int REPLICAS = 2;
@ClusterTest(
@ -55,7 +61,7 @@ public class ClientRebootstrapTest {
clusterInstance.shutdownBroker(broker0);
try (var admin = clusterInstance.admin()) {
admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) REPLICAS)));
admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short) REPLICAS)));
// Only the broker 1 is available for the admin client during the bootstrap.
assertDoesNotThrow(() -> admin.listTopics().names().get(timeout, TimeUnit.SECONDS).contains(TOPIC));
@ -84,7 +90,7 @@ public class ClientRebootstrapTest {
clusterInstance.shutdownBroker(broker0);
var admin = clusterInstance.admin(Map.of(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "none"));
admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) REPLICAS)));
admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short) REPLICAS)));
// Only the broker 1 is available for the admin client during the bootstrap.
assertDoesNotThrow(() -> admin.listTopics().names().get(60, TimeUnit.SECONDS).contains(TOPIC));
@ -109,7 +115,7 @@ public class ClientRebootstrapTest {
)
public void testProducerRebootstrap(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
try (var admin = clusterInstance.admin()) {
admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) REPLICAS)));
admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short) REPLICAS)));
}
var broker0 = 0;
@ -144,7 +150,7 @@ public class ClientRebootstrapTest {
)
public void testProducerRebootstrapDisabled(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
try (var admin = clusterInstance.admin()) {
admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) REPLICAS)));
admin.createTopics(List.of(new NewTopic(TOPIC, PARTITIONS, (short) REPLICAS)));
}
var broker0 = 0;
@ -168,4 +174,123 @@ public class ClientRebootstrapTest {
// Since the brokers cached during the bootstrap are offline, the producer needs to wait the default timeout for other threads.
producer.close(Duration.ZERO);
}
public void consumerRebootstrap(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
clusterInstance.createTopic(TOPIC, PARTITIONS, (short) REPLICAS);
var broker0 = 0;
var broker1 = 1;
var partitions = List.of(new TopicPartition(TOPIC, 0));
try (var producer = clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, "value 0".getBytes())).get();
assertEquals(0, recordMetadata.offset());
}
clusterInstance.shutdownBroker(broker0);
try (var consumer = clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name))) {
// Only the server 1 is available for the consumer during the bootstrap.
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).count() == 1, 10 * 1000, "Failed to poll data.");
// Bring back the server 0 and shut down 1.
clusterInstance.shutdownBroker(broker1);
clusterInstance.startBroker(broker0);
try (var producer = clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, "value 1".getBytes())).get();
assertEquals(1, recordMetadata.offset());
}
// The server 1 originally cached during the bootstrap, is offline.
// However, the server 0 from the bootstrap list is online.
TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).count() == 1, 10 * 1000, "Failed to poll data.");
}
}
@ClusterTest(
brokers = REPLICAS,
types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
})
public void testClassicConsumerRebootstrap(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
consumerRebootstrap(clusterInstance, GroupProtocol.CLASSIC);
}
@ClusterTest(
brokers = REPLICAS,
types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
})
public void testConsumerRebootstrap(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
consumerRebootstrap(clusterInstance, GroupProtocol.CONSUMER);
}
public void consumerRebootstrapDisabled(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws InterruptedException, ExecutionException {
clusterInstance.createTopic(TOPIC, PARTITIONS, (short) REPLICAS);
var broker0 = 0;
var broker1 = 1;
var tp = new TopicPartition(TOPIC, 0);
try (var producer = clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, "value 0".getBytes())).get();
assertEquals(0, recordMetadata.offset());
}
clusterInstance.shutdownBroker(broker0);
try (var consumer = clusterInstance.consumer(Map.of(
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "none",
ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name)
)) {
// Only the server 1 is available for the consumer during the bootstrap.
consumer.assign(List.of(tp));
consumer.seekToBeginning(List.of(tp));
TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).count() == 1, 10 * 1000, "Failed to poll data.");
// Bring back the server 0 and shut down 1.
clusterInstance.shutdownBroker(broker1);
clusterInstance.startBroker(broker0);
try (var producer = clusterInstance.producer(Map.of(ProducerConfig.ACKS_CONFIG, "-1"))) {
var recordMetadata = producer.send(new ProducerRecord<>(TOPIC, "value 1".getBytes())).get();
assertEquals(1, recordMetadata.offset());
}
// The server 1 originally cached during the bootstrap, is offline.
// However, the server 0 from the bootstrap list is online.
assertEquals(0, consumer.poll(Duration.ofMillis(100)).count());
}
}
@ClusterTest(
brokers = REPLICAS,
types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
}
)
public void testClassicConsumerRebootstrapDisabled(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
consumerRebootstrapDisabled(clusterInstance, GroupProtocol.CLASSIC);
}
@ClusterTest(
brokers = REPLICAS,
types = {Type.KRAFT},
serverProperties = {
@ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
}
)
public void testConsumerRebootstrapDisabled(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
consumerRebootstrapDisabled(clusterInstance, GroupProtocol.CONSUMER);
}
}

View File

@ -1,146 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import kafka.api.ConsumerRebootstrapTest._
import kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersAll
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import java.time.Duration
import java.util.{Collections, stream}
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
class ConsumerRebootstrapTest extends RebootstrapTest {
@ParameterizedTest(name = RebootstrapTestName)
@MethodSource(Array("rebootstrapTestParams"))
def testRebootstrap(quorum: String, groupProtocol: String, useRebootstrapTriggerMs: Boolean): Unit = {
sendRecords(10, 0)
TestUtils.waitUntilTrue(
() => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
"Timeout waiting for records to be replicated"
)
server1.shutdown()
server1.awaitShutdown()
val consumer = createConsumer(configOverrides = clientOverrides(useRebootstrapTriggerMs))
// Only the server 0 is available for the consumer during the bootstrap.
consumer.assign(Collections.singleton(tp))
consumeAndVerifyRecords(consumer, 10, 0)
// Bring back the server 1 and shut down 0.
server1.startup()
TestUtils.waitUntilTrue(
() => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
"Timeout waiting for records to be replicated"
)
server0.shutdown()
server0.awaitShutdown()
sendRecords(10, 10)
// The server 0, originally cached during the bootstrap, is offline.
// However, the server 1 from the bootstrap list is online.
// Should be able to consume records.
consumeAndVerifyRecords(consumer, 10, 10, startingKeyAndValueIndex = 10, startingTimestamp = 10)
// Bring back the server 0 and shut down 1.
server0.startup()
TestUtils.waitUntilTrue(
() => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
"Timeout waiting for records to be replicated"
)
server1.shutdown()
server1.awaitShutdown()
sendRecords(10, 20)
// The same situation, but the server 1 has gone and server 0 is back.
consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20, startingTimestamp = 20)
}
@Disabled
@ParameterizedTest(name = RebootstrapTestName)
@MethodSource(Array("rebootstrapTestParams"))
def testRebootstrapDisabled(quorum: String, groupProtocol: String, useRebootstrapTriggerMs: Boolean): Unit = {
server1.shutdown()
server1.awaitShutdown()
val configOverrides = clientOverrides(useRebootstrapTriggerMs)
configOverrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "none")
if (useRebootstrapTriggerMs)
configOverrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "1000")
val producer = createProducer(configOverrides = configOverrides)
val consumer = createConsumer(configOverrides = configOverrides)
val adminClient = createAdminClient(configOverrides = configOverrides)
// Only the server 0 is available during the bootstrap.
val recordMetadata0 = producer.send(new ProducerRecord(topic, part, 0L, "key 0".getBytes, "value 0".getBytes)).get(15, TimeUnit.SECONDS)
assertEquals(0, recordMetadata0.offset())
adminClient.listTopics().names().get(15, TimeUnit.SECONDS)
consumer.assign(Collections.singleton(tp))
consumeAndVerifyRecords(consumer, 1, 0)
server0.shutdown()
server0.awaitShutdown()
server1.startup()
assertThrows(classOf[TimeoutException], () => producer.send(new ProducerRecord(topic, part, "key 2".getBytes, "value 2".getBytes)).get(5, TimeUnit.SECONDS))
assertThrows(classOf[TimeoutException], () => adminClient.listTopics().names().get(5, TimeUnit.SECONDS))
val producer2 = createProducer(configOverrides = configOverrides)
producer2.send(new ProducerRecord(topic, part, 1L, "key 1".getBytes, "value 1".getBytes)).get(15, TimeUnit.SECONDS)
assertEquals(0, consumer.poll(Duration.ofSeconds(5)).count)
}
private def sendRecords(numRecords: Int, from: Int): Unit = {
val producer: KafkaProducer[Array[Byte], Array[Byte]] = createProducer()
(from until (numRecords + from)).foreach { i =>
val record = new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes)
producer.send(record)
}
producer.flush()
producer.close()
}
}
object ConsumerRebootstrapTest {
final val RebootstrapTestName = s"${TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames}.useRebootstrapTriggerMs={2}"
def rebootstrapTestParams: stream.Stream[Arguments] = {
getTestQuorumAndGroupProtocolParametersAll
.flatMap { baseArgs =>
stream.Stream.of(
Arguments.of((baseArgs.get :+ true):_*),
Arguments.of((baseArgs.get :+ false):_*)
)
}
}
}

View File

@ -1,69 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import kafka.server.{KafkaBroker, KafkaConfig}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import java.util.Properties
abstract class RebootstrapTest extends AbstractConsumerTest {
override def brokerCount: Int = 2
def server0: KafkaBroker = serverForId(0).get
def server1: KafkaBroker = serverForId(1).get
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.doSetup(testInfo, createOffsetsTopic = true)
// Enable unclean leader election for the test topic
val topicProps = new Properties
topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
// create the test topic with all the brokers as replicas
createTopic(topic, 2, brokerCount, adminClientConfig = this.adminClientConfig, topicConfig = topicProps)
}
override def generateConfigs: Seq[KafkaConfig] = {
val overridingProps = new Properties()
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, brokerCount.toString)
// In this test, fixed ports are necessary, because brokers must have the
// same port after the restart.
FixedPortTestUtils.createBrokerConfigs(brokerCount, enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, overridingProps))
}
def clientOverrides(useRebootstrapTriggerMs: Boolean): Properties = {
val overrides = new Properties()
if (useRebootstrapTriggerMs) {
overrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "5000")
} else {
overrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "3600000")
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000")
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "5000")
overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000")
overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000")
}
overrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap")
overrides
}
}