From b2c833aa41cb9a7a6232781b273402042e021607 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 19 Mar 2015 15:52:54 -0700 Subject: [PATCH] KAFKA-1910; Fix two bugs on MemoryRecords and KafkaConsumer; reviewed by Onur Karaman --- .../kafka/clients/consumer/KafkaConsumer.java | 13 ++++++------- .../kafka/clients/consumer/internals/Fetcher.java | 2 +- .../apache/kafka/common/record/MemoryRecords.java | 6 ++++-- .../java/org/apache/kafka/common/record/Record.java | 8 +++++--- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 2e246532634..c7bc56c4957 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -354,7 +354,6 @@ public class KafkaConsumer implements Consumer { private final Metrics metrics; private final SubscriptionState subscriptions; private final Metadata metadata; - private final Heartbeat heartbeat; private final long retryBackoffMs; private final boolean autoCommit; private final long autoCommitIntervalMs; @@ -446,7 +445,6 @@ public class KafkaConsumer implements Consumer { else this.rebalanceCallback = callback; this.time = new SystemTime(); - this.heartbeat = new Heartbeat(config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), time.milliseconds()); this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); this.lastCommitAttemptMs = time.milliseconds(); @@ -538,7 +536,7 @@ public class KafkaConsumer implements Consumer { @Override public synchronized void subscribe(String... topics) { ensureNotClosed(); - log.debug("Subscribed to topic(s): ", Utils.join(topics, ", ")); + log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); for (String topic : topics) this.subscriptions.subscribe(topic); metadata.addTopics(topics); @@ -555,7 +553,7 @@ public class KafkaConsumer implements Consumer { @Override public synchronized void subscribe(TopicPartition... partitions) { ensureNotClosed(); - log.debug("Subscribed to partitions(s): ", Utils.join(partitions, ", ")); + log.debug("Subscribed to partitions(s): {}", Utils.join(partitions, ", ")); for (TopicPartition tp : partitions) { this.subscriptions.subscribe(tp); metadata.addTopics(tp.topic()); @@ -570,7 +568,7 @@ public class KafkaConsumer implements Consumer { */ public synchronized void unsubscribe(String... topics) { ensureNotClosed(); - log.debug("Unsubscribed from topic(s): ", Utils.join(topics, ", ")); + log.debug("Unsubscribed from topic(s): {}", Utils.join(topics, ", ")); // throw an exception if the topic was never subscribed to for (String topic : topics) this.subscriptions.unsubscribe(topic); @@ -584,7 +582,7 @@ public class KafkaConsumer implements Consumer { */ public synchronized void unsubscribe(TopicPartition... partitions) { ensureNotClosed(); - log.debug("Unsubscribed from partitions(s): ", Utils.join(partitions, ", ")); + log.debug("Unsubscribed from partitions(s): {}", Utils.join(partitions, ", ")); // throw an exception if the partition was never subscribed to for (TopicPartition partition : partitions) this.subscriptions.unsubscribe(partition); @@ -878,7 +876,8 @@ public class KafkaConsumer implements Consumer { // if the committed position is unknown reset the position fetcher.resetOffset(tp); } else { - log.debug("Resetting offset for partition {} to committed offset"); + log.debug("Resetting offset for partition {} to the committed offset {}", + tp, subscriptions.committed(tp)); subscriptions.seek(tp, subscriptions.committed(tp)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 8b71fbad5c4..ef9dd5238fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -258,7 +258,7 @@ public class Fetcher { Node node = cluster.leaderFor(partition); // if there is a leader and no in-flight requests, issue a new fetch if (node != null && this.client.inFlightRequestCount(node.id()) == 0) { - Map fetch = fetchable.get(node); + Map fetch = fetchable.get(node.id()); if (fetch == null) { fetch = new HashMap(); fetchable.put(node.id(), fetch); diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index c049bff305a..b2db2403868 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -234,7 +234,7 @@ public class MemoryRecords implements Records { rec.limit(size); } else { byte[] recordBuffer = new byte[size]; - stream.read(recordBuffer, 0, size); + stream.readFully(recordBuffer, 0, size); rec = ByteBuffer.wrap(recordBuffer); } LogEntry entry = new LogEntry(offset, new Record(rec)); @@ -245,7 +245,9 @@ public class MemoryRecords implements Records { return entry; } else { // init the inner iterator with the value payload of the message, - // which will de-compress the payload to a set of messages + // which will de-compress the payload to a set of messages; + // since we assume nested compression is not allowed, the deep iterator + // would not try to further decompress underlying messages ByteBuffer value = entry.record().value(); innerIter = new RecordsIterator(value, compression, true); return innerIter.next(); diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index 10df9fd8d3f..197d60e5c1b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -317,12 +317,14 @@ public final class Record { } public String toString() { - return String.format("Record(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)", + return String.format("Record(magic = %d, attributes = %d, compression = %s, crc = %d, key = %d bytes, value = %d bytes)", magic(), attributes(), + compressionType(), checksum(), - key().limit(), - value().limit()); + key() == null ? 0 : key().limit(), + value() == null ? 0: value().limit()); + } public boolean equals(Object other) {