KAFKA-19078 Automatic controller addition to cluster metadata partition (#19589)

Add the `controller.quorum.auto.join.enable` configuration. When enabled
with KIP-853 supported, follower controllers who are observers (their
replica id + directory id are not in the voter set) will:

- Automatically remove voter set entries which match their replica id
but not directory id by sending the `RemoveVoterRPC` to the leader.
- Automatically add themselves as a voter when their replica id is not
present in the voter set by sending the `AddVoterRPC` to the leader.

Reviewers: José Armando García Sancio
 [jsancio@apache.org](mailto:jsancio@apache.org),   Chia-Ping Tsai
 [chia7712@gmail.com](mailto:chia7712@gmail.com)
This commit is contained in:
Kevin Wu 2025-08-13 10:20:18 -05:00 committed by GitHub
parent dbf3808f53
commit 92d8cb562a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 819 additions and 205 deletions

View File

@ -570,6 +570,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} not found in ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} (an explicit security mapping for each controller listener is required if ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG} is non-empty, or if there are security protocols other than PLAINTEXT in use)")
}
}
// controller.quorum.auto.join.enable must be false for KRaft broker-only
require(!quorumConfig.autoJoin,
s"${QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG} is only supported when ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role.")
// warn that only the first controller listener is used if there is more than one
if (controllerListenerNames.size > 1) {
warn(s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} has multiple entries; only the first will be used since ${KRaftConfigs.PROCESS_ROLES_CONFIG}=broker: ${controllerListenerNames}")

View File

@ -24,11 +24,14 @@ import org.apache.kafka.clients.admin.RaftVoterEndpoint;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.TestKitDefaults;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@ -164,4 +167,69 @@ public class ReconfigurableQuorumIntegrationTest {
}
}
}
@Test
public void testControllersAutoJoinStandaloneVoter() throws Exception {
final var nodes = new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(3).
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
build();
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
setStandalone(true).
build()
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
}
});
}
}
}
@Test
public void testNewVoterAutoRemovesAndAdds() throws Exception {
final var nodes = new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(3).
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
build();
// Configure the initial voters with one voter having a different directory ID.
// This simulates the case where the controller failed and is brought back up with a different directory ID.
final Map<Integer, Uuid> initialVoters = new HashMap<>();
final var oldDirectoryId = Uuid.randomUuid();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.id() == TestKitDefaults.CONTROLLER_ID_OFFSET ?
oldDirectoryId : controllerNode.metadataDirectoryId()
);
}
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
setInitialVoterSet(initialVoters).
build()
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
}
});
}
}
}
}

View File

