mirror of https://github.com/apache/kafka.git
KAFKA-3977; Defer fetch parsing for space efficiency and to ensure exceptions are raised to the user
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Ismael Juma <ismael@juma.me.uk>
Closes #1656 from hachikuji/KAFKA-3977
This commit is contained in:
parent d5c821c198
commit ff557f02ac
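At its core, the patch moves parsing out of the network callback: raw partition data from each fetch response is buffered as CompletedFetch objects and decoded only when fetchedRecords() is called, so unread responses stay compact in memory and parse errors (corrupt records, deserialization failures) surface on the caller's thread instead of being swallowed. Before the full diff, here is a minimal self-contained sketch of that control flow, using simplified stand-in types (raw byte arrays, String records) rather than the real Fetcher internals:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch only: stand-in types, not the Kafka classes. The network callback
// enqueues raw payloads; parsing is deferred to the user's thread.
public class DeferredFetcher {
    private final Deque<byte[]> completedFetches = new ArrayDeque<>();
    private long position = 0;

    // response listener: buffer the raw bytes, do no parsing here
    public void onFetchResponse(byte[] rawPayload) {
        completedFetches.add(rawPayload);
    }

    // user-facing drain: parse lazily, one completed fetch at a time
    public List<String> fetchedRecords() {
        List<String> drained = new ArrayList<>();
        byte[] raw;
        while ((raw = completedFetches.poll()) != null) {
            // parse() may throw; because position advances only after a
            // successful parse, the caller sees the error with the position
            // unchanged and can re-fetch from the same offset
            List<String> parsed = parse(raw);
            position += parsed.size();
            drained.addAll(parsed);
        }
        return drained;
    }

    private List<String> parse(byte[] raw) {
        List<String> records = new ArrayList<>();
        for (byte b : raw)
            records.add(String.valueOf((char) b)); // placeholder deserialization
        return records;
    }

    public static void main(String[] args) {
        DeferredFetcher fetcher = new DeferredFetcher();
        fetcher.onFetchResponse("abc".getBytes());    // network thread: buffer only
        System.out.println(fetcher.fetchedRecords()); // caller: parse now -> [a, b, c]
    }
}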
Fetcher.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.Metrics;
@@ -38,8 +39,10 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.LogEntry;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
@@ -59,7 +62,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -83,13 +85,11 @@ public class Fetcher<K, V> {
     private final Metadata metadata;
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
-    private final List<PartitionRecords<K, V>> records;
+    private final List<CompletedFetch> completedFetches;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;

-    private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
-    private final Set<String> unauthorizedTopics;
-    private final Map<TopicPartition, Long> recordTooLargePartitions;
+    private PartitionRecords<K, V> nextInLineRecords = null;

     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
@@ -105,7 +105,6 @@ public class Fetcher<K, V> {
                    String metricGrpPrefix,
                    Time time,
                    long retryBackoffMs) {
-
         this.time = time;
         this.client = client;
         this.metadata = metadata;
@@ -115,31 +114,37 @@ public class Fetcher<K, V> {
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
         this.checkCrcs = checkCrcs;

         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
-
-        this.records = new LinkedList<>();
-        this.offsetOutOfRangePartitions = new HashMap<>();
-        this.unauthorizedTopics = new HashSet<>();
-        this.recordTooLargePartitions = new HashMap<>();
+        this.completedFetches = new ArrayList<>();

         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
     }

     /**
-     * Set-up a fetch request for any node that we have assigned partitions for which doesn't have one.
-     *
+     * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
+     * an in-flight fetch or pending fetch data.
      */
     public void sendFetches() {
         for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
-            final FetchRequest fetch = fetchEntry.getValue();
-            client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
+            final FetchRequest request = fetchEntry.getValue();
+            client.send(fetchEntry.getKey(), ApiKeys.FETCH, request)
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
-                        public void onSuccess(ClientResponse response) {
-                            handleFetchResponse(response, fetch);
+                        public void onSuccess(ClientResponse resp) {
+                            FetchResponse response = new FetchResponse(resp.responseBody());
+                            Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
+                            FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
+
+                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+                                TopicPartition partition = entry.getKey();
+                                long fetchOffset = request.fetchData().get(partition).offset;
+                                FetchResponse.PartitionData fetchData = entry.getValue();
+                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
+                            }
+
+                            sensors.fetchLatency.record(resp.requestLatencyMs());
+                            sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                         }

                         @Override
@@ -152,7 +157,7 @@ public class Fetcher<K, V> {

     /**
      * Update the fetch positions for the provided partitions.
-     * @param partitions
+     * @param partitions the partitions to update positions for
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available
      */
     public void updateFetchPositions(Set<TopicPartition> partitions) {
@@ -323,62 +328,6 @@ public class Fetcher<K, V> {
         }
     }

-    /**
-     * If any partition from previous fetchResponse contains OffsetOutOfRange error and
-     * the defaultResetPolicy is NONE, throw OffsetOutOfRangeException
-     *
-     * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse
-     */
-    private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException {
-        Map<TopicPartition, Long> currentOutOfRangePartitions = new HashMap<>();
-
-        // filter offsetOutOfRangePartitions to retain only the fetchable partitions
-        for (Map.Entry<TopicPartition, Long> entry: this.offsetOutOfRangePartitions.entrySet()) {
-            if (!subscriptions.isFetchable(entry.getKey())) {
-                log.debug("Ignoring fetched records for {} since it is no longer fetchable", entry.getKey());
-                continue;
-            }
-            Long position = subscriptions.position(entry.getKey());
-            // ignore partition if the current position != the offset in fetchResponse, e.g. after seek()
-            if (position != null && entry.getValue().equals(position))
-                currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());
-        }
-        this.offsetOutOfRangePartitions.clear();
-        if (!currentOutOfRangePartitions.isEmpty())
-            throw new OffsetOutOfRangeException(currentOutOfRangePartitions);
-    }
-
-    /**
-     * If any topic from previous fetchResponse contains an Authorization error, raise an exception
-     * @throws TopicAuthorizationException
-     */
-    private void throwIfUnauthorizedTopics() throws TopicAuthorizationException {
-        if (!unauthorizedTopics.isEmpty()) {
-            Set<String> topics = new HashSet<>(unauthorizedTopics);
-            unauthorizedTopics.clear();
-            throw new TopicAuthorizationException(topics);
-        }
-    }
-
-    /**
-     * If any partition from previous fetchResponse gets a RecordTooLarge error, throw RecordTooLargeException
-     *
-     * @throws RecordTooLargeException If there is a message larger than fetch size and hence cannot be ever returned
-     */
-    private void throwIfRecordTooLarge() throws RecordTooLargeException {
-        Map<TopicPartition, Long> copiedRecordTooLargePartitions = new HashMap<>(this.recordTooLargePartitions);
-        this.recordTooLargePartitions.clear();
-
-        if (!copiedRecordTooLargePartitions.isEmpty())
-            throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
-                + copiedRecordTooLargePartitions
-                + " whose size is larger than the fetch size "
-                + this.fetchSize
-                + " and hence cannot be ever returned."
-                + " Increase the fetch size, or decrease the maximum message size the broker will allow.",
-                copiedRecordTooLargePartitions);
-    }
-
     /**
      * Return the fetched records, empty the record buffer and update the consumed position.
     *
@@ -393,60 +342,68 @@ public class Fetcher<K, V> {
             return Collections.emptyMap();
         } else {
             Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
-            throwIfOffsetOutOfRange();
-            throwIfUnauthorizedTopics();
-            throwIfRecordTooLarge();
-
-            int maxRecords = maxPollRecords;
-            Iterator<PartitionRecords<K, V>> iterator = records.iterator();
-            while (iterator.hasNext() && maxRecords > 0) {
-                PartitionRecords<K, V> part = iterator.next();
-                maxRecords -= append(drained, part, maxRecords);
-                if (part.isConsumed())
-                    iterator.remove();
+            int recordsRemaining = maxPollRecords;
+            Iterator<CompletedFetch> completedFetchesIterator = completedFetches.iterator();
+
+            while (recordsRemaining > 0) {
+                if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
+                    if (!completedFetchesIterator.hasNext())
+                        break;
+
+                    CompletedFetch completion = completedFetchesIterator.next();
+                    completedFetchesIterator.remove();
+                    nextInLineRecords = parseFetchedData(completion);
+                } else {
+                    recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
+                }
             }
+
             return drained;
         }
     }

     private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
-                       PartitionRecords<K, V> part,
+                       PartitionRecords<K, V> partitionRecords,
                        int maxRecords) {
-        if (!subscriptions.isAssigned(part.partition)) {
+        if (partitionRecords.isEmpty())
+            return 0;
+
+        if (!subscriptions.isAssigned(partitionRecords.partition)) {
             // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
-            log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
+            log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
         } else {
             // note that the consumed position should always be available as long as the partition is still assigned
-            long position = subscriptions.position(part.partition);
-            if (!subscriptions.isFetchable(part.partition)) {
+            long position = subscriptions.position(partitionRecords.partition);
+            if (!subscriptions.isFetchable(partitionRecords.partition)) {
                 // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
-                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
-            } else if (part.fetchOffset == position) {
-                List<ConsumerRecord<K, V>> partRecords = part.take(maxRecords);
+                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
+            } else if (partitionRecords.fetchOffset == position) {
+                // we are ensured to have at least one record since we already checked for emptiness
+                List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
                 long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;

                 log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
-                        "position to {}", position, part.partition, nextOffset);
+                        "position to {}", position, partitionRecords.partition, nextOffset);

-                List<ConsumerRecord<K, V>> records = drained.get(part.partition);
+                List<ConsumerRecord<K, V>> records = drained.get(partitionRecords.partition);
                 if (records == null) {
                     records = partRecords;
-                    drained.put(part.partition, records);
+                    drained.put(partitionRecords.partition, records);
                 } else {
                     records.addAll(partRecords);
                 }

-                subscriptions.position(part.partition, nextOffset);
+                subscriptions.position(partitionRecords.partition, nextOffset);
                 return partRecords.size();
             } else {
                 // these records aren't next in line based on the last consumed position, ignore them
                 // they must be from an obsolete request
                 log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
-                        part.partition, part.fetchOffset, position);
+                        partitionRecords.partition, partitionRecords.fetchOffset, position);
             }
         }

-        part.discard();
+        partitionRecords.discard();
         return 0;
     }
@@ -513,10 +470,10 @@ public class Fetcher<K, V> {

     private Set<TopicPartition> fetchablePartitions() {
         Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
-        if (records.isEmpty())
-            return fetchable;
-        for (PartitionRecords<K, V> partitionRecords : records)
-            fetchable.remove(partitionRecords.partition);
+        if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
+            fetchable.remove(nextInLineRecords.partition);
+        for (CompletedFetch completedFetch : completedFetches)
+            fetchable.remove(completedFetch.partition);
         return fetchable;
     }
@@ -559,30 +516,29 @@ public class Fetcher<K, V> {
     /**
      * The callback for fetch completion
      */
-    private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
-        int totalBytes = 0;
-        int totalCount = 0;
-        FetchResponse response = new FetchResponse(resp.responseBody());
-        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
-            TopicPartition tp = entry.getKey();
-            FetchResponse.PartitionData partition = entry.getValue();
+    private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
+        TopicPartition tp = completedFetch.partition;
+        FetchResponse.PartitionData partition = completedFetch.partitionData;
+        long fetchOffset = completedFetch.fetchedOffset;
+        int bytes = 0;
+        int recordsCount = 0;
+        PartitionRecords<K, V> parsedRecords = null;

+        try {
             if (!subscriptions.isFetchable(tp)) {
                 // this can happen when a rebalance happened or a partition consumption paused
                 // while fetch is still in-flight
                 log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
             } else if (partition.errorCode == Errors.NONE.code()) {
-                long fetchOffset = request.fetchData().get(tp).offset;
-
                 // we are interested in this fetch only if the beginning offset matches the
                 // current consumed position
                 Long position = subscriptions.position(tp);
                 if (position == null || position != fetchOffset) {
-                    log.debug("Discarding fetch response for partition {} since its offset {} does not match " +
+                    log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                             "the expected offset {}", tp, fetchOffset, position);
-                    continue;
+                    return null;
                 }

-                int bytes = 0;
                 ByteBuffer buffer = partition.recordSet;
                 MemoryRecords records = MemoryRecords.readableRecords(buffer);
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
@@ -597,79 +553,95 @@ public class Fetcher<K, V> {
                     }
                 }

+                recordsCount = parsed.size();
+                this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);
+
                 if (!parsed.isEmpty()) {
                     log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
+                    parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
-                    this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
                     this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
                 } else if (buffer.limit() > 0 && !skippedRecords) {
                     // we did not read a single message from a non-empty buffer
                     // because that message's size is larger than fetch size, in this case
                     // record this exception
-                    this.recordTooLargePartitions.put(tp, fetchOffset);
+                    Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
+                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
+                            + recordTooLargePartitions
+                            + " whose size is larger than the fetch size "
+                            + this.fetchSize
+                            + " and hence cannot be ever returned."
+                            + " Increase the fetch size on the client (using max.partition.fetch.bytes),"
+                            + " or decrease the maximum message size the broker will allow (using message.max.bytes).",
+                            recordTooLargePartitions);
                 }
-
-                this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
-                totalBytes += bytes;
-                totalCount += parsed.size();
             } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
                     || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                 this.metadata.requestUpdate();
             } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
-                long fetchOffset = request.fetchData().get(tp).offset;
-                if (subscriptions.hasDefaultOffsetResetPolicy())
+                if (fetchOffset != subscriptions.position(tp)) {
+                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
+                            "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+                } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
+                    log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
                     subscriptions.needOffsetReset(tp);
-                else
-                    this.offsetOutOfRangePartitions.put(tp, fetchOffset);
-                log.info("Fetch offset {} is out of range, resetting offset", fetchOffset);
+                } else {
+                    throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
+                }
             } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
                 log.warn("Not authorized to read from topic {}.", tp.topic());
-                unauthorizedTopics.add(tp.topic());
+                throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
             } else if (partition.errorCode == Errors.UNKNOWN.code()) {
                 log.warn("Unknown error fetching data for topic-partition {}", tp);
             } else {
                 throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
             }
+        } finally {
+            completedFetch.metricAggregator.record(tp, bytes, recordsCount);
         }
-        this.sensors.bytesFetched.record(totalBytes);
-        this.sensors.recordsFetched.record(totalCount);
-        this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
-        this.sensors.fetchLatency.record(resp.requestLatencyMs());
+
+        return parsedRecords;
     }

     /**
      * Parse the record entry, deserializing the key / value fields if necessary
      */
     private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
+        Record record = logEntry.record();
+
+        if (this.checkCrcs && !record.isValid())
+            throw new InvalidRecordException("Record for partition " + partition + " at offset "
+                    + logEntry.offset() + " is corrupt (stored crc = " + record.checksum()
+                    + ", computed crc = "
+                    + record.computeChecksum()
+                    + ")");
+
         try {
-            if (this.checkCrcs)
-                logEntry.record().ensureValid();
             long offset = logEntry.offset();
-            long timestamp = logEntry.record().timestamp();
-            TimestampType timestampType = logEntry.record().timestampType();
-            ByteBuffer keyBytes = logEntry.record().key();
+            long timestamp = record.timestamp();
+            TimestampType timestampType = record.timestampType();
+            ByteBuffer keyBytes = record.key();
             byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
             K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), keyByteArray);
-            ByteBuffer valueBytes = logEntry.record().value();
+            ByteBuffer valueBytes = record.value();
             byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
             V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), valueByteArray);

             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
-                    timestamp, timestampType, logEntry.record().checksum(),
+                    timestamp, timestampType, record.checksum(),
                     keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                     valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                     key, value);
-        } catch (KafkaException e) {
-            throw e;
         } catch (RuntimeException e) {
-            throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e);
+            throw new SerializationException("Error deserializing key/value for partition " + partition +
+                    " at offset " + logEntry.offset(), e);
         }
     }

     private static class PartitionRecords<K, V> {
-        public long fetchOffset;
-        public TopicPartition partition;
-        public List<ConsumerRecord<K, V>> records;
+        private long fetchOffset;
+        private TopicPartition partition;
+        private List<ConsumerRecord<K, V>> records;

         public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
             this.fetchOffset = fetchOffset;
@@ -677,7 +649,7 @@ public class Fetcher<K, V> {
             this.records = records;
         }

-        private boolean isConsumed() {
+        private boolean isEmpty() {
             return records == null || records.isEmpty();
         }

@@ -687,7 +659,7 @@ public class Fetcher<K, V> {

         private List<ConsumerRecord<K, V>> take(int n) {
             if (records == null)
-                return Collections.emptyList();
+                return new ArrayList<>();

             if (n >= records.size()) {
                 List<ConsumerRecord<K, V>> res = this.records;
@@ -709,7 +681,59 @@ public class Fetcher<K, V> {
         }
     }

-    private class FetchManagerMetrics {
+    private static class CompletedFetch {
+        private final TopicPartition partition;
+        private final long fetchedOffset;
+        private final FetchResponse.PartitionData partitionData;
+        private final FetchResponseMetricAggregator metricAggregator;
+
+        public CompletedFetch(TopicPartition partition,
+                              long fetchedOffset,
+                              FetchResponse.PartitionData partitionData,
+                              FetchResponseMetricAggregator metricAggregator) {
+            this.partition = partition;
+            this.fetchedOffset = fetchedOffset;
+            this.partitionData = partitionData;
+            this.metricAggregator = metricAggregator;
+        }
+    }
+
+    /**
+     * Since we parse the message data for each partition from each fetch response lazily, fetch-level
+     * metrics need to be aggregated as the messages from each partition are parsed. This class is used
+     * to facilitate this incremental aggregation.
+     */
+    private static class FetchResponseMetricAggregator {
+        private final FetchManagerMetrics sensors;
+        private final Set<TopicPartition> unrecordedPartitions;
+
+        private int totalBytes;
+        private int totalRecords;
+
+        public FetchResponseMetricAggregator(FetchManagerMetrics sensors,
+                                             Set<TopicPartition> partitions) {
+            this.sensors = sensors;
+            this.unrecordedPartitions = partitions;
+        }
+
+        /**
+         * After each partition is parsed, we update the current metric totals with the total bytes
+         * and number of records parsed. After all partitions have reported, we write the metric.
+         */
+        public void record(TopicPartition partition, int bytes, int records) {
+            unrecordedPartitions.remove(partition);
+            totalBytes += bytes;
+            totalRecords += records;
+
+            if (unrecordedPartitions.isEmpty()) {
+                // once all expected partitions from the fetch have reported in, record the metrics
+                sensors.bytesFetched.record(totalBytes);
+                sensors.recordsFetched.record(totalRecords);
+            }
+        }
+    }
+
+    private static class FetchManagerMetrics {
         public final Metrics metrics;
         public final String metricGrpName;
@@ -719,7 +743,6 @@ public class Fetcher<K, V> {
     public final Sensor recordsFetchLag;
     public final Sensor fetchThrottleTimeSensor;

-
     public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
         this.metrics = metrics;
         this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
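Because each partition's chunk of a response may now be parsed at a different time, fetch-level metrics can no longer be recorded in one pass over the response; the FetchResponseMetricAggregator above accumulates per-partition totals and writes the sensors exactly once, when the last partition of the response reports in. A distilled, self-contained sketch of that flush-on-last-report idea (simplified names, not the Kafka classes):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntConsumer;

// Sketch only: IntConsumer stands in for a metrics sensor.
public class AggregatorSketch {
    static class Aggregator {
        private final Set<String> unrecordedPartitions;
        private final IntConsumer bytesFetchedSensor;
        private int totalBytes;

        Aggregator(Set<String> partitions, IntConsumer bytesFetchedSensor) {
            this.unrecordedPartitions = new HashSet<>(partitions);
            this.bytesFetchedSensor = bytesFetchedSensor;
        }

        void record(String partition, int bytes) {
            unrecordedPartitions.remove(partition);
            totalBytes += bytes;
            if (unrecordedPartitions.isEmpty())
                bytesFetchedSensor.accept(totalBytes); // all partitions reported: flush once
        }
    }

    public static void main(String[] args) {
        Aggregator aggregator = new Aggregator(
                new HashSet<>(Arrays.asList("topic-0", "topic-1")),
                total -> System.out.println("bytes-fetched: " + total));
        aggregator.record("topic-0", 512); // nothing emitted yet, topic-1 outstanding
        aggregator.record("topic-1", 256); // prints "bytes-fetched: 768"
    }
}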
Compressor.java
@@ -242,7 +242,7 @@ public class Compressor {

     // the following two functions also need to be public since they are used in MemoryRecords.iteration

-    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
+    public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
         try {
             switch (type) {
                 case NONE:
@@ -271,7 +271,7 @@ public class Compressor {
         }
     }

-    static public DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
+    public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
         try {
             switch (type) {
                 case NONE:
InvalidRecordException.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.common.record;

-public class InvalidRecordException extends RuntimeException {
+import org.apache.kafka.common.KafkaException;
+
+public class InvalidRecordException extends KafkaException {

     private static final long serialVersionUID = 1;
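Changing the base class from RuntimeException to KafkaException means a CRC mismatch detected during deferred parsing reaches the application as an ordinary Kafka client error, catchable alongside the other exceptions poll() can raise. A hedged illustration of the consumer-facing effect (the poll timeout and the log-and-continue policy are assumptions for the example, not part of the commit):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;

// Sketch only: assumes an already-configured consumer.
public class PollErrorHandling {
    public static void pollOnce(KafkaConsumer<byte[], byte[]> consumer) {
        try {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
            System.out.println("fetched " + records.count() + " records");
        } catch (KafkaException e) {
            // now covers InvalidRecordException (corrupt CRC) as well as
            // SerializationException; the position has not advanced past the
            // data that failed to parse
            System.err.println("fetch failed: " + e);
        }
    }
}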
FetcherTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.metrics.KafkaMetric;
@@ -38,6 +39,8 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.Compressor;
+import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.requests.FetchRequest;
@@ -47,6 +50,7 @@ import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -128,7 +132,7 @@ public class FetcherTest {
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(3, records.size());
-        assertEquals(4L, (long) subscriptions.position(tp)); // this is the next fetching position
+        assertEquals(4L, subscriptions.position(tp).longValue()); // this is the next fetching position
         long offset = 1;
         for (ConsumerRecord<byte[], byte[]> record : records) {
             assertEquals(offset, record.offset());
@@ -147,9 +151,83 @@ public class FetcherTest {
         };
     }

+    @Test
+    public void testFetchedRecordsRaisesOnSerializationErrors() {
+        // raise an exception from somewhere in the middle of the fetch response
+        // so that we can verify that our position does not advance after raising
+        ByteArrayDeserializer deserializer = new ByteArrayDeserializer() {
+            int i = 0;
+            @Override
+            public byte[] deserialize(String topic, byte[] data) {
+                if (i++ == 1)
+                    throw new SerializationException();
+                return data;
+            }
+        };
+
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);
+
+        subscriptions.assignFromUser(Collections.singleton(tp));
+        subscriptions.seek(tp, 1);
+
+        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+
+        fetcher.sendFetches();
+        consumerClient.poll(0);
+        try {
+            fetcher.fetchedRecords();
+            fail("fetchedRecords should have raised");
+        } catch (SerializationException e) {
+            // the position should not advance since no data has been returned
+            assertEquals(1, subscriptions.position(tp).longValue());
+        }
+    }
+
+    @Test
+    public void testParseInvalidRecord() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        Compressor compressor = new Compressor(buffer, CompressionType.NONE);
+
+        byte[] key = "foo".getBytes();
+        byte[] value = "baz".getBytes();
+        long offset = 0;
+        long timestamp = 500L;
+
+        int size = Record.recordSize(key, value);
+        long crc = Record.computeChecksum(timestamp, key, value, CompressionType.NONE, 0, -1);
+
+        // write one valid record
+        compressor.putLong(offset);
+        compressor.putInt(size);
+        Record.write(compressor, crc, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1);
+
+        // and one invalid record (note the crc)
+        compressor.putLong(offset);
+        compressor.putInt(size);
+        Record.write(compressor, crc + 1, Record.computeAttributes(CompressionType.NONE), timestamp, key, value, 0, -1);
+
+        compressor.close();
+        buffer.flip();
+
+        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.seek(tp, 0);
+
+        // normal fetch
+        fetcher.sendFetches();
+        client.prepareResponse(fetchResponse(buffer, Errors.NONE.code(), 100L, 0));
+        consumerClient.poll(0);
+        try {
+            fetcher.fetchedRecords();
+            fail("fetchedRecords should have raised");
+        } catch (InvalidRecordException e) {
+            // the position should not advance since no data has been returned
+            assertEquals(0, subscriptions.position(tp).longValue());
+        }
+    }
+
     @Test
     public void testFetchMaxPollRecords() {
-        Fetcher<byte[], byte[]> fetcher = createFetcher(2, subscriptions, new Metrics(time));
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);

         List<ConsumerRecord<byte[], byte[]>> records;
         subscriptions.assignFromUser(Arrays.asList(tp));
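The two new tests above pin down the contract that makes recovery possible: when parsing raises, the consumer's position stays at the first offset that was not returned. One application-level recovery strategy built on that guarantee, sketched under the assumption that dropping data is acceptable (the commit itself never skips records, and the skipped offset may not be the corrupt one if good records preceded it in the same batch):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

// Sketch only: a coarse skip-one-record policy for a single assigned partition.
public class SkipOnParseError {
    public static void pollSkipping(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp) {
        try {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
            System.out.println("fetched " + records.count() + " records");
        } catch (SerializationException e) {
            long firstUnreturned = consumer.position(tp); // unchanged by the failed fetch
            consumer.seek(tp, firstUnreturned + 1);       // deliberately drop one record
        }
    }
}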
@@ -162,7 +240,7 @@ public class FetcherTest {
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(2, records.size());
-        assertEquals(3L, (long) subscriptions.position(tp));
+        assertEquals(3L, subscriptions.position(tp).longValue());
         assertEquals(1, records.get(0).offset());
         assertEquals(2, records.get(1).offset());
@@ -170,14 +248,14 @@ public class FetcherTest {
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(1, records.size());
-        assertEquals(4L, (long) subscriptions.position(tp));
+        assertEquals(4L, subscriptions.position(tp).longValue());
         assertEquals(3, records.get(0).offset());

         fetcher.sendFetches();
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
         assertEquals(2, records.size());
-        assertEquals(6L, (long) subscriptions.position(tp));
+        assertEquals(6L, subscriptions.position(tp).longValue());
         assertEquals(4, records.get(0).offset());
         assertEquals(5, records.get(1).offset());
     }
@@ -203,7 +281,7 @@ public class FetcherTest {
         consumerClient.poll(0);
         consumerRecords = fetcher.fetchedRecords().get(tp);
         assertEquals(3, consumerRecords.size());
-        assertEquals(31L, (long) subscriptions.position(tp)); // this is the next fetching position
+        assertEquals(31L, subscriptions.position(tp).longValue()); // this is the next fetching position

         assertEquals(15L, consumerRecords.get(0).offset());
         assertEquals(20L, consumerRecords.get(1).offset());
@@ -318,11 +396,27 @@ public class FetcherTest {
         fetcher.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
-        assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(0, fetcher.fetchedRecords().size());
+        assertTrue(subscriptions.isOffsetResetNeeded(tp));
         assertEquals(null, subscriptions.position(tp));
     }

+    @Test
+    public void testStaleOutOfRangeError() {
+        // verify that an out of range error which arrives after a seek
+        // does not cause us to reset our position or throw an exception
+        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.seek(tp, 0);
+
+        fetcher.sendFetches();
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
+        subscriptions.seek(tp, 1);
+        consumerClient.poll(0);
+        assertEquals(0, fetcher.fetchedRecords().size());
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertEquals(1, subscriptions.position(tp).longValue());
+    }
+
     @Test
     public void testFetchedRecordsAfterSeek() {
         subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
@@ -368,7 +462,7 @@ public class FetcherTest {
         // disconnects should have no affect on subscription state
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(0, (long) subscriptions.position(tp));
+        assertEquals(0, subscriptions.position(tp).longValue());
     }

     @Test
@@ -380,7 +474,7 @@ public class FetcherTest {

         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.position(tp));
+        assertEquals(5, subscriptions.position(tp).longValue());
     }

     @Test
@@ -393,7 +487,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.position(tp));
+        assertEquals(5, subscriptions.position(tp).longValue());
     }

     @Test
@@ -406,7 +500,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.position(tp));
+        assertEquals(5, subscriptions.position(tp).longValue());
     }

     @Test
@@ -419,7 +513,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.position(tp));
+        assertEquals(5, subscriptions.position(tp).longValue());
     }

     @Test
@@ -437,7 +531,7 @@ public class FetcherTest {
         fetcher.updateFetchPositions(Collections.singleton(tp));
         assertFalse(subscriptions.isOffsetResetNeeded(tp));
         assertTrue(subscriptions.isFetchable(tp));
-        assertEquals(5, (long) subscriptions.position(tp));
+        assertEquals(5, subscriptions.position(tp).longValue());
     }

     @Test
@@ -575,17 +669,36 @@ public class FetcherTest {
         return new MetadataResponse(cluster.nodes(), MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
     }

-    private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords,
-                                                  SubscriptionState subscriptions,
-                                                  Metrics metrics) {
+    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions,
+                                                  Metrics metrics,
+                                                  int maxPollRecords) {
+        return createFetcher(subscriptions, metrics, new ByteArrayDeserializer(), new ByteArrayDeserializer(), maxPollRecords);
+    }
+
+    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
+        return createFetcher(subscriptions, metrics, Integer.MAX_VALUE);
+    }
+
+    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions,
+                                               Metrics metrics,
+                                               Deserializer<K> keyDeserializer,
+                                               Deserializer<V> valueDeserializer) {
+        return createFetcher(subscriptions, metrics, keyDeserializer, valueDeserializer, Integer.MAX_VALUE);
+    }
+
+    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions,
+                                               Metrics metrics,
+                                               Deserializer<K> keyDeserializer,
+                                               Deserializer<V> valueDeserializer,
+                                               int maxPollRecords) {
         return new Fetcher<>(consumerClient,
                 minBytes,
                 maxWaitMs,
                 fetchSize,
                 maxPollRecords,
                 true, // check crc
-                new ByteArrayDeserializer(),
-                new ByteArrayDeserializer(),
+                keyDeserializer,
+                valueDeserializer,
                 metadata,
                 subscriptions,
                 metrics,
@@ -594,8 +707,4 @@ public class FetcherTest {
                 retryBackoffMs);
     }

-
-    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
-        return createFetcher(Integer.MAX_VALUE, subscriptions, metrics);
-    }
 }