KAFKA-1252 Implement retries in new producer.

This commit is contained in:
Jay Kreps 2014-02-13 13:48:21 -08:00
parent f550cc76cd
commit 3f0b67b6ac
23 changed files with 652 additions and 401 deletions

View File

@ -96,7 +96,7 @@ public class KafkaProducer implements Producer {
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
this.totalMemorySize, this.totalMemorySize,
config.getLong(ProducerConfig.LINGER_MS_CONFIG), config.getLong(ProducerConfig.LINGER_MS_CONFIG),
config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL), config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
metrics, metrics,
new SystemTime()); new SystemTime());
List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG)); List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG));
@ -108,7 +108,10 @@ public class KafkaProducer implements Producer {
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
(short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG), (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG),
config.getInt(ProducerConfig.MAX_RETRIES_CONFIG),
config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG), config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
new SystemTime()); new SystemTime());
this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true); this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true);
this.ioThread.start(); this.ioThread.start();

View File

@ -62,7 +62,7 @@ public class ProducerConfig extends AbstractConfig {
/** /**
* The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent
* faster than they can be delivered to the server the producer will either block or throw an exception based on the * faster than they can be delivered to the server the producer will either block or throw an exception based on the
* preference specified by {@link #BLOCK_ON_BUFFER_FULL}. * preference specified by {@link #BLOCK_ON_BUFFER_FULL_CONFIG}.
*/ */
public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes"; public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
@ -106,6 +106,11 @@ public class ProducerConfig extends AbstractConfig {
*/ */
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
/**
* The size of the TCP receive buffer to use when reading data (you generally shouldn't need to change this)
*/
public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
/** /**
* The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server
* has its own cap on record size which may be different from this. * has its own cap on record size which may be different from this.
@ -123,9 +128,17 @@ public class ProducerConfig extends AbstractConfig {
* this setting is true and we block, however users who want to guarantee we never block can turn this into an * this setting is true and we block, however users who want to guarantee we never block can turn this into an
* error. * error.
*/ */
public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full"; public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
public static final String ENABLE_JMX = "enable.jmx"; /**
* The maximum number of times to attempt resending the request before giving up.
*/
public static final String MAX_RETRIES_CONFIG = "request.retries";
/**
* Should we register the Kafka metrics as JMX mbeans?
*/
public static final String ENABLE_JMX_CONFIG = "enable.jmx";
static { static {
/* TODO: add docs */ /* TODO: add docs */
@ -142,10 +155,12 @@ public class ProducerConfig extends AbstractConfig {
.define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah") .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah")
.define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah") .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah") .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")
.define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah")
.define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah")
.define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
.define(BLOCK_ON_BUFFER_FULL, Type.BOOLEAN, true, "blah blah") .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
.define(ENABLE_JMX, Type.BOOLEAN, true, ""); .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
.define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "");
} }
ProducerConfig(Map<? extends Object, ? extends Object> props) { ProducerConfig(Map<? extends Object, ? extends Object> props) {

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.clients.producer.internals; package org.apache.kafka.clients.producer.internals;
@ -24,7 +20,6 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
/** /**
* A class encapsulating some of the logic around metadata. * A class encapsulating some of the logic around metadata.
* <p> * <p>
@ -134,4 +129,11 @@ public final class Metadata {
notifyAll(); notifyAll();
} }
/**
* The last time metadata was updated.
*/
public synchronized long lastUpdate() {
return this.lastRefresh;
}
} }

View File

