mirror of https://github.com/apache/kafka.git
KAFKA-16308 [3/N]: Introduce feature dependency validation to UpdateFeatures command (#16443)
This change includes: 1. Dependency checking when updating the feature (all request versions) 2. Returning top level error and no feature level errors if any feature failed to update and using this error for all the features in the response. (all request versions) 3. Returning only top level none error for v2 and beyond Reviewers: Jun Rao <jun@confluent.io>
This commit is contained in:
parent
4312ce6d25
commit
49d7ea6c6a
|
@ -4539,22 +4539,29 @@ public class KafkaAdminClient extends AdminClient {
|
|||
ApiError topLevelError = response.topLevelError();
|
||||
switch (topLevelError.error()) {
|
||||
case NONE:
|
||||
for (final UpdatableFeatureResult result : response.data().results()) {
|
||||
final KafkaFutureImpl<Void> future = updateFutures.get(result.feature());
|
||||
if (future == null) {
|
||||
log.warn("Server response mentioned unknown feature {}", result.feature());
|
||||
} else {
|
||||
final Errors error = Errors.forCode(result.errorCode());
|
||||
if (error == Errors.NONE) {
|
||||
future.complete(null);
|
||||
// For V2 and above, None responses will just have a top level NONE error -- mark all the futures as completed.
|
||||
if (response.data().results().isEmpty()) {
|
||||
for (final KafkaFutureImpl<Void> future : updateFutures.values()) {
|
||||
future.complete(null);
|
||||
}
|
||||
} else {
|
||||
for (final UpdatableFeatureResult result : response.data().results()) {
|
||||
final KafkaFutureImpl<Void> future = updateFutures.get(result.feature());
|
||||
if (future == null) {
|
||||
log.warn("Server response mentioned unknown feature {}", result.feature());
|
||||
} else {
|
||||
future.completeExceptionally(error.exception(result.errorMessage()));
|
||||
final Errors error = Errors.forCode(result.errorCode());
|
||||
if (error == Errors.NONE) {
|
||||
future.complete(null);
|
||||
} else {
|
||||
future.completeExceptionally(error.exception(result.errorMessage()));
|
||||
}
|
||||
}
|
||||
}
|
||||
// The server should send back a response for every feature, but we do a sanity check anyway.
|
||||
completeUnrealizedFutures(updateFutures.entrySet().stream(),
|
||||
feature -> "The controller response did not contain a result for feature " + feature);
|
||||
}
|
||||
// The server should send back a response for every feature, but we do a sanity check anyway.
|
||||
completeUnrealizedFutures(updateFutures.entrySet().stream(),
|
||||
feature -> "The controller response did not contain a result for feature " + feature);
|
||||
break;
|
||||
case NOT_CONTROLLER:
|
||||
handleNotControllerError(topLevelError.error());
|
||||
|
|
|
@ -106,7 +106,7 @@ public class UpdateFeaturesRequest extends AbstractRequest {
|
|||
public UpdateFeaturesResponse getErrorResponse(int throttleTimeMs, Throwable e) {
|
||||
return UpdateFeaturesResponse.createWithErrors(
|
||||
ApiError.fromThrowable(e),
|
||||
Collections.emptyMap(),
|
||||
Collections.emptySet(),
|
||||
throttleTimeMs
|
||||
);
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.Errors;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Possible error codes:
|
||||
|
@ -82,16 +83,16 @@ public class UpdateFeaturesResponse extends AbstractResponse {
|
|||
return new UpdateFeaturesResponse(new UpdateFeaturesResponseData(new ByteBufferAccessor(buffer), version));
|
||||
}
|
||||
|
||||
public static UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs) {
|
||||
public static UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Set<String> updates, int throttleTimeMs) {
|
||||
final UpdatableFeatureResultCollection results = new UpdatableFeatureResultCollection();
|
||||
for (final Map.Entry<String, ApiError> updateError : updateErrors.entrySet()) {
|
||||
final String feature = updateError.getKey();
|
||||
final ApiError error = updateError.getValue();
|
||||
final UpdatableFeatureResult result = new UpdatableFeatureResult();
|
||||
result.setFeature(feature)
|
||||
.setErrorCode(error.error().code())
|
||||
.setErrorMessage(error.message());
|
||||
results.add(result);
|
||||
if (topLevelError == ApiError.NONE) {
|
||||
for (final String feature : updates) {
|
||||
final UpdatableFeatureResult result = new UpdatableFeatureResult();
|
||||
result.setFeature(feature)
|
||||
.setErrorCode(topLevelError.error().code())
|
||||
.setErrorMessage(topLevelError.message());
|
||||
results.add(result);
|
||||
}
|
||||
}
|
||||
final UpdateFeaturesResponseData responseData = new UpdateFeaturesResponseData()
|
||||
.setThrottleTimeMs(throttleTimeMs)
|
||||
|
|
|
@ -18,7 +18,10 @@
|
|||
"type": "request",
|
||||
"listeners": ["zkBroker", "broker", "controller"],
|
||||
"name": "UpdateFeaturesRequest",
|
||||
"validVersions": "0-1",
|
||||
// Version 1 adds validate only field.
|
||||
//
|
||||
// Version 2 changes the response to not return feature level results.
|
||||
"validVersions": "0-2",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
"apiKey": 57,
|
||||
"type": "response",
|
||||
"name": "UpdateFeaturesResponse",
|
||||
"validVersions": "0-1",
|
||||
"validVersions": "0-2",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
|
||||
|
@ -26,7 +26,7 @@
|
|||
"about": "The top-level error code, or `0` if there was no top-level error." },
|
||||
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
|
||||
"about": "The top-level error message, or `null` if there was no top-level error." },
|
||||
{ "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
|
||||
{ "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0-1", "ignorable": true,
|
||||
"about": "Results for each feature update.", "fields": [
|
||||
{ "name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
|
||||
"about": "The name of the finalized feature."},
|
||||
|
|
|
@ -6323,82 +6323,54 @@ public class KafkaAdminClientTest {
|
|||
Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)));
|
||||
}
|
||||
|
||||
private Map<String, ApiError> makeTestFeatureUpdateErrors(final Map<String, FeatureUpdate> updates, final Errors error) {
|
||||
final Map<String, ApiError> errors = new HashMap<>();
|
||||
for (Map.Entry<String, FeatureUpdate> entry : updates.entrySet()) {
|
||||
errors.put(entry.getKey(), new ApiError(error));
|
||||
}
|
||||
return errors;
|
||||
}
|
||||
|
||||
private void testUpdateFeatures(Map<String, FeatureUpdate> featureUpdates,
|
||||
ApiError topLevelError,
|
||||
Map<String, ApiError> featureUpdateErrors) throws Exception {
|
||||
Set<String> updates) throws Exception {
|
||||
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
|
||||
env.kafkaClient().prepareResponse(
|
||||
body -> body instanceof UpdateFeaturesRequest,
|
||||
UpdateFeaturesResponse.createWithErrors(topLevelError, featureUpdateErrors, 0));
|
||||
UpdateFeaturesResponse.createWithErrors(topLevelError, updates, 0));
|
||||
final Map<String, KafkaFuture<Void>> futures = env.adminClient().updateFeatures(
|
||||
featureUpdates,
|
||||
new UpdateFeaturesOptions().timeoutMs(10000)).values();
|
||||
for (final Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
|
||||
final KafkaFuture<Void> future = entry.getValue();
|
||||
final ApiError error = featureUpdateErrors.get(entry.getKey());
|
||||
if (topLevelError.error() == Errors.NONE) {
|
||||
assertNotNull(error);
|
||||
if (error.error() == Errors.NONE) {
|
||||
future.get();
|
||||
} else {
|
||||
final ExecutionException e = assertThrows(ExecutionException.class, future::get);
|
||||
assertEquals(e.getCause().getClass(), error.exception().getClass());
|
||||
}
|
||||
// Since the top level error was NONE, each future should be successful.
|
||||
future.get();
|
||||
} else {
|
||||
final ExecutionException e = assertThrows(ExecutionException.class, future::get);
|
||||
assertEquals(e.getCause().getClass(), topLevelError.exception().getClass());
|
||||
assertEquals(e.getCause().getMessage(), topLevelError.exception().getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFeaturesDuringSuccess() throws Exception {
|
||||
@ParameterizedTest
|
||||
@ValueSource(shorts = {1, 2})
|
||||
public void testUpdateFeaturesDuringSuccess(short version) throws Exception {
|
||||
final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
|
||||
testUpdateFeatures(updates, ApiError.NONE, makeTestFeatureUpdateErrors(updates, Errors.NONE));
|
||||
// Only v1 and below specifies error codes per feature for NONE error.
|
||||
Set<String> features = version <= 1 ? updates.keySet() : Utils.mkSet();
|
||||
testUpdateFeatures(updates, ApiError.NONE, features);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFeaturesTopLevelError() throws Exception {
|
||||
final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
|
||||
testUpdateFeatures(updates, new ApiError(Errors.INVALID_REQUEST), new HashMap<>());
|
||||
testUpdateFeatures(updates, new ApiError(Errors.INVALID_REQUEST), Utils.mkSet());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFeaturesInvalidRequestError() throws Exception {
|
||||
final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
|
||||
testUpdateFeatures(updates, ApiError.NONE, makeTestFeatureUpdateErrors(updates, Errors.INVALID_REQUEST));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFeaturesUpdateFailedError() throws Exception {
|
||||
final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
|
||||
testUpdateFeatures(updates, ApiError.NONE, makeTestFeatureUpdateErrors(updates, Errors.FEATURE_UPDATE_FAILED));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFeaturesPartialSuccess() throws Exception {
|
||||
final Map<String, ApiError> errors = makeTestFeatureUpdateErrors(makeTestFeatureUpdates(), Errors.NONE);
|
||||
errors.put("test_feature_2", new ApiError(Errors.INVALID_REQUEST));
|
||||
testUpdateFeatures(makeTestFeatureUpdates(), ApiError.NONE, errors);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateFeaturesHandleNotControllerException() throws Exception {
|
||||
@ParameterizedTest
|
||||
@ValueSource(shorts = {1, 2})
|
||||
public void testUpdateFeaturesHandleNotControllerException(short version) throws Exception {
|
||||
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
|
||||
env.kafkaClient().prepareResponseFrom(
|
||||
request -> request instanceof UpdateFeaturesRequest,
|
||||
UpdateFeaturesResponse.createWithErrors(
|
||||
new ApiError(Errors.NOT_CONTROLLER),
|
||||
Utils.mkMap(),
|
||||
Utils.mkSet(),
|
||||
0),
|
||||
env.cluster().nodeById(0));
|
||||
final int controllerId = 1;
|
||||
|
@ -6406,12 +6378,13 @@ public class KafkaAdminClientTest {
|
|||
env.cluster().clusterResource().clusterId(),
|
||||
controllerId,
|
||||
Collections.emptyList()));
|
||||
// Only v1 and below specifies error codes per feature for NONE error.
|
||||
Set<String> features = version <= 1 ? Utils.mkSet("test_feature_1", "test_feature_2") : Utils.mkSet();
|
||||
env.kafkaClient().prepareResponseFrom(
|
||||
request -> request instanceof UpdateFeaturesRequest,
|
||||
UpdateFeaturesResponse.createWithErrors(
|
||||
ApiError.NONE,
|
||||
Utils.mkMap(Utils.mkEntry("test_feature_1", ApiError.NONE),
|
||||
Utils.mkEntry("test_feature_2", ApiError.NONE)),
|
||||
features,
|
||||
0),
|
||||
env.cluster().nodeById(controllerId));
|
||||
final KafkaFuture<Void> future = env.adminClient().updateFeatures(
|
||||
|
|
|
@ -17,9 +17,13 @@
|
|||
package org.apache.kafka.common.requests;
|
||||
|
||||
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
|
||||
import org.apache.kafka.common.protocol.ApiKeys;
|
||||
import org.apache.kafka.common.protocol.Errors;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -59,4 +63,21 @@ public class UpdateFeaturesResponseTest {
|
|||
assertEquals(1, errorCounts.get(Errors.FEATURE_UPDATE_FAILED).intValue());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ApiKeyVersionsSource(apiKey = ApiKeys.UPDATE_FEATURES)
|
||||
public void testSerialization(short version) {
|
||||
UpdateFeaturesResponse noErrorResponse = UpdateFeaturesResponse.parse(UpdateFeaturesResponse.createWithErrors(ApiError.NONE,
|
||||
Utils.mkSet("feature-1", "feature-2"), 0).serialize(version), version);
|
||||
|
||||
// Versions 1 and below still contain feature level results when the error is NONE.
|
||||
int expectedSize = version <= 1 ? 2 : 0;
|
||||
assertEquals(ApiError.NONE, noErrorResponse.topLevelError());
|
||||
assertEquals(expectedSize, noErrorResponse.data().results().size());
|
||||
|
||||
ApiError error = new ApiError(Errors.INVALID_UPDATE_VERSION);
|
||||
UpdateFeaturesResponse errorResponse = UpdateFeaturesResponse.parse(UpdateFeaturesResponse.createWithErrors(error,
|
||||
Utils.mkSet("feature-1", "feature-2"), 0).serialize(version), version);
|
||||
assertEquals(error, errorResponse.topLevelError());
|
||||
assertEquals(0, errorResponse.data().results().size());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3578,12 +3578,13 @@ class KafkaApis(val requestChannel: RequestChannel,
|
|||
case Left(topLevelError) =>
|
||||
UpdateFeaturesResponse.createWithErrors(
|
||||
topLevelError,
|
||||
Collections.emptyMap(),
|
||||
Collections.emptySet(),
|
||||
throttleTimeMs)
|
||||
case Right(featureUpdateErrors) =>
|
||||
// This response is not correct, but since this is ZK specific code it will be removed in 4.0
|
||||
UpdateFeaturesResponse.createWithErrors(
|
||||
ApiError.NONE,
|
||||
featureUpdateErrors.asJava,
|
||||
featureUpdateErrors.asJava.keySet(),
|
||||
throttleTimeMs)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.metadata.FinalizedControllerFeatures;
|
|||
import org.apache.kafka.metadata.VersionRange;
|
||||
import org.apache.kafka.metadata.migration.ZkMigrationState;
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||
import org.apache.kafka.server.common.Features;
|
||||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
import org.apache.kafka.server.mutable.BoundedList;
|
||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||
|
@ -43,7 +44,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
|
||||
|
@ -169,23 +169,31 @@ public class FeatureControlManager {
|
|||
this.clusterSupportDescriber = clusterSupportDescriber;
|
||||
}
|
||||
|
||||
ControllerResult<Map<String, ApiError>> updateFeatures(
|
||||
ControllerResult<ApiError> updateFeatures(
|
||||
Map<String, Short> updates,
|
||||
Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
|
||||
boolean validateOnly
|
||||
) {
|
||||
TreeMap<String, ApiError> results = new TreeMap<>();
|
||||
List<ApiMessageAndVersion> records =
|
||||
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
|
||||
|
||||
Map<String, Short> proposedUpdatedVersions = new HashMap<>();
|
||||
finalizedVersions.forEach(proposedUpdatedVersions::put);
|
||||
proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel());
|
||||
updates.forEach(proposedUpdatedVersions::put);
|
||||
|
||||
for (Entry<String, Short> entry : updates.entrySet()) {
|
||||
results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
|
||||
upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), records));
|
||||
ApiError error = updateFeature(entry.getKey(), entry.getValue(),
|
||||
upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), records, proposedUpdatedVersions);
|
||||
if (!error.error().equals(Errors.NONE)) {
|
||||
return ControllerResult.of(Collections.emptyList(), error);
|
||||
}
|
||||
}
|
||||
|
||||
if (validateOnly) {
|
||||
return ControllerResult.of(Collections.emptyList(), results);
|
||||
return ControllerResult.of(Collections.emptyList(), ApiError.NONE);
|
||||
} else {
|
||||
return ControllerResult.atomicOf(records, results);
|
||||
return ControllerResult.atomicOf(records, ApiError.NONE);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -201,7 +209,8 @@ public class FeatureControlManager {
|
|||
String featureName,
|
||||
short newVersion,
|
||||
FeatureUpdate.UpgradeType upgradeType,
|
||||
List<ApiMessageAndVersion> records
|
||||
List<ApiMessageAndVersion> records,
|
||||
Map<String, Short> proposedUpdatedVersions
|
||||
) {
|
||||
if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
|
||||
return invalidUpdateVersion(featureName, newVersion,
|
||||
|
@ -241,6 +250,15 @@ public class FeatureControlManager {
|
|||
// Perform additional checks if we're updating metadata.version
|
||||
return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
|
||||
} else {
|
||||
// Validate dependencies for features that are not metadata.version
|
||||
try {
|
||||
Features.validateVersion(
|
||||
// Allow unstable feature versions is true because the version range is already checked above.
|
||||
Features.featureFromName(featureName).fromFeatureLevel(newVersion, true),
|
||||
proposedUpdatedVersions);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return invalidUpdateVersion(featureName, newVersion, e.getMessage());
|
||||
}
|
||||
records.add(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName(featureName).
|
||||
setFeatureLevel(newVersion), (short) 0));
|
||||
|
|
|
@ -2313,12 +2313,25 @@ public final class QuorumController implements Controller {
|
|||
return featureControl.updateFeatures(updates, upgradeTypes, request.validateOnly());
|
||||
}).thenApply(result -> {
|
||||
UpdateFeaturesResponseData responseData = new UpdateFeaturesResponseData();
|
||||
responseData.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(result.size()));
|
||||
result.forEach((featureName, error) -> responseData.results().add(
|
||||
new UpdateFeaturesResponseData.UpdatableFeatureResult()
|
||||
.setFeature(featureName)
|
||||
.setErrorCode(error.error().code())
|
||||
.setErrorMessage(error.message())));
|
||||
|
||||
if (result != ApiError.NONE) {
|
||||
responseData.setErrorCode(result.error().code());
|
||||
responseData.setErrorMessage("The update failed for all features since the following feature had an error: " + result.message());
|
||||
} else {
|
||||
responseData.setErrorCode(result.error().code());
|
||||
responseData.setErrorMessage(result.message());
|
||||
// Only specify per feature responses if the error is None and request version is less than or equal to 1.
|
||||
if (context.requestHeader().requestApiVersion() <= 1) {
|
||||
responseData.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(request.featureUpdates().size()));
|
||||
request.featureUpdates().forEach(featureName ->
|
||||
responseData.results().add(
|
||||
new UpdateFeaturesResponseData.UpdatableFeatureResult()
|
||||
.setFeature(featureName.feature())
|
||||
.setErrorCode(result.error().code())
|
||||
.setErrorMessage(result.error().message())
|
||||
));
|
||||
}
|
||||
}
|
||||
return responseData;
|
||||
});
|
||||
}
|
||||
|
|
|
@ -28,7 +28,10 @@ import org.apache.kafka.metadata.VersionRange;
|
|||
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
|
||||
import org.apache.kafka.metadata.migration.ZkMigrationState;
|
||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||
import org.apache.kafka.server.common.Features;
|
||||
import org.apache.kafka.server.common.MetadataVersion;
|
||||
import org.apache.kafka.server.common.TestFeatureVersion;
|
||||
import org.apache.kafka.server.common.TransactionVersion;
|
||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
|
@ -95,31 +98,36 @@ public class FeatureControlManagerTest {
|
|||
public void testUpdateFeatures() {
|
||||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
|
||||
FeatureControlManager manager = new FeatureControlManager.Builder().
|
||||
setQuorumFeatures(features("foo", 1, 2)).
|
||||
setQuorumFeatures(features(TestFeatureVersion.FEATURE_NAME, 0, 2)).
|
||||
setSnapshotRegistry(snapshotRegistry).
|
||||
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
|
||||
build();
|
||||
snapshotRegistry.idempotentCreateSnapshot(-1);
|
||||
assertEquals(new FinalizedControllerFeatures(Collections.singletonMap("metadata.version", (short) 4), -1),
|
||||
manager.finalizedFeatures(-1));
|
||||
assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
|
||||
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 3 for feature foo. Local controller 0 only supports versions 1-2"))),
|
||||
manager.updateFeatures(updateMap("foo", 3),
|
||||
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
|
||||
assertEquals(ControllerResult.of(emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 3 for feature " + TestFeatureVersion.FEATURE_NAME + ". Local controller 0 only supports versions 0-2")),
|
||||
manager.updateFeatures(updateMap(TestFeatureVersion.FEATURE_NAME, 3),
|
||||
Collections.singletonMap(TestFeatureVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
|
||||
false));
|
||||
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
|
||||
updateMap("foo", 2, "bar", 1), Collections.emptyMap(),
|
||||
ControllerResult<ApiError> result = manager.updateFeatures(
|
||||
updateMap(TestFeatureVersion.FEATURE_NAME, 1, "bar", 1), Collections.emptyMap(),
|
||||
false);
|
||||
Map<String, ApiError> expectedMap = new HashMap<>();
|
||||
expectedMap.put("foo", ApiError.NONE);
|
||||
expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 1 for feature bar. Local controller 0 does not support this feature."));
|
||||
assertEquals(expectedMap, result.response());
|
||||
ApiError expectedError = new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 1 for feature bar. Local controller 0 does not support this feature.");
|
||||
List<ApiMessageAndVersion> expectedMessages = new ArrayList<>();
|
||||
assertEquals(expectedError, result.response());
|
||||
assertEquals(expectedMessages, result.records());
|
||||
|
||||
result = manager.updateFeatures(
|
||||
updateMap(TestFeatureVersion.FEATURE_NAME, 1), Collections.emptyMap(),
|
||||
false);
|
||||
expectedError = ApiError.NONE;
|
||||
assertEquals(expectedError, result.response());
|
||||
expectedMessages = new ArrayList<>();
|
||||
expectedMessages.add(new ApiMessageAndVersion(new FeatureLevelRecord().
|
||||
setName("foo").setFeatureLevel((short) 2),
|
||||
(short) 0));
|
||||
setName(TestFeatureVersion.FEATURE_NAME).setFeatureLevel((short) 1),
|
||||
(short) 0));
|
||||
assertEquals(expectedMessages, result.records());
|
||||
}
|
||||
|
||||
|
@ -166,47 +174,45 @@ public class FeatureControlManagerTest {
|
|||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
|
||||
FeatureControlManager manager = new FeatureControlManager.Builder().
|
||||
setLogContext(logContext).
|
||||
setQuorumFeatures(features("foo", 1, 5, "bar", 0, 3)).
|
||||
setQuorumFeatures(features("foo", 1, 5, TransactionVersion.FEATURE_NAME, 0, 3)).
|
||||
setSnapshotRegistry(snapshotRegistry).
|
||||
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
|
||||
Collections.singletonList(new SimpleImmutableEntry<>(5, singletonMap("bar", VersionRange.of(0, 3)))),
|
||||
Collections.singletonList(new SimpleImmutableEntry<>(5, Collections.singletonMap(TransactionVersion.FEATURE_NAME, VersionRange.of(0, 2)))),
|
||||
emptyList())).
|
||||
build();
|
||||
|
||||
assertEquals(ControllerResult.atomicOf(emptyList(),
|
||||
Collections.singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 3 for feature foo. Broker 5 does not support this feature."))),
|
||||
assertEquals(ControllerResult.of(emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 3 for feature foo. Broker 5 does not support this feature.")),
|
||||
manager.updateFeatures(updateMap("foo", 3),
|
||||
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
|
||||
false));
|
||||
|
||||
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
|
||||
updateMap("bar", 3), Collections.emptyMap(), false);
|
||||
assertEquals(Collections.singletonMap("bar", ApiError.NONE), result.response());
|
||||
ControllerResult<ApiError> result = manager.updateFeatures(
|
||||
updateMap(TransactionVersion.FEATURE_NAME, 2), Collections.emptyMap(), false);
|
||||
assertEquals(ApiError.NONE, result.response());
|
||||
manager.replay((FeatureLevelRecord) result.records().get(0).message());
|
||||
snapshotRegistry.idempotentCreateSnapshot(3);
|
||||
|
||||
assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
|
||||
singletonMap("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 2 for feature bar. Can't downgrade the version of this feature " +
|
||||
"without setting the upgrade type to either safe or unsafe downgrade."))),
|
||||
manager.updateFeatures(updateMap("bar", 2), Collections.emptyMap(), false));
|
||||
assertEquals(ControllerResult.of(emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 1 for feature " + TransactionVersion.FEATURE_NAME + "." +
|
||||
" Can't downgrade the version of this feature without setting the upgrade type to either safe or unsafe downgrade.")),
|
||||
manager.updateFeatures(updateMap(TransactionVersion.FEATURE_NAME, 1), Collections.emptyMap(), false));
|
||||
|
||||
assertEquals(
|
||||
ControllerResult.atomicOf(
|
||||
Collections.singletonList(
|
||||
new ApiMessageAndVersion(
|
||||
new FeatureLevelRecord()
|
||||
.setName("bar")
|
||||
.setFeatureLevel((short) 2),
|
||||
.setName(TransactionVersion.FEATURE_NAME)
|
||||
.setFeatureLevel((short) 1),
|
||||
(short) 0
|
||||
)
|
||||
),
|
||||
Collections.singletonMap("bar", ApiError.NONE)
|
||||
ApiError.NONE
|
||||
),
|
||||
manager.updateFeatures(
|
||||
updateMap("bar", 2),
|
||||
Collections.singletonMap("bar", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
|
||||
updateMap(TransactionVersion.FEATURE_NAME, 1),
|
||||
Collections.singletonMap(TransactionVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
|
||||
false)
|
||||
);
|
||||
}
|
||||
|
@ -217,18 +223,18 @@ public class FeatureControlManagerTest {
|
|||
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
|
||||
FeatureControlManager manager = new FeatureControlManager.Builder().
|
||||
setLogContext(logContext).
|
||||
setQuorumFeatures(features("foo", 1, 5, "bar", 1, 2)).
|
||||
setQuorumFeatures(features(TestFeatureVersion.FEATURE_NAME, 0, 5, TransactionVersion.FEATURE_NAME, 0, 2)).
|
||||
setSnapshotRegistry(snapshotRegistry).
|
||||
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
|
||||
build();
|
||||
ControllerResult<Map<String, ApiError>> result = manager.
|
||||
updateFeatures(updateMap("foo", 5, "bar", 1), Collections.emptyMap(), false);
|
||||
ControllerResult<ApiError> result = manager.
|
||||
updateFeatures(updateMap(TestFeatureVersion.FEATURE_NAME, 1, TransactionVersion.FEATURE_NAME, 2), Collections.emptyMap(), false);
|
||||
RecordTestUtils.replayAll(manager, result.records());
|
||||
assertEquals(MetadataVersion.IBP_3_3_IV0, manager.metadataVersion());
|
||||
assertEquals(Optional.of((short) 5), manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
|
||||
assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get("bar"));
|
||||
assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get(TestFeatureVersion.FEATURE_NAME));
|
||||
assertEquals(Optional.of((short) 2), manager.finalizedFeatures(Long.MAX_VALUE).get(TransactionVersion.FEATURE_NAME));
|
||||
assertEquals(new HashSet<>(Arrays.asList(
|
||||
MetadataVersion.FEATURE_NAME, "foo", "bar")),
|
||||
MetadataVersion.FEATURE_NAME, TestFeatureVersion.FEATURE_NAME, TransactionVersion.FEATURE_NAME)),
|
||||
manager.finalizedFeatures(Long.MAX_VALUE).featureNames());
|
||||
}
|
||||
|
||||
|
@ -250,10 +256,9 @@ public class FeatureControlManagerTest {
|
|||
@Test
|
||||
public void testCannotDowngradeToVersionBeforeMinimumSupportedKraftVersion() {
|
||||
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 3 for feature metadata.version. Local controller 0 only " +
|
||||
"supports versions 4-7"))),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 3 for feature metadata.version. Local controller 0 only " +
|
||||
"supports versions 4-7")),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
|
||||
|
@ -263,10 +268,9 @@ public class FeatureControlManagerTest {
|
|||
@Test
|
||||
public void testCannotDowngradeToHigherVersion() {
|
||||
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 7 for feature metadata.version. Can't downgrade to a " +
|
||||
"newer version."))),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 7 for feature metadata.version. Can't downgrade to a " +
|
||||
"newer version.")),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
|
||||
|
@ -276,10 +280,9 @@ public class FeatureControlManagerTest {
|
|||
@Test
|
||||
public void testCannotUnsafeDowngradeToHigherVersion() {
|
||||
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 7 for feature metadata.version. Can't downgrade to a " +
|
||||
"newer version."))),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 7 for feature metadata.version. Can't downgrade to a " +
|
||||
"newer version.")),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
|
||||
|
@ -289,11 +292,10 @@ public class FeatureControlManagerTest {
|
|||
@Test
|
||||
public void testCannotUpgradeToLowerVersion() {
|
||||
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 4 for feature metadata.version. Can't downgrade the " +
|
||||
"version of this feature without setting the upgrade type to either safe or " +
|
||||
"unsafe downgrade."))),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid update version 4 for feature metadata.version. Can't downgrade the " +
|
||||
"version of this feature without setting the upgrade type to either safe or " +
|
||||
"unsafe downgrade.")),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
|
||||
|
@ -303,8 +305,7 @@ public class FeatureControlManagerTest {
|
|||
@Test
|
||||
public void testCanUpgradeToHigherVersion() {
|
||||
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, ApiError.NONE)),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), ApiError.NONE),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
|
||||
|
@ -314,10 +315,9 @@ public class FeatureControlManagerTest {
|
|||
@Test
|
||||
public void testCannotUseSafeDowngradeIfMetadataChanged() {
|
||||
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid metadata.version 4. Refusing to perform the requested downgrade because " +
|
||||
"it might delete metadata information."))),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid metadata.version 4. Refusing to perform the requested downgrade because " +
|
||||
"it might delete metadata information.")),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
|
||||
|
@ -327,9 +327,8 @@ public class FeatureControlManagerTest {
|
|||
@Test
|
||||
public void testUnsafeDowngradeIsTemporarilyDisabled() {
|
||||
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid metadata.version 4. Unsafe metadata downgrade is not supported in this version."))),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid metadata.version 4. Unsafe metadata downgrade is not supported in this version.")),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
|
||||
|
@ -340,8 +339,7 @@ public class FeatureControlManagerTest {
|
|||
@Test
|
||||
public void testCanUseUnsafeDowngradeIfMetadataChanged() {
|
||||
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, ApiError.NONE)),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), ApiError.NONE),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
|
||||
|
@ -356,8 +354,7 @@ public class FeatureControlManagerTest {
|
|||
setMetadataVersion(MetadataVersion.IBP_3_1_IV0).
|
||||
setMinimumBootstrapVersion(MetadataVersion.IBP_3_0_IV0).
|
||||
build();
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, ApiError.NONE)),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), ApiError.NONE),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV1.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
|
||||
|
@ -371,9 +368,8 @@ public class FeatureControlManagerTest {
|
|||
MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV3.featureLevel())).
|
||||
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
|
||||
build();
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid metadata.version 3. Unable to set a metadata.version less than 3.3-IV0"))),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid metadata.version 3. Unable to set a metadata.version less than 3.3-IV0")),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
|
||||
|
@ -385,32 +381,32 @@ public class FeatureControlManagerTest {
|
|||
Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
|
||||
localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
|
||||
MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latestTesting().featureLevel()));
|
||||
localSupportedFeatures.put("foo", VersionRange.of(0, 2));
|
||||
localSupportedFeatures.put(Features.TEST_VERSION.featureName(), VersionRange.of(0, 2));
|
||||
FeatureControlManager manager = new FeatureControlManager.Builder().
|
||||
setQuorumFeatures(new QuorumFeatures(0, localSupportedFeatures, emptyList())).
|
||||
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
|
||||
Collections.singletonList(new SimpleImmutableEntry<>(1, singletonMap("foo", VersionRange.of(0, 3)))),
|
||||
Collections.singletonList(new SimpleImmutableEntry<>(1, Collections.singletonMap(Features.TEST_VERSION.featureName(), VersionRange.of(0, 3)))),
|
||||
emptyList())).
|
||||
build();
|
||||
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
|
||||
Collections.singletonMap("foo", (short) 1),
|
||||
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.UPGRADE),
|
||||
ControllerResult<ApiError> result = manager.updateFeatures(
|
||||
Collections.singletonMap(Features.TEST_VERSION.featureName(), (short) 1),
|
||||
Collections.singletonMap(Features.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UPGRADE),
|
||||
false);
|
||||
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
|
||||
new FeatureLevelRecord().setName("foo").setFeatureLevel((short) 1), (short) 0)),
|
||||
Collections.singletonMap("foo", ApiError.NONE)), result);
|
||||
new FeatureLevelRecord().setName(Features.TEST_VERSION.featureName()).setFeatureLevel((short) 1), (short) 0)),
|
||||
ApiError.NONE), result);
|
||||
RecordTestUtils.replayAll(manager, result.records());
|
||||
assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
|
||||
assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get(Features.TEST_VERSION.featureName()));
|
||||
|
||||
ControllerResult<Map<String, ApiError>> result2 = manager.updateFeatures(
|
||||
Collections.singletonMap("foo", (short) 0),
|
||||
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
|
||||
ControllerResult<ApiError> result2 = manager.updateFeatures(
|
||||
Collections.singletonMap(Features.TEST_VERSION.featureName(), (short) 0),
|
||||
Collections.singletonMap(Features.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
|
||||
false);
|
||||
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
|
||||
new FeatureLevelRecord().setName("foo").setFeatureLevel((short) 0), (short) 0)),
|
||||
Collections.singletonMap("foo", ApiError.NONE)), result2);
|
||||
new FeatureLevelRecord().setName(Features.TEST_VERSION.featureName()).setFeatureLevel((short) 0), (short) 0)),
|
||||
ApiError.NONE), result2);
|
||||
RecordTestUtils.replayAll(manager, result2.records());
|
||||
assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
|
||||
assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get(Features.TEST_VERSION.featureName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -424,17 +420,15 @@ public class FeatureControlManagerTest {
|
|||
RecordTestUtils.replayAll(manager, bootstrapMetadata.records());
|
||||
RecordTestUtils.replayOne(manager, ZkMigrationState.PRE_MIGRATION.toRecord());
|
||||
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid metadata.version 10. Unable to modify metadata.version while a ZK migration is in progress."))),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid metadata.version 10. Unable to modify metadata.version while a ZK migration is in progress.")),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_5_IV1.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
|
||||
true));
|
||||
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid metadata.version 4. Unable to modify metadata.version while a ZK migration is in progress."))),
|
||||
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
|
||||
"Invalid metadata.version 4. Unable to modify metadata.version while a ZK migration is in progress.")),
|
||||
manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
|
||||
|
@ -442,11 +436,11 @@ public class FeatureControlManagerTest {
|
|||
|
||||
// Complete the migration
|
||||
RecordTestUtils.replayOne(manager, ZkMigrationState.POST_MIGRATION.toRecord());
|
||||
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
|
||||
ControllerResult<ApiError> result = manager.updateFeatures(
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_5_IV1.featureLevel()),
|
||||
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
|
||||
false);
|
||||
assertEquals(Errors.NONE, result.response().get(MetadataVersion.FEATURE_NAME).error());
|
||||
assertEquals(ApiError.NONE, result.response());
|
||||
RecordTestUtils.replayAll(manager, result.records());
|
||||
assertEquals(MetadataVersion.IBP_3_5_IV1, manager.metadataVersion());
|
||||
}
|
||||
|
|
|
@ -118,7 +118,7 @@ public enum Features {
|
|||
* For example, say feature X level x relies on feature Y level y:
|
||||
* if feature X >= x then throw an error if feature Y < y.
|
||||
*
|
||||
* All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster.
|
||||
* All feature levels above 0 in kraft require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster.
|
||||
*
|
||||
* @param feature the feature we are validating
|
||||
* @param features the feature versions we have (or want to set)
|
||||
|
@ -163,6 +163,14 @@ public enum Features {
|
|||
return level;
|
||||
}
|
||||
|
||||
public static Features featureFromName(String featureName) {
|
||||
for (Features features : FEATURES) {
|
||||
if (features.name.equals(featureName))
|
||||
return features;
|
||||
}
|
||||
throw new IllegalArgumentException("Feature " + featureName + " not found.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to map a list of FeatureVersion to a map of feature name to feature level
|
||||
*/
|
||||
|
|
|
@ -135,8 +135,12 @@ public class FeaturesTest {
|
|||
|
||||
@Test
|
||||
public void testUnstableTestVersion() {
|
||||
assertThrows(IllegalArgumentException.class, () ->
|
||||
Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), false));
|
||||
// If the latest MetadataVersion is stable, we don't throw an error. In that case, we don't worry about unstable feature
|
||||
// versions since all feature versions are stable.
|
||||
if (MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting())) {
|
||||
assertThrows(IllegalArgumentException.class, () ->
|
||||
Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), false));
|
||||
}
|
||||
Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,24 +111,24 @@ public class FeatureCommandTest {
|
|||
"disable", "--feature", "metadata.version"))
|
||||
);
|
||||
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
|
||||
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
|
||||
"metadata.version. Local controller 3000 only supports versions 1-25", commandOutput);
|
||||
assertEquals("Could not disable metadata.version. The update failed for all features since the following " +
|
||||
"feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 1-25", commandOutput);
|
||||
|
||||
commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"downgrade", "--metadata", "3.3-IV0"))
|
||||
|
||||
);
|
||||
assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " +
|
||||
"Refusing to perform the requested downgrade because it might delete metadata information.", commandOutput);
|
||||
assertEquals("Could not downgrade metadata.version to 4. The update failed for all features since the following " +
|
||||
"feature had an error: Invalid metadata.version 4. Refusing to perform the requested downgrade because it might delete metadata information.", commandOutput);
|
||||
|
||||
commandOutput = ToolsTestUtils.captureStandardOut(() ->
|
||||
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
|
||||
"downgrade", "--unsafe", "--metadata", "3.3-IV0"))
|
||||
|
||||
);
|
||||
assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " +
|
||||
"Unsafe metadata downgrade is not supported in this version.", commandOutput);
|
||||
assertEquals("Could not downgrade metadata.version to 4. The update failed for all features since the following " +
|
||||
"feature had an error: Invalid metadata.version 4. Unsafe metadata downgrade is not supported in this version.", commandOutput);
|
||||
}
|
||||
|
||||
private String outputWithoutEpoch(String output) {
|
||||
|
|
Loading…
Reference in New Issue