mirror of https://github.com/apache/kafka.git
KAFKA-17492 skip features with minVersion of 0 instead of replacing 0 with 1 when BrokerRegistrationRequest < 4 (#17128)
The 3.8 controller assumes the unknown features have min version = 0, but KAFKA-17011 replace the min=0 by min=1 when BrokerRegistrationRequest < 4. Hence, to support upgrading from 3.8.0 to 3.9, this PR changes the implementation of ApiVersionsResponse (<4) and BrokerRegistrationRequest (<4) to skip features with supported minVersion of 0 instead of replacing 0 with 1 Reviewers: Jun Rao <junrao@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
05955bc1fc
commit
e311716beb
|
@ -289,20 +289,18 @@ public class ApiVersionsResponse extends AbstractResponse {
|
|||
SupportedFeatureKeyCollection converted = new SupportedFeatureKeyCollection();
|
||||
for (Map.Entry<String, SupportedVersionRange> feature : latestSupportedFeatures.features().entrySet()) {
|
||||
final SupportedVersionRange versionRange = feature.getValue();
|
||||
final SupportedFeatureKey key = new SupportedFeatureKey();
|
||||
key.setName(feature.getKey());
|
||||
if (alterV0 && versionRange.min() == 0) {
|
||||
// Some older clients will have deserialization problems if a feature's
|
||||
// minimum supported level is 0. Therefore, when preparing ApiVersionResponse
|
||||
// at versions less than 4, we must set the minimum version for these features
|
||||
// to 1 rather than 0. See KAFKA-17011 for details.
|
||||
key.setMinVersion((short) 1);
|
||||
// at versions less than 4, we must omit these features. See KAFKA-17492.
|
||||
} else {
|
||||
final SupportedFeatureKey key = new SupportedFeatureKey();
|
||||
key.setName(feature.getKey());
|
||||
key.setMinVersion(versionRange.min());
|
||||
}
|
||||
key.setMaxVersion(versionRange.max());
|
||||
converted.add(key);
|
||||
}
|
||||
}
|
||||
|
||||
return converted;
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor;
|
|||
import org.apache.kafka.common.protocol.Errors;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Iterator;
|
||||
|
||||
public class BrokerRegistrationRequest extends AbstractRequest {
|
||||
|
||||
|
@ -47,16 +46,10 @@ public class BrokerRegistrationRequest extends AbstractRequest {
|
|||
@Override
|
||||
public BrokerRegistrationRequest build(short version) {
|
||||
if (version < 4) {
|
||||
// Workaround for KAFKA-17011: for BrokerRegistrationRequest versions older than 4,
|
||||
// translate minSupportedVersion = 0 to minSupportedVersion = 1.
|
||||
// Workaround for KAFKA-17492: for BrokerRegistrationRequest versions older than 4,
|
||||
// remove features with minSupportedVersion = 0.
|
||||
BrokerRegistrationRequestData newData = data.duplicate();
|
||||
for (Iterator<BrokerRegistrationRequestData.Feature> iter = newData.features().iterator();
|
||||
iter.hasNext(); ) {
|
||||
BrokerRegistrationRequestData.Feature feature = iter.next();
|
||||
if (feature.minSupportedVersion() == 0) {
|
||||
feature.setMinSupportedVersion((short) 1);
|
||||
}
|
||||
}
|
||||
newData.features().removeIf(feature -> feature.minSupportedVersion() == 0);
|
||||
return new BrokerRegistrationRequest(newData, version);
|
||||
} else {
|
||||
return new BrokerRegistrationRequest(data, version);
|
||||
|
|
|
@ -47,7 +47,7 @@
|
|||
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
|
||||
{ "name": "SupportedFeatures", "type": "[]SupportedFeatureKey", "ignorable": true,
|
||||
"versions": "3+", "tag": 0, "taggedVersions": "3+",
|
||||
"about": "Features supported by the broker. Note: in v0-v3, features with MinSupportedVersion = 0 show up with MinSupportedVersion = 1.",
|
||||
"about": "Features supported by the broker. Note: in v0-v3, features with MinSupportedVersion = 0 are omitted.",
|
||||
"fields": [
|
||||
{ "name": "Name", "type": "string", "versions": "3+", "mapKey": true,
|
||||
"about": "The name of the feature." },
|
||||
|
|
|
@ -48,7 +48,7 @@
|
|||
]
|
||||
},
|
||||
{ "name": "Features", "type": "[]Feature",
|
||||
"about": "The features on this broker. Note: in v0-v3, features with MinSupportedVersion = 0 show up with MinSupportedVersion = 1.", "versions": "0+", "fields": [
|
||||
"about": "The features on this broker. Note: in v0-v3, features with MinSupportedVersion = 0 are omitted.", "versions": "0+", "fields": [
|
||||
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
|
||||
"about": "The feature name." },
|
||||
{ "name": "MinSupportedVersion", "type": "int16", "versions": "0+",
|
||||
|
|
|
@ -288,11 +288,7 @@ public class ApiVersionsResponseTest {
|
|||
setAlterFeatureLevel0(alterV0Features).
|
||||
build();
|
||||
if (alterV0Features) {
|
||||
assertEquals(new SupportedFeatureKey().
|
||||
setName("my.feature").
|
||||
setMinVersion((short) 1).
|
||||
setMaxVersion((short) 1),
|
||||
response.data().supportedFeatures().find("my.feature"));
|
||||
assertNull(response.data().supportedFeatures().find("my.feature"));
|
||||
} else {
|
||||
assertEquals(new SupportedFeatureKey().
|
||||
setName("my.feature").
|
||||
|
|
|
@ -113,11 +113,7 @@ class BrokerRegistrationRequestTest {
|
|||
new BrokerRegistrationRequestData.Feature().
|
||||
setName("metadata.version").
|
||||
setMinSupportedVersion((short) 1).
|
||||
setMaxSupportedVersion((short) 17),
|
||||
new BrokerRegistrationRequestData.Feature().
|
||||
setName("kraft.version").
|
||||
setMinSupportedVersion((short) 1).
|
||||
setMaxSupportedVersion((short) 1)).iterator()), data.features());
|
||||
setMaxSupportedVersion((short) 17)).iterator()), data.features());
|
||||
} else {
|
||||
assertEquals(new BrokerRegistrationRequestData.FeatureCollection(
|
||||
Arrays.asList(
|
||||
|
|
|
@ -18,7 +18,7 @@ import org.apache.kafka.common.requests.ApiVersionsRequest
|
|||
import org.apache.kafka.common.requests.ApiVersionsResponse
|
||||
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
||||
import org.apache.kafka.server.config.ServerConfigs
|
||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull}
|
||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull}
|
||||
import org.junit.jupiter.params.ParameterizedTest
|
||||
import org.junit.jupiter.params.provider.ValueSource
|
||||
|
||||
|
@ -44,7 +44,7 @@ class ApiVersionsResponseIntegrationTest extends BaseRequestTest {
|
|||
val response = sendApiVersionsRequest(3)
|
||||
if (quorum.equals("kraft")) {
|
||||
assertFeatureHasMinVersion("metadata.version", response.data().supportedFeatures(), 1)
|
||||
assertFeatureHasMinVersion("kraft.version", response.data().supportedFeatures(), 1)
|
||||
assertFeatureMissing("kraft.version", response.data().supportedFeatures())
|
||||
} else {
|
||||
assertEquals(0, response.data().supportedFeatures().size())
|
||||
}
|
||||
|
@ -72,4 +72,12 @@ class ApiVersionsResponseIntegrationTest extends BaseRequestTest {
|
|||
assertEquals(name, key.name())
|
||||
assertEquals(expectedMinVersion, key.minVersion())
|
||||
}
|
||||
|
||||
private def assertFeatureMissing(
|
||||
name: String,
|
||||
coll: SupportedFeatureKeyCollection,
|
||||
): Unit = {
|
||||
val key = coll.find(name)
|
||||
assertNull(key)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
|||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
from kafkatest.utils import is_int
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_3_4, LATEST_3_7, KafkaVersion
|
||||
from kafkatest.version import DEV_BRANCH, LATEST_3_4, LATEST_3_7, LATEST_3_8, KafkaVersion
|
||||
|
||||
|
||||
class TestMigration(ProduceConsumeValidateTest):
|
||||
|
@ -86,7 +86,7 @@ class TestMigration(ProduceConsumeValidateTest):
|
|||
controller.start_node(node)
|
||||
|
||||
@cluster(num_nodes=7)
|
||||
@matrix(roll_controller=[True, False], from_kafka_version=[str(DEV_BRANCH), str(LATEST_3_7)])
|
||||
@matrix(roll_controller=[True, False], from_kafka_version=[str(DEV_BRANCH), str(LATEST_3_7), str(LATEST_3_8)])
|
||||
def test_online_migration(self, roll_controller, from_kafka_version):
|
||||
zk_quorum = partial(ServiceQuorumInfo, zk)
|
||||
self.zk = ZookeeperService(self.test_context, num_nodes=1, version=DEV_BRANCH)
|
||||
|
|
Loading…
Reference in New Issue