KAFKA-16308 [3/N]: Introduce feature dependency validation to UpdateFeatures command (#16443)

This change includes:

1. Dependency checking when updating the feature (all request versions)
2. Returning top level error and no feature level errors if any feature failed to update and using this error for all the features in the response. (all request versions)
3. Returning only top level none error for v2 and beyond

Reviewers: Jun Rao <jun@confluent.io>
This commit is contained in:
Justine Olshan 2024-10-01 14:21:38 -07:00 committed by GitHub
parent 4312ce6d25
commit 49d7ea6c6a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 233 additions and 190 deletions

View File

@ -4539,6 +4539,12 @@ public class KafkaAdminClient extends AdminClient {
ApiError topLevelError = response.topLevelError();
switch (topLevelError.error()) {
case NONE:
// For V2 and above, None responses will just have a top level NONE error -- mark all the futures as completed.
if (response.data().results().isEmpty()) {
for (final KafkaFutureImpl<Void> future : updateFutures.values()) {
future.complete(null);
}
} else {
for (final UpdatableFeatureResult result : response.data().results()) {
final KafkaFutureImpl<Void> future = updateFutures.get(result.feature());
if (future == null) {
@ -4555,6 +4561,7 @@ public class KafkaAdminClient extends AdminClient {
// The server should send back a response for every feature, but we do a sanity check anyway.
completeUnrealizedFutures(updateFutures.entrySet().stream(),
feature -> "The controller response did not contain a result for feature " + feature);
}
break;
case NOT_CONTROLLER:
handleNotControllerError(topLevelError.error());

View File

@ -106,7 +106,7 @@ public class UpdateFeaturesRequest extends AbstractRequest {
public UpdateFeaturesResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return UpdateFeaturesResponse.createWithErrors(
ApiError.fromThrowable(e),
Collections.emptyMap(),
Collections.emptySet(),
throttleTimeMs
);
}

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* Possible error codes:
@ -82,17 +83,17 @@ public class UpdateFeaturesResponse extends AbstractResponse {
return new UpdateFeaturesResponse(new UpdateFeaturesResponseData(new ByteBufferAccessor(buffer), version));
}
public static UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs) {
public static UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Set<String> updates, int throttleTimeMs) {
final UpdatableFeatureResultCollection results = new UpdatableFeatureResultCollection();
for (final Map.Entry<String, ApiError> updateError : updateErrors.entrySet()) {
final String feature = updateError.getKey();
final ApiError error = updateError.getValue();
if (topLevelError == ApiError.NONE) {
for (final String feature : updates) {
final UpdatableFeatureResult result = new UpdatableFeatureResult();
result.setFeature(feature)
.setErrorCode(error.error().code())
.setErrorMessage(error.message());
.setErrorCode(topLevelError.error().code())
.setErrorMessage(topLevelError.message());
results.add(result);
}
}
final UpdateFeaturesResponseData responseData = new UpdateFeaturesResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(topLevelError.error().code())

View File

@ -18,7 +18,10 @@
"type": "request",
"listeners": ["zkBroker", "broker", "controller"],
"name": "UpdateFeaturesRequest",
"validVersions": "0-1",
// Version 1 adds validate only field.
//
// Version 2 changes the response to not return feature level results.
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",

View File

@ -17,7 +17,7 @@
"apiKey": 57,
"type": "response",
"name": "UpdateFeaturesResponse",
"validVersions": "0-1",
"validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@ -26,7 +26,7 @@
"about": "The top-level error code, or `0` if there was no top-level error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The top-level error message, or `null` if there was no top-level error." },
{ "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
{ "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0-1", "ignorable": true,
"about": "Results for each feature update.", "fields": [
{ "name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
"about": "The name of the finalized feature."},

View File

@ -6323,82 +6323,54 @@ public class KafkaAdminClientTest {
Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)));
}
private Map<String, ApiError> makeTestFeatureUpdateErrors(final Map<String, FeatureUpdate> updates, final Errors error) {
final Map<String, ApiError> errors = new HashMap<>();
for (Map.Entry<String, FeatureUpdate> entry : updates.entrySet()) {
errors.put(entry.getKey(), new ApiError(error));
}
return errors;
}
private void testUpdateFeatures(Map<String, FeatureUpdate> featureUpdates,
ApiError topLevelError,
Map<String, ApiError> featureUpdateErrors) throws Exception {
Set<String> updates) throws Exception {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().prepareResponse(
body -> body instanceof UpdateFeaturesRequest,
UpdateFeaturesResponse.createWithErrors(topLevelError, featureUpdateErrors, 0));
UpdateFeaturesResponse.createWithErrors(topLevelError, updates, 0));
final Map<String, KafkaFuture<Void>> futures = env.adminClient().updateFeatures(
featureUpdates,
new UpdateFeaturesOptions().timeoutMs(10000)).values();
for (final Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
final KafkaFuture<Void> future = entry.getValue();
final ApiError error = featureUpdateErrors.get(entry.getKey());
if (topLevelError.error() == Errors.NONE) {
assertNotNull(error);
if (error.error() == Errors.NONE) {
// Since the top level error was NONE, each future should be successful.
future.get();
} else {
final ExecutionException e = assertThrows(ExecutionException.class, future::get);
assertEquals(e.getCause().getClass(), error.exception().getClass());
}
} else {
final ExecutionException e = assertThrows(ExecutionException.class, future::get);
assertEquals(e.getCause().getClass(), topLevelError.exception().getClass());
assertEquals(e.getCause().getMessage(), topLevelError.exception().getMessage());
}
}
}
}
@Test
public void testUpdateFeaturesDuringSuccess() throws Exception {
@ParameterizedTest
@ValueSource(shorts = {1, 2})
public void testUpdateFeaturesDuringSuccess(short version) throws Exception {
final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
testUpdateFeatures(updates, ApiError.NONE, makeTestFeatureUpdateErrors(updates, Errors.NONE));
// Only v1 and below specifies error codes per feature for NONE error.
Set<String> features = version <= 1 ? updates.keySet() : Utils.mkSet();
testUpdateFeatures(updates, ApiError.NONE, features);
}
@Test
public void testUpdateFeaturesTopLevelError() throws Exception {
final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
testUpdateFeatures(updates, new ApiError(Errors.INVALID_REQUEST), new HashMap<>());
testUpdateFeatures(updates, new ApiError(Errors.INVALID_REQUEST), Utils.mkSet());
}
@Test
public void testUpdateFeaturesInvalidRequestError() throws Exception {
final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
testUpdateFeatures(updates, ApiError.NONE, makeTestFeatureUpdateErrors(updates, Errors.INVALID_REQUEST));
}
@Test
public void testUpdateFeaturesUpdateFailedError() throws Exception {
final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
testUpdateFeatures(updates, ApiError.NONE, makeTestFeatureUpdateErrors(updates, Errors.FEATURE_UPDATE_FAILED));
}
@Test
public void testUpdateFeaturesPartialSuccess() throws Exception {
final Map<String, ApiError> errors = makeTestFeatureUpdateErrors(makeTestFeatureUpdates(), Errors.NONE);
errors.put("test_feature_2", new ApiError(Errors.INVALID_REQUEST));
testUpdateFeatures(makeTestFeatureUpdates(), ApiError.NONE, errors);
}
@Test
public void testUpdateFeaturesHandleNotControllerException() throws Exception {
@ParameterizedTest
@ValueSource(shorts = {1, 2})
public void testUpdateFeaturesHandleNotControllerException(short version) throws Exception {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().prepareResponseFrom(
request -> request instanceof UpdateFeaturesRequest,
UpdateFeaturesResponse.createWithErrors(
new ApiError(Errors.NOT_CONTROLLER),
Utils.mkMap(),
Utils.mkSet(),
0),
env.cluster().nodeById(0));
final int controllerId = 1;
@ -6406,12 +6378,13 @@ public class KafkaAdminClientTest {
env.cluster().clusterResource().clusterId(),
controllerId,
Collections.emptyList()));
// Only v1 and below specifies error codes per feature for NONE error.
Set<String> features = version <= 1 ? Utils.mkSet("test_feature_1", "test_feature_2") : Utils.mkSet();
env.kafkaClient().prepareResponseFrom(
request -> request instanceof UpdateFeaturesRequest,
UpdateFeaturesResponse.createWithErrors(
ApiError.NONE,
Utils.mkMap(Utils.mkEntry("test_feature_1", ApiError.NONE),
Utils.mkEntry("test_feature_2", ApiError.NONE)),
features,
0),
env.cluster().nodeById(controllerId));
final KafkaFuture<Void> future = env.adminClient().updateFeatures(

View File

@ -17,9 +17,13 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import java.util.Map;
@ -59,4 +63,21 @@ public class UpdateFeaturesResponseTest {
assertEquals(1, errorCounts.get(Errors.FEATURE_UPDATE_FAILED).intValue());
}
@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.UPDATE_FEATURES)
public void testSerialization(short version) {
UpdateFeaturesResponse noErrorResponse = UpdateFeaturesResponse.parse(UpdateFeaturesResponse.createWithErrors(ApiError.NONE,
Utils.mkSet("feature-1", "feature-2"), 0).serialize(version), version);
// Versions 1 and below still contain feature level results when the error is NONE.
int expectedSize = version <= 1 ? 2 : 0;
assertEquals(ApiError.NONE, noErrorResponse.topLevelError());
assertEquals(expectedSize, noErrorResponse.data().results().size());
ApiError error = new ApiError(Errors.INVALID_UPDATE_VERSION);
UpdateFeaturesResponse errorResponse = UpdateFeaturesResponse.parse(UpdateFeaturesResponse.createWithErrors(error,
Utils.mkSet("feature-1", "feature-2"), 0).serialize(version), version);
assertEquals(error, errorResponse.topLevelError());
assertEquals(0, errorResponse.data().results().size());
}
}

View File

@ -3578,12 +3578,13 @@ class KafkaApis(val requestChannel: RequestChannel,
case Left(topLevelError) =>
UpdateFeaturesResponse.createWithErrors(
topLevelError,
Collections.emptyMap(),
Collections.emptySet(),
throttleTimeMs)
case Right(featureUpdateErrors) =>
// This response is not correct, but since this is ZK specific code it will be removed in 4.0
UpdateFeaturesResponse.createWithErrors(
ApiError.NONE,
featureUpdateErrors.asJava,
featureUpdateErrors.asJava.keySet(),
throttleTimeMs)
}
}

View File

@ -27,6 +27,7 @@ import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.timeline.SnapshotRegistry;
@ -43,7 +44,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Consumer;
import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
@ -169,23 +169,31 @@ public class FeatureControlManager {
this.clusterSupportDescriber = clusterSupportDescriber;
}
ControllerResult<Map<String, ApiError>> updateFeatures(
ControllerResult<ApiError> updateFeatures(
Map<String, Short> updates,
Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
boolean validateOnly
) {
TreeMap<String, ApiError> results = new TreeMap<>();
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
Map<String, Short> proposedUpdatedVersions = new HashMap<>();
finalizedVersions.forEach(proposedUpdatedVersions::put);
proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel());
updates.forEach(proposedUpdatedVersions::put);
for (Entry<String, Short> entry : updates.entrySet()) {
results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), records));
ApiError error = updateFeature(entry.getKey(), entry.getValue(),
upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), records, proposedUpdatedVersions);
if (!error.error().equals(Errors.NONE)) {
return ControllerResult.of(Collections.emptyList(), error);
}
}
if (validateOnly) {
return ControllerResult.of(Collections.emptyList(), results);
return ControllerResult.of(Collections.emptyList(), ApiError.NONE);
} else {
return ControllerResult.atomicOf(records, results);
return ControllerResult.atomicOf(records, ApiError.NONE);
}
}
@ -201,7 +209,8 @@ public class FeatureControlManager {
String featureName,
short newVersion,
FeatureUpdate.UpgradeType upgradeType,
List<ApiMessageAndVersion> records
List<ApiMessageAndVersion> records,
Map<String, Short> proposedUpdatedVersions
) {
if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
return invalidUpdateVersion(featureName, newVersion,
@ -241,6 +250,15 @@ public class FeatureControlManager {
// Perform additional checks if we're updating metadata.version
return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
} else {
// Validate dependencies for features that are not metadata.version
try {
Features.validateVersion(
// Allow unstable feature versions is true because the version range is already checked above.
Features.featureFromName(featureName).fromFeatureLevel(newVersion, true),
proposedUpdatedVersions);
} catch (IllegalArgumentException e) {
return invalidUpdateVersion(featureName, newVersion, e.getMessage());
}
records.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(featureName).
setFeatureLevel(newVersion), (short) 0));

View File

@ -2313,12 +2313,25 @@ public final class QuorumController implements Controller {
return featureControl.updateFeatures(updates, upgradeTypes, request.validateOnly());
}).thenApply(result -> {
UpdateFeaturesResponseData responseData = new UpdateFeaturesResponseData();
responseData.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(result.size()));
result.forEach((featureName, error) -> responseData.results().add(
if (result != ApiError.NONE) {
responseData.setErrorCode(result.error().code());
responseData.setErrorMessage("The update failed for all features since the following feature had an error: " + result.message());
} else {
responseData.setErrorCode(result.error().code());
responseData.setErrorMessage(result.message());
// Only specify per feature responses if the error is None and request version is less than or equal to 1.
if (context.requestHeader().requestApiVersion() <= 1) {
responseData.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(request.featureUpdates().size()));
request.featureUpdates().forEach(featureName ->
responseData.results().add(
new UpdateFeaturesResponseData.UpdatableFeatureResult()
.setFeature(featureName)
.setErrorCode(error.error().code())
.setErrorMessage(error.message())));
.setFeature(featureName.feature())
.setErrorCode(result.error().code())
.setErrorMessage(result.error().message())
));
}
}
return responseData;
});
}

