kafka-1302; cleanup logging in new producer; reviewed by Jay Kreps, Guozhang Wang and Neha Narkhede

This commit is contained in:
Jun Rao 2014-03-13 14:25:37 -07:00
parent c124bbbb6c
commit 84a3a9a3d9
7 changed files with 98 additions and 34 deletions

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ErrorLoggingCallback implements Callback {
private static final Logger log = LoggerFactory.getLogger(ErrorLoggingCallback.class);
private byte[] key;
private byte[] value;
private boolean logAsString;
public ErrorLoggingCallback(byte[] key, byte[] value, boolean logAsString) {
this.key = key;
this.value = value;
this.logAsString = logAsString;
}
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
String keyString = (key == null) ? "null" :
logAsString ? new String(key) : key.length + " bytes";
String valueString = (value == null) ? "null" :
logAsString ? new String(value) : value.length + " bytes";
log.error("Error when sending message with key: " + keyString + ", value: " + valueString +
" with error " + e.getMessage());
}
}
}

View File

@ -197,7 +197,7 @@ public class Sender implements Runnable {
if (ready.size() > 0) { if (ready.size() > 0) {
log.trace("Partitions with complete batches: {}", ready); log.trace("Partitions with complete batches: {}", ready);
log.trace("Partitions ready to initiate a request: {}", sendable); log.trace("Partitions ready to initiate a request: {}", sendable);
log.trace("Created {} requests: {}", requests.size(), requests); log.trace("Created {} produce requests: {}", requests.size(), requests);
} }
for (int i = 0; i < requests.size(); i++) { for (int i = 0; i < requests.size(); i++) {
@ -233,11 +233,11 @@ public class Sender implements Runnable {
if (nodeStates.isConnected(node.id())) { if (nodeStates.isConnected(node.id())) {
Set<String> topics = metadata.topics(); Set<String> topics = metadata.topics();
log.debug("Sending metadata update request for topics {} to node {}", topics, node.id());
this.metadataFetchInProgress = true; this.metadataFetchInProgress = true;
InFlightRequest request = metadataRequest(node.id(), topics); InFlightRequest metadataRequest = metadataRequest(node.id(), topics);
sends.add(request.request); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
this.inFlightRequests.add(request); sends.add(metadataRequest.request);
this.inFlightRequests.add(metadataRequest);
} else if (nodeStates.canConnect(node.id(), now)) { } else if (nodeStates.canConnect(node.id(), now)) {
// we don't have a connection to this node right now, make one // we don't have a connection to this node right now, make one
initiateConnect(node, now); initiateConnect(node, now);
@ -345,6 +345,7 @@ public class Sender implements Runnable {
nodeStates.disconnected(node); nodeStates.disconnected(node);
log.debug("Node {} disconnected.", node); log.debug("Node {} disconnected.", node);
for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
log.trace("Cancelled request {} due to node {} being disconnected", request, node);
ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey()); ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey());
switch (requestKey) { switch (requestKey) {
case PRODUCE: case PRODUCE:

View File

@ -230,9 +230,20 @@ public class Struct {
StringBuilder b = new StringBuilder(); StringBuilder b = new StringBuilder();
b.append('{'); b.append('{');
for (int i = 0; i < this.values.length; i++) { for (int i = 0; i < this.values.length; i++) {
b.append(this.schema.get(i).name); Field f = this.schema.get(i);
b.append(f.name);
b.append('='); b.append('=');
b.append(this.values[i]); if (f.type() instanceof ArrayOf) {
Object[] arrayValue = (Object[]) this.values[i];
b.append('[');
for (int j = 0; j < arrayValue.length; j++) {
b.append(arrayValue[j]);
if (j < arrayValue.length - 1)
b.append(',');
}
b.append(']');
} else
b.append(this.values[i]);
if (i < this.values.length - 1) if (i < this.values.length - 1)
b.append(','); b.append(',');
} }

View File

@ -29,6 +29,16 @@ public class ProduceResponse {
this.errorCode = errorCode; this.errorCode = errorCode;
this.baseOffset = baseOffset; this.baseOffset = baseOffset;
} }
@Override
public String toString() {
StringBuilder b = new StringBuilder();
b.append('{');
b.append("pid: " + partitionId);
b.append(",error: " + errorCode);
b.append(",offset: " + baseOffset);
b.append('}');
return b.toString();
}
} }
private final Map<String, Map<TopicPartition, PartitionResponse>> responses; private final Map<String, Map<TopicPartition, PartitionResponse>> responses;
@ -54,4 +64,22 @@ public class ProduceResponse {
public Map<String, Map<TopicPartition, PartitionResponse>> responses() { public Map<String, Map<TopicPartition, PartitionResponse>> responses() {
return this.responses; return this.responses;
} }
@Override
public String toString() {
StringBuilder b = new StringBuilder();
b.append('{');
boolean isFirst = true;
for (Map<TopicPartition, PartitionResponse> response : responses.values()) {
for (Map.Entry<TopicPartition, PartitionResponse> entry : response.entrySet()) {
if (isFirst)
isFirst = false;
else
b.append(',');
b.append(entry.getKey() + " : " + entry.getValue());
}
}
b.append('}');
return b.toString();
}
} }

View File

@ -24,6 +24,7 @@ import kafka.consumer._
import collection.mutable.ListBuffer import collection.mutable.ListBuffer
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer} import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
object MirrorMaker extends Logging { object MirrorMaker extends Logging {
@ -168,10 +169,12 @@ object MirrorMaker extends Logging {
val producerId = Utils.abs(java.util.Arrays.hashCode(producerRecord.key())) % producers.size val producerId = Utils.abs(java.util.Arrays.hashCode(producerRecord.key())) % producers.size
trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId)) trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId))
val producer = producers(producerId) val producer = producers(producerId)
producer.send(producerRecord, Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value())) producer.send(producerRecord,
new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false))
} else { } else {
val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size) val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size)
producers(producerId).send(producerRecord, Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value())) producers(producerId).send(producerRecord,
new ErrorLoggingCallback(producerRecord.key(), producerRecord.value(), false))
trace("Sent message to producer " + producerId) trace("Sent message to producer " + producerId)
} }
} }

