mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-3977; Defer fetch parsing for space efficiency and to ensure exceptions are raised to the user
Author: Jason Gustafson <jason@confluent.io> Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Ismael Juma <ismael@juma.me.uk> Closes #1656 from hachikuji/KAFKA-3977
This commit is contained in:
		
							parent
							
								
									d5c821c198
								
							
						
					
					
						commit
						ff557f02ac
					
				|  | @ -28,6 +28,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException; | ||||||
| import org.apache.kafka.common.errors.InvalidTopicException; | import org.apache.kafka.common.errors.InvalidTopicException; | ||||||
| import org.apache.kafka.common.errors.RecordTooLargeException; | import org.apache.kafka.common.errors.RecordTooLargeException; | ||||||
| import org.apache.kafka.common.errors.RetriableException; | import org.apache.kafka.common.errors.RetriableException; | ||||||
|  | import org.apache.kafka.common.errors.SerializationException; | ||||||
| import org.apache.kafka.common.errors.TimeoutException; | import org.apache.kafka.common.errors.TimeoutException; | ||||||
| import org.apache.kafka.common.errors.TopicAuthorizationException; | import org.apache.kafka.common.errors.TopicAuthorizationException; | ||||||
| import org.apache.kafka.common.metrics.Metrics; | import org.apache.kafka.common.metrics.Metrics; | ||||||
|  | @ -38,8 +39,10 @@ import org.apache.kafka.common.metrics.stats.Max; | ||||||
| import org.apache.kafka.common.metrics.stats.Rate; | import org.apache.kafka.common.metrics.stats.Rate; | ||||||
| import org.apache.kafka.common.protocol.ApiKeys; | import org.apache.kafka.common.protocol.ApiKeys; | ||||||
| import org.apache.kafka.common.protocol.Errors; | import org.apache.kafka.common.protocol.Errors; | ||||||
|  | import org.apache.kafka.common.record.InvalidRecordException; | ||||||
| import org.apache.kafka.common.record.LogEntry; | import org.apache.kafka.common.record.LogEntry; | ||||||
| import org.apache.kafka.common.record.MemoryRecords; | import org.apache.kafka.common.record.MemoryRecords; | ||||||
|  | import org.apache.kafka.common.record.Record; | ||||||
| import org.apache.kafka.common.record.TimestampType; | import org.apache.kafka.common.record.TimestampType; | ||||||
| import org.apache.kafka.common.requests.FetchRequest; | import org.apache.kafka.common.requests.FetchRequest; | ||||||
| import org.apache.kafka.common.requests.FetchResponse; | import org.apache.kafka.common.requests.FetchResponse; | ||||||
|  | @ -59,7 +62,6 @@ import java.util.Collections; | ||||||
| import java.util.HashMap; | import java.util.HashMap; | ||||||
| import java.util.HashSet; | import java.util.HashSet; | ||||||
| import java.util.Iterator; | import java.util.Iterator; | ||||||
| import java.util.LinkedList; |  | ||||||
| import java.util.List; | import java.util.List; | ||||||
| import java.util.Locale; | import java.util.Locale; | ||||||
| import java.util.Map; | import java.util.Map; | ||||||
|  | @ -83,13 +85,11 @@ public class Fetcher<K, V> { | ||||||
|     private final Metadata metadata; |     private final Metadata metadata; | ||||||
|     private final FetchManagerMetrics sensors; |     private final FetchManagerMetrics sensors; | ||||||
|     private final SubscriptionState subscriptions; |     private final SubscriptionState subscriptions; | ||||||
|     private final List<PartitionRecords<K, V>> records; |     private final List<CompletedFetch> completedFetches; | ||||||
|     private final Deserializer<K> keyDeserializer; |     private final Deserializer<K> keyDeserializer; | ||||||
|     private final Deserializer<V> valueDeserializer; |     private final Deserializer<V> valueDeserializer; | ||||||
| 
 | 
 | ||||||
|     private final Map<TopicPartition, Long> offsetOutOfRangePartitions; |     private PartitionRecords<K, V> nextInLineRecords = null; | ||||||
|     private final Set<String> unauthorizedTopics; |  | ||||||
|     private final Map<TopicPartition, Long> recordTooLargePartitions; |  | ||||||
| 
 | 
 | ||||||
|     public Fetcher(ConsumerNetworkClient client, |     public Fetcher(ConsumerNetworkClient client, | ||||||
|                    int minBytes, |                    int minBytes, | ||||||
|  | @ -105,7 +105,6 @@ public class Fetcher<K, V> { | ||||||
|                    String metricGrpPrefix, |                    String metricGrpPrefix, | ||||||
|                    Time time, |                    Time time, | ||||||
|                    long retryBackoffMs) { |                    long retryBackoffMs) { | ||||||
| 
 |  | ||||||
|         this.time = time; |         this.time = time; | ||||||
|         this.client = client; |         this.client = client; | ||||||
|         this.metadata = metadata; |         this.metadata = metadata; | ||||||
|  | @ -115,31 +114,37 @@ public class Fetcher<K, V> { | ||||||
|         this.fetchSize = fetchSize; |         this.fetchSize = fetchSize; | ||||||
|         this.maxPollRecords = maxPollRecords; |         this.maxPollRecords = maxPollRecords; | ||||||
|         this.checkCrcs = checkCrcs; |         this.checkCrcs = checkCrcs; | ||||||
| 
 |  | ||||||
|         this.keyDeserializer = keyDeserializer; |         this.keyDeserializer = keyDeserializer; | ||||||
|         this.valueDeserializer = valueDeserializer; |         this.valueDeserializer = valueDeserializer; | ||||||
| 
 |         this.completedFetches = new ArrayList<>(); | ||||||
|         this.records = new LinkedList<>(); |  | ||||||
|         this.offsetOutOfRangePartitions = new HashMap<>(); |  | ||||||
|         this.unauthorizedTopics = new HashSet<>(); |  | ||||||
|         this.recordTooLargePartitions = new HashMap<>(); |  | ||||||
| 
 |  | ||||||
|         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix); |         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix); | ||||||
|         this.retryBackoffMs = retryBackoffMs; |         this.retryBackoffMs = retryBackoffMs; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /** |     /** | ||||||
|      * Set-up a fetch request for any node that we have assigned partitions for which doesn't have one. |      * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have | ||||||
|      * |      * an in-flight fetch or pending fetch data. | ||||||
|      */ |      */ | ||||||
|     public void sendFetches() { |     public void sendFetches() { | ||||||
|         for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) { |         for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) { | ||||||
|             final FetchRequest fetch = fetchEntry.getValue(); |             final FetchRequest request = fetchEntry.getValue(); | ||||||
|             client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch) |             client.send(fetchEntry.getKey(), ApiKeys.FETCH, request) | ||||||
|                     .addListener(new RequestFutureListener<ClientResponse>() { |                     .addListener(new RequestFutureListener<ClientResponse>() { | ||||||
|                         @Override |                         @Override | ||||||
|                         public void onSuccess(ClientResponse response) { |                         public void onSuccess(ClientResponse resp) { | ||||||
|                             handleFetchResponse(response, fetch); |                             FetchResponse response = new FetchResponse(resp.responseBody()); | ||||||
|  |                             Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet()); | ||||||
|  |                             FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions); | ||||||
|  | 
 | ||||||
|  |                             for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) { | ||||||
|  |                                 TopicPartition partition = entry.getKey(); | ||||||
|  |                                 long fetchOffset = request.fetchData().get(partition).offset; | ||||||
|  |                                 FetchResponse.PartitionData fetchData = entry.getValue(); | ||||||
|  |                                 completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator)); | ||||||
|  |                             } | ||||||
|  | 
 | ||||||
|  |                             sensors.fetchLatency.record(resp.requestLatencyMs()); | ||||||
|  |                             sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()); | ||||||
|                         } |                         } | ||||||
| 
 | 
 | ||||||
|                         @Override |                         @Override | ||||||
|  | @ -152,7 +157,7 @@ public class Fetcher<K, V> { | ||||||
| 
 | 
 | ||||||
|     /** |     /** | ||||||
|      * Update the fetch positions for the provided partitions. |      * Update the fetch positions for the provided partitions. | ||||||
|      * @param partitions |      * @param partitions the partitions to update positions for | ||||||
|      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available |      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available | ||||||
|      */ |      */ | ||||||
|     public void updateFetchPositions(Set<TopicPartition> partitions) { |     public void updateFetchPositions(Set<TopicPartition> partitions) { | ||||||
|  | @ -323,62 +328,6 @@ public class Fetcher<K, V> { | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /** |  | ||||||
|      * If any partition from previous fetchResponse contains OffsetOutOfRange error and |  | ||||||
|      * the defaultResetPolicy is NONE, throw OffsetOutOfRangeException |  | ||||||
|      * |  | ||||||
|      * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse |  | ||||||
|      */ |  | ||||||
|     private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException { |  | ||||||
|         Map<TopicPartition, Long> currentOutOfRangePartitions = new HashMap<>(); |  | ||||||
| 
 |  | ||||||
|         // filter offsetOutOfRangePartitions to retain only the fetchable partitions |  | ||||||
|         for (Map.Entry<TopicPartition, Long> entry: this.offsetOutOfRangePartitions.entrySet()) { |  | ||||||
|             if (!subscriptions.isFetchable(entry.getKey())) { |  | ||||||
|                 log.debug("Ignoring fetched records for {} since it is no longer fetchable", entry.getKey()); |  | ||||||
|                 continue; |  | ||||||
|             } |  | ||||||
|             Long position = subscriptions.position(entry.getKey()); |  | ||||||
|             // ignore partition if the current position != the offset in fetchResponse, e.g. after seek() |  | ||||||
|             if (position != null && entry.getValue().equals(position)) |  | ||||||
|                 currentOutOfRangePartitions.put(entry.getKey(), entry.getValue()); |  | ||||||
|         } |  | ||||||
|         this.offsetOutOfRangePartitions.clear(); |  | ||||||
|         if (!currentOutOfRangePartitions.isEmpty()) |  | ||||||
|             throw new OffsetOutOfRangeException(currentOutOfRangePartitions); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /** |  | ||||||
|      * If any topic from previous fetchResponse contains an Authorization error, raise an exception |  | ||||||
|      * @throws TopicAuthorizationException |  | ||||||
|      */ |  | ||||||
|     private void throwIfUnauthorizedTopics() throws TopicAuthorizationException { |  | ||||||
|         if (!unauthorizedTopics.isEmpty()) { |  | ||||||
|             Set<String> topics = new HashSet<>(unauthorizedTopics); |  | ||||||
|             unauthorizedTopics.clear(); |  | ||||||
|             throw new TopicAuthorizationException(topics); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /** |  | ||||||
|      * If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException |  | ||||||
|      * |  | ||||||
|      * @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned |  | ||||||
|      */ |  | ||||||
|     private void throwIfRecordTooLarge() throws RecordTooLargeException { |  | ||||||
|         Map<TopicPartition, Long> copiedRecordTooLargePartitions = new HashMap<>(this.recordTooLargePartitions); |  | ||||||
|         this.recordTooLargePartitions.clear(); |  | ||||||
| 
 |  | ||||||
|         if (!copiedRecordTooLargePartitions.isEmpty()) |  | ||||||
|             throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " |  | ||||||
|                 + copiedRecordTooLargePartitions |  | ||||||
|                 + " whose size is larger than the fetch size " |  | ||||||
|                 + this.fetchSize |  | ||||||
|                 + " and hence cannot be ever returned." |  | ||||||
|                 + " Increase the fetch size, or decrease the maximum message size the broker will allow.", |  | ||||||
|                 copiedRecordTooLargePartitions); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /** |     /** | ||||||
|      * Return the fetched records, empty the record buffer and update the consumed position. |      * Return the fetched records, empty the record buffer and update the consumed position. | ||||||
|      * |      * | ||||||
|  | @ -393,60 +342,68 @@ public class Fetcher<K, V> { | ||||||
|             return Collections.emptyMap(); |             return Collections.emptyMap(); | ||||||
|         } else { |         } else { | ||||||
|             Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>(); |             Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>(); | ||||||
|             throwIfOffsetOutOfRange(); |             int recordsRemaining = maxPollRecords; | ||||||
|             throwIfUnauthorizedTopics(); |             Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator(); | ||||||
|             throwIfRecordTooLarge(); |  | ||||||
| 
 | 
 | ||||||
|             int maxRecords = maxPollRecords; |             while (recordsRemaining > 0) { | ||||||
|             Iterator<PartitionRecords<K, V>> iterator = records.iterator(); |                 if (nextInLineRecords == null || nextInLineRecords.isEmpty()) { | ||||||
|             while (iterator.hasNext() && maxRecords > 0) { |                     if (!completedFetchesIterator.hasNext()) | ||||||
|                 PartitionRecords<K, V> part = iterator.next(); |                         break; | ||||||
|                 maxRecords -= append(drained, part, maxRecords); | 
 | ||||||
|                 if (part.isConsumed()) |                     CompletedFetch completion = completedFetchesIterator.next(); | ||||||
|                     iterator.remove(); |                     completedFetchesIterator.remove(); | ||||||
|  |                     nextInLineRecords = parseFetchedData(completion); | ||||||
|  |                 } else { | ||||||
|  |                     recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining); | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|  | 
 | ||||||
|             return drained; |             return drained; | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained, |     private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained, | ||||||
|                        PartitionRecords<K, V> part, |                        PartitionRecords<K, V> partitionRecords, | ||||||
|                        int maxRecords) { |                        int maxRecords) { | ||||||
|         if (!subscriptions.isAssigned(part.partition)) { |         if (partitionRecords.isEmpty()) | ||||||
|  |             return 0; | ||||||
|  | 
 | ||||||
|  |         if (!subscriptions.isAssigned(partitionRecords.partition)) { | ||||||
|             // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call |             // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call | ||||||
|             log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition); |             log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition); | ||||||
|         } else { |         } else { | ||||||
|             // note that the consumed position should always be available as long as the partition is still assigned |             // note that the consumed position should always be available as long as the partition is still assigned | ||||||
|             long position = subscriptions.position(part.partition); |             long position = subscriptions.position(partitionRecords.partition); | ||||||
|             if (!subscriptions.isFetchable(part.partition)) { |             if (!subscriptions.isFetchable(partitionRecords.partition)) { | ||||||
|                 // this can happen when a partition is paused before fetched records are returned to the consumer's poll call |                 // this can happen when a partition is paused before fetched records are returned to the consumer's poll call | ||||||
|                 log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition); |                 log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition); | ||||||
|             } else if (part.fetchOffset == position) { |             } else if (partitionRecords.fetchOffset == position) { | ||||||
|                 List<ConsumerRecord<K, V>> partRecords = part.take(maxRecords); |                 // we are ensured to have at least one record since we already checked for emptiness | ||||||
|  |                 List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords); | ||||||
|                 long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; |                 long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; | ||||||
| 
 | 
 | ||||||
|                 log.trace("Returning fetched records at offset {} for assigned partition {} and update " + |                 log.trace("Returning fetched records at offset {} for assigned partition {} and update " + | ||||||
|                         "position to {}", position, part.partition, nextOffset); |                         "position to {}", position, partitionRecords.partition, nextOffset); | ||||||
| 
 | 
 | ||||||
|                 List<ConsumerRecord<K, V>> records = drained.get(part.partition); |                 List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition); | ||||||
|                 if (records == null) { |                 if (records == null) { | ||||||
|                     records = partRecords; |                     records = partRecords; | ||||||
|                     drained.put(part.partition, records); |                     drained.put(partitionRecords.partition, records); | ||||||
|                 } else { |                 } else { | ||||||
|                     records.addAll(partRecords); |                     records.addAll(partRecords); | ||||||
|                 } |                 } | ||||||
| 
 | 
 | ||||||
|                 subscriptions.position(part.partition, nextOffset); |                 subscriptions.position(partitionRecords.partition, nextOffset); | ||||||
|                 return partRecords.size(); |                 return partRecords.size(); | ||||||
|             } else { |             } else { | ||||||
|                 // these records aren't next in line based on the last consumed position, ignore them |                 // these records aren't next in line based on the last consumed position, ignore them | ||||||
|                 // they must be from an obsolete request |                 // they must be from an obsolete request | ||||||
|                 log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", |                 log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", | ||||||
|                         part.partition, part.fetchOffset, position); |                         partitionRecords.partition, partitionRecords.fetchOffset, position); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         part.discard(); |         partitionRecords.discard(); | ||||||
|         return 0; |         return 0; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -513,10 +470,10 @@ public class Fetcher<K, V> { | ||||||
| 
 | 
 | ||||||
|     private Set<TopicPartition> fetchablePartitions() { |     private Set<TopicPartition> fetchablePartitions() { | ||||||
|         Set<TopicPartition> fetchable = subscriptions.fetchablePartitions(); |         Set<TopicPartition> fetchable = subscriptions.fetchablePartitions(); | ||||||
|         if (records.isEmpty()) |         if (nextInLineRecords != null && !nextInLineRecords.isEmpty()) | ||||||
|             return fetchable; |             fetchable.remove(nextInLineRecords.partition); | ||||||
|         for (PartitionRecords<K, V> partitionRecords : records) |         for (CompletedFetch completedFetch : completedFetches) | ||||||
|             fetchable.remove(partitionRecords.partition); |             fetchable.remove(completedFetch.partition); | ||||||
|         return fetchable; |         return fetchable; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -559,30 +516,29 @@ public class Fetcher<K, V> { | ||||||
|     /** |     /** | ||||||
|      * The callback for fetch completion |      * The callback for fetch completion | ||||||
|      */ |      */ | ||||||
|     private void handleFetchResponse(ClientResponse resp, FetchRequest request) { |     private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) { | ||||||
|         int totalBytes = 0; |         TopicPartition tp = completedFetch.partition; | ||||||
|         int totalCount = 0; |         FetchResponse.PartitionData partition = completedFetch.partitionData; | ||||||
|         FetchResponse response = new FetchResponse(resp.responseBody()); |         long fetchOffset = completedFetch.fetchedOffset; | ||||||
|         for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) { |         int bytes = 0; | ||||||
|             TopicPartition tp = entry.getKey(); |         int recordsCount = 0; | ||||||
|             FetchResponse.PartitionData partition = entry.getValue(); |         PartitionRecords<K, V> parsedRecords = null; | ||||||
|  | 
 | ||||||
|  |         try { | ||||||
|             if (!subscriptions.isFetchable(tp)) { |             if (!subscriptions.isFetchable(tp)) { | ||||||
|                 // this can happen when a rebalance happened or a partition consumption paused |                 // this can happen when a rebalance happened or a partition consumption paused | ||||||
|                 // while fetch is still in-flight |                 // while fetch is still in-flight | ||||||
|                 log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp); |                 log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp); | ||||||
|             } else if (partition.errorCode == Errors.NONE.code()) { |             } else if (partition.errorCode == Errors.NONE.code()) { | ||||||
|                 long fetchOffset = request.fetchData().get(tp).offset; |  | ||||||
| 
 |  | ||||||
|                 // we are interested in this fetch only if the beginning offset matches the |                 // we are interested in this fetch only if the beginning offset matches the | ||||||
|                 // current consumed position |                 // current consumed position | ||||||
|                 Long position = subscriptions.position(tp); |                 Long position = subscriptions.position(tp); | ||||||
|                 if (position == null || position != fetchOffset) { |                 if (position == null || position != fetchOffset) { | ||||||
|                     log.debug("Discarding fetch response for partition {} since its offset {} does not match " + |                     log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " + | ||||||
|                             "the expected offset {}", tp, fetchOffset, position); |                             "the expected offset {}", tp, fetchOffset, position); | ||||||
|                     continue; |                     return null; | ||||||
|                 } |                 } | ||||||
| 
 | 
 | ||||||
|                 int bytes = 0; |  | ||||||
|                 ByteBuffer buffer = partition.recordSet; |                 ByteBuffer buffer = partition.recordSet; | ||||||
|                 MemoryRecords records = MemoryRecords.readableRecords(buffer); |                 MemoryRecords records = MemoryRecords.readableRecords(buffer); | ||||||
|                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>(); |                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>(); | ||||||
|  | @ -597,79 +553,95 @@ public class Fetcher<K, V> { | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
| 
 | 
 | ||||||
|  |                 recordsCount = parsed.size(); | ||||||
|  |                 this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount); | ||||||
|  | 
 | ||||||
|                 if (!parsed.isEmpty()) { |                 if (!parsed.isEmpty()) { | ||||||
|                     log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position); |                     log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position); | ||||||
|  |                     parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed); | ||||||
|                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1); |                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1); | ||||||
|                     this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed)); |  | ||||||
|                     this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset()); |                     this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset()); | ||||||
|                 } else if (buffer.limit() > 0 && !skippedRecords) { |                 } else if (buffer.limit() > 0 && !skippedRecords) { | ||||||
|                     // we did not read a single message from a non-empty buffer |                     // we did not read a single message from a non-empty buffer | ||||||
|                     // because that message's size is larger than fetch size, in this case |                     // because that message's size is larger than fetch size, in this case | ||||||
|                     // record this exception |                     // record this exception | ||||||
|                     this.recordTooLargePartitions.put(tp, fetchOffset); |                     Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset); | ||||||
|  |                     throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " | ||||||
|  |                             + recordTooLargePartitions | ||||||
|  |                             + " whose size is larger than the fetch size " | ||||||
|  |                             + this.fetchSize | ||||||
|  |                             + " and hence cannot be ever returned." | ||||||
|  |                             + " Increase the fetch size on the client (using max.partition.fetch.bytes)," | ||||||
|  |                             + " or decrease the maximum message size the broker will allow (using message.max.bytes).", | ||||||
|  |                             recordTooLargePartitions); | ||||||
|                 } |                 } | ||||||
| 
 |  | ||||||
