KAFKA-14346: Remove hard-to-mock javax.crypto calls (#12866)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Greg Harris 2022-11-17 15:10:17 -08:00 committed by GitHub
parent fca5bfe13c
commit 31c69ae932
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 173 additions and 91 deletions

View File

@ -134,7 +134,7 @@
<suppress checks="ClassFanOutComplexity" <suppress checks="ClassFanOutComplexity"
files="Worker(|Test).java"/> files="Worker(|Test).java"/>
<suppress checks="MethodLength" <suppress checks="MethodLength"
files="(DistributedHerder|KafkaConfigBackingStore|Values|IncrementalCooperativeAssignor).java"/> files="(DistributedHerder|DistributedConfig|KafkaConfigBackingStore|Values|IncrementalCooperativeAssignor).java"/>
<suppress checks="ParameterNumber" <suppress checks="ParameterNumber"
files="Worker(SinkTask|SourceTask|Coordinator).java"/> files="Worker(SinkTask|SourceTask|Coordinator).java"/>
<suppress checks="ParameterNumber" <suppress checks="ParameterNumber"

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.distributed;
import javax.crypto.KeyGenerator;
import javax.crypto.Mac;
import java.security.NoSuchAlgorithmException;
/**
* An interface to allow the dependency injection of {@link Mac} and {@link KeyGenerator} instances for testing.
* Implementations of this class should be thread-safe.
*/
public interface Crypto {
/**
* Use the system implementation for cryptography calls.
*/
Crypto SYSTEM = new SystemCrypto();
/**
* Returns a {@code Mac} object that implements the
* specified MAC algorithm. See {@link Mac#getInstance(String)}.
*
* @param algorithm the standard name of the requested MAC algorithm.
* @return the new {@code Mac} object
* @throws NoSuchAlgorithmException if no {@code Provider} supports a
* {@code MacSpi} implementation for the specified algorithm
*/
Mac mac(String algorithm) throws NoSuchAlgorithmException;
/**
* Returns a {@code KeyGenerator} object that generates secret keys
* for the specified algorithm. See {@link KeyGenerator#getInstance(String)}.
*
* @param algorithm the standard name of the requested key algorithm.
* @return the new {@code KeyGenerator} object
* @throws NoSuchAlgorithmException if no {@code Provider} supports a
* {@code KeyGeneratorSpi} implementation for the
* specified algorithm
*/
KeyGenerator keyGenerator(String algorithm) throws NoSuchAlgorithmException;
class SystemCrypto implements Crypto {
@Override
public Mac mac(String algorithm) throws NoSuchAlgorithmException {
return Mac.getInstance(algorithm);
}
@Override
public KeyGenerator keyGenerator(String algorithm) throws NoSuchAlgorithmException {
return KeyGenerator.getInstance(algorithm);
}
};
}

View File

@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.crypto.KeyGenerator; import javax.crypto.KeyGenerator;
import javax.crypto.Mac;
import java.security.InvalidParameterException; import java.security.InvalidParameterException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.security.Provider; import java.security.Provider;
@ -208,6 +207,7 @@ public class DistributedConfig extends WorkerConfig {
+ "which must include the algorithm used for the " + INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG + " property. " + "which must include the algorithm used for the " + INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG + " property. "
+ "The algorithm(s) '" + INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT + "' will be used as a default on JVMs that provide them; " + "The algorithm(s) '" + INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT + "' will be used as a default on JVMs that provide them; "
+ "on other JVMs, no default is used and a value for this property must be manually specified in the worker config."; + "on other JVMs, no default is used and a value for this property must be manually specified in the worker config.";
private Crypto crypto;
private enum ExactlyOnceSourceSupport { private enum ExactlyOnceSourceSupport {
DISABLED(false), DISABLED(false),
@ -237,9 +237,9 @@ public class DistributedConfig extends WorkerConfig {
// + "See the exactly-once source support documentation at [add docs link here] for more information on this feature."; // + "See the exactly-once source support documentation at [add docs link here] for more information on this feature.";
public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT = ExactlyOnceSourceSupport.DISABLED.toString(); public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT = ExactlyOnceSourceSupport.DISABLED.toString();
private static Object defaultKeyGenerationAlgorithm() { private static Object defaultKeyGenerationAlgorithm(Crypto crypto) {
try { try {
validateKeyAlgorithm(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT); validateKeyAlgorithm(crypto, INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
return INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT; return INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT;
} catch (Throwable t) { } catch (Throwable t) {
log.info( log.info(
@ -252,9 +252,9 @@ public class DistributedConfig extends WorkerConfig {
} }
} }
private static Object defaultSignatureAlgorithm() { private static Object defaultSignatureAlgorithm(Crypto crypto) {
try { try {
validateSignatureAlgorithm(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT); validateSignatureAlgorithm(crypto, INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
return INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT; return INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT;
} catch (Throwable t) { } catch (Throwable t) {
log.info( log.info(
@ -267,11 +267,11 @@ public class DistributedConfig extends WorkerConfig {
} }
} }
private static Object defaultVerificationAlgorithms() { private static Object defaultVerificationAlgorithms(Crypto crypto) {
List<String> result = new ArrayList<>(); List<String> result = new ArrayList<>();
for (String verificationAlgorithm : INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT) { for (String verificationAlgorithm : INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT) {
try { try {
validateSignatureAlgorithm(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, verificationAlgorithm); validateSignatureAlgorithm(crypto, INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, verificationAlgorithm);
result.add(verificationAlgorithm); result.add(verificationAlgorithm);
} catch (Throwable t) { } catch (Throwable t) {
log.trace("Verification algorithm '{}' not found", verificationAlgorithm); log.trace("Verification algorithm '{}' not found", verificationAlgorithm);
@ -290,7 +290,8 @@ public class DistributedConfig extends WorkerConfig {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static final ConfigDef CONFIG = baseConfigDef() private static ConfigDef config(Crypto crypto) {
return baseConfigDef()
.define(GROUP_ID_CONFIG, .define(GROUP_ID_CONFIG,
ConfigDef.Type.STRING, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, ConfigDef.Importance.HIGH,
@ -375,7 +376,7 @@ public class DistributedConfig extends WorkerConfig {
atLeast(0), atLeast(0),
ConfigDef.Importance.MEDIUM, ConfigDef.Importance.MEDIUM,
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC) CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, .define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG,
ConfigDef.Type.LONG, ConfigDef.Type.LONG,
TimeUnit.MINUTES.toMillis(9), TimeUnit.MINUTES.toMillis(9),
@ -470,9 +471,9 @@ public class DistributedConfig extends WorkerConfig {
INTER_WORKER_KEY_TTL_MS_MS_DOC) INTER_WORKER_KEY_TTL_MS_MS_DOC)
.define(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, .define(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG,
ConfigDef.Type.STRING, ConfigDef.Type.STRING,
defaultKeyGenerationAlgorithm(), defaultKeyGenerationAlgorithm(crypto),
ConfigDef.LambdaValidator.with( ConfigDef.LambdaValidator.with(
(name, value) -> validateKeyAlgorithm(name, (String) value), (name, value) -> validateKeyAlgorithm(crypto, name, (String) value),
() -> "Any KeyGenerator algorithm supported by the worker JVM"), () -> "Any KeyGenerator algorithm supported by the worker JVM"),
ConfigDef.Importance.LOW, ConfigDef.Importance.LOW,
INTER_WORKER_KEY_GENERATION_ALGORITHM_DOC) INTER_WORKER_KEY_GENERATION_ALGORITHM_DOC)
@ -483,20 +484,21 @@ public class DistributedConfig extends WorkerConfig {
INTER_WORKER_KEY_SIZE_DOC) INTER_WORKER_KEY_SIZE_DOC)
.define(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, .define(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG,
ConfigDef.Type.STRING, ConfigDef.Type.STRING,
defaultSignatureAlgorithm(), defaultSignatureAlgorithm(crypto),
ConfigDef.LambdaValidator.with( ConfigDef.LambdaValidator.with(
(name, value) -> validateSignatureAlgorithm(name, (String) value), (name, value) -> validateSignatureAlgorithm(crypto, name, (String) value),
() -> "Any MAC algorithm supported by the worker JVM"), () -> "Any MAC algorithm supported by the worker JVM"),
ConfigDef.Importance.LOW, ConfigDef.Importance.LOW,
INTER_WORKER_SIGNATURE_ALGORITHM_DOC) INTER_WORKER_SIGNATURE_ALGORITHM_DOC)
.define(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, .define(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG,
ConfigDef.Type.LIST, ConfigDef.Type.LIST,
defaultVerificationAlgorithms(), defaultVerificationAlgorithms(crypto),
ConfigDef.LambdaValidator.with( ConfigDef.LambdaValidator.with(
(name, value) -> validateVerificationAlgorithms(name, (List<String>) value), (name, value) -> validateVerificationAlgorithms(crypto, name, (List<String>) value),
() -> "A list of one or more MAC algorithms, each supported by the worker JVM"), () -> "A list of one or more MAC algorithms, each supported by the worker JVM"),
ConfigDef.Importance.LOW, ConfigDef.Importance.LOW,
INTER_WORKER_VERIFICATION_ALGORITHMS_DOC); INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
}
private final ExactlyOnceSourceSupport exactlyOnceSourceSupport; private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
@ -547,18 +549,24 @@ public class DistributedConfig extends WorkerConfig {
} }
public DistributedConfig(Map<String, String> props) { public DistributedConfig(Map<String, String> props) {
super(CONFIG, props); this(Crypto.SYSTEM, props);
}
// Visible for testing
DistributedConfig(Crypto crypto, Map<String, String> props) {
super(config(crypto), props);
this.crypto = crypto;
exactlyOnceSourceSupport = ExactlyOnceSourceSupport.fromProperty(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG)); exactlyOnceSourceSupport = ExactlyOnceSourceSupport.fromProperty(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG));
validateInterWorkerKeyConfigs(); validateInterWorkerKeyConfigs();
} }
public static void main(String[] args) { public static void main(String[] args) {
System.out.println(CONFIG.toHtml(4, config -> "connectconfigs_" + config)); System.out.println(config(Crypto.SYSTEM).toHtml(4, config -> "connectconfigs_" + config));
} }
public KeyGenerator getInternalRequestKeyGenerator() { public KeyGenerator getInternalRequestKeyGenerator() {
try { try {
KeyGenerator result = KeyGenerator.getInstance(getString(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG)); KeyGenerator result = crypto.keyGenerator(getString(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG));
Optional.ofNullable(getInt(INTER_WORKER_KEY_SIZE_CONFIG)).ifPresent(result::init); Optional.ofNullable(getInt(INTER_WORKER_KEY_SIZE_CONFIG)).ifPresent(result::init);
return result; return result;
} catch (NoSuchAlgorithmException | InvalidParameterException e) { } catch (NoSuchAlgorithmException | InvalidParameterException e) {
@ -615,7 +623,7 @@ public class DistributedConfig extends WorkerConfig {
} }
} }
private static void validateVerificationAlgorithms(String configName, List<String> algorithms) { private static void validateVerificationAlgorithms(Crypto crypto, String configName, List<String> algorithms) {
if (algorithms.isEmpty()) { if (algorithms.isEmpty()) {
throw new ConfigException( throw new ConfigException(
configName, configName,
@ -625,24 +633,24 @@ public class DistributedConfig extends WorkerConfig {
} }
for (String algorithm : algorithms) { for (String algorithm : algorithms) {
try { try {
Mac.getInstance(algorithm); crypto.mac(algorithm);
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
throw unsupportedAlgorithmException(configName, algorithm, "Mac"); throw unsupportedAlgorithmException(configName, algorithm, "Mac");
} }
} }
} }
private static void validateSignatureAlgorithm(String configName, String algorithm) { private static void validateSignatureAlgorithm(Crypto crypto, String configName, String algorithm) {
try { try {
Mac.getInstance(algorithm); crypto.mac(algorithm);
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
throw unsupportedAlgorithmException(configName, algorithm, "Mac"); throw unsupportedAlgorithmException(configName, algorithm, "Mac");
} }
} }
private static void validateKeyAlgorithm(String configName, String algorithm) { private static void validateKeyAlgorithm(Crypto crypto, String configName, String algorithm) {
try { try {
KeyGenerator.getInstance(algorithm); crypto.keyGenerator(algorithm);
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
throw unsupportedAlgorithmException(configName, algorithm, "KeyGenerator"); throw unsupportedAlgorithmException(configName, algorithm, "KeyGenerator");
} }

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.rest; package org.apache.kafka.connect.runtime.rest;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
@ -41,15 +42,17 @@ public class InternalRequestSignature {
/** /**
* Add a signature to a request. * Add a signature to a request.
* @param key the key to sign the request with; may not be null *
* @param requestBody the body of the request; may not be null * @param crypto the cryptography library used to generate {@link Mac} instances, may not be null
* @param key the key to sign the request with; may not be null
* @param requestBody the body of the request; may not be null
* @param signatureAlgorithm the algorithm to use to sign the request; may not be null * @param signatureAlgorithm the algorithm to use to sign the request; may not be null
* @param request the request to add the signature to; may not be null * @param request the request to add the signature to; may not be null
*/ */
public static void addToRequest(SecretKey key, byte[] requestBody, String signatureAlgorithm, Request request) { public static void addToRequest(Crypto crypto, SecretKey key, byte[] requestBody, String signatureAlgorithm, Request request) {
Mac mac; Mac mac;
try { try {
mac = mac(signatureAlgorithm); mac = crypto.mac(signatureAlgorithm);
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
throw new ConnectException(e); throw new ConnectException(e);
} }
@ -60,12 +63,14 @@ public class InternalRequestSignature {
/** /**
* Extract a signature from a request. * Extract a signature from a request.
* @param requestBody the body of the request; may not be null *
* @param headers the headers for the request; may be null * @param crypto the cryptography library used to generate {@link Mac} instances, may not be null
* @param requestBody the body of the request; may not be null
* @param headers the headers for the request; may be null
* @return the signature extracted from the request, or null if one or more request signature * @return the signature extracted from the request, or null if one or more request signature
* headers was not present * headers was not present
*/ */
public static InternalRequestSignature fromHeaders(byte[] requestBody, HttpHeaders headers) { public static InternalRequestSignature fromHeaders(Crypto crypto, byte[] requestBody, HttpHeaders headers) {
if (headers == null) { if (headers == null) {
return null; return null;
} }
@ -78,7 +83,7 @@ public class InternalRequestSignature {
Mac mac; Mac mac;
try { try {
mac = mac(signatureAlgorithm); mac = crypto.mac(signatureAlgorithm);
} catch (NoSuchAlgorithmException e) { } catch (NoSuchAlgorithmException e) {
throw new BadRequestException(e.getMessage()); throw new BadRequestException(e.getMessage());
} }
@ -112,10 +117,6 @@ public class InternalRequestSignature {
return MessageDigest.isEqual(sign(mac, key, requestBody), requestSignature); return MessageDigest.isEqual(sign(mac, key, requestBody), requestSignature);
} }
private static Mac mac(String signatureAlgorithm) throws NoSuchAlgorithmException {
return Mac.getInstance(signatureAlgorithm);
}
private static byte[] sign(Mac mac, SecretKey key, byte[] requestBody) { private static byte[] sign(Mac mac, SecretKey key, byte[] requestBody) {
try { try {
mac.init(key); mac.init(key);

View File

@ -20,6 +20,7 @@ package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.util.SSLUtils; import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
@ -136,6 +137,7 @@ public class RestClient {
if (sessionKey != null && requestSignatureAlgorithm != null) { if (sessionKey != null && requestSignatureAlgorithm != null) {
InternalRequestSignature.addToRequest( InternalRequestSignature.addToRequest(
Crypto.SYSTEM,
sessionKey, sessionKey,
serializedBody != null ? serializedBody.getBytes(StandardCharsets.UTF_8) : null, serializedBody != null ? serializedBody.getBytes(StandardCharsets.UTF_8) : null,
requestSignatureAlgorithm, requestSignatureAlgorithm,

View File

@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest; import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException; import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException; import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
@ -326,7 +327,7 @@ public class ConnectorsResource implements ConnectResource {
final byte[] requestBody) throws Throwable { final byte[] requestBody) throws Throwable {
List<Map<String, String>> taskConfigs = new ObjectMapper().readValue(requestBody, TASK_CONFIGS_TYPE); List<Map<String, String>> taskConfigs = new ObjectMapper().readValue(requestBody, TASK_CONFIGS_TYPE);
FutureCallback<Void> cb = new FutureCallback<>(); FutureCallback<Void> cb = new FutureCallback<>();
herder.putTaskConfigs(connector, taskConfigs, cb, InternalRequestSignature.fromHeaders(requestBody, headers)); herder.putTaskConfigs(connector, taskConfigs, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward); completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward);
} }
@ -338,7 +339,7 @@ public class ConnectorsResource implements ConnectResource {
final @QueryParam("forward") Boolean forward, final @QueryParam("forward") Boolean forward,
final byte[] requestBody) throws Throwable { final byte[] requestBody) throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>(); FutureCallback<Void> cb = new FutureCallback<>();
herder.fenceZombieSourceTasks(connector, cb, InternalRequestSignature.fromHeaders(requestBody, headers)); herder.fenceZombieSourceTasks(connector, cb, InternalRequestSignature.fromHeaders(Crypto.SYSTEM, requestBody, headers));
completeOrForwardRequest(cb, "/connectors/" + connector + "/fence", "PUT", headers, requestBody, forward); completeOrForwardRequest(cb, "/connectors/" + connector + "/fence", "PUT", headers, requestBody, forward);
} }

View File

@ -20,7 +20,6 @@ package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigException;
import org.junit.Test; import org.junit.Test;
import org.mockito.MockedStatic;
import javax.crypto.KeyGenerator; import javax.crypto.KeyGenerator;
import javax.crypto.Mac; import javax.crypto.Mac;
@ -45,8 +44,9 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
public class DistributedConfigTest { public class DistributedConfigTest {
@ -69,58 +69,53 @@ public class DistributedConfigTest {
} }
@Test @Test
public void testDefaultAlgorithmsNotPresent() { public void testDefaultAlgorithmsNotPresent() throws NoSuchAlgorithmException {
final String fakeKeyGenerationAlgorithm = "FakeKeyGenerationAlgorithm"; final String fakeKeyGenerationAlgorithm = "FakeKeyGenerationAlgorithm";
final String fakeMacAlgorithm = "FakeMacAlgorithm"; final String fakeMacAlgorithm = "FakeMacAlgorithm";
final KeyGenerator fakeKeyGenerator = mock(KeyGenerator.class); final KeyGenerator fakeKeyGenerator = mock(KeyGenerator.class);
final Mac fakeMac = mock(Mac.class); final Mac fakeMac = mock(Mac.class);
final Crypto crypto = mock(Crypto.class);
Map<String, String> configs = configs(); Map<String, String> configs = configs();
configs.put(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, fakeKeyGenerationAlgorithm); configs.put(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, fakeKeyGenerationAlgorithm);
configs.put(DistributedConfig.INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, fakeMacAlgorithm); configs.put(DistributedConfig.INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, fakeMacAlgorithm);
configs.put(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, fakeMacAlgorithm); configs.put(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, fakeMacAlgorithm);
try ( // Make it seem like the default key generation algorithm isn't available on this worker
MockedStatic<KeyGenerator> keyGenerator = mockStatic(KeyGenerator.class); doThrow(new NoSuchAlgorithmException())
MockedStatic<Mac> mac = mockStatic(Mac.class) .when(crypto).keyGenerator(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
) { // But the one specified in the worker config file is
// Make it seem like the default key generation algorithm isn't available on this worker doReturn(fakeKeyGenerator)
keyGenerator.when(() -> KeyGenerator.getInstance(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT)) .when(crypto).keyGenerator(fakeKeyGenerationAlgorithm);
.thenThrow(new NoSuchAlgorithmException());
// But the one specified in the worker config file is
keyGenerator.when(() -> KeyGenerator.getInstance(fakeKeyGenerationAlgorithm))
.thenReturn(fakeKeyGenerator);
// And for the signature algorithm // And for the signature algorithm
mac.when(() -> Mac.getInstance(DistributedConfig.INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT)) doThrow(new NoSuchAlgorithmException())
.thenThrow(new NoSuchAlgorithmException()); .when(crypto).mac(DistributedConfig.INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
// Likewise for key verification algorithms // Likewise for key verification algorithms
DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT.forEach(verificationAlgorithm -> for (String verificationAlgorithm : DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT) {
keyGenerator.when(() -> Mac.getInstance(verificationAlgorithm)) doThrow(new NoSuchAlgorithmException())
.thenThrow(new NoSuchAlgorithmException()) .when(crypto).mac(verificationAlgorithm);
);
mac.when(() -> Mac.getInstance(fakeMacAlgorithm))
.thenReturn(fakeMac);
// Should succeed; even though the defaults aren't present, the manually-specified algorithms are valid
new DistributedConfig(configs);
// Should fail; the default key generation algorithm isn't present, and no override is specified
String removed = configs.remove(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG);
assertThrows(ConfigException.class, () -> new DistributedConfig(configs));
configs.put(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, removed);
// Should fail; the default key generation algorithm isn't present, and no override is specified
removed = configs.remove(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG);
assertThrows(ConfigException.class, () -> new DistributedConfig(configs));
configs.put(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, removed);
// Should fail; the default key generation algorithm isn't present, and no override is specified
removed = configs.remove(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
assertThrows(ConfigException.class, () -> new DistributedConfig(configs));
configs.put(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, removed);
} }
doReturn(fakeMac).when(crypto).mac(fakeMacAlgorithm);
// Should succeed; even though the defaults aren't present, the manually-specified algorithms are valid
new DistributedConfig(crypto, configs);
// Should fail; the default key generation algorithm isn't present, and no override is specified
String removed = configs.remove(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG);
assertThrows(ConfigException.class, () -> new DistributedConfig(configs));
configs.put(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, removed);
// Should fail; the default key generation algorithm isn't present, and no override is specified
removed = configs.remove(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG);
assertThrows(ConfigException.class, () -> new DistributedConfig(configs));
configs.put(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, removed);
// Should fail; the default key generation algorithm isn't present, and no override is specified
removed = configs.remove(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
assertThrows(ConfigException.class, () -> new DistributedConfig(configs));
configs.put(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, removed);
} }
@Test @Test

View File

@ -18,6 +18,7 @@
package org.apache.kafka.connect.runtime.rest; package org.apache.kafka.connect.runtime.rest;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.junit.Test; import org.junit.Test;
@ -28,6 +29,7 @@ import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec; import javax.crypto.spec.SecretKeySpec;
import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.HttpHeaders;
import java.security.NoSuchAlgorithmException;
import java.util.Base64; import java.util.Base64;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -36,6 +38,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -56,46 +59,49 @@ public class InternalRequestSignatureTest {
36, 72, 86, -71, -32, 13, -8, 115, 85, 73, -65, -112, 6, 68, 41, -50 36, 72, 86, -71, -32, 13, -8, 115, 85, 73, -65, -112, 6, 68, 41, -50
}; };
private static final String ENCODED_SIGNATURE = Base64.getEncoder().encodeToString(SIGNATURE); private static final String ENCODED_SIGNATURE = Base64.getEncoder().encodeToString(SIGNATURE);
private final Crypto crypto = Crypto.SYSTEM;
@Test @Test
public void fromHeadersShouldReturnNullOnNullHeaders() { public void fromHeadersShouldReturnNullOnNullHeaders() {
assertNull(InternalRequestSignature.fromHeaders(REQUEST_BODY, null)); assertNull(InternalRequestSignature.fromHeaders(crypto, REQUEST_BODY, null));
} }
@Test @Test
public void fromHeadersShouldReturnNullIfSignatureHeaderMissing() { public void fromHeadersShouldReturnNullIfSignatureHeaderMissing() {
assertNull(InternalRequestSignature.fromHeaders(REQUEST_BODY, internalRequestHeaders(null, SIGNATURE_ALGORITHM))); assertNull(InternalRequestSignature.fromHeaders(crypto, REQUEST_BODY, internalRequestHeaders(null, SIGNATURE_ALGORITHM)));
} }
@Test @Test
public void fromHeadersShouldReturnNullIfSignatureAlgorithmHeaderMissing() { public void fromHeadersShouldReturnNullIfSignatureAlgorithmHeaderMissing() {
assertNull(InternalRequestSignature.fromHeaders(REQUEST_BODY, internalRequestHeaders(ENCODED_SIGNATURE, null))); assertNull(InternalRequestSignature.fromHeaders(crypto, REQUEST_BODY, internalRequestHeaders(ENCODED_SIGNATURE, null)));
} }
@Test @Test
public void fromHeadersShouldThrowExceptionOnInvalidSignatureAlgorithm() { public void fromHeadersShouldThrowExceptionOnInvalidSignatureAlgorithm() {
assertThrows(BadRequestException.class, () -> InternalRequestSignature.fromHeaders(REQUEST_BODY, assertThrows(BadRequestException.class, () -> InternalRequestSignature.fromHeaders(crypto, REQUEST_BODY,
internalRequestHeaders(ENCODED_SIGNATURE, "doesn'texist"))); internalRequestHeaders(ENCODED_SIGNATURE, "doesn'texist")));
} }
@Test @Test
public void fromHeadersShouldThrowExceptionOnInvalidBase64Signature() { public void fromHeadersShouldThrowExceptionOnInvalidBase64Signature() {
assertThrows(BadRequestException.class, () -> InternalRequestSignature.fromHeaders(REQUEST_BODY, assertThrows(BadRequestException.class, () -> InternalRequestSignature.fromHeaders(crypto, REQUEST_BODY,
internalRequestHeaders("not valid base 64", SIGNATURE_ALGORITHM))); internalRequestHeaders("not valid base 64", SIGNATURE_ALGORITHM)));
} }
@Test @Test
public void fromHeadersShouldReturnNonNullResultOnValidSignatureAndSignatureAlgorithm() { public void fromHeadersShouldReturnNonNullResultOnValidSignatureAndSignatureAlgorithm() {
InternalRequestSignature signature = InternalRequestSignature signature =
InternalRequestSignature.fromHeaders(REQUEST_BODY, internalRequestHeaders(ENCODED_SIGNATURE, SIGNATURE_ALGORITHM)); InternalRequestSignature.fromHeaders(crypto, REQUEST_BODY, internalRequestHeaders(ENCODED_SIGNATURE, SIGNATURE_ALGORITHM));
assertNotNull(signature); assertNotNull(signature);
assertNotNull(signature.keyAlgorithm()); assertNotNull(signature.keyAlgorithm());
} }
@Test @Test
public void addToRequestShouldThrowExceptionOnInvalidSignatureAlgorithm() { public void addToRequestShouldThrowExceptionOnInvalidSignatureAlgorithm() throws NoSuchAlgorithmException {
Request request = mock(Request.class); Request request = mock(Request.class);
assertThrows(ConnectException.class, () -> InternalRequestSignature.addToRequest(KEY, REQUEST_BODY, "doesn'texist", request)); Crypto crypto = mock(Crypto.class);
when(crypto.mac(anyString())).thenThrow(new NoSuchAlgorithmException("doesn'texist"));
assertThrows(ConnectException.class, () -> InternalRequestSignature.addToRequest(crypto, KEY, REQUEST_BODY, "doesn'texist", request));
} }
@Test @Test
@ -112,7 +118,7 @@ public class InternalRequestSignatureTest {
signatureAlgorithmCapture.capture() signatureAlgorithmCapture.capture()
)).thenReturn(request); )).thenReturn(request);
InternalRequestSignature.addToRequest(KEY, REQUEST_BODY, SIGNATURE_ALGORITHM, request); InternalRequestSignature.addToRequest(crypto, KEY, REQUEST_BODY, SIGNATURE_ALGORITHM, request);
assertEquals( assertEquals(
"Request should have valid base 64-encoded signature added as header", "Request should have valid base 64-encoded signature added as header",
@ -133,7 +139,7 @@ public class InternalRequestSignatureTest {
InternalRequestSignature signature = new InternalRequestSignature(REQUEST_BODY, mac, SIGNATURE); InternalRequestSignature signature = new InternalRequestSignature(REQUEST_BODY, mac, SIGNATURE);
assertTrue(signature.isValid(KEY)); assertTrue(signature.isValid(KEY));
signature = InternalRequestSignature.fromHeaders(REQUEST_BODY, internalRequestHeaders(ENCODED_SIGNATURE, SIGNATURE_ALGORITHM)); signature = InternalRequestSignature.fromHeaders(crypto, REQUEST_BODY, internalRequestHeaders(ENCODED_SIGNATURE, SIGNATURE_ALGORITHM));
assertTrue(signature.isValid(KEY)); assertTrue(signature.isValid(KEY));
signature = new InternalRequestSignature("[{\"different_config\":\"different_value\"}]".getBytes(), mac, SIGNATURE); signature = new InternalRequestSignature("[{\"different_config\":\"different_value\"}]".getBytes(), mac, SIGNATURE);