@ -1489,6 +1489,18 @@ class KafkaConfigTest {
assertEquals(expected, addresses)
}
@Test
def testInvalidQuorumAutoJoinForKRaftBroker(): Unit = {
val props = TestUtils.createBrokerConfig(0)
props.setProperty(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, String.valueOf(true))
assertEquals(
"requirement failed: controller.quorum.auto.join.enable is only " +
"supported when process.roles contains the 'controller' role.",
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props)).getMessage
)
}
@Test
def testAcceptsLargeId(): Unit = {
val largeBrokerId = 2000

View File

@ -40,8 +40,8 @@ public class FollowerState implements EpochState {
private final Set<Integer> voters;
// Used for tracking the expiration of both the Fetch and FetchSnapshot requests
private final Timer fetchTimer;
// Used to track when to send another update voter request
private final Timer updateVoterPeriodTimer;
// Used to track when to send another add, remove, or update voter request
private final Timer updateVoterSetPeriodTimer;
/* Used to track if the replica has fetched successfully from the leader at least once since
* the transition to follower in this epoch. If the replica has not yet fetched successfully,
@ -76,7 +76,7 @@ public class FollowerState implements EpochState {
this.votedKey = votedKey;
this.voters = voters;
this.fetchTimer = time.timer(fetchTimeoutMs);
this.updateVoterPeriodTimer = time.timer(updateVoterPeriodMs());
this.updateVoterSetPeriodTimer = time.timer(updateVoterPeriodMs());
this.highWatermark = highWatermark;
this.log = logContext.logger(FollowerState.class);
}
@ -154,19 +154,19 @@ public class FollowerState implements EpochState {
return fetchTimeoutMs;
}
public boolean hasUpdateVoterPeriodExpired(long currentTimeMs) {
updateVoterPeriodTimer.update(currentTimeMs);
return updateVoterPeriodTimer.isExpired();
public boolean hasUpdateVoterSetPeriodExpired(long currentTimeMs) {
updateVoterSetPeriodTimer.update(currentTimeMs);
return updateVoterSetPeriodTimer.isExpired();
}
public long remainingUpdateVoterPeriodMs(long currentTimeMs) {
updateVoterPeriodTimer.update(currentTimeMs);
return updateVoterPeriodTimer.remainingMs();
public long remainingUpdateVoterSetPeriodMs(long currentTimeMs) {
updateVoterSetPeriodTimer.update(currentTimeMs);
return updateVoterSetPeriodTimer.remainingMs();
}
public void resetUpdateVoterPeriod(long currentTimeMs) {
updateVoterPeriodTimer.update(currentTimeMs);
updateVoterPeriodTimer.reset(updateVoterPeriodMs());
public void resetUpdateVoterSetPeriod(long currentTimeMs) {
updateVoterSetPeriodTimer.update(currentTimeMs);
updateVoterSetPeriodTimer.reset(updateVoterPeriodMs());
}
public boolean hasUpdatedLeader() {

View File

@ -19,11 +19,13 @@ package org.apache.kafka.raft;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.network.ListenerName;
@ -31,11 +33,13 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AddRaftVoterRequest;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
import org.apache.kafka.common.requests.UpdateRaftVoterRequest;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.utils.Time;
@ -181,20 +185,25 @@ public class KafkaNetworkChannel implements NetworkChannel {
static AbstractRequest.Builder<? extends AbstractRequest> buildRequest(ApiMessage requestData) {
if (requestData instanceof VoteRequestData)
return new VoteRequest.Builder((VoteRequestData) requestData);
if (requestData instanceof BeginQuorumEpochRequestData)
else if (requestData instanceof BeginQuorumEpochRequestData)
return new BeginQuorumEpochRequest.Builder((BeginQuorumEpochRequestData) requestData);
if (requestData instanceof EndQuorumEpochRequestData)
else if (requestData instanceof EndQuorumEpochRequestData)
return new EndQuorumEpochRequest.Builder((EndQuorumEpochRequestData) requestData);
if (requestData instanceof FetchRequestData)
else if (requestData instanceof FetchRequestData)
return new FetchRequest.SimpleBuilder((FetchRequestData) requestData);
if (requestData instanceof FetchSnapshotRequestData)
else if (requestData instanceof FetchSnapshotRequestData)
return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) requestData);
if (requestData instanceof UpdateRaftVoterRequestData)
else if (requestData instanceof UpdateRaftVoterRequestData)
return new UpdateRaftVoterRequest.Builder((UpdateRaftVoterRequestData) requestData);
if (requestData instanceof ApiVersionsRequestData)
else if (requestData instanceof AddRaftVoterRequestData)
return new AddRaftVoterRequest.Builder((AddRaftVoterRequestData) requestData);
else if (requestData instanceof RemoveRaftVoterRequestData)
return new RemoveRaftVoterRequest.Builder((RemoveRaftVoterRequestData) requestData);
else if (requestData instanceof ApiVersionsRequestData)
return new ApiVersionsRequest.Builder((ApiVersionsRequestData) requestData,
ApiKeys.API_VERSIONS.oldestVersion(),
ApiKeys.API_VERSIONS.latestVersion());
throw new IllegalArgumentException("Unexpected type for requestData: " + requestData);
else
throw new IllegalArgumentException("Unexpected type for requestData: " + requestData);
}
}

View File

@ -180,7 +180,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
private final Logger logger;
private final Time time;
private final int fetchMaxWaitMs;
private final boolean followersAlwaysFlush;
private final boolean canBecomeVoter;
private final String clusterId;
private final Endpoints localListeners;
private final SupportedVersionRange localSupportedKRaftVersion;
@ -229,7 +229,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
* non-participating observer.
*
* @param nodeDirectoryId the node directory id, cannot be the zero uuid
* @param followersAlwaysFlush instruct followers to always fsync when appending to the log
* @param canBecomeVoter instruct followers to always fsync when appending to the log
*/
public KafkaRaftClient(
OptionalInt nodeId,
@ -240,7 +240,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
Time time,
ExpirationService expirationService,
LogContext logContext,
boolean followersAlwaysFlush,
boolean canBecomeVoter,
String clusterId,
Collection<InetSocketAddress> bootstrapServers,
Endpoints localListeners,
@ -258,7 +258,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
time,
expirationService,
MAX_FETCH_WAIT_MS,
followersAlwaysFlush,
canBecomeVoter,
clusterId,
bootstrapServers,
localListeners,
@ -280,7 +280,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
Time time,
ExpirationService expirationService,
int fetchMaxWaitMs,
boolean followersAlwaysFlush,
boolean canBecomeVoter,
String clusterId,
Collection<InetSocketAddress> bootstrapServers,
Endpoints localListeners,
@ -308,7 +308,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
this.localListeners = localListeners;
this.localSupportedKRaftVersion = localSupportedKRaftVersion;
this.fetchMaxWaitMs = fetchMaxWaitMs;
this.followersAlwaysFlush = followersAlwaysFlush;
this.canBecomeVoter = canBecomeVoter;
this.logger = logContext.logger(KafkaRaftClient.class);
this.random = random;
this.quorumConfig = quorumConfig;
@ -1839,7 +1839,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
);
}
if (quorum.isVoter() || followersAlwaysFlush) {
if (quorum.isVoter() || canBecomeVoter) {
// the leader only requires that voters have flushed their log before sending a Fetch
// request. Because of reconfiguration some observers (that are getting added to the
// voter set) need to flush the disk because the leader may assume that they are in the
@ -2291,6 +2291,25 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
);
}
private boolean handleAddVoterResponse(
RaftResponse.Inbound responseMetadata,
long currentTimeMs
) {
final AddRaftVoterResponseData data = (AddRaftVoterResponseData) responseMetadata.data();
final Errors error = Errors.forCode(data.errorCode());
/* These error codes indicate the replica was successfully added or the leader is unable to
* process the request. In either case, reset the update voter set timer to back off.
*/
if (error == Errors.NONE || error == Errors.REQUEST_TIMED_OUT ||
error == Errors.DUPLICATE_VOTER) {
quorum.followerStateOrThrow().resetUpdateVoterSetPeriod(currentTimeMs);
return true;
} else {
return handleUnexpectedError(error, responseMetadata);
}
}
private CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(
RaftRequest.Inbound requestMetadata,
long currentTimeMs
@ -2334,6 +2353,25 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
);
}
private boolean handleRemoveVoterResponse(
RaftResponse.Inbound responseMetadata,
long currentTimeMs
) {
final RemoveRaftVoterResponseData data = (RemoveRaftVoterResponseData) responseMetadata.data();
final Errors error = Errors.forCode(data.errorCode());
/* These error codes indicate the replica was successfully removed or the leader is unable to
* process the request. In either case, reset the update voter set timer to back off.
*/
if (error == Errors.NONE || error == Errors.REQUEST_TIMED_OUT ||
error == Errors.VOTER_NOT_FOUND) {
quorum.followerStateOrThrow().resetUpdateVoterSetPeriod(currentTimeMs);
return true;
} else {
return handleUnexpectedError(error, responseMetadata);
}
}
private CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(
RaftRequest.Inbound requestMetadata,
long currentTimeMs
@ -2629,6 +2667,14 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
handledSuccessfully = handleUpdateVoterResponse(response, currentTimeMs);
break;
case ADD_RAFT_VOTER:
handledSuccessfully = handleAddVoterResponse(response, currentTimeMs);
break;
case REMOVE_RAFT_VOTER:
handledSuccessfully = handleRemoveVoterResponse(response, currentTimeMs);
break;
default:
throw new IllegalArgumentException("Received unexpected response type: " + apiKey);
}
@ -3247,7 +3293,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
logger.info("Transitioning to Prospective state due to fetch timeout");
transitionToProspective(currentTimeMs);
backoffMs = 0;
} else if (state.hasUpdateVoterPeriodExpired(currentTimeMs)) {
} else if (state.hasUpdateVoterSetPeriodExpired(currentTimeMs)) {
final boolean resetUpdateVoterTimer;
if (shouldSendUpdateVoteRequest(state)) {
var sendResult = maybeSendUpdateVoterRequest(state, currentTimeMs);
@ -3261,7 +3307,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
}
if (resetUpdateVoterTimer) {
state.resetUpdateVoterPeriod(currentTimeMs);
state.resetUpdateVoterSetPeriod(currentTimeMs);
}
} else {
backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
@ -3271,13 +3317,56 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
backoffMs,
Math.min(
state.remainingFetchTimeMs(currentTimeMs),
state.remainingUpdateVoterPeriodMs(currentTimeMs)
state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
)
);
}
private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, long currentTimeMs) {
/* When the cluster supports reconfiguration, only replicas that can become a voter
* and are configured to auto join should attempt to automatically join the voter
* set for the configured topic partition.
*/
return partitionState.lastKraftVersion().isReconfigSupported() && canBecomeVoter &&
quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
}
private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
return maybeSendFetchToBestNode(state, currentTimeMs);
GracefulShutdown shutdown = this.shutdown.get();
final long backoffMs;
if (shutdown != null) {
// If we are an observer, then we can shutdown immediately. We want to
// skip potentially sending any add or remove voter RPCs.
backoffMs = 0;
} else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) {
final var localReplicaKey = quorum.localReplicaKeyOrThrow();
final var voters = partitionState.lastVoterSet();
final RequestSendResult sendResult;
if (voters.voterIds().contains(localReplicaKey.id())) {
/* The replica's id is in the voter set but the replica is not a voter because
* the directory id of the voter set entry is different. Remove the old voter.
* Local replica is not in the voter set because the replica is an observer.
*/
final var oldVoter = voters.voterKeys()
.stream()
.filter(replicaKey -> replicaKey.id() == localReplicaKey.id())
.findFirst()
.get();
sendResult = maybeSendRemoveVoterRequest(state, oldVoter, currentTimeMs);
} else {
sendResult = maybeSendAddVoterRequest(state, currentTimeMs);
}
backoffMs = sendResult.timeToWaitMs();
if (sendResult.requestSent()) {
state.resetUpdateVoterSetPeriod(currentTimeMs);
}
} else {
backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
}
return Math.min(
backoffMs,
state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
);
}
private long maybeSendFetchToBestNode(FollowerState state, long currentTimeMs) {
@ -3329,6 +3418,23 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
);
}
private AddRaftVoterRequestData buildAddVoterRequest() {
return RaftUtil.addVoterRequest(
clusterId,
quorumConfig.requestTimeoutMs(),
quorum.localReplicaKeyOrThrow(),
localListeners,
false
);
}
private RemoveRaftVoterRequestData buildRemoveVoterRequest(ReplicaKey replicaKey) {
return RaftUtil.removeVoterRequest(
clusterId,
replicaKey
);
}
private RequestSendResult maybeSendUpdateVoterRequest(FollowerState state, long currentTimeMs) {
return maybeSendRequest(
currentTimeMs,
@ -3337,6 +3443,29 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
);
}
private RequestSendResult maybeSendAddVoterRequest(
FollowerState state,
long currentTimeMs
) {
return maybeSendRequest(
currentTimeMs,
state.leaderNode(channel.listenerName()),
this::buildAddVoterRequest
);
}
private RequestSendResult maybeSendRemoveVoterRequest(
FollowerState state,
ReplicaKey replicaKey,
long currentTimeMs
) {
return maybeSendRequest(
currentTimeMs,
state.leaderNode(channel.listenerName()),
() -> buildRemoveVoterRequest(replicaKey)
);
}
private long pollUnattached(long currentTimeMs) {
UnattachedState state = quorum.unattachedStateOrThrow();
if (quorum.isVoter()) {

View File

@ -35,6 +35,7 @@ import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
@ -102,6 +103,11 @@ public class QuorumConfig {
public static final String QUORUM_RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20;
public static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX + "auto.join.enable";
public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "Controls whether a KRaft controller should automatically " +
"join the cluster metadata partition for its cluster id.";
public static final boolean DEFAULT_QUORUM_AUTO_JOIN_ENABLE = false;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC)
.define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC)
@ -110,7 +116,8 @@ public class QuorumConfig {
.define(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, INT, DEFAULT_QUORUM_ELECTION_BACKOFF_MAX_MS, atLeast(0), HIGH, QUORUM_ELECTION_BACKOFF_MAX_MS_DOC)
.define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, atLeast(0), MEDIUM, QUORUM_LINGER_MS_DOC)
.define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, atLeast(0), MEDIUM, QUORUM_REQUEST_TIMEOUT_MS_DOC)
.define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC);
.define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC)
.define(QUORUM_AUTO_JOIN_ENABLE_CONFIG, BOOLEAN, DEFAULT_QUORUM_AUTO_JOIN_ENABLE, LOW, QUORUM_AUTO_JOIN_ENABLE_DOC);
private final List<String> voters;
private final List<String> bootstrapServers;
@ -120,6 +127,7 @@ public class QuorumConfig {
private final int electionBackoffMaxMs;
private final int fetchTimeoutMs;
private final int appendLingerMs;
private final boolean autoJoin;
public QuorumConfig(AbstractConfig abstractConfig) {
this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG);
@ -130,6 +138,7 @@ public class QuorumConfig {
this.electionBackoffMaxMs = abstractConfig.getInt(QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG);
this.fetchTimeoutMs = abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG);
this.autoJoin = abstractConfig.getBoolean(QUORUM_AUTO_JOIN_ENABLE_CONFIG);
}
public List<String> voters() {
@ -164,6 +173,10 @@ public class QuorumConfig {
return appendLingerMs;
}
public boolean autoJoin() {
return autoJoin;
}
private static Integer parseVoterId(String idString) {
try {
return Integer.parseInt(idString);

View File

@ -64,6 +64,8 @@ public class RaftUtil {
case FETCH_SNAPSHOT -> new FetchSnapshotResponseData().setErrorCode(error.code());
case API_VERSIONS -> new ApiVersionsResponseData().setErrorCode(error.code());
case UPDATE_RAFT_VOTER -> new UpdateRaftVoterResponseData().setErrorCode(error.code());
case ADD_RAFT_VOTER -> new AddRaftVoterResponseData().setErrorCode(error.code());
case REMOVE_RAFT_VOTER -> new RemoveRaftVoterResponseData().setErrorCode(error.code());
default -> throw new IllegalArgumentException("Received response for unexpected request type: " + apiKey);
};
}
@ -524,14 +526,16 @@ public class RaftUtil {
String clusterId,
int timeoutMs,
ReplicaKey voter,
Endpoints listeners
Endpoints listeners,
boolean ackWhenCommitted
) {
return new AddRaftVoterRequestData()
.setClusterId(clusterId)
.setTimeoutMs(timeoutMs)
.setVoterId(voter.id())
.setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID))
.setListeners(listeners.toAddVoterRequest());
.setListeners(listeners.toAddVoterRequest())
.setAckWhenCommitted(ackWhenCommitted);
}
public static AddRaftVoterResponseData addVoterResponse(

View File

@ -22,12 +22,14 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.AddRaftVoterResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.network.ListenerName;
@ -36,6 +38,7 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddRaftVoterResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
@ -44,6 +47,7 @@ import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
import org.apache.kafka.common.requests.UpdateRaftVoterResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
@ -88,7 +92,9 @@ public class KafkaNetworkChannelTest {
ApiKeys.END_QUORUM_EPOCH,
ApiKeys.FETCH,
ApiKeys.FETCH_SNAPSHOT,
ApiKeys.UPDATE_RAFT_VOTER
ApiKeys.UPDATE_RAFT_VOTER,
ApiKeys.ADD_RAFT_VOTER,
ApiKeys.REMOVE_RAFT_VOTER
);
private final int requestTimeoutMs = 30000;
@ -316,6 +322,21 @@ public class KafkaNetworkChannelTest {
Endpoints.empty()
);
case ADD_RAFT_VOTER:
return RaftUtil.addVoterRequest(
clusterId,
requestTimeoutMs,
ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID),
Endpoints.empty(),
true
);
case REMOVE_RAFT_VOTER:
return RaftUtil.removeVoterRequest(
clusterId,
ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID)
);
default:
throw new AssertionError("Unexpected api " + key);
}
@ -345,6 +366,8 @@ public class KafkaNetworkChannelTest {
case FETCH -> new FetchResponseData().setErrorCode(error.code());
case FETCH_SNAPSHOT -> new FetchSnapshotResponseData().setErrorCode(error.code());
case UPDATE_RAFT_VOTER -> new UpdateRaftVoterResponseData().setErrorCode(error.code());
case ADD_RAFT_VOTER -> new AddRaftVoterResponseData().setErrorCode(error.code());
case REMOVE_RAFT_VOTER -> new RemoveRaftVoterResponseData().setErrorCode(error.code());
default -> throw new AssertionError("Unexpected api " + key);
};
}
@ -363,6 +386,10 @@ public class KafkaNetworkChannelTest {
code = ((FetchSnapshotResponseData) response).errorCode();
} else if (response instanceof UpdateRaftVoterResponseData) {
code = ((UpdateRaftVoterResponseData) response).errorCode();
} else if (response instanceof AddRaftVoterResponseData) {
code = ((AddRaftVoterResponseData) response).errorCode();
} else if (response instanceof RemoveRaftVoterResponseData) {
code = ((RemoveRaftVoterResponseData) response).errorCode();
} else {
throw new IllegalArgumentException("Unexpected type for responseData: " + response);
}
@ -383,6 +410,10 @@ public class KafkaNetworkChannelTest {
return new FetchSnapshotResponse((FetchSnapshotResponseData) responseData);
} else if (responseData instanceof UpdateRaftVoterResponseData) {
return new UpdateRaftVoterResponse((UpdateRaftVoterResponseData) responseData);
} else if (responseData instanceof AddRaftVoterResponseData) {
return new AddRaftVoterResponse((AddRaftVoterResponseData) responseData);
} else if (responseData instanceof RemoveRaftVoterResponseData) {
return new RemoveRaftVoterResponse((RemoveRaftVoterResponseData) responseData);
} else {
throw new IllegalArgumentException("Unexpected type for responseData: " + responseData);
}