|                 this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size()); |  | ||||||
|                 totalBytes += bytes; |  | ||||||
|                 totalCount += parsed.size(); |  | ||||||
|             } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() |             } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() | ||||||
|                 || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { |                     || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { | ||||||
|                 this.metadata.requestUpdate(); |                 this.metadata.requestUpdate(); | ||||||
|             } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) { |             } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) { | ||||||
|                 long fetchOffset = request.fetchData().get(tp).offset; |                 if (fetchOffset != subscriptions.position(tp)) { | ||||||
|                 if (subscriptions.hasDefaultOffsetResetPolicy()) |                     log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" + | ||||||
|  |                             "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp)); | ||||||
|  |                 } else if (subscriptions.hasDefaultOffsetResetPolicy()) { | ||||||
|  |                     log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp); | ||||||
|                     subscriptions.needOffsetReset(tp); |                     subscriptions.needOffsetReset(tp); | ||||||
|                 else |                 } else { | ||||||
|                     this.offsetOutOfRangePartitions.put(tp, fetchOffset); |                     throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset)); | ||||||
|                 log.info("Fetch offset {} is out of range, resetting offset", fetchOffset); |                 } | ||||||
|             } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) { |             } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) { | ||||||
|                 log.warn("Not authorized to read from topic {}.", tp.topic()); |                 log.warn("Not authorized to read from topic {}.", tp.topic()); | ||||||
|                 unauthorizedTopics.add(tp.topic()); |                 throw new TopicAuthorizationException(Collections.singleton(tp.topic())); | ||||||
|             } else if (partition.errorCode == Errors.UNKNOWN.code()) { |             } else if (partition.errorCode == Errors.UNKNOWN.code()) { | ||||||
|                 log.warn("Unknown error fetching data for topic-partition {}", tp); |                 log.warn("Unknown error fetching data for topic-partition {}", tp); | ||||||
|             } else { |             } else { | ||||||
|                 throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data"); |                 throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data"); | ||||||
|             } |             } | ||||||
|  |         } finally { | ||||||
|  |             completedFetch.metricAggregator.record(tp, bytes, recordsCount); | ||||||
|         } |         } | ||||||
|         this.sensors.bytesFetched.record(totalBytes); | 
 | ||||||
