mirror of https://github.com/apache/kafka.git
KAFKA-1286 Retry can block. Patch from Guozhang, reviewed by Jay.

parent 153ac8aa60
commit 5ba48348b3
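Summary of the change, as reflected in the diff below: the new producer gains a retry.backoff.ms setting (default 500 ms). Each RecordBatch now records a lastAttempt timestamp, and RecordAccumulator.ready() withholds a previously failed batch until that backoff has elapsed, so a retry no longer re-sends and re-fails in a tight loop. The Sender learns to re-enqueue produce batches whose destination node disconnected, produce-response parsing moves into a new ProduceResponse class, ApiKeys gains an id-to-enum lookup table, and ProducerFailureHandlingTest replaces its single-bounce test with a rolling-bounce test that checks no messages are lost.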
@@ -102,6 +102,7 @@ public class KafkaProducer implements Producer {
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
                                                  this.totalMemorySize,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
+                                                 config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                                                  config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                                                  metrics,
                                                  new SystemTime());

@@ -135,6 +135,12 @@ public class ProducerConfig extends AbstractConfig {
      */
     public static final String MAX_RETRIES_CONFIG = "request.retries";
 
+    /**
+     * The amount of time to wait before attempting to resend produce request to a given topic partition. This avoids
+     * repeated sending-and-failing in a tight loop
+     */
+    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
+
     /**
      * Should we register the Kafka metrics as JMX mbeans?
      */

@@ -160,7 +166,8 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
                                 .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
                                 .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
-                                .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "");
+                                .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "")
+                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 500L, atLeast(0L), "blah blah");
 }
 
 ProducerConfig(Map<? extends Object, ? extends Object> props) {

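Taken together, the two new definitions let an application opt into bounded, spaced-out retries. A minimal configuration sketch, assuming only the literal keys and defaults visible in this patch (the producer construction itself, and any broker settings, are elided):

import java.util.Properties;

public class RetryConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Keys match the constants defined above; values override the
        // defaults registered via .define() (0 retries, 500 ms backoff).
        props.put("request.retries", "3");     // MAX_RETRIES_CONFIG
        props.put("retry.backoff.ms", "250");  // RETRY_BACKOFF_MS_CONFIG
        // A producer built from these properties would retry a failed batch
        // up to 3 times, waiting at least 250 ms between attempts.
        System.out.println(props);
    }
}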
@@ -51,9 +51,10 @@ public final class RecordAccumulator {
     private int drainIndex;
     private final int batchSize;
     private final long lingerMs;
-    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
+    private final long retryBackoffMs;
     private final BufferPool free;
     private final Time time;
+    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
 
     /**
      * Create a new record accumulator

@@ -63,16 +64,25 @@ public final class RecordAccumulator {
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
      *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
      *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
+     * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
+     *        exhausting all retries in a short period of time.
      * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of
      *        memory
      * @param metrics The metrics
      * @param time The time instance to use
      */
-    public RecordAccumulator(int batchSize, long totalSize, long lingerMs, boolean blockOnBufferFull, Metrics metrics, Time time) {
+    public RecordAccumulator(int batchSize,
+                             long totalSize,
+                             long lingerMs,
+                             long retryBackoffMs,
+                             boolean blockOnBufferFull,
+                             Metrics metrics,
+                             Time time) {
         this.drainIndex = 0;
         this.closed = false;
         this.batchSize = batchSize;
         this.lingerMs = lingerMs;
+        this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
         this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull);
         this.time = time;

@@ -155,6 +165,7 @@ public final class RecordAccumulator {
      */
     public void reenqueue(RecordBatch batch, long now) {
         batch.attempts++;
+        batch.lastAttempt = now;
         Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);

@@ -181,9 +192,11 @@ public final class RecordAccumulator {
             synchronized (deque) {
                 RecordBatch batch = deque.peekFirst();
                 if (batch != null) {
+                    boolean backingOff = batch.attempts > 0 && batch.lastAttempt + retryBackoffMs > now;
                     boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining();
                     boolean expired = now - batch.created >= lingerMs;
-                    if (full | expired | exhausted | closed)
+                    boolean sendable = full | expired | exhausted | closed;
+                    if (sendable & !backingOff)
                         ready.add(batch.topicPartition);
                 }
             }

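This readiness predicate is the heart of the fix: a batch that has failed at least once stays invisible to the sender until retryBackoffMs has passed since its last attempt. A self-contained sketch of the same logic, using plain booleans instead of real batches (the original uses non-short-circuit | and &; the sketch uses || and &&, which is equivalent for these boolean operands):

public class BackoffCheckSketch {
    static boolean readyToSend(int attempts, long lastAttemptMs, long createdMs, long nowMs,
                               long lingerMs, long retryBackoffMs,
                               boolean full, boolean exhausted, boolean closed) {
        // A batch that already failed is held back until the backoff elapses.
        boolean backingOff = attempts > 0 && lastAttemptMs + retryBackoffMs > nowMs;
        boolean expired = nowMs - createdMs >= lingerMs;
        boolean sendable = full || expired || exhausted || closed;
        return sendable && !backingOff;
    }

    public static void main(String[] args) {
        // Full batch, failed at t=999 with a 100 ms backoff: suppressed at t=1000...
        System.out.println(readyToSend(1, 999L, 0L, 1000L, 10L, 100L, true, false, false)); // false
        // ...and eligible again at t=1100, once the backoff window has passed.
        System.out.println(readyToSend(1, 999L, 0L, 1100L, 10L, 100L, true, false, false)); // true
    }
}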
@@ -34,6 +34,7 @@ public final class RecordBatch {
     public int recordCount = 0;
     public volatile int attempts = 0;
     public final long created;
+    public long lastAttempt;
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
     private final ProduceRequestResult produceFuture;

@@ -41,6 +42,7 @@ public final class RecordBatch {
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
         this.created = now;
+        this.lastAttempt = now;
         this.records = records;
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();

@@ -40,6 +40,7 @@ import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;

@@ -142,7 +143,7 @@ public class Sender implements Runnable {
      * The main run loop for the sender thread
      */
     public void run() {
-        log.trace("Starting Kafka producer I/O thread.");
+        log.debug("Starting Kafka producer I/O thread.");
 
         // main loop, runs until close is called
         while (running) {

@@ -153,7 +154,7 @@ public class Sender implements Runnable {
             }
         }
 
-        log.trace("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
+        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
 
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,

@@ -170,7 +171,7 @@ public class Sender implements Runnable {
         // close all the connections
         this.selector.close();
 
-        log.trace("Shutdown of Kafka producer I/O thread has completed.");
+        log.debug("Shutdown of Kafka producer I/O thread has completed.");
     }
 
     /**

|
||||||
|
|
||||||
// handle responses, connections, and disconnections
|
// handle responses, connections, and disconnections
|
||||||
handleSends(this.selector.completedSends());
|
handleSends(this.selector.completedSends());
|
||||||
handleResponses(this.selector.completedReceives(), now);
|
handleResponses(this.selector.completedReceives(), time.milliseconds());
|
||||||
handleDisconnects(this.selector.disconnected(), now);
|
handleDisconnects(this.selector.disconnected(), time.milliseconds());
|
||||||
handleConnects(this.selector.connected());
|
handleConnects(this.selector.connected());
|
||||||
|
|
||||||
return ready.size();
|
return ready.size();
|
||||||
|
@@ -348,15 +349,25 @@ public class Sender implements Runnable {
         nodeStates.disconnected(node);
         log.debug("Node {} disconnected.", node);
         for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
-            if (request.batches != null) {
-                for (RecordBatch batch : request.batches.values()) {
-                    if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
-                        this.accumulator.reenqueue(batch, now);
-                    } else {
-                        batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
-                        this.accumulator.deallocate(batch);
+            ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey());
+            switch (requestKey) {
+                case PRODUCE:
+                    for (RecordBatch batch : request.batches.values()) {
+                        if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
+                            log.warn("Destination node disconnected for topic-partition {}, retrying ({} attempts left).",
+                                     batch.topicPartition, this.retries - batch.attempts - 1);
+                            this.accumulator.reenqueue(batch, now);
+                        } else {
+                            batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
+                            this.accumulator.deallocate(batch);
+                        }
                     }
-                }
-            }
+                    break;
+                case METADATA:
+                    metadataFetchInProgress = false;
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unexpected api key id: " + requestKey.id);
+            }
         }
     }

@@ -409,18 +420,18 @@ public class Sender implements Runnable {
                 correlate(req.request.header(), header);
                 if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) {
                     log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId());
-                    handleProduceResponse(req, body, now);
+                    handleProduceResponse(req, req.request.header(), body, now);
                 } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) {
                     log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header()
                                                                                                                     .correlationId());
-                    handleMetadataResponse(body, now);
+                    handleMetadataResponse(req.request.header(), body, now);
                 } else {
                     throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
                 }
             }
         }
 
-    private void handleMetadataResponse(Struct body, long now) {
+    private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
         this.metadataFetchInProgress = false;
         MetadataResponse response = new MetadataResponse(body);
         Cluster cluster = response.cluster();

@@ -429,35 +440,30 @@ public class Sender implements Runnable {
         if (cluster.nodes().size() > 0)
             this.metadata.update(cluster, now);
         else
-            log.trace("Ignoring empty metadata response.");
+            log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
     }
 
     /**
      * Handle a produce response
      */
-    private void handleProduceResponse(InFlightRequest request, Struct response, long now) {
-        for (Object topicResponse : (Object[]) response.get("responses")) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = (String) topicRespStruct.get("topic");
-            for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = (Integer) partRespStruct.get("partition");
-                short errorCode = (Short) partRespStruct.get("error_code");
-
-                // if we got an error we may need to refresh our metadata
-                Errors error = Errors.forCode(errorCode);
+    private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long now) {
+        ProduceResponse pr = new ProduceResponse(body);
+        for (Map<TopicPartition, ProduceResponse.PartitionResponse> responses : pr.responses().values()) {
+            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : responses.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                ProduceResponse.PartitionResponse response = entry.getValue();
+                Errors error = Errors.forCode(response.errorCode);
                 if (error.exception() instanceof InvalidMetadataException)
                     metadata.forceUpdate();
-
-                long offset = (Long) partRespStruct.get("base_offset");
-                RecordBatch batch = request.batches.get(new TopicPartition(topic, partition));
+                RecordBatch batch = request.batches.get(tp);
                 if (canRetry(batch, error)) {
                     // retry
-                    log.warn("Got error for topic-partition {}, retrying. Error: {}", topic, partition, error);
+                    log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
+                             header.correlationId(), batch.topicPartition, this.retries - batch.attempts - 1, error);
                     this.accumulator.reenqueue(batch, now);
                 } else {
                     // tell the user the result of their request
-                    batch.done(offset, error.exception());
+                    batch.done(response.baseOffset, error.exception());
                     this.accumulator.deallocate(batch);
                 }
             }

@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.common.protocol;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Identifiers for all the Kafka APIs
  */

@@ -29,12 +33,17 @@ public enum ApiKeys {
     OFFSET_COMMIT(6, "offset_commit"),
     OFFSET_FETCH(7, "offset_fetch");
 
-    public static int MAX_API_KEY = 0;
+    private static ApiKeys[] codeToType;
+    public static int MAX_API_KEY = -1;
 
     static {
         for (ApiKeys key : ApiKeys.values()) {
             MAX_API_KEY = Math.max(MAX_API_KEY, key.id);
         }
+        codeToType = new ApiKeys[MAX_API_KEY + 1];
+        for (ApiKeys key : ApiKeys.values()) {
+            codeToType[key.id] = key;
+        }
     }
 
     /** the perminant and immutable id of an API--this can't change ever */

@@ -48,4 +57,7 @@ public enum ApiKeys {
         this.name = name;
     }
 
+    public static ApiKeys forId(int id) {
+        return codeToType[id];
+    }
 }

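The static initializer above builds a dense, id-indexed array once, so forId() is a constant-time array read instead of a scan over values(). A standalone sketch of the same pattern, with an abbreviated illustrative key list rather than Kafka's full one:

public class ApiKeyLookupSketch {
    // Illustrative subset of wire ids; not Kafka's actual API list.
    enum ApiKey {
        PRODUCE(0), FETCH(1), METADATA(3);

        private static final ApiKey[] CODE_TO_TYPE;
        static {
            int max = -1;
            for (ApiKey k : values())
                max = Math.max(max, k.id);
            CODE_TO_TYPE = new ApiKey[max + 1];
            for (ApiKey k : values())
                CODE_TO_TYPE[k.id] = k;
        }

        final int id;
        ApiKey(int id) { this.id = id; }

        // O(1) array read; returns null for unused ids within range and
        // throws ArrayIndexOutOfBoundsException for ids beyond the table.
        static ApiKey forId(int id) { return CODE_TO_TYPE[id]; }
    }

    public static void main(String[] args) {
        System.out.println(ApiKey.forId(0)); // PRODUCE
        System.out.println(ApiKey.forId(3)); // METADATA
        System.out.println(ApiKey.forId(2)); // null (gap in the id space)
    }
}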
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
 package org.apache.kafka.common.requests;
 
 import java.util.List;

@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
 package org.apache.kafka.common.requests;
 
 import java.util.ArrayList;

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProduceResponse {
+
+    public class PartitionResponse {
+        public int partitionId;
+        public short errorCode;
+        public long baseOffset;
+
+        public PartitionResponse(int partitionId, short errorCode, long baseOffset) {
+            this.partitionId = partitionId;
+            this.errorCode = errorCode;
+            this.baseOffset = baseOffset;
+        }
+    }
+
+    private final Map<String, Map<TopicPartition, PartitionResponse>> responses;
+
+    public ProduceResponse(Struct struct) {
+        responses = new HashMap<String, Map<TopicPartition, PartitionResponse>>();
+        for (Object topicResponse : (Object[]) struct.get("responses")) {
+            Struct topicRespStruct = (Struct) topicResponse;
+            String topic = (String) topicRespStruct.get("topic");
+            Map<TopicPartition, PartitionResponse> topicResponses = new HashMap<TopicPartition, PartitionResponse>();
+            for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
+                Struct partRespStruct = (Struct) partResponse;
+                int partition = (Integer) partRespStruct.get("partition");
+                short errorCode = (Short) partRespStruct.get("error_code");
+                long offset = (Long) partRespStruct.get("base_offset");
+                TopicPartition tp = new TopicPartition(topic, partition);
+                topicResponses.put(tp, new PartitionResponse(partition, errorCode, offset));
+            }
+            responses.put(topic, topicResponses);
+        }
+    }
+
+    public Map<String, Map<TopicPartition, PartitionResponse>> responses() {
+        return this.responses;
+    }
+}

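ProduceResponse deliberately exposes a plain nested map rather than raw Structs, so the Sender's loop above stays free of wire-format casts. A runnable sketch of walking that topic -> (partition -> result) shape, with simplified stand-in types so it compiles on its own (the TopicPartition class and the Struct parsing are elided):

import java.util.HashMap;
import java.util.Map;

public class ProduceResponseShapeSketch {
    // Stand-in for ProduceResponse.PartitionResponse.
    static class PartitionResult {
        final short errorCode;
        final long baseOffset;
        PartitionResult(short errorCode, long baseOffset) {
            this.errorCode = errorCode;
            this.baseOffset = baseOffset;
        }
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, PartitionResult>> responses =
                new HashMap<String, Map<Integer, PartitionResult>>();
        Map<Integer, PartitionResult> parts = new HashMap<Integer, PartitionResult>();
        parts.put(0, new PartitionResult((short) 0, 42L));
        responses.put("test", parts);

        // The sender walks every per-partition result and decides, per batch,
        // whether to reenqueue (retry) or complete it (done + deallocate).
        for (Map.Entry<String, Map<Integer, PartitionResult>> topicEntry : responses.entrySet())
            for (Map.Entry<Integer, PartitionResult> partEntry : topicEntry.getValue().entrySet())
                System.out.printf("%s-%d error=%d baseOffset=%d%n",
                                  topicEntry.getKey(), partEntry.getKey(),
                                  partEntry.getValue().errorCode, partEntry.getValue().baseOffset);
    }
}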
@@ -44,7 +44,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp, key, value, CompressionType.NONE, null);

@@ -67,7 +67,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
         accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null);
         assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
     }

@@ -75,7 +75,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
         accum.append(tp, key, value, CompressionType.NONE, null);
         assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size());
         time.sleep(10);

@@ -92,7 +92,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(new TopicPartition("test", 0), new TopicPartition("test", 1));
         for (TopicPartition tp : partitions) {

@@ -110,7 +110,7 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 10;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, true, metrics, time);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {

@@ -58,7 +58,7 @@ public class SenderTest {
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time);
     private Sender sender = new Sender(selector,
                                        metadata,
                                        this.accumulator,

@@ -337,7 +337,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     inLock(controllerContext.controllerLock) {
       if (config.autoLeaderRebalanceEnable)
         autoRebalanceScheduler.shutdown()
-      deleteTopicManager.shutdown()
+      if (deleteTopicManager != null)
+        deleteTopicManager.shutdown()
       Utils.unregisterMBean(KafkaController.MBeanName)
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()

@@ -647,8 +648,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     if(controllerContext.controllerChannelManager != null) {
       controllerContext.controllerChannelManager.shutdown()
       controllerContext.controllerChannelManager = null
-      info("Controller shutdown complete")
     }
+    info("Controller shutdown complete")
   }
 }

@@ -162,7 +162,7 @@ class LogManager(val logDirs: Array[File],
    * Close all the logs
    */
   def shutdown() {
-    debug("Shutting down.")
+    info("Shutting down.")
     try {
       // stop the cleaner first
       if(cleaner != null)

@@ -179,7 +179,7 @@ class LogManager(val logDirs: Array[File],
       // regardless of whether the close succeeded, we need to unlock the data directories
       dirLocks.foreach(_.destroy())
     }
-    debug("Shutdown complete.")
+    info("Shutdown complete.")
   }
 
   /**

@@ -26,7 +26,7 @@ import java.lang.Integer
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{ShutdownableThread, Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.consumer.SimpleConsumer

@@ -267,18 +267,73 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
   def testBrokerFailure() {
     // create topic
     val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
-    val leader = leaders(0)
+    val partition = 0
+    var leader = leaders(partition)
     assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined)
 
-    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
-    assertEquals("Returned metadata should have offset 0", producer3.send(record).get.offset, 0L)
+    val scheduler = new ProducerScheduler()
+    scheduler.start
 
-    // shutdown broker
-    val serverToShutdown = if(leader.get == server1.config.brokerId) server1 else server2
-    serverToShutdown.shutdown()
-    serverToShutdown.awaitShutdown()
+    // rolling bounce brokers
+    for (i <- 0 until 5) {
+      server1.shutdown()
+      server1.awaitShutdown()
+      server1.startup
 
-    // send the message again, it should still succeed due-to retry
-    assertEquals("Returned metadata should have offset 1", producer3.send(record).get.offset, 1L)
+      Thread.sleep(2000)
+
+      server2.shutdown()
+      server2.awaitShutdown()
+      server2.startup
+
+      Thread.sleep(2000)
+
+      assertTrue(scheduler.failed == false)
+    }
+
+    scheduler.shutdown
+    leader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition, 500)
+
+    val fetchResponse = if(leader.get == server1.config.brokerId) {
+      consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
+    } else {
+      consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
+    }
+
+    val messages = fetchResponse.iterator.toList.map(_.message)
+    val uniqueMessages = messages.toSet
+    val uniqueMessageSize = uniqueMessages.size
+
+    assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
+  }
+
+  private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
+  {
+    val numRecords = 1000
+    var sent = 0
+    var failed = false
+
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, (-1).toString)
+    producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
+    producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString)
+    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
+
+    val producer = new KafkaProducer(producerProps)
+
+    override def doWork(): Unit = {
+      val responses =
+        for (i <- sent+1 to sent+numRecords)
+          yield producer.send(new ProducerRecord(topic1, null, null, i.toString.getBytes))
+      val futures = responses.toList
+
+      try {
+        futures.map(_.get)
+        sent += numRecords
+      } catch {
+        case e : Exception => failed = true
+      }
+    }
   }
 }
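A note on the final assertion: because retried produce requests can legitimately write duplicates, the test deduplicates the fetched messages with toSet before comparing. Equality with scheduler.sent (which only counts batches whose futures all completed) then verifies that the rolling bounce lost no acknowledged messages, while duplicates introduced by retries are tolerated.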