mirror of https://github.com/apache/kafka.git
KAFKA-9320: Enable TLSv1.3 by default (KIP-573) (#8695)
1. Enables `TLSv1.3` by default with Java 11 or newer. 2. Add unit tests that cover the various TLSv1.2 and TLSv1.3 combinations. 3. Extend `benchmark_test.py` and `replication_test.py` to run with 'TLSv1.2' or 'TLSv1.3'. Reviewers: Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
parent
9e5e4cd558
commit
8b22b81596
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.kafka.common.config;
|
package org.apache.kafka.common.config;
|
||||||
|
|
||||||
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
|
||||||
|
import org.apache.kafka.common.utils.Java;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
|
||||||
import javax.net.ssl.KeyManagerFactory;
|
import javax.net.ssl.KeyManagerFactory;
|
||||||
|
@ -49,11 +50,15 @@ public class SslConfigs {
|
||||||
|
|
||||||
public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
|
public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
|
||||||
public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "
|
public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. "
|
||||||
+ "Default setting is TLSv1.2, which is fine for most cases. "
|
+ "The default is 'TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. "
|
||||||
+ "Allowed values in recent JVMs are TLSv1.2 and TLSv1.3. TLS, TLSv1.1, SSL, SSLv2 and SSLv3 "
|
+ "This value should be fine for most use cases. "
|
||||||
+ "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.";
|
+ "Allowed values in recent JVMs are 'TLSv1.2' and 'TLSv1.3'. 'TLS', 'TLSv1.1', 'SSL', 'SSLv2' and 'SSLv3' "
|
||||||
|
+ "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. "
|
||||||
|
+ "With the default value for this config and 'ssl.enabled.protocols', clients will downgrade to 'TLSv1.2' if "
|
||||||
|
+ "the server does not support 'TLSv1.3'. If this config is set to 'TLSv1.2', clients will not use 'TLSv1.3' even "
|
||||||
|
+ "if it is one of the values in ssl.enabled.protocols and the server only supports 'TLSv1.3'.";
|
||||||
|
|
||||||
public static final String DEFAULT_SSL_PROTOCOL = "TLSv1.2";
|
public static final String DEFAULT_SSL_PROTOCOL;
|
||||||
|
|
||||||
public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
|
public static final String SSL_PROVIDER_CONFIG = "ssl.provider";
|
||||||
public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";
|
public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.";
|
||||||
|
@ -63,8 +68,22 @@ public class SslConfigs {
|
||||||
+ "By default all the available cipher suites are supported.";
|
+ "By default all the available cipher suites are supported.";
|
||||||
|
|
||||||
public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
|
public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
|
||||||
public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections.";
|
public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
|
||||||
public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2";
|
+ "The default is 'TLSv1.2,TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. With the "
|
||||||
|
+ "default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fallback "
|
||||||
|
+ "to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most "
|
||||||
|
+ "cases. Also see the config documentation for `ssl.protocol`.";
|
||||||
|
public static final String DEFAULT_SSL_ENABLED_PROTOCOLS;
|
||||||
|
|
||||||
|
static {
|
||||||
|
if (Java.IS_JAVA11_COMPATIBLE) {
|
||||||
|
DEFAULT_SSL_PROTOCOL = "TLSv1.3";
|
||||||
|
DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.3";
|
||||||
|
} else {
|
||||||
|
DEFAULT_SSL_PROTOCOL = "TLSv1.2";
|
||||||
|
DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
|
public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
|
||||||
public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
|
public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
|
||||||
|
|
|
@ -41,7 +41,6 @@ import org.junit.runners.Parameterized;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import javax.net.ssl.SSLEngine;
|
import javax.net.ssl.SSLEngine;
|
||||||
import javax.net.ssl.SSLParameters;
|
import javax.net.ssl.SSLParameters;
|
||||||
import javax.net.ssl.SSLServerSocketFactory;
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
@ -578,26 +577,6 @@ public class SslTransportLayerTest {
|
||||||
server.verifyAuthenticationMetrics(1, 2);
|
server.verifyAuthenticationMetrics(1, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUnsupportedCipher() throws Exception {
|
|
||||||
String[] cipherSuites = ((SSLServerSocketFactory) SSLServerSocketFactory.getDefault()).getSupportedCipherSuites();
|
|
||||||
if (cipherSuites != null && cipherSuites.length > 1) {
|
|
||||||
sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
|
|
||||||
sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuites[0]));
|
|
||||||
sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
|
|
||||||
sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuites[1]));
|
|
||||||
|
|
||||||
server = createEchoServer(SecurityProtocol.SSL);
|
|
||||||
createSelector(sslClientConfigs);
|
|
||||||
|
|
||||||
checkAuthentiationFailed("1", "TLSv1.1");
|
|
||||||
server.verifyAuthenticationMetrics(0, 1);
|
|
||||||
|
|
||||||
checkAuthentiationFailed("2", "TLSv1");
|
|
||||||
server.verifyAuthenticationMetrics(0, 2);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Checks connection failed using the specified {@code tlsVersion}. */
|
/** Checks connection failed using the specified {@code tlsVersion}. */
|
||||||
private void checkAuthentiationFailed(String node, String tlsVersion) throws IOException {
|
private void checkAuthentiationFailed(String node, String tlsVersion) throws IOException {
|
||||||
sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(tlsVersion));
|
sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(tlsVersion));
|
||||||
|
@ -627,7 +606,6 @@ public class SslTransportLayerTest {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testUnsupportedCiphers() throws Exception {
|
public void testUnsupportedCiphers() throws Exception {
|
||||||
String node = "0";
|
|
||||||
SSLContext context = SSLContext.getInstance(tlsProtocol);
|
SSLContext context = SSLContext.getInstance(tlsProtocol);
|
||||||
context.init(null, null, null);
|
context.init(null, null, null);
|
||||||
String[] cipherSuites = context.getDefaultSSLParameters().getCipherSuites();
|
String[] cipherSuites = context.getDefaultSSLParameters().getCipherSuites();
|
||||||
|
@ -636,10 +614,8 @@ public class SslTransportLayerTest {
|
||||||
|
|
||||||
sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
|
sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
|
||||||
createSelector(sslClientConfigs);
|
createSelector(sslClientConfigs);
|
||||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
|
|
||||||
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
|
|
||||||
|
|
||||||
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
|
checkAuthentiationFailed("1", tlsProtocol);
|
||||||
server.verifyAuthenticationMetrics(0, 1);
|
server.verifyAuthenticationMetrics(0, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1250,7 +1226,7 @@ public class SslTransportLayerTest {
|
||||||
void run() throws IOException;
|
void run() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestSslChannelBuilder extends SslChannelBuilder {
|
static class TestSslChannelBuilder extends SslChannelBuilder {
|
||||||
|
|
||||||
private Integer netReadBufSizeOverride;
|
private Integer netReadBufSizeOverride;
|
||||||
private Integer netWriteBufSizeOverride;
|
private Integer netWriteBufSizeOverride;
|
||||||
|
@ -1361,7 +1337,7 @@ public class SslTransportLayerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ResizeableBufferSize {
|
static class ResizeableBufferSize {
|
||||||
private Integer bufSizeOverride;
|
private Integer bufSizeOverride;
|
||||||
ResizeableBufferSize(Integer bufSizeOverride) {
|
ResizeableBufferSize(Integer bufSizeOverride) {
|
||||||
this.bufSizeOverride = bufSizeOverride;
|
this.bufSizeOverride = bufSizeOverride;
|
||||||
|
|
|
@ -0,0 +1,169 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.common.network;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.kafka.common.config.SslConfigs;
|
||||||
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
|
import org.apache.kafka.common.security.TestSecurityConfig;
|
||||||
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
|
import org.apache.kafka.common.utils.Java;
|
||||||
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
|
import org.apache.kafka.common.utils.Time;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
|
public class SslTransportTls12Tls13Test {
|
||||||
|
private static final int BUFFER_SIZE = 4 * 1024;
|
||||||
|
private static final Time TIME = Time.SYSTEM;
|
||||||
|
|
||||||
|
private NioEchoServer server;
|
||||||
|
private Selector selector;
|
||||||
|
private Map<String, Object> sslClientConfigs;
|
||||||
|
private Map<String, Object> sslServerConfigs;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
// Create certificates for use by client and server. Add server cert to client truststore and vice versa.
|
||||||
|
CertStores serverCertStores = new CertStores(true, "server", "localhost");
|
||||||
|
CertStores clientCertStores = new CertStores(false, "client", "localhost");
|
||||||
|
sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
|
||||||
|
sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
|
||||||
|
|
||||||
|
LogContext logContext = new LogContext();
|
||||||
|
ChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false, logContext);
|
||||||
|
channelBuilder.configure(sslClientConfigs);
|
||||||
|
this.selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, logContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() throws Exception {
|
||||||
|
if (selector != null)
|
||||||
|
this.selector.close();
|
||||||
|
if (server != null)
|
||||||
|
this.server.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCiphersSuiteForTls12FailsForTls13() throws Exception {
|
||||||
|
assumeTrue(Java.IS_JAVA11_COMPATIBLE);
|
||||||
|
|
||||||
|
String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
|
||||||
|
|
||||||
|
sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList("TLSv1.3"));
|
||||||
|
sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
|
||||||
|
server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
|
||||||
|
SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME);
|
||||||
|
|
||||||
|
sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList("TLSv1.3"));
|
||||||
|
sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
|
||||||
|
|
||||||
|
checkAuthentiationFailed();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that connections can't be made if server uses TLSv1.2 with custom cipher suite and client uses TLSv1.3.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCiphersSuiteFailForServerTls12ClientTls13() throws Exception {
|
||||||
|
assumeTrue(Java.IS_JAVA11_COMPATIBLE);
|
||||||
|
|
||||||
|
String tls12CipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
|
||||||
|
String tls13CipherSuite = "TLS_AES_128_GCM_SHA256";
|
||||||
|
|
||||||
|
sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2");
|
||||||
|
sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList("TLSv1.2"));
|
||||||
|
sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(tls12CipherSuite));
|
||||||
|
server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
|
||||||
|
SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME);
|
||||||
|
|
||||||
|
sslClientConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3");
|
||||||
|
sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(tls13CipherSuite));
|
||||||
|
|
||||||
|
checkAuthentiationFailed();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that connections can be made with TLSv1.3 cipher suite.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCiphersSuiteForTls13() throws Exception {
|
||||||
|
assumeTrue(Java.IS_JAVA11_COMPATIBLE);
|
||||||
|
|
||||||
|
String cipherSuite = "TLS_AES_128_GCM_SHA256";
|
||||||
|
|
||||||
|
sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
|
||||||
|
server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
|
||||||
|
SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME);
|
||||||
|
|
||||||
|
sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
|
||||||
|
checkAuthenticationSucceed();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that connections can be made with TLSv1.2 cipher suite.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCiphersSuiteForTls12() throws Exception {
|
||||||
|
String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384";
|
||||||
|
|
||||||
|
sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(",")));
|
||||||
|
sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
|
||||||
|
server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
|
||||||
|
SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME);
|
||||||
|
|
||||||
|
sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(",")));
|
||||||
|
sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite));
|
||||||
|
checkAuthenticationSucceed();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Checks connection failed using the specified {@code tlsVersion}. */
|
||||||
|
private void checkAuthentiationFailed() throws IOException, InterruptedException {
|
||||||
|
sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3"));
|
||||||
|
createSelector(sslClientConfigs);
|
||||||
|
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
|
||||||
|
selector.connect("0", addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||||
|
|
||||||
|
NetworkTestUtils.waitForChannelClose(selector, "0", ChannelState.State.AUTHENTICATION_FAILED);
|
||||||
|
server.verifyAuthenticationMetrics(0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkAuthenticationSucceed() throws IOException, InterruptedException {
|
||||||
|
createSelector(sslClientConfigs);
|
||||||
|
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
|
||||||
|
selector.connect("0", addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||||
|
NetworkTestUtils.waitForChannelReady(selector, "0");
|
||||||
|
server.verifyAuthenticationMetrics(1, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createSelector(Map<String, Object> sslClientConfigs) {
|
||||||
|
SslTransportLayerTest.TestSslChannelBuilder channelBuilder = new SslTransportLayerTest.TestSslChannelBuilder(Mode.CLIENT);
|
||||||
|
channelBuilder.configureBufferSizes(null, null, null);
|
||||||
|
channelBuilder.configure(sslClientConfigs);
|
||||||
|
this.selector = new Selector(100 * 5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,183 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.kafka.common.network;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.kafka.common.config.SslConfigs;
|
||||||
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
|
import org.apache.kafka.common.security.TestSecurityConfig;
|
||||||
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
|
import org.apache.kafka.common.utils.Java;
|
||||||
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
|
import org.apache.kafka.common.utils.Time;
|
||||||
|
import org.apache.kafka.test.TestUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for the SSL transport layer.
|
||||||
|
* Checks different versions of the protocol usage on the server and client.
|
||||||
|
*/
|
||||||
|
@RunWith(value = Parameterized.class)
|
||||||
|
public class SslVersionsTransportLayerTest {
|
||||||
|
private static final int BUFFER_SIZE = 4 * 1024;
|
||||||
|
private static final Time TIME = Time.SYSTEM;
|
||||||
|
|
||||||
|
private final List<String> serverProtocols;
|
||||||
|
private final List<String> clientProtocols;
|
||||||
|
|
||||||
|
@Parameterized.Parameters(name = "tlsServerProtocol={0},tlsClientProtocol={1}")
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
List<Object[]> values = new ArrayList<>();
|
||||||
|
|
||||||
|
values.add(new Object[] {Collections.singletonList("TLSv1.2"), Collections.singletonList("TLSv1.2")});
|
||||||
|
|
||||||
|
if (Java.IS_JAVA11_COMPATIBLE) {
|
||||||
|
values.add(new Object[] {Collections.singletonList("TLSv1.2"), Collections.singletonList("TLSv1.3")});
|
||||||
|
values.add(new Object[] {Collections.singletonList("TLSv1.3"), Collections.singletonList("TLSv1.2")});
|
||||||
|
values.add(new Object[] {Collections.singletonList("TLSv1.3"), Collections.singletonList("TLSv1.3")});
|
||||||
|
values.add(new Object[] {Collections.singletonList("TLSv1.2"), Arrays.asList("TLSv1.2", "TLSv1.3")});
|
||||||
|
values.add(new Object[] {Collections.singletonList("TLSv1.2"), Arrays.asList("TLSv1.3", "TLSv1.2")});
|
||||||
|
values.add(new Object[] {Collections.singletonList("TLSv1.3"), Arrays.asList("TLSv1.2", "TLSv1.3")});
|
||||||
|
values.add(new Object[] {Collections.singletonList("TLSv1.3"), Arrays.asList("TLSv1.3", "TLSv1.2")});
|
||||||
|
values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Collections.singletonList("TLSv1.3")});
|
||||||
|
values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Collections.singletonList("TLSv1.2")});
|
||||||
|
values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Arrays.asList("TLSv1.2", "TLSv1.3")});
|
||||||
|
values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Arrays.asList("TLSv1.3", "TLSv1.2")});
|
||||||
|
values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Collections.singletonList("TLSv1.3")});
|
||||||
|
values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Collections.singletonList("TLSv1.2")});
|
||||||
|
values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Arrays.asList("TLSv1.2", "TLSv1.3")});
|
||||||
|
values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Arrays.asList("TLSv1.3", "TLSv1.2")});
|
||||||
|
}
|
||||||
|
return values;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Be aware that you can turn on debug mode for a javax.net.ssl library with the line {@code System.setProperty("javax.net.debug", "ssl:handshake");}
|
||||||
|
* @param serverProtocols Server protocols.
|
||||||
|
* @param clientProtocols Client protocols.
|
||||||
|
*/
|
||||||
|
public SslVersionsTransportLayerTest(List<String> serverProtocols, List<String> clientProtocols) {
|
||||||
|
this.serverProtocols = serverProtocols;
|
||||||
|
this.clientProtocols = clientProtocols;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that connection success with the default TLS version.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testTlsDefaults() throws Exception {
|
||||||
|
// Create certificates for use by client and server. Add server cert to client truststore and vice versa.
|
||||||
|
CertStores serverCertStores = new CertStores(true, "server", "localhost");
|
||||||
|
CertStores clientCertStores = new CertStores(false, "client", "localhost");
|
||||||
|
|
||||||
|
Map<String, Object> sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores, clientProtocols);
|
||||||
|
Map<String, Object> sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores, serverProtocols);
|
||||||
|
|
||||||
|
NioEchoServer server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
|
||||||
|
SecurityProtocol.SSL,
|
||||||
|
new TestSecurityConfig(sslServerConfigs),
|
||||||
|
null,
|
||||||
|
TIME);
|
||||||
|
Selector selector = createClientSelector(sslClientConfigs);
|
||||||
|
|
||||||
|
String node = "0";
|
||||||
|
selector.connect(node, new InetSocketAddress("localhost", server.port()), BUFFER_SIZE, BUFFER_SIZE);
|
||||||
|
|
||||||
|
if (isCompatible(serverProtocols, clientProtocols)) {
|
||||||
|
NetworkTestUtils.waitForChannelReady(selector, node);
|
||||||
|
|
||||||
|
int msgSz = 1024 * 1024;
|
||||||
|
String message = TestUtils.randomString(msgSz);
|
||||||
|
selector.send(new NetworkSend(node, ByteBuffer.wrap(message.getBytes())));
|
||||||
|
while (selector.completedReceives().isEmpty()) {
|
||||||
|
selector.poll(100L);
|
||||||
|
}
|
||||||
|
int totalBytes = msgSz + 4; // including 4-byte size
|
||||||
|
server.waitForMetric("incoming-byte", totalBytes);
|
||||||
|
server.waitForMetric("outgoing-byte", totalBytes);
|
||||||
|
server.waitForMetric("request", 1);
|
||||||
|
server.waitForMetric("response", 1);
|
||||||
|
} else {
|
||||||
|
NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED);
|
||||||
|
server.verifyAuthenticationMetrics(0, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* The explanation of this check in the structure of the ClientHello SSL message.
|
||||||
|
* Please, take a look at the <a href="https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-4D421910-C36D-40A2-8BA2-7D42CCBED3C6">Guide</a>,
|
||||||
|
* "Send ClientHello Message" section.
|
||||||
|
* <p>
|
||||||
|
* > Client version: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 uses the extension supported_versions and not this field to negotiate protocol version
|
||||||
|
* ...
|
||||||
|
* > supported_versions: Lists which versions of TLS the client supports. In particular, if the client
|
||||||
|
* > requests TLS 1.3, then the client version field has the value TLSv1.2 and this extension
|
||||||
|
* > contains the value TLSv1.3; if the client requests TLS 1.2, then the client version field has the
|
||||||
|
* > value TLSv1.2 and this extension either doesn’t exist or contains the value TLSv1.2 but not the value TLSv1.3.
|
||||||
|
* <p>
|
||||||
|
*
|
||||||
|
* This mean that TLSv1.3 client can fallback to TLSv1.2 but TLSv1.2 client can't change protocol to TLSv1.3.
|
||||||
|
*
|
||||||
|
* @param serverProtocols Server protocols. Expected to be non empty.
|
||||||
|
* @param clientProtocols Client protocols. Expected to be non empty.
|
||||||
|
* @return {@code true} if client should be able to connect to the server.
|
||||||
|
*/
|
||||||
|
private boolean isCompatible(List<String> serverProtocols, List<String> clientProtocols) {
|
||||||
|
assertNotNull(serverProtocols);
|
||||||
|
assertFalse(serverProtocols.isEmpty());
|
||||||
|
assertNotNull(clientProtocols);
|
||||||
|
assertFalse(clientProtocols.isEmpty());
|
||||||
|
|
||||||
|
return serverProtocols.contains(clientProtocols.get(0)) ||
|
||||||
|
(clientProtocols.get(0).equals("TLSv1.3") && !Collections.disjoint(serverProtocols, clientProtocols));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<String, Object> getTrustingConfig(CertStores certStores, CertStores peerCertStores, List<String> tlsProtocols) {
|
||||||
|
Map<String, Object> configs = certStores.getTrustingConfig(peerCertStores);
|
||||||
|
configs.putAll(sslConfig(tlsProtocols));
|
||||||
|
return configs;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Map<String, Object> sslConfig(List<String> tlsProtocols) {
|
||||||
|
Map<String, Object> sslConfig = new HashMap<>();
|
||||||
|
sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocols.get(0));
|
||||||
|
sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsProtocols);
|
||||||
|
return sslConfig;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Selector createClientSelector(Map<String, Object> sslClientConfigs) {
|
||||||
|
SslTransportLayerTest.TestSslChannelBuilder channelBuilder =
|
||||||
|
new SslTransportLayerTest.TestSslChannelBuilder(Mode.CLIENT);
|
||||||
|
channelBuilder.configureBufferSizes(null, null, null);
|
||||||
|
channelBuilder.configure(sslClientConfigs);
|
||||||
|
return new Selector(100 * 5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext());
|
||||||
|
}
|
||||||
|
}
|
|
@ -170,7 +170,7 @@ class SocketServerTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def sslClientSocket(port: Int): Socket = {
|
private def sslClientSocket(port: Int): Socket = {
|
||||||
val sslContext = SSLContext.getInstance("TLSv1.2")
|
val sslContext = SSLContext.getInstance(TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS)
|
||||||
sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom())
|
sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom())
|
||||||
val socketFactory = sslContext.getSocketFactory
|
val socketFactory = sslContext.getSocketFactory
|
||||||
val socket = socketFactory.createSocket("localhost", port)
|
val socket = socketFactory.createSocket("localhost", port)
|
||||||
|
|
|
@ -23,7 +23,12 @@
|
||||||
<ul>
|
<ul>
|
||||||
<li>Kafka Streams adds a new processing mode (requires broker 2.5 or newer) that improves application
|
<li>Kafka Streams adds a new processing mode (requires broker 2.5 or newer) that improves application
|
||||||
scalability using exactly-once guarantees
|
scalability using exactly-once guarantees
|
||||||
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">KIP-447</a></li>
|
(cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics">KIP-447</a>)
|
||||||
|
</li>
|
||||||
|
<li>TLSv1.3 has been enabled by default for Java 11 or newer. The client and server will negotiate TLSv1.3 if
|
||||||
|
both support it and fallback to TLSv1.2 otherwise. See
|
||||||
|
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-573%3A+Enable+TLSv1.3+by+default">KIP-573</a> for more details.
|
||||||
|
</li>
|
||||||
</ul>
|
</ul>
|
||||||
|
|
||||||
<h5><a id="upgrade_250_notable" href="#upgrade_250_notable">Notable changes in 2.5.0</a></h5>
|
<h5><a id="upgrade_250_notable" href="#upgrade_250_notable">Notable changes in 2.5.0</a></h5>
|
||||||
|
|
|
@ -55,12 +55,12 @@ class Benchmark(Test):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.zk.start()
|
self.zk.start()
|
||||||
|
|
||||||
def start_kafka(self, security_protocol, interbroker_security_protocol, version):
|
def start_kafka(self, security_protocol, interbroker_security_protocol, version, tls_version=None):
|
||||||
self.kafka = KafkaService(
|
self.kafka = KafkaService(
|
||||||
self.test_context, self.num_brokers,
|
self.test_context, self.num_brokers,
|
||||||
self.zk, security_protocol=security_protocol,
|
self.zk, security_protocol=security_protocol,
|
||||||
interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
|
interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
|
||||||
version=version)
|
version=version, tls_version=tls_version)
|
||||||
self.kafka.log_level = "INFO" # We don't DEBUG logging here
|
self.kafka.log_level = "INFO" # We don't DEBUG logging here
|
||||||
self.kafka.start()
|
self.kafka.start()
|
||||||
|
|
||||||
|
@ -68,11 +68,12 @@ class Benchmark(Test):
|
||||||
@parametrize(acks=1, topic=TOPIC_REP_ONE)
|
@parametrize(acks=1, topic=TOPIC_REP_ONE)
|
||||||
@parametrize(acks=1, topic=TOPIC_REP_THREE)
|
@parametrize(acks=1, topic=TOPIC_REP_THREE)
|
||||||
@parametrize(acks=-1, topic=TOPIC_REP_THREE)
|
@parametrize(acks=-1, topic=TOPIC_REP_THREE)
|
||||||
@matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL'])
|
@matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['SSL'], tls_version=['TLSv1.2', 'TLSv1.3'])
|
||||||
|
@matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT'])
|
||||||
@cluster(num_nodes=7)
|
@cluster(num_nodes=7)
|
||||||
@parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
|
@parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
|
||||||
def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
|
def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
|
||||||
compression_type="none", security_protocol='PLAINTEXT', client_version=str(DEV_BRANCH),
|
compression_type="none", security_protocol='PLAINTEXT', tls_version=None, client_version=str(DEV_BRANCH),
|
||||||
broker_version=str(DEV_BRANCH)):
|
broker_version=str(DEV_BRANCH)):
|
||||||
"""
|
"""
|
||||||
Setup: 1 node zk + 3 node kafka cluster
|
Setup: 1 node zk + 3 node kafka cluster
|
||||||
|
@ -85,7 +86,7 @@ class Benchmark(Test):
|
||||||
client_version = KafkaVersion(client_version)
|
client_version = KafkaVersion(client_version)
|
||||||
broker_version = KafkaVersion(broker_version)
|
broker_version = KafkaVersion(broker_version)
|
||||||
self.validate_versions(client_version, broker_version)
|
self.validate_versions(client_version, broker_version)
|
||||||
self.start_kafka(security_protocol, security_protocol, broker_version)
|
self.start_kafka(security_protocol, security_protocol, broker_version, tls_version)
|
||||||
# Always generate the same total amount of data
|
# Always generate the same total amount of data
|
||||||
nrecords = int(self.target_data_size / message_size)
|
nrecords = int(self.target_data_size / message_size)
|
||||||
|
|
||||||
|
@ -101,9 +102,10 @@ class Benchmark(Test):
|
||||||
return compute_aggregate_throughput(self.producer)
|
return compute_aggregate_throughput(self.producer)
|
||||||
|
|
||||||
@cluster(num_nodes=5)
|
@cluster(num_nodes=5)
|
||||||
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
|
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"])
|
||||||
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
|
@matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"])
|
||||||
def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
|
def test_long_term_producer_throughput(self, compression_type="none",
|
||||||
|
security_protocol='PLAINTEXT', tls_version=None,
|
||||||
interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
|
interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
|
||||||
broker_version=str(DEV_BRANCH)):
|
broker_version=str(DEV_BRANCH)):
|
||||||
"""
|
"""
|
||||||
|
@ -119,7 +121,7 @@ class Benchmark(Test):
|
||||||
self.validate_versions(client_version, broker_version)
|
self.validate_versions(client_version, broker_version)
|
||||||
if interbroker_security_protocol is None:
|
if interbroker_security_protocol is None:
|
||||||
interbroker_security_protocol = security_protocol
|
interbroker_security_protocol = security_protocol
|
||||||
self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
|
self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version)
|
||||||
self.producer = ProducerPerformanceService(
|
self.producer = ProducerPerformanceService(
|
||||||
self.test_context, 1, self.kafka,
|
self.test_context, 1, self.kafka,
|
||||||
topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
|
topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
|
||||||
|
@ -157,11 +159,11 @@ class Benchmark(Test):
|
||||||
return data
|
return data
|
||||||
|
|
||||||
@cluster(num_nodes=5)
|
@cluster(num_nodes=5)
|
||||||
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
|
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"])
|
||||||
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
|
@matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"])
|
||||||
@cluster(num_nodes=6)
|
@cluster(num_nodes=6)
|
||||||
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
|
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
|
||||||
def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
|
def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None,
|
||||||
interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
|
interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
|
||||||
broker_version=str(DEV_BRANCH)):
|
broker_version=str(DEV_BRANCH)):
|
||||||
"""
|
"""
|
||||||
|
@ -178,7 +180,7 @@ class Benchmark(Test):
|
||||||
self.validate_versions(client_version, broker_version)
|
self.validate_versions(client_version, broker_version)
|
||||||
if interbroker_security_protocol is None:
|
if interbroker_security_protocol is None:
|
||||||
interbroker_security_protocol = security_protocol
|
interbroker_security_protocol = security_protocol
|
||||||
self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
|
self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version)
|
||||||
self.logger.info("BENCHMARK: End to end latency")
|
self.logger.info("BENCHMARK: End to end latency")
|
||||||
self.perf = EndToEndLatencyService(
|
self.perf = EndToEndLatencyService(
|
||||||
self.test_context, 1, self.kafka,
|
self.test_context, 1, self.kafka,
|
||||||
|
@ -189,9 +191,9 @@ class Benchmark(Test):
|
||||||
return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
|
return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
|
||||||
|
|
||||||
@cluster(num_nodes=6)
|
@cluster(num_nodes=6)
|
||||||
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
|
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"])
|
||||||
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
|
@matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"])
|
||||||
def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT",
|
def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None,
|
||||||
interbroker_security_protocol=None,
|
interbroker_security_protocol=None,
|
||||||
client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
|
client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
|
||||||
"""
|
"""
|
||||||
|
@ -207,7 +209,7 @@ class Benchmark(Test):
|
||||||
self.validate_versions(client_version, broker_version)
|
self.validate_versions(client_version, broker_version)
|
||||||
if interbroker_security_protocol is None:
|
if interbroker_security_protocol is None:
|
||||||
interbroker_security_protocol = security_protocol
|
interbroker_security_protocol = security_protocol
|
||||||
self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
|
self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version)
|
||||||
num_records = 10 * 1000 * 1000 # 10e6
|
num_records = 10 * 1000 * 1000 # 10e6
|
||||||
|
|
||||||
self.producer = ProducerPerformanceService(
|
self.producer = ProducerPerformanceService(
|
||||||
|
@ -236,9 +238,9 @@ class Benchmark(Test):
|
||||||
return data
|
return data
|
||||||
|
|
||||||
@cluster(num_nodes=6)
|
@cluster(num_nodes=6)
|
||||||
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
|
@matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"])
|
||||||
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
|
@matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"])
|
||||||
def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT",
|
def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None,
|
||||||
interbroker_security_protocol=None, num_consumers=1,
|
interbroker_security_protocol=None, num_consumers=1,
|
||||||
client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
|
client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
|
||||||
"""
|
"""
|
||||||
|
@ -250,7 +252,7 @@ class Benchmark(Test):
|
||||||
self.validate_versions(client_version, broker_version)
|
self.validate_versions(client_version, broker_version)
|
||||||
if interbroker_security_protocol is None:
|
if interbroker_security_protocol is None:
|
||||||
interbroker_security_protocol = security_protocol
|
interbroker_security_protocol = security_protocol
|
||||||
self.start_kafka(security_protocol, interbroker_security_protocol, broker_version)
|
self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version)
|
||||||
num_records = 10 * 1000 * 1000 # 10e6
|
num_records = 10 * 1000 * 1000 # 10e6
|
||||||
|
|
||||||
# seed kafka w/messages
|
# seed kafka w/messages
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import collections
|
|
||||||
import json
|
import json
|
||||||
import os.path
|
import os.path
|
||||||
import re
|
import re
|
||||||
|
@ -31,7 +30,7 @@ from kafkatest.services.monitor.jmx import JmxMixin
|
||||||
from kafkatest.services.security.minikdc import MiniKdc
|
from kafkatest.services.security.minikdc import MiniKdc
|
||||||
from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
|
from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
|
||||||
from kafkatest.services.security.security_config import SecurityConfig
|
from kafkatest.services.security.security_config import SecurityConfig
|
||||||
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2
|
from kafkatest.version import DEV_BRANCH
|
||||||
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
|
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
|
||||||
|
|
||||||
|
|
||||||
|
@ -95,17 +94,20 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
||||||
"collect_default": True}
|
"collect_default": True}
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
|
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT,
|
||||||
|
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
|
||||||
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
|
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
|
||||||
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
|
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
|
||||||
jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
|
jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
|
||||||
zk_client_secure=False,
|
zk_client_secure=False,
|
||||||
listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None, extra_kafka_opts=""):
|
listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None,
|
||||||
|
extra_kafka_opts="", tls_version=None):
|
||||||
"""
|
"""
|
||||||
:param context: test context
|
:param context: test context
|
||||||
:param ZookeeperService zk:
|
:param ZookeeperService zk:
|
||||||
:param dict topics: which topics to create automatically
|
:param dict topics: which topics to create automatically
|
||||||
:param str security_protocol: security protocol for clients to use
|
:param str security_protocol: security protocol for clients to use
|
||||||
|
:param str tls_version: version of the TLS protocol.
|
||||||
:param str interbroker_security_protocol: security protocol to use for broker-to-broker communication
|
:param str interbroker_security_protocol: security protocol to use for broker-to-broker communication
|
||||||
:param str client_sasl_mechanism: sasl mechanism for clients to use
|
:param str client_sasl_mechanism: sasl mechanism for clients to use
|
||||||
:param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication
|
:param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication
|
||||||
|
@ -129,6 +131,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
||||||
self.zk = zk
|
self.zk = zk
|
||||||
|
|
||||||
self.security_protocol = security_protocol
|
self.security_protocol = security_protocol
|
||||||
|
self.tls_version = tls_version
|
||||||
self.client_sasl_mechanism = client_sasl_mechanism
|
self.client_sasl_mechanism = client_sasl_mechanism
|
||||||
self.topics = topics
|
self.topics = topics
|
||||||
self.minikdc = None
|
self.minikdc = None
|
||||||
|
@ -215,7 +218,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
||||||
zk_sasl=self.zk.zk_sasl, zk_tls=self.zk_client_secure,
|
zk_sasl=self.zk.zk_sasl, zk_tls=self.zk_client_secure,
|
||||||
client_sasl_mechanism=self.client_sasl_mechanism,
|
client_sasl_mechanism=self.client_sasl_mechanism,
|
||||||
interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
|
interbroker_sasl_mechanism=self.interbroker_sasl_mechanism,
|
||||||
listener_security_config=self.listener_security_config)
|
listener_security_config=self.listener_security_config,
|
||||||
|
tls_version=self.tls_version)
|
||||||
for port in self.port_mappings.values():
|
for port in self.port_mappings.values():
|
||||||
if port.open:
|
if port.open:
|
||||||
config.enable_security_protocol(port.security_protocol)
|
config.enable_security_protocol(port.security_protocol)
|
||||||
|
@ -354,15 +358,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
||||||
|
|
||||||
def start_node(self, node, timeout_sec=60):
|
def start_node(self, node, timeout_sec=60):
|
||||||
node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
|
node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
|
||||||
|
|
||||||
|
self.security_config.setup_node(node)
|
||||||
|
self.security_config.setup_credentials(node, self.path, self.zk_connect_setting(), broker=True)
|
||||||
|
|
||||||
prop_file = self.prop_file(node)
|
prop_file = self.prop_file(node)
|
||||||
self.logger.info("kafka.properties:")
|
self.logger.info("kafka.properties:")
|
||||||
self.logger.info(prop_file)
|
self.logger.info(prop_file)
|
||||||
node.account.create_file(KafkaService.CONFIG_FILE, prop_file)
|
node.account.create_file(KafkaService.CONFIG_FILE, prop_file)
|
||||||
node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
|
node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
|
||||||
|
|
||||||
self.security_config.setup_node(node)
|
|
||||||
self.security_config.setup_credentials(node, self.path, self.zk_connect_setting(), broker=True)
|
|
||||||
|
|
||||||
cmd = self.start_cmd(node)
|
cmd = self.start_cmd(node)
|
||||||
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
|
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
|
||||||
with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
|
with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor:
|
||||||
|
|
|
@ -44,7 +44,10 @@ listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
{% if security_config.tls_version is not none %}
|
||||||
|
ssl.enabled.protocols={{ security_config.tls_version }}
|
||||||
|
ssl.protocol={{ security_config.tls_version }}
|
||||||
|
{% endif %}
|
||||||
ssl.keystore.location=/mnt/security/test.keystore.jks
|
ssl.keystore.location=/mnt/security/test.keystore.jks
|
||||||
ssl.keystore.password=test-ks-passwd
|
ssl.keystore.password=test-ks-passwd
|
||||||
ssl.key.password=test-ks-passwd
|
ssl.key.password=test-ks-passwd
|
||||||
|
|
|
@ -13,16 +13,16 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import os.path
|
|
||||||
|
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0
|
|
||||||
from kafkatest.directory_layout.kafka_path import create_path_resolver
|
from kafkatest.utils.remote_account import java_version
|
||||||
|
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0
|
||||||
|
|
||||||
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
|
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
|
||||||
|
|
||||||
new_jdk_not_supported = frozenset([str(LATEST_0_8_2), str(LATEST_0_9), str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0)])
|
new_jdk_not_supported = frozenset([str(LATEST_0_8_2), str(LATEST_0_9), str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0)])
|
||||||
|
|
||||||
|
|
||||||
def fix_opts_for_new_jvm(node):
|
def fix_opts_for_new_jvm(node):
|
||||||
# Startup scripts for early versions of Kafka contains options
|
# Startup scripts for early versions of Kafka contains options
|
||||||
# that not supported on latest versions of JVM like -XX:+PrintGCDateStamps or -XX:UseParNewGC.
|
# that not supported on latest versions of JVM like -XX:+PrintGCDateStamps or -XX:UseParNewGC.
|
||||||
|
@ -38,21 +38,4 @@ def fix_opts_for_new_jvm(node):
|
||||||
cmd += "export KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true\"; "
|
cmd += "export KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true\"; "
|
||||||
return cmd
|
return cmd
|
||||||
|
|
||||||
def java_version(node):
|
|
||||||
# Determine java version on the node
|
|
||||||
version = -1
|
|
||||||
for line in node.account.ssh_capture("java -version"):
|
|
||||||
if line.find("version") != -1:
|
|
||||||
version = parse_version_str(line)
|
|
||||||
return version
|
|
||||||
|
|
||||||
def parse_version_str(line):
|
|
||||||
# Parse java version string. Examples:
|
|
||||||
#`openjdk version "11.0.5" 2019-10-15` will return 11.
|
|
||||||
#`java version "1.5.0"` will return 5.
|
|
||||||
line = line[line.find('version \"') + 9:]
|
|
||||||
dot_pos = line.find(".")
|
|
||||||
if line[:dot_pos] == "1":
|
|
||||||
return int(line[dot_pos+1:line.find(".", dot_pos+1)])
|
|
||||||
else:
|
|
||||||
return int(line[:dot_pos])
|
|
||||||
|
|
|
@ -28,14 +28,14 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService):
|
||||||
"collect_default": False}
|
"collect_default": False}
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, security_protocol="PLAINTEXT"):
|
def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, security_protocol="PLAINTEXT", tls_version=None):
|
||||||
super(KafkaLog4jAppender, self).__init__(context, num_nodes)
|
super(KafkaLog4jAppender, self).__init__(context, num_nodes)
|
||||||
|
|
||||||
self.kafka = kafka
|
self.kafka = kafka
|
||||||
self.topic = topic
|
self.topic = topic
|
||||||
self.max_messages = max_messages
|
self.max_messages = max_messages
|
||||||
self.security_protocol = security_protocol
|
self.security_protocol = security_protocol
|
||||||
self.security_config = SecurityConfig(self.context, security_protocol)
|
self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version)
|
||||||
self.stop_timeout_sec = 30
|
self.stop_timeout_sec = 30
|
||||||
|
|
||||||
for node in self.nodes:
|
for node in self.nodes:
|
||||||
|
|
|
@ -33,12 +33,13 @@ class LogCompactionTester(KafkaPathResolverMixin, BackgroundThreadService):
|
||||||
"collect_default": True}
|
"collect_default": True}
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30):
|
def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30, tls_version=None):
|
||||||
super(LogCompactionTester, self).__init__(context, 1)
|
super(LogCompactionTester, self).__init__(context, 1)
|
||||||
|
|
||||||
self.kafka = kafka
|
self.kafka = kafka
|
||||||
self.security_protocol = security_protocol
|
self.security_protocol = security_protocol
|
||||||
self.security_config = SecurityConfig(self.context, security_protocol)
|
self.tls_version = tls_version
|
||||||
|
self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version)
|
||||||
self.stop_timeout_sec = stop_timeout_sec
|
self.stop_timeout_sec = stop_timeout_sec
|
||||||
self.log_compaction_completed = False
|
self.log_compaction_completed = False
|
||||||
|
|
||||||
|
|
|
@ -29,14 +29,16 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService):
|
||||||
"collect_default": False}
|
"collect_default": False}
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT", stop_timeout_sec=30):
|
def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT",
|
||||||
|
stop_timeout_sec=30, tls_version=None):
|
||||||
super(ReplicaVerificationTool, self).__init__(context, num_nodes)
|
super(ReplicaVerificationTool, self).__init__(context, num_nodes)
|
||||||
|
|
||||||
self.kafka = kafka
|
self.kafka = kafka
|
||||||
self.topic = topic
|
self.topic = topic
|
||||||
self.report_interval_ms = report_interval_ms
|
self.report_interval_ms = report_interval_ms
|
||||||
self.security_protocol = security_protocol
|
self.security_protocol = security_protocol
|
||||||
self.security_config = SecurityConfig(self.context, security_protocol)
|
self.tls_version = tls_version
|
||||||
|
self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version)
|
||||||
self.partition_lag = {}
|
self.partition_lag = {}
|
||||||
self.stop_timeout_sec = stop_timeout_sec
|
self.stop_timeout_sec = stop_timeout_sec
|
||||||
|
|
||||||
|
|
|
@ -19,10 +19,13 @@ import subprocess
|
||||||
from tempfile import mkdtemp
|
from tempfile import mkdtemp
|
||||||
from shutil import rmtree
|
from shutil import rmtree
|
||||||
from ducktape.template import TemplateRenderer
|
from ducktape.template import TemplateRenderer
|
||||||
|
|
||||||
from kafkatest.services.security.minikdc import MiniKdc
|
from kafkatest.services.security.minikdc import MiniKdc
|
||||||
from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
|
from kafkatest.services.security.listener_security_config import ListenerSecurityConfig
|
||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
|
from kafkatest.utils.remote_account import java_version
|
||||||
|
|
||||||
|
|
||||||
class SslStores(object):
|
class SslStores(object):
|
||||||
def __init__(self, local_scratch_dir, logger=None):
|
def __init__(self, local_scratch_dir, logger=None):
|
||||||
|
@ -140,7 +143,7 @@ class SecurityConfig(TemplateRenderer):
|
||||||
def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
|
def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
|
||||||
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
|
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
|
||||||
zk_sasl=False, zk_tls=False, template_props="", static_jaas_conf=True, jaas_override_variables=None,
|
zk_sasl=False, zk_tls=False, template_props="", static_jaas_conf=True, jaas_override_variables=None,
|
||||||
listener_security_config=ListenerSecurityConfig()):
|
listener_security_config=ListenerSecurityConfig(), tls_version=None):
|
||||||
"""
|
"""
|
||||||
Initialize the security properties for the node and copy
|
Initialize the security properties for the node and copy
|
||||||
keystore and truststore to the remote node if the transport protocol
|
keystore and truststore to the remote node if the transport protocol
|
||||||
|
@ -186,6 +189,10 @@ class SecurityConfig(TemplateRenderer):
|
||||||
'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
|
'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
|
||||||
'sasl.kerberos.service.name' : 'kafka'
|
'sasl.kerberos.service.name' : 'kafka'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tls_version is not None:
|
||||||
|
self.properties.update({'tls.version' : tls_version})
|
||||||
|
|
||||||
self.properties.update(self.listener_security_config.client_listener_overrides)
|
self.properties.update(self.listener_security_config.client_listener_overrides)
|
||||||
self.jaas_override_variables = jaas_override_variables or {}
|
self.jaas_override_variables = jaas_override_variables or {}
|
||||||
|
|
||||||
|
@ -201,7 +208,8 @@ class SecurityConfig(TemplateRenderer):
|
||||||
template_props=template_props,
|
template_props=template_props,
|
||||||
static_jaas_conf=static_jaas_conf,
|
static_jaas_conf=static_jaas_conf,
|
||||||
jaas_override_variables=jaas_override_variables,
|
jaas_override_variables=jaas_override_variables,
|
||||||
listener_security_config=self.listener_security_config)
|
listener_security_config=self.listener_security_config,
|
||||||
|
tls_version=self.tls_version)
|
||||||
|
|
||||||
def enable_security_protocol(self, security_protocol):
|
def enable_security_protocol(self, security_protocol):
|
||||||
self.has_sasl = self.has_sasl or self.is_sasl(security_protocol)
|
self.has_sasl = self.has_sasl or self.is_sasl(security_protocol)
|
||||||
|
@ -259,6 +267,9 @@ class SecurityConfig(TemplateRenderer):
|
||||||
if self.has_sasl:
|
if self.has_sasl:
|
||||||
self.setup_sasl(node)
|
self.setup_sasl(node)
|
||||||
|
|
||||||
|
if java_version(node) <= 11 and self.properties.get('tls.version') == 'TLSv1.3':
|
||||||
|
self.properties.update({'tls.version': 'TLSv1.2'})
|
||||||
|
|
||||||
def setup_credentials(self, node, path, zk_connect, broker):
|
def setup_credentials(self, node, path, zk_connect, broker):
|
||||||
if broker:
|
if broker:
|
||||||
self.maybe_create_scram_credentials(node, zk_connect, path, self.interbroker_sasl_mechanism,
|
self.maybe_create_scram_credentials(node, zk_connect, path, self.interbroker_sasl_mechanism,
|
||||||
|
@ -303,6 +314,10 @@ class SecurityConfig(TemplateRenderer):
|
||||||
def security_protocol(self):
|
def security_protocol(self):
|
||||||
return self.properties['security.protocol']
|
return self.properties['security.protocol']
|
||||||
|
|
||||||
|
@property
|
||||||
|
def tls_version(self):
|
||||||
|
return self.properties.get('tls.version')
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def client_sasl_mechanism(self):
|
def client_sasl_mechanism(self):
|
||||||
return self.properties['sasl.mechanism']
|
return self.properties['sasl.mechanism']
|
||||||
|
|
|
@ -125,10 +125,10 @@ class ReplicationTest(EndToEndTest):
|
||||||
broker_type="leader",
|
broker_type="leader",
|
||||||
security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512")
|
security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512")
|
||||||
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
|
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
|
||||||
security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"])
|
security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"])
|
||||||
def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type,
|
def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type,
|
||||||
client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI",
|
client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI",
|
||||||
compression_type=None, enable_idempotence=False):
|
compression_type=None, enable_idempotence=False, tls_version=None):
|
||||||
"""Replication tests.
|
"""Replication tests.
|
||||||
These tests verify that replication provides simple durability guarantees by checking that data acked by
|
These tests verify that replication provides simple durability guarantees by checking that data acked by
|
||||||
brokers is still available for consumption in the face of various failure scenarios.
|
brokers is still available for consumption in the face of various failure scenarios.
|
||||||
|
@ -149,7 +149,8 @@ class ReplicationTest(EndToEndTest):
|
||||||
security_protocol=security_protocol,
|
security_protocol=security_protocol,
|
||||||
interbroker_security_protocol=security_protocol,
|
interbroker_security_protocol=security_protocol,
|
||||||
client_sasl_mechanism=client_sasl_mechanism,
|
client_sasl_mechanism=client_sasl_mechanism,
|
||||||
interbroker_sasl_mechanism=interbroker_sasl_mechanism)
|
interbroker_sasl_mechanism=interbroker_sasl_mechanism,
|
||||||
|
tls_version=tls_version)
|
||||||
self.kafka.start()
|
self.kafka.start()
|
||||||
|
|
||||||
compression_types = None if not compression_type else [compression_type]
|
compression_types = None if not compression_type else [compression_type]
|
||||||
|
|
|
@ -23,8 +23,9 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||||
from kafkatest.utils import is_int
|
from kafkatest.utils import is_int
|
||||||
|
from kafkatest.utils.remote_account import java_version
|
||||||
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
|
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion
|
||||||
from kafkatest.services.kafka.util import java_version, new_jdk_not_supported
|
from kafkatest.services.kafka.util import new_jdk_not_supported
|
||||||
|
|
||||||
class TestUpgrade(ProduceConsumeValidateTest):
|
class TestUpgrade(ProduceConsumeValidateTest):
|
||||||
|
|
||||||
|
|
|
@ -37,3 +37,22 @@ def line_count(node, file):
|
||||||
raise Exception("Expected single line of output from wc -l")
|
raise Exception("Expected single line of output from wc -l")
|
||||||
|
|
||||||
return int(out[0].strip().split(" ")[0])
|
return int(out[0].strip().split(" ")[0])
|
||||||
|
|
||||||
|
def java_version(node):
|
||||||
|
# Determine java version on the node
|
||||||
|
version = -1
|
||||||
|
for line in node.account.ssh_capture("java -version"):
|
||||||
|
if line.find("version") != -1:
|
||||||
|
version = parse_version_str(line)
|
||||||
|
return version
|
||||||
|
|
||||||
|
def parse_version_str(line):
|
||||||
|
# Parse java version string. Examples:
|
||||||
|
#`openjdk version "11.0.5" 2019-10-15` will return 11.
|
||||||
|
#`java version "1.5.0"` will return 5.
|
||||||
|
line = line[line.find('version \"') + 9:]
|
||||||
|
dot_pos = line.find(".")
|
||||||
|
if line[:dot_pos] == "1":
|
||||||
|
return int(line[dot_pos+1:line.find(".", dot_pos+1)])
|
||||||
|
else:
|
||||||
|
return int(line[:dot_pos])
|
||||||
|
|
Loading…
Reference in New Issue