|         this.sensors.recordsFetched.record(totalCount); |         return parsedRecords; | ||||||
|         this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()); |  | ||||||
|         this.sensors.fetchLatency.record(resp.requestLatencyMs()); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /** |     /** | ||||||
|      * Parse the record entry, deserializing the key / value fields if necessary |      * Parse the record entry, deserializing the key / value fields if necessary | ||||||
|      */ |      */ | ||||||
|     private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) { |     private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) { | ||||||
|  |         Record record = logEntry.record(); | ||||||
|  | 
 | ||||||
|  |         if (this.checkCrcs && !record.isValid()) | ||||||
|  |             throw new InvalidRecordException("Record for partition " + partition + " at offset " | ||||||
|  |                     + logEntry.offset() + " is corrupt (stored crc = " + record.checksum() | ||||||
|  |                     + ", computed crc = " | ||||||
|  |                     + record.computeChecksum() | ||||||
|  |                     + ")"); | ||||||
|  | 
 | ||||||
|         try { |         try { | ||||||
|             if (this.checkCrcs) |  | ||||||
|                 logEntry.record().ensureValid(); |  | ||||||
|             long offset = logEntry.offset(); |             long offset = logEntry.offset(); | ||||||
|             long timestamp = logEntry.record().timestamp(); |             long timestamp = record.timestamp(); | ||||||
|             TimestampType timestampType = logEntry.record().timestampType(); |             TimestampType timestampType = record.timestampType(); | ||||||
|             ByteBuffer keyBytes = logEntry.record().key(); |             ByteBuffer keyBytes = record.key(); | ||||||
|             byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes); |             byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes); | ||||||
|             K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray); |             K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray); | ||||||
|             ByteBuffer valueBytes = logEntry.record().value(); |             ByteBuffer valueBytes = record.value(); | ||||||
|             byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes); |             byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes); | ||||||
|             V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray); |             V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray); | ||||||
| 
 | 
 | ||||||
