KAFKA-14145; Faster KRaft HWM replication (#19800)

This change compares the remote replica's HWM with the leader's HWM and
completes the FETCH request immediately if the remote HWM is less than the
leader's HWM. When the leader's HWM is updated, any pending FETCH RPCs are
completed.

Reviewers: Alyssa Huang <ahuang@confluent.io>, David Arthur <mumrah@gmail.com>, Andrew Schofield <aschofield@confluent.io>
José Armando García Sancio authored 2025-06-17 13:00:43 -04:00 (committed by GitHub)
parent bf15205647
commit 742b327025
13 changed files with 641 additions and 163 deletions
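
The mechanism described in the commit message can be pictured with a small
standalone sketch. The class and method names below are illustrative only and
are not part of KafkaRaftClient: the leader answers a FETCH right away when the
fetcher's HWM is behind its own, and otherwise parks the request until the HWM
advances (or records arrive, or the wait times out).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only; not the actual KafkaRaftClient implementation.
public final class HwmFetchTracker {
    private final List<CompletableFuture<Long>> deferredFetches = new ArrayList<>();
    private long leaderHighWatermark = -1L; // -1 means the leader has no HWM yet

    // Called for a FETCH that has no new records to return.
    public CompletableFuture<Long> handleFetch(long replicaHighWatermark) {
        if (replicaHighWatermark < leaderHighWatermark) {
            // The fetcher is behind on the HWM: reply immediately so it can advance it.
            return CompletableFuture.completedFuture(leaderHighWatermark);
        }
        // Otherwise defer; the request is released when the leader's HWM moves.
        CompletableFuture<Long> deferred = new CompletableFuture<>();
        deferredFetches.add(deferred);
        return deferred;
    }

    // Called whenever the leader advances its high-watermark.
    public void onHighWatermarkUpdate(long newHighWatermark) {
        leaderHighWatermark = newHighWatermark;
        deferredFetches.forEach(future -> future.complete(newHighWatermark));
        deferredFetches.clear();
    }
}

In the real client the deferred requests live in the existing fetch purgatory;
the sketch only shows the ordering of the two events (fetch arrival and HWM
update) that the change relies on.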


@@ -56,7 +56,9 @@
   // Version 16 is the same as version 15 (KIP-951).
   //
   // Version 17 adds directory id support from KIP-853
-  "validVersions": "4-17",
+  //
+  // Version 18 adds high-watermark from KIP-1166
+  "validVersions": "4-18",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
@@ -103,7 +105,10 @@
         { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
           "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." },
         { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
-          "about": "The directory id of the follower fetching." }
+          "about": "The directory id of the follower fetching." },
+        { "name": "HighWatermark", "type": "int64", "versions": "18+", "default": "9223372036854775807", "taggedVersions": "18+",
+          "tag": 1, "ignorable": true,
+          "about": "The high-watermark known by the replica. -1 if the high-watermark is not known and 9223372036854775807 if the feature is not supported." }
       ]}
     ]},
     { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,


@@ -48,7 +48,9 @@
   // Version 16 adds the 'NodeEndpoints' field (KIP-951).
   //
   // Version 17 no changes to the response (KIP-853).
-  "validVersions": "4-17",
+  //
+  // Version 18 no changes to the response (KIP-1166)
+  "validVersions": "4-18",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,


@@ -394,13 +394,25 @@ public class FeatureControlManagerTest {
             MetadataVersion.MINIMUM_VERSION.featureLevel(), MetadataVersion.latestTesting().featureLevel())).
             build();
         manager.replay(new FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
-        assertEquals(ControllerResult.of(List.of(), new ApiError(Errors.INVALID_UPDATE_VERSION,
-            "Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-28")),
+        assertEquals(
+            ControllerResult.of(
+                List.of(),
+                new ApiError(
+                    Errors.INVALID_UPDATE_VERSION,
+                    String.format(
+                        "Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions %s-%s",
+                        MetadataVersion.MINIMUM_VERSION.featureLevel(),
+                        MetadataVersion.latestTesting().featureLevel()
+                    )
+                )
+            ),
             manager.updateFeatures(
                 Map.of(MetadataVersion.FEATURE_NAME, MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
                 Map.of(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
                 true,
-                0));
+                0
+            )
+        );
     }

     @Test


@@ -25,7 +25,6 @@ import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.AddRaftVoterRequestData;
@@ -386,6 +385,11 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
             // records still held in memory directly to the listener
             appendPurgatory.maybeComplete(highWatermark.offset(), currentTimeMs);

+            // After updating the high-watermark, complete all of the deferred
+            // fetch requests. This is always correct because all fetch request
+            // deferred have a HWM less or equal to the previous leader's HWM.
+            fetchPurgatory.completeAll(currentTimeMs);
+
             // It is also possible that the high watermark is being updated
             // for the first time following the leader election, so we need
             // to give lagging listeners an opportunity to catch up as well
@@ -741,7 +745,10 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
     private void transitionToResigned(List<ReplicaKey> preferredSuccessors) {
         fetchPurgatory.completeAllExceptionally(
-            Errors.NOT_LEADER_OR_FOLLOWER.exception("Not handling request since this node is resigning"));
+            Errors.NOT_LEADER_OR_FOLLOWER.exception(
+                "Not handling request since this node is resigning"
+            )
+        );
         quorum.transitionToResigned(preferredSuccessors);
         resetConnections();
     }
@@ -753,12 +760,18 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {

         // After becoming a follower, we need to complete all pending fetches so that
         // they can be re-sent to the leader without waiting for their expirations
-        fetchPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException(
-            "Cannot process the fetch request because the node is no longer the leader."));
+        fetchPurgatory.completeAllExceptionally(
+            Errors.NOT_LEADER_OR_FOLLOWER.exception(
+                "Cannot process the fetch request because the node is no longer the leader"
+            )
+        );

         // Clearing the append purgatory should complete all futures exceptionally since this node is no longer the leader
-        appendPurgatory.completeAllExceptionally(new NotLeaderOrFollowerException(
-            "Failed to receive sufficient acknowledgments for this append before leader change."));
+        appendPurgatory.completeAllExceptionally(
+            Errors.NOT_LEADER_OR_FOLLOWER.exception(
+                "Failed to receive sufficient acknowledgments for this append before leader change"
+            )
+        );
     }

     private void transitionToFollower(
@@ -1514,19 +1527,22 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
             || FetchResponse.recordsSize(partitionResponse) > 0
             || request.maxWaitMs() == 0
             || isPartitionDiverged(partitionResponse)
-            || isPartitionSnapshotted(partitionResponse)) {
+            || isPartitionSnapshotted(partitionResponse)
+            || isHighWatermarkUpdated(partitionResponse, fetchPartition)) {
             // Reply immediately if any of the following is true
             // 1. The response contains an error
             // 2. There are records in the response
             // 3. The fetching replica doesn't want to wait for the partition to contain new data
             // 4. The fetching replica needs to truncate because the log diverged
             // 5. The fetching replica needs to fetch a snapshot
+            // 6. The fetching replica should update its high-watermark
             return completedFuture(response);
         }

         CompletableFuture<Long> future = fetchPurgatory.await(
             fetchPartition.fetchOffset(),
-            request.maxWaitMs());
+            request.maxWaitMs()
+        );

         return future.handle((completionTimeMs, exception) -> {
             if (exception != null) {
@@ -1556,9 +1572,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
                         Optional.empty()
                     );
                 }
-            }
-
-            // FIXME: `completionTimeMs`, which can be null
+            } else {
                 logger.trace(
                     "Completing delayed fetch from {} starting at offset {} at {}",
                     replicaKey,
@@ -1566,16 +1580,17 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
                     completionTimeMs
                 );

-            // It is safe to call tryCompleteFetchRequest because only the polling thread completes this
-            // future successfully. This is true because only the polling thread appends record batches to
-            // the log from maybeAppendBatches.
+                // It is safe to call tryCompleteFetchRequest because only the polling thread completes
+                // this future successfully. The future is completed successfully either because of an
+                // append (maybeAppendBatches) or because the HWM was updated (onUpdateLeaderHighWatermark)
                 return tryCompleteFetchRequest(
                     requestMetadata.listenerName(),
                     requestMetadata.apiVersion(),
                     replicaKey,
                     fetchPartition,
-                    time.milliseconds()
+                    completionTimeMs
                 );
+            }
         });
     }
@@ -1633,18 +1648,29 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         }
     }

-    private static boolean isPartitionDiverged(FetchResponseData.PartitionData partitionResponseData) {
+    private static boolean isPartitionDiverged(
+        FetchResponseData.PartitionData partitionResponseData
+    ) {
         FetchResponseData.EpochEndOffset divergingEpoch = partitionResponseData.divergingEpoch();

         return divergingEpoch.epoch() != -1 || divergingEpoch.endOffset() != -1;
     }

-    private static boolean isPartitionSnapshotted(FetchResponseData.PartitionData partitionResponseData) {
+    private static boolean isPartitionSnapshotted(
+        FetchResponseData.PartitionData partitionResponseData
+    ) {
         FetchResponseData.SnapshotId snapshotId = partitionResponseData.snapshotId();

         return snapshotId.epoch() != -1 || snapshotId.endOffset() != -1;
     }

+    private static boolean isHighWatermarkUpdated(
+        FetchResponseData.PartitionData partitionResponseData,
+        FetchRequestData.FetchPartition partitionRequestData
+    ) {
+        return partitionRequestData.highWatermark() < partitionResponseData.highWatermark();
+    }
+
     private static OptionalInt optionalLeaderId(int leaderIdOrNil) {
         if (leaderIdOrNil < 0)
             return OptionalInt.empty();
@@ -2882,6 +2908,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
                 .setLastFetchedEpoch(log.lastFetchedEpoch())
                 .setFetchOffset(log.endOffset().offset())
                 .setReplicaDirectoryId(quorum.localDirectoryId())
+                .setHighWatermark(quorum.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L))
             );

         return request


@@ -53,7 +53,7 @@ public class KafkaRaftClientClusterAuthTest {
         context.pollUntilRequest();
-        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0, 0);
+        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0, 0, context.client.highWatermark());

         FetchResponseData response = new FetchResponseData()
             .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());

         context.deliverResponse(


@@ -17,6 +17,7 @@
 package org.apache.kafka.raft;

 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.ArbitraryMemoryRecords;
 import org.apache.kafka.common.record.InvalidMemoryRecordsProvider;
@@ -33,7 +34,10 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;

 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -72,7 +76,7 @@ public final class KafkaRaftClientFetchTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, OptionalLong.empty());

         long oldLogEndOffset = context.log.endOffset().offset();
@@ -107,7 +111,7 @@ public final class KafkaRaftClientFetchTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, OptionalLong.empty());

         long oldLogEndOffset = context.log.endOffset().offset();
         int numberOfRecords = 10;
@@ -149,4 +153,327 @@ public final class KafkaRaftClientFetchTest {
         // Check that only the first batch was appended because the second batch has a greater epoch
         assertEquals(oldLogEndOffset + numberOfRecords, context.log.endOffset().offset());
     }

@Test
void testHighWatermarkSentInFetchRequest() throws Exception {
int epoch = 2;
int localId = KafkaRaftClientTest.randomReplicaId();
ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, electedLeader)), KRaftVersion.KRAFT_VERSION_1
)
.withElectedLeader(epoch, electedLeader.id())
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
var localLogEndOffset = context.log.endOffset().offset();
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(
fetchRequest,
epoch,
localLogEndOffset,
epoch,
OptionalLong.empty()
);
// Set the HWM to the LEO
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
context.fetchResponse(
epoch,
electedLeader.id(),
MemoryRecords.EMPTY,
localLogEndOffset,
Errors.NONE
)
);
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(
fetchRequest,
epoch,
localLogEndOffset,
epoch,
OptionalLong.of(localLogEndOffset)
);
}

@Test
void testDefaultHwmDeferred() throws Exception {
var epoch = 2;
var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1
)
.withUnknownLeader(epoch)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
context.unattachedToLeader();
epoch = context.currentEpoch();
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
var localLogEndOffset = context.log.endOffset().offset();
var lastFetchedEpoch = context.log.lastFetchedEpoch();
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
Integer.MAX_VALUE
)
);
// Check that the fetch response was deferred
for (var i = 0; i < 10; ++i) {
context.client.poll();
assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
}
}

@Test
void testUnknownHwmDeferredWhenLeaderDoesNotKnowHwm() throws Exception {
var epoch = 2;
var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1
)
.withUnknownLeader(epoch)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
context.unattachedToLeader();
epoch = context.currentEpoch();
var localLogEndOffset = context.log.endOffset().offset();
var lastFetchedEpoch = context.log.lastFetchedEpoch();
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
OptionalLong.empty(),
Integer.MAX_VALUE
)
);
// Check that the fetch response was deferred
for (var i = 0; i < 10; ++i) {
context.client.poll();
assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
}
}

@Test
void testOutdatedHwmCompletedWhenLeaderKnowsHwm() throws Exception {
var epoch = 2;
var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1
)
.withUnknownLeader(epoch)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
context.unattachedToLeader();
epoch = context.currentEpoch();
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
var localLogEndOffset = context.log.endOffset().offset();
var lastFetchedEpoch = context.log.lastFetchedEpoch();
// FETCH response completed when remote replica doesn't know HWM
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
OptionalLong.empty(),
Integer.MAX_VALUE
)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(localLogEndOffset, epoch);
// FETCH response completed when remote replica has outdated HWM
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
OptionalLong.of(localLogEndOffset - 1),
Integer.MAX_VALUE
)
);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(localLogEndOffset, epoch);
}

@Test
void testUnchangedHighWatermarkDeferred() throws Exception {
var epoch = 2;
var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1
)
.withUnknownLeader(epoch)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
context.unattachedToLeader();
epoch = context.currentEpoch();
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
var localLogEndOffset = context.log.endOffset().offset();
var lastFetchedEpoch = context.log.lastFetchedEpoch();
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
OptionalLong.of(localLogEndOffset),
Integer.MAX_VALUE
)
);
// Check that the fetch response was deferred
for (var i = 0; i < 10; ++i) {
context.client.poll();
assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
}
}

@Test
void testUpdatedHighWatermarkCompleted() throws Exception {
var epoch = 2;
var local = KafkaRaftClientTest.replicaKey(
KafkaRaftClientTest.randomReplicaId(),
true
);
var voter = KafkaRaftClientTest.replicaKey(local.id() + 1, true);
var remote = KafkaRaftClientTest.replicaKey(local.id() + 2, true);
RaftClientTestContext context = new RaftClientTestContext.Builder(
local.id(),
local.directoryId().get()
)
.appendToLog(epoch, List.of("a", "b", "c"))
.appendToLog(epoch, List.of("d", "e", "f"))
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(local, voter)), KRaftVersion.KRAFT_VERSION_1
)
.withUnknownLeader(epoch)
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_1166_PROTOCOL)
.build();
context.unattachedToLeader();
epoch = context.currentEpoch();
// Establish a HWM (3) but don't set it to the LEO
context.deliverRequest(context.fetchRequest(epoch, voter, 3L, 2, 0));
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
var localLogEndOffset = context.log.endOffset().offset();
var lastFetchedEpoch = context.log.lastFetchedEpoch();
context.deliverRequest(
context.fetchRequest(
epoch,
remote,
localLogEndOffset,
lastFetchedEpoch,
OptionalLong.of(localLogEndOffset),
Integer.MAX_VALUE
)
);
// Check that the fetch response was deferred
for (var i = 0; i < 10; ++i) {
context.client.poll();
assertEquals(List.of(), context.drainSentResponses(ApiKeys.FETCH));
}
// Update the HWM and complete the deferred FETCH response
context.deliverRequest(
context.fetchRequest(epoch, voter, localLogEndOffset, lastFetchedEpoch, 0)
);
context.pollUntilResponse();
// Check that two fetch requests were completed
var fetchResponses = context.drainSentResponses(ApiKeys.FETCH);
for (var fetchResponse : fetchResponses) {
var partitionResponse = context.assertFetchResponseData(fetchResponse);
assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
assertEquals(localLogEndOffset, partitionResponse.highWatermark());
}
}
}


@@ -65,7 +65,7 @@ public class KafkaRaftClientPreVoteTest {
         if (hasFetchedFromLeader) {
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -350,7 +350,7 @@ public class KafkaRaftClientPreVoteTest {
         if (hasFetchedFromLeader) {
             context.pollUntilRequest();
             RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

             context.deliverResponse(
                 fetchRequest.correlationId(),
@@ -654,7 +654,7 @@ public class KafkaRaftClientPreVoteTest {
         // After fetching successfully from the leader once, follower will no longer grant PreVotes
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),


@@ -229,7 +229,7 @@ public class KafkaRaftClientReconfigTest {
         // check that follower will send fetch request to leader
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         // check if leader response were to contain bootstrap snapshot id, follower would not send fetch snapshot request
         context.deliverResponse(
@@ -239,7 +239,7 @@ public class KafkaRaftClientReconfigTest {
         );

         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
     }

     @Test
@@ -259,7 +259,7 @@ public class KafkaRaftClientReconfigTest {
         // check that follower will send fetch request to leader
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         // check that before receiving bootstrap records from leader, follower is not in the voter set
         assertFalse(context.client.quorum().isVoter(follower));
@@ -2142,7 +2142,7 @@ public class KafkaRaftClientReconfigTest {
         context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2179,7 +2179,7 @@ public class KafkaRaftClientReconfigTest {
         // after sending an update voter the next request should be a fetch
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
     }

     @ParameterizedTest
@@ -2215,7 +2215,7 @@ public class KafkaRaftClientReconfigTest {
         context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2255,7 +2255,7 @@ public class KafkaRaftClientReconfigTest {
         context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2306,7 +2306,7 @@ public class KafkaRaftClientReconfigTest {
         context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2342,7 +2342,7 @@ public class KafkaRaftClientReconfigTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         // Election a new leader causes the replica to resend update voter request
         int newEpoch = epoch + 1;
@@ -2354,7 +2354,7 @@ public class KafkaRaftClientReconfigTest {
         context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2390,7 +2390,7 @@ public class KafkaRaftClientReconfigTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0, context.client.highWatermark());
     }

     @Test
@@ -2640,7 +2640,7 @@ public class KafkaRaftClientReconfigTest {
         context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2661,7 +2661,7 @@ public class KafkaRaftClientReconfigTest {
         context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         // after more than 3 fetch timeouts the update voter period timer should have expired.
         // check that the update voter period timer doesn't remain at zero (0) and cause the message queue to get
@@ -2701,7 +2701,7 @@ public class KafkaRaftClientReconfigTest {
         context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2721,7 +2721,7 @@ public class KafkaRaftClientReconfigTest {
         // expect one last FETCH request
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         // don't send a response but increase the time
         context.time.sleep(context.requestTimeoutMs() - 1);
@@ -2781,7 +2781,7 @@ public class KafkaRaftClientReconfigTest {
         context.time.sleep(context.fetchTimeoutMs - 1);
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2818,7 +2818,7 @@ public class KafkaRaftClientReconfigTest {
         // check that there is a fetch to the new leader
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0, context.client.highWatermark());

         assertEquals(voter2.id(), fetchRequest.destination().id());
     }
@@ -2837,7 +2837,7 @@ public class KafkaRaftClientReconfigTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         assertEquals(-2, fetchRequest.destination().id());
     }


@@ -155,7 +155,13 @@ public final class KafkaRaftClientSnapshotTest {
         long localLogEndOffset = context.log.endOffset().offset();
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch());
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            localLogEndOffset,
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );
         context.deliverResponse(
             fetchRequest.correlationId(),
             fetchRequest.destination(),
@@ -163,7 +169,12 @@ public final class KafkaRaftClientSnapshotTest {
         );

         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch());
+        context.assertSentFetchRequest(
+            epoch,
+            localLogEndOffset,
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );

         // Check that listener was notified of the new snapshot
         try (SnapshotReader<String> snapshot = context.listener.drainHandledSnapshot().get()) {
@@ -197,7 +208,13 @@ public final class KafkaRaftClientSnapshotTest {
         long localLogEndOffset = context.log.endOffset().offset();
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch());
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            localLogEndOffset,
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );
         context.deliverResponse(
             fetchRequest.correlationId(),
             fetchRequest.destination(),
@@ -205,7 +222,12 @@ public final class KafkaRaftClientSnapshotTest {
         );

         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, localLogEndOffset, snapshotId.epoch());
+        context.assertSentFetchRequest(
+            epoch,
+            localLogEndOffset,
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );

         RaftClientTestContext.MockListener secondListener = new RaftClientTestContext.MockListener(OptionalInt.of(localId));
         context.client.register(secondListener);
@@ -1145,7 +1167,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1162,7 +1184,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1179,7 +1201,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         // Fetch timer is not reset; sleeping for remainder should transition to prospective
         context.time.sleep(context.fetchTimeoutMs - slept);
@@ -1206,7 +1228,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1249,7 +1271,13 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset(), snapshotId.epoch());
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            snapshotId.offset(),
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );

         // Check that the snapshot was written to the log
         RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
@@ -1279,7 +1307,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1354,7 +1382,13 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, snapshotId.offset(), snapshotId.epoch());
+        context.assertFetchRequestData(
+            fetchRequest,
+            epoch,
+            snapshotId.offset(),
+            snapshotId.epoch(),
+            context.client.highWatermark()
+        );

         // Check that the snapshot was written to the log
         RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get();
@@ -1384,7 +1418,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1426,7 +1460,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
     }

     @ParameterizedTest
@@ -1446,7 +1480,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1488,7 +1522,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0, context.client.highWatermark());
     }

     @ParameterizedTest
@@ -1507,7 +1541,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1549,7 +1583,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0, context.client.highWatermark());
     }

     @ParameterizedTest
@@ -1568,7 +1602,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1639,7 +1673,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1687,7 +1721,7 @@ public final class KafkaRaftClientSnapshotTest {
         // Follower should send a fetch request
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -1736,7 +1770,7 @@ public final class KafkaRaftClientSnapshotTest {
         // Follower should send a fetch request
         fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
     }

     @ParameterizedTest
@@ -1755,7 +1789,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2014,7 +2048,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, epoch, 1L, 1);
+        context.assertFetchRequestData(fetchRequest, epoch, 1L, 1, context.client.highWatermark());

         // The response does not advance the high watermark
         List<String> records1 = List.of("b", "c");
@@ -2042,7 +2076,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.pollUntilRequest();
         fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);
+        context.assertFetchRequestData(fetchRequest, epoch, 3L, 3, context.client.highWatermark());

         List<String> records2 = List.of("d", "e", "f");
         int batch2Epoch = 4;


@@ -818,7 +818,7 @@ class KafkaRaftClientTest {
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -863,7 +863,7 @@ class KafkaRaftClientTest {
         context.assertUnknownLeaderAndNoVotedCandidate(0);

         context.pollUntilRequest();
-        RaftRequest.Outbound request = context.assertSentFetchRequest(0, 0L, 0);
+        RaftRequest.Outbound request = context.assertSentFetchRequest(0, 0L, 0, OptionalLong.empty());

         assertTrue(context.client.quorum().isUnattached());
         assertTrue(context.client.quorum().isVoter());
@@ -1430,7 +1430,7 @@ class KafkaRaftClientTest {
         context.time.sleep(1);
         context.client.poll();
-        context.assertSentFetchRequest(leaderEpoch, 0, 0);
+        context.assertSentFetchRequest(leaderEpoch, 0, 0, OptionalLong.empty());

         context.time.sleep(context.electionBackoffMaxMs);
         context.client.poll();
@@ -1885,7 +1885,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();

-        context.assertSentFetchRequest(epoch, 0L, 0);
+        context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
     }

     @ParameterizedTest
@@ -1906,7 +1906,7 @@ class KafkaRaftClientTest {
         context.assertElectedLeader(epoch, otherNodeId);

         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, 1L, lastEpoch);
+        context.assertSentFetchRequest(epoch, 1L, lastEpoch, OptionalLong.empty());
     }

     @ParameterizedTest
@@ -1926,7 +1926,7 @@ class KafkaRaftClientTest {
         context.assertElectedLeader(epoch, otherNodeId);

         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, 1L, lastEpoch);
+        context.assertSentFetchRequest(epoch, 1L, lastEpoch, OptionalLong.empty());

         context.time.sleep(context.fetchTimeoutMs);
         context.client.poll();
@@ -1952,7 +1952,7 @@ class KafkaRaftClientTest {
         context.assertElectedLeader(epoch, otherNodeId);

         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, 1L, lastEpoch);
+        context.assertSentFetchRequest(epoch, 1L, lastEpoch, OptionalLong.empty());

         context.time.sleep(context.fetchTimeoutMs);
         context.pollUntilRequest();
@@ -1979,13 +1979,13 @@ class KafkaRaftClientTest {
             .build();

         context.pollUntilRequest();
-        context.assertSentFetchRequest(epoch, 0L, 0);
+        context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
         assertTrue(context.client.quorum().isUnattached());

         context.time.sleep(context.electionTimeoutMs() * 2);
         context.pollUntilRequest();
         assertTrue(context.client.quorum().isUnattached());
-        context.assertSentFetchRequest(epoch, 0L, 0);
+        context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());

         // confirm no vote request was sent
         assertEquals(0, context.channel.drainSendQueue().size());
@@ -1998,7 +1998,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         // observer cannot transition to prospective though
         assertTrue(context.client.quorum().isUnattached());
-        context.assertSentFetchRequest(epoch + 1, 0L, 0);
+        context.assertSentFetchRequest(epoch + 1, 0L, 0, OptionalLong.empty());
         assertEquals(0, context.channel.drainSendQueue().size());
     }
@@ -2017,7 +2017,7 @@ class KafkaRaftClientTest {
             .build();

         context.pollUntilRequest();
-        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0L, 0);
+        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());

         assertTrue(context.client.quorum().isUnattached());
         assertTrue(context.client.quorum().isVoter());
@@ -2049,7 +2049,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2081,7 +2081,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2095,7 +2095,7 @@ class KafkaRaftClientTest {
         fetchRequest = context.assertSentFetchRequest();
         assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2128,7 +2128,7 @@ class KafkaRaftClientTest {
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, 0, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark());

         context.deliverResponse(
             fetchRequest.correlationId(),
@@ -2145,7 +2145,7 @@ class KafkaRaftClientTest {
         fetchRequest = context.assertSentFetchRequest();
         assertNotEquals(leaderId, fetchRequest.destination().id());
         assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
     }
@ParameterizedTest @ParameterizedTest
@ -2173,7 +2173,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound discoveryFetchRequest = context.assertSentFetchRequest(); RaftRequest.Outbound discoveryFetchRequest = context.assertSentFetchRequest();
assertFalse(voters.contains(discoveryFetchRequest.destination().id())); assertFalse(voters.contains(discoveryFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(discoveryFetchRequest.destination().id())); assertTrue(context.bootstrapIds.contains(discoveryFetchRequest.destination().id()));
context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0); context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0, context.client.highWatermark());
// Send a response with the leader and epoch // Send a response with the leader and epoch
context.deliverResponse( context.deliverResponse(
@ -2189,7 +2189,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound toLeaderFetchRequest = context.assertSentFetchRequest(); RaftRequest.Outbound toLeaderFetchRequest = context.assertSentFetchRequest();
assertEquals(leaderId, toLeaderFetchRequest.destination().id()); assertEquals(leaderId, toLeaderFetchRequest.destination().id());
context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0); context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs()); context.time.sleep(context.requestTimeoutMs());
@ -2198,7 +2198,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound retryToBootstrapServerFetchRequest = context.assertSentFetchRequest(); RaftRequest.Outbound retryToBootstrapServerFetchRequest = context.assertSentFetchRequest();
assertFalse(voters.contains(retryToBootstrapServerFetchRequest.destination().id())); assertFalse(voters.contains(retryToBootstrapServerFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(retryToBootstrapServerFetchRequest.destination().id())); assertTrue(context.bootstrapIds.contains(retryToBootstrapServerFetchRequest.destination().id()));
context.assertFetchRequestData(retryToBootstrapServerFetchRequest, epoch, 0L, 0); context.assertFetchRequestData(retryToBootstrapServerFetchRequest, epoch, 0L, 0, context.client.highWatermark());
// Deliver the delayed responses from the leader // Deliver the delayed responses from the leader
Records records = context.buildBatch(0L, 3, List.of("a", "b")); Records records = context.buildBatch(0L, 3, List.of("a", "b"));
@ -2247,7 +2247,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound discoveryFetchRequest = context.assertSentFetchRequest(); RaftRequest.Outbound discoveryFetchRequest = context.assertSentFetchRequest();
assertFalse(voters.contains(discoveryFetchRequest.destination().id())); assertFalse(voters.contains(discoveryFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(discoveryFetchRequest.destination().id())); assertTrue(context.bootstrapIds.contains(discoveryFetchRequest.destination().id()));
context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0); context.assertFetchRequestData(discoveryFetchRequest, 0, 0L, 0, context.client.highWatermark());
// Send a response with the leader and epoch // Send a response with the leader and epoch
context.deliverResponse( context.deliverResponse(
@ -2263,7 +2263,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound toLeaderFetchRequest = context.assertSentFetchRequest(); RaftRequest.Outbound toLeaderFetchRequest = context.assertSentFetchRequest();
assertEquals(leaderId, toLeaderFetchRequest.destination().id()); assertEquals(leaderId, toLeaderFetchRequest.destination().id());
context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0); context.assertFetchRequestData(toLeaderFetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs()); context.time.sleep(context.requestTimeoutMs());
@ -2272,7 +2272,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound retryToBootstrapServerFetchRequest = context.assertSentFetchRequest(); RaftRequest.Outbound retryToBootstrapServerFetchRequest = context.assertSentFetchRequest();
assertFalse(voters.contains(retryToBootstrapServerFetchRequest.destination().id())); assertFalse(voters.contains(retryToBootstrapServerFetchRequest.destination().id()));
assertTrue(context.bootstrapIds.contains(retryToBootstrapServerFetchRequest.destination().id())); assertTrue(context.bootstrapIds.contains(retryToBootstrapServerFetchRequest.destination().id()));
context.assertFetchRequestData(retryToBootstrapServerFetchRequest, epoch, 0L, 0); context.assertFetchRequestData(retryToBootstrapServerFetchRequest, epoch, 0L, 0, context.client.highWatermark());
// At this point toLeaderFetchRequest has timed out but retryToBootstrapServerFetchRequest // At this point toLeaderFetchRequest has timed out but retryToBootstrapServerFetchRequest
// is still waiting for a response. // is still waiting for a response.
@ -2378,17 +2378,23 @@ class KafkaRaftClientTest {
context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
// null cluster id is accepted // null cluster id is accepted
context.deliverRequest(context.fetchRequest(epoch, null, otherNodeKey, -5L, 0, 0)); context.deliverRequest(
context.fetchRequest(epoch, null, otherNodeKey, -5L, 0, OptionalLong.of(Long.MAX_VALUE), 0)
);
context.pollUntilResponse(); context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId)); context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
// empty cluster id is rejected // empty cluster id is rejected
context.deliverRequest(context.fetchRequest(epoch, "", otherNodeKey, -5L, 0, 0)); context.deliverRequest(
context.fetchRequest(epoch, "", otherNodeKey, -5L, 0, OptionalLong.of(Long.MAX_VALUE), 0)
);
context.pollUntilResponse(); context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID); context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
// invalid cluster id is rejected // invalid cluster id is rejected
context.deliverRequest(context.fetchRequest(epoch, "invalid-uuid", otherNodeKey, -5L, 0, 0)); context.deliverRequest(
context.fetchRequest(epoch, "invalid-uuid", otherNodeKey, -5L, 0, OptionalLong.of(Long.MAX_VALUE), 0)
);
context.pollUntilResponse(); context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID); context.assertSentFetchPartitionResponse(Errors.INCONSISTENT_CLUSTER_ID);
} }
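The cluster-id validation cases above now pass the high-watermark explicitly when building their FETCH requests, using OptionalLong.of(Long.MAX_VALUE) to match the helper's default. The standalone sketch below is hypothetical (not part of the patch) and only illustrates the OptionalLong-to-long convention the test helpers later in this diff rely on, where an absent value is written as -1:

import java.util.OptionalLong;

// Hypothetical illustration of the encoding convention used by the test helpers in this
// diff: an absent high-watermark is written as -1, and the compatibility overload
// defaults the field to Long.MAX_VALUE.
final class HighWatermarkEncodingSketch {
    static long toWire(OptionalLong highWatermark) {
        return highWatermark.orElse(-1);
    }

    public static void main(String[] args) {
        System.out.println(toWire(OptionalLong.empty()));            // -1
        System.out.println(toWire(OptionalLong.of(0)));              // 0
        System.out.println(toWire(OptionalLong.of(Long.MAX_VALUE))); // 9223372036854775807
    }
}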
@ -2778,7 +2784,7 @@ class KafkaRaftClientTest {
// Wait until we have a Fetch inflight to the leader // Wait until we have a Fetch inflight to the leader
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
// Now await the fetch timeout and become prospective // Now await the fetch timeout and become prospective
context.time.sleep(context.fetchTimeoutMs); context.time.sleep(context.fetchTimeoutMs);
@ -2818,7 +2824,7 @@ class KafkaRaftClientTest {
// Wait until we have a Fetch inflight to the leader // Wait until we have a Fetch inflight to the leader
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
// Now receive a BeginEpoch from `voter3` // Now receive a BeginEpoch from `voter3`
context.deliverRequest(context.beginEpochRequest(epoch + 1, voter3)); context.deliverRequest(context.beginEpochRequest(epoch + 1, voter3));
@ -2915,7 +2921,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id()); assertEquals(leaderId, fetchRequest1.destination().id());
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse( context.deliverResponse(
fetchRequest1.correlationId(), fetchRequest1.correlationId(),
@ -2929,7 +2935,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id()); assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, context.client.highWatermark());
} }
@ParameterizedTest @ParameterizedTest
@ -2954,7 +2960,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id()); assertEquals(leaderId, fetchRequest1.destination().id());
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs()); context.time.sleep(context.requestTimeoutMs());
context.pollUntilRequest(); context.pollUntilRequest();
@ -2964,7 +2970,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id()); assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, context.client.highWatermark());
} }
@ParameterizedTest @ParameterizedTest
@ -2985,12 +2991,12 @@ class KafkaRaftClientTest {
.withKip853Rpc(withKip853Rpc) .withKip853Rpc(withKip853Rpc)
.build(); .build();
context.discoverLeaderAsObserver(leaderId, epoch); context.discoverLeaderAsObserver(leaderId, epoch, context.client.highWatermark());
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id()); assertEquals(leaderId, fetchRequest1.destination().id());
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse( context.deliverResponse(
fetchRequest1.correlationId(), fetchRequest1.correlationId(),
@ -3004,7 +3010,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id()); assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse( context.deliverResponse(
fetchRequest2.correlationId(), fetchRequest2.correlationId(),
@ -3034,12 +3040,12 @@ class KafkaRaftClientTest {
.withKip853Rpc(withKip853Rpc) .withKip853Rpc(withKip853Rpc)
.build(); .build();
context.discoverLeaderAsObserver(leaderId, epoch); context.discoverLeaderAsObserver(leaderId, epoch, context.client.highWatermark());
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id()); assertEquals(leaderId, fetchRequest1.destination().id());
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0); context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0, context.client.highWatermark());
context.time.sleep(context.requestTimeoutMs()); context.time.sleep(context.requestTimeoutMs());
context.pollUntilRequest(); context.pollUntilRequest();
@ -3049,7 +3055,7 @@ class KafkaRaftClientTest {
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id()); assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse( context.deliverResponse(
fetchRequest2.correlationId(), fetchRequest2.correlationId(),
@ -3727,7 +3733,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0); RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
Records records = context.buildBatch(0L, 3, List.of("a", "b")); Records records = context.buildBatch(0L, 3, List.of("a", "b"));
FetchResponseData response = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE); FetchResponseData response = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
context.deliverResponse( context.deliverResponse(
@ -3758,7 +3764,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0); RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
Records records = context.buildBatch(0L, 3, List.of("a", "b")); Records records = context.buildBatch(0L, 3, List.of("a", "b"));
FetchResponseData response = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE); FetchResponseData response = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
context.deliverResponse( context.deliverResponse(
@ -3789,7 +3795,7 @@ class KafkaRaftClientTest {
// Receive an empty fetch response // Receive an empty fetch response
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0); RaftRequest.Outbound fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
FetchResponseData fetchResponse = context.fetchResponse( FetchResponseData fetchResponse = context.fetchResponse(
epoch, epoch,
otherNodeId, otherNodeId,
@ -3809,7 +3815,7 @@ class KafkaRaftClientTest {
// Receive some records in the next poll, but do not advance high watermark // Receive some records in the next poll, but do not advance high watermark
context.pollUntilRequest(); context.pollUntilRequest();
Records records = context.buildBatch(0L, epoch, List.of("a", "b")); Records records = context.buildBatch(0L, epoch, List.of("a", "b"));
fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0); fetchQuorumRequest = context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.of(0));
fetchResponse = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE); fetchResponse = context.fetchResponse(epoch, otherNodeId, records, 0L, Errors.NONE);
context.deliverResponse( context.deliverResponse(
fetchQuorumRequest.correlationId(), fetchQuorumRequest.correlationId(),
@ -3822,7 +3828,7 @@ class KafkaRaftClientTest {
// The next fetch response is empty, but should still advance the high watermark // The next fetch response is empty, but should still advance the high watermark
context.pollUntilRequest(); context.pollUntilRequest();
fetchQuorumRequest = context.assertSentFetchRequest(epoch, 2L, epoch); fetchQuorumRequest = context.assertSentFetchRequest(epoch, 2L, epoch, OptionalLong.of(0));
fetchResponse = context.fetchResponse( fetchResponse = context.fetchResponse(
epoch, epoch,
otherNodeId, otherNodeId,
@ -3974,10 +3980,20 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 3L, lastEpoch); RaftRequest.Outbound request = context.assertSentFetchRequest(
epoch,
3L,
lastEpoch,
OptionalLong.empty()
);
FetchResponseData response = context.divergingFetchResponse(epoch, otherNodeId, 2L, FetchResponseData response = context.divergingFetchResponse(
lastEpoch, 1L); epoch,
otherNodeId,
2L,
lastEpoch,
1L
);
context.deliverResponse(request.correlationId(), request.destination(), response); context.deliverResponse(request.correlationId(), request.destination(), response);
// Poll again to complete truncation // Poll again to complete truncation
@ -3987,7 +4003,7 @@ class KafkaRaftClientTest {
// Now we should be fetching // Now we should be fetching
context.client.poll(); context.client.poll();
context.assertSentFetchRequest(epoch, 2L, lastEpoch); context.assertSentFetchRequest(epoch, 2L, lastEpoch, context.client.highWatermark());
} }
@ParameterizedTest @ParameterizedTest
@ -4255,7 +4271,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id())); assertTrue(voters.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
// The response does not advance the high watermark // The response does not advance the high watermark
List<String> records1 = List.of("a", "b", "c"); List<String> records1 = List.of("a", "b", "c");
@ -4276,7 +4292,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest(); fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id())); assertTrue(voters.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, epoch, 3L, 3); context.assertFetchRequestData(fetchRequest, epoch, 3L, 3, context.client.highWatermark());
// The high watermark advances to include the first batch we fetched // The high watermark advances to include the first batch we fetched
List<String> records2 = List.of("d", "e", "f"); List<String> records2 = List.of("d", "e", "f");
@ -4514,7 +4530,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertTrue(context.bootstrapIds.contains(fetchRequest1.destination().id())); assertTrue(context.bootstrapIds.contains(fetchRequest1.destination().id()));
context.assertFetchRequestData(fetchRequest1, 0, 0L, 0); context.assertFetchRequestData(fetchRequest1, 0, 0L, 0, context.client.highWatermark());
int leaderEpoch = 5; int leaderEpoch = 5;
@ -4531,7 +4547,7 @@ class KafkaRaftClientTest {
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest2.destination().id()); assertEquals(leaderId, fetchRequest2.destination().id());
context.assertFetchRequestData(fetchRequest2, leaderEpoch, 0L, 0); context.assertFetchRequestData(fetchRequest2, leaderEpoch, 0L, 0, context.client.highWatermark());
List<String> records = List.of("a", "b", "c"); List<String> records = List.of("a", "b", "c");
MemoryRecords batch1 = context.buildBatch(0L, 3, records); MemoryRecords batch1 = context.buildBatch(0L, 3, records);


@ -957,7 +957,7 @@ public final class RaftClientTestContext {
return requests; return requests;
} }
private List<RaftResponse.Outbound> drainSentResponses( List<RaftResponse.Outbound> drainSentResponses(
ApiKeys apiKey ApiKeys apiKey
) { ) {
List<RaftResponse.Outbound> res = new ArrayList<>(); List<RaftResponse.Outbound> res = new ArrayList<>();
@ -1114,23 +1114,20 @@ public final class RaftClientTestContext {
RaftRequest.Outbound assertSentFetchRequest( RaftRequest.Outbound assertSentFetchRequest(
int epoch, int epoch,
long fetchOffset, long fetchOffset,
int lastFetchedEpoch int lastFetchedEpoch,
OptionalLong highWatermark
) { ) {
List<RaftRequest.Outbound> sentMessages = channel.drainSendQueue(); List<RaftRequest.Outbound> sentMessages = channel.drainSendQueue();
assertEquals(1, sentMessages.size()); assertEquals(1, sentMessages.size());
RaftRequest.Outbound raftRequest = sentMessages.get(0); RaftRequest.Outbound raftRequest = sentMessages.get(0);
assertFetchRequestData(raftRequest, epoch, fetchOffset, lastFetchedEpoch); assertFetchRequestData(raftRequest, epoch, fetchOffset, lastFetchedEpoch, highWatermark);
return raftRequest; return raftRequest;
} }
FetchResponseData.PartitionData assertSentFetchPartitionResponse() { FetchResponseData.PartitionData assertFetchResponseData(RaftResponse.Outbound message) {
List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH); assertEquals(ApiKeys.FETCH.id, message.data().apiKey());
assertEquals( FetchResponseData response = (FetchResponseData) message.data();
1, sentMessages.size(), "Found unexpected sent messages " + sentMessages);
RaftResponse.Outbound raftMessage = sentMessages.get(0);
assertEquals(ApiKeys.FETCH.id, raftMessage.data().apiKey());
FetchResponseData response = (FetchResponseData) raftMessage.data();
assertEquals(Errors.NONE, Errors.forCode(response.errorCode())); assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
assertEquals(1, response.responses().size()); assertEquals(1, response.responses().size());
@ -1152,17 +1149,30 @@ public final class RaftClientTestContext {
return partitionResponse; return partitionResponse;
} }
FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH);
assertEquals(
1,
sentMessages.size(),
"Found unexpected sent messages " + sentMessages
);
return assertFetchResponseData(sentMessages.get(0));
}
void assertSentFetchPartitionResponse(Errors topLevelError) { void assertSentFetchPartitionResponse(Errors topLevelError) {
List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH); List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH);
assertEquals( assertEquals(
1, sentMessages.size(), "Found unexpected sent messages " + sentMessages); 1,
sentMessages.size(),
"Found unexpected sent messages " + sentMessages
);
RaftResponse.Outbound raftMessage = sentMessages.get(0); RaftResponse.Outbound raftMessage = sentMessages.get(0);
assertEquals(ApiKeys.FETCH.id, raftMessage.data().apiKey()); assertEquals(ApiKeys.FETCH.id, raftMessage.data().apiKey());
FetchResponseData response = (FetchResponseData) raftMessage.data(); FetchResponseData response = (FetchResponseData) raftMessage.data();
assertEquals(topLevelError, Errors.forCode(response.errorCode())); assertEquals(topLevelError, Errors.forCode(response.errorCode()));
} }
MemoryRecords assertSentFetchPartitionResponse( MemoryRecords assertSentFetchPartitionResponse(
Errors error, Errors error,
int epoch, int epoch,
@ -1375,7 +1385,8 @@ public final class RaftClientTestContext {
void discoverLeaderAsObserver( void discoverLeaderAsObserver(
int leaderId, int leaderId,
int epoch int epoch,
OptionalLong highWatermark
) throws Exception { ) throws Exception {
pollUntilRequest(); pollUntilRequest();
RaftRequest.Outbound fetchRequest = assertSentFetchRequest(); RaftRequest.Outbound fetchRequest = assertSentFetchRequest();
@ -1384,7 +1395,7 @@ public final class RaftClientTestContext {
startingVoters.voterIds().contains(destinationId) || bootstrapIds.contains(destinationId), startingVoters.voterIds().contains(destinationId) || bootstrapIds.contains(destinationId),
String.format("id %d is not in sets %s or %s", destinationId, startingVoters, bootstrapIds) String.format("id %d is not in sets %s or %s", destinationId, startingVoters, bootstrapIds)
); );
assertFetchRequestData(fetchRequest, 0, 0L, 0); assertFetchRequestData(fetchRequest, 0, 0L, 0, highWatermark);
deliverResponse( deliverResponse(
fetchRequest.correlationId(), fetchRequest.correlationId(),
@ -1672,7 +1683,8 @@ public final class RaftClientTestContext {
RaftRequest.Outbound message, RaftRequest.Outbound message,
int epoch, int epoch,
long fetchOffset, long fetchOffset,
int lastFetchedEpoch int lastFetchedEpoch,
OptionalLong highWatermark
) { ) {
assertInstanceOf( assertInstanceOf(
FetchRequestData.class, FetchRequestData.class,
@ -1691,6 +1703,7 @@ public final class RaftClientTestContext {
assertEquals(fetchOffset, fetchPartition.fetchOffset()); assertEquals(fetchOffset, fetchPartition.fetchOffset());
assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch()); assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
assertEquals(localId.orElse(-1), request.replicaState().replicaId()); assertEquals(localId.orElse(-1), request.replicaState().replicaId());
assertEquals(highWatermark.orElse(-1), fetchPartition.highWatermark());
// Assert that voters have flushed up to the fetch offset // Assert that voters have flushed up to the fetch offset
if ((localId.isPresent() && startingVoters.voterIds().contains(localId.getAsInt())) || if ((localId.isPresent() && startingVoters.voterIds().contains(localId.getAsInt())) ||
@ -1716,6 +1729,24 @@ public final class RaftClientTestContext {
long fetchOffset, long fetchOffset,
int lastFetchedEpoch, int lastFetchedEpoch,
int maxWaitTimeMs int maxWaitTimeMs
) {
return fetchRequest(
epoch,
replicaKey,
fetchOffset,
lastFetchedEpoch,
OptionalLong.of(Long.MAX_VALUE),
maxWaitTimeMs
);
}
FetchRequestData fetchRequest(
int epoch,
ReplicaKey replicaKey,
long fetchOffset,
int lastFetchedEpoch,
OptionalLong highWatermark,
int maxWaitTimeMs
) { ) {
return fetchRequest( return fetchRequest(
epoch, epoch,
@ -1723,6 +1754,7 @@ public final class RaftClientTestContext {
replicaKey, replicaKey,
fetchOffset, fetchOffset,
lastFetchedEpoch, lastFetchedEpoch,
highWatermark,
maxWaitTimeMs maxWaitTimeMs
); );
} }
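The delegating overloads above keep older call sites source-compatible by defaulting the new high-watermark argument to OptionalLong.of(Long.MAX_VALUE). A minimal, hedged mirror of that pattern (hypothetical class returning a plain string rather than the real FetchRequestData):

import java.util.OptionalLong;

// Hypothetical mirror of the delegation used above: the narrow overload forwards to the
// wide one with a defaulted high-watermark, so tests written before this change need no
// edits, while new tests can pass an explicit value.
final class FetchRequestOverloadSketch {
    static String fetchRequest(int epoch, long fetchOffset, int lastFetchedEpoch, int maxWaitTimeMs) {
        return fetchRequest(epoch, fetchOffset, lastFetchedEpoch, OptionalLong.of(Long.MAX_VALUE), maxWaitTimeMs);
    }

    static String fetchRequest(
        int epoch,
        long fetchOffset,
        int lastFetchedEpoch,
        OptionalLong highWatermark,
        int maxWaitTimeMs
    ) {
        // The wire field is a plain long, so an absent high-watermark is encoded as -1.
        return "epoch=" + epoch + " offset=" + fetchOffset + " lastFetchedEpoch=" + lastFetchedEpoch +
            " highWatermark=" + highWatermark.orElse(-1) + " maxWaitTimeMs=" + maxWaitTimeMs;
    }

    public static void main(String[] args) {
        System.out.println(fetchRequest(5, 0L, 0, 500));                      // defaulted high-watermark
        System.out.println(fetchRequest(5, 0L, 0, OptionalLong.of(3L), 500)); // explicit high-watermark
    }
}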
@ -1733,6 +1765,7 @@ public final class RaftClientTestContext {
ReplicaKey replicaKey, ReplicaKey replicaKey,
long fetchOffset, long fetchOffset,
int lastFetchedEpoch, int lastFetchedEpoch,
OptionalLong highWatermark,
int maxWaitTimeMs int maxWaitTimeMs
) { ) {
FetchRequestData request = RaftUtil.singletonFetchRequest( FetchRequestData request = RaftUtil.singletonFetchRequest(
@ -1742,7 +1775,8 @@ public final class RaftClientTestContext {
fetchPartition fetchPartition
.setCurrentLeaderEpoch(epoch) .setCurrentLeaderEpoch(epoch)
.setLastFetchedEpoch(lastFetchedEpoch) .setLastFetchedEpoch(lastFetchedEpoch)
.setFetchOffset(fetchOffset); .setFetchOffset(fetchOffset)
.setHighWatermark(highWatermark.orElse(-1));
if (raftProtocol.isReconfigSupported()) { if (raftProtocol.isReconfigSupported()) {
fetchPartition fetchPartition
.setReplicaDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)); .setReplicaDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID));
@ -1932,7 +1966,9 @@ public final class RaftClientTestContext {
} }
private short fetchRpcVersion() { private short fetchRpcVersion() {
if (raftProtocol.isReconfigSupported()) { if (raftProtocol.isHwmInFetchSupported()) {
return 18;
} else if (raftProtocol.isReconfigSupported()) {
return 17; return 17;
} else { } else {
return 16; return 16;
@ -2236,7 +2272,9 @@ public final class RaftClientTestContext {
// dynamic quorum reconfiguration support // dynamic quorum reconfiguration support
KIP_853_PROTOCOL, KIP_853_PROTOCOL,
// preVote support // preVote support
KIP_996_PROTOCOL; KIP_996_PROTOCOL,
// HWM in FETCH request support
KIP_1166_PROTOCOL;
boolean isKRaftSupported() { boolean isKRaftSupported() {
return isAtLeast(KIP_595_PROTOCOL); return isAtLeast(KIP_595_PROTOCOL);
@ -2250,6 +2288,10 @@ public final class RaftClientTestContext {
return isAtLeast(KIP_996_PROTOCOL); return isAtLeast(KIP_996_PROTOCOL);
} }
boolean isHwmInFetchSupported() {
return isAtLeast(KIP_1166_PROTOCOL);
}
private boolean isAtLeast(RaftProtocol otherRpc) { private boolean isAtLeast(RaftProtocol otherRpc) {
return this.compareTo(otherRpc) >= 0; return this.compareTo(otherRpc) >= 0;
} }
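The KIP_1166_PROTOCOL constant extends the test context's protocol ladder, and isHwmInFetchSupported reuses the same compareTo-based check as the other capability probes. A standalone, simplified mirror of that pattern (hypothetical enum and constant names):

// Hypothetical mirror of the RaftProtocol ladder: a later constant implies support for
// every capability introduced at or below it, via ordinal comparison.
enum ProtocolLadderSketch {
    KIP_595, KIP_853, KIP_996, KIP_1166;

    boolean isAtLeast(ProtocolLadderSketch other) {
        return this.compareTo(other) >= 0;
    }

    boolean isHwmInFetchSupported() {
        return isAtLeast(KIP_1166);
    }

    public static void main(String[] args) {
        System.out.println(KIP_996.isHwmInFetchSupported());  // false
        System.out.println(KIP_1166.isHwmInFetchSupported()); // true
    }
}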


@ -116,6 +116,9 @@ public enum MetadataVersion {
// Streams groups are early access in 4.1 (KIP-1071). // Streams groups are early access in 4.1 (KIP-1071).
IBP_4_1_IV0(26, "4.1", "IV0", false), IBP_4_1_IV0(26, "4.1", "IV0", false),
// Send FETCH version 18 in the replica fetcher (KIP-1166)
IBP_4_1_IV1(27, "4.1", "IV1", false),
// Insert any additional IBP_4_1_IVx versions above this comment, and bump the feature level of // Insert any additional IBP_4_1_IVx versions above this comment, and bump the feature level of
// IBP_4_2_IVx accordingly. When 4.2 development begins, IBP_4_2_IV0 will cease to be // IBP_4_2_IVx accordingly. When 4.2 development begins, IBP_4_2_IV0 will cease to be
// a placeholder. // a placeholder.
@ -126,7 +129,7 @@ public enum MetadataVersion {
// *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE *** // *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE ***
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON *** // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
// *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY. *** // *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY. ***
IBP_4_2_IV0(27, "4.2", "IV0", false), IBP_4_2_IV0(28, "4.2", "IV0", false),
// Enables "streams" groups by default for new clusters (KIP-1071). // Enables "streams" groups by default for new clusters (KIP-1071).
// //
@ -134,7 +137,7 @@ public enum MetadataVersion {
// *** STREAMS GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A STREAMS *** // *** STREAMS GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A STREAMS ***
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON *** // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
// *** DYNAMICALLY TO TRY OUT THE EARLY ACCESS CAPABILITY. *** // *** DYNAMICALLY TO TRY OUT THE EARLY ACCESS CAPABILITY. ***
IBP_4_2_IV1(28, "4.2", "IV1", false); IBP_4_2_IV1(29, "4.2", "IV1", false);
// NOTES when adding a new version: // NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the latest version // Update the default version in @ClusterTest annotation to point to the latest version
@ -264,13 +267,15 @@ public enum MetadataVersion {
} }
public short fetchRequestVersion() { public short fetchRequestVersion() {
if (this.isAtLeast(IBP_3_9_IV0)) { if (isAtLeast(IBP_4_1_IV1)) {
return 18;
} else if (isAtLeast(IBP_3_9_IV0)) {
return 17; return 17;
} else if (this.isAtLeast(IBP_3_7_IV4)) { } else if (isAtLeast(IBP_3_7_IV4)) {
return 16; return 16;
} else if (this.isAtLeast(IBP_3_5_IV1)) { } else if (isAtLeast(IBP_3_5_IV1)) {
return 15; return 15;
} else if (this.isAtLeast(IBP_3_5_IV0)) { } else if (isAtLeast(IBP_3_5_IV0)) {
return 14; return 14;
} else { } else {
return 13; return 13;
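With IBP_4_1_IV1 in place, fetchRequestVersion() returns 18 from that metadata version onward, matching the "Send FETCH version 18 in the replica fetcher" note above. A small usage sketch (class name is hypothetical; assumes the kafka server-common module on the classpath):

import org.apache.kafka.server.common.MetadataVersion;

// Hypothetical usage check of the ladder above: 4.1-IV1 is the first metadata.version
// level whose replica fetcher sends FETCH version 18 with the KIP-1166 high-watermark field.
public final class FetchVersionCheck {
    public static void main(String[] args) {
        System.out.println(MetadataVersion.IBP_4_1_IV0.fetchRequestVersion()); // 17
        System.out.println(MetadataVersion.IBP_4_1_IV1.fetchRequestVersion()); // 18
    }
}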


@ -121,8 +121,16 @@ public class FeatureCommandTest {
"disable", "--feature", "metadata.version")) "disable", "--feature", "metadata.version"))
); );
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version) // Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
assertEquals("Could not disable metadata.version. The update failed for all features since the following " + assertEquals(
"feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-28", commandOutput); String.format(
"Could not disable metadata.version. The update failed for all features since the " +
"following feature had an error: Invalid update version 0 for feature " +
"metadata.version. Local controller 3000 only supports versions %s-%s",
MetadataVersion.MINIMUM_VERSION.featureLevel(),
MetadataVersion.latestTesting().featureLevel()
),
commandOutput
);
commandOutput = ToolsTestUtils.captureStandardOut(() -> commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),