diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index f114e66f8ff..6dc0f5807c7 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.config; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.utils.Java; import org.apache.kafka.common.utils.Utils; import javax.net.ssl.KeyManagerFactory; @@ -49,11 +50,15 @@ public class SslConfigs { public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol"; public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. " - + "Default setting is TLSv1.2, which is fine for most cases. " - + "Allowed values in recent JVMs are TLSv1.2 and TLSv1.3. TLS, TLSv1.1, SSL, SSLv2 and SSLv3 " - + "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities."; + + "The default is 'TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. " + + "This value should be fine for most use cases. " + + "Allowed values in recent JVMs are 'TLSv1.2' and 'TLSv1.3'. 'TLS', 'TLSv1.1', 'SSL', 'SSLv2' and 'SSLv3' " + + "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. " + + "With the default value for this config and 'ssl.enabled.protocols', clients will downgrade to 'TLSv1.2' if " + + "the server does not support 'TLSv1.3'. If this config is set to 'TLSv1.2', clients will not use 'TLSv1.3' even " + + "if it is one of the values in ssl.enabled.protocols and the server only supports 'TLSv1.3'."; - public static final String DEFAULT_SSL_PROTOCOL = "TLSv1.2"; + public static final String DEFAULT_SSL_PROTOCOL; public static final String SSL_PROVIDER_CONFIG = "ssl.provider"; public static final String SSL_PROVIDER_DOC = "The name of the security provider used for SSL connections. Default value is the default security provider of the JVM."; @@ -63,8 +68,22 @@ public class SslConfigs { + "By default all the available cipher suites are supported."; public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols"; - public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections."; - public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2"; + public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. " + + "The default is 'TLSv1.2,TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. With the " + + "default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fallback " + + "to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most " + + "cases. Also see the config documentation for `ssl.protocol`."; + public static final String DEFAULT_SSL_ENABLED_PROTOCOLS; + + static { + if (Java.IS_JAVA11_COMPATIBLE) { + DEFAULT_SSL_PROTOCOL = "TLSv1.3"; + DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.3"; + } else { + DEFAULT_SSL_PROTOCOL = "TLSv1.2"; + DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2"; + } + } public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type"; public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. " diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 0e793b07713..ac94817dc8f 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -41,7 +41,6 @@ import org.junit.runners.Parameterized; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; -import javax.net.ssl.SSLServerSocketFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; @@ -578,26 +577,6 @@ public class SslTransportLayerTest { server.verifyAuthenticationMetrics(1, 2); } - @Test - public void testUnsupportedCipher() throws Exception { - String[] cipherSuites = ((SSLServerSocketFactory) SSLServerSocketFactory.getDefault()).getSupportedCipherSuites(); - if (cipherSuites != null && cipherSuites.length > 1) { - sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); - sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuites[0])); - sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); - sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuites[1])); - - server = createEchoServer(SecurityProtocol.SSL); - createSelector(sslClientConfigs); - - checkAuthentiationFailed("1", "TLSv1.1"); - server.verifyAuthenticationMetrics(0, 1); - - checkAuthentiationFailed("2", "TLSv1"); - server.verifyAuthenticationMetrics(0, 2); - } - } - /** Checks connection failed using the specified {@code tlsVersion}. */ private void checkAuthentiationFailed(String node, String tlsVersion) throws IOException { sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(tlsVersion)); @@ -627,7 +606,6 @@ public class SslTransportLayerTest { */ @Test public void testUnsupportedCiphers() throws Exception { - String node = "0"; SSLContext context = SSLContext.getInstance(tlsProtocol); context.init(null, null, null); String[] cipherSuites = context.getDefaultSSLParameters().getCipherSuites(); @@ -636,10 +614,8 @@ public class SslTransportLayerTest { sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1])); createSelector(sslClientConfigs); - InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); - selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); - NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); + checkAuthentiationFailed("1", tlsProtocol); server.verifyAuthenticationMetrics(0, 1); } @@ -1250,7 +1226,7 @@ public class SslTransportLayerTest { void run() throws IOException; } - private static class TestSslChannelBuilder extends SslChannelBuilder { + static class TestSslChannelBuilder extends SslChannelBuilder { private Integer netReadBufSizeOverride; private Integer netWriteBufSizeOverride; @@ -1361,7 +1337,7 @@ public class SslTransportLayerTest { } } - private static class ResizeableBufferSize { + static class ResizeableBufferSize { private Integer bufSizeOverride; ResizeableBufferSize(Integer bufSizeOverride) { this.bufSizeOverride = bufSizeOverride; diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportTls12Tls13Test.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportTls12Tls13Test.java new file mode 100644 index 00000000000..81b86d4e6a1 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportTls12Tls13Test.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Java; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assume.assumeTrue; + +public class SslTransportTls12Tls13Test { + private static final int BUFFER_SIZE = 4 * 1024; + private static final Time TIME = Time.SYSTEM; + + private NioEchoServer server; + private Selector selector; + private Map sslClientConfigs; + private Map sslServerConfigs; + + @Before + public void setup() throws Exception { + // Create certificates for use by client and server. Add server cert to client truststore and vice versa. + CertStores serverCertStores = new CertStores(true, "server", "localhost"); + CertStores clientCertStores = new CertStores(false, "client", "localhost"); + sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); + sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); + + LogContext logContext = new LogContext(); + ChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false, logContext); + channelBuilder.configure(sslClientConfigs); + this.selector = new Selector(5000, new Metrics(), TIME, "MetricGroup", channelBuilder, logContext); + } + + @After + public void teardown() throws Exception { + if (selector != null) + this.selector.close(); + if (server != null) + this.server.close(); + } + + /** + * Tests that connections fails if TLSv1.3 enabled but cipher suite suitable only for TLSv1.2 used. + */ + @Test + public void testCiphersSuiteForTls12FailsForTls13() throws Exception { + assumeTrue(Java.IS_JAVA11_COMPATIBLE); + + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList("TLSv1.3")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite)); + server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), + SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME); + + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList("TLSv1.3")); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite)); + + checkAuthentiationFailed(); + } + + /** + * Tests that connections can't be made if server uses TLSv1.2 with custom cipher suite and client uses TLSv1.3. + */ + @Test + public void testCiphersSuiteFailForServerTls12ClientTls13() throws Exception { + assumeTrue(Java.IS_JAVA11_COMPATIBLE); + + String tls12CipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + String tls13CipherSuite = "TLS_AES_128_GCM_SHA256"; + + sslServerConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Collections.singletonList("TLSv1.2")); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(tls12CipherSuite)); + server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), + SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME); + + sslClientConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.3"); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(tls13CipherSuite)); + + checkAuthentiationFailed(); + } + + /** + * Tests that connections can be made with TLSv1.3 cipher suite. + */ + @Test + public void testCiphersSuiteForTls13() throws Exception { + assumeTrue(Java.IS_JAVA11_COMPATIBLE); + + String cipherSuite = "TLS_AES_128_GCM_SHA256"; + + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite)); + server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), + SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME); + + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite)); + checkAuthenticationSucceed(); + } + + /** + * Tests that connections can be made with TLSv1.2 cipher suite. + */ + @Test + public void testCiphersSuiteForTls12() throws Exception { + String cipherSuite = "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384"; + + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(","))); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite)); + server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), + SecurityProtocol.SSL, new TestSecurityConfig(sslServerConfigs), null, TIME); + + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split(","))); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Collections.singletonList(cipherSuite)); + checkAuthenticationSucceed(); + } + + /** Checks connection failed using the specified {@code tlsVersion}. */ + private void checkAuthentiationFailed() throws IOException, InterruptedException { + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.3")); + createSelector(sslClientConfigs); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); + selector.connect("0", addr, BUFFER_SIZE, BUFFER_SIZE); + + NetworkTestUtils.waitForChannelClose(selector, "0", ChannelState.State.AUTHENTICATION_FAILED); + server.verifyAuthenticationMetrics(0, 1); + } + + private void checkAuthenticationSucceed() throws IOException, InterruptedException { + createSelector(sslClientConfigs); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); + selector.connect("0", addr, BUFFER_SIZE, BUFFER_SIZE); + NetworkTestUtils.waitForChannelReady(selector, "0"); + server.verifyAuthenticationMetrics(1, 0); + } + + private void createSelector(Map sslClientConfigs) { + SslTransportLayerTest.TestSslChannelBuilder channelBuilder = new SslTransportLayerTest.TestSslChannelBuilder(Mode.CLIENT); + channelBuilder.configureBufferSizes(null, null, null); + channelBuilder.configure(sslClientConfigs); + this.selector = new Selector(100 * 5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java new file mode 100644 index 00000000000..9f930a7bf77 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Java; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +/** + * Tests for the SSL transport layer. + * Checks different versions of the protocol usage on the server and client. + */ +@RunWith(value = Parameterized.class) +public class SslVersionsTransportLayerTest { + private static final int BUFFER_SIZE = 4 * 1024; + private static final Time TIME = Time.SYSTEM; + + private final List serverProtocols; + private final List clientProtocols; + + @Parameterized.Parameters(name = "tlsServerProtocol={0},tlsClientProtocol={1}") + public static Collection data() { + List values = new ArrayList<>(); + + values.add(new Object[] {Collections.singletonList("TLSv1.2"), Collections.singletonList("TLSv1.2")}); + + if (Java.IS_JAVA11_COMPATIBLE) { + values.add(new Object[] {Collections.singletonList("TLSv1.2"), Collections.singletonList("TLSv1.3")}); + values.add(new Object[] {Collections.singletonList("TLSv1.3"), Collections.singletonList("TLSv1.2")}); + values.add(new Object[] {Collections.singletonList("TLSv1.3"), Collections.singletonList("TLSv1.3")}); + values.add(new Object[] {Collections.singletonList("TLSv1.2"), Arrays.asList("TLSv1.2", "TLSv1.3")}); + values.add(new Object[] {Collections.singletonList("TLSv1.2"), Arrays.asList("TLSv1.3", "TLSv1.2")}); + values.add(new Object[] {Collections.singletonList("TLSv1.3"), Arrays.asList("TLSv1.2", "TLSv1.3")}); + values.add(new Object[] {Collections.singletonList("TLSv1.3"), Arrays.asList("TLSv1.3", "TLSv1.2")}); + values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Collections.singletonList("TLSv1.3")}); + values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Collections.singletonList("TLSv1.2")}); + values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Arrays.asList("TLSv1.2", "TLSv1.3")}); + values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Arrays.asList("TLSv1.3", "TLSv1.2")}); + values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Collections.singletonList("TLSv1.3")}); + values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Collections.singletonList("TLSv1.2")}); + values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Arrays.asList("TLSv1.2", "TLSv1.3")}); + values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Arrays.asList("TLSv1.3", "TLSv1.2")}); + } + return values; + } + + /** + * Be aware that you can turn on debug mode for a javax.net.ssl library with the line {@code System.setProperty("javax.net.debug", "ssl:handshake");} + * @param serverProtocols Server protocols. + * @param clientProtocols Client protocols. + */ + public SslVersionsTransportLayerTest(List serverProtocols, List clientProtocols) { + this.serverProtocols = serverProtocols; + this.clientProtocols = clientProtocols; + } + + /** + * Tests that connection success with the default TLS version. + */ + @Test + public void testTlsDefaults() throws Exception { + // Create certificates for use by client and server. Add server cert to client truststore and vice versa. + CertStores serverCertStores = new CertStores(true, "server", "localhost"); + CertStores clientCertStores = new CertStores(false, "client", "localhost"); + + Map sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores, clientProtocols); + Map sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores, serverProtocols); + + NioEchoServer server = NetworkTestUtils.createEchoServer(ListenerName.forSecurityProtocol(SecurityProtocol.SSL), + SecurityProtocol.SSL, + new TestSecurityConfig(sslServerConfigs), + null, + TIME); + Selector selector = createClientSelector(sslClientConfigs); + + String node = "0"; + selector.connect(node, new InetSocketAddress("localhost", server.port()), BUFFER_SIZE, BUFFER_SIZE); + + if (isCompatible(serverProtocols, clientProtocols)) { + NetworkTestUtils.waitForChannelReady(selector, node); + + int msgSz = 1024 * 1024; + String message = TestUtils.randomString(msgSz); + selector.send(new NetworkSend(node, ByteBuffer.wrap(message.getBytes()))); + while (selector.completedReceives().isEmpty()) { + selector.poll(100L); + } + int totalBytes = msgSz + 4; // including 4-byte size + server.waitForMetric("incoming-byte", totalBytes); + server.waitForMetric("outgoing-byte", totalBytes); + server.waitForMetric("request", 1); + server.waitForMetric("response", 1); + } else { + NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); + server.verifyAuthenticationMetrics(0, 1); + } + } + + /** + *

+ * The explanation of this check in the structure of the ClientHello SSL message. + * Please, take a look at the Guide, + * "Send ClientHello Message" section. + *

+ * > Client version: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 uses the extension supported_versions and not this field to negotiate protocol version + * ... + * > supported_versions: Lists which versions of TLS the client supports. In particular, if the client + * > requests TLS 1.3, then the client version field has the value TLSv1.2 and this extension + * > contains the value TLSv1.3; if the client requests TLS 1.2, then the client version field has the + * > value TLSv1.2 and this extension either doesn’t exist or contains the value TLSv1.2 but not the value TLSv1.3. + *

+ * + * This mean that TLSv1.3 client can fallback to TLSv1.2 but TLSv1.2 client can't change protocol to TLSv1.3. + * + * @param serverProtocols Server protocols. Expected to be non empty. + * @param clientProtocols Client protocols. Expected to be non empty. + * @return {@code true} if client should be able to connect to the server. + */ + private boolean isCompatible(List serverProtocols, List clientProtocols) { + assertNotNull(serverProtocols); + assertFalse(serverProtocols.isEmpty()); + assertNotNull(clientProtocols); + assertFalse(clientProtocols.isEmpty()); + + return serverProtocols.contains(clientProtocols.get(0)) || + (clientProtocols.get(0).equals("TLSv1.3") && !Collections.disjoint(serverProtocols, clientProtocols)); + } + + private static Map getTrustingConfig(CertStores certStores, CertStores peerCertStores, List tlsProtocols) { + Map configs = certStores.getTrustingConfig(peerCertStores); + configs.putAll(sslConfig(tlsProtocols)); + return configs; + } + + private static Map sslConfig(List tlsProtocols) { + Map sslConfig = new HashMap<>(); + sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocols.get(0)); + sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsProtocols); + return sslConfig; + } + + private Selector createClientSelector(Map sslClientConfigs) { + SslTransportLayerTest.TestSslChannelBuilder channelBuilder = + new SslTransportLayerTest.TestSslChannelBuilder(Mode.CLIENT); + channelBuilder.configureBufferSizes(null, null, null); + channelBuilder.configure(sslClientConfigs); + return new Selector(100 * 5000, new Metrics(), TIME, "MetricGroup", channelBuilder, new LogContext()); + } +} diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 8f1091ed133..10e4d28af38 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -170,7 +170,7 @@ class SocketServerTest { } private def sslClientSocket(port: Int): Socket = { - val sslContext = SSLContext.getInstance("TLSv1.2") + val sslContext = SSLContext.getInstance(TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS) sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom()) val socketFactory = sslContext.getSocketFactory val socket = socketFactory.createSocket("localhost", port) diff --git a/docs/upgrade.html b/docs/upgrade.html index 103fb99feaf..be1e342b675 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -23,7 +23,12 @@

  • Kafka Streams adds a new processing mode (requires broker 2.5 or newer) that improves application scalability using exactly-once guarantees - (cf. KIP-447
  • + (cf. KIP-447) + +
  • TLSv1.3 has been enabled by default for Java 11 or newer. The client and server will negotiate TLSv1.3 if + both support it and fallback to TLSv1.2 otherwise. See + KIP-573 for more details. +
Notable changes in 2.5.0
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py b/tests/kafkatest/benchmarks/core/benchmark_test.py index 2b4ff87bb35..6bab304f303 100644 --- a/tests/kafkatest/benchmarks/core/benchmark_test.py +++ b/tests/kafkatest/benchmarks/core/benchmark_test.py @@ -55,12 +55,12 @@ class Benchmark(Test): def setUp(self): self.zk.start() - def start_kafka(self, security_protocol, interbroker_security_protocol, version): + def start_kafka(self, security_protocol, interbroker_security_protocol, version, tls_version=None): self.kafka = KafkaService( self.test_context, self.num_brokers, self.zk, security_protocol=security_protocol, interbroker_security_protocol=interbroker_security_protocol, topics=self.topics, - version=version) + version=version, tls_version=tls_version) self.kafka.log_level = "INFO" # We don't DEBUG logging here self.kafka.start() @@ -68,11 +68,12 @@ class Benchmark(Test): @parametrize(acks=1, topic=TOPIC_REP_ONE) @parametrize(acks=1, topic=TOPIC_REP_THREE) @parametrize(acks=-1, topic=TOPIC_REP_THREE) - @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT', 'SSL']) + @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['SSL'], tls_version=['TLSv1.2', 'TLSv1.3']) + @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 10000, 100000], compression_type=["none", "snappy"], security_protocol=['PLAINTEXT']) @cluster(num_nodes=7) @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3) def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE, - compression_type="none", security_protocol='PLAINTEXT', client_version=str(DEV_BRANCH), + compression_type="none", security_protocol='PLAINTEXT', tls_version=None, client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)): """ Setup: 1 node zk + 3 node kafka cluster @@ -85,7 +86,7 @@ class Benchmark(Test): client_version = KafkaVersion(client_version) broker_version = KafkaVersion(broker_version) self.validate_versions(client_version, broker_version) - self.start_kafka(security_protocol, security_protocol, broker_version) + self.start_kafka(security_protocol, security_protocol, broker_version, tls_version) # Always generate the same total amount of data nrecords = int(self.target_data_size / message_size) @@ -101,9 +102,10 @@ class Benchmark(Test): return compute_aggregate_throughput(self.producer) @cluster(num_nodes=5) - @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') - @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"]) - def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT', + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"]) + @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"]) + def test_long_term_producer_throughput(self, compression_type="none", + security_protocol='PLAINTEXT', tls_version=None, interbroker_security_protocol=None, client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)): """ @@ -119,7 +121,7 @@ class Benchmark(Test): self.validate_versions(client_version, broker_version) if interbroker_security_protocol is None: interbroker_security_protocol = security_protocol - self.start_kafka(security_protocol, interbroker_security_protocol, broker_version) + self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version) self.producer = ProducerPerformanceService( self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE, @@ -157,11 +159,11 @@ class Benchmark(Test): return data @cluster(num_nodes=5) - @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') - @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"]) + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"]) + @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"]) @cluster(num_nodes=6) @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"]) - def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT", + def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None, interbroker_security_protocol=None, client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)): """ @@ -178,7 +180,7 @@ class Benchmark(Test): self.validate_versions(client_version, broker_version) if interbroker_security_protocol is None: interbroker_security_protocol = security_protocol - self.start_kafka(security_protocol, interbroker_security_protocol, broker_version) + self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version) self.logger.info("BENCHMARK: End to end latency") self.perf = EndToEndLatencyService( self.test_context, 1, self.kafka, @@ -189,9 +191,9 @@ class Benchmark(Test): return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms']) @cluster(num_nodes=6) - @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') - @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"]) - def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT", + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"]) + @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"]) + def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None, interbroker_security_protocol=None, client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)): """ @@ -207,7 +209,7 @@ class Benchmark(Test): self.validate_versions(client_version, broker_version) if interbroker_security_protocol is None: interbroker_security_protocol = security_protocol - self.start_kafka(security_protocol, interbroker_security_protocol, broker_version) + self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version) num_records = 10 * 1000 * 1000 # 10e6 self.producer = ProducerPerformanceService( @@ -236,9 +238,9 @@ class Benchmark(Test): return data @cluster(num_nodes=6) - @parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT') - @matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"]) - def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT", + @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'], compression_type=["none", "snappy"]) + @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", "snappy"]) + def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT", tls_version=None, interbroker_security_protocol=None, num_consumers=1, client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)): """ @@ -250,7 +252,7 @@ class Benchmark(Test): self.validate_versions(client_version, broker_version) if interbroker_security_protocol is None: interbroker_security_protocol = security_protocol - self.start_kafka(security_protocol, interbroker_security_protocol, broker_version) + self.start_kafka(security_protocol, interbroker_security_protocol, broker_version, tls_version) num_records = 10 * 1000 * 1000 # 10e6 # seed kafka w/messages diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index ec44fc9906d..58bd47a7bf8 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections import json import os.path import re @@ -31,7 +30,7 @@ from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.security.minikdc import MiniKdc from kafkatest.services.security.listener_security_config import ListenerSecurityConfig from kafkatest.services.security.security_config import SecurityConfig -from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2 +from kafkatest.version import DEV_BRANCH from kafkatest.services.kafka.util import fix_opts_for_new_jvm @@ -95,17 +94,20 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): "collect_default": True} } - def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, + def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, + interbroker_security_protocol=SecurityConfig.PLAINTEXT, client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None, jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None, zk_client_secure=False, - listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None, extra_kafka_opts=""): + listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None, + extra_kafka_opts="", tls_version=None): """ :param context: test context :param ZookeeperService zk: :param dict topics: which topics to create automatically :param str security_protocol: security protocol for clients to use + :param str tls_version: version of the TLS protocol. :param str interbroker_security_protocol: security protocol to use for broker-to-broker communication :param str client_sasl_mechanism: sasl mechanism for clients to use :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication @@ -129,6 +131,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.zk = zk self.security_protocol = security_protocol + self.tls_version = tls_version self.client_sasl_mechanism = client_sasl_mechanism self.topics = topics self.minikdc = None @@ -215,7 +218,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): zk_sasl=self.zk.zk_sasl, zk_tls=self.zk_client_secure, client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism, - listener_security_config=self.listener_security_config) + listener_security_config=self.listener_security_config, + tls_version=self.tls_version) for port in self.port_mappings.values(): if port.open: config.enable_security_protocol(port.security_protocol) @@ -354,15 +358,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def start_node(self, node, timeout_sec=60): node.account.mkdirs(KafkaService.PERSISTENT_ROOT) + + self.security_config.setup_node(node) + self.security_config.setup_credentials(node, self.path, self.zk_connect_setting(), broker=True) + prop_file = self.prop_file(node) self.logger.info("kafka.properties:") self.logger.info(prop_file) node.account.create_file(KafkaService.CONFIG_FILE, prop_file) node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR)) - self.security_config.setup_node(node) - self.security_config.setup_credentials(node, self.path, self.zk_connect_setting(), broker=True) - cmd = self.start_cmd(node) self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd)) with node.account.monitor_log(KafkaService.STDOUT_STDERR_CAPTURE) as monitor: diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 9795eacbc39..0c028aac707 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -44,7 +44,10 @@ listener.name.{{ interbroker_listener.name.lower() }}.{{ k }}={{ v }} {% endif %} {% endfor %} {% endif %} - +{% if security_config.tls_version is not none %} +ssl.enabled.protocols={{ security_config.tls_version }} +ssl.protocol={{ security_config.tls_version }} +{% endif %} ssl.keystore.location=/mnt/security/test.keystore.jks ssl.keystore.password=test-ks-passwd ssl.key.password=test-ks-passwd diff --git a/tests/kafkatest/services/kafka/util.py b/tests/kafkatest/services/kafka/util.py index 92d59c74d16..8782ebe7b42 100644 --- a/tests/kafkatest/services/kafka/util.py +++ b/tests/kafkatest/services/kafka/util.py @@ -13,16 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os.path - from collections import namedtuple -from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0 -from kafkatest.directory_layout.kafka_path import create_path_resolver + +from kafkatest.utils.remote_account import java_version +from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0 TopicPartition = namedtuple('TopicPartition', ['topic', 'partition']) new_jdk_not_supported = frozenset([str(LATEST_0_8_2), str(LATEST_0_9), str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0)]) + def fix_opts_for_new_jvm(node): # Startup scripts for early versions of Kafka contains options # that not supported on latest versions of JVM like -XX:+PrintGCDateStamps or -XX:UseParNewGC. @@ -38,21 +38,4 @@ def fix_opts_for_new_jvm(node): cmd += "export KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true\"; " return cmd -def java_version(node): - # Determine java version on the node - version = -1 - for line in node.account.ssh_capture("java -version"): - if line.find("version") != -1: - version = parse_version_str(line) - return version -def parse_version_str(line): - # Parse java version string. Examples: - #`openjdk version "11.0.5" 2019-10-15` will return 11. - #`java version "1.5.0"` will return 5. - line = line[line.find('version \"') + 9:] - dot_pos = line.find(".") - if line[:dot_pos] == "1": - return int(line[dot_pos+1:line.find(".", dot_pos+1)]) - else: - return int(line[:dot_pos]) diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py index 7ada9c55e82..1212a7d5454 100644 --- a/tests/kafkatest/services/kafka_log4j_appender.py +++ b/tests/kafkatest/services/kafka_log4j_appender.py @@ -28,14 +28,14 @@ class KafkaLog4jAppender(KafkaPathResolverMixin, BackgroundThreadService): "collect_default": False} } - def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, security_protocol="PLAINTEXT"): + def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, security_protocol="PLAINTEXT", tls_version=None): super(KafkaLog4jAppender, self).__init__(context, num_nodes) self.kafka = kafka self.topic = topic self.max_messages = max_messages self.security_protocol = security_protocol - self.security_config = SecurityConfig(self.context, security_protocol) + self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version) self.stop_timeout_sec = 30 for node in self.nodes: diff --git a/tests/kafkatest/services/log_compaction_tester.py b/tests/kafkatest/services/log_compaction_tester.py index 4a19650ff2e..cc6bf4fc296 100644 --- a/tests/kafkatest/services/log_compaction_tester.py +++ b/tests/kafkatest/services/log_compaction_tester.py @@ -33,12 +33,13 @@ class LogCompactionTester(KafkaPathResolverMixin, BackgroundThreadService): "collect_default": True} } - def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30): + def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30, tls_version=None): super(LogCompactionTester, self).__init__(context, 1) self.kafka = kafka self.security_protocol = security_protocol - self.security_config = SecurityConfig(self.context, security_protocol) + self.tls_version = tls_version + self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version) self.stop_timeout_sec = stop_timeout_sec self.log_compaction_completed = False diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py index 8751797d6cf..13a1288001f 100644 --- a/tests/kafkatest/services/replica_verification_tool.py +++ b/tests/kafkatest/services/replica_verification_tool.py @@ -29,14 +29,16 @@ class ReplicaVerificationTool(KafkaPathResolverMixin, BackgroundThreadService): "collect_default": False} } - def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT", stop_timeout_sec=30): + def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT", + stop_timeout_sec=30, tls_version=None): super(ReplicaVerificationTool, self).__init__(context, num_nodes) self.kafka = kafka self.topic = topic self.report_interval_ms = report_interval_ms self.security_protocol = security_protocol - self.security_config = SecurityConfig(self.context, security_protocol) + self.tls_version = tls_version + self.security_config = SecurityConfig(self.context, security_protocol, tls_version=tls_version) self.partition_lag = {} self.stop_timeout_sec = stop_timeout_sec diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index f70d7d282ef..2fb4f47bf02 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -19,10 +19,13 @@ import subprocess from tempfile import mkdtemp from shutil import rmtree from ducktape.template import TemplateRenderer + from kafkatest.services.security.minikdc import MiniKdc from kafkatest.services.security.listener_security_config import ListenerSecurityConfig import itertools +from kafkatest.utils.remote_account import java_version + class SslStores(object): def __init__(self, local_scratch_dir, logger=None): @@ -140,7 +143,7 @@ class SecurityConfig(TemplateRenderer): def __init__(self, context, security_protocol=None, interbroker_security_protocol=None, client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI, zk_sasl=False, zk_tls=False, template_props="", static_jaas_conf=True, jaas_override_variables=None, - listener_security_config=ListenerSecurityConfig()): + listener_security_config=ListenerSecurityConfig(), tls_version=None): """ Initialize the security properties for the node and copy keystore and truststore to the remote node if the transport protocol @@ -186,6 +189,10 @@ class SecurityConfig(TemplateRenderer): 'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism, 'sasl.kerberos.service.name' : 'kafka' } + + if tls_version is not None: + self.properties.update({'tls.version' : tls_version}) + self.properties.update(self.listener_security_config.client_listener_overrides) self.jaas_override_variables = jaas_override_variables or {} @@ -201,7 +208,8 @@ class SecurityConfig(TemplateRenderer): template_props=template_props, static_jaas_conf=static_jaas_conf, jaas_override_variables=jaas_override_variables, - listener_security_config=self.listener_security_config) + listener_security_config=self.listener_security_config, + tls_version=self.tls_version) def enable_security_protocol(self, security_protocol): self.has_sasl = self.has_sasl or self.is_sasl(security_protocol) @@ -259,6 +267,9 @@ class SecurityConfig(TemplateRenderer): if self.has_sasl: self.setup_sasl(node) + if java_version(node) <= 11 and self.properties.get('tls.version') == 'TLSv1.3': + self.properties.update({'tls.version': 'TLSv1.2'}) + def setup_credentials(self, node, path, zk_connect, broker): if broker: self.maybe_create_scram_credentials(node, zk_connect, path, self.interbroker_sasl_mechanism, @@ -303,6 +314,10 @@ class SecurityConfig(TemplateRenderer): def security_protocol(self): return self.properties['security.protocol'] + @property + def tls_version(self): + return self.properties.get('tls.version') + @property def client_sasl_mechanism(self): return self.properties['sasl.mechanism'] diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py index f5c64222bdb..01ef34f3183 100644 --- a/tests/kafkatest/tests/core/replication_test.py +++ b/tests/kafkatest/tests/core/replication_test.py @@ -125,10 +125,10 @@ class ReplicationTest(EndToEndTest): broker_type="leader", security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512") @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], - security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"]) + security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"]) def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI", - compression_type=None, enable_idempotence=False): + compression_type=None, enable_idempotence=False, tls_version=None): """Replication tests. These tests verify that replication provides simple durability guarantees by checking that data acked by brokers is still available for consumption in the face of various failure scenarios. @@ -149,7 +149,8 @@ class ReplicationTest(EndToEndTest): security_protocol=security_protocol, interbroker_security_protocol=security_protocol, client_sasl_mechanism=client_sasl_mechanism, - interbroker_sasl_mechanism=interbroker_sasl_mechanism) + interbroker_sasl_mechanism=interbroker_sasl_mechanism, + tls_version=tls_version) self.kafka.start() compression_types = None if not compression_type else [compression_type] diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py index f5ec08c67f9..6c0e447a8b3 100644 --- a/tests/kafkatest/tests/core/upgrade_test.py +++ b/tests/kafkatest/tests/core/upgrade_test.py @@ -23,8 +23,9 @@ from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int +from kafkatest.utils.remote_account import java_version from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion -from kafkatest.services.kafka.util import java_version, new_jdk_not_supported +from kafkatest.services.kafka.util import new_jdk_not_supported class TestUpgrade(ProduceConsumeValidateTest): diff --git a/tests/kafkatest/utils/remote_account.py b/tests/kafkatest/utils/remote_account.py index d6ea72f6646..b572e06fe30 100644 --- a/tests/kafkatest/utils/remote_account.py +++ b/tests/kafkatest/utils/remote_account.py @@ -37,3 +37,22 @@ def line_count(node, file): raise Exception("Expected single line of output from wc -l") return int(out[0].strip().split(" ")[0]) + +def java_version(node): + # Determine java version on the node + version = -1 + for line in node.account.ssh_capture("java -version"): + if line.find("version") != -1: + version = parse_version_str(line) + return version + +def parse_version_str(line): + # Parse java version string. Examples: + #`openjdk version "11.0.5" 2019-10-15` will return 11. + #`java version "1.5.0"` will return 5. + line = line[line.find('version \"') + 9:] + dot_pos = line.find(".") + if line[:dot_pos] == "1": + return int(line[dot_pos+1:line.find(".", dot_pos+1)]) + else: + return int(line[:dot_pos])