MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord (#15802)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2024-04-27 07:11:10 +08:00 committed by GitHub
parent d88c15fc3e
commit c287ad5dbe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 78 additions and 58 deletions

View File

@ -38,7 +38,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
@ -51,9 +50,9 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
@ -65,8 +64,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
}) })
@ExtendWith(ClusterTestExtensions.class) @ExtendWith(ClusterTestExtensions.class)
public class DeleteOffsetsConsumerGroupCommandIntegrationTest { public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
public static final String TOPIC = "foo"; public static final String TOPIC_PREFIX = "foo.";
public static final String GROUP = "test.group"; public static final String GROUP_PREFIX = "test.group.";
private final ClusterInstance clusterInstance; private final ClusterInstance clusterInstance;
private final Iterable<Map<String, Object>> consumerConfigs; private final Iterable<Map<String, Object>> consumerConfigs;
@ -91,65 +90,89 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
@ClusterTest @ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() { public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
for (Map<String, Object> consumerConfig : consumerConfigs) { for (Map<String, Object> consumerConfig: consumerConfigs) {
createTopic(TOPIC); String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig); String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
removeTopic(TOPIC); createTopic(topic);
Runnable validateRunnable = getValidateRunnable(topic, group, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
removeTopic(topic);
} }
} }
@ClusterTest @ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() { public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
for (Map<String, Object> consumerConfig : consumerConfigs) { for (Map<String, Object> consumerConfig: consumerConfigs) {
createTopic(TOPIC); String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
testWithConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerConfig); String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
removeTopic(TOPIC); createTopic(topic);
Runnable validateRunnable = getValidateRunnable(topic, group, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
removeTopic(topic);
} }
} }
@ClusterTest @ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() { public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() {
for (Map<String, Object> consumerConfig : consumerConfigs) { for (Map<String, Object> consumerConfig: consumerConfigs) {
testWithConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerConfig); String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
Runnable validateRunnable = getValidateRunnable("foobar", group, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
} }
} }
@ClusterTest @ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() { public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() {
for (Map<String, Object> consumerConfig : consumerConfigs) { for (Map<String, Object> consumerConfig: consumerConfigs) {
testWithConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerConfig); String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
Runnable validateRunnable = getValidateRunnable("foobar", group, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
} }
} }
@ClusterTest @ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() { public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() {
for (Map<String, Object> consumerConfig : consumerConfigs) { for (Map<String, Object> consumerConfig: consumerConfigs) {
createTopic(TOPIC); String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
testWithConsumerGroup(TOPIC, 0, 0, Errors.NONE, false, consumerConfig); String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
removeTopic(TOPIC); createTopic(topic);
Runnable validateRunnable = getValidateRunnable(topic, group, 0, 0, Errors.NONE);
testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
removeTopic(topic);
} }
} }
@ClusterTest @ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() { public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() {
for (Map<String, Object> consumerConfig : consumerConfigs) { for (Map<String, Object> consumerConfig: consumerConfigs) {
createTopic(TOPIC); String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
testWithConsumerGroup(TOPIC, -1, 0, Errors.NONE, false, consumerConfig); String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
removeTopic(TOPIC); createTopic(topic);
Runnable validateRunnable = getValidateRunnable(topic, group, -1, 0, Errors.NONE);
testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
removeTopic(topic);
} }
} }
@ClusterTest @ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() { public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() {
for (Map<String, Object> consumerConfig : consumerConfigs) { for (Map<String, Object> consumerConfig: consumerConfigs) {
testWithConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, false, consumerConfig); String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
Runnable validateRunnable = getValidateRunnable("foobar", group, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
} }
} }
@ClusterTest @ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly() { public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly() {
for (Map<String, Object> consumerConfig : consumerConfigs) { for (Map<String, Object> consumerConfig: consumerConfigs) {
testWithConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION, false, consumerConfig); String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
Runnable validateRunnable = getValidateRunnable("foobar", group, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
} }
} }
@ -169,17 +192,15 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
); );
} }
private void testWithConsumerGroup(String inputTopic, private Runnable getValidateRunnable(String inputTopic,
int inputPartition, String inputGroup,
int expectedPartition, int inputPartition,
Errors expectedError, int expectedPartition,
boolean isStable, Errors expectedError) {
Map<String, Object> consumerConfig) { return () -> {
produceRecord();
this.withConsumerGroup(() -> {
String topic = inputPartition >= 0 ? inputTopic + ":" + inputPartition : inputTopic; String topic = inputPartition >= 0 ? inputTopic + ":" + inputPartition : inputTopic;
try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = consumerGroupService(getArgs(GROUP, topic))) { try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = consumerGroupService(getArgs(inputGroup, topic))) {
Entry<Errors, Map<TopicPartition, Throwable>> res = consumerGroupService.deleteOffsets(GROUP, Collections.singletonList(topic)); Entry<Errors, Map<TopicPartition, Throwable>> res = consumerGroupService.deleteOffsets(inputGroup, Collections.singletonList(topic));
Errors topLevelError = res.getKey(); Errors topLevelError = res.getKey();
Map<TopicPartition, Throwable> partitions = res.getValue(); Map<TopicPartition, Throwable> partitions = res.getValue();
TopicPartition tp = new TopicPartition(inputTopic, expectedPartition); TopicPartition tp = new TopicPartition(inputTopic, expectedPartition);
@ -192,32 +213,31 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
else else
assertEquals(expectedError.exception(), partitions.get(tp).getCause()); assertEquals(expectedError.exception(), partitions.get(tp).getCause());
} }
}, isStable, consumerConfig); };
} }
private void testWithConsumerGroup(String inputTopic,
private void produceRecord() { String inputGroup,
KafkaProducer<byte[], byte[]> producer = createProducer(); Map<String, Object> consumerConfig,
try { boolean isStable,
producer.send(new ProducerRecord<>(TOPIC, 0, null, null)).get(); Runnable validateRunnable) {
} catch (ExecutionException | InterruptedException e) { produceRecord(inputTopic);
throw new RuntimeException(e); try (Consumer<byte[], byte[]> consumer = createConsumer(inputGroup, consumerConfig)) {
} finally { consumer.subscribe(Collections.singletonList(inputTopic));
Utils.closeQuietly(producer, "producer");
}
}
private void withConsumerGroup(Runnable body, boolean isStable, Map<String, Object> consumerConfig) {
try (Consumer<byte[], byte[]> consumer = createConsumer(consumerConfig)) {
consumer.subscribe(Collections.singletonList(TOPIC));
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
Assertions.assertNotEquals(0, records.count()); Assertions.assertNotEquals(0, records.count());
consumer.commitSync(); consumer.commitSync();
if (isStable) { if (isStable) {
body.run(); validateRunnable.run();
} }
} }
if (!isStable) { if (!isStable) {
body.run(); validateRunnable.run();
}
}
private void produceRecord(String topic) {
try (KafkaProducer<byte[], byte[]> producer = createProducer()) {
assertDoesNotThrow(() -> producer.send(new ProducerRecord<>(topic, 0, null, null)).get());
} }
} }
@ -231,11 +251,11 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
return new KafkaProducer<>(config); return new KafkaProducer<>(config);
} }
private Consumer<byte[], byte[]> createConsumer(Map<String, Object> config) { private Consumer<byte[], byte[]> createConsumer(String group, Map<String, Object> config) {
Map<String, Object> consumerConfig = new HashMap<>(config); Map<String, Object> consumerConfig = new HashMap<>(config);
consumerConfig.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); consumerConfig.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
consumerConfig.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerConfig.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, GROUP); consumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, group);
consumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); consumerConfig.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
// Increase timeouts to avoid having a rebalance during the test // Increase timeouts to avoid having a rebalance during the test