mirror of https://github.com/apache/kafka.git
KAFKA-16540: Clear ELRs when min.insync.replicas is changed. (#18148)
In order to maintain the integrity of replication, we need to clear the ELRs of affected partitions when min.insync.replicas is changed. This could happen at the topic level, or at a global level if the cluster level default is changed. Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
f13a22af0b
commit
6235a73622
|
@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigResource;
|
|||
import org.apache.kafka.common.config.ConfigResource.Type;
|
||||
import org.apache.kafka.common.config.TopicConfig;
|
||||
import org.apache.kafka.common.config.types.Password;
|
||||
import org.apache.kafka.common.metadata.ClearElrRecord;
|
||||
import org.apache.kafka.common.metadata.ConfigRecord;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.requests.ApiError;
|
||||
|
@ -54,8 +55,6 @@ import java.util.Optional;
|
|||
import java.util.function.Consumer;
|
||||
|
||||
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
|
||||
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.DELETE;
|
||||
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
|
||||
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
|
||||
import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
|
||||
|
@ -165,7 +164,8 @@ public class ConfigurationControlManager {
|
|||
ConfigurationValidator validator,
|
||||
Map<String, Object> staticConfig,
|
||||
int nodeId,
|
||||
FeatureControlManager featureControl) {
|
||||
FeatureControlManager featureControl
|
||||
) {
|
||||
this.log = logContext.logger(ConfigurationControlManager.class);
|
||||
this.snapshotRegistry = snapshotRegistry;
|
||||
this.configSchema = configSchema;
|
||||
|
@ -211,9 +211,32 @@ public class ConfigurationControlManager {
|
|||
outputRecords);
|
||||
outputResults.put(resourceEntry.getKey(), apiError);
|
||||
}
|
||||
outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
|
||||
return ControllerResult.atomicOf(outputRecords, outputResults);
|
||||
}
|
||||
|
||||
List<ApiMessageAndVersion> createClearElrRecordsAsNeeded(List<ApiMessageAndVersion> input) {
|
||||
if (!featureControl.isElrFeatureEnabled()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
List<ApiMessageAndVersion> output = new ArrayList<>();
|
||||
for (ApiMessageAndVersion messageAndVersion : input) {
|
||||
if (messageAndVersion.message().apiKey() == CONFIG_RECORD.id()) {
|
||||
ConfigRecord record = (ConfigRecord) messageAndVersion.message();
|
||||
if (record.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG)) {
|
||||
if (Type.forId(record.resourceType()) == Type.TOPIC) {
|
||||
output.add(new ApiMessageAndVersion(
|
||||
new ClearElrRecord().
|
||||
setTopicName(record.resourceName()), (short) 0));
|
||||
} else {
|
||||
output.add(new ApiMessageAndVersion(new ClearElrRecord(), (short) 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
ControllerResult<ApiError> incrementalAlterConfig(
|
||||
ConfigResource configResource,
|
||||
Map<String, Entry<OpType, String>> keyToOps,
|
||||
|
@ -225,6 +248,8 @@ public class ConfigurationControlManager {
|
|||
keyToOps,
|
||||
newlyCreatedResource,
|
||||
outputRecords);
|
||||
|
||||
outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
|
||||
return ControllerResult.atomicOf(outputRecords, apiError);
|
||||
}
|
||||
|
||||
|
@ -357,8 +382,8 @@ public class ConfigurationControlManager {
|
|||
" cannot be altered while ELR is enabled.");
|
||||
|
||||
private static final ApiError DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR =
|
||||
new ApiError(INVALID_CONFIG, "Cluster-level " + MIN_IN_SYNC_REPLICAS_CONFIG +
|
||||
" cannot be removed while ELR is enabled.");
|
||||
new ApiError(INVALID_CONFIG, "Cluster-level " + MIN_IN_SYNC_REPLICAS_CONFIG +
|
||||
" cannot be removed while ELR is enabled.");
|
||||
|
||||
boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) {
|
||||
if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
|
||||
|
@ -407,6 +432,7 @@ public class ConfigurationControlManager {
|
|||
outputRecords,
|
||||
outputResults);
|
||||
}
|
||||
outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
|
||||
return ControllerResult.atomicOf(outputRecords, outputResults);
|
||||
}
|
||||
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.kafka.common.metadata.AbortTransactionRecord;
|
|||
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
|
||||
import org.apache.kafka.common.metadata.BeginTransactionRecord;
|
||||
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
|
||||
import org.apache.kafka.common.metadata.ClearElrRecord;
|
||||
import org.apache.kafka.common.metadata.ClientQuotaRecord;
|
||||
import org.apache.kafka.common.metadata.ConfigRecord;
|
||||
import org.apache.kafka.common.metadata.DelegationTokenRecord;
|
||||
|
@ -1294,6 +1295,9 @@ public final class QuorumController implements Controller {
|
|||
case REGISTER_CONTROLLER_RECORD:
|
||||
clusterControl.replay((RegisterControllerRecord) message);
|
||||
break;
|
||||
case CLEAR_ELR_RECORD:
|
||||
replicationControl.replay((ClearElrRecord) message);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unhandled record type " + type);
|
||||
}
|
||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
|
|||
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
|
||||
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
|
||||
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
|
||||
import org.apache.kafka.common.metadata.ClearElrRecord;
|
||||
import org.apache.kafka.common.metadata.FenceBrokerRecord;
|
||||
import org.apache.kafka.common.metadata.PartitionChangeRecord;
|
||||
import org.apache.kafka.common.metadata.PartitionRecord;
|
||||
|
@ -571,6 +572,57 @@ public class ReplicationControlManager {
|
|||
log.info("Replayed RemoveTopicRecord for topic {} with ID {}.", topic.name, record.topicId());
|
||||
}
|
||||
|
||||
public void replay(ClearElrRecord record) {
|
||||
if (record.topicName().isEmpty()) {
|
||||
replayClearAllElrs();
|
||||
} else {
|
||||
replayClearTopicElrs(record.topicName());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void replayClearAllElrs() {
|
||||
long numRemoved = 0;
|
||||
for (TopicControlInfo topic : topics.values()) {
|
||||
numRemoved += removeTopicElrs(topic);
|
||||
}
|
||||
log.info("Removed ELRs from {} partitions in all topics.", numRemoved);
|
||||
}
|
||||
|
||||
void replayClearTopicElrs(String topicName) {
|
||||
Uuid topicId = topicsByName.get(topicName);
|
||||
if (topicId == null) {
|
||||
throw new RuntimeException("Unable to find a topic named " + topicName +
|
||||
" in order to clear its ELRs.");
|
||||
}
|
||||
TopicControlInfo topic = topics.get(topicId);
|
||||
if (topic == null) {
|
||||
throw new RuntimeException("Unable to find a topic with ID " + topicId +
|
||||
" in order to clear its ELRs.");
|
||||
}
|
||||
int numRemoved = removeTopicElrs(topic);
|
||||
log.info("Removed ELRs from {} partitions of topic {}.", numRemoved, topicName);
|
||||
}
|
||||
|
||||
int removeTopicElrs(TopicControlInfo topic) {
|
||||
int numRemoved = 0;
|
||||
List<Integer> partitionIds = new ArrayList<>(topic.parts.keySet());
|
||||
for (int partitionId : partitionIds) {
|
||||
PartitionRegistration partition = topic.parts.get(partitionId);
|
||||
PartitionRegistration nextPartition = partition.merge(
|
||||
new PartitionChangeRecord().
|
||||
setPartitionId(partitionId).
|
||||
setTopicId(topic.id).
|
||||
setEligibleLeaderReplicas(Collections.emptyList()).
|
||||
setLastKnownElr(Collections.emptyList()));
|
||||
if (!nextPartition.equals(partition)) {
|
||||
topic.parts.put(partitionId, nextPartition);
|
||||
numRemoved++;
|
||||
}
|
||||
}
|
||||
return numRemoved;
|
||||
}
|
||||
|
||||
ControllerResult<CreateTopicsResponseData> createTopics(
|
||||
ControllerRequestContext context,
|
||||
CreateTopicsRequestData request,
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
// Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
// contributor license agreements. See the NOTICE file distributed with
|
||||
// this work for additional information regarding copyright ownership.
|
||||
// The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance with
|
||||
// the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
{
|
||||
"apiKey": 28,
|
||||
"type": "metadata",
|
||||
"name": "ClearElrRecord",
|
||||
"validVersions": "0",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{ "name": "TopicName", "type": "string", "versions": "0+",
|
||||
"entityType": "topicName", "about": "The name of the topic to clear ELRs for, or empty if all ELRs should be cleared." }
|
||||
]
|
||||
}
|
|
@ -84,7 +84,8 @@ public class ConfigurationControlManagerTest {
|
|||
define("abc", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "abc").
|
||||
define("def", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "def").
|
||||
define("ghi", ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, "ghi").
|
||||
define("quuux", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "quux"));
|
||||
define("quuux", ConfigDef.Type.LONG, ConfigDef.Importance.HIGH, "quux").
|
||||
define(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH, ""));
|
||||
}
|
||||
|
||||
public static final Map<String, List<ConfigSynonym>> SYNONYMS = new HashMap<>();
|
||||
|
@ -95,6 +96,7 @@ public class ConfigurationControlManagerTest {
|
|||
SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
|
||||
Collections.singletonList(new ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
|
||||
SYNONYMS.put("quuux", Collections.singletonList(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
|
||||
SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, Collections.singletonList(new ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
|
||||
}
|
||||
|
||||
static final KafkaConfigSchema SCHEMA = new KafkaConfigSchema(CONFIGS, SYNONYMS);
|
||||
|
@ -114,7 +116,7 @@ public class ConfigurationControlManagerTest {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <A, B> Map<A, B> toMap(Entry... entries) {
|
||||
static <A, B> Map<A, B> toMap(Entry... entries) {
|
||||
Map<A, B> map = new LinkedHashMap<>();
|
||||
for (Entry<A, B> entry : entries) {
|
||||
map.put(entry.getKey(), entry.getValue());
|
||||
|
|
|
@ -154,6 +154,7 @@ import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
|
|||
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
|
||||
import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER0;
|
||||
import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
|
||||
import static org.apache.kafka.controller.ConfigurationControlManagerTest.toMap;
|
||||
import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
|
||||
import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
|
||||
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
|
||||
|
@ -256,7 +257,7 @@ public class QuorumControllerTest {
|
|||
new ResultOrError<>(Collections.emptyMap())),
|
||||
controller.describeConfigs(ANONYMOUS_CONTEXT, Collections.singletonMap(
|
||||
BROKER0, Collections.emptyList())).get());
|
||||
logEnv.logManagers().forEach(m -> m.setMaxReadOffset(6L));
|
||||
logEnv.logManagers().forEach(m -> m.setMaxReadOffset(8L));
|
||||
assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
|
||||
}
|
||||
|
||||
|
@ -378,6 +379,7 @@ public class QuorumControllerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUncleanShutdownBrokerElrEnabled() throws Throwable {
|
||||
List<Integer> allBrokers = Arrays.asList(1, 2, 3);
|
||||
short replicationFactor = (short) allBrokers.size();
|
||||
|
@ -600,6 +602,133 @@ public class QuorumControllerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMinIsrUpdateWithElr() throws Throwable {
|
||||
List<Integer> allBrokers = Arrays.asList(1, 2, 3);
|
||||
List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
|
||||
List<Integer> brokersToFence = Arrays.asList(2, 3);
|
||||
short replicationFactor = (short) allBrokers.size();
|
||||
long sessionTimeoutMillis = 300;
|
||||
|
||||
try (
|
||||
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).build();
|
||||
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
|
||||
setSessionTimeoutMillis(OptionalLong.of(sessionTimeoutMillis)).
|
||||
setBootstrapMetadata(BootstrapMetadata.fromVersion(MetadataVersion.IBP_4_0_IV1, "test-provided bootstrap ELR enabled")).
|
||||
build()
|
||||
) {
|
||||
ListenerCollection listeners = new ListenerCollection();
|
||||
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
|
||||
QuorumController active = controlEnv.activeController();
|
||||
Map<Integer, Long> brokerEpochs = new HashMap<>();
|
||||
|
||||
for (Integer brokerId : allBrokers) {
|
||||
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
|
||||
anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
|
||||
new BrokerRegistrationRequestData().
|
||||
setBrokerId(brokerId).
|
||||
setClusterId(active.clusterId()).
|
||||
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(),
|
||||
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
|
||||
setIncarnationId(Uuid.randomUuid()).
|
||||
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
|
||||
setListeners(listeners));
|
||||
brokerEpochs.put(brokerId, reply.get().epoch());
|
||||
}
|
||||
|
||||
// Brokers are only registered and should still be fenced
|
||||
allBrokers.forEach(brokerId -> {
|
||||
assertFalse(active.clusterControl().isUnfenced(brokerId),
|
||||
"Broker " + brokerId + " should have been fenced");
|
||||
});
|
||||
|
||||
// Unfence all brokers and create a topic foo (min ISR 2)
|
||||
sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, brokerEpochs);
|
||||
CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
|
||||
new CreatableTopicCollection(Arrays.asList(
|
||||
new CreatableTopic().setName("foo").setNumPartitions(1).
|
||||
setReplicationFactor(replicationFactor),
|
||||
new CreatableTopic().setName("bar").setNumPartitions(1).
|
||||
setReplicationFactor(replicationFactor)
|
||||
).iterator()));
|
||||
CreateTopicsResponseData createTopicsResponseData = active.createTopics(
|
||||
ANONYMOUS_CONTEXT, createTopicsRequestData,
|
||||
new HashSet<>(Arrays.asList("foo", "bar"))).get();
|
||||
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
|
||||
assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
|
||||
Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
|
||||
Uuid topicIdBar = createTopicsResponseData.topics().find("bar").topicId();
|
||||
ConfigRecord configRecord = new ConfigRecord()
|
||||
.setResourceType(BROKER.id())
|
||||
.setResourceName("")
|
||||
.setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
|
||||
.setValue("2");
|
||||
RecordTestUtils.replayAll(active.configurationControl(), singletonList(new ApiMessageAndVersion(configRecord, (short) 0)));
|
||||
|
||||
// Fence brokers
|
||||
TestUtils.waitForCondition(() -> {
|
||||
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
|
||||
for (Integer brokerId : brokersToFence) {
|
||||
if (active.clusterControl().isUnfenced(brokerId)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}, sessionTimeoutMillis * 30,
|
||||
"Fencing of brokers did not process within expected time"
|
||||
);
|
||||
|
||||
// Send another heartbeat to the brokers we want to keep alive
|
||||
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
|
||||
|
||||
// At this point only the brokers we want to fence (broker 2, 3) should be fenced.
|
||||
brokersToKeepUnfenced.forEach(brokerId -> {
|
||||
assertTrue(active.clusterControl().isUnfenced(brokerId),
|
||||
"Broker " + brokerId + " should have been unfenced");
|
||||
});
|
||||
brokersToFence.forEach(brokerId -> {
|
||||
assertFalse(active.clusterControl().isUnfenced(brokerId),
|
||||
"Broker " + brokerId + " should have been fenced");
|
||||
});
|
||||
sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, brokerEpochs);
|
||||
|
||||
// Verify the isr and elr for the topic partition
|
||||
PartitionRegistration partition = active.replicationControl().getPartition(topicIdFoo, 0);
|
||||
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
|
||||
|
||||
// The ELR set is not determined but the size is 1.
|
||||
assertEquals(1, partition.elr.length, partition.toString());
|
||||
|
||||
// First, decrease the min ISR config to 1. This should clear the ELR fields.
|
||||
ControllerResult<Map<ConfigResource, ApiError>> result = active.configurationControl().incrementalAlterConfigs(toMap(
|
||||
entry(new ConfigResource(TOPIC, "foo"), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
|
||||
true);
|
||||
assertEquals(2, result.records().size(), result.records().toString());
|
||||
RecordTestUtils.replayAll(active.configurationControl(), singletonList(result.records().get(0)));
|
||||
RecordTestUtils.replayAll(active.replicationControl(), singletonList(result.records().get(1)));
|
||||
|
||||
partition = active.replicationControl().getPartition(topicIdFoo, 0);
|
||||
assertEquals(0, partition.elr.length, partition.toString());
|
||||
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
|
||||
|
||||
// Second, let's try update config on cluster level with the other topic.
|
||||
partition = active.replicationControl().getPartition(topicIdBar, 0);
|
||||
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
|
||||
assertEquals(1, partition.elr.length, partition.toString());
|
||||
|
||||
result = active.configurationControl().incrementalAlterConfigs(toMap(
|
||||
entry(new ConfigResource(BROKER, ""), toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
|
||||
true);
|
||||
assertEquals(2, result.records().size(), result.records().toString());
|
||||
RecordTestUtils.replayAll(active.configurationControl(), singletonList(result.records().get(0)));
|
||||
RecordTestUtils.replayAll(active.replicationControl(), singletonList(result.records().get(1)));
|
||||
|
||||
partition = active.replicationControl().getPartition(topicIdBar, 0);
|
||||
assertEquals(0, partition.elr.length, partition.toString());
|
||||
assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBalancePartitionLeaders() throws Throwable {
|
||||
List<Integer> allBrokers = Arrays.asList(1, 2, 3);
|
||||
|
@ -865,7 +994,7 @@ public class QuorumControllerTest {
|
|||
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
|
||||
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
|
||||
setListeners(listeners));
|
||||
assertEquals(4L, reply.get().epoch());
|
||||
assertEquals(6L, reply.get().epoch());
|
||||
CreateTopicsRequestData createTopicsRequestData =
|
||||
new CreateTopicsRequestData().setTopics(
|
||||
new CreatableTopicCollection(Collections.singleton(
|
||||
|
@ -881,7 +1010,7 @@ public class QuorumControllerTest {
|
|||
get().topics().find("foo").errorMessage());
|
||||
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
|
||||
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
|
||||
setWantFence(false).setBrokerEpoch(4L).setBrokerId(0).
|
||||
setWantFence(false).setBrokerEpoch(6L).setBrokerId(0).
|
||||
setCurrentMetadataOffset(100000L)).get());
|
||||
assertEquals(Errors.NONE.code(), active.createTopics(ANONYMOUS_CONTEXT,
|
||||
createTopicsRequestData, Collections.singleton("foo")).
|
||||
|
|
|
@ -105,6 +105,12 @@ public class QuorumControllerTestEnv implements AutoCloseable {
|
|||
for (int nodeId = 0; nodeId < numControllers; nodeId++) {
|
||||
QuorumController.Builder builder = new QuorumController.Builder(nodeId, logEnv.clusterId());
|
||||
builder.setRaftClient(logEnv.logManagers().get(nodeId));
|
||||
if (eligibleLeaderReplicasEnabled) {
|
||||
bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(
|
||||
EligibleLeaderReplicasVersion.FEATURE_NAME,
|
||||
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()
|
||||
);
|
||||
}
|
||||
builder.setBootstrapMetadata(bootstrapMetadata);
|
||||
builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
|
||||
builder.setQuorumFeatures(new QuorumFeatures(nodeId, QuorumFeatures.defaultSupportedFeatureMap(true), nodeIds));
|
||||
|
@ -120,11 +126,6 @@ public class QuorumControllerTestEnv implements AutoCloseable {
|
|||
nonFatalFaultHandlers.put(nodeId, fatalFaultHandler);
|
||||
controllerBuilderInitializer.accept(builder);
|
||||
QuorumController controller = builder.build();
|
||||
if (eligibleLeaderReplicasEnabled) {
|
||||
bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(
|
||||
EligibleLeaderReplicasVersion.FEATURE_NAME,
|
||||
EligibleLeaderReplicasVersion.ELRV_1.featureLevel());
|
||||
}
|
||||
this.controllers.add(controller);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -103,6 +103,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
|
|||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.CsvSource;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -188,7 +189,7 @@ public class ReplicationControlManagerTest {
|
|||
return this;
|
||||
}
|
||||
|
||||
Builder setIsElrEnabled(Boolean isElrEnabled) {
|
||||
Builder setIsElrEnabled(boolean isElrEnabled) {
|
||||
this.isElrEnabled = isElrEnabled;
|
||||
return this;
|
||||
}
|
||||
|
@ -3243,4 +3244,47 @@ public class ReplicationControlManagerTest {
|
|||
private static List<ApiMessageAndVersion> filter(List<ApiMessageAndVersion> records, Class<? extends ApiMessage> clazz) {
|
||||
return records.stream().filter(r -> clazz.equals(r.message().getClass())).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@CsvSource({"false, false", "false, true", "true, false", "true, true"})
|
||||
void testElrsRemovedOnMinIsrUpdate(boolean clusterLevel, boolean useLegacyAlterConfigs) {
|
||||
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().
|
||||
setIsElrEnabled(true).
|
||||
setStaticConfig(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2").
|
||||
build();
|
||||
ctx.registerBrokers(1, 2, 3, 4);
|
||||
ctx.unfenceBrokers(1, 2, 3, 4);
|
||||
Uuid fooId = ctx.createTestTopic("foo", new int[][]{
|
||||
new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();
|
||||
Uuid barId = ctx.createTestTopic("bar", new int[][]{
|
||||
new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();
|
||||
ctx.fenceBrokers(4);
|
||||
ctx.fenceBrokers(1);
|
||||
assertArrayEquals(new int[]{1}, ctx.replicationControl.getPartition(fooId, 0).elr);
|
||||
assertArrayEquals(new int[]{1}, ctx.replicationControl.getPartition(barId, 0).elr);
|
||||
ConfigResource configResource;
|
||||
if (clusterLevel) {
|
||||
configResource = new ConfigResource(ConfigResource.Type.BROKER, "");
|
||||
} else {
|
||||
configResource = new ConfigResource(ConfigResource.Type.TOPIC, "foo");
|
||||
}
|
||||
if (useLegacyAlterConfigs) {
|
||||
ctx.replay(ctx.configurationControl.legacyAlterConfigs(
|
||||
Collections.singletonMap(configResource,
|
||||
Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")),
|
||||
false).records());
|
||||
} else {
|
||||
ctx.replay(ctx.configurationControl.incrementalAlterConfigs(
|
||||
Collections.singletonMap(configResource,
|
||||
Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
|
||||
new AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "1"))),
|
||||
false).records());
|
||||
}
|
||||
assertArrayEquals(new int[]{}, ctx.replicationControl.getPartition(fooId, 0).elr);
|
||||
if (clusterLevel) {
|
||||
assertArrayEquals(new int[]{}, ctx.replicationControl.getPartition(barId, 0).elr);
|
||||
} else {
|
||||
assertArrayEquals(new int[]{1}, ctx.replicationControl.getPartition(barId, 0).elr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ public enum EligibleLeaderReplicasVersion implements FeatureVersion {
|
|||
|
||||
public static final String FEATURE_NAME = "eligible.leader.replicas.version";
|
||||
|
||||
public static final EligibleLeaderReplicasVersion LATEST_PRODUCTION = ELRV_0;
|
||||
public static final EligibleLeaderReplicasVersion LATEST_PRODUCTION = ELRV_1;
|
||||
|
||||
private final short featureLevel;
|
||||
private final MetadataVersion bootstrapMetadataVersion;
|
||||
|
|
Loading…
Reference in New Issue