KAFKA-13002: listOffsets must downgrade immediately for non-MAX_TIMESTAMP specs (#10936)

This patch fixes a regression introduced by https://github.com/apache/kafka/pull/10760. The downgrade logic did not downgrade the request version when only non-MAX_TIMESTAMP offset specs were used, so listOffsets requests against older brokers failed instead of being retried at a supported version.
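
Concretely, the old logic only downgraded and retried when at least one MAX_TIMESTAMP partition had been pruned from the request, so a request carrying only EARLIEST/LATEST specs failed outright. The sketch below contrasts the two retry predicates; the class name and scenario values are illustrative stand-ins, not code from this patch:

import java.util.List;

// Hypothetical demo, not part of the patch: models the retry decision made
// after a broker rejects the ListOffsets request version.
public class DowngradeDecisionSketch {
    public static void main(String[] args) {
        List<String> partitionsToQuery = List.of("foo-0"); // only a LATEST spec remains
        boolean foundMaxTimestampPartition = false;        // no MAX_TIMESTAMP spec was requested

        // Before the fix: retry only if a MAX_TIMESTAMP partition was pruned.
        boolean retryBefore = foundMaxTimestampPartition && !partitionsToQuery.isEmpty();
        // After the fix: retry whenever any partitions survive the pruning.
        boolean retryAfter = !partitionsToQuery.isEmpty();

        System.out.println("before: " + retryBefore + ", after: " + retryAfter);
        // Prints "before: false, after: true": the old predicate never triggered the downgrade.
    }
}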

Reviewers: David Jacot <djacot@confluent.io>
Author: thomaskwscott
Date: 2021-07-01 07:35:55 +01:00 (committed by GitHub)
Parent: cee2e975d1
Commit: 593b34a5be
4 changed files with 83 additions and 24 deletions

KafkaAdminClient.java

@@ -4244,7 +4244,9 @@ public class KafkaAdminClient extends AdminClient {
         final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
-            private boolean supportsMaxTimestamp = true;
+            private boolean supportsMaxTimestamp = partitionsToQuery.stream()
+                .flatMap(t -> t.partitions().stream())
+                .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP);
             @Override
             ListOffsetsRequest.Builder createRequest(int timeoutMs) {
@@ -4315,7 +4317,6 @@ public class KafkaAdminClient extends AdminClient {
                     supportsMaxTimestamp = false;
                     // fail any unsupported futures and remove partitions from the downgraded retry
-                    boolean foundMaxTimestampPartition = false;
                     Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
                     while (topicIterator.hasNext()) {
                         ListOffsetsTopic topic = topicIterator.next();
@@ -4323,7 +4324,6 @@ public class KafkaAdminClient extends AdminClient {
                         while (partitionIterator.hasNext()) {
                             ListOffsetsPartition partition = partitionIterator.next();
                             if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
-                                foundMaxTimestampPartition = true;
                                 futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
                                     .completeExceptionally(new UnsupportedVersionException(
                                         "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
@@ -4334,7 +4334,7 @@ public class KafkaAdminClient extends AdminClient {
                             topicIterator.remove();
                         }
                     }
-                    return foundMaxTimestampPartition && !partitionsToQuery.isEmpty();
+                    return !partitionsToQuery.isEmpty();
                 }
                 return false;
             }
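
Taken together, these hunks leave the downgrade hook reading roughly as follows. This is a consolidated sketch assembled from the diff fragments above; the enclosing handleUnsupportedVersionException signature and the lines not shown in the hunks (such as the partitionIterator.remove() call and the empty-topic check) are assumed from context:

    @Override
    boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
        if (supportsMaxTimestamp) {
            supportsMaxTimestamp = false;
            // Fail any MAX_TIMESTAMP futures and drop those partitions from the retry.
            Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
            while (topicIterator.hasNext()) {
                ListOffsetsTopic topic = topicIterator.next();
                Iterator<ListOffsetsPartition> partitionIterator = topic.partitions().iterator();
                while (partitionIterator.hasNext()) {
                    ListOffsetsPartition partition = partitionIterator.next();
                    if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
                        futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
                            .completeExceptionally(new UnsupportedVersionException(
                                "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
                        partitionIterator.remove();
                    }
                }
                if (topic.partitions().isEmpty()) {
                    topicIterator.remove();
                }
            }
            // Downgrade and retry as long as any partitions survived the pruning,
            // regardless of whether a MAX_TIMESTAMP spec was ever present.
            return !partitionsToQuery.isEmpty();
        }
        return false;
    }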

MockClient.java

@@ -237,23 +237,31 @@ public class MockClient implements KafkaClient {
                 continue;
             AbstractRequest.Builder<?> builder = request.requestBuilder();
-            short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
-                builder.latestAllowedVersion());
-            UnsupportedVersionException unsupportedVersionException = null;
-            if (futureResp.isUnsupportedRequest) {
-                unsupportedVersionException = new UnsupportedVersionException(
-                    "Api " + request.apiKey() + " with version " + version);
-            } else {
-                AbstractRequest abstractRequest = request.requestBuilder().build(version);
-                if (!futureResp.requestMatcher.matches(abstractRequest))
-                    throw new IllegalStateException("Request matcher did not match next-in-line request "
-                        + abstractRequest + " with prepared response " + futureResp.responseBody);
+            try {
+                short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
+                    builder.latestAllowedVersion());
+                UnsupportedVersionException unsupportedVersionException = null;
+                if (futureResp.isUnsupportedRequest) {
+                    unsupportedVersionException = new UnsupportedVersionException(
+                        "Api " + request.apiKey() + " with version " + version);
+                } else {
+                    AbstractRequest abstractRequest = request.requestBuilder().build(version);
+                    if (!futureResp.requestMatcher.matches(abstractRequest))
+                        throw new IllegalStateException("Request matcher did not match next-in-line request "
+                            + abstractRequest + " with prepared response " + futureResp.responseBody);
+                }
+                ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
+                    request.createdTimeMs(), time.milliseconds(), futureResp.disconnected,
+                    unsupportedVersionException, null, futureResp.responseBody);
+                responses.add(resp);
+            } catch (UnsupportedVersionException unsupportedVersionException) {
+                ClientResponse resp = new ClientResponse(request.makeHeader(builder.latestAllowedVersion()), request.callback(), request.destination(),
+                    request.createdTimeMs(), time.milliseconds(), false, unsupportedVersionException, null, null);
+                responses.add(resp);
             }
-            ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
-                request.createdTimeMs(), time.milliseconds(), futureResp.disconnected,
-                unsupportedVersionException, null, futureResp.responseBody);
-            responses.add(resp);
             iterator.remove();
             return;
         }
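
The new catch branch matters because MockClient previously threw out of this method when the node's advertised version range could not satisfy the request builder at all; now the failure is surfaced as an error ClientResponse, which is what lets the admin client's downgrade path observe the UnsupportedVersionException. A minimal sketch of the triggering condition (the demo class is hypothetical; NodeApiVersions and ApiKeys are the real kafka-clients types used above):

import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;

// Hypothetical demo, not part of the patch.
public class UsableVersionSketch {
    public static void main(String[] args) {
        // The node advertises only ListOffsets versions 0..0.
        NodeApiVersions versions = NodeApiVersions.create(ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 0);
        try {
            // A builder restricted to versions 1..7 has no usable version on this node,
            // so latestUsableVersion throws; MockClient now converts this into a
            // ClientResponse carrying the exception instead of propagating it.
            versions.latestUsableVersion(ApiKeys.LIST_OFFSETS, (short) 1, (short) 7);
        } catch (UnsupportedVersionException e) {
            System.out.println("no usable version: " + e.getMessage());
        }
    }
}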

KafkaAdminClientTest.java

@@ -4250,7 +4250,8 @@ public class KafkaAdminClientTest {
         final TopicPartition tp0 = new TopicPartition("foo", 0);
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(
+                ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6));
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
             // listoffsets response from broker 0
@@ -4283,7 +4284,8 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
             AdminClientConfig.RETRIES_CONFIG, "2")) {
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(
+                ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6));
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
             // listoffsets response from broker 0
@@ -4333,7 +4335,8 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
             AdminClientConfig.RETRIES_CONFIG, "2")) {
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(
+                ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 0));
             env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
             // listoffsets response from broker 0
@@ -4347,6 +4350,49 @@ public class KafkaAdminClientTest {
         }
     }
+
+    @Test
+    public void testListOffsetsNonMaxTimestampDowngradedImmediately() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(
+                ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 6));
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            ListOffsetsTopicResponse t0 = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(t0));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareResponse(
+                request -> request instanceof ListOffsetsRequest,
+                new ListOffsetsResponse(responseData));
+
+            ListOffsetsResult result = env.adminClient().listOffsets(
+                Collections.singletonMap(tp0, OffsetSpec.latest()));
+
+            ListOffsetsResultInfo tp0Offset = result.partitionResult(tp0).get();
+            assertEquals(123L, tp0Offset.offset());
+            assertEquals(321, tp0Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp0Offset.timestamp());
+        }
+    }
+
     private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
         return Utils.mkMap(
             Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)),

TransactionManagerTest.java

@@ -2213,10 +2213,15 @@ public class TransactionManagerTest {
         txnOffsetCommitResponse.put(tp0, Errors.NONE);
         txnOffsetCommitResponse.put(tp1, Errors.COORDINATOR_LOAD_IN_PROGRESS);
-        prepareGroupMetadataCommit(
+        TransactionalRequestResult addOffsetsResult = prepareGroupMetadataCommit(
             () -> prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, txnOffsetCommitResponse));
-        assertThrows(UnsupportedVersionException.class, () -> sender.runOnce());
+        sender.runOnce();
+
+        assertTrue(addOffsetsResult.isCompleted());
+        assertFalse(addOffsetsResult.isSuccessful());
+        assertTrue(addOffsetsResult.error() instanceof UnsupportedVersionException);
+        assertFatalError(UnsupportedVersionException.class);
     }

     private TransactionalRequestResult prepareGroupMetadataCommit(Runnable prepareTxnCommitResponse) {