From b199eba296f1b3d38bbfb786486208e7ddc8d554 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Wed, 23 Jan 2019 11:00:23 -0800 Subject: [PATCH] KAFKA-5117: Stop resolving externalized configs in Connect REST API [KIP-297](https://cwiki.apache.org/confluence/display/KAFKA/KIP-297%3A+Externalizing+Secrets+for+Connect+Configurations#KIP-297:ExternalizingSecretsforConnectConfigurations-PublicInterfaces) introduced the `ConfigProvider` mechanism, which was primarily intended for externalizing secrets provided in connector configurations. However, when querying the Connect REST API for the configuration of a connector or its tasks, those secrets are still exposed. The changes here prevent the Connect REST API from ever exposing resolved configurations in order to address that. rhauch has given a more thorough writeup of the thinking behind this in [KAFKA-5117](https://issues.apache.org/jira/browse/KAFKA-5117) Tested and verified manually. If these changes are approved unit tests can be added to prevent a regression. Author: Chris Egerton Reviewers: Robert Yokota , Randall Hauch Closes #6129 from C0urante/hide-provided-connect-configs (cherry picked from commit 743607af5aa625a19377688709870b021014dee2) Signed-off-by: Ewen Cheslack-Postava --- .../distributed/DistributedHerder.java | 4 ++-- .../runtime/standalone/StandaloneHerder.java | 4 ++-- .../distributed/DistributedHerderTest.java | 11 ++++++++- .../standalone/StandaloneHerderTest.java | 24 +++++++++++++------ .../tests/connect/connect_rest_test.py | 7 ++++-- tests/kafkatest/tests/connect/connect_test.py | 5 ++-- .../templates/connect-distributed.properties | 6 +++++ 7 files changed, 44 insertions(+), 17 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 099f084f45f..7edc3b25cae 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -451,7 +451,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { if (!configState.contains(connName)) { callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); } else { - Map config = configState.connectorConfig(connName); + Map config = configState.rawConnectorConfig(connName); callback.onCompletion(null, new ConnectorInfo(connName, config, configState.tasks(connName), connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))); @@ -607,7 +607,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { List result = new ArrayList<>(); for (int i = 0; i < configState.taskCount(connName); i++) { ConnectorTaskId id = new ConnectorTaskId(connName, i); - result.add(new TaskInfo(id, configState.taskConfig(id))); + result.add(new TaskInfo(id, configState.rawTaskConfig(id))); } callback.onCompletion(null, result); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index fe31c284613..95b53e5d730 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -134,7 +134,7 @@ public class StandaloneHerder extends AbstractHerder { private ConnectorInfo createConnectorInfo(String connector) { if (!configState.contains(connector)) return null; - Map config = configState.connectorConfig(connector); + Map config = configState.rawConnectorConfig(connector); return new ConnectorInfo(connector, config, configState.tasks(connector), connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))); } @@ -232,7 +232,7 @@ public class StandaloneHerder extends AbstractHerder { List result = new ArrayList<>(); for (ConnectorTaskId taskId : configState.tasks(connName)) - result.add(new TaskInfo(taskId, configState.taskConfig(taskId))); + result.add(new TaskInfo(taskId, configState.rawTaskConfig(taskId))); callback.onCompletion(null, result); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index a0de8cf14ac..25c1da8c512 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -1293,7 +1293,16 @@ public class DistributedHerderTest { EasyMock.expect(member.memberId()).andStubReturn("leader"); EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes(); expectRebalance(1, Collections.emptyList(), Collections.emptyList()); - expectPostRebalanceCatchup(SNAPSHOT); + + WorkerConfigTransformer configTransformer = EasyMock.mock(WorkerConfigTransformer.class); + EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), EasyMock.anyObject())) + .andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info")); + EasyMock.replay(configTransformer); + ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3), + Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED), + TASK_CONFIGS_MAP, Collections.emptySet(), configTransformer); + + expectPostRebalanceCatchup(snapshotWithTransform); member.wakeup(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index b98c15e7014..a23ee10ea48 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -115,12 +115,14 @@ public class StandaloneHerderTest { public void setup() { worker = PowerMock.createMock(Worker.class); herder = PowerMock.createPartialMock(StandaloneHerder.class, new String[]{"connectorTypeForClass"}, - worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore()); + worker, WORKER_ID, KAFKA_CLUSTER_ID, statusBackingStore, new MemoryConfigBackingStore(transformer)); plugins = PowerMock.createMock(Plugins.class); pluginLoader = PowerMock.createMock(PluginClassLoader.class); delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class); PowerMock.mockStatic(Plugins.class); PowerMock.mockStatic(WorkerConnector.class); + Capture> configCapture = Capture.newInstance(); + EasyMock.expect(transformer.transform(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(configCapture))).andAnswer(configCapture::getValue).anyTimes(); } @Test @@ -357,7 +359,8 @@ public class StandaloneHerderTest { Collections.singletonMap(CONNECTOR_NAME, connectorConfig), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(taskId, taskConfig(SourceSink.SOURCE)), - new HashSet<>()); + new HashSet<>(), + transformer); worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(true); @@ -390,7 +393,8 @@ public class StandaloneHerderTest { Collections.singletonMap(CONNECTOR_NAME, connectorConfig), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)), - new HashSet<>()); + new HashSet<>(), + transformer); worker.startTask(taskId, configState, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(false); @@ -458,7 +462,6 @@ public class StandaloneHerderTest { // Create connector connector = PowerMock.createMock(BogusSourceConnector.class); expectAdd(SourceSink.SOURCE); - Connector connectorMock = PowerMock.createMock(SourceConnector.class); expectConfigValidation(connector, true, connConfig); // Validate accessors with 1 connector @@ -485,6 +488,13 @@ public class StandaloneHerderTest { herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); + + EasyMock.reset(transformer); + EasyMock.expect(transformer.transform(EasyMock.eq(CONNECTOR_NAME), EasyMock.anyObject())) + .andThrow(new AssertionError("Config transformation should not occur when requesting connector or task info")) + .anyTimes(); + EasyMock.replay(transformer); + herder.connectors(listConnectorsCb); herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); @@ -604,8 +614,7 @@ public class StandaloneHerderTest { PowerMock.verifyAll(); } - private void expectAdd(SourceSink sourceSink) throws Exception { - + private void expectAdd(SourceSink sourceSink) { Map connectorProps = connectorConfig(sourceSink); ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ? new SourceConnectorConfig(plugins, connectorProps) : @@ -634,7 +643,8 @@ public class StandaloneHerderTest { Collections.singletonMap(CONNECTOR_NAME, connectorConfig(sourceSink)), Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED), Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps), - new HashSet<>()); + new HashSet<>(), + transformer); worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), configState, connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(true); diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index 8b6157b46da..c13515bea06 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -14,7 +14,7 @@ # limitations under the License. from kafkatest.tests.kafka_test import KafkaTest -from kafkatest.services.connect import ConnectDistributedService, ConnectRestError +from kafkatest.services.connect import ConnectDistributedService, ConnectRestError, ConnectServiceBase from ducktape.utils.util import wait_until from ducktape.mark.resource import cluster from ducktape.cluster.remoteaccount import RemoteCommandError @@ -43,7 +43,9 @@ class ConnectRestApiTest(KafkaTest): INPUT_FILE2 = "/mnt/connect.input2" OUTPUT_FILE = "/mnt/connect.output" - TOPIC = "test" + TOPIC = "${file:%s:topic.external}" % ConnectServiceBase.EXTERNAL_CONFIGS_FILE + TOPIC_TEST = "test" + DEFAULT_BATCH_SIZE = "2000" OFFSETS_TOPIC = "connect-offsets" OFFSETS_REPLICATION_FACTOR = "1" @@ -78,6 +80,7 @@ class ConnectRestApiTest(KafkaTest): self.schemas = True self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) + self.cc.set_external_configs(lambda node: self.render("connect-file-external.properties", node=node)) self.cc.start() diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index f01ff0a835e..9a1ff1bb631 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -22,8 +22,7 @@ from ducktape.errors import TimeoutError from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService -from kafkatest.services.connect import ConnectStandaloneService -from kafkatest.services.connect import ErrorTolerance +from kafkatest.services.connect import ConnectServiceBase, ConnectStandaloneService, ErrorTolerance from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.security.security_config import SecurityConfig @@ -47,7 +46,7 @@ class ConnectStandaloneFileTest(Test): OFFSETS_FILE = "/mnt/connect.offsets" - TOPIC = "${file:/mnt/connect/connect-external-configs.properties:topic.external}" + TOPIC = "${file:%s:topic.external}" % ConnectServiceBase.EXTERNAL_CONFIGS_FILE TOPIC_TEST = "test" FIRST_INPUT_LIST = ["foo", "bar", "baz"] diff --git a/tests/kafkatest/tests/connect/templates/connect-distributed.properties b/tests/kafkatest/tests/connect/templates/connect-distributed.properties index 186773e7d1a..ca8c4f84efb 100644 --- a/tests/kafkatest/tests/connect/templates/connect-distributed.properties +++ b/tests/kafkatest/tests/connect/templates/connect-distributed.properties @@ -50,3 +50,9 @@ consumer.session.timeout.ms=10000 # Reduce the admin client request timeouts so that we don't wait the default 120 sec before failing to connect the admin client request.timeout.ms=30000 + +# Allow connector configs to use externalized config values of the form: +# ${file:/mnt/connect/connect-external-configs.properties:topic.external} +# +config.providers=file +config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider