This commit is contained in:
Fan Yang 2025-09-22 09:59:41 +08:00
parent fe4f3e4508
commit dd8beb4bbf
1 changed files with 31 additions and 0 deletions

View File

@ -146,6 +146,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GRO
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG; import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -898,6 +899,36 @@ public class WorkerTest {
verifyKafkaClusterId(); verifyKafkaClusterId();
} }
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean enableTopicCreation) {
setup(enableTopicCreation);
ConcurrentMap<ConnectorTaskId, WorkerTask<?, ?>> tasks = new ConcurrentHashMap<>();
ConnectorTaskId taskId1 = new ConnectorTaskId("c1", 0);
ConnectorTaskId taskId2 = new ConnectorTaskId("c1", 1);
mockKafkaClusterId();
mockInternalConverters();
mockFileConfigProvider();
worker = new Worker(WORKER_ID,
new MockTime(),
plugins,
config,
offsetBackingStore,
noneConnectorClientConfigOverridePolicy);
worker.herder = herder;
Worker.ConnectorStatusMetricsGroup metricsGroup = new Worker.ConnectorStatusMetricsGroup(
worker.metrics(), tasks, herder
);
metricsGroup.recordTaskAdded(taskId1);
metricsGroup.recordTaskAdded(taskId2);
metricsGroup.recordTaskRemoved(taskId1);
assertDoesNotThrow(() -> metricsGroup.recordTaskRemoved(taskId2), "should not throw NPE");
}
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @ValueSource(booleans = {true, false})
public void testStartTaskFailure(boolean enableTopicCreation) { public void testStartTaskFailure(boolean enableTopicCreation) {