KAFKA-14145; Faster KRaft HWM replication (#19800)

This change compares the remote replica's HWM with the leader's HWM and
completes the FETCH request if the remote HWM is less than the leader's
HWM. When the leader's HWM is updated any pending FETCH RPC is
completed.

Reviewers: Alyssa Huang <ahuang@confluent.io>, David Arthur
 <mumrah@gmail.com>, Andrew Schofield <aschofield@confluent.io>
(cherry picked from commit 742b327025)
This commit is contained in:
José Armando García Sancio 2025-06-17 13:00:43 -04:00 committed by José Armando García Sancio
parent 4308dc39a7
commit 88eced0c0f
13 changed files with 641 additions and 163 deletions

View File

@ -56,7 +56,9 @@
// Version 16 is the same as version 15 (KIP-951).
//
// Version 17 adds directory id support from KIP-853
"validVersions": "4-17",
//
// Version 18 adds high-watermark from KIP-1166
"validVersions": "4-18",
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
@ -103,7 +105,10 @@
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
"about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." },
{ "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
"about": "The directory id of the follower fetching." }
"about": "The directory id of the follower fetching." },
{ "name": "HighWatermark", "type": "int64", "versions": "18+", "default": "9223372036854775807", "taggedVersions": "18+",
"tag": 1, "ignorable": true,
"about": "The high-watermark known by the replica. -1 if the high-watermark is not known and 9223372036854775807 if the feature is not supported." }
]}
]},
{ "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,

View File

@ -48,7 +48,9 @@
// Version 16 adds the 'NodeEndpoints' field (KIP-951).
//
// Version 17 no changes to the response (KIP-853).
"validVersions": "4-17",
//
// Version 18 no changes to the response (KIP-1166)
"validVersions": "4-18",
"flexibleVersions": "12+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,

View File

@ -395,13 +395,25 @@ public class FeatureControlManagerTest {
MetadataVersion.MINIMUM_VERSION.featureLevel(), MetadataVersion.latestTesting().featureLevel())).
build();
manager.replay(new FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
assertEquals(ControllerResult.of(List.of(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-28")),
manager.updateFeatures(
Map.of(MetadataVersion.FEATURE_NAME, MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
Map.of(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
true,
0));
assertEquals(
ControllerResult.of(
List.of(),
new ApiError(
Errors.INVALID_UPDATE_VERSION,
String.format(
"Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions %s-%s",
MetadataVersion.MINIMUM_VERSION.featureLevel(),
MetadataVersion.latestTesting().featureLevel()
)
)
),
manager.updateFeatures(
Map.of(MetadataVersion.FEATURE_NAME, MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
Map.of(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
true,
0
)
);
}
@Test

View File

@ -25,7 +25,6 @@ import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
@ -386,6 +385,11 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
// records still held in memory directly to the listener
appendPurgatory.maybeComplete(highWatermark.offset(), currentTimeMs);
// After updating the high-watermark, complete all of the deferred
// fetch requests. This is always correct because all fetch request
// deferred have a HWM less or equal to the previous leader's HWM.
fetchPurgatory.completeAll(currentTimeMs);
// It is also possible that the high watermark is being updated
// for the first time following the leader election, so we need
// to give lagging listeners an opportunity to catch up as well
@ -741,7 +745,10 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
private void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
fetchPurgatory.completeAllExceptionally(
Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
Errors.NOT_LEADER_OR_FOLLOWER.exception(
"Not handling request since this node is resigning"
)
);
quorum.transitionToResigned(preferredSuccessors);
resetConnections();
}
@ -753,12 +760,18 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
// After becoming a follower, we need to complete all pending fetches so that
// they can be re-sent to the leader without waiting for their expirations
fetchPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException(
"Cannot process the fetch request because the node is no longer the leader."));
fetchPurgatory.completeAllExceptionally(
Errors.NOT_LEADER_OR_FOLLOWER.exception(
"Cannot process the fetch request because the node is no longer the leader"
)
);
// Clearing the append purgatory should complete all futures exceptionally since this node is no longer the leader
appendPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException(
"Failed to receive sufficient acknowledgments for this append before leader change."));
appendPurgatory.completeAllExceptionally(
Errors.NOT_LEADER_OR_FOLLOWER.exception(
"Failed to receive sufficient acknowledgments for this append before leader change"
)
);
}
private void transitionToFollower(
@ -1514,19 +1527,22 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
|| FetchResponse.recordsSize(partitionResponse) > 0
|| request.maxWaitMs() == 0
|| isPartitionDiverged(partitionResponse)
|| isPartitionSnapshotted(partitionResponse)) {
|| isPartitionSnapshotted(partitionResponse)
|| isHighWatermarkUpdated(partitionResponse, fetchPartition)) {
// Reply immediately if any of the following is true
// 1. The response contains an error
// 2. There are records in the response
// 3. The fetching replica doesn't want to wait for the partition to contain new data
// 4. The fetching replica needs to truncate because the log diverged
// 5. The fetching replica needs to fetch a snapshot
// 6. The fetching replica should update its high-watermark
return completedFuture(response);
}
CompletableFuture<Long> future = fetchPurgatory.await(
fetchPartition.fetchOffset(),
request.maxWaitMs());
request.maxWaitMs()
);
return future.handle((completionTimeMs, exception) -> {
if (exception != null) {
@ -1556,26 +1572,25 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
Optional.empty()
);
}
} else {
logger.trace(
"Completing delayed fetch from {} starting at offset {} at {}",
replicaKey,
fetchPartition.fetchOffset(),
completionTimeMs
);
// It is safe to call tryCompleteFetchRequest because only the polling thread completes
// this future successfully. The future is completed successfully either because of an
// append (maybeAppendBatches) or because the HWM was updated (onUpdateLeaderHighWatermark)
return tryCompleteFetchRequest(
requestMetadata.listenerName(),
requestMetadata.apiVersion(),
replicaKey,
fetchPartition,
completionTimeMs
);
}
// FIXME: `completionTimeMs`, which can be null
logger.trace(
"Completing delayed fetch from {} starting at offset {} at {}",
replicaKey,
fetchPartition.fetchOffset(),
completionTimeMs
);
// It is safe to call tryCompleteFetchRequest because only the polling thread completes this
// future successfully. This is true because only the polling thread appends record batches to
// the log from maybeAppendBatches.
return tryCompleteFetchRequest(
requestMetadata.listenerName(),
requestMetadata.apiVersion(),
replicaKey,
fetchPartition,
time.milliseconds()
);
});
}
@ -1633,18 +1648,29 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
}
}
private static boolean isPartitionDiverged(FetchResponseData.PartitionData partitionResponseData) {
private static boolean isPartitionDiverged(
FetchResponseData.PartitionData partitionResponseData
) {
FetchResponseData.EpochEndOffset divergingEpoch = partitionResponseData.divergingEpoch();
return divergingEpoch.epoch() != -1 || divergingEpoch.endOffset() != -1;
}
private static boolean isPartitionSnapshotted(FetchResponseData.PartitionData partitionResponseData) {
private static boolean isPartitionSnapshotted(
FetchResponseData.PartitionData partitionResponseData
) {
FetchResponseData.SnapshotId snapshotId = partitionResponseData.snapshotId();
return snapshotId.epoch() != -1 || snapshotId.endOffset() != -1;
}
private static boolean isHighWatermarkUpdated(
FetchResponseData.PartitionData partitionResponseData,
FetchRequestData.FetchPartition partitionRequestData
) {
return partitionRequestData.highWatermark() < partitionResponseData.highWatermark();
}
private static OptionalInt optionalLeaderId(int leaderIdOrNil) {
if (leaderIdOrNil < 0)
return OptionalInt.empty();
@ -2882,6 +2908,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
.setLastFetchedEpoch(log.lastFetchedEpoch())
.setFetchOffset(log.endOffset().offset())
.setReplicaDirectoryId(quorum.localDirectoryId())
.setHighWatermark(quorum.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L))
);
return request

View File

@ -53,7 +53,7 @@ public class KafkaRaftClientClusterAuthTest {
context.pollUntilRequest();
RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0, 0);
RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0, 0, context.client.highWatermark());
FetchResponseData response = new FetchResponseData()
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
context.deliverResponse(

View File

@ -17,6 +17,7 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ArbitraryMemoryRecords;
import org.apache.kafka.common.record.InvalidMemoryRecordsProvider;
@ -33,7 +34,10 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@ -72,7 +76,7 @@ public final class KafkaRaftClientFetchTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, OptionalLong.empty());
long oldLogEndOffset = context.log.endOffset().offset();
@ -107,7 +111,7 @@ public final class KafkaRaftClientFetchTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, OptionalLong.empty());
long oldLogEndOffset = context.log.endOffset().offset();
int numberOfRecords = 10;
@ -149,4 +153,327 @@ public final class KafkaRaftClientFetchTest {
// Check that only the first batch was appended because the second batch has a greater epoch
assertEquals(oldLogEndOffset + numberOfRecords, context.log.endOffset().offset());
}
@Test
void testHighWatermarkSentInFetchRequest() throws Exception {
int epoch = 2;
int localId = KafkaRaftClientTest.randomReplicaId();
ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, electedLeader)), KRaftVersion.KRAFT_VERSION_1
)
.withElectedLeader(epoch, electedLeader.id())
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
var localLogEndOffset = context.log.endOffset().offset();
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(
fetchRequest,
epoch,
localLogEndOffset,
epoch,
OptionalLong.empty()
);
// Set the HWM to the LEO
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
context.fetchResponse(
epoch,
electedLeader.id(),
MemoryRecords.EMPTY,
localLogEndOffset,
Errors.NONE
)
);
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(
fetchRequest,
epoch,
localLogEndOffset,
epoch,
OptionalLong.of(localLogEndOffset)
);
}
@Test
void testDefaultHwmDeferred() throws Exception {
var epoch = 2;
var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1
)
.withUnknownLeader(epoch)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
context.unattachedToLeader();
epoch = context.currentEpoch();
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
var localLogEndOffset = context.log.endOffset().offset();
var lastFetchedEpoch = context.log.lastFetchedEpoch();
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
Integer.MAX_VALUE
)
);
// Check that the fetch response was deferred
for (var i = 0; i < 10; ++i) {
context.client.poll();
assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
}
}
@Test
void testUnknownHwmDeferredWhenLeaderDoesNotKnowHwm() throws Exception {
var epoch = 2;
var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1
)
.withUnknownLeader(epoch)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
context.unattachedToLeader();
epoch = context.currentEpoch();
var localLogEndOffset = context.log.endOffset().offset();
var lastFetchedEpoch = context.log.lastFetchedEpoch();
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
OptionalLong.empty(),
Integer.MAX_VALUE
)
);
// Check that the fetch response was deferred
for (var i = 0; i < 10; ++i) {
context.client.poll();
assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
}
}
@Test
void testOutdatedHwmCompletedWhenLeaderKnowsHwm() throws Exception {
var epoch = 2;
var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1
)
.withUnknownLeader(epoch)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
context.unattachedToLeader();
epoch = context.currentEpoch();
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
var localLogEndOffset = context.log.endOffset().offset();
var lastFetchedEpoch = context.log.lastFetchedEpoch();
// FETCH response completed when remote replica doesn't know HWM
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
OptionalLong.empty(),
Integer.MAX_VALUE
)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(localLogEndOffset, epoch);
// FETCH response completed when remote replica has outdated HWM
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
OptionalLong.of(localLogEndOffset - 1),
Integer.MAX_VALUE
)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(localLogEndOffset, epoch);
}
@Test
void testUnchangedHighWatermarkDeferred() throws Exception {
var epoch = 2;
var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1
)
.withUnknownLeader(epoch)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
context.unattachedToLeader();
epoch = context.currentEpoch();
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
var localLogEndOffset = context.log.endOffset().offset();
var lastFetchedEpoch = context.log.lastFetchedEpoch();
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
OptionalLong.of(localLogEndOffset),
Integer.MAX_VALUE
)
);
// Check that the fetch response was deferred
for (var i = 0; i < 10; ++i) {
context.client.poll();
assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
}
}
@Test
void testUpdatedHighWatermarkCompleted() throws Exception {
var epoch = 2;
var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1
)
.withUnknownLeader(epoch)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
context.unattachedToLeader();
epoch = context.currentEpoch();
// Establish a HWM (3) but don't set it to the LEO
context.deliverRequest(context.fetchRequest(epoch, voter, 3L, 2, 0));
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
var localLogEndOffset = context.log.endOffset().offset();
var lastFetchedEpoch = context.log.lastFetchedEpoch();
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
OptionalLong.of(localLogEndOffset),
Integer.MAX_VALUE
)
);
// Check that the fetch response was deferred
for (var i = 0; i < 10; ++i) {
context.client.poll();
assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
}
// Update the HWM and complete the deferred FETCH response
context.deliverRequest(
context.fetchRequest(epoch, voter, localLogEndOffset, lastFetchedEpoch, 0)
);
context.pollUntilResponse();
// Check that two fetch requests were completed
var fetchResponses = context.drainSentResponses(ApiKeys.FETCH);
for (var fetchResponse : fetchResponses) {
var partitionResponse = context.assertFetchResponseData(fetchResponse);
assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
assertEquals(localLogEndOffset, partitionResponse.highWatermark());
}
}
}

