From 40ee580ed23c876dec45eca4770e834d9aed76da Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Sat, 23 May 2020 15:35:43 -0700 Subject: [PATCH] KAFKA-9888: Copy connector configs before passing to REST extensions (#8511) The changes made in KIP-454 involved adding a `connectorConfig` method to the ConnectClusterState interface that REST extensions could use to query the worker for the configuration of a given connector. The implementation for this method returns the Java `Map` that's stored in the worker's view of the config topic (when running in distributed mode). No copying is performed, which causes mutations of that `Map` to persist across invocations of `connectorConfig` and, even worse, propagate to the worker when, e.g., starting a connector. In this commit the map is copied before it's returned to REST extensions. An existing unit test is modified to ensure that REST extensions receive a copy of the connector config, not the original. Reviewers: Nigel Liang , Konstantine Karantasis --- .../connect/runtime/health/ConnectClusterStateImpl.java | 2 +- .../kafka/connect/storage/KafkaConfigBackingStore.java | 4 ++-- .../runtime/health/ConnectClusterStateImplTest.java | 9 ++++++++- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java index 38362b3fc27..6b7285df50b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java @@ -86,7 +86,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState { FutureCallback> connectorConfigCallback = new FutureCallback<>(); herder.connectorConfig(connName, connectorConfigCallback); try { - return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS); + return new HashMap<>(connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS)); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new ConnectException( String.format("Failed to retrieve configuration for connector '%s'", connName), diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 3cf2288450b..b9e9831c4fd 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -281,8 +281,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { @Override public ClusterConfigState snapshot() { synchronized (lock) { - // Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be - // immutable configs + // Only a shallow copy is performed here; in order to avoid accidentally corrupting the worker's view + // of the config topic, any nested structures should be copied before making modifications return new ClusterConfigState( offset, sessionKey, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java index d8984f0aaf2..d8a7e49ee81 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertThrows; @RunWith(PowerMockRunner.class) @@ -87,7 +88,13 @@ public class ConnectClusterStateImplTest { } }); EasyMock.replay(herder); - assertEquals(expectedConfig, connectClusterState.connectorConfig(connName)); + Map actualConfig = connectClusterState.connectorConfig(connName); + assertEquals(expectedConfig, actualConfig); + assertNotSame( + "Config should be copied in order to avoid mutation by REST extensions", + expectedConfig, + actualConfig + ); } @Test