KAFKA-1910; Fix two bugs on MemoryRecords and KafkaConsumer; reviewed by Onur Karaman

Author: Guozhang Wang, 2015-03-19 15:52:54 -07:00
Parent: 82789e7519
Commit: b2c833aa41
4 changed files with 16 additions and 13 deletions

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -354,7 +354,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Metrics metrics;
private final SubscriptionState subscriptions;
private final Metadata metadata;
- private final Heartbeat heartbeat;
private final long retryBackoffMs;
private final boolean autoCommit;
private final long autoCommitIntervalMs;
@@ -446,7 +445,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
else
this.rebalanceCallback = callback;
this.time = new SystemTime();
- this.heartbeat = new Heartbeat(config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), time.milliseconds());
this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
this.lastCommitAttemptMs = time.milliseconds();
@@ -538,7 +536,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public synchronized void subscribe(String... topics) {
ensureNotClosed();
log.debug("Subscribed to topic(s): ", Utils.join(topics, ", "));
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
for (String topic : topics)
this.subscriptions.subscribe(topic);
metadata.addTopics(topics);
@@ -555,7 +553,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public synchronized void subscribe(TopicPartition... partitions) {
ensureNotClosed();
log.debug("Subscribed to partitions(s): ", Utils.join(partitions, ", "));
log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", "));
for (TopicPartition tp : partitions) {
this.subscriptions.subscribe(tp);
metadata.addTopics(tp.topic());
@@ -570,7 +568,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
public synchronized void unsubscribe(String... topics) {
ensureNotClosed();
log.debug("Unsubscribed from topic(s): ", Utils.join(topics, ", "));
log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", "));
// throw an exception if the topic was never subscribed to
for (String topic : topics)
this.subscriptions.unsubscribe(topic);
@@ -584,7 +582,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
public synchronized void unsubscribe(TopicPartition... partitions) {
ensureNotClosed();
log.debug("Unsubscribed from partitions(s): ", Utils.join(partitions, ", "));
log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", "));
// throw an exception if the partition was never subscribed to
for (TopicPartition partition : partitions)
this.subscriptions.unsubscribe(partition);
@@ -878,7 +876,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// if the committed position is unknown reset the position
fetcher.resetOffset(tp);
} else {
log.debug("Resetting offset for partition {} to committed offset");
log.debug("Resetting offset for partition {} to the committed offset {}",
tp, subscriptions.committed(tp));
subscriptions.seek(tp, subscriptions.committed(tp));
}
}
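
A note on the logging fixes above: SLF4J only substitutes an argument where the format string contains a {} placeholder, and it silently tolerates a mismatch in either direction. The old calls either passed an argument with no placeholder (so the argument was dropped) or, in the offset-reset case, had a placeholder with no argument (so a literal "{}" was logged). A minimal sketch of both failure modes (class name hypothetical):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PlaceholderDemo {
        private static final Logger log = LoggerFactory.getLogger(PlaceholderDemo.class);

        public static void main(String[] args) {
            String topics = "foo, bar";

            // Argument but no placeholder: the argument is silently dropped.
            // Logs: "Subscribed to topic(s): "
            log.debug("Subscribed to topic(s): ", topics);

            // Placeholder but no argument: the literal "{}" is logged.
            log.debug("Resetting offset for partition {} to committed offset");

            // Fixed form: one placeholder per argument.
            // Logs: "Subscribed to topic(s): foo, bar"
            log.debug("Subscribed to topic(s): {}", topics);
        }
    }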

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@@ -258,7 +258,7 @@ public class Fetcher<K, V> {
Node node = cluster.leaderFor(partition);
// if there is a leader and no in-flight requests, issue a new fetch
if (node != null && this.client.inFlightRequestCount(node.id()) == 0) {
- Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
+ Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
if (fetch == null) {
fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
fetchable.put(node.id(), fetch);
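
The bug fixed here: fetchable is keyed by the broker id (an Integer), but the lookup passed the Node object itself. Because Map.get takes Object, the mismatch compiles and simply always returns null, so this branch rebuilt a fresh map on every pass. A self-contained sketch of the failure (Node here is a stand-in for Kafka's class):

    import java.util.HashMap;
    import java.util.Map;

    public class MapKeyTypeDemo {
        // Stand-in for Kafka's Node: identified by an integer id.
        static class Node {
            private final int id;
            Node(int id) { this.id = id; }
            int id() { return id; }
        }

        public static void main(String[] args) {
            Map<Integer, String> fetchable = new HashMap<Integer, String>();
            Node node = new Node(1);
            fetchable.put(node.id(), "pending fetch");

            // Map.get takes Object, so this compiles, but a Node is never
            // equal to an Integer: the lookup always misses.
            System.out.println(fetchable.get(node));      // null
            // Looking up with the key type the map actually uses works.
            System.out.println(fetchable.get(node.id())); // pending fetch
        }
    }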

clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java

@@ -234,7 +234,7 @@ public class MemoryRecords implements Records {
rec.limit(size);
} else {
byte[] recordBuffer = new byte[size];
- stream.read(recordBuffer, 0, size);
+ stream.readFully(recordBuffer, 0, size);
rec = ByteBuffer.wrap(recordBuffer);
}
LogEntry entry = new LogEntry(offset, new Record(rec));
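
The read -> readFully change matters because InputStream.read(byte[], int, int) may return after filling only part of the buffer, which is common when the underlying stream decompresses in chunks; the tail of recordBuffer would then be left as zeros and the record would parse as garbage. DataInputStream.readFully instead loops until size bytes have been read, or throws EOFException. A sketch with a deliberately short-reading stream (class names hypothetical):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class ReadFullyDemo {
        // Returns at most one byte per read() call, mimicking the short
        // reads a decompressing stream can legally produce.
        static class TrickleStream extends InputStream {
            private final byte[] data;
            private int pos = 0;
            TrickleStream(byte[] data) { this.data = data; }
            @Override public int read() {
                return pos < data.length ? (data[pos++] & 0xff) : -1;
            }
            @Override public int read(byte[] b, int off, int len) {
                int c = read();
                if (c < 0) return -1;
                b[off] = (byte) c;
                return 1; // legal: fewer than len bytes were read
            }
        }

        public static void main(String[] args) throws IOException {
            byte[] record = "0123456789".getBytes();

            byte[] buf1 = new byte[10];
            int n = new TrickleStream(record).read(buf1, 0, 10);
            System.out.println(n); // 1 -- the other 9 bytes are still zero

            byte[] buf2 = new byte[10];
            new DataInputStream(new TrickleStream(record)).readFully(buf2, 0, 10);
            System.out.println(new String(buf2)); // 0123456789
        }
    }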
@@ -245,7 +245,9 @@ public class MemoryRecords implements Records {
return entry;
} else {
// init the inner iterator with the value payload of the message,
- // which will de-compress the payload to a set of messages
+ // which will de-compress the payload to a set of messages;
+ // since we assume nested compression is not allowed, the deep iterator
+ // would not try to further decompress underlying messages
ByteBuffer value = entry.record().value();
innerIter = new RecordsIterator(value, compression, true);
return innerIter.next();
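
The expanded comment pins down the design assumption behind this branch: a compressed wrapper message decompresses into plain messages only, so the inner ("deep") iterator never needs to recurse. Schematically, the shallow/deep pattern looks like the sketch below (not Kafka's actual RecordsIterator; types are illustrative and wrappers are assumed non-empty):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    public class DeepIterationDemo {
        // A log entry is either a plain message or a compressed wrapper
        // whose payload decompresses into a list of plain messages.
        static class Entry {
            final String message;          // plain payload, or null
            final List<String> compressed; // decompressed payload, or null
            Entry(String message, List<String> compressed) {
                this.message = message;
                this.compressed = compressed;
            }
        }

        static class DeepIterator implements Iterator<String> {
            private final Iterator<Entry> outer;
            private Iterator<String> inner = Collections.<String>emptyIterator();

            DeepIterator(List<Entry> entries) { this.outer = entries.iterator(); }

            public boolean hasNext() {
                return inner.hasNext() || outer.hasNext();
            }

            public String next() {
                if (inner.hasNext())
                    return inner.next();
                Entry e = outer.next();
                if (e.compressed == null)
                    return e.message; // shallow case: a plain entry
                // Deep case: walk the decompressed payload. Since nested
                // compression is not allowed, these are plain messages and
                // no further decompression (or recursion) is needed.
                inner = e.compressed.iterator();
                return inner.next();
            }

            public void remove() { throw new UnsupportedOperationException(); }
        }

        public static void main(String[] args) {
            List<Entry> log = Arrays.asList(
                    new Entry("a", null),
                    new Entry(null, Arrays.asList("b", "c")),
                    new Entry("d", null));
            Iterator<String> it = new DeepIterator(log);
            while (it.hasNext())
                System.out.println(it.next()); // a, b, c, d
        }
    }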

clients/src/main/java/org/apache/kafka/common/record/Record.java

@@ -317,12 +317,14 @@ public final class Record {
}
public String toString() {
return String.format("Record(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)",
magic(),
attributes(),
+ compressionType(),
checksum(),
- key().limit(),
- value().limit());
+ key() == null ? 0 : key().limit(),
+ value() == null ? 0 : value().limit());
}
public boolean equals(Object other) {
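
The toString change guards against records with a null key or value (Kafka permits both, e.g. keyless messages), where the old key().limit() would throw NullPointerException from inside toString; it also reports the compression type. A minimal illustration of the guard (hypothetical helper, not Kafka's Record API):

    import java.nio.ByteBuffer;

    public class NullSafeToStringDemo {
        static String describe(ByteBuffer key, ByteBuffer value) {
            // Unguarded key.limit()/value.limit() would NPE on null;
            // the ternary reports 0 bytes instead.
            return String.format("Record(key = %d bytes, value = %d bytes)",
                                 key == null ? 0 : key.limit(),
                                 value == null ? 0 : value.limit());
        }

        public static void main(String[] args) {
            System.out.println(describe(null, ByteBuffer.wrap(new byte[5])));
            // Record(key = 0 bytes, value = 5 bytes)
        }
    }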