KAFKA-1250 Add logging to new producer.

This commit is contained in:
Jay Kreps 2014-02-20 20:17:01 -08:00
parent a810b8ecbe
commit f1a53b972e
18 changed files with 291 additions and 173 deletions

View File

@ -310,6 +310,7 @@ project(':clients') {
archivesBaseName = "kafka-clients" archivesBaseName = "kafka-clients"
dependencies { dependencies {
compile "org.slf4j:slf4j-api:1.7.6"
testCompile 'com.novocode:junit-interface:0.9' testCompile 'com.novocode:junit-interface:0.9'
} }

View File

@ -45,6 +45,8 @@ import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A Kafka client that publishes records to the Kafka cluster. * A Kafka client that publishes records to the Kafka cluster.
@ -56,6 +58,8 @@ import org.apache.kafka.common.utils.SystemTime;
*/ */
public class KafkaProducer implements Producer { public class KafkaProducer implements Producer {
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
private final Partitioner partitioner; private final Partitioner partitioner;
private final int maxRequestSize; private final int maxRequestSize;
private final long metadataFetchTimeoutMs; private final long metadataFetchTimeoutMs;
@ -85,6 +89,7 @@ public class KafkaProducer implements Producer {
} }
private KafkaProducer(ProducerConfig config) { private KafkaProducer(ProducerConfig config) {
log.trace("Starting the Kafka producer");
this.metrics = new Metrics(new MetricConfig(), this.metrics = new Metrics(new MetricConfig(),
Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")), Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
new SystemTime()); new SystemTime());
@ -114,8 +119,10 @@ public class KafkaProducer implements Producer {
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
new SystemTime()); new SystemTime());
this.ioThread = new KafkaThread("kafka-network-thread", this.sender, true); this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true);
this.ioThread.start(); this.ioThread.start();
config.logUnused();
log.debug("Kafka producer started");
} }
private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) { private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
@ -124,7 +131,7 @@ public class KafkaProducer implements Producer {
if (url != null && url.length() > 0) { if (url != null && url.length() > 0) {
String[] pieces = url.split(":"); String[] pieces = url.split(":");
if (pieces.length != 2) if (pieces.length != 2)
throw new ConfigException("Invalid url in metadata.broker.list: " + url); throw new ConfigException("Invalid url in " + ProducerConfig.BROKER_LIST_CONFIG + ": " + url);
try { try {
InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1])); InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
if (address.isUnresolved()) if (address.isUnresolved())
@ -215,12 +222,14 @@ public class KafkaProducer implements Producer {
int partition = partitioner.partition(record, cluster); int partition = partitioner.partition(record, cluster);
ensureValidSize(record.key(), record.value()); ensureValidSize(record.key(), record.value());
TopicPartition tp = new TopicPartition(record.topic(), partition); TopicPartition tp = new TopicPartition(record.topic(), partition);
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback); FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
this.sender.wakeup(); this.sender.wakeup();
return future; return future;
// For API exceptions return them in the future; // For API exceptions return them in the future;
// for other exceptions throw directly // for other exceptions throw directly
} catch (ApiException e) { } catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null) if (callback != null)
callback.onCompletion(null, e); callback.onCompletion(null, e);
return new FutureFailure(e); return new FutureFailure(e);
@ -260,6 +269,7 @@ public class KafkaProducer implements Producer {
*/ */
@Override @Override
public void close() { public void close() {
log.trace("Closing the Kafka producer.");
this.sender.initiateClose(); this.sender.initiateClose();
try { try {
this.ioThread.join(); this.ioThread.join();
@ -267,6 +277,7 @@ public class KafkaProducer implements Producer {
throw new KafkaException(e); throw new KafkaException(e);
} }
this.metrics.close(); this.metrics.close();
log.debug("The Kafka producer has closed.");
} }
private static class FutureFailure implements Future<RecordMetadata> { private static class FutureFailure implements Future<RecordMetadata> {

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.clients.producer; package org.apache.kafka.clients.producer;
@ -97,4 +93,10 @@ public final class ProducerRecord {
return partition; return partition;
} }
@Override
public String toString() {
String key = this.key == null ? "null" : ("byte[" + this.key.length + "]");
String value = this.value == null ? "null" : ("byte[" + this.value.length + "]");
return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value;
}
} }

View File

@ -19,6 +19,8 @@ import java.util.Set;
import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A class encapsulating some of the logic around metadata. * A class encapsulating some of the logic around metadata.
@ -30,6 +32,8 @@ import org.apache.kafka.common.errors.TimeoutException;
*/ */
public final class Metadata { public final class Metadata {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
private final long refreshBackoffMs; private final long refreshBackoffMs;
private final long metadataExpireMs; private final long metadataExpireMs;
private long lastRefresh; private long lastRefresh;
@ -81,6 +85,7 @@ public final class Metadata {
topics.add(topic); topics.add(topic);
forceUpdate = true; forceUpdate = true;
try { try {
log.trace("Requesting metadata update for topic {}.", topic);
wait(maxWaitMs); wait(maxWaitMs);
} catch (InterruptedException e) { /* this is fine, just try again */ } catch (InterruptedException e) { /* this is fine, just try again */
} }
@ -127,6 +132,7 @@ public final class Metadata {
this.lastRefresh = now; this.lastRefresh = now;
this.cluster = cluster; this.cluster = cluster;
notifyAll(); notifyAll();
log.debug("Updated cluster metadata to {}", cluster);
} }
/** /**

View File

@ -33,6 +33,8 @@ import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords} * This class acts as a queue that accumulates records into {@link org.apache.kafka.common.record.MemoryRecords}
@ -43,6 +45,8 @@ import org.apache.kafka.common.utils.Utils;
*/ */
public final class RecordAccumulator { public final class RecordAccumulator {
private static final Logger log = LoggerFactory.getLogger(RecordAccumulator.class);
private volatile boolean closed; private volatile boolean closed;
private int drainIndex; private int drainIndex;
private final int batchSize; private final int batchSize;
@ -126,6 +130,7 @@ public final class RecordAccumulator {
// we don't have an in-progress record batch try to allocate a new batch // we don't have an in-progress record batch try to allocate a new batch
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
ByteBuffer buffer = free.allocate(size); ByteBuffer buffer = free.allocate(size);
synchronized (dq) { synchronized (dq) {
RecordBatch first = dq.peekLast(); RecordBatch first = dq.peekLast();

View File

@ -16,10 +16,11 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A batch of records that is or will be sent. * A batch of records that is or will be sent.
@ -27,6 +28,9 @@ import org.apache.kafka.common.record.MemoryRecords;
* This class is not thread safe and external synchronization must be used when modifying it * This class is not thread safe and external synchronization must be used when modifying it
*/ */
public final class RecordBatch { public final class RecordBatch {
private static final Logger log = LoggerFactory.getLogger(RecordBatch.class);
public int recordCount = 0; public int recordCount = 0;
public volatile int attempts = 0; public volatile int attempts = 0;
public final long created; public final long created;
@ -64,11 +68,15 @@ public final class RecordBatch {
/** /**
* Complete the request * Complete the request
* *
* @param offset The offset * @param baseOffset The base offset of the messages assigned by the server
* @param errorCode The error code or 0 if no error * @param errorCode The error code or 0 if no error
*/ */
public void done(long offset, RuntimeException exception) { public void done(long baseOffset, RuntimeException exception) {
this.produceFuture.done(topicPartition, offset, exception); this.produceFuture.done(topicPartition, baseOffset, exception);
log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
topicPartition,
baseOffset,
exception);
// execute callbacks // execute callbacks
for (int i = 0; i < this.thunks.size(); i++) { for (int i = 0; i < this.thunks.size(); i++) {
try { try {
@ -78,7 +86,7 @@ public final class RecordBatch {
else else
thunk.callback.onCompletion(null, exception); thunk.callback.onCompletion(null, exception);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
} }
} }
} }
@ -95,4 +103,9 @@ public final class RecordBatch {
this.future = future; this.future = future;
} }
} }
@Override
public String toString() {
return "RecordBatch(topicPartition=" + topicPartition + ", recordCount=" + recordCount + ")";
}
} }

View File

@ -45,6 +45,8 @@ import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
@ -52,6 +54,8 @@ import org.apache.kafka.common.utils.Utils;
*/ */
public class Sender implements Runnable { public class Sender implements Runnable {
private static final Logger log = LoggerFactory.getLogger(Sender.class);
/* the state of each nodes connection */ /* the state of each nodes connection */
private final NodeStates nodeStates; private final NodeStates nodeStates;
@ -138,15 +142,19 @@ public class Sender implements Runnable {
* The main run loop for the sender thread * The main run loop for the sender thread
*/ */
public void run() { public void run() {
log.trace("Starting Kafka producer I/O thread.");
// main loop, runs until close is called // main loop, runs until close is called
while (running) { while (running) {
try { try {
run(time.milliseconds()); run(time.milliseconds());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("Uncaught error in kafka producer I/O thread: ", e);
} }
} }
log.trace("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be // okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment, // requests in the accumulator or waiting for acknowledgment,
// wait until these are completed. // wait until these are completed.
@ -155,12 +163,14 @@ public class Sender implements Runnable {
try { try {
unsent = run(time.milliseconds()); unsent = run(time.milliseconds());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("Uncaught error in kafka producer I/O thread: ", e);
} }
} while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0); } while (unsent > 0 || this.inFlightRequests.totalInFlightRequests() > 0);
// close all the connections // close all the connections
this.selector.close(); this.selector.close();
log.trace("Shutdown of Kafka producer I/O thread has completed.");
} }
/** /**
@ -184,6 +194,13 @@ public class Sender implements Runnable {
// create produce requests // create produce requests
List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize); List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
List<InFlightRequest> requests = collate(cluster, batches); List<InFlightRequest> requests = collate(cluster, batches);
if (ready.size() > 0) {
log.trace("Partitions with complete batches: {}", ready);
log.trace("Partitions ready to initiate a request: {}", sendable);
log.trace("Created {} requests: {}", requests.size(), requests);
}
for (int i = 0; i < requests.size(); i++) { for (int i = 0; i < requests.size(); i++) {
InFlightRequest request = requests.get(i); InFlightRequest request = requests.get(i);
this.inFlightRequests.add(request); this.inFlightRequests.add(request);
@ -194,7 +211,7 @@ public class Sender implements Runnable {
try { try {
this.selector.poll(100L, sends); this.selector.poll(100L, sends);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); log.error("Unexpected error during I/O in producer network thread", e);
} }
// handle responses, connections, and disconnections // handle responses, connections, and disconnections
@ -218,8 +235,10 @@ public class Sender implements Runnable {
return; return;
if (nodeStates.isConnected(node.id())) { if (nodeStates.isConnected(node.id())) {
Set<String> topics = metadata.topics();
log.debug("Sending metadata update request for topics {} to node {}", topics, node.id());
this.metadataFetchInProgress = true; this.metadataFetchInProgress = true;
InFlightRequest request = metadataRequest(node.id(), metadata.topics()); InFlightRequest request = metadataRequest(node.id(), topics);
sends.add(request.request); sends.add(request.request);
this.inFlightRequests.add(request); this.inFlightRequests.add(request);
} else if (nodeStates.canConnect(node.id(), now)) { } else if (nodeStates.canConnect(node.id(), now)) {
@ -308,6 +327,7 @@ public class Sender implements Runnable {
*/ */
private void initiateConnect(Node node, long now) { private void initiateConnect(Node node, long now) {
try { try {
log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.nodeStates.connecting(node.id(), now); this.nodeStates.connecting(node.id(), now);
} catch (IOException e) { } catch (IOException e) {
@ -315,6 +335,7 @@ public class Sender implements Runnable {
nodeStates.disconnected(node.id()); nodeStates.disconnected(node.id());
/* maybe the problem is our metadata, update it */ /* maybe the problem is our metadata, update it */
metadata.forceUpdate(); metadata.forceUpdate();
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
} }
} }
@ -325,6 +346,7 @@ public class Sender implements Runnable {
// clear out the in-flight requests for the disconnected broker // clear out the in-flight requests for the disconnected broker
for (int node : disconnects) { for (int node : disconnects) {
nodeStates.disconnected(node); nodeStates.disconnected(node);
log.debug("Node {} disconnected.", node);
for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
if (request.batches != null) { if (request.batches != null) {
for (RecordBatch batch : request.batches.values()) { for (RecordBatch batch : request.batches.values()) {
@ -347,9 +369,11 @@ public class Sender implements Runnable {
* Record any connections that completed in our node state * Record any connections that completed in our node state
*/ */
private void handleConnects(List<Integer> connects) { private void handleConnects(List<Integer> connects) {
for (Integer id : connects) for (Integer id : connects) {
log.debug("Completed connection to node {}", id);
this.nodeStates.connected(id); this.nodeStates.connected(id);
} }
}
/** /**
* Process completed sends * Process completed sends
@ -359,6 +383,7 @@ public class Sender implements Runnable {
for (NetworkSend send : sends) { for (NetworkSend send : sends) {
Deque<InFlightRequest> requests = this.inFlightRequests.requestQueue(send.destination()); Deque<InFlightRequest> requests = this.inFlightRequests.requestQueue(send.destination());
InFlightRequest request = requests.peekFirst(); InFlightRequest request = requests.peekFirst();
log.trace("Completed send of request to node {}: {}", request.request.destination(), request.request);
if (!request.expectResponse) { if (!request.expectResponse) {
requests.pollFirst(); requests.pollFirst();
if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) { if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) {
@ -382,14 +407,18 @@ public class Sender implements Runnable {
short apiKey = req.request.header().apiKey(); short apiKey = req.request.header().apiKey();
Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
correlate(req.request.header(), header); correlate(req.request.header(), header);
if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) {
log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId());
handleProduceResponse(req, body, now); handleProduceResponse(req, body, now);
else if (req.request.header().apiKey() == ApiKeys.METADATA.id) } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) {
log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header()
.correlationId());
handleMetadataResponse(body, now); handleMetadataResponse(body, now);
else } else {
throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey()); throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
} }
} }
}
private void handleMetadataResponse(Struct body, long now) { private void handleMetadataResponse(Struct body, long now) {
this.metadataFetchInProgress = false; this.metadataFetchInProgress = false;
@ -399,6 +428,8 @@ public class Sender implements Runnable {
// created which means we will get errors and no nodes until it exists // created which means we will get errors and no nodes until it exists
if (cluster.nodes().size() > 0) if (cluster.nodes().size() > 0)
this.metadata.update(cluster, now); this.metadata.update(cluster, now);
else
log.trace("Ignoring empty metadata response.");
} }
/** /**
@ -422,6 +453,7 @@ public class Sender implements Runnable {
RecordBatch batch = request.batches.get(new TopicPartition(topic, partition)); RecordBatch batch = request.batches.get(new TopicPartition(topic, partition));
if (canRetry(batch, error)) { if (canRetry(batch, error)) {
// retry // retry
log.warn("Got error for topic-partition {}, retrying. Error: {}", topic, partition, error);
this.accumulator.reenqueue(batch, now); this.accumulator.reenqueue(batch, now);
} else { } else {
// tell the user the result of their request // tell the user the result of their request
@ -620,6 +652,11 @@ public class Sender implements Runnable {
this.request = request; this.request = request;
this.expectResponse = expectResponse; this.expectResponse = expectResponse;
} }
@Override
public String toString() {
return "InFlightRequest(expectResponse=" + expectResponse + ", batches=" + batches + ", request=" + request + ")";
}
} }
/** /**

View File

@ -73,9 +73,9 @@ public final class Cluster {
*/ */
public static Cluster bootstrap(List<InetSocketAddress> addresses) { public static Cluster bootstrap(List<InetSocketAddress> addresses) {
List<Node> nodes = new ArrayList<Node>(); List<Node> nodes = new ArrayList<Node>();
int nodeId = Integer.MIN_VALUE; int nodeId = -1;
for (InetSocketAddress address : addresses) for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId++, address.getHostName(), address.getPort())); nodes.add(new Node(nodeId--, address.getHostName(), address.getPort()));
return new Cluster(nodes, new ArrayList<PartitionInfo>(0)); return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
} }
@ -117,4 +117,9 @@ public final class Cluster {
return this.partitionsByTopic.get(topic); return this.partitionsByTopic.get(topic);
} }
@Override
public String toString() {
return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
}
} }

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common; package org.apache.kafka.common;
@ -86,7 +82,7 @@ public class Node {
@Override @Override
public String toString() { public String toString() {
return "Node(" + id + ", " + host + ", " + port + ")"; return "Node(" + (id < 0 ? "" : id + ", ") + host + ", " + port + ")";
} }
} }

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common; package org.apache.kafka.common;
@ -71,4 +67,29 @@ public class PartitionInfo {
return inSyncReplicas; return inSyncReplicas;
} }
@Override
public String toString() {
return String.format("Partition(topic = %s, partition = %d, leader = %d, replicas = %s, isr = %s",
topic,
partition,
leader.id(),
fmtNodeIds(replicas),
fmtNodeIds(inSyncReplicas));
}
/* Extract the node ids from each item in the array and format for display */
private String fmtNodeIds(Node[] nodes) {
StringBuilder b = new StringBuilder("[");
for (int i = 0; i < nodes.length - 1; i++) {
b.append(Integer.toString(nodes[i].id()));
b.append(',');
}
if (nodes.length > 0) {
b.append(Integer.toString(nodes[nodes.length - 1].id()));
b.append(',');
}
b.append("]");
return b.toString();
}
} }

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.config; package org.apache.kafka.common.config;
@ -25,7 +21,8 @@ import java.util.Set;
import org.apache.kafka.common.Configurable; import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A convenient base class for configurations to extend. * A convenient base class for configurations to extend.
@ -34,10 +31,17 @@ import org.apache.kafka.common.utils.Utils;
*/ */
public class AbstractConfig { public class AbstractConfig {
private final Logger log = LoggerFactory.getLogger(getClass());
/* configs for which values have been requested, used to detect unused configs */
private final Set<String> used; private final Set<String> used;
private final Map<String, Object> values;
/* the original values passed in by the user */
private final Map<String, ?> originals; private final Map<String, ?> originals;
/* the parsed values */
private final Map<String, Object> values;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public AbstractConfig(ConfigDef definition, Map<?, ?> originals) { public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
/* check that all the keys are really strings */ /* check that all the keys are really strings */
@ -47,6 +51,7 @@ public class AbstractConfig {
this.originals = (Map<String, ?>) originals; this.originals = (Map<String, ?>) originals;
this.values = definition.parse(this.originals); this.values = definition.parse(this.originals);
this.used = Collections.synchronizedSet(new HashSet<String>()); this.used = Collections.synchronizedSet(new HashSet<String>());
logAll();
} }
protected Object get(String key) { protected Object get(String key) {
@ -83,10 +88,30 @@ public class AbstractConfig {
public Set<String> unused() { public Set<String> unused() {
Set<String> keys = new HashSet<String>(originals.keySet()); Set<String> keys = new HashSet<String>(originals.keySet());
keys.remove(used); keys.removeAll(used);
return keys; return keys;
} }
private void logAll() {
StringBuilder b = new StringBuilder();
b.append(getClass().getSimpleName());
b.append(" values: ");
b.append(Utils.NL);
for (Map.Entry<String, Object> entry : this.values.entrySet()) {
b.append('\t');
b.append(entry.getKey());
b.append(" = ");
b.append(entry.getValue());
b.append(Utils.NL);
}
log.info(b.toString());
}
public void logUnused() {
for (String key : unused())
log.warn("The configuration {} = {} was supplied but isn't a known config.", key, this.values.get(key));
}
/** /**
* Get a configured instance of the give class specified by the given configuration key. If the object implements * Get a configured instance of the give class specified by the given configuration key. If the object implements
* Configurable configure it using the configuration. * Configurable configure it using the configuration.

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.metrics; package org.apache.kafka.common.metrics;
@ -36,13 +32,16 @@ import javax.management.ObjectName;
import javax.management.ReflectionException; import javax.management.ReflectionException;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Register metrics in JMX as dynamic mbeans based on the metric names * Register metrics in JMX as dynamic mbeans based on the metric names
*/ */
public class JmxReporter implements MetricsReporter { public class JmxReporter implements MetricsReporter {
private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
private final String prefix; private final String prefix;
private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>(); private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
@ -160,7 +159,7 @@ public class JmxReporter implements MetricsReporter {
list.add(new Attribute(name, getAttribute(name))); list.add(new Attribute(name, getAttribute(name)));
return list; return list;
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); log.error("Error getting JMX attribute: ", e);
return new AttributeList(); return new AttributeList();
} }
} }

View File

@ -27,6 +27,8 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A selector interface for doing non-blocking multi-connection network I/O. * A selector interface for doing non-blocking multi-connection network I/O.
@ -58,6 +60,8 @@ import org.apache.kafka.common.KafkaException;
*/ */
public class Selector implements Selectable { public class Selector implements Selectable {
private static final Logger log = LoggerFactory.getLogger(Selector.class);
private final java.nio.channels.Selector selector; private final java.nio.channels.Selector selector;
private final Map<Integer, SelectionKey> keys; private final Map<Integer, SelectionKey> keys;
private final List<NetworkSend> completedSends; private final List<NetworkSend> completedSends;
@ -140,17 +144,12 @@ public class Selector implements Selectable {
*/ */
@Override @Override
public void close() { public void close() {
for (SelectionKey key : this.selector.keys()) { for (SelectionKey key : this.selector.keys())
try {
close(key); close(key);
} catch (IOException e) {
e.printStackTrace();
}
}
try { try {
this.selector.close(); this.selector.close();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); log.error("Exception closing selector:", e);
} }
} }
@ -201,9 +200,7 @@ public class Selector implements Selectable {
Transmissions transmissions = transmissions(key); Transmissions transmissions = transmissions(key);
SocketChannel channel = channel(key); SocketChannel channel = channel(key);
try { try {
/* /* complete any connections that have finished their handshake */
* complete any connections that have finished their handshake
*/
if (key.isConnectable()) { if (key.isConnectable()) {
channel.finishConnect(); channel.finishConnect();
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
@ -222,9 +219,7 @@ public class Selector implements Selectable {
} }
} }
/* /* write to any sockets that have space in their buffer and for which we have data */
* write to any sockets that have space in their buffer and for which we have data
*/
if (key.isWritable()) { if (key.isWritable()) {
transmissions.send.writeTo(channel); transmissions.send.writeTo(channel);
if (transmissions.send.remaining() <= 0) { if (transmissions.send.remaining() <= 0) {
@ -238,7 +233,7 @@ public class Selector implements Selectable {
if (!key.isValid()) if (!key.isValid())
close(key); close(key);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); log.error("Error in I/O: ", e);
close(key); close(key);
} }
} }
@ -294,7 +289,7 @@ public class Selector implements Selectable {
/** /**
* Begin closing this connection * Begin closing this connection
*/ */
private void close(SelectionKey key) throws IOException { private void close(SelectionKey key) {
SocketChannel channel = channel(key); SocketChannel channel = channel(key);
Transmissions trans = transmissions(key); Transmissions trans = transmissions(key);
if (trans != null) { if (trans != null) {
@ -305,8 +300,12 @@ public class Selector implements Selectable {
} }
key.attach(null); key.attach(null);
key.cancel(); key.cancel();
try {
channel.socket().close(); channel.socket().close();
channel.close(); channel.close();
} catch (IOException e) {
log.error("Exception closing connection to node {}:", trans.id, e);
}
} }
/** /**

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.requests; package org.apache.kafka.common.requests;
@ -25,7 +21,6 @@ import org.apache.kafka.common.protocol.Protocol;
import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
/** /**
* The header for a request in the Kafka protocol * The header for a request in the Kafka protocol
*/ */
@ -82,4 +77,9 @@ public class RequestHeader {
public int sizeOf() { public int sizeOf() {
return header.sizeOf(); return header.sizeOf();
} }
@Override
public String toString() {
return header.toString();
}
} }

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.requests; package org.apache.kafka.common.requests;
@ -21,7 +17,6 @@ import java.nio.ByteBuffer;
import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
/** /**
* A send object for a kafka request * A send object for a kafka request
*/ */
@ -52,4 +47,9 @@ public class RequestSend extends NetworkSend {
return body; return body;
} }
@Override
public String toString() {
return "RequestSend(header=" + header.toString() + ", body=" + body.toString() + ")";
}
} }

View File

@ -1,32 +1,33 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.utils; package org.apache.kafka.common.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A wrapper for Thread that sets things up nicely * A wrapper for Thread that sets things up nicely
*/ */
public class KafkaThread extends Thread { public class KafkaThread extends Thread {
public KafkaThread(String name, Runnable runnable, boolean daemon) { private final Logger log = LoggerFactory.getLogger(getClass());
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name); super(runnable, name);
setDaemon(daemon); setDaemon(daemon);
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) { public void uncaughtException(Thread t, Throwable e) {
e.printStackTrace(); log.error("Uncaught exception in " + name + ": ", e);
} }
}); });
} }

View File

@ -1,18 +1,14 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.kafka.common.utils; package org.apache.kafka.common.utils;
@ -21,9 +17,10 @@ import java.nio.ByteBuffer;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
public class Utils { public class Utils {
public static String NL = System.getProperty("line.separator");
/** /**
* Turn the given UTF8 byte array into a string * Turn the given UTF8 byte array into a string
* *

View File

@ -474,6 +474,6 @@ class ReplicaManager(val config: KafkaConfig,
info("Shut down") info("Shut down")
replicaFetcherManager.shutdown() replicaFetcherManager.shutdown()
checkpointHighWatermarks() checkpointHighWatermarks()
info("Shutted down completely") info("Shut down completely")
} }
} }