KAFKA-8231: Expansion of ConnectClusterState interface (#6584)

Expand ConnectClusterState interface and implementation with methods that provide the immutable cluster details and the connector configuration. This includes unit tests for the new methods.

Author: Chris Egerton <cegerton@oberlin.edu>
Reviews: Arjun Satish <arjun@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Chris Egerton 2019-05-09 18:27:59 -07:00 committed by Randall Hauch
parent b2826c6c2b
commit 36a5aba4ec
7 changed files with 165 additions and 9 deletions

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.health;
/**
* Provides immutable Connect cluster information, such as the ID of the backing Kafka cluster. The
* Connect framework provides the implementation for this interface.
*/
public interface ConnectClusterDetails {
/**
* Get the cluster ID of the Kafka cluster backing this Connect cluster.
*
* @return the cluster ID of the Kafka cluster backing this Connect cluster
**/
String kafkaClusterId();
}

View File

@ -18,10 +18,13 @@
package org.apache.kafka.connect.health;
import java.util.Collection;
import java.util.Map;
/**
* Provides the ability to lookup connector metadata and its health. This is made available to the {@link org.apache.kafka.connect.rest.ConnectRestExtension}
* implementations. The Connect framework provides the implementation for this interface.
* Provides the ability to lookup connector metadata, including status and configurations, as well
* as immutable cluster information such as Kafka cluster ID. This is made available to
* {@link org.apache.kafka.connect.rest.ConnectRestExtension} implementations. The Connect framework
* provides the implementation for this interface.
*/
public interface ConnectClusterState {
@ -43,4 +46,27 @@ public interface ConnectClusterState {
* @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
*/
ConnectorHealth connectorHealth(String connName);
/**
* Lookup the current configuration of a connector. This provides the current snapshot of configuration by querying the underlying
* herder. A connector returned by previous invocation of {@link #connectors()} may no longer be available and could result in {@link
* org.apache.kafka.connect.errors.NotFoundException}.
*
* @param connName name of the connector
* @return the configuration of the connector for the connector name
* @throws org.apache.kafka.connect.errors.NotFoundException if the requested connector can't be found
* @throws java.lang.UnsupportedOperationException if the default implementation has not been overridden
*/
default Map<String, String> connectorConfig(String connName) {
throw new UnsupportedOperationException();
}
/**
* Get details about the setup of the Connect cluster.
* @return a {@link ConnectClusterDetails} object containing information about the cluster
* @throws java.lang.UnsupportedOperationException if the default implementation has not been overridden
**/
default ConnectClusterDetails clusterDetails() {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.health;
import org.apache.kafka.connect.health.ConnectClusterDetails;
public class ConnectClusterDetailsImpl implements ConnectClusterDetails {
private final String kafkaClusterId;
public ConnectClusterDetailsImpl(String kafkaClusterId) {
this.kafkaClusterId = kafkaClusterId;
}
@Override
public String kafkaClusterId() {
return kafkaClusterId;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.kafka.connect.runtime.health;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectClusterDetails;
import org.apache.kafka.connect.health.ConnectClusterState;
import org.apache.kafka.connect.health.ConnectorHealth;
import org.apache.kafka.connect.health.ConnectorState;
@ -38,10 +39,16 @@ import java.util.concurrent.TimeoutException;
public class ConnectClusterStateImpl implements ConnectClusterState {
private final long herderRequestTimeoutMs;
private final ConnectClusterDetails clusterDetails;
private final Herder herder;
public ConnectClusterStateImpl(long connectorsTimeoutMs, Herder herder) {
public ConnectClusterStateImpl(
long connectorsTimeoutMs,
ConnectClusterDetails clusterDetails,
Herder herder
) {
this.herderRequestTimeoutMs = connectorsTimeoutMs;
this.clusterDetails = clusterDetails;
this.herder = herder;
}
@ -58,7 +65,6 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
@Override
public ConnectorHealth connectorHealth(String connName) {
ConnectorStateInfo state = herder.connectorStatus(connName);
ConnectorState connectorState = new ConnectorState(
state.connector().state(),
@ -75,6 +81,25 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
return connectorHealth;
}
@Override
public Map<String, String> connectorConfig(String connName) {
FutureCallback<Map<String, String>> connectorConfigCallback = new FutureCallback<>();
herder.connectorConfig(connName, connectorConfigCallback);
try {
return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new ConnectException(
String.format("Failed to retrieve configuration for connector '%s'", connName),
e
);
}
}
@Override
public ConnectClusterDetails clusterDetails() {
return clusterDetails;
}
private Map<Integer, TaskState> taskStates(List<ConnectorStateInfo.TaskState> states) {
Map<Integer, TaskState> taskStates = new HashMap<>();

View File

@ -19,10 +19,12 @@ package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectClusterDetails;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
@ -328,10 +330,14 @@ public class RestServer {
herderRequestTimeoutMs = Math.min(herderRequestTimeoutMs, rebalanceTimeoutMs.longValue());
}
ConnectClusterDetails connectClusterDetails = new ConnectClusterDetailsImpl(
herder.kafkaClusterId()
);
ConnectRestExtensionContext connectRestExtensionContext =
new ConnectRestExtensionContextImpl(
new ConnectRestConfigurable(resourceConfig),
new ConnectClusterStateImpl(herderRequestTimeoutMs, herder)
new ConnectClusterStateImpl(herderRequestTimeoutMs, connectClusterDetails, herder)
);
for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
connectRestExtension.register(connectRestExtensionContext);

View File

@ -30,6 +30,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -38,17 +40,22 @@ import static org.junit.Assert.assertThrows;
@RunWith(PowerMockRunner.class)
public class ConnectClusterStateImplTest {
protected static final String KAFKA_CLUSTER_ID = "franzwashere";
@Mock
protected Herder herder;
protected ConnectClusterStateImpl connectClusterState;
protected Collection<String> expectedConnectors;
protected long herderRequestTimeoutMs = TimeUnit.SECONDS.toMillis(10);
protected Collection<String> expectedConnectors;
@Before
public void setUp() {
connectClusterState = new ConnectClusterStateImpl(herderRequestTimeoutMs, herder);
expectedConnectors = Arrays.asList("sink1", "source1", "source2");
connectClusterState = new ConnectClusterStateImpl(
herderRequestTimeoutMs,
new ConnectClusterDetailsImpl(KAFKA_CLUSTER_ID),
herder
);
}
@Test
@ -66,6 +73,28 @@ public class ConnectClusterStateImplTest {
assertEquals(expectedConnectors, connectClusterState.connectors());
}
@Test
public void connectorConfig() {
final String connName = "sink6";
final Map<String, String> expectedConfig = Collections.singletonMap("key", "value");
Capture<Callback<Map<String, String>>> callback = EasyMock.newCapture();
herder.connectorConfig(EasyMock.eq(connName), EasyMock.capture(callback));
EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
@Override
public Void answer() {
callback.getValue().onCompletion(null, expectedConfig);
return null;
}
});
EasyMock.replay(herder);
assertEquals(expectedConfig, connectClusterState.connectorConfig(connName));
}
@Test
public void kafkaClusterId() {
assertEquals(KAFKA_CLUSTER_ID, connectClusterState.clusterDetails().kafkaClusterId());
}
@Test
public void connectorsFailure() {
Capture<Callback<Collection<String>>> callback = EasyMock.newCapture();

View File

@ -62,6 +62,8 @@ public class RestServerTest {
private Plugins plugins;
private RestServer server;
protected static final String KAFKA_CLUSTER_ID = "Xbafgnagvar";
@After
public void tearDown() {
server.stop();
@ -166,6 +168,7 @@ public class RestServerTest {
Map<String, String> configMap = new HashMap<>(baseWorkerProps());
DistributedConfig workerConfig = new DistributedConfig(configMap);
EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
EasyMock.expect(herder.plugins()).andStubReturn(plugins);
EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
workerConfig,
@ -202,6 +205,7 @@ public class RestServerTest {
workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
WorkerConfig workerConfig = new DistributedConfig(workerProps);
EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
EasyMock.expect(herder.plugins()).andStubReturn(plugins);
EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
workerConfig,
@ -260,7 +264,7 @@ public class RestServerTest {
workerProps.put("offset.storage.file.filename", "/tmp");
WorkerConfig workerConfig = new StandaloneConfig(workerProps);
EasyMock.expect(herder.kafkaClusterId()).andReturn(KAFKA_CLUSTER_ID);
EasyMock.expect(herder.plugins()).andStubReturn(plugins);
EasyMock.expect(plugins.newPlugins(Collections.emptyList(),
workerConfig,