View File

@ -25,12 +25,10 @@ import java.util.concurrent.locks.Lock
import java.lang.management._ import java.lang.management._
import javax.management._ import javax.management._
import scala.collection._ import scala.collection._
import mutable.ListBuffer
import scala.collection.mutable import scala.collection.mutable
import java.util.Properties import java.util.Properties
import kafka.common.KafkaException import kafka.common.KafkaException
import kafka.common.KafkaStorageException import kafka.common.KafkaStorageException
import org.apache.kafka.clients.producer.{RecordMetadata, Callback}
/** /**
@ -541,25 +539,4 @@ object Utils extends Logging {
lock.unlock() lock.unlock()
} }
} }
def errorLoggingCallback(key: Array[Byte], value: Array[Byte], logAsString: Boolean = false) = {
new Callback() {
def onCompletion(metadata: RecordMetadata, e: Exception) {
if (e != null) {
val keyString = if (key == null)
"null"
else {
if (logAsString) new String(key) else key.length + " bytes"
}
val valueString = if (value == null)
"null"
else {
if (logAsString) new String(value) else value.length + " bytes"
}
error("Error when sending message with key: " + keyString + ", value: " + valueString +
" with exception " + e.getMessage)
}
}
}
}
} }

View File

@ -26,9 +26,10 @@ import java.text.SimpleDateFormat
import kafka.serializer._ import kafka.serializer._
import java.util._ import java.util._
import collection.immutable.List import collection.immutable.List
import kafka.utils.{ VerifiableProperties, Logging, Utils } import kafka.utils.{VerifiableProperties, Logging, Utils}
import kafka.metrics.KafkaMetricsReporter import kafka.metrics.KafkaMetricsReporter
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
/** /**
* Load test for the producer * Load test for the producer
@ -219,7 +220,7 @@ object ProducerPerformance extends Logging {
this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get() this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get()
} else { } else {
this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes), this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes),
Utils.errorLoggingCallback(null, bytes, if (config.seqIdMode) true else false)) new ErrorLoggingCallback(null, bytes, if (config.seqIdMode) true else false))
} }
} }