mirror of https://github.com/apache/kafka.git
Merged in upstream trunk
This commit is contained in:
commit
af67e01bf0
|
@ -61,8 +61,8 @@ import java.util.Set;
|
|||
* This class manage the fetching process with the brokers.
|
||||
*/
|
||||
public class Fetcher<K, V> {
|
||||
private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
|
||||
private static final long LATEST_OFFSET_TIMESTAMP = -1L;
|
||||
public static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
|
||||
public static final long LATEST_OFFSET_TIMESTAMP = -1L;
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
|
||||
|
||||
|
|
|
@ -34,15 +34,24 @@ import org.apache.kafka.common.utils.Time;
|
|||
* A mock network client for use testing code
|
||||
*/
|
||||
public class MockClient implements KafkaClient {
|
||||
public static final RequestMatcher ALWAYS_TRUE = new RequestMatcher() {
|
||||
@Override
|
||||
public boolean matches(ClientRequest request) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
private class FutureResponse {
|
||||
public final Struct responseBody;
|
||||
public final boolean disconnected;
|
||||
public final RequestMatcher requestMatcher;
|
||||
|
||||
public FutureResponse(Struct responseBody, boolean disconnected) {
|
||||
public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher requestMatcher) {
|
||||
this.responseBody = responseBody;
|
||||
this.disconnected = disconnected;
|
||||
this.requestMatcher = requestMatcher;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private final Time time;
|
||||
|
@ -94,6 +103,9 @@ public class MockClient implements KafkaClient {
|
|||
public void send(ClientRequest request) {
|
||||
if (!futureResponses.isEmpty()) {
|
||||
FutureResponse futureResp = futureResponses.poll();
|
||||
if (!futureResp.requestMatcher.matches(request))
|
||||
throw new IllegalStateException("Next in line response did not match expected request");
|
||||
|
||||
ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
|
||||
responses.add(resp);
|
||||
} else {
|
||||
|
@ -141,11 +153,32 @@ public class MockClient implements KafkaClient {
|
|||
}
|
||||
|
||||
public void prepareResponse(Struct body) {
|
||||
prepareResponse(body, false);
|
||||
prepareResponse(ALWAYS_TRUE, body, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare a response for a request matching the provided matcher. If the matcher does not
|
||||
* match, {@link #send(ClientRequest)} will throw IllegalStateException
|
||||
* @param matcher The matcher to apply
|
||||
* @param body The response body
|
||||
*/
|
||||
public void prepareResponse(RequestMatcher matcher, Struct body) {
|
||||
prepareResponse(matcher, body, false);
|
||||
}
|
||||
|
||||
public void prepareResponse(Struct body, boolean disconnected) {
|
||||
futureResponses.add(new FutureResponse(body, disconnected));
|
||||
prepareResponse(ALWAYS_TRUE, body, disconnected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare a response for a request matching the provided matcher. If the matcher does not
|
||||
* match, {@link #send(ClientRequest)} will throw IllegalStateException
|
||||
* @param matcher The matcher to apply
|
||||
* @param body The response body
|
||||
* @param disconnected Whether the request was disconnected
|
||||
*/
|
||||
public void prepareResponse(RequestMatcher matcher, Struct body, boolean disconnected) {
|
||||
futureResponses.add(new FutureResponse(body, disconnected, matcher));
|
||||
}
|
||||
|
||||
public void setNode(Node node) {
|
||||
|
@ -180,4 +213,14 @@ public class MockClient implements KafkaClient {
|
|||
return this.node;
|
||||
}
|
||||
|
||||
/**
|
||||
* The RequestMatcher provides a way to match a particular request to a response prepared
|
||||
* through {@link #prepareResponse(RequestMatcher, Struct)}. Basically this allows testers
|
||||
* to inspect the request body for the type of the request or for specific fields that should be set,
|
||||
* and to fail the test if it doesn't match.
|
||||
*/
|
||||
public interface RequestMatcher {
|
||||
boolean matches(ClientRequest request);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.kafka.clients.consumer.internals;
|
||||
|
||||
import org.apache.kafka.clients.ClientRequest;
|
||||
import org.apache.kafka.clients.Metadata;
|
||||
import org.apache.kafka.clients.MockClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
@ -30,6 +31,8 @@ import org.apache.kafka.common.protocol.types.Struct;
|
|||
import org.apache.kafka.common.record.CompressionType;
|
||||
import org.apache.kafka.common.record.MemoryRecords;
|
||||
import org.apache.kafka.common.requests.FetchResponse;
|
||||
import org.apache.kafka.common.requests.ListOffsetRequest;
|
||||
import org.apache.kafka.common.requests.ListOffsetResponse;
|
||||
import org.apache.kafka.common.requests.MetadataResponse;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
|
@ -40,11 +43,13 @@ import org.junit.Test;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -158,26 +163,34 @@ public class FetcherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testFetchFailed() {
|
||||
public void testFetchNotLeaderForPartition() {
|
||||
subscriptions.subscribe(tp);
|
||||
subscriptions.seek(tp, 0);
|
||||
|
||||
// fetch with not leader
|
||||
fetcher.initFetches(cluster);
|
||||
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
|
||||
consumerClient.poll(0);
|
||||
assertEquals(0, fetcher.fetchedRecords().size());
|
||||
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchUnknownTopicOrPartition() {
|
||||
subscriptions.subscribe(tp);
|
||||
subscriptions.seek(tp, 0);
|
||||
|
||||
// fetch with unknown topic partition
|
||||
fetcher.initFetches(cluster);
|
||||
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
|
||||
consumerClient.poll(0);
|
||||
assertEquals(0, fetcher.fetchedRecords().size());
|
||||
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchOffsetOutOfRange() {
|
||||
subscriptions.subscribe(tp);
|
||||
subscriptions.seek(tp, 0);
|
||||
|
||||
// fetch with out of range
|
||||
subscriptions.fetched(tp, 5);
|
||||
fetcher.initFetches(cluster);
|
||||
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
|
||||
consumerClient.poll(0);
|
||||
|
@ -188,18 +201,94 @@ public class FetcherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testFetchOutOfRange() {
|
||||
public void testFetchDisconnected() {
|
||||
subscriptions.subscribe(tp);
|
||||
subscriptions.seek(tp, 5);
|
||||
subscriptions.seek(tp, 0);
|
||||
|
||||
// fetch with out of range
|
||||
fetcher.initFetches(cluster);
|
||||
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
|
||||
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L), true);
|
||||
consumerClient.poll(0);
|
||||
assertTrue(subscriptions.isOffsetResetNeeded(tp));
|
||||
assertEquals(0, fetcher.fetchedRecords().size());
|
||||
assertEquals(null, subscriptions.fetched(tp));
|
||||
assertEquals(null, subscriptions.consumed(tp));
|
||||
|
||||
// disconnects should have no affect on subscription state
|
||||
assertFalse(subscriptions.isOffsetResetNeeded(tp));
|
||||
assertTrue(subscriptions.isFetchable(tp));
|
||||
assertEquals(0, (long) subscriptions.fetched(tp));
|
||||
assertEquals(0, (long) subscriptions.consumed(tp));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFetchPositionToCommitted() {
|
||||
// unless a specific reset is expected, the default behavior is to reset to the committed
|
||||
// position if one is present
|
||||
subscriptions.subscribe(tp);
|
||||
subscriptions.committed(tp, 5);
|
||||
|
||||
fetcher.updateFetchPositions(Collections.singleton(tp));
|
||||
assertTrue(subscriptions.isFetchable(tp));
|
||||
assertEquals(5, (long) subscriptions.fetched(tp));
|
||||
assertEquals(5, (long) subscriptions.consumed(tp));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFetchPositionResetToDefaultOffset() {
|
||||
subscriptions.subscribe(tp);
|
||||
// with no commit position, we should reset using the default strategy defined above (EARLIEST)
|
||||
|
||||
client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
|
||||
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
|
||||
fetcher.updateFetchPositions(Collections.singleton(tp));
|
||||
assertFalse(subscriptions.isOffsetResetNeeded(tp));
|
||||
assertTrue(subscriptions.isFetchable(tp));
|
||||
assertEquals(5, (long) subscriptions.fetched(tp));
|
||||
assertEquals(5, (long) subscriptions.consumed(tp));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFetchPositionResetToLatestOffset() {
|
||||
subscriptions.subscribe(tp);
|
||||
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
|
||||
|
||||
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
|
||||
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
|
||||
fetcher.updateFetchPositions(Collections.singleton(tp));
|
||||
assertFalse(subscriptions.isOffsetResetNeeded(tp));
|
||||
assertTrue(subscriptions.isFetchable(tp));
|
||||
assertEquals(5, (long) subscriptions.fetched(tp));
|
||||
assertEquals(5, (long) subscriptions.consumed(tp));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFetchPositionResetToEarliestOffset() {
|
||||
subscriptions.subscribe(tp);
|
||||
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
|
||||
|
||||
client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
|
||||
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
|
||||
fetcher.updateFetchPositions(Collections.singleton(tp));
|
||||
assertFalse(subscriptions.isOffsetResetNeeded(tp));
|
||||
assertTrue(subscriptions.isFetchable(tp));
|
||||
assertEquals(5, (long) subscriptions.fetched(tp));
|
||||
assertEquals(5, (long) subscriptions.consumed(tp));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFetchPositionDisconnect() {
|
||||
subscriptions.subscribe(tp);
|
||||
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
|
||||
|
||||
// First request gets a disconnect
|
||||
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
|
||||
listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);
|
||||
|
||||
// Next one succeeds
|
||||
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
|
||||
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
|
||||
fetcher.updateFetchPositions(Collections.singleton(tp));
|
||||
assertFalse(subscriptions.isOffsetResetNeeded(tp));
|
||||
assertTrue(subscriptions.isFetchable(tp));
|
||||
assertEquals(5, (long) subscriptions.fetched(tp));
|
||||
assertEquals(5, (long) subscriptions.consumed(tp));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -213,6 +302,26 @@ public class FetcherTest {
|
|||
assertEquals(cluster.topics().size(), allTopics.size());
|
||||
}
|
||||
|
||||
private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
|
||||
// matches any list offset request with the provided timestamp
|
||||
return new MockClient.RequestMatcher() {
|
||||
@Override
|
||||
public boolean matches(ClientRequest request) {
|
||||
ListOffsetRequest req = new ListOffsetRequest(request.request().body());
|
||||
ListOffsetRequest.PartitionData partitionData = req.offsetData().get(tp);
|
||||
return partitionData != null && partitionData.timestamp == timestamp;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private Struct listOffsetResponse(Errors error, List<Long> offsets) {
|
||||
ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), offsets);
|
||||
Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
|
||||
allPartitionData.put(tp, partitionData);
|
||||
ListOffsetResponse response = new ListOffsetResponse(allPartitionData);
|
||||
return response.toStruct();
|
||||
}
|
||||
|
||||
private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
|
||||
FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
|
||||
return response.toStruct();
|
||||
|
|
Loading…
Reference in New Issue