KAFKA-9888: Copy connector configs before passing to REST extensions (#8511)

The changes made in KIP-454 involved adding a `connectorConfig` method to the ConnectClusterState interface that REST extensions could use to query the worker for the configuration of a given connector. The implementation for this method returns the Java `Map` that's stored in the worker's view of the config topic (when running in distributed mode). No copying is performed, which causes mutations of that `Map` to persist across invocations of `connectorConfig` and, even worse, propagate to the worker when, e.g., starting a connector.

In this commit the map is copied before it's returned to REST extensions.

An existing unit test is modified to ensure that REST extensions receive a copy of the connector config, not the original.

Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
This commit is contained in:
Chris Egerton 2020-05-23 15:35:43 -07:00 committed by GitHub
parent 981ef5166d
commit 40ee580ed2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 4 deletions

View File

@ -86,7 +86,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
FutureCallback<Map<String, String>> connectorConfigCallback = new FutureCallback<>();
herder.connectorConfig(connName, connectorConfigCallback);
try {
return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
return new HashMap<>(connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new ConnectException(
String.format("Failed to retrieve configuration for connector '%s'", connName),

View File

@ -281,8 +281,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
@Override
public ClusterConfigState snapshot() {
synchronized (lock) {
// Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be
// immutable configs
// Only a shallow copy is performed here; in order to avoid accidentally corrupting the worker's view
// of the config topic, any nested structures should be copied before making modifications
return new ClusterConfigState(
offset,
sessionKey,

View File

@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertThrows;
@RunWith(PowerMockRunner.class)
@ -87,7 +88,13 @@ public class ConnectClusterStateImplTest {
}
});
EasyMock.replay(herder);
assertEquals(expectedConfig, connectClusterState.connectorConfig(connName));
Map<String, String> actualConfig = connectClusterState.connectorConfig(connName);
assertEquals(expectedConfig, actualConfig);
assertNotSame(
"Config should be copied in order to avoid mutation by REST extensions",
expectedConfig,
actualConfig
);
}
@Test