KAFKA-15729 Add KRaft support in GetOffsetShellTest (#15489)
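
Moves GetOffsetShellTest onto the annotation-driven cluster-test API and makes it pass under both ZooKeeper and KRaft: the class defaults change from Type.ZK to Type.ALL, with the broker properties previously set in a @BeforeEach hook (auto.create.topics.enable, offsets.topic.replication.factor, offsets.topic.num.partitions) now declared through @ClusterTestDefaults/@ClusterConfigProperty. Client wiring goes through ClusterInstance (createAdminClient(), bootstrapServers()), the Row field timestamp is renamed to offset to match what the tool actually reports, a createConsumerAndPoll() helper materializes __consumer_offsets for tests that query internal topics, and assertions that expect internal topics branch on cluster.isKRaftTest().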

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Authored by Owen Leung on 2024-04-14 21:13:49 +08:00; committed by GitHub
parent 0b4e9afee2
commit 437ebb941e
1 changed file with 74 additions and 33 deletions

tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java

@@ -18,6 +18,7 @@
 package org.apache.kafka.tools;
 
 import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.ClusterTestDefaults;
 import kafka.test.annotation.Type;
@@ -25,16 +26,19 @@ import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Exit;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -46,33 +50,29 @@ import java.util.stream.Stream;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ZK)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+        @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "4")
+})
 @Tag("integration")
 public class GetOffsetShellTest {
     private final int topicCount = 4;
-    private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
-    private final String topicName = "topic";
 
     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
     private String getTopicName(int i) {
-        return topicName + i;
-    }
-
-    @BeforeEach
-    public void before() {
-        cluster.config().serverProperties().put("auto.create.topics.enable", false);
-        cluster.config().serverProperties().put("offsets.topic.replication.factor", "1");
-        cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount));
+        return "topic" + i;
     }
 
     private void setUp() {
-        try (Admin admin = Admin.create(cluster.config().adminClientProperties())) {
+        try (Admin admin = cluster.createAdminClient()) {
             List<NewTopic> topics = new ArrayList<>();
 
             IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1)));
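
Note on the annotation-driven setup that replaces the removed @BeforeEach hook: @ClusterTestDefaults fixes the cluster flavor and broker properties once for the whole class, and the kafka.test framework lets an individual test refine them. A minimal sketch of a per-test override, assuming @ClusterTest in this version of the framework accepts the same clusterType/serverProperties attributes (this method is illustrative, not part of the commit):

    // Hypothetical test pinned to KRaft with one property overridden;
    // serverProperties declared here apply on top of the class defaults.
    @ClusterTest(clusterType = Type.KRAFT, serverProperties = {
        @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "8")
    })
    public void testWithWiderOffsetsTopic() {
        // would use the injected `cluster` just like the tests in this file
    }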
@@ -81,28 +81,53 @@ public class GetOffsetShellTest {
         }
 
         Properties props = new Properties();
-        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.config().producerProperties().get("bootstrap.servers"));
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
             IntStream.range(0, topicCount + 1)
                     .forEach(i -> IntStream.range(0, i * i)
-                            .forEach(msgCount -> producer.send(
-                                    new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)))
+                            .forEach(msgCount -> {
+                                assertDoesNotThrow(() -> producer.send(
+                                        new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get());
+                            })
                     );
         }
     }
 
+    private void createConsumerAndPoll() {
+        Properties props = new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+            List<String> topics = new ArrayList<>();
+            for (int i = 0; i < topicCount + 1; i++) {
+                topics.add(getTopicName(i));
+            }
+            consumer.subscribe(topics);
+            consumer.poll(Duration.ofMillis(1000));
+        }
+    }
+
     static class Row {
-        private String name;
-        private int partition;
-        private Long timestamp;
+        private final String name;
+        private final int partition;
+        private final Long offset;
 
-        public Row(String name, int partition, Long timestamp) {
+        public Row(String name, int partition, Long offset) {
             this.name = name;
             this.partition = partition;
-            this.timestamp = timestamp;
+            this.offset = offset;
         }
 
         @Override
+        public String toString() {
+            return "Row[name:" + name + ",partition:" + partition + ",offset:" + offset;
+        }
+
+        @Override
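
Why createConsumerAndPoll() exists: with auto.create.topics.enable=false the test topics are created explicitly, but __consumer_offsets only appears once a consumer group first contacts the coordinator, and a fresh KRaft test cluster does not have it up front (the isKRaftTest() branches below are the other half of this accommodation). Tests that match internal topics therefore poll once before querying. A sketch of the kind of query that depends on it, assuming executeAndParse() ultimately drives the tool the way the help test at the bottom of this diff does:

    // After createConsumerAndPoll(), the pattern "__.*:0" can match
    // __consumer_offsets; on a fresh KRaft cluster it would match nothing.
    String out = ToolsTestUtils.captureStandardOut(() ->
            GetOffsetShell.mainNoExit(
                    "--bootstrap-server", cluster.bootstrapServers(),
                    "--topic-partitions", "__.*:0"));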
@@ -113,12 +138,12 @@ public class GetOffsetShellTest {
 
             Row r = (Row) o;
 
-            return name.equals(r.name) && partition == r.partition && Objects.equals(timestamp, r.timestamp);
+            return name.equals(r.name) && partition == r.partition && Objects.equals(offset, r.offset);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(name, partition, timestamp);
+            return Objects.hash(name, partition, offset);
         }
     }
@@ -127,8 +152,11 @@ public class GetOffsetShellTest {
         setUp();
 
         List<Row> output = executeAndParse();
 
-        assertEquals(expectedOffsetsWithInternal(), output);
+        if (!cluster.isKRaftTest()) {
+            assertEquals(expectedOffsetsWithInternal(), output);
+        } else {
+            assertEquals(expectedTestTopicOffsets(), output);
+        }
     }
 
     @ClusterTest
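
expectedTestTopicOffsets() is referenced here but defined outside the hunks shown. Judging from expectedOffsetsWithInternal() at the bottom of this diff, it presumably collects the per-topic rows without the internal __consumer_offsets entries; a sketch of the assumed shape (an inference, not shown in this commit):

    // Assumed helper, inferred from expectedOffsetsWithInternal() below:
    private List<Row> expectedTestTopicOffsets() {
        List<Row> offsets = new ArrayList<>();
        for (int i = 0; i < topicCount + 1; i++) {
            offsets.addAll(expectedOffsetsForTopic(i));  // topic i: i partitions, each ending at offset i
        }
        return offsets;
    }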
@@ -148,7 +176,8 @@ public class GetOffsetShellTest {
             List<Row> offsets = executeAndParse("--topic", getTopicName(i));
 
             assertEquals(expectedOffsetsForTopic(i), offsets, () -> "Offset output did not match for " + getTopicName(i));
-        });
+            }
+        );
     }
 
     @ClusterTest
@@ -165,8 +194,11 @@ public class GetOffsetShellTest {
         setUp();
 
         List<Row> offsets = executeAndParse("--partitions", "0,1");
 
-        assertEquals(expectedOffsetsWithInternal().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets);
+        if (!cluster.isKRaftTest()) {
+            assertEquals(expectedOffsetsWithInternal().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets);
+        } else {
+            assertEquals(expectedTestTopicOffsets().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets);
+        }
     }
 
     @ClusterTest
@@ -182,6 +214,8 @@ public class GetOffsetShellTest {
     public void testTopicPartitionsArg() {
         setUp();
+        createConsumerAndPoll();
 
         List<Row> offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3");
+
         List<Row> expected = Arrays.asList(
                 new Row("__consumer_offsets", 3, 0L),
@@ -236,7 +270,7 @@ public class GetOffsetShellTest {
             List<Row> offsets = executeAndParse("--topic-partitions", "topic.*", "--time", time);
 
             offsets.forEach(
-                    row -> assertTrue(row.timestamp >= 0 && row.timestamp <= Integer.parseInt(row.name.replace("topic", "")))
+                    row -> assertTrue(row.offset >= 0 && row.offset <= Integer.parseInt(row.name.replace("topic", "")))
             );
         }
     }
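
Why the bound row.offset <= i holds for topic i: setUp() sends i * i records to topic i, routed with msgCount % i across its i partitions, so every partition ends at exactly offset i, and whatever --time value the surrounding loop supplies can only report an offset in [0, i]. A tiny arithmetic sketch of that invariant:

    // Mirrors the record distribution in setUp() above:
    for (int i = 1; i <= 4; i++) {
        int recordsPerPartition = (i * i) / i;  // each of the i partitions gets i records
        assert recordsPerPartition == i;        // so the log-end offset per partition is i
    }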
@@ -288,6 +322,8 @@ public class GetOffsetShellTest {
     public void testTopicPartitionsArgWithInternalIncluded() {
         setUp();
+        createConsumerAndPoll();
 
         List<Row> offsets = executeAndParse("--topic-partitions", "__.*:0");
+
         assertEquals(Arrays.asList(new Row("__consumer_offsets", 0, 0L)), offsets);
@@ -320,8 +356,13 @@ public class GetOffsetShellTest {
     @ClusterTest
     public void testPrintHelp() {
         Exit.setExitProcedure((statusCode, message) -> { });
-        String out = ToolsTestUtils.captureStandardErr(() -> GetOffsetShell.mainNoExit("--help"));
-        assertTrue(out.startsWith(GetOffsetShell.USAGE_TEXT));
+        try {
+            String out = ToolsTestUtils.captureStandardErr(() -> GetOffsetShell.mainNoExit("--help"));
+            assertTrue(out.startsWith(GetOffsetShell.USAGE_TEXT));
+        } finally {
+            Exit.resetExitProcedure();
+        }
     }
 
     @ClusterTest
@@ -351,7 +392,7 @@ public class GetOffsetShellTest {
     }
 
     private List<Row> expectedOffsetsWithInternal() {
-        List<Row> consOffsets = IntStream.range(0, offsetTopicPartitionCount)
+        List<Row> consOffsets = IntStream.range(0, 4)
                 .mapToObj(i -> new Row("__consumer_offsets", i, 0L))
                 .collect(Collectors.toList());