View File

@ -0,0 +1,323 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.server.common.KRaftVersion;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
import static org.apache.kafka.raft.RaftClientTestContext.RaftProtocol.KIP_595_PROTOCOL;
import static org.apache.kafka.raft.RaftClientTestContext.RaftProtocol.KIP_853_PROTOCOL;
public class KafkaRaftClientAutoJoinTest {
@Test
public void testAutoRemoveOldVoter() throws Exception {
final var leader = replicaKey(randomReplicaId(), true);
final var oldFollower = replicaKey(leader.id() + 1, true);
final var newFollowerKey = replicaKey(oldFollower.id(), true);
final int epoch = 1;
final var context = new RaftClientTestContext.Builder(
newFollowerKey.id(),
newFollowerKey.directoryId().get()
)
.withRaftProtocol(KIP_853_PROTOCOL)
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(leader, oldFollower)), KRaftVersion.KRAFT_VERSION_1
)
.withElectedLeader(epoch, leader.id())
.withAutoJoin(true)
.withCanBecomeVoter(true)
.build();
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
// the next request should be a remove voter request
pollAndDeliverRemoveVoter(context, oldFollower);
// after sending a remove voter the next request should be a fetch
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
// the replica should send remove voter again because the fetch did not update the voter set
pollAndDeliverRemoveVoter(context, oldFollower);
}
@Test
public void testAutoAddNewVoter() throws Exception {
final var leader = replicaKey(randomReplicaId(), true);
final var follower = replicaKey(leader.id() + 1, true);
final var newVoter = replicaKey(follower.id() + 1, true);
final int epoch = 1;
final var context = new RaftClientTestContext.Builder(
newVoter.id(),
newVoter.directoryId().get()
)
.withRaftProtocol(KIP_853_PROTOCOL)
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1
)
.withElectedLeader(epoch, leader.id())
.withAutoJoin(true)
.withCanBecomeVoter(true)
.build();
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
// the next request should be an add voter request
pollAndSendAddVoter(context, newVoter);
// expire the add voter request, the next request should be a fetch
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
// the replica should send add voter again because the completed fetch
// did not update the voter set, and its timer has expired
final var addVoterRequest = pollAndSendAddVoter(context, newVoter);
// deliver the add voter response, this is possible before a completed fetch because of KIP-1186
context.deliverResponse(
addVoterRequest.correlationId(),
addVoterRequest.destination(),
RaftUtil.addVoterResponse(Errors.NONE, Errors.NONE.message())
);
// verify the replica can perform a fetch to commit the new voter set
pollAndDeliverFetchToUpdateVoterSet(
context,
epoch,
VoterSetTest.voterSet(Stream.of(leader, newVoter))
);
}
@Test
public void testObserverRemovesOldVoterAndAutoJoins() throws Exception {
final var leader = replicaKey(randomReplicaId(), true);
final var oldFollower = replicaKey(leader.id() + 1, true);
final var newFollowerKey = replicaKey(oldFollower.id(), true);
final int epoch = 1;
final var context = new RaftClientTestContext.Builder(
newFollowerKey.id(),
newFollowerKey.directoryId().get()
)
.withRaftProtocol(KIP_853_PROTOCOL)
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(leader, oldFollower)), KRaftVersion.KRAFT_VERSION_1
)
.withElectedLeader(epoch, leader.id())
.withAutoJoin(true)
.withCanBecomeVoter(true)
.build();
// advance time and complete a fetch to trigger the remove voter request
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
// the next request should be a remove voter request
pollAndDeliverRemoveVoter(context, oldFollower);
// after sending a remove voter the next request should be a fetch
// this fetch will remove the old follower from the voter set
pollAndDeliverFetchToUpdateVoterSet(
context,
epoch,
VoterSetTest.voterSet(Stream.of(leader))
);
// advance time and complete a fetch to trigger the add voter request
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
// the next request should be an add voter request
final var addVoterRequest = pollAndSendAddVoter(context, newFollowerKey);
// deliver the add voter response, this is possible before a completed fetch because of KIP-1186
context.deliverResponse(
addVoterRequest.correlationId(),
addVoterRequest.destination(),
RaftUtil.addVoterResponse(Errors.NONE, Errors.NONE.message())
);
// verify the replica can perform a fetch to commit the new voter set
pollAndDeliverFetchToUpdateVoterSet(
context,
epoch,
VoterSetTest.voterSet(Stream.of(leader, newFollowerKey))
);
// advance time and complete a fetch and expire the update voter set timer
// the next request should be a fetch because the log voter configuration is up-to-date
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.pollUntilRequest();
context.assertSentFetchRequest();
}
@Test
public void testObserversDoNotAutoJoin() throws Exception {
final var leader = replicaKey(randomReplicaId(), true);
final var follower = replicaKey(leader.id() + 1, true);
final var newObserver = replicaKey(follower.id() + 1, true);
final int epoch = 1;
final var context = new RaftClientTestContext.Builder(
newObserver.id(),
newObserver.directoryId().get()
)
.withRaftProtocol(KIP_853_PROTOCOL)
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1
)
.withElectedLeader(epoch, leader.id())
.withAutoJoin(true)
.withCanBecomeVoter(false)
.build();
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
// When canBecomeVoter == false, the replica should not send an add voter request
final var fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
}
@Test
public void testObserverDoesNotAddItselfWhenAutoJoinDisabled() throws Exception {
final var leader = replicaKey(randomReplicaId(), true);
final var follower = replicaKey(leader.id() + 1, true);
final var observer = replicaKey(follower.id() + 1, true);
final int epoch = 1;
final var context = new RaftClientTestContext.Builder(
observer.id(),
observer.directoryId().get()
)
.withRaftProtocol(KIP_853_PROTOCOL)
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_1
)
.withElectedLeader(epoch, leader.id())
.withAutoJoin(false)
.withCanBecomeVoter(true)
.build();
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
// When autoJoin == false, the replica should not send an add voter request
final var fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
}
@Test
public void testObserverDoesNotAutoJoinWithKRaftVersion0() throws Exception {
final var leader = replicaKey(randomReplicaId(), true);
final var follower = replicaKey(leader.id() + 1, true);
final var observer = replicaKey(follower.id() + 1, true);
final int epoch = 1;
final var context = new RaftClientTestContext.Builder(
observer.id(),
observer.directoryId().get()
)
.withRaftProtocol(KIP_595_PROTOCOL)
.withStartingVoters(
VoterSetTest.voterSet(Stream.of(leader, follower)), KRaftVersion.KRAFT_VERSION_0
)
.withElectedLeader(epoch, leader.id())
.withAutoJoin(true)
.withCanBecomeVoter(true)
.build();
context.advanceTimeAndCompleteFetch(epoch, leader.id(), true);
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
// When kraft.version == 0, the replica should not send an add voter request
final var fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
}
private void pollAndDeliverRemoveVoter(
RaftClientTestContext context,
ReplicaKey oldFollower
) throws Exception {
context.pollUntilRequest();
final var removeRequest = context.assertSentRemoveVoterRequest(oldFollower);
context.deliverResponse(
removeRequest.correlationId(),
removeRequest.destination(),
RaftUtil.removeVoterResponse(Errors.NONE, Errors.NONE.message())
);
}
private RaftRequest.Outbound pollAndSendAddVoter(
RaftClientTestContext context,
ReplicaKey newVoter
) throws Exception {
context.pollUntilRequest();
return context.assertSentAddVoterRequest(
newVoter,
context.client.quorum().localVoterNodeOrThrow().listeners()
);
}
private void pollAndDeliverFetchToUpdateVoterSet(
RaftClientTestContext context,
int epoch,
VoterSet newVoterSet
) throws Exception {
context.pollUntilRequest();
final var fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(
fetchRequest,
epoch,
context.log.endOffset().offset(),
context.log.lastFetchedEpoch(),
context.client.highWatermark()
);
// deliver the fetch response with the updated voter set
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
context.fetchResponse(
epoch,
fetchRequest.destination().id(),
MemoryRecords.withVotersRecord(
context.log.endOffset().offset(),
context.time.milliseconds(),
epoch,
BufferSupplier.NO_CACHING.get(300),
newVoterSet.toVotersRecord((short) 0)
),
context.log.endOffset().offset() + 1,
Errors.NONE
)
);
// poll kraft to update the replica's voter set
context.client.poll();
}
private int randomReplicaId() {
return ThreadLocalRandom.current().nextInt(1025);
}
}

