KAFKA-4602; KIP-72 - Allow putting a bound on memory consumed by incoming requests

This is the initial implementation.

Author: radai-rosenblatt <radai.rosenblatt@gmail.com>

Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>, Jun Rao <junrao@gmail.com>

Closes #2330 from radai-rosenblatt/broker-memory-pool-with-muting
radai-rosenblatt 2017-07-26 08:19:56 +02:00 committed by Jun Rao
parent f15cdbc91b
commit 47ee8e954d
38 changed files with 1446 additions and 62 deletions

View File

@ -43,6 +43,7 @@
<allow pkg="org.apache.kafka.common.security" />
<allow pkg="org.apache.kafka.common.utils" />
<allow pkg="org.apache.kafka.common.errors" exact-match="true" />
<allow pkg="org.apache.kafka.common.memory" />
<subpackage name="common">
<disallow pkg="org.apache.kafka.clients" />
@ -67,6 +68,10 @@
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="memory">
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
<subpackage name="network">
<allow pkg="org.apache.kafka.common.security.auth" />
<allow pkg="org.apache.kafka.common.protocol" />

View File

@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.memory;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* An extension of SimpleMemoryPool that tracks allocated buffers and logs an error when they "leak"
* (when they are garbage-collected without having been release()ed).
 * THIS IMPLEMENTATION IS A DEVELOPMENT/DEBUGGING AID AND IS NOT MEANT FOR PRODUCTION USE.
*/
public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements AutoCloseable {
private final ReferenceQueue<ByteBuffer> garbageCollectedBuffers = new ReferenceQueue<>();
//serves 2 purposes - 1st it keeps the ref objects reachable (which is a requirement for them
//to ever be enqueued), 2nd it keeps some (small) metadata for every buffer allocated
private final Map<BufferReference, BufferMetadata> buffersInFlight = new ConcurrentHashMap<>();
private final GarbageCollectionListener gcListener = new GarbageCollectionListener();
private final Thread gcListenerThread;
private volatile boolean alive = true;
public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) {
super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor);
this.alive = true;
this.gcListenerThread = new Thread(gcListener, "memory pool GC listener");
this.gcListenerThread.setDaemon(true); //so we don't need to worry about shutdown
this.gcListenerThread.start();
}
@Override
protected void bufferToBeReturned(ByteBuffer justAllocated) {
BufferReference ref = new BufferReference(justAllocated, garbageCollectedBuffers);
BufferMetadata metadata = new BufferMetadata(justAllocated.capacity());
if (buffersInFlight.put(ref, metadata) != null)
//this is a bug. it means either 2 different co-existing buffers got
//the same identity or we failed to register a released/GC'ed buffer
throw new IllegalStateException("allocated buffer identity " + ref.hashCode + " already registered as in use?!");
log.trace("allocated buffer of size {} and identity {}", sizeBytes, ref.hashCode);
}
@Override
protected void bufferToBeReleased(ByteBuffer justReleased) {
BufferReference ref = new BufferReference(justReleased); //used for lookup only
BufferMetadata metadata = buffersInFlight.remove(ref);
if (metadata == null)
//it's impossible for the buffer to have already been GC'ed (because we have a hard ref to it
//in the function arg) so this means either a double free or not our buffer.
throw new IllegalArgumentException("returned buffer " + ref.hashCode + " was never allocated by this pool");
if (metadata.sizeBytes != justReleased.capacity()) {
//this is a bug
throw new IllegalStateException("buffer " + ref.hashCode + " has capacity " + justReleased.capacity() + " but recorded as " + metadata.sizeBytes);
}
log.trace("released buffer of size {} and identity {}", metadata.sizeBytes, ref.hashCode);
}
@Override
public void close() throws Exception {
alive = false;
gcListenerThread.interrupt();
}
private class GarbageCollectionListener implements Runnable {
@Override
public void run() {
while (alive) {
try {
BufferReference ref = (BufferReference) garbageCollectedBuffers.remove(); //blocks
ref.clear();
//this cannot race with a release() call because an object is either reachable or not,
//release() can only happen before it's GC'ed, and enqueue can only happen after.
//if the ref was enqueued it must then not have been released
BufferMetadata metadata = buffersInFlight.remove(ref);
if (metadata == null) {
//it can happen rarely that the buffer was release()ed properly (so no metadata) and yet
//the reference object to it remains reachable for a short period of time after release()
//and hence gets enqueued. this is because we keep refs in a ConcurrentHashMap which cleans
//up keys lazily.
continue;
}
availableMemory.addAndGet(metadata.sizeBytes);
log.error("Reclaimed buffer of size {} and identity {} that was not properly release()ed. This is a bug.", metadata.sizeBytes, ref.hashCode);
} catch (InterruptedException e) {
log.debug("interrupted", e);
//ignore, we're a daemon thread
}
}
log.info("GC listener shutting down");
}
}
private static final class BufferMetadata {
private final int sizeBytes;
private BufferMetadata(int sizeBytes) {
this.sizeBytes = sizeBytes;
}
}
private static final class BufferReference extends WeakReference<ByteBuffer> {
private final int hashCode;
private BufferReference(ByteBuffer referent) { //used for lookup purposes only - no queue required.
this(referent, null);
}
private BufferReference(ByteBuffer referent, ReferenceQueue<? super ByteBuffer> q) {
super(referent, q);
hashCode = System.identityHashCode(referent);
}
@Override
public boolean equals(Object o) {
if (this == o) { //this is important to find leaked buffers (by ref identity)
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BufferReference that = (BufferReference) o;
if (hashCode != that.hashCode) {
return false;
}
ByteBuffer thisBuf = get();
if (thisBuf == null) {
//our buffer has already been GC'ed, yet "that" is not us. so not same buffer
return false;
}
ByteBuffer thatBuf = that.get();
return thisBuf == thatBuf;
}
@Override
public int hashCode() {
return hashCode;
}
}
@Override
public String toString() {
long allocated = sizeBytes - availableMemory.get();
return "GarbageCollectedMemoryPool{" + Utils.formatBytes(allocated) + "/" + Utils.formatBytes(sizeBytes) + " used in " + buffersInFlight.size() + " buffers}";
}
}
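
The class above is exercised end to end by the new GarbageCollectedMemoryPoolTest later in this commit. A minimal sketch of the leak detection it provides (illustrative only: the error log is eventual, since it depends on when the JVM actually collects the buffer):

import java.nio.ByteBuffer;
import org.apache.kafka.common.memory.GarbageCollectedMemoryPool;

public class LeakDetectionExample {
    public static void main(String[] args) throws Exception {
        try (GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1024, 512, false, null)) {
            ByteBuffer leaked = pool.tryAllocate(256);
            leaked.putInt(42); // use it briefly
            leaked = null;     // then drop the only reference without release()ing - a "leak"
            System.gc();       // encourage collection; the GC listener thread should
            Thread.sleep(100); // eventually reclaim the 256 bytes and log an error
            System.out.println(pool); // prints used/total plus the in-flight buffer count
        }
    }
}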

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.memory;
import java.nio.ByteBuffer;
/**
* A common memory pool interface for non-blocking pools.
* Every buffer returned from {@link #tryAllocate(int)} must always be {@link #release(ByteBuffer) released}.
*/
public interface MemoryPool {
MemoryPool NONE = new MemoryPool() {
@Override
public ByteBuffer tryAllocate(int sizeBytes) {
return ByteBuffer.allocate(sizeBytes);
}
@Override
public void release(ByteBuffer previouslyAllocated) {
//nop
}
@Override
public long size() {
return Long.MAX_VALUE;
}
@Override
public long availableMemory() {
return Long.MAX_VALUE;
}
@Override
public boolean isOutOfMemory() {
return false;
}
@Override
public String toString() {
return "NONE";
}
};
/**
* Tries to acquire a ByteBuffer of the specified size
* @param sizeBytes size required
* @return a ByteBuffer (which later needs to be release()ed), or null if no memory available.
* the buffer will be of the exact size requested, even if backed by a larger chunk of memory
*/
ByteBuffer tryAllocate(int sizeBytes);
/**
* Returns a previously allocated buffer to the pool.
* @param previouslyAllocated a buffer previously returned from tryAllocate()
*/
void release(ByteBuffer previouslyAllocated);
/**
* Returns the total size of this pool
* @return total size, in bytes
*/
long size();
/**
* Returns the amount of memory available for allocation by this pool.
* NOTE: result may be negative (pools may over-allocate to avoid starvation issues)
* @return bytes available
*/
long availableMemory();
/**
* Returns true if the pool cannot currently allocate any more buffers
* - meaning total outstanding buffers meets or exceeds pool size and
* some would need to be released before further allocations are possible.
*
* This is equivalent to availableMemory() <= 0
* @return true if out of memory
*/
boolean isOutOfMemory();
}
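
A minimal usage sketch of this contract, assuming SimpleMemoryPool (the concrete implementation below): tryAllocate() never blocks, so a null return means the caller must back off and retry, and every granted buffer must eventually be release()ed.

import java.nio.ByteBuffer;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;

public class MemoryPoolUsageExample {
    public static void main(String[] args) {
        // 1 KB pool, 1 KB max single allocation, non-strict, no OOM sensor
        MemoryPool pool = new SimpleMemoryPool(1024, 1024, false, null);
        ByteBuffer buffer = pool.tryAllocate(512);
        if (buffer == null)
            return; // pool exhausted: retry later rather than block
        try {
            buffer.putInt(42); // use the buffer
        } finally {
            pool.release(buffer); // every allocation must be returned, or it "leaks"
        }
    }
}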

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.memory;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A simple pool implementation that just provides a limit on the total outstanding memory.
 * Any buffer allocated must always be release()ed; otherwise that memory is never marked as reclaimed (and "leaks").
*/
public class SimpleMemoryPool implements MemoryPool {
protected final Logger log = LoggerFactory.getLogger(getClass()); //subclass-friendly
protected final long sizeBytes;
protected final boolean strict;
protected final AtomicLong availableMemory;
protected final int maxSingleAllocationSize;
protected final AtomicLong startOfNoMemPeriod = new AtomicLong(); //nanoseconds
protected volatile Sensor oomTimeSensor;
public SimpleMemoryPool(long sizeInBytes, int maxSingleAllocationBytes, boolean strict, Sensor oomPeriodSensor) {
if (sizeInBytes <= 0 || maxSingleAllocationBytes <= 0 || maxSingleAllocationBytes > sizeInBytes)
throw new IllegalArgumentException("must provide a positive size and max single allocation size smaller than size."
+ "provided " + sizeInBytes + " and " + maxSingleAllocationBytes + " respectively");
this.sizeBytes = sizeInBytes;
this.strict = strict;
this.availableMemory = new AtomicLong(sizeInBytes);
this.maxSingleAllocationSize = maxSingleAllocationBytes;
this.oomTimeSensor = oomPeriodSensor;
}
@Override
public ByteBuffer tryAllocate(int sizeBytes) {
if (sizeBytes < 1)
throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
if (sizeBytes > maxSingleAllocationSize)
throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);
long available;
boolean success = false;
//in strict mode we will only allocate memory if we have at least the size required.
//in non-strict mode we will allocate memory if we have _any_ memory available (so available memory
//can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize)
long threshold = strict ? sizeBytes : 1;
while ((available = availableMemory.get()) >= threshold) {
success = availableMemory.compareAndSet(available, available - sizeBytes);
if (success)
break;
}
if (success) {
maybeRecordEndOfDrySpell();
} else {
if (oomTimeSensor != null) {
startOfNoMemPeriod.compareAndSet(0, System.nanoTime());
}
log.trace("refused to allocate buffer of size {}", sizeBytes);
return null;
}
ByteBuffer allocated = ByteBuffer.allocate(sizeBytes);
bufferToBeReturned(allocated);
return allocated;
}
@Override
public void release(ByteBuffer previouslyAllocated) {
if (previouslyAllocated == null)
throw new IllegalArgumentException("provided null buffer");
bufferToBeReleased(previouslyAllocated);
availableMemory.addAndGet(previouslyAllocated.capacity());
maybeRecordEndOfDrySpell();
}
@Override
public long size() {
return sizeBytes;
}
@Override
public long availableMemory() {
return availableMemory.get();
}
@Override
public boolean isOutOfMemory() {
return availableMemory.get() <= 0;
}
//allows subclasses to do their own bookkeeping (and validation) _before_ memory is returned to client code.
protected void bufferToBeReturned(ByteBuffer justAllocated) {
log.trace("allocated buffer of size {} ", justAllocated.capacity());
}
//allows subclasses to do their own bookkeeping (and validation) _before_ memory is marked as reclaimed.
protected void bufferToBeReleased(ByteBuffer justReleased) {
log.trace("released buffer of size {}", justReleased.capacity());
}
@Override
public String toString() {
long allocated = sizeBytes - availableMemory.get();
return "SimpleMemoryPool{" + Utils.formatBytes(allocated) + "/" + Utils.formatBytes(sizeBytes) + " used}";
}
protected void maybeRecordEndOfDrySpell() {
if (oomTimeSensor != null) {
long startOfDrySpell = startOfNoMemPeriod.getAndSet(0);
if (startOfDrySpell != 0) {
//how long were we refusing allocation requests for
oomTimeSensor.record((System.nanoTime() - startOfDrySpell) / 1000000.0); //fractional (double) millis
}
}
}
}
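
A short sketch of the strict/non-strict distinction implemented in tryAllocate() above: a non-strict pool grants a request whenever any memory is available, so availableMemory() can dip below zero, while a strict pool requires the full amount up front (buffers are deliberately not released here, since the demo only inspects the accounting):

import org.apache.kafka.common.memory.SimpleMemoryPool;

public class StrictnessExample {
    public static void main(String[] args) {
        // non-strict: any positive availability satisfies a request
        SimpleMemoryPool lenient = new SimpleMemoryPool(100, 80, false, null);
        lenient.tryAllocate(80);                       // available: 100 -> 20
        lenient.tryAllocate(80);                       // granted anyway: 20 -> -60
        System.out.println(lenient.availableMemory()); // -60
        System.out.println(lenient.isOutOfMemory());   // true

        // strict: the full requested amount must be available up front
        SimpleMemoryPool strict = new SimpleMemoryPool(100, 80, true, null);
        strict.tryAllocate(80);                        // available: 100 -> 20
        System.out.println(strict.tryAllocate(80));    // null: only 20 bytes left
    }
}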

View File

@ -20,6 +20,8 @@ import java.util.Map;
import java.nio.channels.SelectionKey;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.memory.MemoryPool;
/**
* A ChannelBuilder interface to build Channel based on configs
@ -36,10 +38,11 @@ public interface ChannelBuilder extends AutoCloseable {
* returns a Channel with TransportLayer and Authenticator configured.
* @param id channel id
* @param key SelectionKey
* @param maxReceiveSize
* @param maxReceiveSize max size of a single receive buffer to allocate
* @param memoryPool memory pool from which to allocate buffers, or null for none
* @return KafkaChannel
*/
KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException;
KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException;
/**

View File

@ -25,6 +25,8 @@ import java.nio.channels.SelectionKey;
import java.security.Principal;
import java.util.Objects;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.utils.Utils;
public class KafkaChannel {
@ -35,6 +37,7 @@ public class KafkaChannel {
// The values are read and reset after each response is sent.
private long networkThreadTimeNanos;
private final int maxReceiveSize;
private final MemoryPool memoryPool;
private NetworkReceive receive;
private Send send;
// Track connection and mute state of channels to enable outstanding requests on channels to be
@ -43,12 +46,13 @@ public class KafkaChannel {
private boolean muted;
private ChannelState state;
public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) throws IOException {
this.id = id;
this.transportLayer = transportLayer;
this.authenticator = authenticator;
this.networkThreadTimeNanos = 0L;
this.maxReceiveSize = maxReceiveSize;
this.memoryPool = memoryPool;
this.disconnected = false;
this.muted = false;
this.state = ChannelState.NOT_CONNECTED;
@ -56,7 +60,7 @@ public class KafkaChannel {
public void close() throws IOException {
this.disconnected = true;
Utils.closeAll(transportLayer, authenticator);
Utils.closeAll(transportLayer, authenticator, receive);
}
/**
@ -106,13 +110,16 @@ public class KafkaChannel {
return id;
}
public void mute() {
/**
* externally muting a channel should be done via selector to ensure proper state handling
*/
void mute() {
if (!disconnected)
transportLayer.removeInterestOps(SelectionKey.OP_READ);
muted = true;
}
public void unmute() {
void unmute() {
if (!disconnected)
transportLayer.addInterestOps(SelectionKey.OP_READ);
muted = false;
@ -125,6 +132,17 @@ public class KafkaChannel {
return muted;
}
public boolean isInMutableState() {
//some requests do not require memory, so if we do not know what the current (or future) request is
//(receive == null) we don't mute. we also don't mute if whatever memory is required has already been
//successfully allocated (if none is required for the currently-being-read request,
//receive.memoryAllocated() is expected to return true)
if (receive == null || receive.memoryAllocated())
return false;
//also cannot mute if underlying transport is not in the ready state
return transportLayer.ready();
}
public boolean ready() {
return transportLayer.ready() && authenticator.complete();
}
@ -161,7 +179,7 @@ public class KafkaChannel {
NetworkReceive result = null;
if (receive == null) {
receive = new NetworkReceive(maxReceiveSize, id);
receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
}
receive(receive);
@ -169,6 +187,9 @@ public class KafkaChannel {
receive.payload().rewind();
result = receive;
receive = null;
} else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
//pool must be out of memory, mute ourselves.
mute();
}
return result;
}
@ -210,4 +231,28 @@ public class KafkaChannel {
return send.completed();
}
/**
* @return true if underlying transport has bytes remaining to be read from any underlying intermediate buffers.
*/
public boolean hasBytesBuffered() {
return transportLayer.hasBytesBuffered();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KafkaChannel that = (KafkaChannel) o;
return Objects.equals(id, that.id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}
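
A condensed restatement of the isInMutableState() reasoning above (an illustrative sketch with the three inputs made explicit, not Kafka code):

public class MutableStateSketch {
    // mirrors KafkaChannel.isInMutableState(): only mute when a receive
    // exists, it is stalled waiting for pool memory, and the transport is ready
    static boolean isInMutableState(boolean hasReceive, boolean memoryAllocated, boolean transportReady) {
        if (!hasReceive || memoryAllocated)
            return false; // nothing is known to be stalled on memory
        return transportReady;
    }

    public static void main(String[] args) {
        System.out.println(isInMutableState(false, false, true)); // false: request size not yet known
        System.out.println(isInMutableState(true, true, true));   // false: memory already allocated
        System.out.println(isInMutableState(true, false, true));  // true: stalled on the pool
    }
}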

View File

@ -21,6 +21,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ScatteringByteChannel;
import org.apache.kafka.common.memory.MemoryPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A size-delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
@ -29,10 +32,14 @@ public class NetworkReceive implements Receive {
public final static String UNKNOWN_SOURCE = "";
public final static int UNLIMITED = -1;
private static final Logger log = LoggerFactory.getLogger(NetworkReceive.class);
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
private final String source;
private final ByteBuffer size;
private final int maxSize;
private final MemoryPool memoryPool;
private int requestedBufferSize = -1;
private ByteBuffer buffer;
@ -41,6 +48,7 @@ public class NetworkReceive implements Receive {
this.buffer = buffer;
this.size = null;
this.maxSize = UNLIMITED;
this.memoryPool = MemoryPool.NONE;
}
public NetworkReceive(String source) {
@ -48,6 +56,7 @@ public class NetworkReceive implements Receive {
this.size = ByteBuffer.allocate(4);
this.buffer = null;
this.maxSize = UNLIMITED;
this.memoryPool = MemoryPool.NONE;
}
public NetworkReceive(int maxSize, String source) {
@ -55,6 +64,15 @@ public class NetworkReceive implements Receive {
this.size = ByteBuffer.allocate(4);
this.buffer = null;
this.maxSize = maxSize;
this.memoryPool = MemoryPool.NONE;
}
public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
this.source = source;
this.size = ByteBuffer.allocate(4);
this.buffer = null;
this.maxSize = maxSize;
this.memoryPool = memoryPool;
}
public NetworkReceive() {
@ -68,13 +86,32 @@ public class NetworkReceive implements Receive {
@Override
public boolean complete() {
return !size.hasRemaining() && !buffer.hasRemaining();
return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
public long readFrom(ScatteringByteChannel channel) throws IOException {
return readFromReadableChannel(channel);
}
@Override
public boolean requiredMemoryAmountKnown() {
return requestedBufferSize != -1;
}
@Override
public boolean memoryAllocated() {
return buffer != null;
}
@Override
public void close() throws IOException {
if (buffer != null && buffer != EMPTY_BUFFER) {
memoryPool.release(buffer);
buffer = null;
}
}
// Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
// See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
// This can go away after we get rid of BlockingChannel
@ -93,10 +130,17 @@ public class NetworkReceive implements Receive {
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
this.buffer = ByteBuffer.allocate(receiveSize);
requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
if (receiveSize == 0) {
buffer = EMPTY_BUFFER;
}
}
}
if (buffer == null && requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet
buffer = memoryPool.tryAllocate(requestedBufferSize);
if (buffer == null)
log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
}
if (buffer != null) {
int bytesRead = channel.read(buffer);
if (bytesRead < 0)

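For reference, the frame format consumed above is a 4-byte network-ordered (big-endian) length prefix followed by that many payload bytes. A minimal sketch of producing such a frame (the new selector tests build their payloads the same way):

import java.nio.ByteBuffer;

public class SizeDelimitedFrameExample {
    // builds a frame as NetworkReceive expects it: 4 network-ordered size bytes,
    // then exactly that many content bytes (ByteBuffer is big-endian by default)
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(frame("hello".getBytes()).remaining()); // 9 = 4 size bytes + 5 payload bytes
    }
}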
View File

@ -19,6 +19,7 @@ package org.apache.kafka.common.network;
import java.nio.channels.SelectionKey;
import java.util.Map;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.KafkaException;
@ -39,12 +40,13 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
}
}
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
@Override
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try {
PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
Authenticator authenticator = new DefaultAuthenticator();
authenticator.configure(transportLayer, this.principalBuilder, this.configs);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) {
log.warn("Failed to create channel due to ", e);
throw new KafkaException(e);

View File

@ -213,6 +213,11 @@ public class PlaintextTransportLayer implements TransportLayer {
return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
}
@Override
public boolean hasBytesBuffered() {
return false;
}
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);

View File

@ -16,13 +16,14 @@
*/
package org.apache.kafka.common.network;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ScatteringByteChannel;
/**
* This interface models the in-progress reading of data from a channel to a source identified by an integer id
*/
public interface Receive {
public interface Receive extends Closeable {
/**
* The numeric id of the source from which we are receiving data.
@ -42,4 +43,13 @@ public interface Receive {
*/
long readFrom(ScatteringByteChannel channel) throws IOException;
/**
 * Do we know yet how much memory we require to fully read this receive?
*/
boolean requiredMemoryAmountKnown();
/**
* Has the underlying memory required to complete reading been allocated yet?
*/
boolean memoryAllocated();
}

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.authenticator.CredentialCache;
@ -100,7 +101,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
}
}
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
@Override
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try {
SocketChannel socketChannel = (SocketChannel) key.channel();
TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
@ -114,7 +116,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable);
// Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes.
authenticator.configure(transportLayer, null, this.configs);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) {
log.info("Failed to create channel due to ", e);
throw new KafkaException(e);

View File

@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
@ -38,6 +39,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
@ -87,11 +89,14 @@ public class Selector implements Selectable, AutoCloseable {
private final java.nio.channels.Selector nioSelector;
private final Map<String, KafkaChannel> channels;
private final Set<KafkaChannel> explicitlyMutedChannels;
private boolean outOfMemory;
private final List<Send> completedSends;
private final List<NetworkReceive> completedReceives;
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
private final Map<String, KafkaChannel> closingChannels;
private Set<SelectionKey> keysWithBufferedRead;
private final Map<String, ChannelState> disconnected;
private final List<String> connected;
private final List<String> failedSends;
@ -101,6 +106,11 @@ public class Selector implements Selectable, AutoCloseable {
private final int maxReceiveSize;
private final boolean recordTimePerConnection;
private final IdleExpiryManager idleExpiryManager;
private final MemoryPool memoryPool;
private final long lowMemThreshold;
//indicates if the previous call to poll was able to make progress in reading already-buffered data.
//this is used to prevent tight loops when memory is not available to read any more data
private boolean madeReadProgressLastPoll = true;
/**
* Create a new nioSelector
@ -122,7 +132,8 @@ public class Selector implements Selectable, AutoCloseable {
Map<String, String> metricTags,
boolean metricsPerConnection,
boolean recordTimePerConnection,
ChannelBuilder channelBuilder) {
ChannelBuilder channelBuilder,
MemoryPool memoryPool) {
try {
this.nioSelector = java.nio.channels.Selector.open();
} catch (IOException e) {
@ -131,11 +142,14 @@ public class Selector implements Selectable, AutoCloseable {
this.maxReceiveSize = maxReceiveSize;
this.time = time;
this.channels = new HashMap<>();
this.explicitlyMutedChannels = new HashSet<>();
this.outOfMemory = false;
this.completedSends = new ArrayList<>();
this.completedReceives = new ArrayList<>();
this.stagedReceives = new HashMap<>();
this.immediatelyConnectedKeys = new HashSet<>();
this.closingChannels = new HashMap<>();
this.keysWithBufferedRead = new HashSet<>();
this.connected = new ArrayList<>();
this.disconnected = new HashMap<>();
this.failedSends = new ArrayList<>();
@ -143,6 +157,8 @@ public class Selector implements Selectable, AutoCloseable {
this.channelBuilder = channelBuilder;
this.recordTimePerConnection = recordTimePerConnection;
this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
this.memoryPool = memoryPool;
this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
}
public Selector(int maxReceiveSize,
@ -153,7 +169,7 @@ public class Selector implements Selectable, AutoCloseable {
Map<String, String> metricTags,
boolean metricsPerConnection,
ChannelBuilder channelBuilder) {
this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder);
this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE);
}
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
@ -200,7 +216,7 @@ public class Selector implements Selectable, AutoCloseable {
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
KafkaChannel channel;
try {
channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
} catch (Exception e) {
try {
socketChannel.close();
@ -227,7 +243,7 @@ public class Selector implements Selectable, AutoCloseable {
*/
public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
key.attach(channel);
this.channels.put(id, channel);
}
@ -311,20 +327,46 @@ public class Selector implements Selectable, AutoCloseable {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
boolean madeReadProgressLastCall = madeReadProgressLastPoll;
clear();
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
timeout = 0;
if (!memoryPool.isOutOfMemory() && outOfMemory) {
//we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
log.trace("Broker no longer low on memory - unmuting incoming sockets");
for (KafkaChannel channel : channels.values()) {
if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
channel.unmute();
}
}
outOfMemory = false;
}
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
int numReadyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
//poll from channels that have buffered data (but nothing more from the underlying socket)
if (!keysWithBufferedRead.isEmpty()) {
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
pollSelectionKeys(toPoll, false, endSelect);
}
//poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
} else {
madeReadProgressLastPoll = true; //no work is also "progress"
}
long endIo = time.nanoseconds();
@ -339,10 +381,16 @@ public class Selector implements Selectable, AutoCloseable {
addToCompletedReceives();
}
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
/**
* handle any ready I/O on a set of selection keys
* @param selectionKeys set of keys to handle
* @param isImmediatelyConnected true if running over a set of keys for just-connected sockets
* @param currentTimeNanos time at which set of keys was determined
*/
private void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
Iterator<SelectionKey> iterator = selectionKeys.iterator();
Iterator<SelectionKey> iterator = determineHandlingOrder(selectionKeys).iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
@ -372,14 +420,18 @@ public class Selector implements Selectable, AutoCloseable {
}
/* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready())
if (channel.isConnected() && !channel.ready()) {
channel.prepare();
}
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
attemptRead(key, channel);
if (channel.hasBytesBuffered()) {
//this channel has bytes enqueued in intermediary buffers that we could not read
//(possibly because no memory was available). it may be the case that the underlying socket will
//not come up in the next poll() and so we need to remember this channel for the
//next poll call, otherwise data may be stuck in said buffers forever.
keysWithBufferedRead.add(key);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
@ -408,6 +460,39 @@ public class Selector implements Selectable, AutoCloseable {
}
}
private Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> selectionKeys) {
//it is possible that the iteration order over selectionKeys is the same every invocation.
//this may cause starvation of reads when memory is low. to address this we shuffle the keys if memory is low.
Collection<SelectionKey> inHandlingOrder;
if (!outOfMemory && memoryPool.availableMemory() < lowMemThreshold) {
List<SelectionKey> temp = new ArrayList<>(selectionKeys);
Collections.shuffle(temp);
inHandlingOrder = temp;
} else {
inHandlingOrder = selectionKeys;
}
return inHandlingOrder;
}
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
//if channel is ready and has bytes to read from socket or buffer, and has no
//previous receive(s) already staged or otherwise in progress then read from it
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
&& !explicitlyMutedChannels.contains(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null) {
madeReadProgressLastPoll = true;
addToStagedReceives(channel, networkReceive);
}
if (channel.isMute()) {
outOfMemory = true; //channel has muted itself due to memory pressure.
} else {
madeReadProgressLastPoll = true;
}
}
}
// Record time spent in pollSelectionKeys for channel (moved into a method to keep checkstyle happy)
private void maybeRecordTimePerConnection(KafkaChannel channel, long startTimeNanos) {
if (recordTimePerConnection)
@ -442,6 +527,7 @@ public class Selector implements Selectable, AutoCloseable {
private void mute(KafkaChannel channel) {
channel.mute();
explicitlyMutedChannels.add(channel);
}
@Override
@ -451,6 +537,7 @@ public class Selector implements Selectable, AutoCloseable {
}
private void unmute(KafkaChannel channel) {
explicitlyMutedChannels.remove(channel);
channel.unmute();
}
@ -509,6 +596,7 @@ public class Selector implements Selectable, AutoCloseable {
this.disconnected.put(channel, ChannelState.FAILED_SEND);
}
this.failedSends.clear();
this.madeReadProgressLastPoll = false;
}
/**
@ -674,7 +762,7 @@ public class Selector implements Selectable, AutoCloseable {
while (iter.hasNext()) {
Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
KafkaChannel channel = entry.getKey();
if (!channel.isMute()) {
if (!explicitlyMutedChannels.contains(channel)) {
Deque<NetworkReceive> deque = entry.getValue();
addToCompletedReceives(channel, deque);
if (deque.isEmpty())
@ -900,4 +988,13 @@ public class Selector implements Selectable, AutoCloseable {
}
}
//package-private for testing
boolean isOutOfMemory() {
return outOfMemory;
}
//package-private for testing
boolean isMadeReadProgressLastPoll() {
return madeReadProgressLastPoll;
}
}
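
Putting the pieces together, a sketch of constructing a memory-bounded Selector (the constructor call mirrors the new testMuteOnOOM test below; the ChannelBuilder is assumed to have been configure()d elsewhere):

import java.util.HashMap;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.Time;

public class BoundedSelectorSketch {
    // all incoming-request buffers for this selector come from one 900-byte pool;
    // when it runs dry, channels mute themselves instead of buffering unboundedly
    static Selector newBoundedSelector(ChannelBuilder channelBuilder, Metrics metrics, Time time) {
        MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
        return new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
                new HashMap<String, String>(), true, false, channelBuilder, pool);
    }
}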

View File

@ -22,6 +22,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Map;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.KafkaException;
@ -50,12 +51,13 @@ public class SslChannelBuilder implements ChannelBuilder {
}
}
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
@Override
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
try {
SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
Authenticator authenticator = new DefaultAuthenticator();
authenticator.configure(transportLayer, this.principalBuilder, this.configs);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) {
log.info("Failed to create channel due to ", e);
throw new KafkaException(e);

View File

@ -738,6 +738,11 @@ public class SslTransportLayer implements TransportLayer {
return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) == 0;
}
@Override
public boolean hasBytesBuffered() {
return netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
}
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, this);

View File

@ -84,6 +84,11 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
boolean isMute();
/**
* @return true if channel has bytes to be read in any intermediate buffers
*/
boolean hasBytesBuffered();
/**
* Transfers bytes from `fileChannel` to this `TransportLayer`.
*
@ -99,5 +104,4 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
* @see FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)
*/
long transferFrom(FileChannel fileChannel, long position, long count) throws IOException;
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;
public class ProtoUtils {
public static void walk(Schema schema, SchemaVisitor visitor) {
if (schema == null || visitor == null) {
throw new IllegalArgumentException("Both schema and visitor must be provided");
}
handleNode(schema, visitor);
}
private static void handleNode(Type node, SchemaVisitor visitor) {
if (node instanceof Schema) {
Schema schema = (Schema) node;
visitor.visit(schema);
for (Field f : schema.fields()) {
handleNode(f.type, visitor);
}
} else if (node instanceof ArrayOf) {
ArrayOf array = (ArrayOf) node;
visitor.visit(array);
handleNode(array.type(), visitor);
} else {
visitor.visit(node);
}
}
}
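
A small usage sketch (hypothetical visitor, using the SchemaVisitorAdapter defined below): walk() drives the visitor over every node, and visit(Type) fires only for leaf types - the same shape Protocol uses further down to detect buffer-retaining fields.

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.SchemaVisitorAdapter;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;

public class SchemaScanExample {
    // counts the leaf (primitive) types reachable from a schema; Schema and
    // ArrayOf nodes are dispatched to their own visit() overloads instead
    static int countLeafTypes(Schema schema) {
        final AtomicInteger leaves = new AtomicInteger();
        ProtoUtils.walk(schema, new SchemaVisitorAdapter() {
            @Override
            public void visit(Type field) {
                leaves.incrementAndGet();
            }
        });
        return leaves.get();
    }
}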

View File

@ -21,10 +21,13 @@ import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
import static org.apache.kafka.common.protocol.types.Type.BYTES;
@ -32,6 +35,7 @@ import static org.apache.kafka.common.protocol.types.Type.INT16;
import static org.apache.kafka.common.protocol.types.Type.INT32;
import static org.apache.kafka.common.protocol.types.Type.INT64;
import static org.apache.kafka.common.protocol.types.Type.INT8;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
import static org.apache.kafka.common.protocol.types.Type.RECORDS;
import static org.apache.kafka.common.protocol.types.Type.STRING;
import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
@ -1825,6 +1829,7 @@ public class Protocol {
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
static final short[] MIN_VERSIONS = new short[ApiKeys.MAX_API_KEY + 1];
static final EnumSet<ApiKeys> DELAYED_DEALLOCATION_REQUESTS; //initialized in static block
/* the latest version of each api */
static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
@ -1924,6 +1929,44 @@ public class Protocol {
throw new IllegalStateException("Request and response for version " + i + " of API "
+ api.id + " are defined inconsistently. One is null while the other is not null.");
}
/* go over all request schemata and find those that retain links to the underlying ByteBuffer from
* which they were read out. knowing which requests do (and do not) retain a reference to the buffer
* is needed to enable buffers to be released as soon as possible for requests that no longer need them */
Set<ApiKeys> requestsWithBufferRefs = new HashSet<>();
for (int reqId = 0; reqId < REQUESTS.length; reqId++) {
ApiKeys requestType = ApiKeys.forId(reqId);
Schema[] schemata = REQUESTS[reqId];
if (schemata == null) {
continue;
}
for (Schema requestVersionSchema : schemata) {
if (retainsBufferReference(requestVersionSchema)) {
requestsWithBufferRefs.add(requestType);
break; //kafka is loose with versions, so if _ANY_ version retains buffers we must assume all do.
}
}
}
DELAYED_DEALLOCATION_REQUESTS = EnumSet.copyOf(requestsWithBufferRefs);
}
private static boolean retainsBufferReference(Schema schema) {
if (schema == null) {
return false;
}
final AtomicReference<Boolean> foundBufferReference = new AtomicReference<>(Boolean.FALSE);
SchemaVisitor detector = new SchemaVisitorAdapter() {
@Override
public void visit(Type field) {
if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS) {
foundBufferReference.set(Boolean.TRUE);
}
}
};
ProtoUtils.walk(schema, detector);
return foundBufferReference.get();
}
public static boolean apiVersionSupported(short apiKey, short apiVersion) {
@ -1936,6 +1979,10 @@ public class Protocol {
0);
}
public static boolean requiresDelayedDeallocation(int apiKey) {
return DELAYED_DEALLOCATION_REQUESTS.contains(ApiKeys.forId(apiKey));
}
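
A hedged sketch of how a caller might consume this flag (hypothetical helper, not part of this commit): buffers for requests that cannot retain references may go back to the pool as soon as parsing completes, while the rest must wait until the request is fully handled.

import java.nio.ByteBuffer;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.protocol.Protocol;

public class DelayedDeallocationSketch {
    // hypothetical: release the receive buffer immediately when the parsed
    // request cannot point into it; otherwise the release must be delayed
    // until the request has been fully processed and responded to
    static void maybeReleaseEarly(int apiKey, MemoryPool pool, ByteBuffer receiveBuffer) {
        if (!Protocol.requiresDelayedDeallocation(apiKey))
            pool.release(receiveBuffer);
    }
}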
private static String indentString(int size) {
StringBuilder b = new StringBuilder(size);
for (int i = 0; i < size; i++)

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;
public interface SchemaVisitor {
void visit(Schema schema);
void visit(ArrayOf array);
void visit(Type field);
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Type;
public abstract class SchemaVisitorAdapter implements SchemaVisitor {
@Override
public void visit(Schema schema) {
//nop
}
@Override
public void visit(ArrayOf array) {
//nop
}
@Override
public void visit(Type field) {
//nop
}
}

View File

@ -70,4 +70,9 @@ public class Field {
return schema;
}
@Override
public String toString() {
return name + ":" + type;
}
}

View File

@ -139,9 +139,7 @@ public class Schema extends Type {
StringBuilder b = new StringBuilder();
b.append('{');
for (int i = 0; i < this.fields.length; i++) {
b.append(this.fields[i].name);
b.append(':');
b.append(this.fields[i].type());
b.append(this.fields[i].toString());
if (i < this.fields.length - 1)
b.append(',');
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;
import java.text.DecimalFormat;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -62,6 +63,11 @@ public class Utils {
// IPv6 is supported with [ip] pattern
private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)");
// Prints up to 2 decimal digits. Used for human readable printing
private static final DecimalFormat TWO_DIGIT_FORMAT = new DecimalFormat("0.##");
private static final String[] BYTE_SCALE_SUFFIXES = new String[] {"B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"};
public static final String NL = System.getProperty("line.separator");
private static final Logger log = LoggerFactory.getLogger(Utils.class);
@ -378,6 +384,28 @@ public class Utils {
: host + ":" + port;
}
/**
* Formats a byte count as a human-readable String ("3.2 MB")
* @param bytes some size in bytes
* @return a human-readable string for the given byte count, e.g. "3.2 MB"
*/
public static String formatBytes(long bytes) {
if (bytes < 0) {
return "" + bytes;
}
double asDouble = (double) bytes;
int ordinal = (int) Math.floor(Math.log(asDouble) / Math.log(1024.0));
double scale = Math.pow(1024.0, ordinal);
double scaled = asDouble / scale;
String formatted = TWO_DIGIT_FORMAT.format(scaled);
try {
return formatted + " " + BYTE_SCALE_SUFFIXES[ordinal];
} catch (IndexOutOfBoundsException e) {
//huge number?
return "" + asDouble;
}
}
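// worked examples (illustrative): formatBytes(500) returns "500 B";
// formatBytes(3355443) returns "3.2 MB" (3355443 / 1024^2 = 3.2);
// formatBytes(-1) returns "-1" (negative values pass through unformatted)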
/**
* Create a string representation of an array joined by the given separator
* @param strs The array of items
@ -632,6 +660,7 @@ public class Utils {
IOException exception = null;
for (Closeable closeable : closeables) {
try {
if (closeable != null)
closeable.close();
} catch (IOException e) {
if (exception != null)

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.memory;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Utils;
import org.junit.Assert;
import org.junit.Test;
public class GarbageCollectedMemoryPoolTest {
@Test(expected = IllegalArgumentException.class)
public void testZeroSize() throws Exception {
new GarbageCollectedMemoryPool(0, 7, true, null);
}
@Test(expected = IllegalArgumentException.class)
public void testNegativeSize() throws Exception {
new GarbageCollectedMemoryPool(-1, 7, false, null);
}
@Test(expected = IllegalArgumentException.class)
public void testZeroMaxAllocation() throws Exception {
new GarbageCollectedMemoryPool(100, 0, true, null);
}
@Test(expected = IllegalArgumentException.class)
public void testNegativeMaxAllocation() throws Exception {
new GarbageCollectedMemoryPool(100, -1, false, null);
}
@Test(expected = IllegalArgumentException.class)
public void testMaxAllocationLargerThanSize() throws Exception {
new GarbageCollectedMemoryPool(100, 101, true, null);
}
@Test(expected = IllegalArgumentException.class)
public void testAllocationOverMaxAllocation() throws Exception {
GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
pool.tryAllocate(11);
}
@Test(expected = IllegalArgumentException.class)
public void testAllocationZero() throws Exception {
GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
pool.tryAllocate(0);
}
@Test(expected = IllegalArgumentException.class)
public void testAllocationNegative() throws Exception {
GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
pool.tryAllocate(-1);
}
@Test(expected = IllegalArgumentException.class)
public void testReleaseNull() throws Exception {
GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
pool.release(null);
}
@Test(expected = IllegalArgumentException.class)
public void testReleaseForeignBuffer() throws Exception {
GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
ByteBuffer fellOffATruck = ByteBuffer.allocate(1);
pool.release(fellOffATruck);
}
@Test
public void testDoubleFree() throws Exception {
GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
ByteBuffer buffer = pool.tryAllocate(5);
Assert.assertNotNull(buffer);
pool.release(buffer); //so far so good
try {
pool.release(buffer);
Assert.fail("2nd release() should have failed");
} catch (IllegalArgumentException e) {
//as expected
} catch (Throwable t) {
Assert.fail("expected an IllegalArgumentException. instead got " + t);
}
}
@Test
public void testAllocationBound() throws Exception {
GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(21, 10, false, null);
ByteBuffer buf1 = pool.tryAllocate(10);
Assert.assertNotNull(buf1);
Assert.assertEquals(10, buf1.capacity());
ByteBuffer buf2 = pool.tryAllocate(10);
Assert.assertNotNull(buf2);
Assert.assertEquals(10, buf2.capacity());
ByteBuffer buf3 = pool.tryAllocate(10);
Assert.assertNotNull(buf3);
Assert.assertEquals(10, buf3.capacity());
//no more allocations
Assert.assertNull(pool.tryAllocate(1));
//release a buffer
pool.release(buf3);
//now we can have more
ByteBuffer buf4 = pool.tryAllocate(10);
Assert.assertNotNull(buf4);
Assert.assertEquals(10, buf4.capacity());
//no more allocations
Assert.assertNull(pool.tryAllocate(1));
}
@Test
public void testBuffersGarbageCollected() throws Exception {
Runtime runtime = Runtime.getRuntime();
long maxHeap = runtime.maxMemory(); //in bytes
long maxPool = maxHeap / 2;
long maxSingleAllocation = maxPool / 10;
Assert.assertTrue(maxSingleAllocation < Integer.MAX_VALUE / 2); //sanity check: fails if the test JVM is running with too much memory for this test's logic
GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(maxPool, (int) maxSingleAllocation, false, null);
//we will allocate 30 buffers from this pool, which is sized such that at most
//11 should coexist and 30 do not fit in the JVM memory, proving that:
// 1. buffers were reclaimed and
// 2. the pool registered the reclamation.
int timeoutSeconds = 30;
long giveUp = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds);
boolean success = false;
int buffersAllocated = 0;
while (System.currentTimeMillis() < giveUp) {
ByteBuffer buffer = pool.tryAllocate((int) maxSingleAllocation);
if (buffer == null) {
System.gc();
Thread.sleep(10);
continue;
}
buffersAllocated++;
if (buffersAllocated >= 30) {
success = true;
break;
}
}
Assert.assertTrue("failed to allocate 30 buffers in " + timeoutSeconds + " seconds."
+ " buffers allocated: " + buffersAllocated + " heap " + Utils.formatBytes(maxHeap)
+ " pool " + Utils.formatBytes(maxPool) + " single allocation "
+ Utils.formatBytes(maxSingleAllocation), success);
}
}

View File

@ -94,14 +94,15 @@ public class NioEchoServer extends Thread {
List<NetworkReceive> completedReceives = selector.completedReceives();
for (NetworkReceive rcv : completedReceives) {
KafkaChannel channel = channel(rcv.source());
channel.mute();
String channelId = channel.id();
selector.mute(channelId);
NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
if (outputChannel == null)
selector.send(send);
else {
for (ByteBuffer buffer : send.buffers)
outputChannel.write(buffer);
channel.unmute();
selector.unmute(channelId);
}
}
for (Send send : selector.completedSends())

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
 * Test helper class that connects to a given server address, writes out the given payload and disconnects.
*/
public class PlaintextSender extends Thread {
public PlaintextSender(final InetSocketAddress serverAddress, final byte[] payload) {
super(new Runnable() {
@Override
public void run() {
try (Socket connection = new Socket(serverAddress.getAddress(), serverAddress.getPort());
OutputStream os = connection.getOutputStream()) {
os.write(payload);
os.flush();
} catch (Exception e) {
e.printStackTrace(System.err);
}
}
});
setDaemon(true);
setName("PlaintextSender - " + payload.length + " bytes @ " + serverAddress);
}
}

View File

@ -22,13 +22,22 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
@ -50,7 +59,7 @@ public class SelectorTest {
protected Time time;
protected Selector selector;
protected ChannelBuilder channelBuilder;
private Metrics metrics;
protected Metrics metrics;
@Before
public void setUp() throws Exception {
@ -322,6 +331,87 @@ public class SelectorTest {
assertTrue("Unexpected receive", selector.completedReceives().isEmpty());
}
@Test
public void testMuteOnOOM() throws Exception {
//clean up the default selector and replace it with one that uses a finite memory pool
selector.close();
MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
new HashMap<String, String>(), true, false, channelBuilder, pool);
try (ServerSocketChannel ss = ServerSocketChannel.open()) {
ss.bind(new InetSocketAddress(0));
InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
Thread sender1 = createSender(serverAddress, randomPayload(900));
Thread sender2 = createSender(serverAddress, randomPayload(900));
sender1.start();
sender2.start();
//wait until everything has been flushed out to the network (assuming payload size is smaller than the OS socket buffer)
//this is important because we assume both requests' size prefixes (the first 4 bytes) have arrived.
sender1.join(5000);
sender2.join(5000);
SocketChannel channelX = ss.accept(); //undefined which of the two senders this is
channelX.configureBlocking(false);
SocketChannel channelY = ss.accept();
channelY.configureBlocking(false);
selector.register("clientX", channelX);
selector.register("clientY", channelY);
List<NetworkReceive> completed = Collections.emptyList();
long deadline = System.currentTimeMillis() + 5000;
while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
selector.poll(1000);
completed = selector.completedReceives();
}
assertEquals("could not read a single request within timeout", 1, completed.size());
NetworkReceive firstReceive = completed.get(0);
assertEquals(0, pool.availableMemory());
assertTrue(selector.isOutOfMemory());
selector.poll(10);
assertTrue(selector.completedReceives().isEmpty());
assertEquals(0, pool.availableMemory());
assertTrue(selector.isOutOfMemory());
firstReceive.close();
assertEquals(900, pool.availableMemory()); //memory has been released back to pool
completed = Collections.emptyList();
deadline = System.currentTimeMillis() + 5000;
while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
selector.poll(1000);
completed = selector.completedReceives();
}
assertEquals("could not read a single request within timeout", 1, selector.completedReceives().size());
assertEquals(0, pool.availableMemory());
assertFalse(selector.isOutOfMemory());
}
}
private Thread createSender(InetSocketAddress serverAddress, byte[] payload) {
return new PlaintextSender(serverAddress, payload);
}
protected byte[] randomPayload(int sizeBytes) throws Exception {
//builds a size-delimited payload: a 4-byte big-endian length prefix followed by sizeBytes random bytes,
//matching the framing NetworkReceive expects
Random random = new Random();
byte[] payload = new byte[sizeBytes + 4];
random.nextBytes(payload);
ByteArrayOutputStream prefixOs = new ByteArrayOutputStream();
DataOutputStream prefixDos = new DataOutputStream(prefixOs);
prefixDos.writeInt(sizeBytes);
prefixDos.flush();
prefixDos.close();
prefixOs.flush();
prefixOs.close();
byte[] prefix = prefixOs.toByteArray();
System.arraycopy(prefix, 0, payload, 0, prefix.length); //overwrite the first 4 (random) bytes with the length prefix
return payload;
}
private String blockingRequest(String node, String s) throws IOException {
selector.send(createSend(node, s));
selector.poll(1000L);

View File

@ -17,17 +17,23 @@
package org.apache.kafka.common.network;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
@ -35,6 +41,7 @@ import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestSslUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -43,7 +50,6 @@ import org.junit.Test;
*/
public class SslSelectorTest extends SelectorTest {
private Metrics metrics;
private Map<String, Object> sslClientConfigs;
@Before
@ -160,6 +166,90 @@ public class SslSelectorTest extends SelectorTest {
}
@Override
public void testMuteOnOOM() throws Exception {
//clean up the default selector and replace it with one that uses a finite memory pool
selector.close();
MemoryPool pool = new SimpleMemoryPool(900, 900, false, null);
//the initial channel builder is for clients; we need a server one
File trustStoreFile = File.createTempFile("truststore", ".jks");
Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
channelBuilder = new SslChannelBuilder(Mode.SERVER);
channelBuilder.configure(sslServerConfigs);
selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
new HashMap<String, String>(), true, false, channelBuilder, pool);
try (ServerSocketChannel ss = ServerSocketChannel.open()) {
ss.bind(new InetSocketAddress(0));
InetSocketAddress serverAddress = (InetSocketAddress) ss.getLocalAddress();
SslSender sender1 = createSender(serverAddress, randomPayload(900));
SslSender sender2 = createSender(serverAddress, randomPayload(900));
sender1.start();
sender2.start();
SocketChannel channelX = ss.accept(); //undefined which of the two senders this is
channelX.configureBlocking(false);
SocketChannel channelY = ss.accept();
channelY.configureBlocking(false);
selector.register("clientX", channelX);
selector.register("clientY", channelY);
boolean success = false;
NetworkReceive firstReceive = null;
long deadline = System.currentTimeMillis() + 5000;
//keep calling poll until:
//1. both senders have completed the handshakes (so server selector has tried reading both payloads)
//2. a single payload is actually read out completely (the other is too big to fit)
while (System.currentTimeMillis() < deadline) {
selector.poll(10);
List<NetworkReceive> completed = selector.completedReceives();
if (firstReceive == null) {
if (!completed.isEmpty()) {
assertEquals("expecting a single request", 1, completed.size());
firstReceive = completed.get(0);
assertTrue(selector.isMadeReadProgressLastPoll());
assertEquals(0, pool.availableMemory());
}
} else {
assertTrue("only expecting single request", completed.isEmpty());
}
boolean handshaked = sender1.waitForHandshake(1);
handshaked = handshaked && sender2.waitForHandshake(1);
if (handshaked && firstReceive != null) {
success = true;
break;
}
}
if (!success) {
Assert.fail("could not complete handshakes and read the first request within timeout");
}
selector.poll(10);
assertTrue(selector.completedReceives().isEmpty());
assertEquals(0, pool.availableMemory());
assertTrue(selector.isOutOfMemory());
firstReceive.close();
assertEquals(900, pool.availableMemory()); //memory has been released back to pool
List<NetworkReceive> completed = Collections.emptyList();
deadline = System.currentTimeMillis() + 5000;
while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
selector.poll(1000);
completed = selector.completedReceives();
}
assertEquals("could not read remaining request within timeout", 1, completed.size());
assertEquals(0, pool.availableMemory());
assertFalse(selector.isOutOfMemory());
}
}
/**
* Connects and waits for handshake to complete. This is required since SslTransportLayer
* implementation requires the channel to be ready before send is invoked (unlike plaintext
@ -169,4 +259,7 @@ public class SslSelectorTest extends SelectorTest {
blockingConnect(node, serverAddr);
}
private SslSender createSender(InetSocketAddress serverAddress, byte[] payload) {
return new SslSender(serverAddress, payload);
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class SslSender extends Thread {
private final InetSocketAddress serverAddress;
private final byte[] payload;
private final CountDownLatch handshaked = new CountDownLatch(1);
public SslSender(InetSocketAddress serverAddress, byte[] payload) {
this.serverAddress = serverAddress;
this.payload = payload;
setDaemon(true);
setName("SslSender - " + payload.length + " bytes @ " + serverAddress);
}
@Override
public void run() {
try {
SSLContext sc = SSLContext.getInstance("TLSv1.2");
sc.init(null, new TrustManager[]{new NaiveTrustManager()}, new java.security.SecureRandom());
try (SSLSocket connection = (SSLSocket) sc.getSocketFactory().createSocket(serverAddress.getAddress(), serverAddress.getPort())) {
OutputStream os = connection.getOutputStream();
connection.startHandshake();
handshaked.countDown();
os.write(payload);
os.flush();
}
} catch (Exception e) {
e.printStackTrace(System.err);
}
}
public boolean waitForHandshake(long timeoutMillis) throws InterruptedException {
return handshaked.await(timeoutMillis, TimeUnit.MILLISECONDS);
}
/**
 * A trust manager that blindly trusts any certificate presented to it.
*/
private static class NaiveTrustManager implements X509TrustManager {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
//nop
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
//nop
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
}

View File

@ -39,6 +39,7 @@ import javax.net.ssl.SSLParameters;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.metrics.Metrics;
@ -580,7 +581,7 @@ public class SslTransportLayerTest {
public void testNetworkThreadTimeRecorded() throws Exception {
selector.close();
this.selector = new Selector(NetworkReceive.UNLIMITED, 5000, new Metrics(), Time.SYSTEM,
"MetricGroup", new HashMap<String, String>(), false, true, channelBuilder);
"MetricGroup", new HashMap<String, String>(), false, true, channelBuilder, MemoryPool.NONE);
String node = "0";
server = createEchoServer(SecurityProtocol.SSL);

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.protocol;
import org.junit.Assert;
import org.junit.Test;
public class ProtoUtilsTest {
@Test
public void testDelayedAllocationSchemaDetection() throws Exception {
//verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.
for (ApiKeys key : ApiKeys.values()) {
if (key == ApiKeys.PRODUCE || key == ApiKeys.JOIN_GROUP || key == ApiKeys.SYNC_GROUP) {
Assert.assertTrue(Protocol.requiresDelayedDeallocation(key.id));
} else {
Assert.assertFalse(Protocol.requiresDelayedDeallocation(key.id));
}
}
}
}

View File

@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.Random;
import static org.apache.kafka.common.utils.Utils.formatAddress;
import static org.apache.kafka.common.utils.Utils.formatBytes;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.junit.Assert.assertArrayEquals;
@ -77,6 +78,17 @@ public class UtilsTest {
assertEquals("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678", formatAddress("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678));
}
@Test
public void testFormatBytes() {
assertEquals("-1", formatBytes(-1));
assertEquals("1023 B", formatBytes(1023));
assertEquals("1 KB", formatBytes(1024));
assertEquals("1024 KB", formatBytes((1024 * 1024) - 1));
assertEquals("1 MB", formatBytes(1024 * 1024));
assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024)));
assertEquals("10 MB", formatBytes(10 * 1024 * 1024));
}
@Test
public void testJoin() {
assertEquals("", Utils.join(Collections.emptyList(), ","));

View File

@ -29,9 +29,10 @@ import kafka.server.QuotaId
import kafka.utils.{Logging, NotNothing}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
import org.apache.kafka.common.record.{RecordBatch, MemoryRecords}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
@ -41,7 +42,7 @@ import scala.reflect.ClassTag
object RequestChannel extends Logging {
val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
buffer = shutdownReceive, startTimeNanos = 0, listenerName = new ListenerName(""),
buffer = shutdownReceive, memoryPool = MemoryPool.NONE, startTimeNanos = 0, listenerName = new ListenerName(""),
securityProtocol = SecurityProtocol.PLAINTEXT)
private val requestLogger = Logger.getLogger("kafka.request.logger")
@ -56,10 +57,12 @@ object RequestChannel extends Logging {
val sanitizedUser = QuotaId.sanitize(principal.getName)
}
case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
startTimeNanos: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
case class Request(processor: Int, connectionId: String, session: Session, buffer: ByteBuffer,
private val memoryPool: MemoryPool, startTimeNanos: Long, listenerName: ListenerName,
securityProtocol: SecurityProtocol) {
// These need to be volatile because the readers are in the network thread and the writers are in the request
// handler threads or the purgatory threads
@volatile var bufferReference = buffer
@volatile var requestDequeueTimeNanos = -1L
@volatile var apiLocalCompleteTimeNanos = -1L
@volatile var responseCompleteTimeNanos = -1L
@ -104,7 +107,12 @@ object RequestChannel extends Logging {
else
null
buffer = null
//Most request types are parsed entirely into objects at this point. For those we can release the underlying buffer.
//Some (like produce, or any whose schema contains fields of type BYTES or NULLABLE_BYTES) retain a reference
//to the buffer. For those requests the buffer cannot be released early, only once request processing is done.
if (!Protocol.requiresDelayedDeallocation(requestId)) {
dispose()
}
def requestDesc(details: Boolean): String = {
if (requestObj != null)
@ -194,6 +202,13 @@ object RequestChannel extends Logging {
.format(requestDesc(detailsEnabled), connectionId, totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs, responseSendTimeMs, securityProtocol, session.principal, listenerName.value))
}
}
def dispose(): Unit = {
if (bufferReference != null) {
memoryPool.release(bufferReference)
bufferReference = null
}
}
}
object Response {

View File

@ -32,7 +32,9 @@ import kafka.security.CredentialProvider
import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.Rate
import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.protocol.SecurityProtocol
@ -61,6 +63,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics")
memoryPoolSensor.add(memoryPoolDepletedPercentMetricName, new Rate(TimeUnit.MILLISECONDS))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
private val processors = new Array[Processor](totalProcessorThreads)
@ -86,7 +92,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
val processorEndIndex = processorBeginIndex + numProcessorThreads
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
@ -109,7 +115,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
}.sum / totalProcessorThreads
}
)
newGauge("MemoryPoolAvailable",
new Gauge[Long] {
def value = memoryPool.availableMemory()
}
)
newGauge("MemoryPoolUsed",
new Gauge[Long] {
def value = memoryPool.size() - memoryPool.availableMemory()
}
)
info("Started " + acceptors.size + " acceptor threads")
}
@ -138,7 +153,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
/* `protected` for test usage */
protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
securityProtocol: SecurityProtocol): Processor = {
securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id,
time,
config.socketRequestMaxBytes,
@ -149,7 +164,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
securityProtocol,
config,
metrics,
credentialProvider
credentialProvider,
memoryPool
)
}
@ -378,7 +394,8 @@ private[kafka] class Processor(val id: Int,
securityProtocol: SecurityProtocol,
config: KafkaConfig,
metrics: Metrics,
credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
credentialProvider: CredentialProvider,
memoryPool: MemoryPool) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
private object ConnectionId {
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@ -422,7 +439,8 @@ private[kafka] class Processor(val id: Int,
metricTags,
false,
true,
ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache))
ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache),
memoryPool)
override def run() {
startupComplete()
@ -517,7 +535,8 @@ private[kafka] class Processor(val id: Int,
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
buffer = receive.payload, startTimeNanos = time.nanoseconds,
listenerName = listenerName, securityProtocol = securityProtocol)
listenerName = listenerName, securityProtocol = securityProtocol,
memoryPool = memoryPool)
requestChannel.sendRequest(req)
selector.mute(receive.source)
} catch {

View File

@ -52,6 +52,7 @@ object Defaults {
val NumIoThreads = 8
val BackgroundThreads = 10
val QueuedMaxRequests = 500
val QueuedMaxRequestBytes = -1
/************* Authorizer Configuration ***********/
val AuthorizerClassName = ""
@ -236,6 +237,7 @@ object KafkaConfig {
val NumIoThreadsProp = "num.io.threads"
val BackgroundThreadsProp = "background.threads"
val QueuedMaxRequestsProp = "queued.max.requests"
val QueuedMaxBytesProp = "queued.max.request.bytes"
val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
@ -420,6 +422,7 @@ object KafkaConfig {
val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read"
val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = "The authorizer class that should be used for authorization"
@ -684,6 +687,7 @@ object KafkaConfig {
.define(NumIoThreadsProp, INT, Defaults.NumIoThreads, atLeast(1), HIGH, NumIoThreadsDoc)
.define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc)
.define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc)
.define(QueuedMaxBytesProp, LONG, Defaults.QueuedMaxRequestBytes, MEDIUM, QueuedMaxRequestBytesDoc)
.define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc)
/************* Authorizer Configuration ***********/
@ -900,6 +904,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
val backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
val queuedMaxBytes = getLong(KafkaConfig.QueuedMaxBytesProp)
val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
@ -1191,5 +1196,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
require(!interBrokerUsesSasl || saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol),
s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker communication")
require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes,
s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}")
}
}
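
Taken together, a hypothetical server.properties fragment enabling the new bound might look like this (values are illustrative; with the default of -1 the bound is disabled and MemoryPool.NONE is used):

# cap the memory held by queued/in-flight requests at ~128 MB
queued.max.request.bytes=134217728
# must be <= queued.max.request.bytes whenever the bound is enabled
socket.request.max.bytes=104857600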

View File

@ -41,8 +41,8 @@ class KafkaRequestHandler(id: Int,
def run() {
while(true) {
try {
var req : RequestChannel.Request = null
try {
while (req == null) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
@ -69,6 +69,9 @@ class KafkaRequestHandler(id: Int,
latch.countDown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
} finally {
if (req != null)
req.dispose()
}
}
}
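
To tie the pieces together, a minimal Java sketch of the buffer lifecycle this patch introduces, combining Protocol.requiresDelayedDeallocation (from the RequestChannel change above) with the handler-side dispose pattern in the finally block; parseIntoObjects and the process method are hypothetical stand-ins, not code from the patch:

import java.nio.ByteBuffer;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.protocol.Protocol;

public class BufferLifecycleSketch {
    static void process(short apiKey, ByteBuffer buffer, MemoryPool pool) {
        parseIntoObjects(apiKey, buffer); //hypothetical parsing step
        if (!Protocol.requiresDelayedDeallocation(apiKey)) {
            pool.release(buffer); //parsing copied everything out of the buffer; safe to release immediately
        }
        //otherwise (e.g. PRODUCE) the parsed request still references the buffer,
        //so release is deferred until the request handler is done, as in the finally block above
    }

    private static void parseIntoObjects(short apiKey, ByteBuffer buffer) {
        //stand-in for the real request deserialization
    }
}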

View File

@ -29,6 +29,7 @@ import kafka.security.CredentialProvider
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, NetworkSend, Send}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
@ -328,9 +329,9 @@ class SocketServerTest extends JUnitSuite {
var conn: Socket = null
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
protocol: SecurityProtocol): Processor = {
protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) {
config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, MemoryPool.NONE) {
override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
conn.close()
super.sendResponse(response, responseSend)

View File

@ -35,6 +35,7 @@ import kafka.server._
import kafka.utils.{MockTime, TestUtils, ZkUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
@ -395,7 +396,7 @@ class KafkaApisTest {
val header = new RequestHeader(builder.apiKey.id, request.version, "", 0)
val buffer = request.serialize(header)
val session = Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost)
(request, RequestChannel.Request(1, "1", session, buffer, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
(request, RequestChannel.Request(1, "1", session, buffer, MemoryPool.NONE, 0, new ListenerName(""), SecurityProtocol.PLAINTEXT))
}
private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {

View File

@ -545,6 +545,7 @@ class KafkaConfigTest {
case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.QueuedMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.AuthorizerClassNameProp => //ignore string