View File

@ -65,7 +65,7 @@ public class KafkaRaftClientPreVoteTest {
if (hasFetchedFromLeader) {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -350,7 +350,7 @@ public class KafkaRaftClientPreVoteTest {
if (hasFetchedFromLeader) {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -654,7 +654,7 @@ public class KafkaRaftClientPreVoteTest {
// After fetching successfully from the leader once, follower will no longer grant PreVotes
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),

View File

@ -229,7 +229,7 @@ public class KafkaRaftClientReconfigTest {
// check that follower will send fetch request to leader
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
// check if leader response were to contain bootstrap snapshot id, follower would not send fetch snapshot request
context.deliverResponse(
@ -239,7 +239,7 @@ public class KafkaRaftClientReconfigTest {
);
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
}
@Test
@ -259,7 +259,7 @@ public class KafkaRaftClientReconfigTest {
// check that follower will send fetch request to leader
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
// check that before receiving bootstrap records from leader, follower is not in the voter set
assertFalse(context.client.quorum().isVoter(follower));
@ -2142,7 +2142,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2179,7 +2179,7 @@ public class KafkaRaftClientReconfigTest {
// after sending an update voter the next request should be a fetch
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
}
@ParameterizedTest
@ -2215,7 +2215,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2255,7 +2255,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2306,7 +2306,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2342,7 +2342,7 @@ public class KafkaRaftClientReconfigTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
// Election a new leader causes the replica to resend update voter request
int newEpoch = epoch + 1;
@ -2354,7 +2354,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2390,7 +2390,7 @@ public class KafkaRaftClientReconfigTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0, context.client.highWatermark());
}
@Test
@ -2640,7 +2640,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2661,7 +2661,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
// after more than 3 fetch timeouts the update voter period timer should have expired.
// check that the update voter period timer doesn't remain at zero (0) and cause the message queue to get
@ -2701,7 +2701,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2721,7 +2721,7 @@ public class KafkaRaftClientReconfigTest {
// expect one last FETCH request
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
// don't send a response but increase the time
context.time.sleep(context.requestTimeoutMs() - 1);
@ -2781,7 +2781,7 @@ public class KafkaRaftClientReconfigTest {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2818,7 +2818,7 @@ public class KafkaRaftClientReconfigTest {
// check that there is a fetch to the new leader
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0, context.client.highWatermark());
assertEquals(voter2.id(), fetchRequest.destination().id());
}
@ -2837,7 +2837,7 @@ public class KafkaRaftClientReconfigTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
assertEquals(-2, fetchRequest.destination().id());
}

View File

@ -155,7 +155,13 @@ public final class KafkaRaftClientSnapshotTest {
long localLogEndOffset = context.log.endOffset().offset();
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch());
context.assertFetchRequestData(
fetchRequest,
epoch,
localLogEndOffset,
snapshotId.epoch(),
context.client.highWatermark()
);
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
@ -163,7 +169,12 @@ public final class KafkaRaftClientSnapshotTest {
);
context.pollUntilRequest();
context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch());
context.assertSentFetchRequest(
epoch,
localLogEndOffset,
snapshotId.epoch(),
context.client.highWatermark()
);
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
@ -197,7 +208,13 @@ public final class KafkaRaftClientSnapshotTest {
long localLogEndOffset = context.log.endOffset().offset();
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch());
context.assertFetchRequestData(
fetchRequest,
epoch,
localLogEndOffset,
snapshotId.epoch(),
context.client.highWatermark()
);
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
@ -205,7 +222,12 @@ public final class KafkaRaftClientSnapshotTest {
);
context.pollUntilRequest();
context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch());
context.assertSentFetchRequest(
epoch,
localLogEndOffset,
snapshotId.epoch(),
context.client.highWatermark()
);
RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
context.client.register(secondListener);
@ -1145,7 +1167,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -1162,7 +1184,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -1179,7 +1201,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
// Fetch timer is not reset; sleeping for remainder should transition to prospective
context.time.sleep(context.fetchTimeoutMs - slept);
@ -1206,7 +1228,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -1249,7 +1271,13 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset(), snapshotId.epoch());
context.assertFetchRequestData(
fetchRequest,
epoch,
snapshotId.offset(),
snapshotId.epoch(),
context.client.highWatermark()
);
// Check that the snapshot was written to the log
RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
@ -1279,7 +1307,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -1354,7 +1382,13 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset(), snapshotId.epoch());
context.assertFetchRequestData(
fetchRequest,
epoch,
snapshotId.offset(),
snapshotId.epoch(),
context.client.highWatermark()
);
// Check that the snapshot was written to the log
RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
@ -1384,7 +1418,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -1426,7 +1460,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
}
@ParameterizedTest
@ -1446,7 +1480,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -1488,7 +1522,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0, context.client.highWatermark());
}
@ParameterizedTest
@ -1507,7 +1541,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -1549,7 +1583,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0, context.client.highWatermark());
}
@ParameterizedTest
@ -1568,7 +1602,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -1639,7 +1673,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -1687,7 +1721,7 @@ public final class KafkaRaftClientSnapshotTest {
// Follower should send a fetch request
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -1736,7 +1770,7 @@ public final class KafkaRaftClientSnapshotTest {
// Follower should send a fetch request
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
}
@ParameterizedTest
@ -1755,7 +1789,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2014,7 +2048,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, epoch, 1L, 1);
context.assertFetchRequestData(fetchRequest, epoch, 1L, 1, context.client.highWatermark());
// The response does not advance the high watermark
List<String> records1 = List.of("b", "c");
@ -2042,7 +2076,7 @@ public final class KafkaRaftClientSnapshotTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);
context.assertFetchRequestData(fetchRequest, epoch, 3L, 3, context.client.highWatermark());
List<String> records2 = List.of("d", "e", "f");
int batch2Epoch = 4;

