KAFKA-5674; Reduce max.connections.per.ip minimum to 0 (#3610)

By allowing `max.connections.per.ip` to be 0, Kafka can support IP-based filtering using `max.connections.per.ip.overrides`.
This commit is contained in:
Viktor Somogyi 2018-04-02 08:45:15 -07:00 committed by Jason Gustafson
parent 8662a022c4
commit 1dc30272e1
3 changed files with 42 additions and 4 deletions

View File

@ -782,7 +782,7 @@ object KafkaConfig {
.define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc) .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc)
.define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc) .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc)
.define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc) .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
.define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(1), MEDIUM, MaxConnectionsPerIpDoc) .define(MaxConnectionsPerIpProp, INT, Defaults.MaxConnectionsPerIp, atLeast(0), MEDIUM, MaxConnectionsPerIpDoc)
.define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc) .define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
.define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc) .define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc)

View File

@ -20,8 +20,8 @@ package kafka.network
import java.io._ import java.io._
import java.net._ import java.net._
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.util.{HashMap, Properties, Random}
import java.nio.channels.SocketChannel import java.nio.channels.SocketChannel
import java.util.{HashMap, Random}
import javax.net.ssl._ import javax.net.ssl._
import com.yammer.metrics.core.{Gauge, Meter} import com.yammer.metrics.core.{Gauge, Meter}
@ -134,8 +134,8 @@ class SocketServerTest extends JUnitSuite {
channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, Some(request.header.toString))) channel.sendResponse(new RequestChannel.Response(request, Some(send), SendAction, Some(request.header.toString)))
} }
def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = { def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
val socket = new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol))) val socket = new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0)
sockets += socket sockets += socket
socket socket
} }
@ -443,6 +443,43 @@ class SocketServerTest extends JUnitSuite {
assertNotNull(request) assertNotNull(request)
} }
@Test
def testZeroMaxConnectionsPerIp() {
val newProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
newProps.setProperty(KafkaConfig.MaxConnectionsPerIpProp, "0")
newProps.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "%s:%s".format("127.0.0.1", "5"))
val server = new SocketServer(KafkaConfig.fromProps(newProps), new Metrics(), Time.SYSTEM, credentialProvider)
try {
server.startup()
// make the maximum allowable number of connections
val conns = (0 until 5).map(_ => connect(server))
// now try one more (should fail)
val conn = connect(server)
conn.setSoTimeout(3000)
assertEquals(-1, conn.getInputStream.read())
conn.close()
// it should succeed after closing one connection
val address = conns.head.getInetAddress
conns.head.close()
TestUtils.waitUntilTrue(() => server.connectionCount(address) < conns.length,
"Failed to decrement connection count after close")
val conn2 = connect(server)
val serializedBytes = producerRequestBytes()
sendRequest(conn2, serializedBytes)
val request = server.requestChannel.receiveRequest(2000)
assertNotNull(request)
// now try to connect from the external facing interface, which should fail
val conn3 = connect(s = server, localAddr = InetAddress.getLocalHost)
conn3.setSoTimeout(3000)
assertEquals(-1, conn3.getInputStream.read())
conn3.close()
} finally {
shutdownServerAndMetrics(server)
}
}
@Test @Test
def testMaxConnectionsPerIpOverrides() { def testMaxConnectionsPerIpOverrides() {
val overrideNum = server.config.maxConnectionsPerIp + 1 val overrideNum = server.config.maxConnectionsPerIp + 1

View File

@ -67,6 +67,7 @@
<h5><a id="upgrade_120_notable" href="#upgrade_120_notable">Notable changes in 1.2.0</a></h5> <h5><a id="upgrade_120_notable" href="#upgrade_120_notable">Notable changes in 1.2.0</a></h5>
<ul> <ul>
<li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a> increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for 7 days instead of 1. You can preserve the existing behavior by setting the broker config <code>offsets.retention.minutes</code> to 1440.</li> <li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a> increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for 7 days instead of 1. You can preserve the existing behavior by setting the broker config <code>offsets.retention.minutes</code> to 1440.</li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-5674">KAFKA-5674</a> extends the lower interval of <code>max.connections.per.ip minimum</code> to zero and therefore allows IP-based filtering of inbound connections.</li>
</ul> </ul>
<h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New Protocol Versions</a></h5> <h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New Protocol Versions</a></h5>