@ -1,25 +1,20 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.clients.producer.internals; package org.apache.kafka.clients.producer.internals;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Deque; import java.util.Deque;
import java.util.List; import java.util.List;
@ -39,10 +34,9 @@ import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
/** /**
* This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} instances to be * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
* sent to the server. * instances to be sent to the server.
* <p> * <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
* this behavior is explicitly disabled. * this behavior is explicitly disabled.
@ -151,6 +145,17 @@ public final class RecordAccumulator {
} }
} }
/**
* Re-enqueue the given record batch in the accumulator to retry
*/
public void reenqueue(RecordBatch batch, long now) {
batch.attempts++;
Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
synchronized (deque) {
deque.addFirst(batch);
}
}
/** /**
* Get a list of topic-partitions which are ready to be sent. * Get a list of topic-partitions which are ready to be sent.
* <p> * <p>
@ -229,16 +234,10 @@ public final class RecordAccumulator {
} }
/** /**
* Deallocate the list of record batches * Deallocate the record batch
*/ */
public void deallocate(Collection<RecordBatch> batches) { public void deallocate(RecordBatch batch) {
ByteBuffer[] buffers = new ByteBuffer[batches.size()]; free.deallocate(batch.records.buffer());
int i = 0;
for (RecordBatch batch : batches) {
buffers[i] = batch.records.buffer();
i++;
}
free.deallocate(buffers);
} }
/** /**

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.clients.producer.internals; package org.apache.kafka.clients.producer.internals;
@ -25,7 +21,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecords;
/** /**
* A batch of records that is or will be sent. * A batch of records that is or will be sent.
* *
@ -33,6 +28,7 @@ import org.apache.kafka.common.record.MemoryRecords;
*/ */
public final class RecordBatch { public final class RecordBatch {
public int recordCount = 0; public int recordCount = 0;
public volatile int attempts = 0;
public final long created; public final long created;
public final MemoryRecords records; public final MemoryRecords records;
public final TopicPartition topicPartition; public final TopicPartition topicPartition;

View File

@ -22,12 +22,15 @@ import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.Set; import java.util.Set;
import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.network.Selectable;
@ -41,6 +44,7 @@ import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
/** /**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
@ -48,19 +52,55 @@ import org.apache.kafka.common.utils.Time;
*/ */
public class Sender implements Runnable { public class Sender implements Runnable {
private final Map<Integer, NodeState> nodeState; /* the state of each nodes connection */
private final NodeStates nodeStates;
/* the record accumulator that batches records */
private final RecordAccumulator accumulator; private final RecordAccumulator accumulator;
/* the selector used to perform network i/o */
private final Selectable selector; private final Selectable selector;
/* the client id used to identify this client in requests to the server */
private final String clientId; private final String clientId;
/* the maximum request size to attempt to send to the server */
private final int maxRequestSize; private final int maxRequestSize;
private final long reconnectBackoffMs;
/* the number of acknowledgements to request from the server */
private final short acks; private final short acks;
/* the max time in ms for the server to wait for acknowlegements */
private final int requestTimeout; private final int requestTimeout;
/* the number of times to retry a failed request before giving up */
private final int retries;
/* the socket send buffer size in bytes */
private final int socketSendBuffer;
/* the socket receive size buffer in bytes */
private final int socketReceiveBuffer;
/* the set of currently in-flight requests awaiting a response from the server */
private final InFlightRequests inFlightRequests; private final InFlightRequests inFlightRequests;
/* a reference to the current Cluster instance */
private final Metadata metadata; private final Metadata metadata;
/* the clock instance used for getting the time */
private final Time time; private final Time time;
/* the current node to attempt to use for metadata requests (will round-robin over nodes) */
private int metadataFetchNodeIndex;
/* the current correlation id to use when sending requests to servers */
private int correlation; private int correlation;
/* true iff there is a metadata request that has been sent and for which we have not yet received a response */
private boolean metadataFetchInProgress; private boolean metadataFetchInProgress;
/* true while the sender thread is still running */
private volatile boolean running; private volatile boolean running;
public Sender(Selectable selector, public Sender(Selectable selector,
@ -70,22 +110,28 @@ public class Sender implements Runnable {
int maxRequestSize, int maxRequestSize,
long reconnectBackoffMs, long reconnectBackoffMs,
short acks, short acks,
int retries,
int requestTimeout, int requestTimeout,
int socketSendBuffer,
int socketReceiveBuffer,
Time time) { Time time) {
this.nodeState = new HashMap<Integer, NodeState>(); this.nodeStates = new NodeStates(reconnectBackoffMs);
this.accumulator = accumulator; this.accumulator = accumulator;
this.selector = selector; this.selector = selector;
this.maxRequestSize = maxRequestSize; this.maxRequestSize = maxRequestSize;
this.reconnectBackoffMs = reconnectBackoffMs;
this.metadata = metadata; this.metadata = metadata;
this.clientId = clientId; this.clientId = clientId;
this.running = true; this.running = true;
this.requestTimeout = requestTimeout; this.requestTimeout = requestTimeout;
this.acks = acks; this.acks = acks;
this.retries = retries;
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.inFlightRequests = new InFlightRequests(); this.inFlightRequests = new InFlightRequests();
this.correlation = 0; this.correlation = 0;
this.metadataFetchInProgress = false; this.metadataFetchInProgress = false;
this.time = time; this.time = time;
this.metadataFetchNodeIndex = new Random().nextInt();
} }
/** /**
@ -130,11 +176,7 @@ public class Sender implements Runnable {
// should we update our metadata? // should we update our metadata?
List<NetworkSend> sends = new ArrayList<NetworkSend>(); List<NetworkSend> sends = new ArrayList<NetworkSend>();
InFlightRequest metadataReq = maybeMetadataRequest(cluster, now); maybeUpdateMetadata(cluster, sends, now);
if (metadataReq != null) {
sends.add(metadataReq.request);
this.inFlightRequests.add(metadataReq);
}
// prune the list of ready topics to eliminate any that we aren't ready to send yet // prune the list of ready topics to eliminate any that we aren't ready to send yet
List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now); List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
@ -158,43 +200,76 @@ public class Sender implements Runnable {
// handle responses, connections, and disconnections // handle responses, connections, and disconnections
handleSends(this.selector.completedSends()); handleSends(this.selector.completedSends());
handleResponses(this.selector.completedReceives(), now); handleResponses(this.selector.completedReceives(), now);
handleDisconnects(this.selector.disconnected()); handleDisconnects(this.selector.disconnected(), now);
handleConnects(this.selector.connected()); handleConnects(this.selector.connected());
return ready.size(); return ready.size();
} }
private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) { /**
* Add a metadata request to the list of sends if we need to make one
*/
private void maybeUpdateMetadata(Cluster cluster, List<NetworkSend> sends, long now) {
if (this.metadataFetchInProgress || !metadata.needsUpdate(now)) if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
return null; return;
Node node = nextFreeNode(cluster); Node node = selectMetadataDestination(cluster);
if (node == null) if (node == null)
return null; return;
NodeState state = nodeState.get(node.id()); if (nodeStates.isConnected(node.id())) {
if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) { this.metadataFetchInProgress = true;
InFlightRequest request = metadataRequest(node.id(), metadata.topics());
sends.add(request.request);
this.inFlightRequests.add(request);
} else if (nodeStates.canConnect(node.id(), now)) {
// we don't have a connection to this node right now, make one // we don't have a connection to this node right now, make one
initiateConnect(node, now); initiateConnect(node, now);
return null;
} else if (state.state == ConnectionState.CONNECTED) {
this.metadataFetchInProgress = true;
return metadataRequest(node.id(), metadata.topics());
} else {
return null;
} }
} }
/** /**
* Find a good node to make a metadata request to. This method will first look for a node that has an existing
* connection and no outstanding requests. If there are no such nodes it will look for a node with no outstanding
* requests.
* @return A node with no requests currently being sent or null if no such node exists * @return A node with no requests currently being sent or null if no such node exists
*/ */
private Node nextFreeNode(Cluster cluster) { private Node selectMetadataDestination(Cluster cluster) {
for (int i = 0; i < cluster.nodes().size(); i++) { List<Node> nodes = cluster.nodes();
Node node = cluster.nextNode();
if (this.inFlightRequests.canSendMore(node.id())) // first look for a node to which we are connected and have no outstanding requests
boolean connectionInProgress = false;
for (int i = 0; i < nodes.size(); i++) {
Node node = nodes.get(metadataNodeIndex(i, nodes.size()));
if (nodeStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) {
this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size());
return node; return node;
} else if (nodeStates.isConnecting(node.id())) {
connectionInProgress = true;
}
} }
return null;
// if we have a connection that is being established now, just wait for that don't make another
if (connectionInProgress)
return null;
// okay, no luck, pick a random unused node
for (int i = 0; i < nodes.size(); i++) {
Node node = nodes.get(metadataNodeIndex(i, nodes.size()));
if (this.inFlightRequests.canSendMore(node.id())) {
this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size());
return node;
}
}
return null; // we failed to find a good destination
}
/**
* Get the index in the node list of the node to use for the metadata request
*/
private int metadataNodeIndex(int offset, int size) {
return Utils.abs(offset + this.metadataFetchNodeIndex) % size;
} }
/** /**
@ -209,7 +284,7 @@ public class Sender implements Runnable {
/** /**
* Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add * Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add
* it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate * it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate
* metdata to be able to do so * metadata to be able to do so
*/ */
private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long now) { private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long now) {
List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size()); List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size());
@ -218,15 +293,11 @@ public class Sender implements Runnable {
if (node == null) { if (node == null) {
// we don't know about this topic/partition or it has no leader, re-fetch metadata // we don't know about this topic/partition or it has no leader, re-fetch metadata
metadata.forceUpdate(); metadata.forceUpdate();
} else { } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
NodeState state = nodeState.get(node.id()); sendable.add(tp);
// TODO: encapsulate this logic somehow } else if (nodeStates.canConnect(node.id(), now)) {
if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) { // we don't have a connection to this node right now, make one
// we don't have a connection to this node right now, make one initiateConnect(node, now);
initiateConnect(node, now);
} else if (state.state == ConnectionState.CONNECTED && inFlightRequests.canSendMore(node.id())) {
sendable.add(tp);
}
} }
} }
return sendable; return sendable;
@ -237,13 +308,11 @@ public class Sender implements Runnable {
*/ */
private void initiateConnect(Node node, long now) { private void initiateConnect(Node node, long now) {
try { try {
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), 64 * 1024 * 1024, 64 * 1024 * 1024); // TODO selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
// socket this.nodeStates.connecting(node.id(), now);
// buffers
nodeState.put(node.id(), new NodeState(ConnectionState.CONNECTING, now));
} catch (IOException e) { } catch (IOException e) {
/* attempt failed, we'll try again after the backoff */ /* attempt failed, we'll try again after the backoff */
nodeState.put(node.id(), new NodeState(ConnectionState.DISCONNECTED, now)); nodeStates.disconnected(node.id());
/* maybe the problem is our metadata, update it */ /* maybe the problem is our metadata, update it */
metadata.forceUpdate(); metadata.forceUpdate();
} }
@ -252,19 +321,26 @@ public class Sender implements Runnable {
/** /**
* Handle any closed connections * Handle any closed connections
*/ */
private void handleDisconnects(List<Integer> disconnects) { private void handleDisconnects(List<Integer> disconnects, long now) {
// clear out the in-flight requests for the disconnected broker
for (int node : disconnects) { for (int node : disconnects) {
for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
if (request.batches != null) { if (request.batches != null) {
for (RecordBatch batch : request.batches.values()) for (RecordBatch batch : request.batches.values()) {
batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response.")); if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
this.accumulator.deallocate(request.batches.values()); this.accumulator.reenqueue(batch, now);
} else {
batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
this.accumulator.deallocate(batch);
}
}
} }
NodeState state = this.nodeState.get(request.request.destination()); nodeStates.disconnected(request.request.destination());
if (state != null)
state.state = ConnectionState.DISCONNECTED;
} }
} }
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead
if (disconnects.size() > 0)
this.metadata.forceUpdate();
} }
/** /**
@ -272,7 +348,7 @@ public class Sender implements Runnable {
*/ */
private void handleConnects(List<Integer> connects) { private void handleConnects(List<Integer> connects) {
for (Integer id : connects) for (Integer id : connects)
this.nodeState.get(id).state = ConnectionState.CONNECTED; this.nodeStates.connected(id);
} }
/** /**
@ -286,9 +362,10 @@ public class Sender implements Runnable {
if (!request.expectResponse) { if (!request.expectResponse) {
requests.pollFirst(); requests.pollFirst();
if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) { if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) {
for (RecordBatch batch : request.batches.values()) for (RecordBatch batch : request.batches.values()) {
batch.done(-1L, Errors.NONE.exception()); batch.done(-1L, Errors.NONE.exception());
this.accumulator.deallocate(request.batches.values()); this.accumulator.deallocate(batch);
}
} }
} }
} }
@ -306,7 +383,7 @@ public class Sender implements Runnable {
Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
correlate(req.request.header(), header); correlate(req.request.header(), header);
if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) if (req.request.header().apiKey() == ApiKeys.PRODUCE.id)
handleProduceResponse(req, body); handleProduceResponse(req, body, now);
else if (req.request.header().apiKey() == ApiKeys.METADATA.id) else if (req.request.header().apiKey() == ApiKeys.METADATA.id)
handleMetadataResponse(body, now); handleMetadataResponse(body, now);
else else
@ -327,7 +404,7 @@ public class Sender implements Runnable {
/** /**
* Handle a produce response * Handle a produce response
*/ */
private void handleProduceResponse(InFlightRequest request, Struct response) { private void handleProduceResponse(InFlightRequest request, Struct response, long now) {
for (Object topicResponse : (Object[]) response.get("responses")) { for (Object topicResponse : (Object[]) response.get("responses")) {
Struct topicRespStruct = (Struct) topicResponse; Struct topicRespStruct = (Struct) topicResponse;
String topic = (String) topicRespStruct.get("topic"); String topic = (String) topicRespStruct.get("topic");
@ -335,12 +412,31 @@ public class Sender implements Runnable {
Struct partRespStruct = (Struct) partResponse; Struct partRespStruct = (Struct) partResponse;
int partition = (Integer) partRespStruct.get("partition"); int partition = (Integer) partRespStruct.get("partition");
short errorCode = (Short) partRespStruct.get("error_code"); short errorCode = (Short) partRespStruct.get("error_code");
// if we got an error we may need to refresh our metadata
Errors error = Errors.forCode(errorCode);
if (error.exception() instanceof InvalidMetadataException)
metadata.forceUpdate();
long offset = (Long) partRespStruct.get("base_offset"); long offset = (Long) partRespStruct.get("base_offset");
RecordBatch batch = request.batches.get(new TopicPartition(topic, partition)); RecordBatch batch = request.batches.get(new TopicPartition(topic, partition));
batch.done(offset, Errors.forCode(errorCode).exception()); if (canRetry(batch, error)) {
// retry
this.accumulator.reenqueue(batch, now);
} else {
// tell the user the result of their request
batch.done(offset, error.exception());
this.accumulator.deallocate(batch);
}
} }
} }
this.accumulator.deallocate(request.batches.values()); }
/**
* We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
*/
private boolean canRetry(RecordBatch batch, Errors error) {
return batch.attempts < this.retries && error.exception() instanceof RetriableException;
} }
/** /**
@ -459,6 +555,53 @@ public class Sender implements Runnable {
} }
} }
private static class NodeStates {
private final long reconnectBackoffMs;
private final Map<Integer, NodeState> nodeState;
public NodeStates(long reconnectBackoffMs) {
this.reconnectBackoffMs = reconnectBackoffMs;
this.nodeState = new HashMap<Integer, NodeState>();
}
public boolean canConnect(int node, long now) {
NodeState state = nodeState.get(node);
if (state == null)
return true;
else
return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs;
}
public void connecting(int node, long now) {
nodeState.put(node, new NodeState(ConnectionState.CONNECTING, now));
}
public boolean isConnected(int node) {
NodeState state = nodeState.get(node);
return state != null && state.state == ConnectionState.CONNECTED;
}
public boolean isConnecting(int node) {
NodeState state = nodeState.get(node);
return state != null && state.state == ConnectionState.CONNECTING;
}
public void connected(int node) {
nodeState(node).state = ConnectionState.CONNECTED;
}
public void disconnected(int node) {
nodeState(node).state = ConnectionState.DISCONNECTED;
}
private NodeState nodeState(int node) {
NodeState state = this.nodeState.get(node);
if (state == null)
throw new IllegalStateException("No entry found for node " + node);
return state;
}
}
/** /**
* An request that hasn't been fully processed yet * An request that hasn't been fully processed yet
*/ */

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common; package org.apache.kafka.common;
@ -23,17 +19,12 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.Utils;
/** /**
* A representation of a subset of the nodes, topics, and partitions in the Kafka cluster. * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
*/ */
public final class Cluster { public final class Cluster {
private final AtomicInteger counter = new AtomicInteger(0);
private final List<Node> nodes; private final List<Node> nodes;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic; private final Map<String, List<PartitionInfo>> partitionsByTopic;
@ -126,15 +117,4 @@ public final class Cluster {
return this.partitionsByTopic.get(topic); return this.partitionsByTopic.get(topic);
} }
/**
* Round-robin over the nodes in this cluster
*/
public Node nextNode() {
int size = nodes.size();
if (size == 0)
throw new IllegalStateException("No known nodes.");
int idx = Utils.abs(counter.getAndIncrement()) % size;
return this.nodes.get(idx);
}
} }