View File

@ -818,7 +818,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -863,7 +863,7 @@ class KafkaRaftClientTest {
context.assertUnknownLeaderAndNoVotedCandidate(0);
context.pollUntilRequest();
RaftRequest.Outbound request = context.assertSentFetchRequest(0, 0L, 0);
RaftRequest.Outbound request = context.assertSentFetchRequest(0, 0L, 0, OptionalLong.empty());
assertTrue(context.client.quorum().isUnattached());
assertTrue(context.client.quorum().isVoter());
@ -1430,7 +1430,7 @@ class KafkaRaftClientTest {
context.time.sleep(1);
context.client.poll();
context.assertSentFetchRequest(leaderEpoch, 0, 0);
context.assertSentFetchRequest(leaderEpoch, 0, 0, OptionalLong.empty());
context.time.sleep(context.electionBackoffMaxMs);
context.client.poll();
@ -1885,7 +1885,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
context.assertSentFetchRequest(epoch, 0L, 0);
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
}
@ParameterizedTest
@ -1906,7 +1906,7 @@ class KafkaRaftClientTest {
context.assertElectedLeader(epoch, otherNodeId);
context.pollUntilRequest();
context.assertSentFetchRequest(epoch, 1L, lastEpoch);
context.assertSentFetchRequest(epoch, 1L, lastEpoch, OptionalLong.empty());
}
@ParameterizedTest
@ -1926,7 +1926,7 @@ class KafkaRaftClientTest {
context.assertElectedLeader(epoch, otherNodeId);
context.pollUntilRequest();
context.assertSentFetchRequest(epoch, 1L, lastEpoch);
context.assertSentFetchRequest(epoch, 1L, lastEpoch, OptionalLong.empty());
context.time.sleep(context.fetchTimeoutMs);
context.client.poll();
@ -1952,7 +1952,7 @@ class KafkaRaftClientTest {
context.assertElectedLeader(epoch, otherNodeId);
context.pollUntilRequest();
context.assertSentFetchRequest(epoch, 1L, lastEpoch);
context.assertSentFetchRequest(epoch, 1L, lastEpoch, OptionalLong.empty());
context.time.sleep(context.fetchTimeoutMs);
context.pollUntilRequest();
@ -1979,13 +1979,13 @@ class KafkaRaftClientTest {
.build();
context.pollUntilRequest();
context.assertSentFetchRequest(epoch, 0L, 0);
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
assertTrue(context.client.quorum().isUnattached());
context.time.sleep(context.electionTimeoutMs() * 2);
context.pollUntilRequest();
assertTrue(context.client.quorum().isUnattached());
context.assertSentFetchRequest(epoch, 0L, 0);
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
// confirm no vote request was sent
assertEquals(0, context.channel.drainSendQueue().size());
@ -1998,7 +1998,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
// observer cannot transition to prospective though
assertTrue(context.client.quorum().isUnattached());
context.assertSentFetchRequest(epoch + 1, 0L, 0);
context.assertSentFetchRequest(epoch + 1, 0L, 0, OptionalLong.empty());
assertEquals(0, context.channel.drainSendQueue().size());
}
@ -2017,7 +2017,7 @@ class KafkaRaftClientTest {
.build();
context.pollUntilRequest();
RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0L, 0);
RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
assertTrue(context.client.quorum().isUnattached());
assertTrue(context.client.quorum().isVoter());
@ -2049,7 +2049,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2081,7 +2081,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2095,7 +2095,7 @@ class KafkaRaftClientTest {
fetchRequest = context.assertSentFetchRequest();
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2128,7 +2128,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
@ -2145,7 +2145,7 @@ class KafkaRaftClientTest {
fetchRequest = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
}
@ParameterizedTest
@ -2173,7 +2173,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound discoveryFetchRequest = context.assertSentFetchRequest();
assertFalse(voters.contains(discoveryFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(discoveryFetchRequest.destination().id()));
context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0);
context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0, context.client.highWatermark());
// Send a response with the leader and epoch
context.deliverResponse(
@ -2189,7 +2189,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound toLeaderFetchRequest = context.assertSentFetchRequest();
assertEquals(leaderId, toLeaderFetchRequest.destination().id());
context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs());
@ -2198,7 +2198,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound retryToBootstrapServerFetchRequest = context.assertSentFetchRequest();
assertFalse(voters.contains(retryToBootstrapServerFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(retryToBootstrapServerFetchRequest.destination().id()));
context.assertFetchRequestData(retryToBootstrapServerFetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(retryToBootstrapServerFetchRequest, epoch, 0L, 0, context.client.highWatermark());
// Deliver the delayed responses from the leader
Records records = context.buildBatch(0L, 3, List.of("a", "b"));
@ -2247,7 +2247,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound discoveryFetchRequest = context.assertSentFetchRequest();
assertFalse(voters.contains(discoveryFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(discoveryFetchRequest.destination().id()));
context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0);
context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0, context.client.highWatermark());
// Send a response with the leader and epoch
context.deliverResponse(
@ -2263,7 +2263,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound toLeaderFetchRequest = context.assertSentFetchRequest();
assertEquals(leaderId, toLeaderFetchRequest.destination().id());
context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs());
@ -2272,7 +2272,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound retryToBootstrapServerFetchRequest = context.assertSentFetchRequest();
assertFalse(voters.contains(retryToBootstrapServerFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(retryToBootstrapServerFetchRequest.destination().id()));
context.assertFetchRequestData(retryToBootstrapServerFetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(retryToBootstrapServerFetchRequest, epoch, 0L, 0, context.client.highWatermark());
// At this point toLeaderFetchRequest has timed out but retryToBootstrapServerFetchRequest
// is still waiting for a response.
@ -2378,17 +2378,23 @@ class KafkaRaftClientTest {
context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
// null cluster id is accepted
context.deliverRequest(context.fetchRequest(epoch, null, otherNodeKey, -5L, 0, 0));
context.deliverRequest(
context.fetchRequest(epoch, null, otherNodeKey, -5L, 0, OptionalLong.of(Long.MAX_VALUE), 0)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
// empty cluster id is rejected
context.deliverRequest(context.fetchRequest(epoch, "", otherNodeKey, -5L, 0, 0));
context.deliverRequest(
context.fetchRequest(epoch, "", otherNodeKey, -5L, 0, OptionalLong.of(Long.MAX_VALUE), 0)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
// invalid cluster id is rejected
context.deliverRequest(context.fetchRequest(epoch, "invalid-uuid", otherNodeKey, -5L, 0, 0));
context.deliverRequest(
context.fetchRequest(epoch, "invalid-uuid", otherNodeKey, -5L, 0, OptionalLong.of(Long.MAX_VALUE), 0)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
}
@ -2778,7 +2784,7 @@ class KafkaRaftClientTest {
// Wait until we have a Fetch inflight to the leader
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0);
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
// Now await the fetch timeout and become prospective
context.time.sleep(context.fetchTimeoutMs);
@ -2818,7 +2824,7 @@ class KafkaRaftClientTest {
// Wait until we have a Fetch inflight to the leader
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0);
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
// Now receive a BeginEpoch from `voter3`
context.deliverRequest(context.beginEpochRequest(epoch + 1, voter3));
@ -2915,7 +2921,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id());
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest1.correlationId(),
@ -2929,7 +2935,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, context.client.highWatermark());
}
@ParameterizedTest
@ -2954,7 +2960,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id());
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs());
context.pollUntilRequest();
@ -2964,7 +2970,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, context.client.highWatermark());
}
@ParameterizedTest
@ -2985,12 +2991,12 @@ class KafkaRaftClientTest {
.withKip853Rpc(withKip853Rpc)
.build();
context.discoverLeaderAsObserver(leaderId, epoch);
context.discoverLeaderAsObserver(leaderId, epoch, context.client.highWatermark());
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id());
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest1.correlationId(),
@ -3004,7 +3010,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest2.correlationId(),
@ -3034,12 +3040,12 @@ class KafkaRaftClientTest {
.withKip853Rpc(withKip853Rpc)
.build();
context.discoverLeaderAsObserver(leaderId, epoch);
context.discoverLeaderAsObserver(leaderId, epoch, context.client.highWatermark());
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id());
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs());
context.pollUntilRequest();
@ -3049,7 +3055,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest2.correlationId(),
@ -3727,7 +3733,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0);
RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
Records records = context.buildBatch(0L, 3, List.of("a", "b"));
FetchResponseData response = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
context.deliverResponse(
@ -3758,7 +3764,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0);
RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
Records records = context.buildBatch(0L, 3, List.of("a", "b"));
FetchResponseData response = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
context.deliverResponse(
@ -3789,7 +3795,7 @@ class KafkaRaftClientTest {
// Receive an empty fetch response
context.pollUntilRequest();
RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0);
RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
FetchResponseData fetchResponse = context.fetchResponse(
epoch,
otherNodeId,
@ -3809,7 +3815,7 @@ class KafkaRaftClientTest {
// Receive some records in the next poll, but do not advance high watermark
context.pollUntilRequest();
Records records = context.buildBatch(0L, epoch, List.of("a", "b"));
fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0);
fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.of(0));
fetchResponse = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
context.deliverResponse(
fetchQuorumRequest.correlationId(),
@ -3822,7 +3828,7 @@ class KafkaRaftClientTest {
// The next fetch response is empty, but should still advance the high watermark
context.pollUntilRequest();
fetchQuorumRequest = context.assertSentFetchRequest(epoch, 2L, epoch);
fetchQuorumRequest = context.assertSentFetchRequest(epoch, 2L, epoch, OptionalLong.of(0));
fetchResponse = context.fetchResponse(
epoch,
otherNodeId,
@ -3974,10 +3980,20 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 3L, lastEpoch);
RaftRequest.Outbound request = context.assertSentFetchRequest(
epoch,
3L,
lastEpoch,
OptionalLong.empty()
);
FetchResponseData response = context.divergingFetchResponse(epoch, otherNodeId, 2L,
lastEpoch, 1L);
FetchResponseData response = context.divergingFetchResponse(
epoch,
otherNodeId,
2L,
lastEpoch,
1L
);
context.deliverResponse(request.correlationId(), request.destination(), response);
// Poll again to complete truncation
@ -3987,7 +4003,7 @@ class KafkaRaftClientTest {
// Now we should be fetching
context.client.poll();
context.assertSentFetchRequest(epoch, 2L, lastEpoch);
context.assertSentFetchRequest(epoch, 2L, lastEpoch, context.client.highWatermark());
}
@ParameterizedTest
@ -4255,7 +4271,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
// The response does not advance the high watermark
List<String> records1 = List.of("a", "b", "c");
@ -4276,7 +4292,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);
context.assertFetchRequestData(fetchRequest, epoch, 3L, 3, context.client.highWatermark());
// The high watermark advances to include the first batch we fetched
List<String> records2 = List.of("d", "e", "f");
@ -4514,7 +4530,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertTrue(context.bootstrapIds.contains(fetchRequest1.destination().id()));
context.assertFetchRequestData(fetchRequest1, 0, 0L, 0);
context.assertFetchRequestData(fetchRequest1, 0, 0L, 0, context.client.highWatermark());
int leaderEpoch = 5;
@ -4531,7 +4547,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest2.destination().id());
context.assertFetchRequestData(fetchRequest2, leaderEpoch, 0L, 0);
context.assertFetchRequestData(fetchRequest2, leaderEpoch, 0L, 0, context.client.highWatermark());
List<String> records = List.of("a", "b", "c");
MemoryRecords batch1 = context.buildBatch(0L, 3, records);

View File

@ -957,7 +957,7 @@ public final class RaftClientTestContext {
return requests;
}
private List<RaftResponse.Outbound> drainSentResponses(
List<RaftResponse.Outbound> drainSentResponses(
ApiKeys apiKey
) {
List<RaftResponse.Outbound> res = new ArrayList<>();
@ -1114,23 +1114,20 @@ public final class RaftClientTestContext {
RaftRequest.Outbound assertSentFetchRequest(
int epoch,
long fetchOffset,
int lastFetchedEpoch
int lastFetchedEpoch,
OptionalLong highWatermark
) {
List<RaftRequest.Outbound> sentMessages = channel.drainSendQueue();
assertEquals(1, sentMessages.size());
RaftRequest.Outbound raftRequest = sentMessages.get(0);
assertFetchRequestData(raftRequest, epoch, fetchOffset, lastFetchedEpoch);
assertFetchRequestData(raftRequest, epoch, fetchOffset, lastFetchedEpoch, highWatermark);
return raftRequest;
}
FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH);
assertEquals(
1, sentMessages.size(), "Found unexpected sent messages " + sentMessages);
RaftResponse.Outbound raftMessage = sentMessages.get(0);
assertEquals(ApiKeys.FETCH.id, raftMessage.data().apiKey());
FetchResponseData response = (FetchResponseData) raftMessage.data();
FetchResponseData.PartitionData assertFetchResponseData(RaftResponse.Outbound message) {
assertEquals(ApiKeys.FETCH.id, message.data().apiKey());
FetchResponseData response = (FetchResponseData) message.data();
assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
assertEquals(1, response.responses().size());
@ -1152,17 +1149,30 @@ public final class RaftClientTestContext {
return partitionResponse;
}
FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH);
assertEquals(
1,
sentMessages.size(),
"Found unexpected sent messages " + sentMessages
);
return assertFetchResponseData(sentMessages.get(0));
}
void assertSentFetchPartitionResponse(Errors topLevelError) {
List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH);
assertEquals(
1, sentMessages.size(), "Found unexpected sent messages " + sentMessages);
1,
sentMessages.size(),
"Found unexpected sent messages " + sentMessages
);
RaftResponse.Outbound raftMessage = sentMessages.get(0);
assertEquals(ApiKeys.FETCH.id, raftMessage.data().apiKey());
FetchResponseData response = (FetchResponseData) raftMessage.data();
assertEquals(topLevelError, Errors.forCode(response.errorCode()));
}
MemoryRecords assertSentFetchPartitionResponse(
Errors error,
int epoch,
@ -1375,7 +1385,8 @@ public final class RaftClientTestContext {
void discoverLeaderAsObserver(
int leaderId,
int epoch
int epoch,
OptionalLong highWatermark
) throws Exception {
pollUntilRequest();
RaftRequest.Outbound fetchRequest = assertSentFetchRequest();
@ -1384,7 +1395,7 @@ public final class RaftClientTestContext {
startingVoters.voterIds().contains(destinationId) || bootstrapIds.contains(destinationId),
String.format("id %d is not in sets %s or %s", destinationId, startingVoters, bootstrapIds)
);
assertFetchRequestData(fetchRequest, 0, 0L, 0);
assertFetchRequestData(fetchRequest, 0, 0L, 0, highWatermark);
deliverResponse(
fetchRequest.correlationId(),
@ -1672,7 +1683,8 @@ public final class RaftClientTestContext {
RaftRequest.Outbound message,
int epoch,
long fetchOffset,
int lastFetchedEpoch
int lastFetchedEpoch,
OptionalLong highWatermark
) {
assertInstanceOf(
FetchRequestData.class,
@ -1691,6 +1703,7 @@ public final class RaftClientTestContext {
assertEquals(fetchOffset, fetchPartition.fetchOffset());
assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
assertEquals(localId.orElse(-1), request.replicaState().replicaId());
assertEquals(highWatermark.orElse(-1), fetchPartition.highWatermark());
// Assert that voters have flushed up to the fetch offset
if ((localId.isPresent() && startingVoters.voterIds().contains(localId.getAsInt())) ||
@ -1716,6 +1729,24 @@ public final class RaftClientTestContext {
long fetchOffset,
int lastFetchedEpoch,
int maxWaitTimeMs
) {
return fetchRequest(
epoch,
replicaKey,
fetchOffset,
lastFetchedEpoch,
OptionalLong.of(Long.MAX_VALUE),
maxWaitTimeMs
);
}
FetchRequestData fetchRequest(
int epoch,
ReplicaKey replicaKey,
long fetchOffset,
int lastFetchedEpoch,
OptionalLong highWatermark,
int maxWaitTimeMs
) {
return fetchRequest(
epoch,
@ -1723,6 +1754,7 @@ public final class RaftClientTestContext {
replicaKey,
fetchOffset,
lastFetchedEpoch,
highWatermark,
maxWaitTimeMs
);
}
@ -1733,6 +1765,7 @@ public final class RaftClientTestContext {
ReplicaKey replicaKey,
long fetchOffset,
int lastFetchedEpoch,
OptionalLong highWatermark,
int maxWaitTimeMs
) {
FetchRequestData request = RaftUtil.singletonFetchRequest(
@ -1742,7 +1775,8 @@ public final class RaftClientTestContext {
fetchPartition
.setCurrentLeaderEpoch(epoch)
.setLastFetchedEpoch(lastFetchedEpoch)
.setFetchOffset(fetchOffset);
.setFetchOffset(fetchOffset)
.setHighWatermark(highWatermark.orElse(-1));
if (raftProtocol.isReconfigSupported()) {
fetchPartition
.setReplicaDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID));
@ -1932,7 +1966,9 @@ public final class RaftClientTestContext {
}
private short fetchRpcVersion() {
if (raftProtocol.isReconfigSupported()) {
if (raftProtocol.isHwmInFetchSupported()) {
return 18;
} else if (raftProtocol.isReconfigSupported()) {
return 17;
} else {
return 16;
@ -2236,7 +2272,9 @@ public final class RaftClientTestContext {
// dynamic quorum reconfiguration support
KIP_853_PROTOCOL,
// preVote support
KIP_996_PROTOCOL;
KIP_996_PROTOCOL,
// HWM in FETCH request support
KIP_1166_PROTOCOL;
boolean isKRaftSupported() {
return isAtLeast(KIP_595_PROTOCOL);
@ -2250,6 +2288,10 @@ public final class RaftClientTestContext {
return isAtLeast(KIP_996_PROTOCOL);
}
boolean isHwmInFetchSupported() {
return isAtLeast(KIP_1166_PROTOCOL);
}
private boolean isAtLeast(RaftProtocol otherRpc) {
return this.compareTo(otherRpc) >= 0;
}

View File

@ -116,6 +116,9 @@ public enum MetadataVersion {
// Streams groups are early access in 4.1 (KIP-1071).
IBP_4_1_IV0(26, "4.1", "IV0", false),
// Send FETCH version 18 in the replica fetcher (KIP-1166)
IBP_4_1_IV1(27, "4.1", "IV1", false),
// Insert any additional IBP_4_1_IVx versions above this comment, and bump the feature level of
// IBP_4_2_IVx accordingly. When 4.2 development begins, IBP_4_2_IV0 will cease to be
// a placeholder.
@ -126,7 +129,7 @@ public enum MetadataVersion {
// *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE ***
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
// *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY. ***
IBP_4_2_IV0(27, "4.2", "IV0", false),
IBP_4_2_IV0(28, "4.2", "IV0", false),
// Enables "streams" groups by default for new clusters (KIP-1071).
//
@ -134,7 +137,7 @@ public enum MetadataVersion {
// *** STREAMS GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A STREAMS ***
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
// *** DYNAMICALLY TO TRY OUT THE EARLY ACCESS CAPABILITY. ***
IBP_4_2_IV1(28, "4.2", "IV1", false);
IBP_4_2_IV1(29, "4.2", "IV1", false);
// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the latest version
@ -264,13 +267,15 @@ public enum MetadataVersion {
}
public short fetchRequestVersion() {
if (this.isAtLeast(IBP_3_9_IV0)) {
if (isAtLeast(IBP_4_1_IV1)) {
return 18;
} else if (isAtLeast(IBP_3_9_IV0)) {
return 17;
} else if (this.isAtLeast(IBP_3_7_IV4)) {
} else if (isAtLeast(IBP_3_7_IV4)) {
return 16;
} else if (this.isAtLeast(IBP_3_5_IV1)) {
} else if (isAtLeast(IBP_3_5_IV1)) {
return 15;
} else if (this.isAtLeast(IBP_3_5_IV0)) {
} else if (isAtLeast(IBP_3_5_IV0)) {
return 14;
} else {
return 13;

View File

@ -121,8 +121,16 @@ public class FeatureCommandTest {
"disable", "--feature", "metadata.version"))
);
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
assertEquals("Could not disable metadata.version. The update failed for all features since the following " +
"feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-28", commandOutput);
assertEquals(
String.format(
"Could not disable metadata.version. The update failed for all features since the " +
"following feature had an error: Invalid update version 0 for feature " +
"metadata.version. Local controller 3000 only supports versions %s-%s",
MetadataVersion.MINIMUM_VERSION.featureLevel(),
MetadataVersion.latestTesting().featureLevel()
),
commandOutput
);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),