HOTFIX: make sure all ConsumerGroupServices get closed (#15801)

Some services are not closed, so our CI print following error.

org.opentest4j.AssertionFailedError: Found 16 unexpected threads during @BeforeAll: `kafka-admin-client-thread | adminclient-287,kafka-admin-client-thread | adminclient-276,kafka-admin-client-thread | adminclient-271,kafka-admin-client-thread | adminclient-293,kafka-admin-client-thread | adminclient-281,kafka-admin-client-thread | adminclient-302,kafka-admin-client-thread | adminclient-334,kafka-admin-client-thread | adminclient-323,kafka-admin-client-thread | adminclient-257,kafka-admin-client-thread | adminclient-336,kafka-admin-client-thread | adminclient-308,kafka-admin-client-thread | adminclient-263,kafka-admin-client-thread | adminclient-273,kafka-admin-client-thread | adminclient-278,kafka-admin-client-thread | adminclient-283,kafka-admin-client-thread | adminclient-317` ==> expected: <true> but was: <false>

#15679 use AfterEach to release service. However, the test cases having multi consumerConfigs will create a lot of services in testing. Hence, the intermediate servers are not closed.

Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
Chia-Ping Tsai 2024-04-25 14:23:34 +08:00 committed by GitHub
parent 864744ffd4
commit 4e23378aa8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 19 additions and 28 deletions

View File

@ -40,7 +40,6 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
@ -70,7 +69,6 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
public static final String GROUP = "test.group";
private final ClusterInstance clusterInstance;
private ConsumerGroupCommand.ConsumerGroupService consumerGroupService;
private final Iterable<Map<String, Object>> consumerConfigs;
DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) {
@ -81,21 +79,14 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
: Collections.singletonList(Collections.emptyMap());
}
@AfterEach
public void tearDown() {
if (consumerGroupService != null) {
consumerGroupService.close();
}
}
@ClusterTest
public void testDeleteOffsetsNonExistingGroup() {
String group = "missing.group";
String topic = "foo:1";
setupConsumerGroupService(getArgs(group, topic));
Entry<Errors, Map<TopicPartition, Throwable>> res = consumerGroupService.deleteOffsets(group, Collections.singletonList(topic));
assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = consumerGroupService(getArgs(group, topic))) {
Entry<Errors, Map<TopicPartition, Throwable>> res = consumerGroupService.deleteOffsets(group, Collections.singletonList(topic));
assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
}
}
@ClusterTest
@ -171,8 +162,8 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
};
}
private void setupConsumerGroupService(String[] args) {
consumerGroupService = new ConsumerGroupCommand.ConsumerGroupService(
private static ConsumerGroupCommand.ConsumerGroupService consumerGroupService(String[] args) {
return new ConsumerGroupCommand.ConsumerGroupService(
ConsumerGroupCommandOptions.fromArgs(args),
Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
);
@ -187,20 +178,20 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
produceRecord();
this.withConsumerGroup(() -> {
String topic = inputPartition >= 0 ? inputTopic + ":" + inputPartition : inputTopic;
setupConsumerGroupService(getArgs(GROUP, topic));
Entry<Errors, Map<TopicPartition, Throwable>> res = consumerGroupService.deleteOffsets(GROUP, Collections.singletonList(topic));
Errors topLevelError = res.getKey();
Map<TopicPartition, Throwable> partitions = res.getValue();
TopicPartition tp = new TopicPartition(inputTopic, expectedPartition);
// Partition level error should propagate to top level, unless this is due to a missed partition attempt.
if (inputPartition >= 0) {
assertEquals(expectedError, topLevelError);
try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = consumerGroupService(getArgs(GROUP, topic))) {
Entry<Errors, Map<TopicPartition, Throwable>> res = consumerGroupService.deleteOffsets(GROUP, Collections.singletonList(topic));
Errors topLevelError = res.getKey();
Map<TopicPartition, Throwable> partitions = res.getValue();
TopicPartition tp = new TopicPartition(inputTopic, expectedPartition);
// Partition level error should propagate to top level, unless this is due to a missed partition attempt.
if (inputPartition >= 0) {
assertEquals(expectedError, topLevelError);
}
if (expectedError == Errors.NONE)
assertNull(partitions.get(tp));
else
assertEquals(expectedError.exception(), partitions.get(tp).getCause());
}
if (expectedError == Errors.NONE)
assertNull(partitions.get(tp));
else
assertEquals(expectedError.exception(), partitions.get(tp).getCause());
}, isStable, consumerConfig);
}