mirror of https://github.com/apache/kafka.git
Merged in upstream trunk
This commit is contained in:
commit
af67e01bf0
|
@ -61,8 +61,8 @@ import java.util.Set;
|
||||||
* This class manage the fetching process with the brokers.
|
* This class manage the fetching process with the brokers.
|
||||||
*/
|
*/
|
||||||
public class Fetcher<K, V> {
|
public class Fetcher<K, V> {
|
||||||
private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
|
public static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
|
||||||
private static final long LATEST_OFFSET_TIMESTAMP = -1L;
|
public static final long LATEST_OFFSET_TIMESTAMP = -1L;
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
|
private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
|
||||||
|
|
||||||
|
|
|
@ -34,15 +34,24 @@ import org.apache.kafka.common.utils.Time;
|
||||||
* A mock network client for use testing code
|
* A mock network client for use testing code
|
||||||
*/
|
*/
|
||||||
public class MockClient implements KafkaClient {
|
public class MockClient implements KafkaClient {
|
||||||
|
public static final RequestMatcher ALWAYS_TRUE = new RequestMatcher() {
|
||||||
|
@Override
|
||||||
|
public boolean matches(ClientRequest request) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
private class FutureResponse {
|
private class FutureResponse {
|
||||||
public final Struct responseBody;
|
public final Struct responseBody;
|
||||||
public final boolean disconnected;
|
public final boolean disconnected;
|
||||||
|
public final RequestMatcher requestMatcher;
|
||||||
|
|
||||||
public FutureResponse(Struct responseBody, boolean disconnected) {
|
public FutureResponse(Struct responseBody, boolean disconnected, RequestMatcher requestMatcher) {
|
||||||
this.responseBody = responseBody;
|
this.responseBody = responseBody;
|
||||||
this.disconnected = disconnected;
|
this.disconnected = disconnected;
|
||||||
|
this.requestMatcher = requestMatcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Time time;
|
private final Time time;
|
||||||
|
@ -94,6 +103,9 @@ public class MockClient implements KafkaClient {
|
||||||
public void send(ClientRequest request) {
|
public void send(ClientRequest request) {
|
||||||
if (!futureResponses.isEmpty()) {
|
if (!futureResponses.isEmpty()) {
|
||||||
FutureResponse futureResp = futureResponses.poll();
|
FutureResponse futureResp = futureResponses.poll();
|
||||||
|
if (!futureResp.requestMatcher.matches(request))
|
||||||
|
throw new IllegalStateException("Next in line response did not match expected request");
|
||||||
|
|
||||||
ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
|
ClientResponse resp = new ClientResponse(request, time.milliseconds(), futureResp.disconnected, futureResp.responseBody);
|
||||||
responses.add(resp);
|
responses.add(resp);
|
||||||
} else {
|
} else {
|
||||||
|
@ -141,11 +153,32 @@ public class MockClient implements KafkaClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void prepareResponse(Struct body) {
|
public void prepareResponse(Struct body) {
|
||||||
prepareResponse(body, false);
|
prepareResponse(ALWAYS_TRUE, body, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare a response for a request matching the provided matcher. If the matcher does not
|
||||||
|
* match, {@link #send(ClientRequest)} will throw IllegalStateException
|
||||||
|
* @param matcher The matcher to apply
|
||||||
|
* @param body The response body
|
||||||
|
*/
|
||||||
|
public void prepareResponse(RequestMatcher matcher, Struct body) {
|
||||||
|
prepareResponse(matcher, body, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void prepareResponse(Struct body, boolean disconnected) {
|
public void prepareResponse(Struct body, boolean disconnected) {
|
||||||
futureResponses.add(new FutureResponse(body, disconnected));
|
prepareResponse(ALWAYS_TRUE, body, disconnected);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare a response for a request matching the provided matcher. If the matcher does not
|
||||||
|
* match, {@link #send(ClientRequest)} will throw IllegalStateException
|
||||||
|
* @param matcher The matcher to apply
|
||||||
|
* @param body The response body
|
||||||
|
* @param disconnected Whether the request was disconnected
|
||||||
|
*/
|
||||||
|
public void prepareResponse(RequestMatcher matcher, Struct body, boolean disconnected) {
|
||||||
|
futureResponses.add(new FutureResponse(body, disconnected, matcher));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setNode(Node node) {
|
public void setNode(Node node) {
|
||||||
|
@ -180,4 +213,14 @@ public class MockClient implements KafkaClient {
|
||||||
return this.node;
|
return this.node;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The RequestMatcher provides a way to match a particular request to a response prepared
|
||||||
|
* through {@link #prepareResponse(RequestMatcher, Struct)}. Basically this allows testers
|
||||||
|
* to inspect the request body for the type of the request or for specific fields that should be set,
|
||||||
|
* and to fail the test if it doesn't match.
|
||||||
|
*/
|
||||||
|
public interface RequestMatcher {
|
||||||
|
boolean matches(ClientRequest request);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.clients.consumer.internals;
|
package org.apache.kafka.clients.consumer.internals;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.ClientRequest;
|
||||||
import org.apache.kafka.clients.Metadata;
|
import org.apache.kafka.clients.Metadata;
|
||||||
import org.apache.kafka.clients.MockClient;
|
import org.apache.kafka.clients.MockClient;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||||
|
@ -30,6 +31,8 @@ import org.apache.kafka.common.protocol.types.Struct;
|
||||||
import org.apache.kafka.common.record.CompressionType;
|
import org.apache.kafka.common.record.CompressionType;
|
||||||
import org.apache.kafka.common.record.MemoryRecords;
|
import org.apache.kafka.common.record.MemoryRecords;
|
||||||
import org.apache.kafka.common.requests.FetchResponse;
|
import org.apache.kafka.common.requests.FetchResponse;
|
||||||
|
import org.apache.kafka.common.requests.ListOffsetRequest;
|
||||||
|
import org.apache.kafka.common.requests.ListOffsetResponse;
|
||||||
import org.apache.kafka.common.requests.MetadataResponse;
|
import org.apache.kafka.common.requests.MetadataResponse;
|
||||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
|
@ -40,11 +43,13 @@ import org.junit.Test;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@ -158,26 +163,34 @@ public class FetcherTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFetchFailed() {
|
public void testFetchNotLeaderForPartition() {
|
||||||
subscriptions.subscribe(tp);
|
subscriptions.subscribe(tp);
|
||||||
subscriptions.seek(tp, 0);
|
subscriptions.seek(tp, 0);
|
||||||
|
|
||||||
// fetch with not leader
|
|
||||||
fetcher.initFetches(cluster);
|
fetcher.initFetches(cluster);
|
||||||
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
|
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L));
|
||||||
consumerClient.poll(0);
|
consumerClient.poll(0);
|
||||||
assertEquals(0, fetcher.fetchedRecords().size());
|
assertEquals(0, fetcher.fetchedRecords().size());
|
||||||
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
|
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFetchUnknownTopicOrPartition() {
|
||||||
|
subscriptions.subscribe(tp);
|
||||||
|
subscriptions.seek(tp, 0);
|
||||||
|
|
||||||
// fetch with unknown topic partition
|
|
||||||
fetcher.initFetches(cluster);
|
fetcher.initFetches(cluster);
|
||||||
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
|
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L));
|
||||||
consumerClient.poll(0);
|
consumerClient.poll(0);
|
||||||
assertEquals(0, fetcher.fetchedRecords().size());
|
assertEquals(0, fetcher.fetchedRecords().size());
|
||||||
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
|
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFetchOffsetOutOfRange() {
|
||||||
|
subscriptions.subscribe(tp);
|
||||||
|
subscriptions.seek(tp, 0);
|
||||||
|
|
||||||
// fetch with out of range
|
|
||||||
subscriptions.fetched(tp, 5);
|
|
||||||
fetcher.initFetches(cluster);
|
fetcher.initFetches(cluster);
|
||||||
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
|
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
|
||||||
consumerClient.poll(0);
|
consumerClient.poll(0);
|
||||||
|
@ -188,18 +201,94 @@ public class FetcherTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFetchOutOfRange() {
|
public void testFetchDisconnected() {
|
||||||
subscriptions.subscribe(tp);
|
subscriptions.subscribe(tp);
|
||||||
subscriptions.seek(tp, 5);
|
subscriptions.seek(tp, 0);
|
||||||
|
|
||||||
// fetch with out of range
|
|
||||||
fetcher.initFetches(cluster);
|
fetcher.initFetches(cluster);
|
||||||
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L));
|
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L), true);
|
||||||
consumerClient.poll(0);
|
consumerClient.poll(0);
|
||||||
assertTrue(subscriptions.isOffsetResetNeeded(tp));
|
|
||||||
assertEquals(0, fetcher.fetchedRecords().size());
|
assertEquals(0, fetcher.fetchedRecords().size());
|
||||||
assertEquals(null, subscriptions.fetched(tp));
|
|
||||||
assertEquals(null, subscriptions.consumed(tp));
|
// disconnects should have no affect on subscription state
|
||||||
|
assertFalse(subscriptions.isOffsetResetNeeded(tp));
|
||||||
|
assertTrue(subscriptions.isFetchable(tp));
|
||||||
|
assertEquals(0, (long) subscriptions.fetched(tp));
|
||||||
|
assertEquals(0, (long) subscriptions.consumed(tp));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateFetchPositionToCommitted() {
|
||||||
|
// unless a specific reset is expected, the default behavior is to reset to the committed
|
||||||
|
// position if one is present
|
||||||
|
subscriptions.subscribe(tp);
|
||||||
|
subscriptions.committed(tp, 5);
|
||||||
|
|
||||||
|
fetcher.updateFetchPositions(Collections.singleton(tp));
|
||||||
|
assertTrue(subscriptions.isFetchable(tp));
|
||||||
|
assertEquals(5, (long) subscriptions.fetched(tp));
|
||||||
|
assertEquals(5, (long) subscriptions.consumed(tp));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateFetchPositionResetToDefaultOffset() {
|
||||||
|
subscriptions.subscribe(tp);
|
||||||
|
// with no commit position, we should reset using the default strategy defined above (EARLIEST)
|
||||||
|
|
||||||
|
client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
|
||||||
|
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
|
||||||
|
fetcher.updateFetchPositions(Collections.singleton(tp));
|
||||||
|
assertFalse(subscriptions.isOffsetResetNeeded(tp));
|
||||||
|
assertTrue(subscriptions.isFetchable(tp));
|
||||||
|
assertEquals(5, (long) subscriptions.fetched(tp));
|
||||||
|
assertEquals(5, (long) subscriptions.consumed(tp));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateFetchPositionResetToLatestOffset() {
|
||||||
|
subscriptions.subscribe(tp);
|
||||||
|
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
|
||||||
|
|
||||||
|
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
|
||||||
|
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
|
||||||
|
fetcher.updateFetchPositions(Collections.singleton(tp));
|
||||||
|
assertFalse(subscriptions.isOffsetResetNeeded(tp));
|
||||||
|
assertTrue(subscriptions.isFetchable(tp));
|
||||||
|
assertEquals(5, (long) subscriptions.fetched(tp));
|
||||||
|
assertEquals(5, (long) subscriptions.consumed(tp));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateFetchPositionResetToEarliestOffset() {
|
||||||
|
subscriptions.subscribe(tp);
|
||||||
|
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
|
||||||
|
|
||||||
|
client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP),
|
||||||
|
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
|
||||||
|
fetcher.updateFetchPositions(Collections.singleton(tp));
|
||||||
|
assertFalse(subscriptions.isOffsetResetNeeded(tp));
|
||||||
|
assertTrue(subscriptions.isFetchable(tp));
|
||||||
|
assertEquals(5, (long) subscriptions.fetched(tp));
|
||||||
|
assertEquals(5, (long) subscriptions.consumed(tp));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateFetchPositionDisconnect() {
|
||||||
|
subscriptions.subscribe(tp);
|
||||||
|
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
|
||||||
|
|
||||||
|
// First request gets a disconnect
|
||||||
|
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
|
||||||
|
listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true);
|
||||||
|
|
||||||
|
// Next one succeeds
|
||||||
|
client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP),
|
||||||
|
listOffsetResponse(Errors.NONE, Arrays.asList(5L)));
|
||||||
|
fetcher.updateFetchPositions(Collections.singleton(tp));
|
||||||
|
assertFalse(subscriptions.isOffsetResetNeeded(tp));
|
||||||
|
assertTrue(subscriptions.isFetchable(tp));
|
||||||
|
assertEquals(5, (long) subscriptions.fetched(tp));
|
||||||
|
assertEquals(5, (long) subscriptions.consumed(tp));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -213,6 +302,26 @@ public class FetcherTest {
|
||||||
assertEquals(cluster.topics().size(), allTopics.size());
|
assertEquals(cluster.topics().size(), allTopics.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
|
||||||
|
// matches any list offset request with the provided timestamp
|
||||||
|
return new MockClient.RequestMatcher() {
|
||||||
|
@Override
|
||||||
|
public boolean matches(ClientRequest request) {
|
||||||
|
ListOffsetRequest req = new ListOffsetRequest(request.request().body());
|
||||||
|
ListOffsetRequest.PartitionData partitionData = req.offsetData().get(tp);
|
||||||
|
return partitionData != null && partitionData.timestamp == timestamp;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private Struct listOffsetResponse(Errors error, List<Long> offsets) {
|
||||||
|
ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(error.code(), offsets);
|
||||||
|
Map<TopicPartition, ListOffsetResponse.PartitionData> allPartitionData = new HashMap<>();
|
||||||
|
allPartitionData.put(tp, partitionData);
|
||||||
|
ListOffsetResponse response = new ListOffsetResponse(allPartitionData);
|
||||||
|
return response.toStruct();
|
||||||
|
}
|
||||||
|
|
||||||
private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
|
private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
|
||||||
FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
|
FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
|
||||||
return response.toStruct();
|
return response.toStruct();
|
||||||
|
|
Loading…
Reference in New Issue