From 36a5aba4ecf56631372ad12d5c67af8fa3de05f8 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Thu, 9 May 2019 18:27:59 -0700 Subject: [PATCH] KAFKA-8231: Expansion of ConnectClusterState interface (#6584) Expand ConnectClusterState interface and implementation with methods that provide the immutable cluster details and the connector configuration. This includes unit tests for the new methods. Author: Chris Egerton Reviews: Arjun Satish , Konstantine Karantasis , Randall Hauch --- .../connect/health/ConnectClusterDetails.java | 32 +++++++++++++++++ .../connect/health/ConnectClusterState.java | 30 ++++++++++++++-- .../health/ConnectClusterDetailsImpl.java | 34 ++++++++++++++++++ .../health/ConnectClusterStateImpl.java | 29 +++++++++++++-- .../connect/runtime/rest/RestServer.java | 8 ++++- .../health/ConnectClusterStateImplTest.java | 35 +++++++++++++++++-- .../connect/runtime/rest/RestServerTest.java | 6 +++- 7 files changed, 165 insertions(+), 9 deletions(-) create mode 100644 connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterDetails.java create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterDetails.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterDetails.java new file mode 100644 index 00000000000..edde6ff657a --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterDetails.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.health; + +/** + * Provides immutable Connect cluster information, such as the ID of the backing Kafka cluster. The + * Connect framework provides the implementation for this interface. + */ +public interface ConnectClusterDetails { + + /** + * Get the cluster ID of the Kafka cluster backing this Connect cluster. + * + * @return the cluster ID of the Kafka cluster backing this Connect cluster + **/ + String kafkaClusterId(); +} \ No newline at end of file diff --git a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java index d4292efd0ca..753ee1a7613 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/health/ConnectClusterState.java @@ -18,10 +18,13 @@ package org.apache.kafka.connect.health; import java.util.Collection; +import java.util.Map; /** - * Provides the ability to lookup connector metadata and its health. This is made available to the {@link org.apache.kafka.connect.rest.ConnectRestExtension} - * implementations. The Connect framework provides the implementation for this interface. + * Provides the ability to lookup connector metadata, including status and configurations, as well + * as immutable cluster information such as Kafka cluster ID. This is made available to + * {@link org.apache.kafka.connect.rest.ConnectRestExtension} implementations. The Connect framework + * provides the implementation for this interface. */ public interface ConnectClusterState { @@ -43,4 +46,27 @@ public interface ConnectClusterState { * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found */ ConnectorHealth connectorHealth(String connName); + + /** + * Lookup the current configuration of a connector. This provides the current snapshot of configuration by querying the underlying + * herder. A connector returned by previous invocation of {@link #connectors()} may no longer be available and could result in {@link + * org.apache.kafka.connect.errors.NotFoundException}. + * + * @param connName name of the connector + * @return the configuration of the connector for the connector name + * @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found + * @throws java.lang.UnsupportedOperationException if the default implementation has not been overridden + */ + default Map connectorConfig(String connName) { + throw new UnsupportedOperationException(); + } + + /** + * Get details about the setup of the Connect cluster. + * @return a {@link ConnectClusterDetails} object containing information about the cluster + * @throws java.lang.UnsupportedOperationException if the default implementation has not been overridden + **/ + default ConnectClusterDetails clusterDetails() { + throw new UnsupportedOperationException(); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java new file mode 100644 index 00000000000..09f09bd7d38 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterDetailsImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.health; + +import org.apache.kafka.connect.health.ConnectClusterDetails; + +public class ConnectClusterDetailsImpl implements ConnectClusterDetails { + + private final String kafkaClusterId; + + public ConnectClusterDetailsImpl(String kafkaClusterId) { + this.kafkaClusterId = kafkaClusterId; + } + + @Override + public String kafkaClusterId() { + return kafkaClusterId; + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java index e3a4833681b..c3e950eb459 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.health; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.health.ConnectClusterDetails; import org.apache.kafka.connect.health.ConnectClusterState; import org.apache.kafka.connect.health.ConnectorHealth; import org.apache.kafka.connect.health.ConnectorState; @@ -38,10 +39,16 @@ import java.util.concurrent.TimeoutException; public class ConnectClusterStateImpl implements ConnectClusterState { private final long herderRequestTimeoutMs; + private final ConnectClusterDetails clusterDetails; private final Herder herder; - public ConnectClusterStateImpl(long connectorsTimeoutMs, Herder herder) { + public ConnectClusterStateImpl( + long connectorsTimeoutMs, + ConnectClusterDetails clusterDetails, + Herder herder + ) { this.herderRequestTimeoutMs = connectorsTimeoutMs; + this.clusterDetails = clusterDetails; this.herder = herder; } @@ -58,7 +65,6 @@ public class ConnectClusterStateImpl implements ConnectClusterState { @Override public ConnectorHealth connectorHealth(String connName) { - ConnectorStateInfo state = herder.connectorStatus(connName); ConnectorState connectorState = new ConnectorState( state.connector().state(), @@ -75,6 +81,25 @@ public class ConnectClusterStateImpl implements ConnectClusterState { return connectorHealth; } + @Override + public Map connectorConfig(String connName) { + FutureCallback> connectorConfigCallback = new FutureCallback<>(); + herder.connectorConfig(connName, connectorConfigCallback); + try { + return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new ConnectException( + String.format("Failed to retrieve configuration for connector '%s'", connName), + e + ); + } + } + + @Override + public ConnectClusterDetails clusterDetails() { + return clusterDetails; + } + private Map taskStates(List states) { Map taskStates = new HashMap<>(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 067fd88cbb1..d76cfff8fbf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -19,10 +19,12 @@ package org.apache.kafka.connect.runtime.rest; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.health.ConnectClusterDetails; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl; import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl; import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper; import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource; @@ -328,10 +330,14 @@ public class RestServer { herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue()); } + ConnectClusterDetails connectClusterDetails = new ConnectClusterDetailsImpl( + herder.kafkaClusterId() + ); + ConnectRestExtensionContext connectRestExtensionContext = new ConnectRestExtensionContextImpl( new ConnectRestConfigurable(resourceConfig), - new ConnectClusterStateImpl(herderRequestTimeoutMs, herder) + new ConnectClusterStateImpl(herderRequestTimeoutMs, connectClusterDetails, herder) ); for (ConnectRestExtension connectRestExtension : connectRestExtensions) { connectRestExtension.register(connectRestExtensionContext); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java index 47520758fd5..d8984f0aaf2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java @@ -30,6 +30,8 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -38,17 +40,22 @@ import static org.junit.Assert.assertThrows; @RunWith(PowerMockRunner.class) public class ConnectClusterStateImplTest { - + protected static final String KAFKA_CLUSTER_ID = "franzwashere"; + @Mock protected Herder herder; protected ConnectClusterStateImpl connectClusterState; - protected Collection expectedConnectors; protected long herderRequestTimeoutMs = TimeUnit.SECONDS.toMillis(10); + protected Collection expectedConnectors; @Before public void setUp() { - connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herder); expectedConnectors = Arrays.asList("sink1", "source1", "source2"); + connectClusterState = new ConnectClusterStateImpl( + herderRequestTimeoutMs, + new ConnectClusterDetailsImpl(KAFKA_CLUSTER_ID), + herder + ); } @Test @@ -66,6 +73,28 @@ public class ConnectClusterStateImplTest { assertEquals(expectedConnectors, connectClusterState.connectors()); } + @Test + public void connectorConfig() { + final String connName = "sink6"; + final Map expectedConfig = Collections.singletonMap("key", "value"); + Capture>> callback = EasyMock.newCapture(); + herder.connectorConfig(EasyMock.eq(connName), EasyMock.capture(callback)); + EasyMock.expectLastCall().andAnswer(new IAnswer() { + @Override + public Void answer() { + callback.getValue().onCompletion(null, expectedConfig); + return null; + } + }); + EasyMock.replay(herder); + assertEquals(expectedConfig, connectClusterState.connectorConfig(connName)); + } + + @Test + public void kafkaClusterId() { + assertEquals(KAFKA_CLUSTER_ID, connectClusterState.clusterDetails().kafkaClusterId()); + } + @Test public void connectorsFailure() { Capture>> callback = EasyMock.newCapture(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java index 3d297b79fab..91aa5e78dc2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java @@ -62,6 +62,8 @@ public class RestServerTest { private Plugins plugins; private RestServer server; + protected static final String KAFKA_CLUSTER_ID = "Xbafgnagvar"; + @After public void tearDown() { server.stop(); @@ -166,6 +168,7 @@ public class RestServerTest { Map configMap = new HashMap<>(baseWorkerProps()); DistributedConfig workerConfig = new DistributedConfig(configMap); + EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID); EasyMock.expect(herder.plugins()).andStubReturn(plugins); EasyMock.expect(plugins.newPlugins(Collections.emptyList(), workerConfig, @@ -202,6 +205,7 @@ public class RestServerTest { workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method); WorkerConfig workerConfig = new DistributedConfig(workerProps); + EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID); EasyMock.expect(herder.plugins()).andStubReturn(plugins); EasyMock.expect(plugins.newPlugins(Collections.emptyList(), workerConfig, @@ -260,7 +264,7 @@ public class RestServerTest { workerProps.put("offset.storage.file.filename", "/tmp"); WorkerConfig workerConfig = new StandaloneConfig(workerProps); - + EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID); EasyMock.expect(herder.plugins()).andStubReturn(plugins); EasyMock.expect(plugins.newPlugins(Collections.emptyList(), workerConfig,