This commit is contained in:
Fan Yang 2025-09-25 09:53:14 +08:00
parent dd8beb4bbf
commit 399f5bc19e
1 changed files with 4 additions and 5 deletions

View File

@ -903,10 +903,6 @@ public class WorkerTest {
@ValueSource(booleans = {true, false}) @ValueSource(booleans = {true, false})
public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean enableTopicCreation) { public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean enableTopicCreation) {
setup(enableTopicCreation); setup(enableTopicCreation);
ConcurrentMap<ConnectorTaskId, WorkerTask<?, ?>> tasks = new ConcurrentHashMap<>();
ConnectorTaskId taskId1 = new ConnectorTaskId("c1", 0);
ConnectorTaskId taskId2 = new ConnectorTaskId("c1", 1);
mockKafkaClusterId(); mockKafkaClusterId();
mockInternalConverters(); mockInternalConverters();
mockFileConfigProvider(); mockFileConfigProvider();
@ -919,10 +915,13 @@ public class WorkerTest {
noneConnectorClientConfigOverridePolicy); noneConnectorClientConfigOverridePolicy);
worker.herder = herder; worker.herder = herder;
// Pass an empty tasks map to simulate all tasks failing to start
Worker.ConnectorStatusMetricsGroup metricsGroup = new Worker.ConnectorStatusMetricsGroup( Worker.ConnectorStatusMetricsGroup metricsGroup = new Worker.ConnectorStatusMetricsGroup(
worker.metrics(), tasks, herder worker.metrics(), new ConcurrentHashMap<>(), herder
); );
ConnectorTaskId taskId1 = new ConnectorTaskId("c1", 0);
ConnectorTaskId taskId2 = new ConnectorTaskId("c1", 1);
metricsGroup.recordTaskAdded(taskId1); metricsGroup.recordTaskAdded(taskId1);
metricsGroup.recordTaskAdded(taskId2); metricsGroup.recordTaskAdded(taskId2);
metricsGroup.recordTaskRemoved(taskId1); metricsGroup.recordTaskRemoved(taskId1);