View File

@ -78,8 +78,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class KafkaRaftClientReconfigTest {
private static final int NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD = 1;
@Test
public void testLeaderWritesBootstrapRecords() throws Exception {
ReplicaKey local = replicaKey(randomReplicaId(), true);
@ -2225,28 +2223,8 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting for FETCH requests until the UpdateRaftVoter request is sent
for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
context.fetchResponse(
epoch,
voter1.id(),
MemoryRecords.EMPTY,
0L,
Errors.NONE
)
);
// poll kraft to handle the fetch response
context.client.poll();
}
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
local,
@ -2298,28 +2276,8 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting for FETCH request until the UpdateRaftVoter request is set
for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
context.fetchResponse(
epoch,
voter1.id(),
MemoryRecords.EMPTY,
0L,
Errors.NONE
)
);
// poll kraft to handle the fetch response
context.client.poll();
}
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
local,
@ -2389,28 +2347,8 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting for FETCH request until the UpdateRaftVoter request is set
for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
context.fetchResponse(
epoch,
voter1.id(),
MemoryRecords.EMPTY,
0L,
Errors.NONE
)
);
// poll kraft to handle the fetch response
context.client.poll();
}
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
local,
@ -2437,28 +2375,8 @@ public class KafkaRaftClientReconfigTest {
context.pollUntilResponse();
// waiting for FETCH request until the UpdateRaftVoter request is set
for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, newEpoch, 0L, 0, context.client.highWatermark());
context.advanceTimeAndCompleteFetch(newEpoch, voter1.id(), true);
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
context.fetchResponse(
newEpoch,
voter1.id(),
MemoryRecords.EMPTY,
0L,
Errors.NONE
)
);
// poll kraft to handle the fetch response
context.client.poll();
}
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
updateRequest = context.assertSentUpdateVoterRequest(
local,
@ -2723,29 +2641,9 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting for FETCH request until the UpdateRaftVoter request is set
for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
context.fetchResponse(
epoch,
voter1.id(),
MemoryRecords.EMPTY,
0L,
Errors.NONE
)
);
// poll kraft to handle the fetch response
context.client.poll();
}
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
// update voter should not be sent because the local listener is not different from the voter set
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
@ -2784,26 +2682,7 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting up to the last FETCH request before the UpdateRaftVoter request is set
for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
context.fetchResponse(
epoch,
voter1.id(),
MemoryRecords.EMPTY,
0L,
Errors.NONE
)
);
// poll kraft to handle the fetch response
context.client.poll();
}
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), false);
// expect one last FETCH request
context.pollUntilRequest();
@ -2864,28 +2743,8 @@ public class KafkaRaftClientReconfigTest {
.build();
// waiting for FETCH request until the UpdateRaftVoter request is set
for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) {
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.advanceTimeAndCompleteFetch(epoch, voter1.id(), true);
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
context.fetchResponse(
epoch,
voter1.id(),
MemoryRecords.EMPTY,
0L,
Errors.NONE
)
);
// poll kraft to handle the fetch response
context.client.poll();
}
context.time.sleep(context.fetchTimeoutMs - 1);
context.pollUntilRequest();
RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
local,

