Merged in upstream trunk

This commit is contained in:
Geoff Anderson 2015-08-06 23:08:55 -07:00
commit af67e01bf0
3 changed files with 169 additions and 17 deletions

View File

@ -61,8 +61,8 @@ import java.util.Set;
* This class manage the fetching process with the brokers.
*/
public class Fetcher<K, V> {
private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
private static final long LATEST_OFFSET_TIMESTAMP = -1L;
public static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
public static final long LATEST_OFFSET_TIMESTAMP = -1L;
private static final Logger log = LoggerFactory.getLogger(Fetcher.class);

View File

@ -34,15 +34,24 @@ import org.apache.kafka.common.utils.Time;
* A mock network client for use testing code
*/
public class MockClient implements KafkaClient {
public static final RequestMatcher ALWAYS_TRUE = new RequestMatcher() {
@Override
public boolean matches(ClientRequest request) {
return true;
}
};
private class FutureResponse {
public final Struct responseBody;
public final boolean disconnected;
public final RequestMatcher requestMatcher;
public FutureResponse(Struct responseBody, boolean disconnected) {
public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher requestMatcher) {
this.responseBody = responseBody;
this.disconnected = disconnected;
this.requestMatcher = requestMatcher;
}
}
private final Time time;
@ -94,6 +103,9 @@ public class MockClient implements KafkaClient {
public void send(ClientRequest request) {
if (!futureResponses.isEmpty()) {
FutureResponse futureResp = futureResponses.poll();
if (!futureResp.requestMatcher.matches(request))
throw new IllegalStateException("Next in line response did not match expected request");
ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
responses.add(resp);
} else {
@ -141,11 +153,32 @@ public class MockClient implements KafkaClient {
}
public void prepareResponse(Struct body) {
prepareResponse(body, false);
prepareResponse(ALWAYS_TRUE, body, false);
}
/**
* Prepare a response for a request matching the provided matcher. If the matcher does not
* match, {@link #send(ClientRequest)} will throw IllegalStateException
* @param matcher The matcher to apply
* @param body The response body
*/
public void prepareResponse(RequestMatcher matcher, Struct body) {
prepareResponse(matcher, body, false);
}
public void prepareResponse(Struct body, boolean disconnected) {
futureResponses.add(new FutureResponse(body, disconnected));
prepareResponse(ALWAYS_TRUE, body, disconnected);
}
/**
* Prepare a response for a request matching the provided matcher. If the matcher does not
* match, {@link #send(ClientRequest)} will throw IllegalStateException
* @param matcher The matcher to apply
* @param body The response body
* @param disconnected Whether the request was disconnected
*/
public void prepareResponse(RequestMatcher matcher, Struct body, boolean disconnected) {
futureResponses.add(new FutureResponse(body, disconnected, matcher));
}
public void setNode(Node node) {
@ -180,4 +213,14 @@ public class MockClient implements KafkaClient {
return this.node;
}
/**
* The RequestMatcher provides a way to match a particular request to a response prepared
* through {@link #prepareResponse(RequestMatcher, Struct)}. Basically this allows testers
* to inspect the request body for the type of the request or for specific fields that should be set,
* and to fail the test if it doesn't match.
*/
public interface RequestMatcher {
boolean matches(ClientRequest request);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -30,6 +31,8 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.MockTime;
@ -40,11 +43,13 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -158,26 +163,34 @@ public class FetcherTest {
}
@Test
public void testFetchFailed() {
public void testFetchNotLeaderForPartition() {
subscriptions.subscribe(tp);
subscriptions.seek(tp, 0);
// fetch with not leader
fetcher.initFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
}
@Test
public void testFetchUnknownTopicOrPartition() {
subscriptions.subscribe(tp);
subscriptions.seek(tp, 0);
// fetch with unknown topic partition
fetcher.initFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
}
@Test
public void testFetchOffsetOutOfRange() {
subscriptions.subscribe(tp);
subscriptions.seek(tp, 0);
// fetch with out of range
subscriptions.fetched(tp, 5);
fetcher.initFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
consumerClient.poll(0);
@ -188,18 +201,94 @@ public class FetcherTest {
}
@Test
public void testFetchOutOfRange() {
public void testFetchDisconnected() {
subscriptions.subscribe(tp);
subscriptions.seek(tp, 5);
subscriptions.seek(tp, 0);
// fetch with out of range
fetcher.initFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L), true);
consumerClient.poll(0);
assertTrue(subscriptions.isOffsetResetNeeded(tp));
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(null, subscriptions.fetched(tp));
assertEquals(null, subscriptions.consumed(tp));
// disconnects should have no affect on subscription state
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(0, (long) subscriptions.fetched(tp));
assertEquals(0, (long) subscriptions.consumed(tp));
}
@Test
public void testUpdateFetchPositionToCommitted() {
// unless a specific reset is expected, the default behavior is to reset to the committed
// position if one is present
subscriptions.subscribe(tp);
subscriptions.committed(tp, 5);
fetcher.updateFetchPositions(Collections.singleton(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
}
@Test
public void testUpdateFetchPositionResetToDefaultOffset() {
subscriptions.subscribe(tp);
// with no commit position, we should reset using the default strategy defined above (EARLIEST)
client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
}
@Test
public void testUpdateFetchPositionResetToLatestOffset() {
subscriptions.subscribe(tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
}
@Test
public void testUpdateFetchPositionResetToEarliestOffset() {
subscriptions.subscribe(tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
}
@Test
public void testUpdateFetchPositionDisconnect() {
subscriptions.subscribe(tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
// First request gets a disconnect
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);
// Next one succeeds
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
fetcher.updateFetchPositions(Collections.singleton(tp));
assertFalse(subscriptions.isOffsetResetNeeded(tp));
assertTrue(subscriptions.isFetchable(tp));
assertEquals(5, (long) subscriptions.fetched(tp));
assertEquals(5, (long) subscriptions.consumed(tp));
}
@Test
@ -213,6 +302,26 @@ public class FetcherTest {
assertEquals(cluster.topics().size(), allTopics.size());
}
private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
// matches any list offset request with the provided timestamp
return new MockClient.RequestMatcher() {
@Override
public boolean matches(ClientRequest request) {
ListOffsetRequest req = new ListOffsetRequest(request.request().body());
ListOffsetRequest.PartitionData partitionData = req.offsetData().get(tp);
return partitionData != null && partitionData.timestamp == timestamp;
}
};
}
private Struct listOffsetResponse(Errors error, List<Long> offsets) {
ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), offsets);
Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
allPartitionData.put(tp, partitionData);
ListOffsetResponse response = new ListOffsetResponse(allPartitionData);
return response.toStruct();
}
private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
return response.toStruct();