diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3d180e885a2..e4bc9727958 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -96,7 +96,7 @@ public class KafkaProducer implements Producer { this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG), this.totalMemorySize, config.getLong(ProducerConfig.LINGER_MS_CONFIG), - config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL), + config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG), metrics, new SystemTime()); List addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG)); @@ -108,7 +108,10 @@ public class KafkaProducer implements Producer { config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG), + config.getInt(ProducerConfig.MAX_RETRIES_CONFIG), config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG), + config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), new SystemTime()); this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true); this.ioThread.start(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index dca9802c8d0..d8e35e7d0e4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -62,7 +62,7 @@ public class ProducerConfig extends AbstractConfig { /** * The total memory used by the producer to buffer records waiting to be sent to the server. 
If records are sent * faster than they can be delivered to the server the producer will either block or throw an exception based on the - * preference specified by {@link #BLOCK_ON_BUFFER_FULL}. + * preference specified by {@link #BLOCK_ON_BUFFER_FULL_CONFIG}. */ public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes"; @@ -106,6 +106,11 @@ public class ProducerConfig extends AbstractConfig { */ public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; + /** + * The size of the TCP receive buffer to use when reading data (you generally shouldn't need to change this) + */ + public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; + /** * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server * has its own cap on record size which may be different from this. @@ -123,9 +128,17 @@ public class ProducerConfig extends AbstractConfig { * this setting is true and we block, however users who want to guarantee we never block can turn this into an * error. */ - public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full"; + public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; - public static final String ENABLE_JMX = "enable.jmx"; + /** + * The maximum number of times to attempt resending the request before giving up. + */ + public static final String MAX_RETRIES_CONFIG = "request.retries"; + + /** + * Should we register the Kafka metrics as JMX mbeans? 
+ */ + public static final String ENABLE_JMX_CONFIG = "enable.jmx"; static { /* TODO: add docs */ @@ -142,10 +155,12 @@ public class ProducerConfig extends AbstractConfig { .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah") .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah") .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah") + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah") .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah") - .define(BLOCK_ON_BUFFER_FULL, Type.BOOLEAN, true, "blah blah") - .define(ENABLE_JMX, Type.BOOLEAN, true, ""); + .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah") + .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "") + .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), ""); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 52d30a86d04..62613a3e29a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer.internals; @@ -24,7 +20,6 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.TimeoutException; - /** * A class encapsulating some of the logic around metadata. *

@@ -134,4 +129,11 @@ public final class Metadata { notifyAll(); } + /** + * The last time metadata was updated. + */ + public synchronized long lastUpdate() { + return this.lastRefresh; + } + } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index be8a4a399b8..ce5cf27efa0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -1,25 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer.internals; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.List; @@ -39,10 +34,9 @@ import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; - /** - * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} instances to be - * sent to the server. + * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} + * instances to be sent to the server. *

* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless * this behavior is explicitly disabled. @@ -151,6 +145,17 @@ public final class RecordAccumulator { } } + /** + * Re-enqueue the given record batch in the accumulator to retry + */ + public void reenqueue(RecordBatch batch, long now) { + batch.attempts++; + Deque deque = dequeFor(batch.topicPartition); + synchronized (deque) { + deque.addFirst(batch); + } + } + /** * Get a list of topic-partitions which are ready to be sent. *

@@ -229,16 +234,10 @@ public final class RecordAccumulator { } /** - * Deallocate the list of record batches + * Deallocate the record batch */ - public void deallocate(Collection batches) { - ByteBuffer[] buffers = new ByteBuffer[batches.size()]; - int i = 0; - for (RecordBatch batch : batches) { - buffers[i] = batch.records.buffer(); - i++; - } - free.deallocate(buffers); + public void deallocate(RecordBatch batch) { + free.deallocate(batch.records.buffer()); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 7a440a3dd29..eb16f6d236e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer.internals; @@ -25,7 +21,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; - /** * A batch of records that is or will be sent. * @@ -33,6 +28,7 @@ import org.apache.kafka.common.record.MemoryRecords; */ public final class RecordBatch { public int recordCount = 0; + public volatile int attempts = 0; public final long created; public final MemoryRecords records; public final TopicPartition topicPartition; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index d93a455827a..e373265f19f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -22,12 +22,15 @@ import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.errors.NetworkException; +import org.apache.kafka.common.errors.RetriableException; import 
org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; @@ -41,6 +44,7 @@ import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; /** * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata @@ -48,19 +52,55 @@ import org.apache.kafka.common.utils.Time; */ public class Sender implements Runnable { - private final Map nodeState; + /* the state of each node's connection */ + private final NodeStates nodeStates; + + /* the record accumulator that batches records */ private final RecordAccumulator accumulator; + + /* the selector used to perform network i/o */ private final Selectable selector; + + /* the client id used to identify this client in requests to the server */ private final String clientId; + + /* the maximum request size to attempt to send to the server */ private final int maxRequestSize; - private final long reconnectBackoffMs; + + /* the number of acknowledgements to request from the server */ private final short acks; + + /* the max time in ms for the server to wait for acknowledgements */ private final int requestTimeout; + + /* the number of times to retry a failed request before giving up */ + private final int retries; + + /* the socket send buffer size in bytes */ + private final int socketSendBuffer; + + /* the socket receive buffer size in bytes */ + private final int socketReceiveBuffer; + + /* the set of currently in-flight requests awaiting a response from the server */ private final InFlightRequests inFlightRequests; + + /* a reference to the current Cluster instance */ private final Metadata metadata; + + /* the clock instance used for getting the time */ private final Time time; + + /* the 
current node to attempt to use for metadata requests (will round-robin over nodes) */ + private int metadataFetchNodeIndex; + + /* the current correlation id to use when sending requests to servers */ private int correlation; + + /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ private boolean metadataFetchInProgress; + + /* true while the sender thread is still running */ private volatile boolean running; public Sender(Selectable selector, @@ -70,22 +110,28 @@ public class Sender implements Runnable { int maxRequestSize, long reconnectBackoffMs, short acks, + int retries, int requestTimeout, + int socketSendBuffer, + int socketReceiveBuffer, Time time) { - this.nodeState = new HashMap(); + this.nodeStates = new NodeStates(reconnectBackoffMs); this.accumulator = accumulator; this.selector = selector; this.maxRequestSize = maxRequestSize; - this.reconnectBackoffMs = reconnectBackoffMs; this.metadata = metadata; this.clientId = clientId; this.running = true; this.requestTimeout = requestTimeout; this.acks = acks; + this.retries = retries; + this.socketSendBuffer = socketSendBuffer; + this.socketReceiveBuffer = socketReceiveBuffer; this.inFlightRequests = new InFlightRequests(); this.correlation = 0; this.metadataFetchInProgress = false; this.time = time; + this.metadataFetchNodeIndex = new Random().nextInt(); } /** @@ -130,11 +176,7 @@ public class Sender implements Runnable { // should we update our metadata? 
List sends = new ArrayList(); - InFlightRequest metadataReq = maybeMetadataRequest(cluster, now); - if (metadataReq != null) { - sends.add(metadataReq.request); - this.inFlightRequests.add(metadataReq); - } + maybeUpdateMetadata(cluster, sends, now); // prune the list of ready topics to eliminate any that we aren't ready to send yet List sendable = processReadyPartitions(cluster, ready, now); @@ -158,43 +200,76 @@ public class Sender implements Runnable { // handle responses, connections, and disconnections handleSends(this.selector.completedSends()); handleResponses(this.selector.completedReceives(), now); - handleDisconnects(this.selector.disconnected()); + handleDisconnects(this.selector.disconnected(), now); handleConnects(this.selector.connected()); return ready.size(); } - private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) { + /** + * Add a metadata request to the list of sends if we need to make one + */ + private void maybeUpdateMetadata(Cluster cluster, List sends, long now) { if (this.metadataFetchInProgress || !metadata.needsUpdate(now)) - return null; + return; - Node node = nextFreeNode(cluster); + Node node = selectMetadataDestination(cluster); if (node == null) - return null; + return; - NodeState state = nodeState.get(node.id()); - if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) { + if (nodeStates.isConnected(node.id())) { + this.metadataFetchInProgress = true; + InFlightRequest request = metadataRequest(node.id(), metadata.topics()); + sends.add(request.request); + this.inFlightRequests.add(request); + } else if (nodeStates.canConnect(node.id(), now)) { // we don't have a connection to this node right now, make one initiateConnect(node, now); - return null; - } else if (state.state == ConnectionState.CONNECTED) { - this.metadataFetchInProgress = true; - return metadataRequest(node.id(), metadata.topics()); - } else { - return null; } } /** + * Find 
a good node to make a metadata request to. This method will first look for a node that has an existing + * connection and no outstanding requests. If there are no such nodes it will look for a node with no outstanding + * requests. * @return A node with no requests currently being sent or null if no such node exists */ - private Node nextFreeNode(Cluster cluster) { - for (int i = 0; i < cluster.nodes().size(); i++) { - Node node = cluster.nextNode(); - if (this.inFlightRequests.canSendMore(node.id())) + private Node selectMetadataDestination(Cluster cluster) { + List nodes = cluster.nodes(); + + // first look for a node to which we are connected and have no outstanding requests + boolean connectionInProgress = false; + for (int i = 0; i < nodes.size(); i++) { + Node node = nodes.get(metadataNodeIndex(i, nodes.size())); + if (nodeStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) { + this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); return node; + } else if (nodeStates.isConnecting(node.id())) { + connectionInProgress = true; + } } - return null; + + // if we have a connection that is being established now, just wait for that; don't make another + if (connectionInProgress) + return null; + + // okay, no luck, pick the next unused node + for (int i = 0; i < nodes.size(); i++) { + Node node = nodes.get(metadataNodeIndex(i, nodes.size())); + if (this.inFlightRequests.canSendMore(node.id())) { + this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); + return node; + } + } + + return null; // we failed to find a good destination + } + + /** + * Get the index in the node list of the node to use for the metadata request + */ + private int metadataNodeIndex(int offset, int size) { + return Utils.abs(offset + this.metadataFetchNodeIndex) % size; } /** @@ -209,7 +284,7 @@ public class Sender implements Runnable { /** * Process the set of topic-partitions with data ready to send. 
If we have a connection to the appropriate node, add * it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate - * metdata to be able to do so + * metadata to be able to do so */ private List processReadyPartitions(Cluster cluster, List ready, long now) { List sendable = new ArrayList(ready.size()); @@ -218,15 +293,11 @@ public class Sender implements Runnable { if (node == null) { // we don't know about this topic/partition or it has no leader, re-fetch metadata metadata.forceUpdate(); - } else { - NodeState state = nodeState.get(node.id()); - // TODO: encapsulate this logic somehow - if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) { - // we don't have a connection to this node right now, make one - initiateConnect(node, now); - } else if (state.state == ConnectionState.CONNECTED && inFlightRequests.canSendMore(node.id())) { - sendable.add(tp); - } + } else if (nodeStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { + sendable.add(tp); + } else if (nodeStates.canConnect(node.id(), now)) { + // we don't have a connection to this node right now, make one + initiateConnect(node, now); } } return sendable; @@ -237,13 +308,11 @@ public class Sender implements Runnable { */ private void initiateConnect(Node node, long now) { try { - selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), 64 * 1024 * 1024, 64 * 1024 * 1024); // TODO - // socket - // buffers - nodeState.put(node.id(), new NodeState(ConnectionState.CONNECTING, now)); + selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); + this.nodeStates.connecting(node.id(), now); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ - nodeState.put(node.id(), new NodeState(ConnectionState.DISCONNECTED, now)); + nodeStates.disconnected(node.id()); 
/* maybe the problem is our metadata, update it */ metadata.forceUpdate(); } @@ -252,19 +321,26 @@ public class Sender implements Runnable { /** * Handle any closed connections */ - private void handleDisconnects(List disconnects) { + private void handleDisconnects(List disconnects, long now) { + // clear out the in-flight requests for the disconnected broker for (int node : disconnects) { for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { if (request.batches != null) { - for (RecordBatch batch : request.batches.values()) - batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response.")); - this.accumulator.deallocate(request.batches.values()); + for (RecordBatch batch : request.batches.values()) { + if (canRetry(batch, Errors.NETWORK_EXCEPTION)) { + this.accumulator.reenqueue(batch, now); + } else { + batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response.")); + this.accumulator.deallocate(batch); + } + } } - NodeState state = this.nodeState.get(request.request.destination()); - if (state != null) - state.state = ConnectionState.DISCONNECTED; + nodeStates.disconnected(request.request.destination()); } } + // we got a disconnect so we should probably refresh our metadata and see if that broker is dead + if (disconnects.size() > 0) + this.metadata.forceUpdate(); } /** @@ -272,7 +348,7 @@ public class Sender implements Runnable { */ private void handleConnects(List connects) { for (Integer id : connects) - this.nodeState.get(id).state = ConnectionState.CONNECTED; + this.nodeStates.connected(id); } /** @@ -286,9 +362,10 @@ public class Sender implements Runnable { if (!request.expectResponse) { requests.pollFirst(); if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) { - for (RecordBatch batch : request.batches.values()) + for (RecordBatch batch : request.batches.values()) { batch.done(-1L, Errors.NONE.exception()); - 
this.accumulator.deallocate(request.batches.values()); + this.accumulator.deallocate(batch); + } } } } @@ -306,7 +383,7 @@ public class Sender implements Runnable { Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); correlate(req.request.header(), header); if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) - handleProduceResponse(req, body); + handleProduceResponse(req, body, now); else if (req.request.header().apiKey() == ApiKeys.METADATA.id) handleMetadataResponse(body, now); else @@ -327,7 +404,7 @@ public class Sender implements Runnable { /** * Handle a produce response */ - private void handleProduceResponse(InFlightRequest request, Struct response) { + private void handleProduceResponse(InFlightRequest request, Struct response, long now) { for (Object topicResponse : (Object[]) response.get("responses")) { Struct topicRespStruct = (Struct) topicResponse; String topic = (String) topicRespStruct.get("topic"); @@ -335,12 +412,31 @@ public class Sender implements Runnable { Struct partRespStruct = (Struct) partResponse; int partition = (Integer) partRespStruct.get("partition"); short errorCode = (Short) partRespStruct.get("error_code"); + + // if we got an error we may need to refresh our metadata + Errors error = Errors.forCode(errorCode); + if (error.exception() instanceof InvalidMetadataException) + metadata.forceUpdate(); + long offset = (Long) partRespStruct.get("base_offset"); RecordBatch batch = request.batches.get(new TopicPartition(topic, partition)); - batch.done(offset, Errors.forCode(errorCode).exception()); + if (canRetry(batch, error)) { + // retry + this.accumulator.reenqueue(batch, now); + } else { + // tell the user the result of their request + batch.done(offset, error.exception()); + this.accumulator.deallocate(batch); + } } } - this.accumulator.deallocate(request.batches.values()); + } + + /** + * We can retry a send if the error is transient and the number of attempts taken is fewer than the 
maximum allowed + */ + private boolean canRetry(RecordBatch batch, Errors error) { + return batch.attempts < this.retries && error.exception() instanceof RetriableException; } /** @@ -459,6 +555,53 @@ public class Sender implements Runnable { } } + private static class NodeStates { + private final long reconnectBackoffMs; + private final Map nodeState; + + public NodeStates(long reconnectBackoffMs) { + this.reconnectBackoffMs = reconnectBackoffMs; + this.nodeState = new HashMap(); + } + + public boolean canConnect(int node, long now) { + NodeState state = nodeState.get(node); + if (state == null) + return true; + else + return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs; + } + + public void connecting(int node, long now) { + nodeState.put(node, new NodeState(ConnectionState.CONNECTING, now)); + } + + public boolean isConnected(int node) { + NodeState state = nodeState.get(node); + return state != null && state.state == ConnectionState.CONNECTED; + } + + public boolean isConnecting(int node) { + NodeState state = nodeState.get(node); + return state != null && state.state == ConnectionState.CONNECTING; + } + + public void connected(int node) { + nodeState(node).state = ConnectionState.CONNECTED; + } + + public void disconnected(int node) { + nodeState(node).state = ConnectionState.DISCONNECTED; + } + + private NodeState nodeState(int node) { + NodeState state = this.nodeState.get(node); + if (state == null) + throw new IllegalStateException("No entry found for node " + node); + return state; + } + } + /** * An request that hasn't been fully processed yet */ diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index c17a8f8162d..5caaaae1d2a 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache 
Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common; @@ -23,17 +19,12 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.kafka.common.utils.Utils; - /** * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster. 
*/ public final class Cluster { - private final AtomicInteger counter = new AtomicInteger(0); private final List nodes; private final Map partitionsByTopicPartition; private final Map> partitionsByTopic; @@ -126,15 +117,4 @@ public final class Cluster { return this.partitionsByTopic.get(topic); } - /** - * Round-robin over the nodes in this cluster - */ - public Node nextNode() { - int size = nodes.size(); - if (size == 0) - throw new IllegalStateException("No known nodes."); - int idx = Utils.abs(counter.getAndIncrement()) % size; - return this.nodes.get(idx); - } - } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java index 673f61d6271..eaccf276dbf 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/CorruptRecordException.java @@ -1,22 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; -public class CorruptRecordException extends ApiException { +/** + * This exception indicates a record has failed its internal CRC check; this generally indicates network or disk + * corruption. + */ +public class CorruptRecordException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java new file mode 100644 index 00000000000..8841badb2d7 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.errors; + +/** + * An exception that may indicate the client's metadata is out of date + */ +public abstract class InvalidMetadataException extends RetriableException { + + private static final long serialVersionUID = 1L; + + public InvalidMetadataException() { + super(); + } + + public InvalidMetadataException(String message) { + super(message); + } + + public InvalidMetadataException(String message, Throwable cause) { + super(message, cause); + } + + public InvalidMetadataException(Throwable cause) { + super(cause); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java index 0bde6b5a351..9d7ebd47a84 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/LeaderNotAvailableException.java @@ -1,35 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; -public class LeaderNotAvailableException extends RetryableException { +/** + * There is no currently available leader for the given partition (either because a leadership election is in progress + * or because all replicas are down). 
+ */ +public class LeaderNotAvailableException extends InvalidMetadataException { private static final long serialVersionUID = 1L; - public LeaderNotAvailableException(String message, Throwable cause) { - super(message, cause); - } - public LeaderNotAvailableException(String message) { super(message); } - public LeaderNotAvailableException(Throwable cause) { - super(cause); - } - } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java b/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java index 3a041593d76..f0baa983f08 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/NetworkException.java @@ -1,22 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; -public class NetworkException extends ApiException { +/** + * A misc. network-related IOException occurred when making a request. This could be because the client's metadata is + * out of date and it is making a request to a node that is now dead. + */ +public class NetworkException extends InvalidMetadataException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java index 5adc72ccf2d..ad9c77c41c4 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java @@ -1,22 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; -public class NotLeaderForPartitionException extends RetryableException { +/** + * This server is not the leader for the given partition + */ +public class NotLeaderForPartitionException extends InvalidMetadataException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java index a3159bb1034..0be2f500685 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMetadataTooLarge.java @@ -1,21 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; +/** + * The client has tried to save its offset with associated metadata larger than the maximum size allowed by the server. 
+ */ public class OffsetMetadataTooLarge extends ApiException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java index d01698a3efc..fc7c6e3471b 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetOutOfRangeException.java @@ -1,22 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; -public class OffsetOutOfRangeException extends ApiException { +/** + * This offset is either larger or smaller than the range of offsets the server has for the given partition. + * + */ +public class OffsetOutOfRangeException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java index ce95ca04aa8..737b7f07b16 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/RecordTooLargeException.java @@ -1,21 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; +/** + * This record is larger than the maximum allowable size + */ public class RecordTooLargeException extends ApiException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java b/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java new file mode 100644 index 00000000000..6c639a972d7 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/RetriableException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * A retriable exception is a transient exception that, if retried, may succeed. + */ +public abstract class RetriableException extends ApiException { + + private static final long serialVersionUID = 1L; + + public RetriableException(String message, Throwable cause) { + super(message, cause); + } + + public RetriableException(String message) { + super(message); + } + + public RetriableException(Throwable cause) { + super(cause); + } + + public RetriableException() { + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java b/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java deleted file mode 100644 index c7f2f222f71..00000000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/RetryableException.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.errors; - -/** - * A retryable exception is an exception that is safe to retry. To be retryable an exception should be - *

    - *
  1. Transient, there is no point retrying a error due to a non-existant topic or message too large - *
  2. Idempotent, the exception is known to not change any state on the server - *
- * A client may choose to retry any request they like, but exceptions extending this class are always safe and sane to - * retry. - */ -public abstract class RetryableException extends ApiException { - - private static final long serialVersionUID = 1L; - - public RetryableException(String message, Throwable cause) { - super(message, cause); - } - - public RetryableException(String message) { - super(message); - } - - public RetryableException(Throwable cause) { - super(cause); - } - - public RetryableException() { - } - -} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java index dffd64d19c3..c7f569ca873 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/TimeoutException.java @@ -1,22 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; -public class TimeoutException extends ApiException { +/** + * Indicates that a request timed out. + */ +public class TimeoutException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java index a0690fe2870..963ef081db5 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownServerException.java @@ -1,21 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.errors; +/** + * An error occurred on the server for which the client doesn't have a corresponding error code. This is generally an + * unexpected error. + * + */ public class UnknownServerException extends ApiException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java index 73d1953cbe0..ec423bd0129 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java @@ -1,22 +1,21 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.errors; -public class UnknownTopicOrPartitionException extends ApiException { +/** + * This topic/partition doesn't exist + */ +public class UnknownTopicOrPartitionException extends RetriableException { private static final long serialVersionUID = 1L; diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 8ed4c73146b..f1e474cd530 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.network; @@ -32,7 +28,6 @@ import java.util.Set; import org.apache.kafka.common.KafkaException; - /** * A selector interface for doing non-blocking multi-connection network I/O. *

@@ -302,8 +297,11 @@ public class Selector implements Selectable { private void close(SelectionKey key) throws IOException { SocketChannel channel = channel(key); Transmissions trans = transmissions(key); - if (trans != null) + if (trans != null) { this.disconnected.add(trans.id); + trans.clearReceive(); + trans.clearSend(); + } key.attach(null); key.cancel(); channel.socket().close(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index 1bbe83c1bfd..a3bf07e4ae2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer; @@ -25,7 +21,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; - import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.RecordBatch; import org.apache.kafka.common.TopicPartition; @@ -140,8 +135,8 @@ public class RecordAccumulatorTest { for (RecordBatch batch : batches) { for (LogEntry entry : batch.records) read++; + accum.deallocate(batch); } - accum.deallocate(batches); } for (Thread t : threads) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java index 41c028bffbd..19a01258f74 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java @@ -1,29 +1,25 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.clients.producer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; - -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.RecordAccumulator; import org.apache.kafka.clients.producer.internals.Sender; @@ -46,6 +42,16 @@ import org.junit.Test; public class SenderTest { + private static final String CLIENT_ID = ""; + private static final int MAX_REQUEST_SIZE = 1024 * 1024; + private static final long RECONNECT_BACKOFF_MS = 0L; + private static final short ACKS_ALL = -1; + private static final int MAX_RETRIES = 0; + private static final int REQUEST_TIMEOUT_MS = 10000; + private static final int SEND_BUFFER_SIZE = 64 * 1024; + private static final int RECEIVE_BUFFER_SIZE = 64 * 1024; + + private TopicPartition tp = new TopicPartition("test", 0); private MockTime time = new MockTime(); private MockSelector selector = new MockSelector(time); private int batchSize = 16 * 1024; @@ -53,7 +59,18 @@ public class SenderTest { private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = new Metrics(time); private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time); - private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time); + private Sender sender = new Sender(selector, + metadata, + this.accumulator, + CLIENT_ID, + MAX_REQUEST_SIZE, + RECONNECT_BACKOFF_MS, + ACKS_ALL, + MAX_RETRIES, + REQUEST_TIMEOUT_MS, + SEND_BUFFER_SIZE, + RECEIVE_BUFFER_SIZE, + time); @Before public void setup() { @@ -62,7 +79,6 @@ public class SenderTest { @Test public void testSimple() throws Exception { - TopicPartition tp = new 
TopicPartition("test", 0); Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); sender.run(time.milliseconds()); assertEquals("We should have connected", 1, selector.connected().size()); @@ -83,6 +99,93 @@ public class SenderTest { assertEquals(offset, future.get().offset()); } + @Test + public void testRetries() throws Exception { + // create a sender with retries = 1 + int maxRetries = 1; + Sender sender = new Sender(selector, + metadata, + this.accumulator, + CLIENT_ID, + MAX_REQUEST_SIZE, + RECONNECT_BACKOFF_MS, + ACKS_ALL, + maxRetries, + REQUEST_TIMEOUT_MS, + SEND_BUFFER_SIZE, + RECEIVE_BUFFER_SIZE, + time); + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + RequestSend request1 = completeSend(sender); + selector.clear(); + selector.completeReceive(produceResponse(request1.header().correlationId(), + cluster.leaderFor(tp).id(), + tp.topic(), + tp.partition(), + -1, + Errors.REQUEST_TIMED_OUT.code())); + sender.run(time.milliseconds()); + selector.clear(); + sender.run(time.milliseconds()); + RequestSend request2 = completeSend(sender); + selector.completeReceive(produceResponse(request2.header().correlationId(), + cluster.leaderFor(tp).id(), + tp.topic(), + tp.partition(), + 42, + Errors.NONE.code())); + sender.run(time.milliseconds()); + assertTrue("Request should retry and complete", future.isDone()); + assertEquals(42, future.get().offset()); + } + + @Test + public void testMetadataRefreshOnNoLeaderException() throws Exception { + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + RequestSend request = completeSend(); + selector.clear(); + selector.completeReceive(produceResponse(request.header().correlationId(), + cluster.leaderFor(tp).id(), + tp.topic(), + tp.partition(), + -1, + Errors.NOT_LEADER_FOR_PARTITION.code())); + sender.run(time.milliseconds()); + completedWithError(future, 
Errors.NOT_LEADER_FOR_PARTITION); + assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds())); + } + + @Test + public void testMetadataRefreshOnDisconnect() throws Exception { + Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null); + completeSend(); + selector.clear(); + selector.disconnect(cluster.leaderFor(tp).id()); + sender.run(time.milliseconds()); + completedWithError(future, Errors.NETWORK_EXCEPTION); + assertTrue("The disconnection triggers a metadata update.", metadata.needsUpdate(time.milliseconds())); + } + + private void completedWithError(Future future, Errors error) throws Exception { + assertTrue("Request should be completed", future.isDone()); + try { + future.get(); + fail("Should have thrown an exception."); + } catch (ExecutionException e) { + assertEquals(error.exception().getClass(), e.getCause().getClass()); + } + } + + private RequestSend completeSend() { + return completeSend(sender); + } + + private RequestSend completeSend(Sender sender) { + while (selector.completedSends().size() == 0) + sender.run(time.milliseconds()); + return (RequestSend) selector.completedSends().get(0); + } + private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) { Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); Struct response = struct.instance("responses");