View File

@ -3786,7 +3786,7 @@ class KafkaRaftClientTest {
@ParameterizedTest
@CsvSource({ "true, true", "true, false", "false, true", "false, false" })
public void testObserverReplication(boolean withKip853Rpc, boolean alwaysFlush) throws Exception {
public void testObserverReplication(boolean withKip853Rpc, boolean canBecomeVoter) throws Exception {
int localId = randomReplicaId();
int otherNodeId = localId + 1;
int epoch = 5;
@ -3795,7 +3795,7 @@ class KafkaRaftClientTest {
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withElectedLeader(epoch, otherNodeId)
.withKip853Rpc(withKip853Rpc)
.withAlwaysFlush(alwaysFlush)
.withCanBecomeVoter(canBecomeVoter)
.build();
context.assertElectedLeader(epoch, otherNodeId);
@ -3812,7 +3812,7 @@ class KafkaRaftClientTest {
context.client.poll();
assertEquals(2L, context.log.endOffset().offset());
long firstUnflushedOffset = alwaysFlush ? 2L : 0L;
long firstUnflushedOffset = canBecomeVoter ? 2L : 0L;
assertEquals(firstUnflushedOffset, context.log.firstUnflushedOffset());
}

View File

@ -34,6 +34,7 @@ public class QuorumConfigTest {
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "-1"));
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "-1"));
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "-1"));
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, "-1"));
}
private void assertInvalidConfig(Map<String, Object> overrideConfig) {
@ -46,6 +47,7 @@ public class QuorumConfigTest {
props.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, "10");
props.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, "10");
props.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "10");
props.put(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true);
props.putAll(overrideConfig);

