mirror of https://github.com/apache/kafka.git
Merge 399f5bc19e
into 4a5aa37169
This commit is contained in:
commit
08a4dd91cd
|
@ -2394,6 +2394,10 @@ public final class Worker {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized void recordTaskRemoved(ConnectorTaskId connectorTaskId) {
|
protected synchronized void recordTaskRemoved(ConnectorTaskId connectorTaskId) {
|
||||||
|
if (!connectorStatusMetrics.containsKey(connectorTaskId.connector())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Unregister connector task count metric if we remove the last task of the connector
|
// Unregister connector task count metric if we remove the last task of the connector
|
||||||
if (tasks.keySet().stream().noneMatch(id -> id.connector().equals(connectorTaskId.connector()))) {
|
if (tasks.keySet().stream().noneMatch(id -> id.connector().equals(connectorTaskId.connector()))) {
|
||||||
connectorStatusMetrics.get(connectorTaskId.connector()).close();
|
connectorStatusMetrics.get(connectorTaskId.connector()).close();
|
||||||
|
|
|
@ -146,6 +146,7 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GRO
|
||||||
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
|
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
|
||||||
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
|
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
|
||||||
import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG;
|
import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
|
||||||
|
@ -898,6 +899,35 @@ public class WorkerTest {
|
||||||
verifyKafkaClusterId();
|
verifyKafkaClusterId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ValueSource(booleans = {true, false})
|
||||||
|
public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean enableTopicCreation) {
|
||||||
|
setup(enableTopicCreation);
|
||||||
|
mockKafkaClusterId();
|
||||||
|
mockInternalConverters();
|
||||||
|
mockFileConfigProvider();
|
||||||
|
|
||||||
|
worker = new Worker(WORKER_ID,
|
||||||
|
new MockTime(),
|
||||||
|
plugins,
|
||||||
|
config,
|
||||||
|
offsetBackingStore,
|
||||||
|
noneConnectorClientConfigOverridePolicy);
|
||||||
|
worker.herder = herder;
|
||||||
|
|
||||||
|
// Pass an empty tasks map to simulate all tasks failing to start
|
||||||
|
Worker.ConnectorStatusMetricsGroup metricsGroup = new Worker.ConnectorStatusMetricsGroup(
|
||||||
|
worker.metrics(), new ConcurrentHashMap<>(), herder
|
||||||
|
);
|
||||||
|
|
||||||
|
ConnectorTaskId taskId1 = new ConnectorTaskId("c1", 0);
|
||||||
|
ConnectorTaskId taskId2 = new ConnectorTaskId("c1", 1);
|
||||||
|
metricsGroup.recordTaskAdded(taskId1);
|
||||||
|
metricsGroup.recordTaskAdded(taskId2);
|
||||||
|
metricsGroup.recordTaskRemoved(taskId1);
|
||||||
|
assertDoesNotThrow(() -> metricsGroup.recordTaskRemoved(taskId2), "should not throw NPE");
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ValueSource(booleans = {true, false})
|
@ValueSource(booleans = {true, false})
|
||||||
public void testStartTaskFailure(boolean enableTopicCreation) {
|
public void testStartTaskFailure(boolean enableTopicCreation) {
|
||||||
|
|
Loading…
Reference in New Issue