View File

@ -1,22 +1,22 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
public class CorruptRecordException extends ApiException { /**
* This exception indicates a record has failed it's internal CRC check, this generally indicates network or disk
* corruption.
*/
public class CorruptRecordException extends RetriableException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* An exception that may indicate the client's metadata is out of date
*/
public abstract class InvalidMetadataException extends RetriableException {
private static final long serialVersionUID = 1L;
public InvalidMetadataException() {
super();
}
public InvalidMetadataException(String message) {
super(message);
}
public InvalidMetadataException(String message, Throwable cause) {
super(message, cause);
}
public InvalidMetadataException(Throwable cause) {
super(cause);
}
}

View File

@ -1,35 +1,27 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
public class LeaderNotAvailableException extends RetryableException { /**
* There is no currently available leader for the given partition (either because a leadership election is in progress
* or because all replicas are down).
*/
public class LeaderNotAvailableException extends InvalidMetadataException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public LeaderNotAvailableException(String message, Throwable cause) {
super(message, cause);
}
public LeaderNotAvailableException(String message) { public LeaderNotAvailableException(String message) {
super(message); super(message);
} }
public LeaderNotAvailableException(Throwable cause) {
super(cause);
}
} }

View File

@ -1,22 +1,22 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
public class NetworkException extends ApiException { /**
* A misc. network-related IOException occurred when making a request. This could be because the client's metadata is
* out of date and it is making a request to a node that is now dead.
*/
public class NetworkException extends InvalidMetadataException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,22 +1,21 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
public class NotLeaderForPartitionException extends RetryableException { /**
* This server is not the leader for the given partition
*/
public class NotLeaderForPartitionException extends InvalidMetadataException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,21 +1,20 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
/**
* The client has tried to save its offset with associated metadata larger than the maximum size allowed by the server.
*/
public class OffsetMetadataTooLarge extends ApiException { public class OffsetMetadataTooLarge extends ApiException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,22 +1,22 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
public class OffsetOutOfRangeException extends ApiException { /**
* This offset is either larger or smaller than the range of offsets the server has for the given partition.
*
*/
public class OffsetOutOfRangeException extends RetriableException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,21 +1,20 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
/**
* This record is larger than the maximum allowable size
*/
public class RecordTooLargeException extends ApiException { public class RecordTooLargeException extends ApiException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* A retryable exception is a transient exception that if retried may succeed.
*/
public abstract class RetriableException extends ApiException {
private static final long serialVersionUID = 1L;
public RetriableException(String message, Throwable cause) {
super(message, cause);
}
public RetriableException(String message) {
super(message);
}
public RetriableException(Throwable cause) {
super(cause);
}
public RetriableException() {
}
}

View File

@ -1,47 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* A retryable exception is an exception that is safe to retry. To be retryable an exception should be
* <ol>
* <li>Transient, there is no point retrying a error due to a non-existant topic or message too large
* <li>Idempotent, the exception is known to not change any state on the server
* </ol>
* A client may choose to retry any request they like, but exceptions extending this class are always safe and sane to
* retry.
*/
public abstract class RetryableException extends ApiException {
private static final long serialVersionUID = 1L;
public RetryableException(String message, Throwable cause) {
super(message, cause);
}
public RetryableException(String message) {
super(message);
}
public RetryableException(Throwable cause) {
super(cause);
}
public RetryableException() {
}
}

View File

@ -1,22 +1,21 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
public class TimeoutException extends ApiException { /**
* Indicates that a request timed out.
*/
public class TimeoutException extends RetriableException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,21 +1,22 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
/**
* An error occurred on the server for which the client doesn't have a corresponding error code. This is generally an
* unexpected error.
*
*/
public class UnknownServerException extends ApiException { public class UnknownServerException extends ApiException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,22 +1,21 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.errors; package org.apache.kafka.common.errors;
public class UnknownTopicOrPartitionException extends ApiException { /**
* This topic/partition doesn't exist
*/
public class UnknownTopicOrPartitionException extends RetriableException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.network; package org.apache.kafka.common.network;
@ -32,7 +28,6 @@ import java.util.Set;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
/** /**
* A selector interface for doing non-blocking multi-connection network I/O. * A selector interface for doing non-blocking multi-connection network I/O.
* <p> * <p>
@ -302,8 +297,11 @@ public class Selector implements Selectable {
private void close(SelectionKey key) throws IOException { private void close(SelectionKey key) throws IOException {
SocketChannel channel = channel(key); SocketChannel channel = channel(key);
Transmissions trans = transmissions(key); Transmissions trans = transmissions(key);
if (trans != null) if (trans != null) {
this.disconnected.add(trans.id); this.disconnected.add(trans.id);
trans.clearReceive();
trans.clearSend();
}
key.attach(null); key.attach(null);
key.cancel(); key.cancel();
channel.socket().close(); channel.socket().close();

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.clients.producer; package org.apache.kafka.clients.producer;
@ -25,7 +21,6 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.RecordBatch; import org.apache.kafka.clients.producer.internals.RecordBatch;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
@ -140,8 +135,8 @@ public class RecordAccumulatorTest {
for (RecordBatch batch : batches) { for (RecordBatch batch : batches) {
for (LogEntry entry : batch.records) for (LogEntry entry : batch.records)
read++; read++;
accum.deallocate(batch);
} }
accum.deallocate(batches);
} }
for (Thread t : threads) for (Thread t : threads)

View File

@ -1,29 +1,25 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* Unless required by applicable law or agreed to in writing, software * specific language governing permissions and limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.clients.producer; package org.apache.kafka.clients.producer;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender; import org.apache.kafka.clients.producer.internals.Sender;
@ -46,6 +42,16 @@ import org.junit.Test;
public class SenderTest { public class SenderTest {
private static final String CLIENT_ID = "";
private static final int MAX_REQUEST_SIZE = 1024 * 1024;
private static final long RECONNECT_BACKOFF_MS = 0L;
private static final short ACKS_ALL = -1;
private static final int MAX_RETRIES = 0;
private static final int REQUEST_TIMEOUT_MS = 10000;
private static final int SEND_BUFFER_SIZE = 64 * 1024;
private static final int RECEIVE_BUFFER_SIZE = 64 * 1024;
private TopicPartition tp = new TopicPartition("test", 0);
private MockTime time = new MockTime(); private MockTime time = new MockTime();
private MockSelector selector = new MockSelector(time); private MockSelector selector = new MockSelector(time);
private int batchSize = 16 * 1024; private int batchSize = 16 * 1024;
@ -53,7 +59,18 @@ public class SenderTest {
private Cluster cluster = TestUtils.singletonCluster("test", 1); private Cluster cluster = TestUtils.singletonCluster("test", 1);
private Metrics metrics = new Metrics(time); private Metrics metrics = new Metrics(time);
private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time); private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time);
private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time); private Sender sender = new Sender(selector,
metadata,
this.accumulator,
CLIENT_ID,
MAX_REQUEST_SIZE,
RECONNECT_BACKOFF_MS,
ACKS_ALL,
MAX_RETRIES,
REQUEST_TIMEOUT_MS,
SEND_BUFFER_SIZE,
RECEIVE_BUFFER_SIZE,
time);
@Before @Before
public void setup() { public void setup() {
@ -62,7 +79,6 @@ public class SenderTest {
@Test @Test
public void testSimple() throws Exception { public void testSimple() throws Exception {
TopicPartition tp = new TopicPartition("test", 0);
Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
sender.run(time.milliseconds()); sender.run(time.milliseconds());
assertEquals("We should have connected", 1, selector.connected().size()); assertEquals("We should have connected", 1, selector.connected().size());
@ -83,6 +99,93 @@ public class SenderTest {
assertEquals(offset, future.get().offset()); assertEquals(offset, future.get().offset());
} }
@Test
public void testRetries() throws Exception {
// create a sender with retries = 1
int maxRetries = 1;
Sender sender = new Sender(selector,
metadata,
this.accumulator,
CLIENT_ID,
MAX_REQUEST_SIZE,
RECONNECT_BACKOFF_MS,
ACKS_ALL,
maxRetries,
REQUEST_TIMEOUT_MS,
SEND_BUFFER_SIZE,
RECEIVE_BUFFER_SIZE,
time);
Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
RequestSend request1 = completeSend(sender);
selector.clear();
selector.completeReceive(produceResponse(request1.header().correlationId(),
cluster.leaderFor(tp).id(),
tp.topic(),
tp.partition(),
-1,
Errors.REQUEST_TIMED_OUT.code()));
sender.run(time.milliseconds());
selector.clear();
sender.run(time.milliseconds());
RequestSend request2 = completeSend(sender);
selector.completeReceive(produceResponse(request2.header().correlationId(),
cluster.leaderFor(tp).id(),
tp.topic(),
tp.partition(),
42,
Errors.NONE.code()));
sender.run(time.milliseconds());
assertTrue("Request should retry and complete", future.isDone());
assertEquals(42, future.get().offset());
}
@Test
public void testMetadataRefreshOnNoLeaderException() throws Exception {
Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
RequestSend request = completeSend();
selector.clear();
selector.completeReceive(produceResponse(request.header().correlationId(),
cluster.leaderFor(tp).id(),
tp.topic(),
tp.partition(),
-1,
Errors.NOT_LEADER_FOR_PARTITION.code()));
sender.run(time.milliseconds());
completedWithError(future, Errors.NOT_LEADER_FOR_PARTITION);
assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds()));
}
@Test
public void testMetadataRefreshOnDisconnect() throws Exception {
Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
completeSend();
selector.clear();
selector.disconnect(cluster.leaderFor(tp).id());
sender.run(time.milliseconds());
completedWithError(future, Errors.NETWORK_EXCEPTION);
assertTrue("The disconnection triggers a metadata update.", metadata.needsUpdate(time.milliseconds()));
}
private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
assertTrue("Request should be completed", future.isDone());
try {
future.get();
fail("Should have thrown an exception.");
} catch (ExecutionException e) {
assertEquals(error.exception().getClass(), e.getCause().getClass());
}
}
private RequestSend completeSend() {
return completeSend(sender);
}
private RequestSend completeSend(Sender sender) {
while (selector.completedSends().size() == 0)
sender.run(time.milliseconds());
return (RequestSend) selector.completedSends().get(0);
}
private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) { private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) {
Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
Struct response = struct.instance("responses"); Struct response = struct.instance("responses");