View File

@ -143,10 +143,12 @@ public final class RaftClientTestContext {
// Used to determine which RPC request and response to construct
final RaftProtocol raftProtocol;
// Used to determine if the local kraft client was configured to always flush
final boolean alwaysFlush;
final boolean canBecomeVoter;
private final List<RaftResponse.Outbound> sentResponses = new ArrayList<>();
private static final int NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD = 1;
public static final class Builder {
static final int DEFAULT_ELECTION_TIMEOUT_MS = 10000;
@ -177,10 +179,11 @@ public final class RaftClientTestContext {
private MemoryPool memoryPool = MemoryPool.NONE;
private Optional<List<InetSocketAddress>> bootstrapServers = Optional.empty();
private RaftProtocol raftProtocol = RaftProtocol.KIP_595_PROTOCOL;
private boolean alwaysFlush = false;
private boolean canBecomeVoter = false;
private VoterSet startingVoters = VoterSet.empty();
private Endpoints localListeners = Endpoints.empty();
private boolean isStartingVotersStatic = false;
private boolean autoJoin = false;
public Builder(int localId, Set<Integer> staticVoters) {
this(OptionalInt.of(localId), staticVoters);
@ -309,8 +312,8 @@ public final class RaftClientTestContext {
return this;
}
Builder withAlwaysFlush(boolean alwaysFlush) {
this.alwaysFlush = alwaysFlush;
Builder withCanBecomeVoter(boolean canBecomeVoter) {
this.canBecomeVoter = canBecomeVoter;
return this;
}
@ -376,6 +379,11 @@ public final class RaftClientTestContext {
return this;
}
Builder withAutoJoin(boolean autoJoin) {
this.autoJoin = autoJoin;
return this;
}
public RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel();
@ -404,13 +412,14 @@ public final class RaftClientTestContext {
Endpoints.empty() :
this.localListeners;
Map<String, Integer> configMap = new HashMap<>();
Map<String, Object> configMap = new HashMap<>();
configMap.put(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
configMap.put(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS);
configMap.put(QuorumConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG, electionTimeoutMs);
configMap.put(QuorumConfig.QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG, ELECTION_BACKOFF_MAX_MS);
configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, FETCH_TIMEOUT_MS);
configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, appendLingerMs);
configMap.put(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, autoJoin);
QuorumConfig quorumConfig = new QuorumConfig(new AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));
List<InetSocketAddress> computedBootstrapServers = bootstrapServers.orElseGet(() -> {
@ -436,7 +445,7 @@ public final class RaftClientTestContext {
time,
new MockExpirationService(time),
FETCH_MAX_WAIT_MS,
alwaysFlush,
canBecomeVoter,
clusterId,
computedBootstrapServers,
localListeners,
@ -474,7 +483,7 @@ public final class RaftClientTestContext {
.boxed()
.collect(Collectors.toSet()),
raftProtocol,
alwaysFlush,
canBecomeVoter,
metrics,
externalKRaftMetrics,
listener
@ -503,7 +512,7 @@ public final class RaftClientTestContext {
VoterSet startingVoters,
Set<Integer> bootstrapIds,
RaftProtocol raftProtocol,
boolean alwaysFlush,
boolean canBecomeVoter,
Metrics metrics,
ExternalKRaftMetrics externalKRaftMetrics,
MockListener listener
@ -521,7 +530,7 @@ public final class RaftClientTestContext {
this.startingVoters = startingVoters;
this.bootstrapIds = bootstrapIds;
this.raftProtocol = raftProtocol;
this.alwaysFlush = alwaysFlush;
this.canBecomeVoter = canBecomeVoter;
this.metrics = metrics;
this.externalKRaftMetrics = externalKRaftMetrics;
this.listener = listener;
@ -949,6 +958,51 @@ public final class RaftClientTestContext {
channel.mockReceive(new RaftResponse.Inbound(correlationId, versionedResponse, source));
}
/**
* Advance time and complete an empty fetch to reset the fetch timer.
* This is used to expire the update voter set timer without also expiring the fetch timer,
* which is needed for add, remove, and update voter tests.
* For voters and observers, polling after exiting this method expires the update voter set timer.
* @param epoch - the current epoch
* @param leaderId - the leader id
* @param expireUpdateVoterSetTimer - if true, advance time again to expire this timer
*/
void advanceTimeAndCompleteFetch(
int epoch,
int leaderId,
boolean expireUpdateVoterSetTimer
) throws Exception {
for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_VOTER_SET_PERIOD; i++) {
time.sleep(fetchTimeoutMs - 1);
pollUntilRequest();
final var fetchRequest = assertSentFetchRequest();
assertFetchRequestData(
fetchRequest,
epoch,
log.endOffset().offset(),
log.lastFetchedEpoch(),
client.highWatermark()
);
deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
fetchResponse(
epoch,
leaderId,
MemoryRecords.EMPTY,
log.endOffset().offset(),
Errors.NONE
)
);
// poll kraft to handle the fetch response
client.poll();
}
if (expireUpdateVoterSetTimer) {
time.sleep(fetchTimeoutMs - 1);
}
}
List<RaftRequest.Outbound> assertSentBeginQuorumEpochRequest(int epoch, Set<Integer> destinationIds) {
List<RaftRequest.Outbound> requests = collectBeginEpochRequests(epoch);
assertEquals(destinationIds.size(), requests.size());
@ -1259,6 +1313,26 @@ public final class RaftClientTestContext {
return sentRequests.get(0);
}
RaftRequest.Outbound assertSentAddVoterRequest(
ReplicaKey replicaKey,
Endpoints endpoints
) {
final var sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.ADD_RAFT_VOTER));
assertEquals(1, sentRequests.size());
final var request = sentRequests.get(0);
assertInstanceOf(AddRaftVoterRequestData.class, request.data());
final var addRaftVoterRequestData = (AddRaftVoterRequestData) request.data();
assertEquals(clusterId, addRaftVoterRequestData.clusterId());
assertEquals(replicaKey.id(), addRaftVoterRequestData.voterId());
assertEquals(replicaKey.directoryId().get(), addRaftVoterRequestData.voterDirectoryId());
assertEquals(endpoints, Endpoints.fromAddVoterRequest(addRaftVoterRequestData.listeners()));
assertEquals(false, addRaftVoterRequestData.ackWhenCommitted());
return request;
}
AddRaftVoterResponseData assertSentAddVoterResponse(Errors error) {
List<RaftResponse.Outbound> sentResponses = drainSentResponses(ApiKeys.ADD_RAFT_VOTER);
assertEquals(1, sentResponses.size());
@ -1272,6 +1346,23 @@ public final class RaftClientTestContext {
return addVoterResponse;
}
RaftRequest.Outbound assertSentRemoveVoterRequest(
ReplicaKey replicaKey
) {
final var sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.REMOVE_RAFT_VOTER));
assertEquals(1, sentRequests.size());
final var request = sentRequests.get(0);
assertInstanceOf(RemoveRaftVoterRequestData.class, request.data());
final var removeRaftVoterRequestData = (RemoveRaftVoterRequestData) request.data();
assertEquals(clusterId, removeRaftVoterRequestData.clusterId());
assertEquals(replicaKey.id(), removeRaftVoterRequestData.voterId());
assertEquals(replicaKey.directoryId().get(), removeRaftVoterRequestData.voterDirectoryId());
return request;
}
RemoveRaftVoterResponseData assertSentRemoveVoterResponse(Errors error) {
List<RaftResponse.Outbound> sentResponses = drainSentResponses(ApiKeys.REMOVE_RAFT_VOTER);
assertEquals(1, sentResponses.size());
@ -1707,7 +1798,7 @@ public final class RaftClientTestContext {
// Assert that voters have flushed up to the fetch offset
if ((localId.isPresent() && startingVoters.voterIds().contains(localId.getAsInt())) ||
alwaysFlush
canBecomeVoter
) {
assertEquals(
log.firstUnflushedOffset(),
@ -1921,7 +2012,8 @@ public final class RaftClientTestContext {
clusterId,
timeoutMs,
voter,
endpoints
endpoints,
true
);
}

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.AddRaftVoterRequestDataJsonConverter;
import org.apache.kafka.common.message.AddRaftVoterResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestDataJsonConverter;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
@ -42,6 +43,7 @@ import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.FetchSnapshotRequestDataJsonConverter;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseDataJsonConverter;
import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteRequestDataJsonConverter;
import org.apache.kafka.common.message.VoteResponseData;
@ -93,6 +95,10 @@ public class RaftUtilTest {
RaftUtil.errorResponse(ApiKeys.FETCH, Errors.NONE));
assertEquals(new FetchSnapshotResponseData().setErrorCode(Errors.NONE.code()),
RaftUtil.errorResponse(ApiKeys.FETCH_SNAPSHOT, Errors.NONE));
assertEquals(new AddRaftVoterResponseData().setErrorCode(Errors.NONE.code()),
RaftUtil.errorResponse(ApiKeys.ADD_RAFT_VOTER, Errors.NONE));
assertEquals(new RemoveRaftVoterResponseData().setErrorCode(Errors.NONE.code()),
RaftUtil.errorResponse(ApiKeys.REMOVE_RAFT_VOTER, Errors.NONE));
assertThrows(IllegalArgumentException.class, () -> RaftUtil.errorResponse(ApiKeys.PRODUCE, Errors.NONE));
}

View File

@ -27,10 +27,12 @@ import kafka.server.SharedServer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.test.api.TestKitDefaults;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -114,6 +116,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final String controllerListenerName;
private final String brokerSecurityProtocol;
private final String controllerSecurityProtocol;
private boolean standalone;
private Optional<Map<Integer, Uuid>> initialVoterSet = Optional.empty();
private boolean deleteOnClose;
public Builder(TestKitNodes nodes) {
@ -130,6 +134,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
return this;
}
public Builder setStandalone(boolean standalone) {
this.standalone = standalone;
return this;
}
public Builder setInitialVoterSet(Map<Integer, Uuid> initialVoterSet) {
this.initialVoterSet = Optional.of(initialVoterSet);
return this;
}
private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
TestKitNode controllerNode = nodes.controllerNodes().get(node.id());
@ -184,6 +198,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
// reduce log cleaner offset map memory usage
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
// do not include auto join config in broker nodes
if (brokerNode != null) {
props.remove(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG);
}
// Add associated broker node property overrides
if (brokerNode != null) {
props.putAll(brokerNode.propertyOverrides());
@ -323,6 +342,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
faultHandlerFactory,
socketFactoryManager,
jaasFile,
standalone,
initialVoterSet,
deleteOnClose);
}
@ -368,6 +389,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final PreboundSocketFactoryManager socketFactoryManager;
private final String controllerListenerName;
private final Optional<File> jaasFile;
private final boolean standalone;
private final Optional<Map<Integer, Uuid>> initialVoterSet;
private final boolean deleteOnClose;
private KafkaClusterTestKit(
@ -378,6 +401,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
SimpleFaultHandlerFactory faultHandlerFactory,
PreboundSocketFactoryManager socketFactoryManager,
Optional<File> jaasFile,
boolean standalone,
Optional<Map<Integer, Uuid>> initialVoterSet,
boolean deleteOnClose
) {
/*
@ -395,6 +420,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.socketFactoryManager = socketFactoryManager;
this.controllerListenerName = nodes.controllerListenerName().value();
this.jaasFile = jaasFile;
this.standalone = standalone;
this.initialVoterSet = initialVoterSet;
this.deleteOnClose = deleteOnClose;
}
@ -425,8 +452,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
boolean writeMetadataDirectory
) {
try {
final var nodeId = ensemble.nodeId().getAsInt();
Formatter formatter = new Formatter();
formatter.setNodeId(ensemble.nodeId().getAsInt());
formatter.setNodeId(nodeId);
formatter.setClusterId(ensemble.clusterId().get());
if (writeMetadataDirectory) {
formatter.setDirectories(ensemble.logDirProps().keySet());
@ -452,15 +480,50 @@ public class KafkaClusterTestKit implements AutoCloseable {
if (nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
StringBuilder dynamicVotersBuilder = new StringBuilder();
String prefix = "";
for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
int port = socketFactoryManager.
getOrCreatePortForListener(controllerNode.id(), controllerListenerName);
dynamicVotersBuilder.append(prefix);
prefix = ",";
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
controllerNode.id(), port, controllerNode.metadataDirectoryId()));
if (standalone) {
if (nodeId == TestKitDefaults.CONTROLLER_ID_OFFSET) {
final var controllerNode = nodes.controllerNodes().get(nodeId);
dynamicVotersBuilder.append(
String.format(
"%d@localhost:%d:%s",
controllerNode.id(),
socketFactoryManager.
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
controllerNode.metadataDirectoryId()
)
);
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
} else {
formatter.setNoInitialControllersFlag(true);
}
} else if (initialVoterSet.isPresent()) {
for (final var controllerNode : initialVoterSet.get().entrySet()) {
final var voterId = controllerNode.getKey();
final var voterDirectoryId = controllerNode.getValue();
dynamicVotersBuilder.append(prefix);
prefix = ",";
dynamicVotersBuilder.append(
String.format(
"%d@localhost:%d:%s",
voterId,
socketFactoryManager.
getOrCreatePortForListener(voterId, controllerListenerName),
voterDirectoryId
)
);
}
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
} else {
for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
int port = socketFactoryManager.
getOrCreatePortForListener(controllerNode.id(), controllerListenerName);
dynamicVotersBuilder.append(prefix);
prefix = ",";
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
controllerNode.id(), port, controllerNode.metadataDirectoryId()));
}
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
}
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
}
formatter.run();
} catch (Exception e) {