mirror of https://github.com/apache/kafka.git
MINOR: Avoid logging connector configuration in Connect framework (#5868)
Some connector configs may be sensitive, so we should avoid logging them. Reviewers: Alex Diachenko, Dustin Cote <dustin@confluent.io>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent
3a492474db
commit
eeba93e894
|
|
@ -291,7 +291,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
*/
|
||||
@Override
|
||||
public void putConnectorConfig(String connector, Map<String, String> properties) {
|
||||
log.debug("Writing connector configuration {} for connector {} configuration", properties, connector);
|
||||
log.debug("Writing connector configuration for connector '{}'", connector);
|
||||
Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
|
||||
connectConfig.put("properties", properties);
|
||||
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
|
||||
|
|
@ -304,7 +304,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
*/
|
||||
@Override
|
||||
public void removeConnectorConfig(String connector) {
|
||||
log.debug("Removing connector configuration for connector {}", connector);
|
||||
log.debug("Removing connector configuration for connector '{}'", connector);
|
||||
try {
|
||||
configLog.send(CONNECTOR_KEY(connector), null);
|
||||
configLog.send(TARGET_STATE_KEY(connector), null);
|
||||
|
|
@ -358,7 +358,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
|
||||
connectConfig.put("properties", taskConfig);
|
||||
byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
|
||||
log.debug("Writing configuration for task " + index + " configuration: " + taskConfig);
|
||||
log.debug("Writing configuration for connector '{}' task {}", connector, index);
|
||||
ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index);
|
||||
configLog.send(TASK_KEY(connectorTaskId), serializedConfig);
|
||||
index++;
|
||||
|
|
@ -375,7 +375,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
|
||||
connectConfig.put("tasks", taskCount);
|
||||
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
|
||||
log.debug("Writing commit for connector " + connector + " with " + taskCount + " tasks.");
|
||||
log.debug("Writing commit for connector '{}' with {} tasks.", connector, taskCount);
|
||||
configLog.send(COMMIT_TASKS_KEY(connector), serializedConfig);
|
||||
|
||||
// Read to end to ensure all the commit messages have been written
|
||||
|
|
@ -484,17 +484,17 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
}
|
||||
Object targetState = ((Map<String, Object>) value.value()).get("state");
|
||||
if (!(targetState instanceof String)) {
|
||||
log.error("Invalid data for target state for connector ({}): 'state' field should be a Map but is {}",
|
||||
log.error("Invalid data for target state for connector '{}': 'state' field should be a Map but is {}",
|
||||
connectorName, targetState == null ? null : targetState.getClass());
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
TargetState state = TargetState.valueOf((String) targetState);
|
||||
log.debug("Setting target state for connector {} to {}", connectorName, targetState);
|
||||
log.debug("Setting target state for connector '{}' to {}", connectorName, targetState);
|
||||
connectorTargetStates.put(connectorName, state);
|
||||
} catch (IllegalArgumentException e) {
|
||||
log.error("Invalid target state for connector ({}): {}", connectorName, targetState);
|
||||
log.error("Invalid target state for connector '{}': {}", connectorName, targetState);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
@ -511,22 +511,22 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
synchronized (lock) {
|
||||
if (value.value() == null) {
|
||||
// Connector deletion will be written as a null value
|
||||
log.info("Removed connector " + connectorName + " due to null configuration. This is usually intentional and does not indicate an issue.");
|
||||
log.info("Successfully processed removal of connector '{}'", connectorName);
|
||||
connectorConfigs.remove(connectorName);
|
||||
removed = true;
|
||||
} else {
|
||||
// Connector configs can be applied and callbacks invoked immediately
|
||||
if (!(value.value() instanceof Map)) {
|
||||
log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
|
||||
log.error("Found configuration for connector '{}' in wrong format: {}", record.key(), value.value().getClass());
|
||||
return;
|
||||
}
|
||||
Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
|
||||
if (!(newConnectorConfig instanceof Map)) {
|
||||
log.error("Invalid data for connector config ({}): properties field should be a Map but is {}", connectorName,
|
||||
newConnectorConfig == null ? null : newConnectorConfig.getClass());
|
||||
log.error("Invalid data for config for connector '{}': 'properties' field should be a Map but is {}",
|
||||
connectorName, newConnectorConfig == null ? null : newConnectorConfig.getClass());
|
||||
return;
|
||||
}
|
||||
log.debug("Updating configuration for connector " + connectorName + " configuration: " + newConnectorConfig);
|
||||
log.debug("Updating configuration for connector '{}'", connectorName);
|
||||
connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
|
||||
|
||||
// Set the initial state of the connector to STARTED, which ensures that any connectors
|
||||
|
|
@ -545,17 +545,21 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
synchronized (lock) {
|
||||
ConnectorTaskId taskId = parseTaskId(record.key());
|
||||
if (taskId == null) {
|
||||
log.error("Ignoring task configuration because " + record.key() + " couldn't be parsed as a task config key");
|
||||
log.error("Ignoring task configuration because {} couldn't be parsed as a task config key", record.key());
|
||||
return;
|
||||
}
|
||||
if (value.value() == null) {
|
||||
log.error("Ignoring task configuration for task {} because it is unexpectedly null", taskId);
|
||||
return;
|
||||
}
|
||||
if (!(value.value() instanceof Map)) {
|
||||
log.error("Ignoring task configuration for task " + taskId + " because it is in the wrong format: " + value.value());
|
||||
log.error("Ignoring task configuration for task {} because the value is not a Map but is {}", taskId, value.value().getClass());
|
||||
return;
|
||||
}
|
||||
|
||||
Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
|
||||
if (!(newTaskConfig instanceof Map)) {
|
||||
log.error("Invalid data for task config (" + taskId + "): properties filed should be a Map but is " + newTaskConfig.getClass());
|
||||
log.error("Invalid data for config of task {} 'properties' field should be a Map but is {}", taskId, newTaskConfig.getClass());
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -564,7 +568,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
deferred = new HashMap<>();
|
||||
deferredTaskUpdates.put(taskId.connector(), deferred);
|
||||
}
|
||||
log.debug("Storing new config for task " + taskId + " this will wait for a commit message before the new config will take effect. New config: " + newTaskConfig);
|
||||
log.debug("Storing new config for task {}; this will wait for a commit message before the new config will take effect.", taskId);
|
||||
deferred.put(taskId, (Map<String, String>) newTaskConfig);
|
||||
}
|
||||
} else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
|
||||
|
|
@ -593,7 +597,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
// resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
|
||||
// exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
|
||||
if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
|
||||
log.error("Ignoring connector tasks configuration commit for connector " + connectorName + " because it is in the wrong format: " + value.value());
|
||||
log.error("Ignoring connector tasks configuration commit for connector '{}' because it is in the wrong format: {}", connectorName, value.value());
|
||||
return;
|
||||
}
|
||||
Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
|
||||
|
|
@ -608,7 +612,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
// historical data, in which case we would not have applied any updates yet and there will be no
|
||||
// task config data already committed for the connector, so we shouldn't have to clear any data
|
||||
// out. All we need to do is add the flag marking it inconsistent.
|
||||
log.debug("We have an incomplete set of task configs for connector " + connectorName + " probably due to compaction. So we are not doing anything with the new configuration.");
|
||||
log.debug("We have an incomplete set of task configs for connector '{}' probably due to compaction. So we are not doing anything with the new configuration.", connectorName);
|
||||
inconsistent.add(connectorName);
|
||||
} else {
|
||||
if (deferred != null) {
|
||||
|
|
@ -629,7 +633,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
if (started)
|
||||
updateListener.onTaskConfigUpdate(updatedTasks);
|
||||
} else {
|
||||
log.error("Discarding config update record with invalid key: " + record.key());
|
||||
log.error("Discarding config update record with invalid key: {}", record.key());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue