diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 1d60fde34d2..ac0accc17fa 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -134,7 +134,7 @@
+ files="(DistributedHerder|DistributedConfig|KafkaConfigBackingStore|Values|IncrementalCooperativeAssignor).java"/>
result = new ArrayList<>();
for (String verificationAlgorithm : INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT) {
try {
- validateSignatureAlgorithm(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, verificationAlgorithm);
+ validateSignatureAlgorithm(crypto, INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, verificationAlgorithm);
result.add(verificationAlgorithm);
} catch (Throwable t) {
log.trace("Verification algorithm '{}' not found", verificationAlgorithm);
@@ -290,7 +290,8 @@ public class DistributedConfig extends WorkerConfig {
}
@SuppressWarnings("unchecked")
- private static final ConfigDef CONFIG = baseConfigDef()
+ private static ConfigDef config(Crypto crypto) {
+ return baseConfigDef()
.define(GROUP_ID_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
@@ -375,7 +376,7 @@ public class DistributedConfig extends WorkerConfig {
atLeast(0),
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
- /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
+ /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG,
ConfigDef.Type.LONG,
TimeUnit.MINUTES.toMillis(9),
@@ -470,9 +471,9 @@ public class DistributedConfig extends WorkerConfig {
INTER_WORKER_KEY_TTL_MS_MS_DOC)
.define(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG,
ConfigDef.Type.STRING,
- defaultKeyGenerationAlgorithm(),
+ defaultKeyGenerationAlgorithm(crypto),
ConfigDef.LambdaValidator.with(
- (name, value) -> validateKeyAlgorithm(name, (String) value),
+ (name, value) -> validateKeyAlgorithm(crypto, name, (String) value),
() -> "Any KeyGenerator algorithm supported by the worker JVM"),
ConfigDef.Importance.LOW,
INTER_WORKER_KEY_GENERATION_ALGORITHM_DOC)
@@ -483,20 +484,21 @@ public class DistributedConfig extends WorkerConfig {
INTER_WORKER_KEY_SIZE_DOC)
.define(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG,
ConfigDef.Type.STRING,
- defaultSignatureAlgorithm(),
+ defaultSignatureAlgorithm(crypto),
ConfigDef.LambdaValidator.with(
- (name, value) -> validateSignatureAlgorithm(name, (String) value),
+ (name, value) -> validateSignatureAlgorithm(crypto, name, (String) value),
() -> "Any MAC algorithm supported by the worker JVM"),
ConfigDef.Importance.LOW,
INTER_WORKER_SIGNATURE_ALGORITHM_DOC)
.define(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG,
ConfigDef.Type.LIST,
- defaultVerificationAlgorithms(),
+ defaultVerificationAlgorithms(crypto),
ConfigDef.LambdaValidator.with(
- (name, value) -> validateVerificationAlgorithms(name, (List) value),
+ (name, value) -> validateVerificationAlgorithms(crypto, name, (List) value),
() -> "A list of one or more MAC algorithms, each supported by the worker JVM"),
ConfigDef.Importance.LOW,
INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
+ }
private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
@@ -547,18 +549,24 @@ public class DistributedConfig extends WorkerConfig {
}
public DistributedConfig(Map props) {
- super(CONFIG, props);
+ this(Crypto.SYSTEM, props);
+ }
+
+ // Visible for testing
+ DistributedConfig(Crypto crypto, Map props) {
+ super(config(crypto), props);
+ this.crypto = crypto;
exactlyOnceSourceSupport = ExactlyOnceSourceSupport.fromProperty(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG));
validateInterWorkerKeyConfigs();
}
public static void main(String[] args) {
- System.out.println(CONFIG.toHtml(4, config -> "connectconfigs_" + config));
+ System.out.println(config(Crypto.SYSTEM).toHtml(4, config -> "connectconfigs_" + config));
}
public KeyGenerator getInternalRequestKeyGenerator() {
try {
- KeyGenerator result = KeyGenerator.getInstance(getString(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG));
+ KeyGenerator result = crypto.keyGenerator(getString(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG));
Optional.ofNullable(getInt(INTER_WORKER_KEY_SIZE_CONFIG)).ifPresent(result::init);
return result;
} catch (NoSuchAlgorithmException | InvalidParameterException e) {
@@ -615,7 +623,7 @@ public class DistributedConfig extends WorkerConfig {
}
}
- private static void validateVerificationAlgorithms(String configName, List algorithms) {
+ private static void validateVerificationAlgorithms(Crypto crypto, String configName, List algorithms) {
if (algorithms.isEmpty()) {
throw new ConfigException(
configName,
@@ -625,24 +633,24 @@ public class DistributedConfig extends WorkerConfig {
}
for (String algorithm : algorithms) {
try {
- Mac.getInstance(algorithm);
+ crypto.mac(algorithm);
} catch (NoSuchAlgorithmException e) {
throw unsupportedAlgorithmException(configName, algorithm, "Mac");
}
}
}
- private static void validateSignatureAlgorithm(String configName, String algorithm) {
+ private static void validateSignatureAlgorithm(Crypto crypto, String configName, String algorithm) {
try {
- Mac.getInstance(algorithm);
+ crypto.mac(algorithm);
} catch (NoSuchAlgorithmException e) {
throw unsupportedAlgorithmException(configName, algorithm, "Mac");
}
}
- private static void validateKeyAlgorithm(String configName, String algorithm) {
+ private static void validateKeyAlgorithm(Crypto crypto, String configName, String algorithm) {
try {
- KeyGenerator.getInstance(algorithm);
+ crypto.keyGenerator(algorithm);
} catch (NoSuchAlgorithmException e) {
throw unsupportedAlgorithmException(configName, algorithm, "KeyGenerator");
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java
index 83bcdc17398..8ae9bdf0f2b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.rest;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.eclipse.jetty.client.api.Request;
@@ -41,15 +42,17 @@ public class InternalRequestSignature {
/**
* Add a signature to a request.
- * @param key the key to sign the request with; may not be null
- * @param requestBody the body of the request; may not be null
+ *
+ * @param crypto the cryptography library used to generate {@link Mac} instances, may not be null
+ * @param key the key to sign the request with; may not be null
+ * @param requestBody the body of the request; may not be null
* @param signatureAlgorithm the algorithm to use to sign the request; may not be null
- * @param request the request to add the signature to; may not be null
+ * @param request the request to add the signature to; may not be null
*/
- public static void addToRequest(SecretKey key, byte[] requestBody, String signatureAlgorithm, Request request) {
+ public static void addToRequest(Crypto crypto, SecretKey key, byte[] requestBody, String signatureAlgorithm, Request request) {
Mac mac;
try {
- mac = mac(signatureAlgorithm);
+ mac = crypto.mac(signatureAlgorithm);
} catch (NoSuchAlgorithmException e) {
throw new ConnectException(e);
}
@@ -60,12 +63,14 @@ public class InternalRequestSignature {
/**
* Extract a signature from a request.
- * @param requestBody the body of the request; may not be null
- * @param headers the headers for the request; may be null
+ *
+ * @param crypto the cryptography library used to generate {@link Mac} instances, may not be null
+ * @param requestBody the body of the request; may not be null
+ * @param headers the headers for the request; may be null
* @return the signature extracted from the request, or null if one or more request signature
* headers was not present
*/
- public static InternalRequestSignature fromHeaders(byte[] requestBody, HttpHeaders headers) {
+ public static InternalRequestSignature fromHeaders(Crypto crypto, byte[] requestBody, HttpHeaders headers) {
if (headers == null) {
return null;
}
@@ -78,7 +83,7 @@ public class InternalRequestSignature {
Mac mac;
try {
- mac = mac(signatureAlgorithm);
+ mac = crypto.mac(signatureAlgorithm);
} catch (NoSuchAlgorithmException e) {
throw new BadRequestException(e.getMessage());
}
@@ -112,10 +117,6 @@ public class InternalRequestSignature {
return MessageDigest.isEqual(sign(mac, key, requestBody), requestSignature);
}
- private static Mac mac(String signatureAlgorithm) throws NoSuchAlgorithmException {
- return Mac.getInstance(signatureAlgorithm);
- }
-
private static byte[] sign(Mac mac, SecretKey key, byte[] requestBody) {
try {
mac.init(key);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
index 49652ca9b90..997d0b9cd43 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java
@@ -20,6 +20,7 @@ package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
@@ -136,6 +137,7 @@ public class RestClient {
if (sessionKey != null && requestSignatureAlgorithm != null) {
InternalRequestSignature.addToRequest(
+ Crypto.SYSTEM,
sessionKey,
serializedBody != null ? serializedBody.getBytes(StandardCharsets.UTF_8) : null,
requestSignatureAlgorithm,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index de1eb434240..f81e5daf6fd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
@@ -326,7 +327,7 @@ public class ConnectorsResource implements ConnectResource {
final byte[] requestBody) throws Throwable {
List