View File

@ -28,7 +28,10 @@ import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TestFeatureVersion;
import org.apache.kafka.server.common.TransactionVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Disabled;
@ -95,30 +98,35 @@ public class FeatureControlManagerTest {
public void testUpdateFeatures() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(features("foo", 1, 2)).
setQuorumFeatures(features(TestFeatureVersion.FEATURE_NAME, 0, 2)).
setSnapshotRegistry(snapshotRegistry).
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
build();
snapshotRegistry.idempotentCreateSnapshot(-1);
assertEquals(new FinalizedControllerFeatures(Collections.singletonMap("metadata.version", (short) 4), -1),
manager.finalizedFeatures(-1));
assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 3 for feature foo. Local controller 0 only supports versions 1-2"))),
manager.updateFeatures(updateMap("foo", 3),
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
assertEquals(ControllerResult.of(emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 3 for feature " + TestFeatureVersion.FEATURE_NAME + ". Local controller 0 only supports versions 0-2")),
manager.updateFeatures(updateMap(TestFeatureVersion.FEATURE_NAME, 3),
Collections.singletonMap(TestFeatureVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
false));
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
updateMap("foo", 2, "bar", 1), Collections.emptyMap(),
ControllerResult<ApiError> result = manager.updateFeatures(
updateMap(TestFeatureVersion.FEATURE_NAME, 1, "bar", 1), Collections.emptyMap(),
false);
Map<String, ApiError> expectedMap = new HashMap<>();
expectedMap.put("foo", ApiError.NONE);
expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 1 for feature bar. Local controller 0 does not support this feature."));
assertEquals(expectedMap, result.response());
ApiError expectedError = new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 1 for feature bar. Local controller 0 does not support this feature.");
List<ApiMessageAndVersion> expectedMessages = new ArrayList<>();
assertEquals(expectedError, result.response());
assertEquals(expectedMessages, result.records());
result = manager.updateFeatures(
updateMap(TestFeatureVersion.FEATURE_NAME, 1), Collections.emptyMap(),
false);
expectedError = ApiError.NONE;
assertEquals(expectedError, result.response());
expectedMessages = new ArrayList<>();
expectedMessages.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("foo").setFeatureLevel((short) 2),
setName(TestFeatureVersion.FEATURE_NAME).setFeatureLevel((short) 1),
(short) 0));
assertEquals(expectedMessages, result.records());
}
@ -166,47 +174,45 @@ public class FeatureControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
FeatureControlManager manager = new FeatureControlManager.Builder().
setLogContext(logContext).
setQuorumFeatures(features("foo", 1, 5, "bar", 0, 3)).
setQuorumFeatures(features("foo", 1, 5, TransactionVersion.FEATURE_NAME, 0, 3)).
setSnapshotRegistry(snapshotRegistry).
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
Collections.singletonList(new SimpleImmutableEntry<>(5, singletonMap("bar", VersionRange.of(0, 3)))),
Collections.singletonList(new SimpleImmutableEntry<>(5, Collections.singletonMap(TransactionVersion.FEATURE_NAME, VersionRange.of(0, 2)))),
emptyList())).
build();
assertEquals(ControllerResult.atomicOf(emptyList(),
Collections.singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 3 for feature foo. Broker 5 does not support this feature."))),
assertEquals(ControllerResult.of(emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 3 for feature foo. Broker 5 does not support this feature.")),
manager.updateFeatures(updateMap("foo", 3),
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
false));
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
updateMap("bar", 3), Collections.emptyMap(), false);
assertEquals(Collections.singletonMap("bar", ApiError.NONE), result.response());
ControllerResult<ApiError> result = manager.updateFeatures(
updateMap(TransactionVersion.FEATURE_NAME, 2), Collections.emptyMap(), false);
assertEquals(ApiError.NONE, result.response());
manager.replay((FeatureLevelRecord) result.records().get(0).message());
snapshotRegistry.idempotentCreateSnapshot(3);
assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
singletonMap("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 2 for feature bar. Can't downgrade the version of this feature " +
"without setting the upgrade type to either safe or unsafe downgrade."))),
manager.updateFeatures(updateMap("bar", 2), Collections.emptyMap(), false));
assertEquals(ControllerResult.of(emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 1 for feature " + TransactionVersion.FEATURE_NAME + "." +
" Can't downgrade the version of this feature without setting the upgrade type to either safe or unsafe downgrade.")),
manager.updateFeatures(updateMap(TransactionVersion.FEATURE_NAME, 1), Collections.emptyMap(), false));
assertEquals(
ControllerResult.atomicOf(
Collections.singletonList(
new ApiMessageAndVersion(
new FeatureLevelRecord()
.setName("bar")
.setFeatureLevel((short) 2),
.setName(TransactionVersion.FEATURE_NAME)
.setFeatureLevel((short) 1),
(short) 0
)
),
Collections.singletonMap("bar", ApiError.NONE)
ApiError.NONE
),
manager.updateFeatures(
updateMap("bar", 2),
Collections.singletonMap("bar", FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
updateMap(TransactionVersion.FEATURE_NAME, 1),
Collections.singletonMap(TransactionVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
false)
);
}
@ -217,18 +223,18 @@ public class FeatureControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
FeatureControlManager manager = new FeatureControlManager.Builder().
setLogContext(logContext).
setQuorumFeatures(features("foo", 1, 5, "bar", 1, 2)).
setQuorumFeatures(features(TestFeatureVersion.FEATURE_NAME, 0, 5, TransactionVersion.FEATURE_NAME, 0, 2)).
setSnapshotRegistry(snapshotRegistry).
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
build();
ControllerResult<Map<String, ApiError>> result = manager.
updateFeatures(updateMap("foo", 5, "bar", 1), Collections.emptyMap(), false);
ControllerResult<ApiError> result = manager.
updateFeatures(updateMap(TestFeatureVersion.FEATURE_NAME, 1, TransactionVersion.FEATURE_NAME, 2), Collections.emptyMap(), false);
RecordTestUtils.replayAll(manager, result.records());
assertEquals(MetadataVersion.IBP_3_3_IV0, manager.metadataVersion());
assertEquals(Optional.of((short) 5), manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get("bar"));
assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get(TestFeatureVersion.FEATURE_NAME));
assertEquals(Optional.of((short) 2), manager.finalizedFeatures(Long.MAX_VALUE).get(TransactionVersion.FEATURE_NAME));
assertEquals(new HashSet<>(Arrays.asList(
MetadataVersion.FEATURE_NAME, "foo", "bar")),
MetadataVersion.FEATURE_NAME, TestFeatureVersion.FEATURE_NAME, TransactionVersion.FEATURE_NAME)),
manager.finalizedFeatures(Long.MAX_VALUE).featureNames());
}
@ -250,10 +256,9 @@ public class FeatureControlManagerTest {
@Test
public void testCannotDowngradeToVersionBeforeMinimumSupportedKraftVersion() {
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 3 for feature metadata.version. Local controller 0 only " +
"supports versions 4-7"))),
"supports versions 4-7")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
@ -263,10 +268,9 @@ public class FeatureControlManagerTest {
@Test
public void testCannotDowngradeToHigherVersion() {
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 7 for feature metadata.version. Can't downgrade to a " +
"newer version."))),
"newer version.")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
@ -276,10 +280,9 @@ public class FeatureControlManagerTest {
@Test
public void testCannotUnsafeDowngradeToHigherVersion() {
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 7 for feature metadata.version. Can't downgrade to a " +
"newer version."))),
"newer version.")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
@ -289,11 +292,10 @@ public class FeatureControlManagerTest {
@Test
public void testCannotUpgradeToLowerVersion() {
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 4 for feature metadata.version. Can't downgrade the " +
"version of this feature without setting the upgrade type to either safe or " +
"unsafe downgrade."))),
"unsafe downgrade.")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
@ -303,8 +305,7 @@ public class FeatureControlManagerTest {
@Test
public void testCanUpgradeToHigherVersion() {
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, ApiError.NONE)),
assertEquals(ControllerResult.of(Collections.emptyList(), ApiError.NONE),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV3.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
@ -314,10 +315,9 @@ public class FeatureControlManagerTest {
@Test
public void testCannotUseSafeDowngradeIfMetadataChanged() {
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 4. Refusing to perform the requested downgrade because " +
"it might delete metadata information."))),
"it might delete metadata information.")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
@ -327,9 +327,8 @@ public class FeatureControlManagerTest {
@Test
public void testUnsafeDowngradeIsTemporarilyDisabled() {
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 4. Unsafe metadata downgrade is not supported in this version."))),
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 4. Unsafe metadata downgrade is not supported in this version.")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
@ -340,8 +339,7 @@ public class FeatureControlManagerTest {
@Test
public void testCanUseUnsafeDowngradeIfMetadataChanged() {
FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, ApiError.NONE)),
assertEquals(ControllerResult.of(Collections.emptyList(), ApiError.NONE),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
@ -356,8 +354,7 @@ public class FeatureControlManagerTest {
setMetadataVersion(MetadataVersion.IBP_3_1_IV0).
setMinimumBootstrapVersion(MetadataVersion.IBP_3_0_IV0).
build();
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, ApiError.NONE)),
assertEquals(ControllerResult.of(Collections.emptyList(), ApiError.NONE),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV1.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
@ -371,9 +368,8 @@ public class FeatureControlManagerTest {
MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV3.featureLevel())).
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
build();
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 3. Unable to set a metadata.version less than 3.3-IV0"))),
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 3. Unable to set a metadata.version less than 3.3-IV0")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
@ -385,32 +381,32 @@ public class FeatureControlManagerTest {
Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latestTesting().featureLevel()));
localSupportedFeatures.put("foo", VersionRange.of(0, 2));
localSupportedFeatures.put(Features.TEST_VERSION.featureName(), VersionRange.of(0, 2));
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(new QuorumFeatures(0, localSupportedFeatures, emptyList())).
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
Collections.singletonList(new SimpleImmutableEntry<>(1, singletonMap("foo", VersionRange.of(0, 3)))),
Collections.singletonList(new SimpleImmutableEntry<>(1, Collections.singletonMap(Features.TEST_VERSION.featureName(), VersionRange.of(0, 3)))),
emptyList())).
build();
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
Collections.singletonMap("foo", (short) 1),
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.UPGRADE),
ControllerResult<ApiError> result = manager.updateFeatures(
Collections.singletonMap(Features.TEST_VERSION.featureName(), (short) 1),
Collections.singletonMap(Features.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UPGRADE),
false);
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new FeatureLevelRecord().setName("foo").setFeatureLevel((short) 1), (short) 0)),
Collections.singletonMap("foo", ApiError.NONE)), result);
new FeatureLevelRecord().setName(Features.TEST_VERSION.featureName()).setFeatureLevel((short) 1), (short) 0)),
ApiError.NONE), result);
RecordTestUtils.replayAll(manager, result.records());
assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get(Features.TEST_VERSION.featureName()));
ControllerResult<Map<String, ApiError>> result2 = manager.updateFeatures(
Collections.singletonMap("foo", (short) 0),
Collections.singletonMap("foo", FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
ControllerResult<ApiError> result2 = manager.updateFeatures(
Collections.singletonMap(Features.TEST_VERSION.featureName(), (short) 0),
Collections.singletonMap(Features.TEST_VERSION.featureName(), FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
false);
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new FeatureLevelRecord().setName("foo").setFeatureLevel((short) 0), (short) 0)),
Collections.singletonMap("foo", ApiError.NONE)), result2);
new FeatureLevelRecord().setName(Features.TEST_VERSION.featureName()).setFeatureLevel((short) 0), (short) 0)),
ApiError.NONE), result2);
RecordTestUtils.replayAll(manager, result2.records());
assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
assertEquals(Optional.empty(), manager.finalizedFeatures(Long.MAX_VALUE).get(Features.TEST_VERSION.featureName()));
}
@Test
@ -424,17 +420,15 @@ public class FeatureControlManagerTest {
RecordTestUtils.replayAll(manager, bootstrapMetadata.records());
RecordTestUtils.replayOne(manager, ZkMigrationState.PRE_MIGRATION.toRecord());
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 10. Unable to modify metadata.version while a ZK migration is in progress."))),
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 10. Unable to modify metadata.version while a ZK migration is in progress.")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_5_IV1.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
true));
assertEquals(ControllerResult.of(Collections.emptyList(),
singletonMap(MetadataVersion.FEATURE_NAME, new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 4. Unable to modify metadata.version while a ZK migration is in progress."))),
assertEquals(ControllerResult.of(Collections.emptyList(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 4. Unable to modify metadata.version while a ZK migration is in progress.")),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_3_IV0.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
@ -442,11 +436,11 @@ public class FeatureControlManagerTest {
// Complete the migration
RecordTestUtils.replayOne(manager, ZkMigrationState.POST_MIGRATION.toRecord());
ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
ControllerResult<ApiError> result = manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_5_IV1.featureLevel()),
singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UPGRADE),
false);
assertEquals(Errors.NONE, result.response().get(MetadataVersion.FEATURE_NAME).error());
assertEquals(ApiError.NONE, result.response());
RecordTestUtils.replayAll(manager, result.records());
assertEquals(MetadataVersion.IBP_3_5_IV1, manager.metadataVersion());
}

View File

@ -118,7 +118,7 @@ public enum Features {
* For example, say feature X level x relies on feature Y level y:
* if feature X >= x then throw an error if feature Y < y.
*
* All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster.
* All feature levels above 0 in kraft require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster.
*
* @param feature the feature we are validating
* @param features the feature versions we have (or want to set)
@ -163,6 +163,14 @@ public enum Features {
return level;
}
public static Features featureFromName(String featureName) {
for (Features features : FEATURES) {
if (features.name.equals(featureName))
return features;
}
throw new IllegalArgumentException("Feature " + featureName + " not found.");
}
/**
* Utility method to map a list of FeatureVersion to a map of feature name to feature level
*/

View File

@ -135,8 +135,12 @@ public class FeaturesTest {
@Test
public void testUnstableTestVersion() {
// If the latest MetadataVersion is stable, we don't throw an error. In that case, we don't worry about unstable feature
// versions since all feature versions are stable.
if (MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting())) {
assertThrows(IllegalArgumentException.class, () ->
Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), false));
}
Features.TEST_VERSION.fromFeatureLevel(Features.TEST_VERSION.latestTesting(), true);
}
}

View File

@ -111,24 +111,24 @@ public class FeatureCommandTest {
"disable", "--feature", "metadata.version"))
);
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
"metadata.version. Local controller 3000 only supports versions 1-25", commandOutput);
assertEquals("Could not disable metadata.version. The update failed for all features since the following " +
"feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 1-25", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
"downgrade", "--metadata", "3.3-IV0"))
);
assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " +
"Refusing to perform the requested downgrade because it might delete metadata information.", commandOutput);
assertEquals("Could not downgrade metadata.version to 4. The update failed for all features since the following " +
"feature had an error: Invalid metadata.version 4. Refusing to perform the requested downgrade because it might delete metadata information.", commandOutput);
commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
"downgrade", "--unsafe", "--metadata", "3.3-IV0"))
);
assertEquals("Could not downgrade metadata.version to 4. Invalid metadata.version 4. " +
"Unsafe metadata downgrade is not supported in this version.", commandOutput);
assertEquals("Could not downgrade metadata.version to 4. The update failed for all features since the following " +
"feature had an error: Invalid metadata.version 4. Unsafe metadata downgrade is not supported in this version.", commandOutput);
}
private String outputWithoutEpoch(String output) {