|             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, |             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, | ||||||
|                                         timestamp, timestampType, logEntry.record().checksum(), |                                         timestamp, timestampType, record.checksum(), | ||||||
|                                         keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length, |                                         keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length, | ||||||
|                                         valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length, |                                         valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length, | ||||||
|                                         key, value); |                                         key, value); | ||||||
|         } catch (KafkaException e) { |  | ||||||
|             throw e; |  | ||||||
|         } catch (RuntimeException e) { |         } catch (RuntimeException e) { | ||||||
|             throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e); |             throw new SerializationException("Error deserializing key/value for partition " + partition + | ||||||
|  |                     " at offset " + logEntry.offset(), e); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     private static class PartitionRecords<K, V> { |     private static class PartitionRecords<K, V> { | ||||||
|         public long fetchOffset; |         private long fetchOffset; | ||||||
|         public TopicPartition partition; |         private TopicPartition partition; | ||||||
|         public List<ConsumerRecord<K, V>> records; |         private List<ConsumerRecord<K, V>> records; | ||||||
| 
 | 
 | ||||||
|         public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) { |         public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) { | ||||||
|             this.fetchOffset = fetchOffset; |             this.fetchOffset = fetchOffset; | ||||||
|  | @ -677,7 +649,7 @@ public class Fetcher<K, V> { | ||||||
|             this.records = records; |             this.records = records; | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         private boolean isConsumed() { |         private boolean isEmpty() { | ||||||
|             return records == null || records.isEmpty(); |             return records == null || records.isEmpty(); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  | @ -687,7 +659,7 @@ public class Fetcher<K, V> { | ||||||
| 
 | 
 | ||||||
|         private List<ConsumerRecord<K, V>> take(int n) { |         private List<ConsumerRecord<K, V>> take(int n) { | ||||||
|             if (records == null) |             if (records == null) | ||||||
|                 return Collections.emptyList(); |                 return new ArrayList<>(); | ||||||
| 
 | 
 | ||||||
|             if (n >= records.size()) { |             if (n >= records.size()) { | ||||||
|                 List<ConsumerRecord<K, V>> res = this.records; |                 List<ConsumerRecord<K, V>> res = this.records; | ||||||
|  | @ -709,7 +681,59 @@ public class Fetcher<K, V> { | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     private class FetchManagerMetrics { |     private static class CompletedFetch { | ||||||
|  |         private final TopicPartition partition; | ||||||
|  |         private final long fetchedOffset; | ||||||
|  |         private final FetchResponse.PartitionData partitionData; | ||||||
|  |         private final FetchResponseMetricAggregator metricAggregator; | ||||||
|  | 
 | ||||||
|  |         public CompletedFetch(TopicPartition partition, | ||||||
|  |                               long fetchedOffset, | ||||||
|  |                               FetchResponse.PartitionData partitionData, | ||||||
|  |                               FetchResponseMetricAggregator metricAggregator) { | ||||||
|  |             this.partition = partition; | ||||||
|  |             this.fetchedOffset = fetchedOffset; | ||||||
|  |             this.partitionData = partitionData; | ||||||
|  |             this.metricAggregator = metricAggregator; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * Since we parse the message data for each partition from each fetch response lazily, fetch-level | ||||||
|  |      * metrics need to be aggregated as the messages from each partition are parsed. This class is used | ||||||
|  |      * to facilitate this incremental aggregation. | ||||||
|  |      */ | ||||||
|  |     private static class FetchResponseMetricAggregator { | ||||||
|  |         private final FetchManagerMetrics sensors; | ||||||
|  |         private final Set<TopicPartition> unrecordedPartitions; | ||||||
|  | 
 | ||||||
|  |         private int totalBytes; | ||||||
|  |         private int totalRecords; | ||||||
|  | 
 | ||||||
|  |         public FetchResponseMetricAggregator(FetchManagerMetrics sensors, | ||||||
|  |                                              Set<TopicPartition> partitions) { | ||||||
|  |             this.sensors = sensors; | ||||||
|  |             this.unrecordedPartitions = partitions; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         /** | ||||||
|  |          * After each partition is parsed, we update the current metric totals with the total bytes | ||||||
|  |          * and number of records parsed. After all partitions have reported, we write the metric. | ||||||
|  |          */ | ||||||
|  |         public void record(TopicPartition partition, int bytes, int records) { | ||||||
|  |             unrecordedPartitions.remove(partition); | ||||||
|  |             totalBytes += bytes; | ||||||
|  |             totalRecords += records; | ||||||
|  | 
 | ||||||
|  |             if (unrecordedPartitions.isEmpty()) { | ||||||
|  |                 // once all expected partitions from the fetch have reported in, record the metrics | ||||||
|  |                 sensors.bytesFetched.record(totalBytes); | ||||||
|  |                 sensors.recordsFetched.record(totalRecords); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     private static class FetchManagerMetrics { | ||||||
|         public final Metrics metrics; |         public final Metrics metrics; | ||||||
|         public final String metricGrpName; |         public final String metricGrpName; | ||||||
| 
 | 
 | ||||||
|  | @ -719,7 +743,6 @@ public class Fetcher<K, V> { | ||||||
|         public final Sensor recordsFetchLag; |         public final Sensor recordsFetchLag; | ||||||
|         public final Sensor fetchThrottleTimeSensor; |         public final Sensor fetchThrottleTimeSensor; | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
|         public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) { |         public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) { | ||||||
|             this.metrics = metrics; |             this.metrics = metrics; | ||||||
|             this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics"; |             this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics"; | ||||||
|  |  | ||||||
|  | @ -242,7 +242,7 @@ public class Compressor { | ||||||
| 
 | 
 | ||||||
|     // the following two functions also need to be public since they are used in MemoryRecords.iteration |     // the following two functions also need to be public since they are used in MemoryRecords.iteration | ||||||
| 
 | 
 | ||||||
|     static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) { |     public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) { | ||||||
|         try { |         try { | ||||||
|             switch (type) { |             switch (type) { | ||||||
|                 case NONE: |                 case NONE: | ||||||
|  | @ -271,7 +271,7 @@ public class Compressor { | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) { |     public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) { | ||||||
|         try { |         try { | ||||||
|             switch (type) { |             switch (type) { | ||||||
|                 case NONE: |                 case NONE: | ||||||
|  |  | ||||||
|  | @ -16,7 +16,9 @@ | ||||||
|  */ |  */ | ||||||
| package org.apache.kafka.common.record; | package org.apache.kafka.common.record; | ||||||
| 
 | 
 | ||||||
| public class InvalidRecordException extends RuntimeException { | import org.apache.kafka.common.KafkaException; | ||||||
|  | 
 | ||||||
|  | public class InvalidRecordException extends KafkaException { | ||||||
| 
 | 
 | ||||||
|     private static final long serialVersionUID = 1; |     private static final long serialVersionUID = 1; | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo; | ||||||
| import org.apache.kafka.common.TopicPartition; | import org.apache.kafka.common.TopicPartition; | ||||||
| import org.apache.kafka.common.errors.InvalidTopicException; | import org.apache.kafka.common.errors.InvalidTopicException; | ||||||
| import org.apache.kafka.common.errors.RecordTooLargeException; | import org.apache.kafka.common.errors.RecordTooLargeException; | ||||||
|  | import org.apache.kafka.common.errors.SerializationException; | ||||||
| import org.apache.kafka.common.errors.TimeoutException; | import org.apache.kafka.common.errors.TimeoutException; | ||||||
| import org.apache.kafka.common.errors.TopicAuthorizationException; | import org.apache.kafka.common.errors.TopicAuthorizationException; | ||||||
| import org.apache.kafka.common.metrics.KafkaMetric; | import org.apache.kafka.common.metrics.KafkaMetric; | ||||||
|  | @ -38,6 +39,8 @@ import org.apache.kafka.common.metrics.Metrics; | ||||||
| import org.apache.kafka.common.protocol.Errors; | import org.apache.kafka.common.protocol.Errors; | ||||||
| import org.apache.kafka.common.protocol.types.Struct; | import org.apache.kafka.common.protocol.types.Struct; | ||||||
| import org.apache.kafka.common.record.CompressionType; | import org.apache.kafka.common.record.CompressionType; | ||||||
|  | import org.apache.kafka.common.record.Compressor; | ||||||
|  | import org.apache.kafka.common.record.InvalidRecordException; | ||||||
| import org.apache.kafka.common.record.MemoryRecords; | import org.apache.kafka.common.record.MemoryRecords; | ||||||
| import org.apache.kafka.common.record.Record; | import org.apache.kafka.common.record.Record; | ||||||
| import org.apache.kafka.common.requests.FetchRequest; | import org.apache.kafka.common.requests.FetchRequest; | ||||||
|  | @ -47,6 +50,7 @@ import org.apache.kafka.common.requests.ListOffsetResponse; | ||||||
| import org.apache.kafka.common.requests.MetadataRequest; | import org.apache.kafka.common.requests.MetadataRequest; | ||||||
| import org.apache.kafka.common.requests.MetadataResponse; | import org.apache.kafka.common.requests.MetadataResponse; | ||||||
| import org.apache.kafka.common.serialization.ByteArrayDeserializer; | import org.apache.kafka.common.serialization.ByteArrayDeserializer; | ||||||
|  | import org.apache.kafka.common.serialization.Deserializer; | ||||||
| import org.apache.kafka.common.utils.MockTime; | import org.apache.kafka.common.utils.MockTime; | ||||||
| import org.apache.kafka.test.TestUtils; | import org.apache.kafka.test.TestUtils; | ||||||
| import org.junit.After; | import org.junit.After; | ||||||
|  | @ -128,7 +132,7 @@ public class FetcherTest { | ||||||
|         consumerClient.poll(0); |         consumerClient.poll(0); | ||||||
|         records = fetcher.fetchedRecords().get(tp); |         records = fetcher.fetchedRecords().get(tp); | ||||||
|         assertEquals(3, records.size()); |         assertEquals(3, records.size()); | ||||||
|         assertEquals(4L, (long) subscriptions.position(tp)); // this is the next fetching position |         assertEquals(4L, subscriptions.position(tp).longValue()); // this is the next fetching position | ||||||
|         long offset = 1; |         long offset = 1; | ||||||
|         for (ConsumerRecord<byte[], byte[]> record : records) { |         for (ConsumerRecord<byte[], byte[]> record : records) { | ||||||
|             assertEquals(offset, record.offset()); |             assertEquals(offset, record.offset()); | ||||||
|  | @ -147,9 +151,83 @@ public class FetcherTest { | ||||||
|         }; |         }; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     @Test | ||||||
|  |     public void testFetchedRecordsRaisesOnSerializationErrors() { | ||||||
|  |         // raise an exception from somewhere in the middle of the fetch response | ||||||
|  |         // so that we can verify that our position does not advance after raising | ||||||
|  |         ByteArrayDeserializer deserializer = new ByteArrayDeserializer() { | ||||||
|  |             int i = 0; | ||||||
|  |             @Override | ||||||
|  |             public byte[] deserialize(String topic, byte[] data) { | ||||||
|  |                 if (i++ == 1) | ||||||
|  |                     throw new SerializationException(); | ||||||
|  |                 return data; | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  | 
 | ||||||
|  |         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer); | ||||||
|  | 
 | ||||||
|  |         subscriptions.assignFromUser(Collections.singleton(tp)); | ||||||
|  |         subscriptions.seek(tp, 1); | ||||||
|  | 
 | ||||||
|  |         client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); | ||||||
|  | 
 | ||||||
|  |         fetcher.sendFetches(); | ||||||
|  |         consumerClient.poll(0); | ||||||
|  |         try { | ||||||
|  |             fetcher.fetchedRecords(); | ||||||
|  |             fail("fetchedRecords should have raised"); | ||||||
|  |         } catch (SerializationException e) { | ||||||
|  |             // the position should not advance since no data has been returned | ||||||
|  |             assertEquals(1, subscriptions.position(tp).longValue()); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Test | ||||||
|  |     public void testParseInvalidRecord() { | ||||||
|  |         ByteBuffer buffer = ByteBuffer.allocate(1024); | ||||||
|  |         Compressor compressor = new Compressor(buffer, CompressionType.NONE); | ||||||
|  | 
 | ||||||
|  |         byte[] key = "foo".getBytes(); | ||||||
|  |         byte[] value = "baz".getBytes(); | ||||||
|  |         long offset = 0; | ||||||
|  |         long timestamp = 500L; | ||||||
|  | 
 | ||||||
|  |         int size = Record.recordSize(key, value); | ||||||
|  |         long crc = Record.computeChecksum(timestamp, key, value, CompressionType.NONE, 0, -1); | ||||||
|  | 
 | ||||||
|  |         // write one valid record | ||||||
|  |         compressor.putLong(offset); | ||||||
|  |         compressor.putInt(size); | ||||||
|  |         Record.write(compressor, crc, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1); | ||||||
|  | 
 | ||||||
|  |         // and one invalid record (note the crc) | ||||||
|  |         compressor.putLong(offset); | ||||||
|  |         compressor.putInt(size); | ||||||
|  |         Record.write(compressor, crc + 1, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1); | ||||||
|  | 
 | ||||||
|  |         compressor.close(); | ||||||
|  |         buffer.flip(); | ||||||
|  | 
 | ||||||
|  |         subscriptions.assignFromUser(Arrays.asList(tp)); | ||||||
|  |         subscriptions.seek(tp, 0); | ||||||
|  | 
 | ||||||
|  |         // normal fetch | ||||||
|  |         fetcher.sendFetches(); | ||||||
|  |         client.prepareResponse(fetchResponse(buffer, Errors.NONE.code(), 100L, 0)); | ||||||
|  |         consumerClient.poll(0); | ||||||
|  |         try { | ||||||
|  |             fetcher.fetchedRecords(); | ||||||
|  |             fail("fetchedRecords should have raised"); | ||||||
|  |         } catch (InvalidRecordException e) { | ||||||
|  |             // the position should not advance since no data has been returned | ||||||
|  |             assertEquals(0, subscriptions.position(tp).longValue()); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     @Test |     @Test | ||||||
|     public void testFetchMaxPollRecords() { |     public void testFetchMaxPollRecords() { | ||||||
|         Fetcher<byte[], byte[]> fetcher = createFetcher(2, subscriptions, new Metrics(time)); |         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2); | ||||||
| 
 | 
 | ||||||
|         List<ConsumerRecord<byte[], byte[]>> records; |         List<ConsumerRecord<byte[], byte[]>> records; | ||||||
|         subscriptions.assignFromUser(Arrays.asList(tp)); |         subscriptions.assignFromUser(Arrays.asList(tp)); | ||||||
|  | @ -162,7 +240,7 @@ public class FetcherTest { | ||||||
|         consumerClient.poll(0); |         consumerClient.poll(0); | ||||||
|         records = fetcher.fetchedRecords().get(tp); |         records = fetcher.fetchedRecords().get(tp); | ||||||
|         assertEquals(2, records.size()); |         assertEquals(2, records.size()); | ||||||
|         assertEquals(3L, (long) subscriptions.position(tp)); |         assertEquals(3L, subscriptions.position(tp).longValue()); | ||||||
|         assertEquals(1, records.get(0).offset()); |         assertEquals(1, records.get(0).offset()); | ||||||
|         assertEquals(2, records.get(1).offset()); |         assertEquals(2, records.get(1).offset()); | ||||||
| 
 | 
 | ||||||
|  | @ -170,14 +248,14 @@ public class FetcherTest { | ||||||
|         consumerClient.poll(0); |         consumerClient.poll(0); | ||||||
|         records = fetcher.fetchedRecords().get(tp); |         records = fetcher.fetchedRecords().get(tp); | ||||||
|         assertEquals(1, records.size()); |         assertEquals(1, records.size()); | ||||||
|         assertEquals(4L, (long) subscriptions.position(tp)); |         assertEquals(4L, subscriptions.position(tp).longValue()); | ||||||
|         assertEquals(3, records.get(0).offset()); |         assertEquals(3, records.get(0).offset()); | ||||||
| 
 | 
 | ||||||
|         fetcher.sendFetches(); |         fetcher.sendFetches(); | ||||||
|         consumerClient.poll(0); |         consumerClient.poll(0); | ||||||
|         records = fetcher.fetchedRecords().get(tp); |         records = fetcher.fetchedRecords().get(tp); | ||||||
|         assertEquals(2, records.size()); |         assertEquals(2, records.size()); | ||||||
|         assertEquals(6L, (long) subscriptions.position(tp)); |         assertEquals(6L, subscriptions.position(tp).longValue()); | ||||||
|         assertEquals(4, records.get(0).offset()); |         assertEquals(4, records.get(0).offset()); | ||||||
|         assertEquals(5, records.get(1).offset()); |         assertEquals(5, records.get(1).offset()); | ||||||
|     } |     } | ||||||
|  | @ -203,7 +281,7 @@ public class FetcherTest { | ||||||
|         consumerClient.poll(0); |         consumerClient.poll(0); | ||||||
|         consumerRecords = fetcher.fetchedRecords().get(tp); |         consumerRecords = fetcher.fetchedRecords().get(tp); | ||||||
|         assertEquals(3, consumerRecords.size()); |         assertEquals(3, consumerRecords.size()); | ||||||
|         assertEquals(31L, (long) subscriptions.position(tp)); // this is the next fetching position |         assertEquals(31L, subscriptions.position(tp).longValue()); // this is the next fetching position | ||||||
| 
 | 
 | ||||||
|         assertEquals(15L, consumerRecords.get(0).offset()); |         assertEquals(15L, consumerRecords.get(0).offset()); | ||||||
|         assertEquals(20L, consumerRecords.get(1).offset()); |         assertEquals(20L, consumerRecords.get(1).offset()); | ||||||
|  | @ -318,11 +396,27 @@ public class FetcherTest { | ||||||
|         fetcher.sendFetches(); |         fetcher.sendFetches(); | ||||||
|         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); |         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); | ||||||
|         consumerClient.poll(0); |         consumerClient.poll(0); | ||||||
|         assertTrue(subscriptions.isOffsetResetNeeded(tp)); |  | ||||||
|         assertEquals(0, fetcher.fetchedRecords().size()); |         assertEquals(0, fetcher.fetchedRecords().size()); | ||||||
|  |         assertTrue(subscriptions.isOffsetResetNeeded(tp)); | ||||||
|         assertEquals(null, subscriptions.position(tp)); |         assertEquals(null, subscriptions.position(tp)); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     @Test | ||||||
|  |     public void testStaleOutOfRangeError() { | ||||||
|  |         // verify that an out of range error which arrives after a seek | ||||||
|  |         // does not cause us to reset our position or throw an exception | ||||||
|  |         subscriptions.assignFromUser(Arrays.asList(tp)); | ||||||
|  |         subscriptions.seek(tp, 0); | ||||||
|  | 
 | ||||||
|  |         fetcher.sendFetches(); | ||||||
|  |         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0)); | ||||||
|  |         subscriptions.seek(tp, 1); | ||||||
|  |         consumerClient.poll(0); | ||||||
|  |         assertEquals(0, fetcher.fetchedRecords().size()); | ||||||
|  |         assertFalse(subscriptions.isOffsetResetNeeded(tp)); | ||||||
|  |         assertEquals(1, subscriptions.position(tp).longValue()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     @Test |     @Test | ||||||
|     public void testFetchedRecordsAfterSeek() { |     public void testFetchedRecordsAfterSeek() { | ||||||
|         subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp)); |         subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp)); | ||||||
|  | @ -368,7 +462,7 @@ public class FetcherTest { | ||||||
|         // disconnects should have no affect on subscription state |         // disconnects should have no affect on subscription state | ||||||
|         assertFalse(subscriptions.isOffsetResetNeeded(tp)); |         assertFalse(subscriptions.isOffsetResetNeeded(tp)); | ||||||
|         assertTrue(subscriptions.isFetchable(tp)); |         assertTrue(subscriptions.isFetchable(tp)); | ||||||
|         assertEquals(0, (long) subscriptions.position(tp)); |         assertEquals(0, subscriptions.position(tp).longValue()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |     @Test | ||||||
|  | @ -380,7 +474,7 @@ public class FetcherTest { | ||||||
| 
 | 
 | ||||||
|         fetcher.updateFetchPositions(Collections.singleton(tp)); |         fetcher.updateFetchPositions(Collections.singleton(tp)); | ||||||
|         assertTrue(subscriptions.isFetchable(tp)); |         assertTrue(subscriptions.isFetchable(tp)); | ||||||
|         assertEquals(5, (long) subscriptions.position(tp)); |         assertEquals(5, subscriptions.position(tp).longValue()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |     @Test | ||||||
|  | @ -393,7 +487,7 @@ public class FetcherTest { | ||||||
|         fetcher.updateFetchPositions(Collections.singleton(tp)); |         fetcher.updateFetchPositions(Collections.singleton(tp)); | ||||||
|         assertFalse(subscriptions.isOffsetResetNeeded(tp)); |         assertFalse(subscriptions.isOffsetResetNeeded(tp)); | ||||||
|         assertTrue(subscriptions.isFetchable(tp)); |         assertTrue(subscriptions.isFetchable(tp)); | ||||||
|         assertEquals(5, (long) subscriptions.position(tp)); |         assertEquals(5, subscriptions.position(tp).longValue()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |     @Test | ||||||
|  | @ -406,7 +500,7 @@ public class FetcherTest { | ||||||
|         fetcher.updateFetchPositions(Collections.singleton(tp)); |         fetcher.updateFetchPositions(Collections.singleton(tp)); | ||||||
|         assertFalse(subscriptions.isOffsetResetNeeded(tp)); |         assertFalse(subscriptions.isOffsetResetNeeded(tp)); | ||||||
|         assertTrue(subscriptions.isFetchable(tp)); |         assertTrue(subscriptions.isFetchable(tp)); | ||||||
|         assertEquals(5, (long) subscriptions.position(tp)); |         assertEquals(5, subscriptions.position(tp).longValue()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |     @Test | ||||||
|  | @ -419,7 +513,7 @@ public class FetcherTest { | ||||||
|         fetcher.updateFetchPositions(Collections.singleton(tp)); |         fetcher.updateFetchPositions(Collections.singleton(tp)); | ||||||
|         assertFalse(subscriptions.isOffsetResetNeeded(tp)); |         assertFalse(subscriptions.isOffsetResetNeeded(tp)); | ||||||
|         assertTrue(subscriptions.isFetchable(tp)); |         assertTrue(subscriptions.isFetchable(tp)); | ||||||
|         assertEquals(5, (long) subscriptions.position(tp)); |         assertEquals(5, subscriptions.position(tp).longValue()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |     @Test | ||||||
|  | @ -437,7 +531,7 @@ public class FetcherTest { | ||||||
|         fetcher.updateFetchPositions(Collections.singleton(tp)); |         fetcher.updateFetchPositions(Collections.singleton(tp)); | ||||||
|         assertFalse(subscriptions.isOffsetResetNeeded(tp)); |         assertFalse(subscriptions.isOffsetResetNeeded(tp)); | ||||||
|         assertTrue(subscriptions.isFetchable(tp)); |         assertTrue(subscriptions.isFetchable(tp)); | ||||||
|         assertEquals(5, (long) subscriptions.position(tp)); |         assertEquals(5, subscriptions.position(tp).longValue()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Test |     @Test | ||||||
|  | @ -575,17 +669,36 @@ public class FetcherTest { | ||||||
|         return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata)); |         return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata)); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords, |     private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, | ||||||
|                                                   SubscriptionState subscriptions, |                                                   Metrics metrics, | ||||||
|                                                   Metrics metrics) { |                                                   int maxPollRecords) { | ||||||
|  |         return createFetcher(subscriptions, metrics, new ByteArrayDeserializer(), new ByteArrayDeserializer(), maxPollRecords); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) { | ||||||
|  |         return createFetcher(subscriptions, metrics, Integer.MAX_VALUE); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions, | ||||||
|  |                                                Metrics metrics, | ||||||
|  |                                                Deserializer<K> keyDeserializer, | ||||||
|  |                                                Deserializer<V> valueDeserializer) { | ||||||
|  |         return createFetcher(subscriptions, metrics, keyDeserializer, valueDeserializer, Integer.MAX_VALUE); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions, | ||||||
|  |                                                Metrics metrics, | ||||||
|  |                                                Deserializer<K> keyDeserializer, | ||||||
|  |                                                Deserializer<V> valueDeserializer, | ||||||
|  |                                                int maxPollRecords) { | ||||||
|         return new Fetcher<>(consumerClient, |         return new Fetcher<>(consumerClient, | ||||||
|                 minBytes, |                 minBytes, | ||||||
|                 maxWaitMs, |                 maxWaitMs, | ||||||
|                 fetchSize, |                 fetchSize, | ||||||
|                 maxPollRecords, |                 maxPollRecords, | ||||||
|                 true, // check crc |                 true, // check crc | ||||||
|                 new ByteArrayDeserializer(), |                 keyDeserializer, | ||||||
|                 new ByteArrayDeserializer(), |                 valueDeserializer, | ||||||
|                 metadata, |                 metadata, | ||||||
|                 subscriptions, |                 subscriptions, | ||||||
|                 metrics, |                 metrics, | ||||||
|  | @ -594,8 +707,4 @@ public class FetcherTest { | ||||||
|                 retryBackoffMs); |                 retryBackoffMs); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
|     private  Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) { |  | ||||||
|         return createFetcher(Integer.MAX_VALUE, subscriptions, metrics); |  | ||||||
|     } |  | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue