KAFKA-4194; Follow-up improvements/testing for ListOffsets v1 (KIP-79)

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #1897 from becketqin/KAFKA-4194
Jiangjie Qin 2016-09-28 17:45:08 -07:00 committed by Jason Gustafson
parent 71036527e9
commit aa506a6919
22 changed files with 265 additions and 105 deletions

View File

@ -16,7 +16,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.OffsetAndTimestamp;
import java.io.Closeable;
import java.util.Collection;

View File

@ -37,7 +37,6 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.OffsetAndTimestamp;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
@ -1419,38 +1418,46 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
// we explicitly exclude the earliest and latest offset here so the timestamp in the returned
// OffsetAndTimestamp is always positive.
if (entry.getValue() < 0)
throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
entry.getValue() + ". The target time cannot be negative.");
}
return fetcher.getOffsetsByTimes(timestampsToSearch);
return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}
/**
* Get the earliest available offsets for the given partitions.
*
* Get the first offset for the given partitions.
* <p>
* Notice that this method may block indefinitely if the partition does not exist.
* This method does not change the current consumer position of the partitions.
*
* @see #seekToBeginning(Collection)
*
* @param partitions the partitions to get the earliest offsets.
* @return The earliest available offsets for the given partitions
*/
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
return fetcher.earliestOffsets(partitions);
return fetcher.beginningOffsets(partitions, requestTimeoutMs);
}
/**
* Get the end offsets for the given partitions. The end offset of a partition is the offset of the upcoming
* Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming
* message, i.e. the offset of the last available message + 1.
*
* <p>
* Notice that this method may block indefinitely if the partition does not exist.
* This method does not change the current consumer position of the partitions.
*
* @see #seekToEnd(Collection)
*
* @param partitions the partitions to get the end offsets.
* @return The end offsets for the given partitions.
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
return fetcher.latestOffsets(partitions);
return fetcher.endOffsets(partitions, requestTimeoutMs);
}
/**

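For reference, a minimal sketch of how an application might call the updated APIs; the broker address, topic name and timestamp below are placeholders, and the configuration is the bare minimum rather than a recommended setup:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetLookupExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic

            // Earliest offset whose timestamp is >= the target; the value is null when no
            // such message exists or the partition's message format predates 0.10.0.
            Map<TopicPartition, OffsetAndTimestamp> found =
                consumer.offsetsForTimes(Collections.singletonMap(tp, 1476000000000L));

            // Neither call moves the consumer's position; both now respect request.timeout.ms.
            Map<TopicPartition, Long> beginnings = consumer.beginningOffsets(Collections.singletonList(tp));
            Map<TopicPartition, Long> ends = consumer.endOffsets(Collections.singletonList(tp));

            System.out.println(found + " " + beginnings + " " + ends);
        }
    }
}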
View File

@ -20,7 +20,6 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.record.OffsetAndTimestamp;
import java.util.ArrayList;
import java.util.Collection;
@ -301,7 +300,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
return null;
throw new UnsupportedOperationException("Not implemented yet.");
}
@Override

View File

@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;
package org.apache.kafka.clients.consumer;
import java.util.Objects;
import org.apache.kafka.common.utils.Utils;
/**
* A container class for offset and timestamp.
*
* Both offset and timestamp are non-negative.
*/
public final class OffsetAndTimestamp {
private final long timestamp;
@ -40,12 +42,12 @@ public final class OffsetAndTimestamp {
@Override
public String toString() {
return "{Timestamp = " + timestamp + ", Offset = " + offset + "}";
return "{timestamp=" + timestamp + ", offset=" + offset + "}";
}
@Override
public int hashCode() {
return Objects.hash(timestamp, offset);
return 31 * Utils.longHashcode(timestamp) + Utils.longHashcode(offset);
}
@Override

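As a quick sanity check on the new hashCode (moving away from Objects.hash presumably avoids autoboxing; that motivation is an inference, not stated in the patch), two value-equal instances should still hash the same:

import org.apache.kafka.clients.consumer.OffsetAndTimestamp;

public class OffsetAndTimestampHashCheck {
    public static void main(String[] args) {
        // The constructor takes (offset, timestamp); both must be non-negative.
        OffsetAndTimestamp a = new OffsetAndTimestamp(42L, 1476000000000L);
        OffsetAndTimestamp b = new OffsetAndTimestamp(42L, 1476000000000L);
        // The new hashCode folds each long with (int) (v ^ (v >>> 32)) and combines them
        // as 31 * h(timestamp) + h(offset), so equal fields give equal hash codes.
        System.out.println(a.hashCode() == b.hashCode()); // true
    }
}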
View File

@ -132,10 +132,21 @@ public class ConsumerNetworkClient implements Closeable {
* Block until the metadata has been refreshed.
*/
public void awaitMetadataUpdate() {
awaitMetadataUpdate(Long.MAX_VALUE);
}
/**
* Block waiting on the metadata refresh with a timeout.
*
* @return true if update succeeded, false otherwise.
*/
public boolean awaitMetadataUpdate(long timeout) {
long startMs = time.milliseconds();
int version = this.metadata.requestUpdate();
do {
poll(Long.MAX_VALUE);
} while (this.metadata.version() == version);
poll(timeout);
} while (this.metadata.version() == version && time.milliseconds() - startMs < timeout);
return this.metadata.version() > version;
}
/**

View File

@ -41,7 +41,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
@ -353,17 +353,23 @@ public class Fetcher<K, V> {
throw new NoOffsetForPartitionException(partition);
log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
long offset = getOffsetsByTimes(Collections.singletonMap(partition, timestamp)).get(partition).offset();
long offset = getOffsetsByTimes(Collections.singletonMap(partition, timestamp), Long.MAX_VALUE).get(partition).offset();
// we might lose the assignment while fetching the offset, so check it is still active
if (subscriptions.isAssigned(partition))
this.subscriptions.seek(partition, offset);
}
public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch) {
while (true) {
public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
long timeout) {
long startMs = time.milliseconds();
long remaining = timeout;
do {
RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future = sendListOffsetRequests(timestampsToSearch);
client.poll(future);
client.poll(future, remaining);
if (!future.isDone())
break;
if (future.succeeded())
return future.value();
@ -371,27 +377,38 @@ public class Fetcher<K, V> {
if (!future.isRetriable())
throw future.exception();
long elapsed = time.milliseconds() - startMs;
remaining = timeout - elapsed;
if (remaining <= 0)
break;
if (future.exception() instanceof InvalidMetadataException)
client.awaitMetadataUpdate();
client.awaitMetadataUpdate(remaining);
else
time.sleep(retryBackoffMs);
}
time.sleep(Math.min(remaining, retryBackoffMs));
elapsed = time.milliseconds() - startMs;
remaining = timeout - elapsed;
} while (remaining > 0);
throw new TimeoutException("Failed to get offsets by times in " + timeout + " ms");
}
public Map<TopicPartition, Long> earliestOffsets(Collection<TopicPartition> partitions) {
return earliestOrLatestOffset(partitions, ListOffsetRequest.EARLIEST_TIMESTAMP);
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, long timeout) {
return beginningOrEndOffset(partitions, ListOffsetRequest.EARLIEST_TIMESTAMP, timeout);
}
public Map<TopicPartition, Long> latestOffsets(Collection<TopicPartition> partitions) {
return earliestOrLatestOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP);
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, long timeout) {
return beginningOrEndOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP, timeout);
}
private Map<TopicPartition, Long> earliestOrLatestOffset(Collection<TopicPartition> partitions, long timestamp) {
private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
long timestamp,
long timeout) {
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition tp : partitions)
timestampsToSearch.put(tp, timestamp);
Map<TopicPartition, Long> result = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : getOffsetsByTimes(timestampsToSearch).entrySet())
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : getOffsetsByTimes(timestampsToSearch, timeout).entrySet())
result.put(entry.getKey(), entry.getValue().offset());
return result;
@ -502,6 +519,7 @@ public class Fetcher<K, V> {
final RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> listOffsetRequestsFuture = new RequestFuture<>();
final Map<TopicPartition, OffsetAndTimestamp> fetchedTimestampOffsets = new HashMap<>();
final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
sendListOffsetRequest(entry.getKey(), entry.getValue())
.addListener(new RequestFutureListener<Map<TopicPartition, OffsetAndTimestamp>>() {
@ -509,7 +527,7 @@ public class Fetcher<K, V> {
public void onSuccess(Map<TopicPartition, OffsetAndTimestamp> value) {
synchronized (listOffsetRequestsFuture) {
fetchedTimestampOffsets.putAll(value);
if (fetchedTimestampOffsets.size() == timestampsToSearch.size() && !listOffsetRequestsFuture.isDone())
if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone())
listOffsetRequestsFuture.complete(fetchedTimestampOffsets);
}
}
@ -534,7 +552,7 @@ public class Fetcher<K, V> {
* @param timestampsToSearch The mapping from partitions to the target timestamps.
* @return A response which can be polled to obtain the corresponding timestamps and offsets.
*/
private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequest(Node node,
private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequest(final Node node,
final Map<TopicPartition, Long> timestampsToSearch) {
ListOffsetRequest request = new ListOffsetRequest(timestampsToSearch, ListOffsetRequest.CONSUMER_REPLICA_ID);
log.trace("Sending ListOffsetRequest {} to broker {}", request, node);
@ -542,7 +560,9 @@ public class Fetcher<K, V> {
.compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndTimestamp>>() {
@Override
public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) {
handleListOffsetResponse(timestampsToSearch, response, future);
ListOffsetResponse lor = new ListOffsetResponse(response.responseBody());
log.trace("Received ListOffsetResponse {} from broker {}", lor, node);
handleListOffsetResponse(timestampsToSearch, lor, future);
}
});
}
@ -550,17 +570,16 @@ public class Fetcher<K, V> {
/**
* Callback for the response of the list offset call above.
* @param timestampsToSearch The mapping from partitions to target timestamps
* @param clientResponse The response from the server.
* @param listOffsetResponse The response from the server.
* @param future The future to be completed by the response.
*/
private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch,
ClientResponse clientResponse,
ListOffsetResponse listOffsetResponse,
RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) {
ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody());
Map<TopicPartition, OffsetAndTimestamp> timestampOffsetMap = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
TopicPartition topicPartition = entry.getKey();
ListOffsetResponse.PartitionData partitionData = lor.responseData().get(topicPartition);
ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition);
Errors error = Errors.forCode(partitionData.errorCode);
if (error == Errors.NONE) {
OffsetAndTimestamp offsetAndTimestamp = null;
@ -568,7 +587,7 @@ public class Fetcher<K, V> {
offsetAndTimestamp = new OffsetAndTimestamp(partitionData.offset, partitionData.timestamp);
log.debug("Fetched {} for partition {}", offsetAndTimestamp, topicPartition);
timestampOffsetMap.put(topicPartition, offsetAndTimestamp);
} else if (error == Errors.INVALID_REQUEST) {
} else if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
// The message format on the broker side is before 0.10.0, so we simply put null in the response.
log.debug("Cannot search by timestamp for partition {} because the message format version " +
"is before 0.10.0", topicPartition);

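The completion condition above now counts outstanding per-node responses with an AtomicInteger instead of comparing map sizes, which stays correct no matter how many entries each response contributes. A stripped-down sketch of that pattern, using CompletableFuture in place of RequestFuture purely for illustration:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class AggregateResponseSketch {
    // Complete the aggregate future after exactly `remaining` callbacks have fired,
    // no matter how many (or how few) entries each callback contributes.
    static <K, V> void onNodeResponse(Map<K, V> partial,
                                      Map<K, V> collected,
                                      AtomicInteger remaining,
                                      CompletableFuture<Map<K, V>> aggregate) {
        synchronized (aggregate) {
            collected.putAll(partial);
            if (remaining.decrementAndGet() == 0)
                aggregate.complete(collected);
        }
    }

    public static void main(String[] args) {
        int nodes = 3;
        AtomicInteger remaining = new AtomicInteger(nodes);
        Map<String, Long> collected = new HashMap<>();
        CompletableFuture<Map<String, Long>> aggregate = new CompletableFuture<>();
        for (int i = 0; i < nodes; i++)
            onNodeResponse(Collections.singletonMap("partition-" + i, (long) i), collected, remaining, aggregate);
        System.out.println(aggregate.join()); // printed only after the third callback fires
    }
}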
View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.errors;
/**
* The message format version does not support the requested function.
*/
public class UnsupportedForMessageFormatException extends ApiException {
private static final long serialVersionUID = 1L;
public UnsupportedForMessageFormatException(String message) {
super(message);
}
public UnsupportedForMessageFormatException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -43,6 +43,7 @@ import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
@ -161,7 +162,9 @@ public enum Errors {
new NotControllerException("This is not the correct controller for this cluster.")),
INVALID_REQUEST(42,
new InvalidRequestException("This most likely occurs because of a request being malformed by the client library or" +
" the message was sent to an incompatible broker. See the broker logs for more details."));
" the message was sent to an incompatible broker. See the broker logs for more details.")),
UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
new UnsupportedForMessageFormatException("The message format version on the broker does not support the request."));
private static final Logger log = LoggerFactory.getLogger(Errors.class);

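A quick check that the new error code wires up as expected, assuming the usual Errors helpers forCode and exception():

import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.protocol.Errors;

public class ErrorCodeCheck {
    public static void main(String[] args) {
        // Code 43 should round-trip to the new enum constant and its exception type.
        Errors error = Errors.forCode((short) 43);
        System.out.println(error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT);
        System.out.println(error.exception() instanceof UnsupportedForMessageFormatException);
    }
}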
View File

@ -425,7 +425,7 @@ public class Protocol {
"The timestamp associated with the returned offset"),
new Field("offset",
INT64,
"offsets found"));
"offset found"));
public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
new Field("partition_responses",

View File

@ -92,7 +92,7 @@ public class ListOffsetRequest extends AbstractRequest {
/**
* Constructor for ListOffsetRequest v1.
*/
public ListOffsetRequest(Map<TopicPartition, ?> targetTimes, int replicaId) {
public ListOffsetRequest(Map<TopicPartition, Long> targetTimes, int replicaId) {
this(replicaId, targetTimes, 1);
}
@ -165,6 +165,7 @@ public class ListOffsetRequest extends AbstractRequest {
}
@Override
@SuppressWarnings("deprecation")
public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();

View File

@ -49,7 +49,7 @@ public class ListOffsetResponse extends AbstractRequestResponse {
*
* UNKNOWN_TOPIC_OR_PARTITION (3)
* NOT_LEADER_FOR_PARTITION (6)
* INVALID_REQUEST (42)
* UNSUPPORTED_FOR_MESSAGE_FORMAT (43)
* UNKNOWN (-1)
*/

View File

@ -728,4 +728,8 @@ public class Utils {
public static int toPositive(int number) {
return number & 0x7fffffff;
}
public static int longHashcode(long value) {
return (int) (value ^ (value >>> 32));
}
}

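longHashcode is the standard 64-to-32-bit fold, the same one Long's own hashCode uses, so the two should always agree; a small check:

import org.apache.kafka.common.utils.Utils;

public class LongHashcodeCheck {
    public static void main(String[] args) {
        long[] samples = {0L, 1L, -1L, 42L, Long.MIN_VALUE, Long.MAX_VALUE};
        for (long v : samples)
            // Long's hashCode is defined as (int) (value ^ (value >>> 32)), the same fold.
            System.out.println(Utils.longHashcode(v) == Long.valueOf(v).hashCode());
    }
}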
View File

@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
@ -59,6 +60,7 @@ public class MockClient implements KafkaClient {
}
private final Time time;
private final Metadata metadata;
private int correlation = 0;
private Node node = null;
private final Set<String> ready = new HashSet<>();
@ -66,9 +68,16 @@ public class MockClient implements KafkaClient {
private final Queue<ClientRequest> requests = new ArrayDeque<>();
private final Queue<ClientResponse> responses = new ArrayDeque<>();
private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
private final Queue<Cluster> metadataUpdates = new ArrayDeque<>();
public MockClient(Time time) {
this.time = time;
this.metadata = null;
}
public MockClient(Time time, Metadata metadata) {
this.time = time;
this.metadata = metadata;
}
@Override
@ -149,6 +158,14 @@ public class MockClient implements KafkaClient {
public List<ClientResponse> poll(long timeoutMs, long now) {
List<ClientResponse> copy = new ArrayList<>(this.responses);
if (metadata != null && metadata.updateRequested()) {
Cluster cluster = metadataUpdates.poll();
if (cluster == null)
metadata.update(metadata.fetch(), time.milliseconds());
else
metadata.update(cluster, time.milliseconds());
}
while (!this.responses.isEmpty()) {
ClientResponse response = this.responses.poll();
if (response.request().hasCallback())
@ -233,6 +250,19 @@ public class MockClient implements KafkaClient {
futureResponses.add(new FutureResponse(body, disconnected, matcher, node));
}
public void reset() {
ready.clear();
blackedOut.clear();
requests.clear();
responses.clear();
futureResponses.clear();
metadataUpdates.clear();
}
public void prepareMetadataUpdate(Cluster cluster) {
metadataUpdates.add(cluster);
}
public void setNode(Node node) {
this.node = node;
}

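With the metadata-aware constructor, a test can queue a cluster view that the mock serves the next time a metadata update is requested. A setup sketch; it assumes it lives in the clients test module alongside MockTime, Metadata, Cluster and TestUtils, and the topic and partition counts are arbitrary:

MockTime time = new MockTime(1);
Metadata metadata = new Metadata(0, Long.MAX_VALUE);
MockClient client = new MockClient(time, metadata);
Cluster cluster = TestUtils.clusterWith(2, "test", 2);

client.reset();                                  // clear queued requests, responses and metadata updates
metadata.update(cluster, time.milliseconds());   // seed the initial view
client.prepareMetadataUpdate(cluster);           // served on the next poll() after an update is requested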
View File

@ -40,7 +40,7 @@ import static org.junit.Assert.fail;
public class ConsumerNetworkClientTest {
private String topicName = "test";
private MockTime time = new MockTime();
private MockTime time = new MockTime(1);
private MockClient client = new MockClient(time);
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
@ -132,6 +132,11 @@ public class ConsumerNetworkClientTest {
assertTrue(future.isDone());
}
@Test
public void testAwaitForMetadataUpdateWithTimeout() {
assertFalse(consumerClient.awaitMetadataUpdate(10L));
}
@Test
public void sendExpiry() throws InterruptedException {
long unsentExpiryMs = 10;

View File

@ -41,7 +41,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Compressor;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
@ -84,8 +84,8 @@ public class FetcherTest {
private int fetchSize = 1000;
private long retryBackoffMs = 100;
private MockTime time = new MockTime(1);
private MockClient client = new MockClient(time);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
private MockClient client = new MockClient(time, metadata);
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
@ -637,14 +637,70 @@ public class FetcherTest {
}
@Test
public void testGetOffsetsByTimes() {
client.prepareResponseFrom(listOffsetResponse(Errors.NONE, 100L, 100L), cluster.leaderFor(tp));
public void testGetOffsetsForTimesTimeout() {
try {
fetcher.getOffsetsByTimes(Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), 100L);
fail("Should throw timeout exception.");
} catch (TimeoutException e) {
// let it go.
}
}
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
fetcher.getOffsetsByTimes(Collections.singletonMap(tp, 0L));
assertEquals(offsetAndTimestampMap.get(tp).timestamp(), 100L);
assertEquals(offsetAndTimestampMap.get(tp).offset(), 100L);
@Test
public void testGetOffsetsForTimes() {
// Error code none with unknown offset
testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
// Error code none with known offset
testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L);
// Test both partitions having errors.
testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
// Test the second partition having an error.
testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION, 10L, 100L, 10L, 100L);
// Test different errors.
testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L);
testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
}
private void testGetOffsetsForTimesWithError(Errors errorForTp0,
Errors errorForTp1,
long offsetForTp0,
long offsetForTp1,
Long expectedOffsetForTp0,
Long expectedOffsetForTp1) {
client.reset();
TopicPartition tp0 = tp;
TopicPartition tp1 = new TopicPartition(topicName, 1);
// Ensure metadata has both partitions.
Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
metadata.update(cluster, time.milliseconds());
// First try should fail due to metadata error.
client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
client.prepareResponseFrom(listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
// Second try should succeed.
client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(tp0, 0L);
timestampToSearch.put(tp1, 0L);
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);
if (expectedOffsetForTp0 == null)
assertNull(offsetAndTimestampMap.get(tp0));
else {
assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).timestamp());
assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).offset());
}
if (expectedOffsetForTp1 == null)
assertNull(offsetAndTimestampMap.get(tp1));
else {
assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).offset());
}
}
private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
@ -659,6 +715,10 @@ public class FetcherTest {
}
private Struct listOffsetResponse(Errors error, long timestamp, long offset) {
return listOffsetResponse(tp, error, timestamp, offset);
}
private Struct listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), timestamp, offset);
Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
allPartitionData.put(tp, partitionData);

View File

@ -257,6 +257,7 @@ public class RequestResponseTest {
return new HeartbeatResponse(Errors.NONE.code());
}
@SuppressWarnings("deprecation")
private AbstractRequest createJoinGroupRequest(int version) {
ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
@ -307,6 +308,7 @@ public class RequestResponseTest {
return new LeaveGroupResponse(Errors.NONE.code());
}
@SuppressWarnings("deprecation")
private AbstractRequest createListOffsetRequest(int version) {
if (version == 0) {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<>();
@ -321,6 +323,7 @@ public class RequestResponseTest {
}
}
@SuppressWarnings("deprecation")
private AbstractRequestResponse createListOffsetResponse(int version) {
if (version == 0) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
@ -353,6 +356,7 @@ public class RequestResponseTest {
return new MetadataResponse(Arrays.asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata, version);
}
@SuppressWarnings("deprecation")
private AbstractRequest createOffsetCommitRequest(int version) {
Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, ""));

View File

@ -164,11 +164,9 @@ class FileMessageSet private[kafka](@volatile var file: File,
* @return The timestamp and offset of the message found. None, if no message is found.
*/
def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[TimestampOffset] = {
var lastOffsetChecked = -1L
val messagesToSearch = read(startingPosition, sizeInBytes)
for (messageAndOffset <- messagesToSearch) {
val message = messageAndOffset.message
lastOffsetChecked = messageAndOffset.offset
if (message.timestamp >= targetTimestamp) {
// We found a message
message.compressionCodec match {

View File

@ -28,7 +28,7 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
import java.util.concurrent.atomic._
import java.text.NumberFormat
import org.apache.kafka.common.errors.{InvalidRequestException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
import org.apache.kafka.common.errors.{UnsupportedForMessageFormatException, CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests.ListOffsetRequest
@ -594,7 +594,7 @@ class Log(val dir: File,
if (config.messageFormatVersion < KAFKA_0_10_0_IV0 &&
targetTimestamp != ListOffsetRequest.EARLIEST_TIMESTAMP &&
targetTimestamp != ListOffsetRequest.LATEST_TIMESTAMP)
throw new InvalidRequestException(s"Cannot search offsets based on timestamp because message format version " +
throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
s"for partition $topicAndPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
s"required version $KAFKA_0_10_0_IV0")
@ -615,10 +615,7 @@ class Log(val dir: File,
None
}
targetSeg match {
case Some(segment) => segment.findOffsetByTimestamp(targetTimestamp)
case None => None
}
targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp))
}
/**

View File

@ -387,7 +387,7 @@ class LogSegment(val log: FileMessageSet,
* the truncated log and maybe retry or even do the search on another log segment.
*
* @param timestamp The timestamp to search for.
* @return the timestamp and offset of the first message whose timestamp is larger than or equals to the
* @return the timestamp and offset of the first message whose timestamp is larger than or equal to the
* target timestamp. None will be returned if there is no such message.
*/
def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {

View File

@ -36,10 +36,9 @@ import kafka.network.RequestChannel.{Response, Session}
import kafka.security.auth
import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Write, Delete}
import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.common.errors.{InvalidRequestException, ClusterAuthorizationException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
import org.apache.kafka.common.requests.ListOffsetResponse.PartitionData
import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.Utils
@ -555,10 +554,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava)
new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava)
)
val responseMap = authorizedRequestInfo.map({case (topicPartition, partitionData) =>
val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) =>
try {
// ensure leader exists
val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
@ -592,7 +591,7 @@ class KafkaApis(val requestChannel: RequestChannel,
error("Error while responding to offset request", e)
(topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
}
})
}
responseMap ++ unauthorizedResponseStatus
}
@ -606,68 +605,62 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
new PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET)
new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET)
})
val responseMap = authorizedRequestInfo.map({case (topicPartition, timestamp) =>
val responseMap = authorizedRequestInfo.map { case (topicPartition, timestamp) =>
if (offsetRequest.duplicatePartitions().contains(topicPartition)) {
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
s"failed because the partition is duplicated in the request.")
(topicPartition, new PartitionData(Errors.INVALID_REQUEST.code(),
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET))
(topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST.code(),
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET))
} else {
try {
val fromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
// ensure leader exists
val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, topicPartition.partition)
else
replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition)
val found = {
fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match {
case Some(timestampOffset) =>
// The request is not from a consumer client
if (offsetRequest.replicaId != ListOffsetRequest.CONSUMER_REPLICA_ID)
timestampOffset
// The request is from a consumer client
else {
// the found offset is smaller or equals to the high watermark
if (timestampOffset.offset <= localReplica.highWatermark.messageOffset)
timestampOffset
// the consumer wants the latest offset.
else if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
TimestampOffset(Message.NoTimestamp, localReplica.highWatermark.messageOffset)
// The found offset is higher than the high watermark and the consumer is not asking for the end offset.
else
TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
}
case None =>
TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
val found = {
if (fromConsumer && timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
TimestampOffset(Message.NoTimestamp, localReplica.highWatermark.messageOffset)
else {
def allowed(timestampOffset: TimestampOffset): Boolean =
!fromConsumer || timestampOffset.offset <= localReplica.highWatermark.messageOffset
fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match {
case Some(timestampOffset) if allowed(timestampOffset) => timestampOffset
case _ => TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
}
}
}
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, found.timestamp, found.offset))
} catch {
// NOTE: These exceptions are special-cased because they are typically transient or the client
// would have received a clear exception already, so there is no value in logging the entire stack trace for them
case e @ (_ : UnknownTopicOrPartitionException |
_ : NotLeaderForPartitionException |
_ : InvalidRequestException) =>
_ : UnsupportedForMessageFormatException) =>
debug(s"Offset request with correlation id $correlationId from client $clientId on " +
s"partition $topicPartition failed due to ${e.getMessage}")
(topicPartition, new PartitionData(Errors.forException(e).code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET))
(topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET))
case e: Throwable =>
error("Error while responding to offset request", e)
(topicPartition, new PartitionData(Errors.forException(e).code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET))
(topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code,
ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET))
}
}
})
}
responseMap ++ unauthorizedResponseStatus
}
@ -687,7 +680,7 @@ class KafkaApis(val requestChannel: RequestChannel,
logManager.getLog(TopicAndPartition(topicPartition.topic, topicPartition.partition)) match {
case Some(log) =>
log.fetchOffsetsByTimestamp(timestamp)
case _ =>
case None =>
throw new UnknownTopicOrPartitionException(s"$topicPartition does not exist on the broker.")
}
}

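The reworked block encodes a single rule: a consumer is never handed an offset beyond the leader's high watermark, while replica and debugging requests are uncapped. A compact Java paraphrase of that decision, with illustrative types and names rather than the broker's actual API:

import java.util.Arrays;

public final class ListOffsetHighWatermarkSketch {
    static final long UNKNOWN = -1L;

    // fromConsumer: the request came from a consumer client (not a replica or debugging tool)
    // wantsLatest:  the target timestamp was ListOffsetRequest.LATEST_TIMESTAMP
    // foundTimestamp/foundOffset: result of the timestamp search; both null when nothing matched
    static long[] resolve(boolean fromConsumer, boolean wantsLatest, long highWatermark,
                          Long foundTimestamp, Long foundOffset) {
        if (fromConsumer && wantsLatest)
            // Consumers asking for the end of the log are answered with the high watermark.
            return new long[] {UNKNOWN, highWatermark};
        if (foundOffset != null && (!fromConsumer || foundOffset <= highWatermark))
            // A match is returned to a consumer only if it is at or below the high watermark;
            // replica and debugging requests are not capped.
            return new long[] {foundTimestamp, foundOffset};
        return new long[] {UNKNOWN, UNKNOWN};
    }

    public static void main(String[] args) {
        // A consumer asking for "latest" with a high watermark of 100:
        System.out.println(Arrays.toString(resolve(true, true, 100L, null, null)));   // [-1, 100]
        // A consumer whose timestamp match lies beyond the high watermark:
        System.out.println(Arrays.toString(resolve(true, false, 100L, 5000L, 120L))); // [-1, -1]
    }
}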
View File

@ -264,7 +264,7 @@ class ReplicaFetcherThread(name: String,
private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, consumerId: Int): Long = {
val (request, apiVersion) =
if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
val partitions = Map(topicPartition -> earliestOrLatest)
val partitions = Map(topicPartition -> java.lang.Long.valueOf(earliestOrLatest))
(new ListOffsetRequest(partitions.asJava, consumerId), 1)
} else {
val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1))

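For reference, a hedged sketch of building the same v1 request from Java; the topic name is a placeholder, and constructing the request directly like this is only something internal clients (such as the replica fetcher above) do:

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.ListOffsetRequest;

public class ListOffsetRequestV1Example {
    public static void main(String[] args) {
        // One partition, asking for the latest offset; CONSUMER_REPLICA_ID tells the broker
        // to cap the answer at the high watermark, while other replica ids are not capped.
        Map<TopicPartition, Long> targetTimes =
            Collections.singletonMap(new TopicPartition("my-topic", 0), ListOffsetRequest.LATEST_TIMESTAMP);
        ListOffsetRequest request = new ListOffsetRequest(targetTimes, ListOffsetRequest.CONSUMER_REPLICA_ID);
        System.out.println(request);
    }
}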
View File

@ -52,7 +52,6 @@ Note: Because new protocols are introduced, it is important to upgrade your Kafk
<li> The number of open file handles in 0.10.0 will increase by ~33% because of the addition of time index files for each segment.</li>
<li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes to avoid potential frequent log rolling. </li>
<li> Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could take longer. Based on our experiment, setting num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>
</ul>
<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
@ -64,7 +63,7 @@ Note: Because new protocols are introduced, it is important to upgrade your Kafk
removed in a future major release. </li>
<li> Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the Metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface. </li>
<li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li>
<li> The new Java Consumer now allows users to search offsets by timestamp on partitions.
<li> The new Java Consumer now allows users to search offsets by timestamp on partitions. </li>
</ul>
<h5><a id="upgrade_1010_new_protocols" href="#upgrade_1010_new_protocols">New Protocol Versions</a></h5>